diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 9998856..36c1327 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -215,23 +215,39 @@ public abstract class EnhanceMessageHandler { } /** - * 构建返回队列通知 - * @param returnTopic - * @param id + * 构建返回队列通知消息 + * @param oldMessage * @param status * @return - * @param */ - protected MqRecordMessage buildReturnMessage(String returnTopic,Long id,EnumMessageStatus status){ + protected MqRecordMessage buildReturnMessage(MqRecordMessage oldMessage,EnumMessageStatus status){ MqRecordMessage mqRecordMessage = new MqRecordMessage(); - String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(returnTopic); + String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(oldMessage.getReturnTopic()); mqRecordMessage.setMessageKey(IdUtil.randomUUID()); mqRecordMessage.setSource(applicationName); mqRecordMessage.setTopic(orderReturnTopic); mqRecordMessage.setConsumerGroupName(orderReturnTopic); mqRecordMessage.setStatus(status); - mqRecordMessage.setId(id); + mqRecordMessage.setId(oldMessage.getId()); return mqRecordMessage; } + /** + * 返回成功通知 + * @param message + */ + protected void sendReturnSuccessMessage(MqRecordMessage message){ + MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.SUCCESS); + rocketMQEnhanceTemplate.sendAsyncMsg(returnMessage.getReturnTopic(),null,returnMessage); + } + + /** + * 返回失败通知 + * @param message + */ + protected void sendReturnFailsMessage(MqRecordMessage message){ + MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.FAILS); + rocketMQEnhanceTemplate.sendAsyncMsg(returnMessage.getReturnTopic(),null,returnMessage); + } + } \ No newline at end of file diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 598a48f..15833ca 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -141,8 +141,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler i //发送返回队列,告知已经处理成功,完成最终一致性 //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 - MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.SUCCESS); - rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage); + super.sendReturnSuccessMessage(message); } @Override @@ -150,8 +149,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler i //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录 //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 - MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.FAILS); - rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage); + super.sendReturnFailsMessage(message); } @Override diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index c7a995d..86e304b 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -49,8 +49,7 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler