Browse Source

优化

feature-1.1
wuxicheng 3 years ago
parent
commit
8d438644ef
  1. 2
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BnyerMessage.java
  2. 53
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  3. 48
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java
  4. 4
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/BnyerImgApplication.java
  5. 25
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java
  6. 12
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java
  7. 20
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java
  8. 4
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java
  9. 64
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java
  10. 4
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/BnyerOrderApplication.java
  11. 12
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java
  12. 12
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java
  13. 22
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java
  14. 4
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java
  15. 60
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java
  16. 4
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/BnyerPayApplication.java
  17. 12
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java
  18. 4
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java
  19. 62
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java

2
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java → bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BnyerMessage.java

@ -14,7 +14,7 @@ import lombok.Setter;
@Getter @Getter
@Setter @Setter
@NoArgsConstructor @NoArgsConstructor
public class MqRecordMessage extends BaseMessage { public class BnyerMessage extends BaseMessage {
/** /**
* 消息记录表id * 消息记录表id
*/ */

53
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -2,10 +2,11 @@ package com.bnyer.common.rocketmq.handle;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.StringUtils;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement; import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.persist.IPersist; import com.bnyer.common.rocketmq.persist.IPersist;
import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy; import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy;
@ -29,7 +30,7 @@ import java.util.function.Function;
* @description : 消息模板抽象类父类提供公共模板方法 * @description : 消息模板抽象类父类提供公共模板方法
*/ */
@Slf4j @Slf4j
public abstract class EnhanceMessageHandler<T extends BaseMessage> { public abstract class EnhanceMessageHandler {
/** /**
* 默认重试次数 * 默认重试次数
*/ */
@ -65,14 +66,14 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* @param message 待处理消息 * @param message 待处理消息
* @throws Exception 消费异常 * @throws Exception 消费异常
*/ */
protected abstract void handleMessage(T message) throws Exception; protected abstract void handleMessage(BnyerMessage message) throws Exception;
/** /**
* 超过重试次数消息需要启用isRetry * 超过重试次数消息需要启用isRetry
* *
* @param message 待处理消息 * @param message 待处理消息
*/ */
protected abstract void handleMaxRetriesExceeded(T message); protected abstract void handleMaxRetriesExceeded(BnyerMessage message);
/** /**
@ -80,7 +81,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* @param message 待处理消息 * @param message 待处理消息
* @return true: 本次消息被过滤false不过滤 * @return true: 本次消息被过滤false不过滤
*/ */
protected abstract boolean filter(T message); protected abstract boolean filter(BnyerMessage message);
/** /**
* 是否异常时重复发送 * 是否异常时重复发送
@ -117,7 +118,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/** /**
* 使用模板模式构建消息消费框架可自由扩展或删减 * 使用模板模式构建消息消费框架可自由扩展或删减
*/ */
public void dispatchMessage(T message) { public void dispatchMessage(BnyerMessage message) {
// 基础日志记录被父类处理了 // 基础日志记录被父类处理了
log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); log.info("消费者收到消息[{}]", JSONObject.toJSON(message));
if (filter(message)) { if (filter(message)) {
@ -127,6 +128,8 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
//超过最大重试次数时调用子类方法处理 //超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > getMaxRetryTimes()) { if (message.getRetryTimes() > getMaxRetryTimes()) {
handleMaxRetriesExceeded(message); handleMaxRetriesExceeded(message);
//发送失败返回通知
sendReturnFailsMessage(message);
return; return;
} }
IPersist persist = repeatConsumerConfig.getPersist(); IPersist persist = repeatConsumerConfig.getPersist();
@ -137,6 +140,8 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
try { try {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
handleMessage(message); handleMessage(message);
//发送成功返回通知
sendReturnSuccessMessage(message);
long costTime = System.currentTimeMillis() - now; long costTime = System.currentTimeMillis() - now;
log.info("消息{}消费成功,耗时[{}ms],修改缓存状态为已消费:{}", message.getMessageKey(),costTime, repeatElement); log.info("消息{}消费成功,耗时[{}ms],修改缓存状态为已消费:{}", message.getMessageKey(),costTime, repeatElement);
persist.markConsumed(repeatElement, repeatConsumerConfig.getRecordReserveMinutes()); persist.markConsumed(repeatElement, repeatConsumerConfig.getRecordReserveMinutes());
@ -161,7 +166,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* 消息重试 * 消息重试
* @param message * @param message
*/ */
protected void handleRetry(T message) { protected void handleRetry(BnyerMessage message) {
// 获取子类RocketMQMessageListener注解拿到topic和tag // 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (annotation == null) { if (annotation == null) {
@ -196,7 +201,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* @param message * @param message
* @return * @return
*/ */
protected Boolean handleMsgRepeat(final T message) { protected Boolean handleMsgRepeat(final BnyerMessage message) {
RepeatConsumerStrategy strategy = new NormalRepeatStrategy(); RepeatConsumerStrategy strategy = new NormalRepeatStrategy();
Function<BaseMessage, String> repeatKeyFunction = baseMessage -> repeatMessageKey(message); Function<BaseMessage, String> repeatKeyFunction = baseMessage -> repeatMessageKey(message);
if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) { if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) {
@ -209,7 +214,7 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
/** /**
* 默认拿消息key 作为去重的标识子类可复写该方法自定义去重标识 * 默认拿消息key 作为去重的标识子类可复写该方法自定义去重标识
*/ */
protected String repeatMessageKey(T message) { protected String repeatMessageKey(BnyerMessage message) {
return message.getMessageKey(); return message.getMessageKey();
} }
@ -217,22 +222,32 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
* 返回成功通知 * 返回成功通知
* @param message * @param message
*/ */
protected void sendReturnSuccessMessage(MqRecordMessage message){ protected void sendReturnSuccessMessage(BnyerMessage message){
MqRecordMessage returnMessage = new MqRecordMessage(); //发送返回队列,告知已经处理成功,完成最终一致性
returnMessage.setStatus(EnumMessageStatus.SUCCESS); //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
returnMessage.setId(message.getId()); // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); if (StringUtils.isNotBlank(message.getReturnTopic())){
BnyerMessage returnMessage = new BnyerMessage();
returnMessage.setStatus(EnumMessageStatus.SUCCESS);
returnMessage.setId(message.getId());
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage);
}
} }
/** /**
* 返回失败通知 * 返回失败通知
* @param message * @param message
*/ */
protected void sendReturnFailsMessage(MqRecordMessage message){ protected void sendReturnFailsMessage(BnyerMessage message){
MqRecordMessage returnMessage = new MqRecordMessage(); //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
returnMessage.setStatus(EnumMessageStatus.FAILS); //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
returnMessage.setId(message.getId()); // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); if (StringUtils.isNotBlank(message.getReturnTopic())){
BnyerMessage returnMessage = new BnyerMessage();
returnMessage.setStatus(EnumMessageStatus.FAILS);
returnMessage.setId(message.getId());
rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage);
}
} }
} }

48
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java

@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -84,8 +84,8 @@ public class RocketMQEnhanceTemplate {
public <T> SendResult send(String topic, String tag, T message) { public <T> SendResult send(String topic, String tag, T message) {
// 设置业务键,此处根据公共的参数进行处理 // 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理... // 更多的其它基础业务处理...
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message);
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); sendResult = template.syncSend(buildDestination(topic,tag), sendMessage);
@ -93,7 +93,7 @@ public class RocketMQEnhanceTemplate {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult));
return sendResult; return sendResult;
} }
@ -107,15 +107,15 @@ public class RocketMQEnhanceTemplate {
* @param <T> * @param <T>
*/ */
public <T> SendResult send(String topic, String tag, T message, int delayLevel) { public <T> SendResult send(String topic, String tag, T message, int delayLevel) {
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message);
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel); sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", bnyerMessage.getTopic(),bnyerMessage.getTag(), delayLevel, JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult));
return sendResult; return sendResult;
} }
@ -124,17 +124,17 @@ public class RocketMQEnhanceTemplate {
* 适合对响应时间敏感的业务场景 * 适合对响应时间敏感的业务场景
*/ */
public <T> void sendAsyncMsg(String topic, String tag, T message) { public <T> void sendAsyncMsg(String topic, String tag, T message) {
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message);
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑 // 处理消息发送成功逻辑
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
// 处理消息发送失败逻辑 // 处理消息发送失败逻辑
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
} }
}); });
} }
@ -144,16 +144,16 @@ public class RocketMQEnhanceTemplate {
* 适合对响应时间敏感的业务场景 * 适合对响应时间敏感的业务场景
*/ */
public <T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { public <T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) {
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message);
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑 // 处理消息发送成功逻辑
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
} }
}, RocketMqConstant.TIME_OUT,delayLevel); }, RocketMqConstant.TIME_OUT,delayLevel);
} }
@ -167,9 +167,9 @@ public class RocketMQEnhanceTemplate {
* @param <T> * @param <T>
*/ */
public <T> boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { public <T> boolean sendTransactionalMsg(String topic, String tag,String arg, T message) {
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message);
String destination = buildDestination(topic, tag); String destination = buildDestination(topic, tag);
TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(mqRecordMessage).build(), arg); TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(bnyerMessage).build(), arg);
log.info("Send transaction msg result: " + sendResult); log.info("Send transaction msg result: " + sendResult);
return sendResult.getSendStatus() == SendStatus.SEND_OK; return sendResult.getSendStatus() == SendStatus.SEND_OK;
} }
@ -182,16 +182,16 @@ public class RocketMQEnhanceTemplate {
* @param message * @param message
* @param <T> * @param <T>
*/ */
private <T> MqRecordMessage buildBaseMessage(String topic,String tag,T message){ private <T> BnyerMessage buildBaseMessage(String topic, String tag, T message){
MqRecordMessage mqRecordMessage = new MqRecordMessage(); BnyerMessage bnyerMessage = new BnyerMessage();
String buildTopic = reBuildTopic(topic); String buildTopic = reBuildTopic(topic);
mqRecordMessage.setSource(applicationName); bnyerMessage.setSource(applicationName);
mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); bnyerMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr());
mqRecordMessage.setTopic(buildTopic); bnyerMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag); bnyerMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic); bnyerMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setContent(JSON.toJSONString(message)); bnyerMessage.setContent(JSON.toJSONString(message));
return mqRecordMessage; return bnyerMessage;
} }
} }

