diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java index 91a1efa..bd41328 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java @@ -22,10 +22,15 @@ public class BaseMessage { protected String source = ""; /** - * 消息主题 + * 推送主题 */ protected String topic; + /** + * 返回主题 + */ + protected String returnTopic; + /** * 消息tag */ diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java similarity index 60% rename from bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java rename to bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java index ed18553..e76dddb 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java @@ -1,7 +1,6 @@ -package com.bnyer.common.rocketmq.domain.pay; +package com.bnyer.common.rocketmq.domain; import com.bnyer.common.core.enums.EnumMessageStatus; -import com.bnyer.common.rocketmq.domain.BaseMessage; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -9,19 +8,19 @@ import lombok.Setter; /** * @author :WXC * @Date :2023/05/18 - * @description :支付服务本地消息表记录 + * @description :统一消息推送 */ @Getter @Setter @NoArgsConstructor -public class PayMqLocalRecordMessage extends BaseMessage { +public class MqRecordMessage extends BaseMessage { /** - * 主键id + * 消息记录表id */ private Long id; /** - * 消息状态 + * 消息记录表状态 */ private EnumMessageStatus status; diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java deleted file mode 100644 index 73584bd..0000000 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.bnyer.common.rocketmq.domain.order; - -import com.bnyer.common.core.enums.EnumMessageStatus; -import com.bnyer.common.rocketmq.domain.BaseMessage; -import com.fasterxml.jackson.annotation.JsonFormat; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -import java.util.Date; - -/** - * @author :WXC - * @Date :2023/05/18 - * @description :订单服务本地消息表记录 - */ -@Getter -@Setter -@NoArgsConstructor -public class OrderMqLocalRecordMessage extends BaseMessage { - /** - * 主键id - */ - private Long id; - - /** - * 消息状态 - */ - private EnumMessageStatus status; - - /** - * 消息内容 - */ - private String content; - -} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 2dfbf2e..9998856 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -1,24 +1,18 @@ package com.bnyer.common.rocketmq.handle; import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.ReflectUtil; import com.alibaba.fastjson.JSONObject; -import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.constant.RocketMqConstant; -import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.BaseMessage; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; -import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; -import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; -import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; import com.bnyer.common.rocketmq.persist.IPersist; -import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy; import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy; import com.bnyer.common.rocketmq.strategy.RedisRepeatStrategy; +import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; -import com.sun.org.apache.regexp.internal.RE; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; @@ -228,41 +222,16 @@ public abstract class EnhanceMessageHandler { * @return * @param */ - protected T buildReturnMessage(String returnTopic,Long id,EnumMessageStatus status){ - switch (returnTopic){ - case RocketMqTopic.ORDER_RETURN_MSG_TOPIC: - OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage(); - String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); - orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); - orderMqLocalRecordMessage.setSource(applicationName); - orderMqLocalRecordMessage.setTopic(orderReturnTopic); - orderMqLocalRecordMessage.setConsumerGroupName(orderReturnTopic); - orderMqLocalRecordMessage.setStatus(status); - orderMqLocalRecordMessage.setId(id); - return (T) orderMqLocalRecordMessage; - case RocketMqTopic.IMG_RETURN_MSG_TOPIC: - ImgMqLocalRecordMessage imgMqLocalRecordMessage = new ImgMqLocalRecordMessage(); - String imgReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); - imgMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); - imgMqLocalRecordMessage.setSource(applicationName); - imgMqLocalRecordMessage.setTopic(imgReturnTopic); - imgMqLocalRecordMessage.setConsumerGroupName(imgReturnTopic); - imgMqLocalRecordMessage.setStatus(status); - imgMqLocalRecordMessage.setId(id); - return (T) imgMqLocalRecordMessage; - case RocketMqTopic.PAY_RETURN_MSG_TOPIC: - PayMqLocalRecordMessage payMqLocalRecordMessage = new PayMqLocalRecordMessage(); - String payReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); - payMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); - payMqLocalRecordMessage.setSource(applicationName); - payMqLocalRecordMessage.setTopic(payReturnTopic); - payMqLocalRecordMessage.setConsumerGroupName(payReturnTopic); - payMqLocalRecordMessage.setStatus(status); - payMqLocalRecordMessage.setId(id); - return (T) payMqLocalRecordMessage; - default: - throw new RuntimeException("返回消息主题匹配有误"); - } + protected MqRecordMessage buildReturnMessage(String returnTopic,Long id,EnumMessageStatus status){ + MqRecordMessage mqRecordMessage = new MqRecordMessage(); + String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(returnTopic); + mqRecordMessage.setMessageKey(IdUtil.randomUUID()); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setTopic(orderReturnTopic); + mqRecordMessage.setConsumerGroupName(orderReturnTopic); + mqRecordMessage.setStatus(status); + mqRecordMessage.setId(id); + return mqRecordMessage; } } \ No newline at end of file diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 7d750b4..598a48f 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -1,7 +1,6 @@ package com.bnyer.img.listener; import com.alibaba.fastjson.JSON; -import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.FhUser; import com.bnyer.common.core.domain.GoldLog; import com.bnyer.common.core.domain.TiktokUser; @@ -10,8 +9,8 @@ import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.GoldEnum; import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage; -import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.service.FhUserService; @@ -34,7 +33,7 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.GOLD_REWARD_TOPIC,consumerGroup = RocketMqTopic.GOLD_REWARD_TOPIC) -public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private TiktokUserService tiktokUserService; @@ -52,7 +51,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class ImgReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private ImgMqMessageRecordService imgMqMessageRecordService; @Override - public void onMessage(ImgMqLocalRecordMessage message) { + public void onMessage(MqRecordMessage message) { super.dispatchMessage(message); imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); } @Override - protected void handleMessage(ImgMqLocalRecordMessage message) throws Exception { + protected void handleMessage(MqRecordMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(ImgMqLocalRecordMessage message) { + protected void handleMaxRetriesExceeded(MqRecordMessage message) { } @Override - protected boolean filter(ImgMqLocalRecordMessage message) { + protected boolean filter(MqRecordMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index 9196303..c7a995d 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -1,13 +1,12 @@ package com.bnyer.img.listener; import com.alibaba.fastjson.JSON; -import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.dto.AddUserVipRecordDto; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; -import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.service.UserVipRecordService; @@ -27,7 +26,7 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) -public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; @@ -36,7 +35,7 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build(); + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); + Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); } catch (Exception e) { - editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); } } @@ -86,9 +98,10 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService log.info("消息发送中,开始入库本地消息记录表"); ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.IMG_SERVICE); - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); @@ -97,8 +110,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService public void onException(Throwable throwable) { log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); - imgMqMessageRecord.setErrMsg(throwable.getMessage()); - editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),imgMqMessageRecord.getErrMsg()); + editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); } }); } @@ -115,9 +127,10 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService log.info("消息发送中,开始入库本地消息记录表"); ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java index 554caf9..f3b7de0 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java @@ -1,9 +1,7 @@ package com.bnyer.order.listener; -import com.bnyer.common.core.domain.OrderMqMessageRecord; -import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.order.service.OrderMqMessageRecordService; import lombok.extern.slf4j.Slf4j; @@ -21,29 +19,29 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC) -public class OrderReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class OrderReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private OrderMqMessageRecordService orderMqMessageRecordService; @Override - public void onMessage(OrderMqLocalRecordMessage message) { + public void onMessage(MqRecordMessage message) { super.dispatchMessage(message); orderMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); } @Override - protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception { + protected void handleMessage(MqRecordMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) { + protected void handleMaxRetriesExceeded(MqRecordMessage message) { } @Override - protected boolean filter(OrderMqLocalRecordMessage message) { + protected boolean filter(MqRecordMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java index 4b9257b..8d95034 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java @@ -6,8 +6,8 @@ import com.bnyer.common.core.domain.VipOrder; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage; -import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.order.mapper.VipOrderMapper; @@ -29,7 +29,7 @@ import java.util.Objects; @Component @RocketMQMessageListener(topic = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG ,consumerGroup = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC) -public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private VipOrderMapper vipOrderMapper; @@ -41,7 +41,7 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build(); + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); + Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); } catch (Exception e) { - editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); } } @@ -84,9 +97,10 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ log.info("消息发送中,开始入库本地消息记录表"); OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); @@ -95,8 +109,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ public void onException(Throwable throwable) { log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); - orderMqMessageRecord.setErrMsg(throwable.getMessage()); - editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),orderMqMessageRecord.getErrMsg()); + editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); } }); } @@ -113,9 +126,10 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ log.info("消息发送中,开始入库本地消息记录表"); OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java index 82f07d1..0532ba8 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java @@ -1,7 +1,7 @@ package com.bnyer.pay.listener; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.pay.service.PayMqMessageRecordService; import lombok.extern.slf4j.Slf4j; @@ -19,29 +19,29 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.PAY_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.PAY_RETURN_MSG_TOPIC) -public class PayReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class PayReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private PayMqMessageRecordService payMqMessageRecordService; @Override - public void onMessage(PayMqLocalRecordMessage message) { + public void onMessage(MqRecordMessage message) { super.dispatchMessage(message); payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); } @Override - protected void handleMessage(PayMqLocalRecordMessage message) throws Exception { + protected void handleMessage(MqRecordMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(PayMqLocalRecordMessage message) { + protected void handleMaxRetriesExceeded(MqRecordMessage message) { } @Override - protected boolean filter(PayMqLocalRecordMessage message) { + protected boolean filter(MqRecordMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java index 3cf0f93..3ccf2cc 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java @@ -2,13 +2,13 @@ package com.bnyer.pay.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.PayMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.rocketmq.constant.RocketMqConstant; -import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.pay.mapper.PayMqMessageRecordMapper; import com.bnyer.pay.service.PayMqMessageRecordService; @@ -17,11 +17,13 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.core.env.Environment; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.Date; @@ -40,6 +42,17 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService @Resource private PayMqMessageRecordMapper payMqMessageRecordMapper; + @Resource + private Environment env; + + private String applicationName; + + @PostConstruct + public void init(){ + this.applicationName = env.getProperty("spring.application.name"); + } + + /** * 发送同步步消息 * @param topic @@ -57,17 +70,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService log.info("消息发送中,开始入库本地消息记录表"); PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); - Message sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build(); + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); + Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); } catch (Exception e) { - editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); } } @@ -84,9 +98,10 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService log.info("消息发送中,开始入库本地消息记录表"); PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); @@ -95,8 +110,7 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService public void onException(Throwable throwable) { log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); - payMqMessageRecord.setErrMsg(throwable.getMessage()); - editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),payMqMessageRecord.getErrMsg()); + editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); } }); } @@ -113,9 +127,10 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService log.info("消息发送中,开始入库本地消息记录表"); PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); //发消息 - PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); - mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult));