Browse Source

调整

feature-1.1
wuxicheng 3 years ago
parent
commit
482689abe4
  1. 7
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java
  2. 11
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java
  3. 36
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java
  4. 55
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  5. 21
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java
  6. 16
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java
  7. 21
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java
  8. 43
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java
  9. 14
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java
  10. 20
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java
  11. 42
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java
  12. 12
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java
  13. 43
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java

7
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BaseMessage.java

@ -22,10 +22,15 @@ public class BaseMessage {
protected String source = "";
/**
* 消息主题
* 推送主题
*/
protected String topic;
/**
* 返回主题
*/
protected String returnTopic;
/**
* 消息tag
*/

11
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/pay/PayMqLocalRecordMessage.java → bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java

@ -1,7 +1,6 @@
package com.bnyer.common.rocketmq.domain.pay;
package com.bnyer.common.rocketmq.domain;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@ -9,19 +8,19 @@ import lombok.Setter;
/**
* @author :WXC
* @Date :2023/05/18
* @description :支付服务本地消息表记录
* @description :统一消息推送
*/
@Getter
@Setter
@NoArgsConstructor
public class PayMqLocalRecordMessage extends BaseMessage {
public class MqRecordMessage extends BaseMessage {
/**
* 主键id
* 消息记录表id
*/
private Long id;
/**
* 消息状态
* 消息记录表状态
*/
private EnumMessageStatus status;

36
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java

@ -1,36 +0,0 @@
package com.bnyer.common.rocketmq.domain.order;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.Date;
/**
* @author :WXC
* @Date :2023/05/18
* @description :订单服务本地消息表记录
*/
@Getter
@Setter
@NoArgsConstructor
public class OrderMqLocalRecordMessage extends BaseMessage {
/**
* 主键id
*/
private Long id;
/**
* 消息状态
*/
private EnumMessageStatus status;
/**
* 消息内容
*/
private String content;
}

55
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -1,24 +1,18 @@
package com.bnyer.common.rocketmq.handle;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ReflectUtil;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage;
import com.bnyer.common.rocketmq.persist.IPersist;
import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy;
import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy;
import com.bnyer.common.rocketmq.strategy.RedisRepeatStrategy;
import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.sun.org.apache.regexp.internal.RE;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
@ -228,41 +222,16 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* @return
* @param <T>
*/
protected <T> T buildReturnMessage(String returnTopic,Long id,EnumMessageStatus status){
switch (returnTopic){
case RocketMqTopic.ORDER_RETURN_MSG_TOPIC:
OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage();
String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID());
orderMqLocalRecordMessage.setSource(applicationName);
orderMqLocalRecordMessage.setTopic(orderReturnTopic);
orderMqLocalRecordMessage.setConsumerGroupName(orderReturnTopic);
orderMqLocalRecordMessage.setStatus(status);
orderMqLocalRecordMessage.setId(id);
return (T) orderMqLocalRecordMessage;
case RocketMqTopic.IMG_RETURN_MSG_TOPIC:
ImgMqLocalRecordMessage imgMqLocalRecordMessage = new ImgMqLocalRecordMessage();
String imgReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
imgMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID());
imgMqLocalRecordMessage.setSource(applicationName);
imgMqLocalRecordMessage.setTopic(imgReturnTopic);
imgMqLocalRecordMessage.setConsumerGroupName(imgReturnTopic);
imgMqLocalRecordMessage.setStatus(status);
imgMqLocalRecordMessage.setId(id);
return (T) imgMqLocalRecordMessage;
case RocketMqTopic.PAY_RETURN_MSG_TOPIC:
PayMqLocalRecordMessage payMqLocalRecordMessage = new PayMqLocalRecordMessage();
String payReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
payMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID());
payMqLocalRecordMessage.setSource(applicationName);
payMqLocalRecordMessage.setTopic(payReturnTopic);
payMqLocalRecordMessage.setConsumerGroupName(payReturnTopic);
payMqLocalRecordMessage.setStatus(status);
payMqLocalRecordMessage.setId(id);
return (T) payMqLocalRecordMessage;
default:
throw new RuntimeException("返回消息主题匹配有误");
}
protected MqRecordMessage buildReturnMessage(String returnTopic,Long id,EnumMessageStatus status){
MqRecordMessage mqRecordMessage = new MqRecordMessage();
String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(returnTopic);
mqRecordMessage.setMessageKey(IdUtil.randomUUID());
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setTopic(orderReturnTopic);
mqRecordMessage.setConsumerGroupName(orderReturnTopic);
mqRecordMessage.setStatus(status);
mqRecordMessage.setId(id);
return mqRecordMessage;
}
}