4
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/BnyerImgApplication.java

@ -5,6 +5,7 @@ import com.bnyer.common.security.annotation.EnableRyFeignClients;
import com.bnyer.common.swagger.annotation.EnableCustomSwagger2; import com.bnyer.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
/** /**
@ -15,7 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableCustomConfig @EnableCustomConfig
@EnableCustomSwagger2 @EnableCustomSwagger2
@EnableRyFeignClients @EnableRyFeignClients
@SpringBootApplication(scanBasePackages = { "com.bnyer.common"}) @ComponentScan(basePackages = "com.bnyer")
@SpringBootApplication
@EnableAsync @EnableAsync
public class BnyerImgApplication public class BnyerImgApplication
{ {

25
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java

@ -1,18 +1,15 @@
package com.bnyer.img.listener; package com.bnyer.img.listener;
import com.alibaba.fastjson.JSON;
import com.bnyer.common.core.domain.FhUser; import com.bnyer.common.core.domain.FhUser;
import com.bnyer.common.core.domain.GoldLog; import com.bnyer.common.core.domain.GoldLog;
import com.bnyer.common.core.domain.TiktokUser; import com.bnyer.common.core.domain.TiktokUser;
import com.bnyer.common.core.domain.WxUser; import com.bnyer.common.core.domain.WxUser;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.enums.GoldEnum; import com.bnyer.common.core.enums.GoldEnum;
import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.core.utils.StringUtils;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage; import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.service.FhUserService; import com.bnyer.img.service.FhUserService;
import com.bnyer.img.service.GoldLogService; import com.bnyer.img.service.GoldLogService;
import com.bnyer.img.service.TiktokUserService; import com.bnyer.img.service.TiktokUserService;
@ -23,8 +20,6 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/** /**
* @author :penny * @author :penny
* @Date :2023/05/17 * @Date :2023/05/17
@ -33,7 +28,7 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.GOLD_REWARD_TOPIC,consumerGroup = RocketMqTopic.GOLD_REWARD_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.GOLD_REWARD_TOPIC,consumerGroup = RocketMqTopic.GOLD_REWARD_TOPIC)
public class GoldRewardConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Autowired @Autowired
private TiktokUserService tiktokUserService; private TiktokUserService tiktokUserService;
@ -48,7 +43,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler<MqRecordMessage> i
private GoldLogService goldLogService; private GoldLogService goldLogService;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
GoldRewardMessage goldReward = message.getObject(GoldRewardMessage.class); GoldRewardMessage goldReward = message.getObject(GoldRewardMessage.class);
if(StringUtils.isNotNull(goldReward.getPlatform())){ if(StringUtils.isNotNull(goldReward.getPlatform())){
@ -133,23 +128,15 @@ public class GoldRewardConsumer extends EnhanceMessageHandler<MqRecordMessage> i
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.sendReturnSuccessMessage(message);
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.sendReturnFailsMessage(message);
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

12
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java

@ -1,7 +1,7 @@
package com.bnyer.img.listener; package com.bnyer.img.listener;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.img.service.ImgMqMessageRecordService; import com.bnyer.img.service.ImgMqMessageRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -19,27 +19,27 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.IMG_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.IMG_RETURN_MSG_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.IMG_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.IMG_RETURN_MSG_TOPIC)
public class ImgReturnMessageConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class ImgReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Resource @Resource
private ImgMqMessageRecordService imgMqMessageRecordService; private ImgMqMessageRecordService imgMqMessageRecordService;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

20
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java

@ -3,7 +3,7 @@ package com.bnyer.img.listener;
import com.bnyer.common.core.dto.AddUserVipRecordDto; import com.bnyer.common.core.dto.AddUserVipRecordDto;
import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.img.service.UserVipRecordService; import com.bnyer.img.service.UserVipRecordService;
@ -21,13 +21,13 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC)
public class VipRecordCreateConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Autowired @Autowired
private UserVipRecordService userVipRecordService; private UserVipRecordService userVipRecordService;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
AddUserVipRecordMessage addUserVipRecordMessage = message.getObject(AddUserVipRecordMessage.class); AddUserVipRecordMessage addUserVipRecordMessage = message.getObject(AddUserVipRecordMessage.class);
AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(addUserVipRecordMessage, AddUserVipRecordDto.class); AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(addUserVipRecordMessage, AddUserVipRecordDto.class);
@ -36,23 +36,15 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<MqRecordMessa
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.sendReturnSuccessMessage(message);
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.sendReturnFailsMessage(message);
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

4
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java

@ -2,7 +2,7 @@ package com.bnyer.img.service;
import com.bnyer.common.core.domain.ImgMqMessageRecord; import com.bnyer.common.core.domain.ImgMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
/** /**
* @author :WXC * @author :WXC
@ -47,7 +47,7 @@ public interface ImgMqMessageRecordService {
* @param message * @param message
* @return * @return
*/ */
ImgMqMessageRecord saveMessageRecord(MqRecordMessage message); ImgMqMessageRecord saveMessageRecord(BnyerMessage message);
/** /**
* 修改消息记录状态 * 修改消息记录状态

64
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java

@ -1,8 +1,6 @@
package com.bnyer.img.service.impl; package com.bnyer.img.service.impl;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
@ -12,7 +10,7 @@ import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.DateUtils; import com.bnyer.common.core.utils.DateUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.img.mapper.ImgMqMessageRecordMapper; import com.bnyer.img.mapper.ImgMqMessageRecordMapper;
import com.bnyer.img.service.ImgMqMessageRecordService; import com.bnyer.img.service.ImgMqMessageRecordService;
@ -68,18 +66,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
* @param <T> * @param <T>
*/ */
@NotNull @NotNull
private <T> MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { private <T> BnyerMessage getMqRecordMessage(String topic, String tag, T message) {
String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic);
String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC);
MqRecordMessage mqRecordMessage = new MqRecordMessage(); BnyerMessage bnyerMessage = new BnyerMessage();
mqRecordMessage.setTopic(buildTopic); bnyerMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag); bnyerMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic); bnyerMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setMessageKey(applicationName + "-"+ IdUtil.getSnowflakeNextIdStr()); bnyerMessage.setMessageKey(applicationName + "-"+ IdUtil.getSnowflakeNextIdStr());
mqRecordMessage.setContent(JSON.toJSONString(message)); bnyerMessage.setContent(JSON.toJSONString(message));
mqRecordMessage.setSource(applicationName); bnyerMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(buildReturnTopic); bnyerMessage.setReturnTopic(buildReturnTopic);
return mqRecordMessage; return bnyerMessage;
} }
/** /**
@ -97,17 +95,17 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(imgMqMessageRecord.getId()); bnyerMessage.setId(imgMqMessageRecord.getId());
//发消息 //发消息
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); Message<BnyerMessage> sendMessage = MessageBuilder.withPayload(bnyerMessage).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult));
} catch (Exception e) { } catch (Exception e) {
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e); // throw new RuntimeException(e);
@ -124,18 +122,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(imgMqMessageRecord.getId()); bnyerMessage.setId(imgMqMessageRecord.getId());
//发消息 //发消息
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -152,18 +150,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(imgMqMessageRecord.getId()); bnyerMessage.setId(imgMqMessageRecord.getId());
//发消息 //发消息
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -175,7 +173,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
* @param message * @param message
*/ */
@Transactional @Transactional
public ImgMqMessageRecord saveMessageRecord(MqRecordMessage message){ public ImgMqMessageRecord saveMessageRecord(BnyerMessage message){
ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord();
imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);
imgMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); imgMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName());
@ -216,7 +214,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService
String content = imgMqMessageRecord.getContent(); String content = imgMqMessageRecord.getContent();
JSONObject jsonObject = JSON.parseObject(content); JSONObject jsonObject = JSON.parseObject(content);
String msg = jsonObject.getString("content"); String msg = jsonObject.getString("content");
MqRecordMessage message = new MqRecordMessage(); BnyerMessage message = new BnyerMessage();
message.setId(imgMqMessageRecord.getId()); message.setId(imgMqMessageRecord.getId());
message.setReturnTopic(imgMqMessageRecord.getReturnTopic()); message.setReturnTopic(imgMqMessageRecord.getReturnTopic());
message.setContent(msg); message.setContent(msg);

