diff --git a/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageType.java b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageType.java new file mode 100644 index 0000000..6100b5e --- /dev/null +++ b/bnyer-common/bnyer-common-core/src/main/java/com/bnyer/common/core/enums/EnumMessageType.java @@ -0,0 +1,18 @@ +package com.bnyer.common.core.enums; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** + * @author :WXC + * @Date :2023/05/18 + * @description :消息类型 + */ +@Getter +@AllArgsConstructor +public enum EnumMessageType { + //普通 + GENERAL, + //延时 + DELAY +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index aa0f95e..f39c4f2 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -217,12 +217,13 @@ public abstract class EnhanceMessageHandler { * @param success * @param message */ - protected void buildRerunMessage(EnumMessageStatus success, OrderMqLocalRecordMessage message) { + protected OrderMqLocalRecordMessage buildRerunMessage(EnumMessageStatus success, OrderMqLocalRecordMessage message) { OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage(); orderMqLocalRecordMessage.setStatus(success); orderMqLocalRecordMessage.setId(message.getId()); orderMqLocalRecordMessage.setSource(ServiceNameConstants.IMG_SERVICE); orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID()); + return orderMqLocalRecordMessage; } } \ No newline at end of file diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index b13494b..da33d57 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -52,8 +52,8 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler + */ + void send(String topic, String tag, T message); + /** * 发送异步消息 * @param topic * @param tag - * @param addUserVipRecordMessage + * @param message * @param */ - void sendAsyncMsg(String topic, String tag, T addUserVipRecordMessage); + void sendAsyncMsg(String topic, String tag, T message); /** * 发送异步延时消息 + * 废弃:延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了 + * 所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚 * @param topic * @param tag * @param message * @param delayLevel * @param */ + @Deprecated void sendAsyncMsg(String topic, String tag, T message, int delayLevel); /** diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java index cbc5487..8981048 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java @@ -1,6 +1,7 @@ package com.bnyer.order.service.impl; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.bnyer.common.core.constant.ServiceNameConstants; import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; @@ -14,6 +15,8 @@ import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -41,6 +44,38 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ @Resource private OrderMqMessageRecordMapper orderMqMessageRecordMapper; + /** + * 发送同步步消息 + * @param topic + * @param tag + * @param message + * @param + */ + @Transactional + @Override + public void send(String topic, String tag, T message) { + // 设置业务键,此处根据公共的参数进行处理 + // 更多的其它基础业务处理... + RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); + //保存消息记录 + log.info("消息发送中,开始入库本地消息记录表"); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); + //发消息 + OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class); + mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); + Message sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build(); + SendResult sendResult; + try { + sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + } catch (Exception e) { + editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); + log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); +// throw new RuntimeException(e); + } + } + /** * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) * (适合对响应时间敏感的业务场景) diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java index fcf58ab..f43db22 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java @@ -34,6 +34,7 @@ import com.bnyer.order.mapper.VipOrderMapper; import com.bnyer.order.service.OrderMqMessageRecordService; import com.bnyer.order.service.VipOrderService; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -118,8 +119,12 @@ public class VipOrderServiceImpl implements VipOrderService { //发送消息,如果三十分钟后没有支付,则取消订单 VipOrderCancelMessage vipOrderCancelMessage = new VipOrderCancelMessage(); vipOrderCancelMessage.setOrderNo(orderNo); -// rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES); - orderMqMessageRecordService.sendAsyncMsg(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG,vipOrderCancelMessage,RocketMqConstant.THIRTY_MINUTES); + //延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了 + //所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚 + SendStatus sendStatus = rocketMQEnhanceTemplate.send(RocketMqTopic.ORDER_CANCEL_TOPIC, RocketMqTag.ORDER_VIP_TAG, vipOrderCancelMessage, RocketMqConstant.THIRTY_MINUTES).getSendStatus(); + if (SendStatus.SEND_OK != sendStatus){ + throw new ServiceException("下单失败,请重试"); + } return orderNo; }