diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/ImgMqMessageRecord.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/ImgMqMessageRecord.java new file mode 100644 index 0000000..6d75e6e --- /dev/null +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/ImgMqMessageRecord.java @@ -0,0 +1,19 @@ +package com.bnyer.common.core.domain; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :图片服务本地消息表 + */ +@Getter +@Setter +@NoArgsConstructor +@TableName(value = "img_mq_message_record") +public class ImgMqMessageRecord extends BaseMqMessage{ + +} diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/PayMqMessageRecord.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/PayMqMessageRecord.java new file mode 100644 index 0000000..e3a2d0c --- /dev/null +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/domain/PayMqMessageRecord.java @@ -0,0 +1,19 @@ +package com.bnyer.common.core.domain; + +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :支付服务本地消息表 + */ +@Getter +@Setter +@NoArgsConstructor +@TableName(value = "pay_mq_message_record") +public class PayMqMessageRecord extends BaseMqMessage{ + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java index a124c4b..dff2dd9 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqConstant.java @@ -7,6 +7,11 @@ package com.bnyer.common.rocketmq.constant; */ public class RocketMqConstant { + /** + * 发送超时时间 + */ + public final static int TIME_OUT = 10000; + /** * 数据来源前缀 */ diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java index 28c4146..18a4004 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/constant/RocketMqTopic.java @@ -10,7 +10,15 @@ public class RocketMqTopic { /** * 订单服务本地消息表记录返回队列 */ - public static final String ORDER_RETURN_MSG_TOPIC = "order-rerun-msg-topic"; + public static final String ORDER_RETURN_MSG_TOPIC = "order-return-msg-topic"; + /** + * 图片服务本地消息表记录返回队列 + */ + public static final String IMG_RETURN_MSG_TOPIC = "img-return-msg-topic"; + /** + * 支付服务本地消息表记录返回队列 + */ + public static final String PAY_RETURN_MSG_TOPIC = "pay-return-msg-topic"; /** * 订单取消 diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/GoldRewardMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/GoldRewardMessage.java index 4bcb198..846f902 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/GoldRewardMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/GoldRewardMessage.java @@ -1,14 +1,10 @@ package com.bnyer.common.rocketmq.domain.img; -import com.bnyer.common.rocketmq.domain.BaseMessage; -import com.fasterxml.jackson.annotation.JsonFormat; import io.swagger.annotations.ApiModelProperty; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import java.util.Date; - /** * @author :WXC * @Date :2023/05/17 diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/ImgMqLocalRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/ImgMqLocalRecordMessage.java new file mode 100644 index 0000000..896f621 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/img/ImgMqLocalRecordMessage.java @@ -0,0 +1,33 @@ +package com.bnyer.common.rocketmq.domain.img; + +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.BaseMessage; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :图片服务本地消息表记录 + */ +@Getter +@Setter +@NoArgsConstructor +public class ImgMqLocalRecordMessage extends BaseMessage { + /** + * 主键id + */ + private Long id; + + /** + * 消息状态 + */ + private EnumMessageStatus status; + + /** + * 消息内容 + */ + private String content; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java index 198e508..73584bd 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java @@ -18,6 +18,9 @@ import java.util.Date; @Setter @NoArgsConstructor public class OrderMqLocalRecordMessage extends BaseMessage { + /** + * 主键id + */ private Long id; /** diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java index bcfb90d..94656bb 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/VipOrderPayNotifyMessage.java @@ -1,6 +1,5 @@ package com.bnyer.common.rocketmq.domain.order; -import com.bnyer.common.rocketmq.domain.BaseMessage; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; @@ -13,7 +12,7 @@ import lombok.Setter; @Getter @Setter @NoArgsConstructor -public class VipOrderPayNotifyMessage extends BaseMessage { +public class VipOrderPayNotifyMessage { /** * 订单号 */ diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java new file mode 100644 index 0000000..ed18553 --- /dev/null +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java @@ -0,0 +1,33 @@ +package com.bnyer.common.rocketmq.domain.pay; + +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.BaseMessage; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :支付服务本地消息表记录 + */ +@Getter +@Setter +@NoArgsConstructor +public class PayMqLocalRecordMessage extends BaseMessage { + /** + * 主键id + */ + private Long id; + + /** + * 消息状态 + */ + private EnumMessageStatus status; + + /** + * 消息内容 + */ + private String content; + +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index f39c4f2..2dfbf2e 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -1,19 +1,24 @@ package com.bnyer.common.rocketmq.handle; import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.ReflectUtil; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; +import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; +import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; import com.bnyer.common.rocketmq.persist.IPersist; import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy; import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy; import com.bnyer.common.rocketmq.strategy.RedisRepeatStrategy; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import com.sun.org.apache.regexp.internal.RE; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; @@ -45,6 +50,8 @@ public abstract class EnhanceMessageHandler { @Resource private Environment env ; + private String applicationName; + private RepeatConsumerConfig repeatConsumerConfig; @Resource @@ -55,7 +62,8 @@ public abstract class EnhanceMessageHandler { @PostConstruct public void init(){ - this.repeatConsumerConfig = RepeatConsumerConfig.enableRedisConfig(env.getProperty("spring.application.name"),stringRedisTemplate); + this.applicationName = env.getProperty("spring.application.name"); + this.repeatConsumerConfig = RepeatConsumerConfig.enableRedisConfig(applicationName,stringRedisTemplate); } /** @@ -214,16 +222,47 @@ public abstract class EnhanceMessageHandler { /** * 构建返回队列通知 - * @param success - * @param message + * @param returnTopic + * @param id + * @param status + * @return + * @param */ - protected OrderMqLocalRecordMessage buildRerunMessage(EnumMessageStatus success, OrderMqLocalRecordMessage message) { - OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage(); - orderMqLocalRecordMessage.setStatus(success); - orderMqLocalRecordMessage.setId(message.getId()); - orderMqLocalRecordMessage.setSource(ServiceNameConstants.IMG_SERVICE); - orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); - return orderMqLocalRecordMessage; + protected T buildReturnMessage(String returnTopic,Long id,EnumMessageStatus status){ + switch (returnTopic){ + case RocketMqTopic.ORDER_RETURN_MSG_TOPIC: + OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage(); + String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); + orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); + orderMqLocalRecordMessage.setSource(applicationName); + orderMqLocalRecordMessage.setTopic(orderReturnTopic); + orderMqLocalRecordMessage.setConsumerGroupName(orderReturnTopic); + orderMqLocalRecordMessage.setStatus(status); + orderMqLocalRecordMessage.setId(id); + return (T) orderMqLocalRecordMessage; + case RocketMqTopic.IMG_RETURN_MSG_TOPIC: + ImgMqLocalRecordMessage imgMqLocalRecordMessage = new ImgMqLocalRecordMessage(); + String imgReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); + imgMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); + imgMqLocalRecordMessage.setSource(applicationName); + imgMqLocalRecordMessage.setTopic(imgReturnTopic); + imgMqLocalRecordMessage.setConsumerGroupName(imgReturnTopic); + imgMqLocalRecordMessage.setStatus(status); + imgMqLocalRecordMessage.setId(id); + return (T) imgMqLocalRecordMessage; + case RocketMqTopic.PAY_RETURN_MSG_TOPIC: + PayMqLocalRecordMessage payMqLocalRecordMessage = new PayMqLocalRecordMessage(); + String payReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); + payMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); + payMqLocalRecordMessage.setSource(applicationName); + payMqLocalRecordMessage.setTopic(payReturnTopic); + payMqLocalRecordMessage.setConsumerGroupName(payReturnTopic); + payMqLocalRecordMessage.setStatus(status); + payMqLocalRecordMessage.setId(id); + return (T) payMqLocalRecordMessage; + default: + throw new RuntimeException("返回消息主题匹配有误"); + } } } \ No newline at end of file diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java index 6345d27..d1b96a0 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/persist/RedisPersist.java @@ -1,6 +1,7 @@ package com.bnyer.common.rocketmq.persist; +import com.bnyer.common.rocketmq.constant.RocketMqRepeatConstant; import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; import org.apache.commons.lang3.StringUtils; @@ -11,8 +12,6 @@ import org.springframework.data.redis.core.types.Expiration; import java.util.concurrent.TimeUnit; -import static com.bnyer.common.rocketmq.constant.RocketMqRepeatConstant.*; - /** * @author :WXC * @Date :2023/05/20 @@ -34,7 +33,7 @@ public class RedisPersist implements IPersist { String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); //setnx, 成功就可以消费 Boolean execute = redisTemplate.execute((RedisCallback) redisConnection -> redisConnection.set(repeatKey.getBytes(), - (CONSUME_STATUS_CONSUMING).getBytes(), Expiration.milliseconds(processingExpireMilliSeconds), RedisStringCommands.SetOption.SET_IF_ABSENT)); + (RocketMqRepeatConstant.CONSUME_STATUS_CONSUMING).getBytes(), Expiration.milliseconds(processingExpireMilliSeconds), RedisStringCommands.SetOption.SET_IF_ABSENT)); if (execute == null) { return false; } @@ -50,7 +49,7 @@ public class RedisPersist implements IPersist { @Override public void markConsumed(RepeatElement repeatElement, long recordReserveMinutes) { String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); - redisTemplate.opsForValue().set(repeatKey, CONSUME_STATUS_SUCCESS, recordReserveMinutes, TimeUnit.MINUTES); + redisTemplate.opsForValue().set(repeatKey, RocketMqRepeatConstant.CONSUME_STATUS_SUCCESS, recordReserveMinutes, TimeUnit.MINUTES); } @Override @@ -73,7 +72,7 @@ public class RedisPersist implements IPersist { * @return */ private String buildRepeatMessageRedisKey(String applicationName, String topic, String tag, String messageKey) { - String prefix = REPEAT_REDIS_KEY_PREFIX + applicationName + ":" + topic + (StringUtils.isNotEmpty(tag) ? ":"+ tag :""); + String prefix = RocketMqRepeatConstant.REPEAT_REDIS_KEY_PREFIX + applicationName + ":" + topic + (StringUtils.isNotEmpty(tag) ? ":"+ tag :""); return prefix + ":" + messageKey; } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 227021c..7d750b4 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -1,14 +1,19 @@ package com.bnyer.img.listener; +import com.alibaba.fastjson.JSON; +import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.FhUser; import com.bnyer.common.core.domain.GoldLog; import com.bnyer.common.core.domain.TiktokUser; import com.bnyer.common.core.domain.WxUser; +import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.GoldEnum; import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage; +import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.service.FhUserService; import com.bnyer.img.service.GoldLogService; import com.bnyer.img.service.TiktokUserService; @@ -19,6 +24,8 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + /** * @author :penny * @Date :2023/05/17 @@ -27,7 +34,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.GOLD_REWARD_TOPIC,consumerGroup = RocketMqTopic.GOLD_REWARD_TOPIC) -public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private TiktokUserService tiktokUserService; @@ -41,9 +48,14 @@ public class GoldRewardConsumer extends EnhanceMessageHandler @Autowired private GoldLogService goldLogService; + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + @Override - public void onMessage(GoldRewardMessage goldReward) { - super.dispatchMessage(goldReward); + public void onMessage(OrderMqLocalRecordMessage message) { + super.dispatchMessage(message); + String content = message.getContent(); + GoldRewardMessage goldReward = JSON.parseObject(content, GoldRewardMessage.class); if(StringUtils.isNotNull(goldReward.getPlatform())){ switch (goldReward.getPlatform()){ case "0": @@ -126,17 +138,25 @@ public class GoldRewardConsumer extends EnhanceMessageHandler } @Override - protected void handleMessage(GoldRewardMessage message) throws Exception { - + protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception { + //发送返回队列,告知已经处理成功,完成最终一致性 + //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 + // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 + OrderMqLocalRecordMessage orderMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.ORDER_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.SUCCESS); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.IMG_RETURN_MSG_TOPIC,null,orderMqLocalRecordMessage); } @Override - protected void handleMaxRetriesExceeded(GoldRewardMessage message) { - log.error("消息消费失败,可扩展执行后续处理"); + protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) { + //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录 + //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 + // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 + OrderMqLocalRecordMessage orderMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.ORDER_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.FAILS); + rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.IMG_RETURN_MSG_TOPIC,null,orderMqLocalRecordMessage); } @Override - protected boolean filter(GoldRewardMessage message) { + protected boolean filter(OrderMqLocalRecordMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java new file mode 100644 index 0000000..6f2e651 --- /dev/null +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java @@ -0,0 +1,60 @@ +package com.bnyer.img.listener; + +import com.bnyer.common.core.constant.ServiceNameConstants; +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.constant.RocketMqTopic; +import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; +import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; +import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import com.bnyer.img.service.ImgMqMessageRecordService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description : + */ +@Slf4j +@Component +@RocketMQMessageListener(topic = RocketMqTopic.IMG_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.IMG_RETURN_MSG_TOPIC) +public class ImgReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { + + @Resource + private ImgMqMessageRecordService imgMqMessageRecordService; + + @Override + public void onMessage(ImgMqLocalRecordMessage message) { + super.dispatchMessage(message); + imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); + } + + @Override + protected void handleMessage(ImgMqLocalRecordMessage message) throws Exception { + } + + @Override + protected void handleMaxRetriesExceeded(ImgMqLocalRecordMessage message) { + } + + @Override + protected boolean filter(ImgMqLocalRecordMessage message) { + return super.handleMsgRepeat(message); + } + + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + return false; + } + +} diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index da33d57..9196303 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -1,12 +1,10 @@ package com.bnyer.img.listener; -import cn.hutool.core.util.IdUtil; import com.alibaba.fastjson.JSON; import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.dto.AddUserVipRecordDto; import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.bean.EntityConvertUtil; -import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; @@ -52,8 +50,8 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler { + /** + * 更新状态 + * @param status + * @return + */ + Integer updateStatusByStatus(@Param("id") Long id, @Param("status") EnumMessageStatus status, @Param("errMsg") String errMsg); +} diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java new file mode 100644 index 0000000..3a1d838 --- /dev/null +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java @@ -0,0 +1,62 @@ +package com.bnyer.img.service; + +import com.bnyer.common.core.domain.ImgMqMessageRecord; +import com.bnyer.common.core.enums.EnumMessageStatus; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description :图片服务本地消息service层 + */ +public interface ImgMqMessageRecordService { + + /** + * 发送同步步消息 + * @param topic + * @param tag + * @param message + * @param + */ + void send(String topic, String tag, T message); + + /** + * 发送异步消息 + * @param topic + * @param tag + * @param message + * @param + */ + void sendAsyncMsg(String topic, String tag, T message); + + /** + * 发送异步延时消息 + * 废弃:延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了 + * 所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚 + * @param topic + * @param tag + * @param message + * @param delayLevel + * @param + */ + @Deprecated + void sendAsyncMsg(String topic, String tag, T message, int delayLevel); + + /** + * 保存消息记录 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message); + + /** + * 修改消息记录状态 + * @param id + * @param status + * @param errMsg + */ + void editMessageRecordStatus(Long id, EnumMessageStatus status,String errMsg); + +} diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java index 290d105..47b4b6c 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/CreatorProfitServiceImpl.java @@ -20,6 +20,7 @@ import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.mapper.CreatorProfitMapper; import com.bnyer.img.mapper.InviteLogMapper; import com.bnyer.img.service.CreatorProfitService; +import com.bnyer.img.service.ImgMqMessageRecordService; import com.bnyer.img.service.TiktokImgService; import com.bnyer.img.vo.*; import lombok.extern.slf4j.Slf4j; @@ -56,6 +57,9 @@ public class CreatorProfitServiceImpl implements CreatorProfitService { @Resource private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + @Resource + private ImgMqMessageRecordService mqMessageRecordService; + @Override public boolean checkCreatorProfitExist(String mark) { CreatorProfit creatorProfit = creatorProfitMapper.checkCreatorProfitExist(mark); @@ -135,7 +139,8 @@ public class CreatorProfitServiceImpl implements CreatorProfitService { //发送下图画意值奖励并写入记录消息 GoldRewardMessage msg = buildGoldRewardMsg(params.getUserId(), 2, GoldEnum.DOWNLOAD_IMG.getCode(), params.getPlatform(), null); - rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,msg); +// rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,msg); + mqMessageRecordService.sendAsyncMsg(RocketMqTopic.GOLD_REWARD_TOPIC,null,msg); return insert; } @@ -150,8 +155,6 @@ public class CreatorProfitServiceImpl implements CreatorProfitService { */ private GoldRewardMessage buildGoldRewardMsg(Long userId, int goldNum, String goldCode, String platform, Integer userClientType) { GoldRewardMessage message = new GoldRewardMessage(); - message.setMessageKey(IdUtils.randomUUID()); - message.setSource(ServiceNameConstants.IMG_SERVICE); message.setUserId(userId); message.setGoldNum(goldNum); message.setGoldCode(goldCode); diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java new file mode 100644 index 0000000..5735133 --- /dev/null +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java @@ -0,0 +1,164 @@ +package com.bnyer.img.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bnyer.common.core.constant.ServiceNameConstants; +import com.bnyer.common.core.domain.ImgMqMessageRecord; +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.core.utils.bean.EntityConvertUtil; +import com.bnyer.common.core.utils.uuid.IdUtils; +import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; +import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import com.bnyer.img.mapper.ImgMqMessageRecordMapper; +import com.bnyer.img.service.ImgMqMessageRecordService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; +import java.util.Date; + + +/** + * @author :WXC + * @Date :2023/05/19 + * @description : + */ +@Slf4j +@Service +public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService { + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + + @Resource + private ImgMqMessageRecordMapper imgMqMessageRecordMapper; + + /** + * 发送同步步消息 + * @param topic + * @param tag + * @param message + * @param + */ + @Transactional + @Override + public void send(String topic, String tag, T message) { + // 设置业务键,此处根据公共的参数进行处理 + // 更多的其它基础业务处理... + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + Message sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build(); + SendResult sendResult; + try { + sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + } catch (Exception e) { + editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); +// throw new RuntimeException(e); + } + } + + /** + * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + @Transactional + @Override + public void sendAsyncMsg(String topic, String tag, T message) { + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.IMG_SERVICE); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + imgMqMessageRecord.setErrMsg(throwable.getMessage()); + editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),imgMqMessageRecord.getErrMsg()); + } + }); + } + + /** + * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + @Transactional + @Override + public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); + } + }, RocketMqConstant.TIME_OUT,delayLevel); + } + + /** + * 添加消息记录 + * @param message + * @param + */ + @Transactional + public ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message){ + topic = rocketMQEnhanceTemplate.reBuildTopic(topic); + ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); + imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); + imgMqMessageRecord.setConsumerGroupName(topic); + imgMqMessageRecord.setTopic(topic); + imgMqMessageRecord.setTag(tag); + imgMqMessageRecord.setCreateTime(new Date()); + imgMqMessageRecord.setMessageKey(IdUtils.randomUUID()); + imgMqMessageRecord.setContent(JSON.toJSONString(message)); + imgMqMessageRecordMapper.insert(imgMqMessageRecord); + return imgMqMessageRecord; + } + + /** + * 修改消息记录状态 + * @param id + * @param status + */ + @Transactional + public void editMessageRecordStatus(Long id,EnumMessageStatus status,String errMsg){ + imgMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); + } + +} diff --git a/bnyer-services/bnyer-img/src/main/resources/com/bnyer/img/mapper/ImgMqMessageRecordMapper.xml b/bnyer-services/bnyer-img/src/main/resources/com/bnyer/img/mapper/ImgMqMessageRecordMapper.xml new file mode 100644 index 0000000..ef4a915 --- /dev/null +++ b/bnyer-services/bnyer-img/src/main/resources/com/bnyer/img/mapper/ImgMqMessageRecordMapper.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + t.id, + t.message_key, + t.topic, + t.tag, + t.consumer_group_name, + t.status, + t.err_msg, + t.content, + t.create_time, + + + update img_mq_message_record + set status = #{status} + + ,err_msg = #{errMsg} + + where id = #{id} + + + diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java index 9b44878..4b9257b 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java @@ -1,11 +1,15 @@ package com.bnyer.order.listener.vip; +import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.bnyer.common.core.domain.VipOrder; +import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage; +import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; @@ -25,7 +29,7 @@ import java.util.Objects; @Component @RocketMQMessageListener(topic = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG ,consumerGroup = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC) -public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private VipOrderMapper vipOrderMapper; @@ -33,11 +37,16 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler().eq(VipOrder::getOrderNo, orderNo)); if (Objects.isNull(vipOrder)){ log.error("订单不存在,订单号:{}",orderNo); @@ -48,17 +57,25 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { + + @Resource + private PayMqMessageRecordService payMqMessageRecordService; + + @Override + public void onMessage(PayMqLocalRecordMessage message) { + super.dispatchMessage(message); + payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); + } + + @Override + protected void handleMessage(PayMqLocalRecordMessage message) throws Exception { + + } + + @Override + protected void handleMaxRetriesExceeded(PayMqLocalRecordMessage message) { + + } + + @Override + protected boolean filter(PayMqLocalRecordMessage message) { + return super.handleMsgRepeat(message); + } + + @Override + protected boolean isRetry() { + return true; + } + + @Override + protected boolean throwException() { + return false; + } + +} diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/mapper/PayMqMessageRecordMapper.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/mapper/PayMqMessageRecordMapper.java new file mode 100644 index 0000000..c77e9e5 --- /dev/null +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/mapper/PayMqMessageRecordMapper.java @@ -0,0 +1,22 @@ +package com.bnyer.pay.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.bnyer.common.core.domain.OrderMqMessageRecord; +import com.bnyer.common.core.domain.PayMqMessageRecord; +import com.bnyer.common.core.enums.EnumMessageStatus; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +/** + * @author :WXC + * @description : + */ +@Mapper +public interface PayMqMessageRecordMapper extends BaseMapper { + /** + * 更新状态 + * @param status + * @return + */ + Integer updateStatusByStatus(@Param("id") Long id, @Param("status") EnumMessageStatus status, @Param("errMsg") String errMsg); +} diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java new file mode 100644 index 0000000..99dac52 --- /dev/null +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java @@ -0,0 +1,62 @@ +package com.bnyer.pay.service; + +import com.bnyer.common.core.domain.PayMqMessageRecord; +import com.bnyer.common.core.enums.EnumMessageStatus; + +/** + * @author :WXC + * @Date :2023/05/20 + * @description :支付服务本地消息service层 + */ +public interface PayMqMessageRecordService { + + /** + * 发送同步步消息 + * @param topic + * @param tag + * @param message + * @param + */ + void send(String topic, String tag, T message); + + /** + * 发送异步消息 + * @param topic + * @param tag + * @param message + * @param + */ + void sendAsyncMsg(String topic, String tag, T message); + + /** + * 发送异步延时消息 + * 废弃:延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了 + * 所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚 + * @param topic + * @param tag + * @param message + * @param delayLevel + * @param + */ + @Deprecated + void sendAsyncMsg(String topic, String tag, T message, int delayLevel); + + /** + * 保存消息记录 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + PayMqMessageRecord saveMessageRecord(String topic, String tag, T message); + + /** + * 修改消息记录状态 + * @param id + * @param status + * @param errMsg + */ + void editMessageRecordStatus(Long id, EnumMessageStatus status,String errMsg); + +} diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java index b1e0ec6..e011df4 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java @@ -19,11 +19,13 @@ import com.bnyer.pay.bean.dto.EditPayInfoSingleDto; import com.bnyer.pay.bean.vo.PayInfoDetailsVo; import com.bnyer.pay.mapper.PayInfoMapper; import com.bnyer.pay.service.PayInfoService; +import com.bnyer.pay.service.PayMqMessageRecordService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import javax.annotation.Resource; import java.util.Objects; /** @@ -40,6 +42,9 @@ public class PayInfoServiceImpl implements PayInfoService { @Autowired private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + @Resource + private PayMqMessageRecordService mqMessageRecordService; + /** * 添加支付订单 * @param dto @@ -81,12 +86,10 @@ public class PayInfoServiceImpl implements PayInfoService { //会员充值场景 case VIP_RECHARGE: // 发送消息,订单支付成功 - // TODO: 2023/05/20 暂时使用普通消息发送,待修改成本地消息表模式 VipOrderPayNotifyMessage vipOrderPayNotifyMessage = new VipOrderPayNotifyMessage(); - vipOrderPayNotifyMessage.setMessageKey(IdUtils.randomUUID()); - vipOrderPayNotifyMessage.setSource(ServiceNameConstants.PAY_SERVICE); vipOrderPayNotifyMessage.setOrderNo(orderNo); - rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage); +// rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage); + mqMessageRecordService.sendAsyncMsg(RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderPayNotifyMessage); break; } } diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java new file mode 100644 index 0000000..3cf0f93 --- /dev/null +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java @@ -0,0 +1,162 @@ +package com.bnyer.pay.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.bnyer.common.core.constant.ServiceNameConstants; +import com.bnyer.common.core.domain.PayMqMessageRecord; +import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.core.utils.bean.EntityConvertUtil; +import com.bnyer.common.core.utils.uuid.IdUtils; +import com.bnyer.common.rocketmq.constant.RocketMqConstant; +import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; +import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; +import com.bnyer.pay.mapper.PayMqMessageRecordMapper; +import com.bnyer.pay.service.PayMqMessageRecordService; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import javax.annotation.Resource; +import java.util.Date; + +/** + * @author :WXC + * @Date :2023/05/19 + * @description : + */ +@Slf4j +@Service +public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService { + + @Resource + private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; + + @Resource + private PayMqMessageRecordMapper payMqMessageRecordMapper; + + /** + * 发送同步步消息 + * @param topic + * @param tag + * @param message + * @param + */ + @Transactional + @Override + public void send(String topic, String tag, T message) { + // 设置业务键,此处根据公共的参数进行处理 + // 更多的其它基础业务处理... + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + Message sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build(); + SendResult sendResult; + try { + sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + } catch (Exception e) { + editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); +// throw new RuntimeException(e); + } + } + + /** + * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + @Transactional + @Override + public void sendAsyncMsg(String topic, String tag, T message) { + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + payMqMessageRecord.setErrMsg(throwable.getMessage()); + editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),payMqMessageRecord.getErrMsg()); + } + }); + } + + /** + * 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) + * (适合对响应时间敏感的业务场景) + */ + @Transactional + @Override + public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); + } + @Override + public void onException(Throwable throwable) { + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); + editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); + } + }, RocketMqConstant.TIME_OUT,delayLevel); + } + + /** + * 添加消息记录 + * @param message + * @param + */ + @Transactional + public PayMqMessageRecord saveMessageRecord(String topic, String tag, T message){ + topic = rocketMQEnhanceTemplate.reBuildTopic(topic); + PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord(); + payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); + payMqMessageRecord.setConsumerGroupName(topic); + payMqMessageRecord.setTopic(topic); + payMqMessageRecord.setTag(tag); + payMqMessageRecord.setCreateTime(new Date()); + payMqMessageRecord.setMessageKey(IdUtils.randomUUID()); + payMqMessageRecord.setContent(JSON.toJSONString(message)); + payMqMessageRecordMapper.insert(payMqMessageRecord); + return payMqMessageRecord; + } + + /** + * 修改消息记录状态 + * @param id + * @param status + */ + @Transactional + public void editMessageRecordStatus(Long id,EnumMessageStatus status,String errMsg){ + payMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); + } + +} diff --git a/bnyer-services/bnyer-pay/src/main/resources/com/bnyer/pay/mapper/PayMqMessageRecordMapper.xml b/bnyer-services/bnyer-pay/src/main/resources/com/bnyer/pay/mapper/PayMqMessageRecordMapper.xml new file mode 100644 index 0000000..89af895 --- /dev/null +++ b/bnyer-services/bnyer-pay/src/main/resources/com/bnyer/pay/mapper/PayMqMessageRecordMapper.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + t.id, + t.message_key, + t.topic, + t.tag, + t.consumer_group_name, + t.status, + t.err_msg, + t.content, + t.create_time, + + + update pay_mq_message_record + set status = #{status} + + ,err_msg = #{errMsg} + + where id = #{id} + + +