4
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/BnyerOrderApplication.java

@ -5,6 +5,7 @@ import com.bnyer.common.security.annotation.EnableRyFeignClients;
import com.bnyer.common.swagger.annotation.EnableCustomSwagger2; import com.bnyer.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
/** /**
@ -15,7 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableCustomConfig @EnableCustomConfig
@EnableCustomSwagger2 @EnableCustomSwagger2
@EnableRyFeignClients @EnableRyFeignClients
@SpringBootApplication(scanBasePackages = { "com.bnyer.common"}) @ComponentScan(basePackages = "com.bnyer")
@SpringBootApplication
@EnableAsync @EnableAsync
public class BnyerOrderApplication public class BnyerOrderApplication
{ {

12
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java

@ -1,7 +1,7 @@
package com.bnyer.order.listener; package com.bnyer.order.listener;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.order.service.OrderMqMessageRecordService; import com.bnyer.order.service.OrderMqMessageRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -19,29 +19,29 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC)
public class OrderReturnMessageConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class OrderReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Resource @Resource
private OrderMqMessageRecordService orderMqMessageRecordService; private OrderMqMessageRecordService orderMqMessageRecordService;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
orderMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); orderMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

12
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java

@ -2,7 +2,7 @@ package com.bnyer.order.listener.vip;
import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTag;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage; import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.order.bean.dto.CancelVipOrderDto; import com.bnyer.order.bean.dto.CancelVipOrderDto;
@ -22,13 +22,13 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG,consumerGroup = RocketMqTopic.ORDER_CANCEL_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG,consumerGroup = RocketMqTopic.ORDER_CANCEL_TOPIC)
public class VipOrderCancelConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Autowired @Autowired
private VipOrderService vipOrderService; private VipOrderService vipOrderService;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
VipOrderCancelMessage orderCancelMessage = message.getObject(VipOrderCancelMessage.class); VipOrderCancelMessage orderCancelMessage = message.getObject(VipOrderCancelMessage.class);
// 如果订单未支付的话,将订单设为取消状态 // 如果订单未支付的话,将订单设为取消状态
@ -39,17 +39,17 @@ public class VipOrderCancelConsumer extends EnhanceMessageHandler<MqRecordMessag
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
log.error("消息消费失败,可扩展执行后续处理"); log.error("消息消费失败,可扩展执行后续处理");
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

22
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java

@ -1,12 +1,10 @@
package com.bnyer.order.listener.vip; package com.bnyer.order.listener.vip;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.bnyer.common.core.domain.VipOrder; import com.bnyer.common.core.domain.VipOrder;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTag;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage; import com.bnyer.common.rocketmq.domain.order.VipOrderPayNotifyMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
@ -29,7 +27,7 @@ import java.util.Objects;
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG @RocketMQMessageListener(topic = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG
,consumerGroup = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC) ,consumerGroup = RocketMqTopic.ORDER_PAY_NOTIFY_TOPIC)
public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Resource @Resource
private VipOrderMapper vipOrderMapper; private VipOrderMapper vipOrderMapper;
@ -41,7 +39,7 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<MqRecordMes
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
//修改订单并添加会员记录 //修改订单并添加会员记录
VipOrderPayNotifyMessage vipOrderPayNotifyMessage = message.getObject(VipOrderPayNotifyMessage.class); VipOrderPayNotifyMessage vipOrderPayNotifyMessage = message.getObject(VipOrderPayNotifyMessage.class);
@ -56,23 +54,15 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler<MqRecordMes
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
//发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.sendReturnSuccessMessage(message);
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
super.sendReturnFailsMessage(message);
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

4
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java

@ -2,7 +2,7 @@ package com.bnyer.order.service;
import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
/** /**
* @author :WXC * @author :WXC
@ -47,7 +47,7 @@ public interface OrderMqMessageRecordService {
* @param message * @param message
* @return * @return
*/ */
OrderMqMessageRecord saveMessageRecord(MqRecordMessage message); OrderMqMessageRecord saveMessageRecord(BnyerMessage message);
/** /**
* 修改消息记录状态 * 修改消息记录状态

60
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java

@ -10,7 +10,7 @@ import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.DateUtils; import com.bnyer.common.core.utils.DateUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.order.mapper.OrderMqMessageRecordMapper; import com.bnyer.order.mapper.OrderMqMessageRecordMapper;
import com.bnyer.order.service.OrderMqMessageRecordService; import com.bnyer.order.service.OrderMqMessageRecordService;
@ -65,18 +65,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
* @param <T> * @param <T>
*/ */
@NotNull @NotNull
private <T> MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { private <T> BnyerMessage getMqRecordMessage(String topic, String tag, T message) {
String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic);
String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC);
MqRecordMessage mqRecordMessage = new MqRecordMessage(); BnyerMessage bnyerMessage = new BnyerMessage();
mqRecordMessage.setTopic(buildTopic); bnyerMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag); bnyerMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic); bnyerMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); bnyerMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr());
mqRecordMessage.setContent(JSON.toJSONString(message)); bnyerMessage.setContent(JSON.toJSONString(message));
mqRecordMessage.setSource(applicationName); bnyerMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(buildReturnTopic); bnyerMessage.setReturnTopic(buildReturnTopic);
return mqRecordMessage; return bnyerMessage;
} }
/** /**
@ -94,16 +94,16 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(mqRecordMessage); OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(bnyerMessage);
//发消息 //发消息
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); Message<BnyerMessage> sendMessage = MessageBuilder.withPayload(bnyerMessage).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult));
} catch (Exception e) { } catch (Exception e) {
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e); // throw new RuntimeException(e);
@ -120,18 +120,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(mqRecordMessage); OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(orderMqMessageRecord.getId()); bnyerMessage.setId(orderMqMessageRecord.getId());
//发消息 //发消息
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -148,18 +148,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(mqRecordMessage); OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(orderMqMessageRecord.getId()); bnyerMessage.setId(orderMqMessageRecord.getId());
//发消息 //发消息
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -172,7 +172,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
* @param <T> * @param <T>
*/ */
@Transactional @Transactional
public OrderMqMessageRecord saveMessageRecord(MqRecordMessage message){ public OrderMqMessageRecord saveMessageRecord(BnyerMessage message){
OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord(); OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord();
orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);
orderMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); orderMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName());
@ -213,7 +213,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
String content = orderMqMessageRecord.getContent(); String content = orderMqMessageRecord.getContent();
JSONObject jsonObject = JSON.parseObject(content); JSONObject jsonObject = JSON.parseObject(content);
String msg = jsonObject.getString("content"); String msg = jsonObject.getString("content");
MqRecordMessage message = new MqRecordMessage(); BnyerMessage message = new BnyerMessage();
message.setId(orderMqMessageRecord.getId()); message.setId(orderMqMessageRecord.getId());
message.setReturnTopic(orderMqMessageRecord.getReturnTopic()); message.setReturnTopic(orderMqMessageRecord.getReturnTopic());
message.setContent(msg); message.setContent(msg);