21
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java

@ -1,7 +1,6 @@
package com.bnyer.img.listener;
import com.alibaba.fastjson.JSON;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.domain.FhUser;
import com.bnyer.common.core.domain.GoldLog;
import com.bnyer.common.core.domain.TiktokUser;
@ -10,8 +9,8 @@ import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.enums.GoldEnum;
import com.bnyer.common.core.utils.StringUtils;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.service.FhUserService;
@ -34,7 +33,7 @@ import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqTopic.GOLD_REWARD_TOPIC,consumerGroup = RocketMqTopic.GOLD_REWARD_TOPIC)
public class GoldRewardConsumer extends EnhanceMessageHandler<OrderMqLocalRecordMessage> implements RocketMQListener<OrderMqLocalRecordMessage> {
public class GoldRewardConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> {
@Autowired
private TiktokUserService tiktokUserService;
@ -52,7 +51,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler<OrderMqLocalRecord
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Override
public void onMessage(OrderMqLocalRecordMessage message) {
public void onMessage(MqRecordMessage message) {
super.dispatchMessage(message);
String content = message.getContent();
GoldRewardMessage goldReward = JSON.parseObject(content, GoldRewardMessage.class);
@ -138,25 +137,25 @@ public class GoldRewardConsumer extends EnhanceMessageHandler<OrderMqLocalRecord
}
@Override
protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception {
protected void handleMessage(MqRecordMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
OrderMqLocalRecordMessage orderMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.ORDER_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.IMG_RETURN_MSG_TOPIC,null,orderMqLocalRecordMessage);
MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage);
}
@Override
protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) {
protected void handleMaxRetriesExceeded(MqRecordMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
OrderMqLocalRecordMessage orderMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.ORDER_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.IMG_RETURN_MSG_TOPIC,null,orderMqLocalRecordMessage);
MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage);
}
@Override
protected boolean filter(OrderMqLocalRecordMessage message) {
protected boolean filter(MqRecordMessage message) {
return super.handleMsgRepeat(message);
}

16
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java

@ -1,12 +1,8 @@
package com.bnyer.img.listener;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.service.ImgMqMessageRecordService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
@ -23,27 +19,27 @@ import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqTopic.IMG_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.IMG_RETURN_MSG_TOPIC)
public class ImgReturnMessageConsumer extends EnhanceMessageHandler<ImgMqLocalRecordMessage> implements RocketMQListener<ImgMqLocalRecordMessage> {
public class ImgReturnMessageConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> {
@Resource
private ImgMqMessageRecordService imgMqMessageRecordService;
@Override
public void onMessage(ImgMqLocalRecordMessage message) {
public void onMessage(MqRecordMessage message) {
super.dispatchMessage(message);
imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
}
@Override
protected void handleMessage(ImgMqLocalRecordMessage message) throws Exception {
protected void handleMessage(MqRecordMessage message) throws Exception {
}
@Override
protected void handleMaxRetriesExceeded(ImgMqLocalRecordMessage message) {
protected void handleMaxRetriesExceeded(MqRecordMessage message) {
}
@Override
protected boolean filter(ImgMqLocalRecordMessage message) {
protected boolean filter(MqRecordMessage message) {
return super.handleMsgRepeat(message);
}

21
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java

@ -1,13 +1,12 @@
package com.bnyer.img.listener;
import com.alibaba.fastjson.JSON;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.dto.AddUserVipRecordDto;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.service.UserVipRecordService;
@ -27,7 +26,7 @@ import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC)
public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalRecordMessage> implements RocketMQListener<OrderMqLocalRecordMessage> {
public class VipRecordCreateConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> {
@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@ -36,7 +35,7 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalR
private UserVipRecordService userVipRecordService;
@Override
public void onMessage(OrderMqLocalRecordMessage message) {
public void onMessage(MqRecordMessage message) {
super.dispatchMessage(message);
String content = message.getContent();
AddUserVipRecordMessage addUserVipRecordMessage = JSON.parseObject(content, AddUserVipRecordMessage.class);
@ -46,25 +45,25 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalR
}
@Override
protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception {
protected void handleMessage(MqRecordMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
OrderMqLocalRecordMessage orderMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.ORDER_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,orderMqLocalRecordMessage);
MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage);
}
@Override
protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) {
protected void handleMaxRetriesExceeded(MqRecordMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
OrderMqLocalRecordMessage orderMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.ORDER_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,orderMqLocalRecordMessage);
MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage);
}
@Override
protected boolean filter(OrderMqLocalRecordMessage message) {
protected boolean filter(MqRecordMessage message) {
return super.handleMsgRepeat(message);
}

43
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java

@ -2,14 +2,13 @@ package com.bnyer.img.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.domain.ImgMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.mapper.ImgMqMessageRecordMapper;
import com.bnyer.img.service.ImgMqMessageRecordService;
@ -18,11 +17,13 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
@ -42,6 +43,16 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
@Resource
private ImgMqMessageRecordMapper imgMqMessageRecordMapper;
@Resource
private Environment env;
private String applicationName;
@PostConstruct
public void init(){
this.applicationName = env.getProperty("spring.application.name");
}
/**
* 发送同步步消息
* @param topic
@ -59,17 +70,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
log.info("消息发送中,开始入库本地消息记录表");
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
Message<ImgMqLocalRecordMessage> sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build();
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build();
SendResult sendResult;
try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
} catch (Exception e) {
editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage());
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e);
}
}
@ -86,9 +98,10 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
log.info("消息发送中,开始入库本地消息记录表");
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.IMG_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult));
@ -97,8 +110,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
imgMqMessageRecord.setErrMsg(throwable.getMessage());
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),imgMqMessageRecord.getErrMsg());
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage());
}
});
}
@ -115,9 +127,10 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
log.info("消息发送中,开始入库本地消息记录表");
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult));

14
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java

@ -1,9 +1,7 @@
package com.bnyer.order.listener;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.order.service.OrderMqMessageRecordService;
import lombok.extern.slf4j.Slf4j;
@ -21,29 +19,29 @@ import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC)
public class OrderReturnMessageConsumer extends EnhanceMessageHandler<OrderMqLocalRecordMessage> implements RocketMQListener<OrderMqLocalRecordMessage> {
public class OrderReturnMessageConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> {
@Resource
private OrderMqMessageRecordService orderMqMessageRecordService;
@Override
public void onMessage(OrderMqLocalRecordMessage message) {
public void onMessage(MqRecordMessage message) {
super.dispatchMessage(message);
orderMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
}
@Override
protected void handleMessage(OrderMqLocalRecordMessage message) throws Exception {
protected void handleMessage(MqRecordMessage message) throws Exception {
}
@Override
protected void handleMaxRetriesExceeded(OrderMqLocalRecordMessage message) {
protected void handleMaxRetriesExceeded(MqRecordMessage message) {
}
@Override
protected boolean filter(OrderMqLocalRecordMessage message) {
protected boolean filter(MqRecordMessage message) {
return super.handleMsgRepeat(message);
}

20
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java

@ -6,8 +6,8 @@ import com.bnyer.common.core.domain.VipOrder;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.constant.RocketMqTag;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage;
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.order.mapper.VipOrderMapper;
@ -29,7 +29,7 @@ import java.util.Objects;
@Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG
,consumerGroup = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC)
public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<PayMqLocalRecordMessage> implements RocketMQListener<PayMqLocalRecordMessage> {
public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> {
@Resource
private VipOrderMapper vipOrderMapper;
@ -41,7 +41,7 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<PayMqLocalR
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Override
public void onMessage(PayMqLocalRecordMessage message) {
public void onMessage(MqRecordMessage message) {
super.dispatchMessage(message);
//修改订单并添加会员记录
String content = message.getContent();
@ -57,25 +57,25 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<PayMqLocalR
}
@Override
protected void handleMessage(PayMqLocalRecordMessage message) throws Exception {
protected void handleMessage(MqRecordMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
PayMqLocalRecordMessage payMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.PAY_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.PAY_RETURN_MSG_TOPIC,null,payMqLocalRecordMessage);
MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.SUCCESS);
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage);
}
@Override
protected void handleMaxRetriesExceeded(PayMqLocalRecordMessage message) {
protected void handleMaxRetriesExceeded(MqRecordMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
PayMqLocalRecordMessage payMqLocalRecordMessage = super.buildReturnMessage(RocketMqTopic.PAY_RETURN_MSG_TOPIC, message.getId(), EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.PAY_RETURN_MSG_TOPIC,null,payMqLocalRecordMessage);
MqRecordMessage mqRecordMessage = super.buildReturnMessage(message.getReturnTopic(), message.getId(), EnumMessageStatus.FAILS);
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,mqRecordMessage);
}
@Override
protected boolean filter(PayMqLocalRecordMessage message) {
protected boolean filter(MqRecordMessage message) {
return super.handleMsgRepeat(message);
}

42
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java

@ -2,13 +2,13 @@ package com.bnyer.order.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.order.mapper.OrderMqMessageRecordMapper;
import com.bnyer.order.service.OrderMqMessageRecordService;
@ -17,11 +17,13 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
@ -40,6 +42,16 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
@Resource
private OrderMqMessageRecordMapper orderMqMessageRecordMapper;
@Resource
private Environment env;
private String applicationName;
@PostConstruct
public void init(){
this.applicationName = env.getProperty("spring.application.name");
}
/**
* 发送同步步消息
* @param topic
@ -57,17 +69,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
Message<OrderMqLocalRecordMessage> sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build();
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build();
SendResult sendResult;
try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
} catch (Exception e) {
editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage());
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e);
}
}
@ -84,9 +97,10 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult));
@ -95,8 +109,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
orderMqMessageRecord.setErrMsg(throwable.getMessage());
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),orderMqMessageRecord.getErrMsg());
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage());
}
});
}
@ -113,9 +126,10 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
log.info("消息发送中,开始入库本地消息记录表");
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
OrderMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(orderMqMessageRecord, OrderMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult));

12
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java

@ -1,7 +1,7 @@
package com.bnyer.pay.listener;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.pay.service.PayMqMessageRecordService;
import lombok.extern.slf4j.Slf4j;
@ -19,29 +19,29 @@ import javax.annotation.Resource;
@Slf4j
@Component
@RocketMQMessageListener(topic = RocketMqTopic.PAY_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.PAY_RETURN_MSG_TOPIC)
public class PayReturnMessageConsumer extends EnhanceMessageHandler<PayMqLocalRecordMessage> implements RocketMQListener<PayMqLocalRecordMessage> {
public class PayReturnMessageConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> {
@Resource
private PayMqMessageRecordService payMqMessageRecordService;
@Override
public void onMessage(PayMqLocalRecordMessage message) {
public void onMessage(MqRecordMessage message) {
super.dispatchMessage(message);
payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
}
@Override
protected void handleMessage(PayMqLocalRecordMessage message) throws Exception {
protected void handleMessage(MqRecordMessage message) throws Exception {
}
@Override
protected void handleMaxRetriesExceeded(PayMqLocalRecordMessage message) {
protected void handleMaxRetriesExceeded(MqRecordMessage message) {
}
@Override
protected boolean filter(PayMqLocalRecordMessage message) {
protected boolean filter(MqRecordMessage message) {
return super.handleMsgRepeat(message);
}

43
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java

@ -2,13 +2,13 @@ package com.bnyer.pay.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.domain.PayMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage;
import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.pay.mapper.PayMqMessageRecordMapper;
import com.bnyer.pay.service.PayMqMessageRecordService;
@ -17,11 +17,13 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Date;
@ -40,6 +42,17 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
@Resource
private PayMqMessageRecordMapper payMqMessageRecordMapper;
@Resource
private Environment env;
private String applicationName;
@PostConstruct
public void init(){
this.applicationName = env.getProperty("spring.application.name");
}
/**
* 发送同步步消息
* @param topic
@ -57,17 +70,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
log.info("消息发送中,开始入库本地消息记录表");
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
Message<PayMqLocalRecordMessage> sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build();
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build();
SendResult sendResult;
try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
} catch (Exception e) {
editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage());
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e);
}
}
@ -84,9 +98,10 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
log.info("消息发送中,开始入库本地消息记录表");
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult));
@ -95,8 +110,7 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
payMqMessageRecord.setErrMsg(throwable.getMessage());
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),payMqMessageRecord.getErrMsg());
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage());
}
});
}
@ -113,9 +127,10 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
log.info("消息发送中,开始入库本地消息记录表");
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message);
//发消息
PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class);
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() {
MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class);
mqRecordMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult));

Loading…
Cancel
Save