4
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/BnyerPayApplication.java

@ -5,6 +5,7 @@ import com.bnyer.common.security.annotation.EnableRyFeignClients;
import com.bnyer.common.swagger.annotation.EnableCustomSwagger2; import com.bnyer.common.swagger.annotation.EnableCustomSwagger2;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
/** /**
@ -15,7 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync;
@EnableCustomConfig @EnableCustomConfig
@EnableCustomSwagger2 @EnableCustomSwagger2
@EnableRyFeignClients @EnableRyFeignClients
@SpringBootApplication(scanBasePackages = { "com.bnyer" }) @ComponentScan(basePackages = "com.bnyer")
@SpringBootApplication
@EnableAsync @EnableAsync
public class BnyerPayApplication public class BnyerPayApplication
{ {

12
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java

@ -1,7 +1,7 @@
package com.bnyer.pay.listener; package com.bnyer.pay.listener;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler;
import com.bnyer.pay.service.PayMqMessageRecordService; import com.bnyer.pay.service.PayMqMessageRecordService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -19,29 +19,29 @@ import javax.annotation.Resource;
@Slf4j @Slf4j
@Component @Component
@RocketMQMessageListener(topic = RocketMqTopic.PAY_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.PAY_RETURN_MSG_TOPIC) @RocketMQMessageListener(topic = RocketMqTopic.PAY_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.PAY_RETURN_MSG_TOPIC)
public class PayReturnMessageConsumer extends EnhanceMessageHandler<MqRecordMessage> implements RocketMQListener<MqRecordMessage> { public class PayReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener<BnyerMessage> {
@Resource @Resource
private PayMqMessageRecordService payMqMessageRecordService; private PayMqMessageRecordService payMqMessageRecordService;
@Override @Override
public void onMessage(MqRecordMessage message) { public void onMessage(BnyerMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
} }
@Override @Override
protected void handleMessage(MqRecordMessage message) throws Exception { protected void handleMessage(BnyerMessage message) throws Exception {
} }
@Override @Override
protected void handleMaxRetriesExceeded(MqRecordMessage message) { protected void handleMaxRetriesExceeded(BnyerMessage message) {
} }
@Override @Override
protected boolean filter(MqRecordMessage message) { protected boolean filter(BnyerMessage message) {
return super.handleMsgRepeat(message); return super.handleMsgRepeat(message);
} }

4
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java

@ -2,7 +2,7 @@ package com.bnyer.pay.service;
import com.bnyer.common.core.domain.PayMqMessageRecord; import com.bnyer.common.core.domain.PayMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
/** /**
* @author :WXC * @author :WXC
@ -47,7 +47,7 @@ public interface PayMqMessageRecordService {
* @param message * @param message
* @return * @return
*/ */
PayMqMessageRecord saveMessageRecord(MqRecordMessage message); PayMqMessageRecord saveMessageRecord(BnyerMessage message);
/** /**
* 修改消息记录状态 * 修改消息记录状态

62
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java

@ -10,7 +10,7 @@ import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.DateUtils; import com.bnyer.common.core.utils.DateUtils;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.MqRecordMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate;
import com.bnyer.pay.mapper.PayMqMessageRecordMapper; import com.bnyer.pay.mapper.PayMqMessageRecordMapper;
import com.bnyer.pay.service.PayMqMessageRecordService; import com.bnyer.pay.service.PayMqMessageRecordService;
@ -66,18 +66,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
* @param <T> * @param <T>
*/ */
@NotNull @NotNull
private <T> MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { private <T> BnyerMessage getMqRecordMessage(String topic, String tag, T message) {
String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic);
String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC);
MqRecordMessage mqRecordMessage = new MqRecordMessage(); BnyerMessage bnyerMessage = new BnyerMessage();
mqRecordMessage.setTopic(buildTopic); bnyerMessage.setTopic(buildTopic);
mqRecordMessage.setTag(tag); bnyerMessage.setTag(tag);
mqRecordMessage.setConsumerGroupName(buildTopic); bnyerMessage.setConsumerGroupName(buildTopic);
mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); bnyerMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr());
mqRecordMessage.setContent(JSON.toJSONString(message)); bnyerMessage.setContent(JSON.toJSONString(message));
mqRecordMessage.setSource(applicationName); bnyerMessage.setSource(applicationName);
mqRecordMessage.setReturnTopic(buildReturnTopic); bnyerMessage.setReturnTopic(buildReturnTopic);
return mqRecordMessage; return bnyerMessage;
} }
/** /**
@ -95,17 +95,17 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(mqRecordMessage); PayMqMessageRecord payMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(payMqMessageRecord.getId()); bnyerMessage.setId(payMqMessageRecord.getId());
//发消息 //发消息
Message<MqRecordMessage> sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); Message<BnyerMessage> sendMessage = MessageBuilder.withPayload(bnyerMessage).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build();
SendResult sendResult; SendResult sendResult;
try { try {
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage);
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(message), JSONObject.toJSON(sendResult));
} catch (Exception e) { } catch (Exception e) {
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(message), e.getMessage()); log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(message), e.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage());
// throw new RuntimeException(e); // throw new RuntimeException(e);
@ -122,18 +122,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(mqRecordMessage); PayMqMessageRecord payMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(payMqMessageRecord.getId()); bnyerMessage.setId(payMqMessageRecord.getId());
//发消息 //发消息
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -150,18 +150,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate();
//保存消息记录 //保存消息记录
log.info("消息发送中,开始入库本地消息记录表"); log.info("消息发送中,开始入库本地消息记录表");
MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message);
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(mqRecordMessage); PayMqMessageRecord payMqMessageRecord = saveMessageRecord(bnyerMessage);
mqRecordMessage.setId(payMqMessageRecord.getId()); bnyerMessage.setId(payMqMessageRecord.getId());
//发消息 //发消息
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult));
} }
@Override @Override
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage());
} }
@ -173,7 +173,7 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
* @param message * @param message
*/ */
@Transactional @Transactional
public PayMqMessageRecord saveMessageRecord(MqRecordMessage message){ public PayMqMessageRecord saveMessageRecord(BnyerMessage message){
PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord(); PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord();
payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS);
payMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); payMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName());
@ -214,7 +214,7 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService
String content = payMqMessageRecord.getContent(); String content = payMqMessageRecord.getContent();
JSONObject jsonObject = JSON.parseObject(content); JSONObject jsonObject = JSON.parseObject(content);
String msg = jsonObject.getString("content"); String msg = jsonObject.getString("content");
MqRecordMessage message = new MqRecordMessage(); BnyerMessage message = new BnyerMessage();
message.setId(payMqMessageRecord.getId()); message.setId(payMqMessageRecord.getId());
message.setReturnTopic(payMqMessageRecord.getReturnTopic()); message.setReturnTopic(payMqMessageRecord.getReturnTopic());
message.setContent(msg); message.setContent(msg);

Loading…
Cancel
Save