diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BnyerMessage.java similarity index 93% rename from bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java rename to bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BnyerMessage.java index 44821ab..5f70673 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/MqRecordMessage.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/BnyerMessage.java @@ -14,7 +14,7 @@ import lombok.Setter; @Getter @Setter @NoArgsConstructor -public class MqRecordMessage extends BaseMessage { +public class BnyerMessage extends BaseMessage { /** * 消息记录表id */ diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 7908243..c321cb5 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -2,10 +2,11 @@ package com.bnyer.common.rocketmq.handle; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.domain.BaseMessage; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; import com.bnyer.common.rocketmq.persist.IPersist; import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy; @@ -29,7 +30,7 @@ import java.util.function.Function; * @description : 消息模板抽象类,父类提供公共模板方法 */ @Slf4j -public abstract class EnhanceMessageHandler { +public abstract class EnhanceMessageHandler { /** * 默认重试次数 */ @@ -65,14 +66,14 @@ public abstract class EnhanceMessageHandler { * @param message 待处理消息 * @throws Exception 消费异常 */ - protected abstract void handleMessage(T message) throws Exception; + protected abstract void handleMessage(BnyerMessage message) throws Exception; /** * 超过重试次数消息,需要启用isRetry * * @param message 待处理消息 */ - protected abstract void handleMaxRetriesExceeded(T message); + protected abstract void handleMaxRetriesExceeded(BnyerMessage message); /** @@ -80,7 +81,7 @@ public abstract class EnhanceMessageHandler { * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ - protected abstract boolean filter(T message); + protected abstract boolean filter(BnyerMessage message); /** * 是否异常时重复发送 @@ -117,7 +118,7 @@ public abstract class EnhanceMessageHandler { /** * 使用模板模式构建消息消费框架,可自由扩展或删减 */ - public void dispatchMessage(T message) { + public void dispatchMessage(BnyerMessage message) { // 基础日志记录被父类处理了 log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); if (filter(message)) { @@ -127,6 +128,8 @@ public abstract class EnhanceMessageHandler { //超过最大重试次数时调用子类方法处理 if (message.getRetryTimes() > getMaxRetryTimes()) { handleMaxRetriesExceeded(message); + //发送失败返回通知 + sendReturnFailsMessage(message); return; } IPersist persist = repeatConsumerConfig.getPersist(); @@ -137,6 +140,8 @@ public abstract class EnhanceMessageHandler { try { long now = System.currentTimeMillis(); handleMessage(message); + //发送成功返回通知 + sendReturnSuccessMessage(message); long costTime = System.currentTimeMillis() - now; log.info("消息{}消费成功,耗时[{}ms],修改缓存状态为已消费:{}", message.getMessageKey(),costTime, repeatElement); persist.markConsumed(repeatElement, repeatConsumerConfig.getRecordReserveMinutes()); @@ -161,7 +166,7 @@ public abstract class EnhanceMessageHandler { * 消息重试 * @param message */ - protected void handleRetry(T message) { + protected void handleRetry(BnyerMessage message) { // 获取子类RocketMQMessageListener注解拿到topic和tag RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); if (annotation == null) { @@ -196,7 +201,7 @@ public abstract class EnhanceMessageHandler { * @param message * @return */ - protected Boolean handleMsgRepeat(final T message) { + protected Boolean handleMsgRepeat(final BnyerMessage message) { RepeatConsumerStrategy strategy = new NormalRepeatStrategy(); Function repeatKeyFunction = baseMessage -> repeatMessageKey(message); if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) { @@ -209,7 +214,7 @@ public abstract class EnhanceMessageHandler { /** * 默认拿消息key 作为去重的标识,子类可复写该方法自定义去重标识 */ - protected String repeatMessageKey(T message) { + protected String repeatMessageKey(BnyerMessage message) { return message.getMessageKey(); } @@ -217,22 +222,32 @@ public abstract class EnhanceMessageHandler { * 返回成功通知 * @param message */ - protected void sendReturnSuccessMessage(MqRecordMessage message){ - MqRecordMessage returnMessage = new MqRecordMessage(); - returnMessage.setStatus(EnumMessageStatus.SUCCESS); - returnMessage.setId(message.getId()); - rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); + protected void sendReturnSuccessMessage(BnyerMessage message){ + //发送返回队列,告知已经处理成功,完成最终一致性 + //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 + // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 + if (StringUtils.isNotBlank(message.getReturnTopic())){ + BnyerMessage returnMessage = new BnyerMessage(); + returnMessage.setStatus(EnumMessageStatus.SUCCESS); + returnMessage.setId(message.getId()); + rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); + } } /** * 返回失败通知 * @param message */ - protected void sendReturnFailsMessage(MqRecordMessage message){ - MqRecordMessage returnMessage = new MqRecordMessage(); - returnMessage.setStatus(EnumMessageStatus.FAILS); - returnMessage.setId(message.getId()); - rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); + protected void sendReturnFailsMessage(BnyerMessage message){ + //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录 + //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 + // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 + if (StringUtils.isNotBlank(message.getReturnTopic())){ + BnyerMessage returnMessage = new BnyerMessage(); + returnMessage.setStatus(EnumMessageStatus.FAILS); + returnMessage.setId(message.getId()); + rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); + } } } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java index 4c07283..9cf27e3 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/template/RocketMQEnhanceTemplate.java @@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; import com.bnyer.common.rocketmq.constant.RocketMqConstant; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -84,8 +84,8 @@ public class RocketMQEnhanceTemplate { public SendResult send(String topic, String tag, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... - MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); - Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); + BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message); + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); @@ -93,7 +93,7 @@ public class RocketMQEnhanceTemplate { throw new RuntimeException(e); } // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult)); return sendResult; } @@ -107,15 +107,15 @@ public class RocketMQEnhanceTemplate { * @param */ public SendResult send(String topic, String tag, T message, int delayLevel) { - MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); - Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); + BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message); + Message sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel); } catch (Exception e) { throw new RuntimeException(e); } - log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", bnyerMessage.getTopic(),bnyerMessage.getTag(), delayLevel, JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult)); return sendResult; } @@ -124,17 +124,17 @@ public class RocketMQEnhanceTemplate { * (适合对响应时间敏感的业务场景) */ public void sendAsyncMsg(String topic, String tag, T message) { - MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); + BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message); template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { // 处理消息发送失败逻辑 - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); } }); } @@ -144,16 +144,16 @@ public class RocketMQEnhanceTemplate { * (适合对响应时间敏感的业务场景) */ public void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { - MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); + BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message); template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 处理消息发送成功逻辑 - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",bnyerMessage.getTopic(),bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); } }, RocketMqConstant.TIME_OUT,delayLevel); } @@ -167,9 +167,9 @@ public class RocketMQEnhanceTemplate { * @param */ public boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { - MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); + BnyerMessage bnyerMessage = buildBaseMessage(topic, tag, message); String destination = buildDestination(topic, tag); - TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(mqRecordMessage).build(), arg); + TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(bnyerMessage).build(), arg); log.info("Send transaction msg result: " + sendResult); return sendResult.getSendStatus() == SendStatus.SEND_OK; } @@ -182,16 +182,16 @@ public class RocketMQEnhanceTemplate { * @param message * @param */ - private MqRecordMessage buildBaseMessage(String topic,String tag,T message){ - MqRecordMessage mqRecordMessage = new MqRecordMessage(); + private BnyerMessage buildBaseMessage(String topic, String tag, T message){ + BnyerMessage bnyerMessage = new BnyerMessage(); String buildTopic = reBuildTopic(topic); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); - mqRecordMessage.setTopic(buildTopic); - mqRecordMessage.setTag(tag); - mqRecordMessage.setConsumerGroupName(buildTopic); - mqRecordMessage.setContent(JSON.toJSONString(message)); - return mqRecordMessage; + bnyerMessage.setSource(applicationName); + bnyerMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); + bnyerMessage.setTopic(buildTopic); + bnyerMessage.setTag(tag); + bnyerMessage.setConsumerGroupName(buildTopic); + bnyerMessage.setContent(JSON.toJSONString(message)); + return bnyerMessage; } } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/BnyerImgApplication.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/BnyerImgApplication.java index 8b5767f..9292a64 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/BnyerImgApplication.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/BnyerImgApplication.java @@ -5,6 +5,7 @@ import com.bnyer.common.security.annotation.EnableRyFeignClients; import com.bnyer.common.swagger.annotation.EnableCustomSwagger2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; /** @@ -15,7 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync; @EnableCustomConfig @EnableCustomSwagger2 @EnableRyFeignClients -@SpringBootApplication(scanBasePackages = { "com.bnyer.common"}) +@ComponentScan(basePackages = "com.bnyer") +@SpringBootApplication @EnableAsync public class BnyerImgApplication { diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java index 497fabd..cbb406c 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/GoldRewardConsumer.java @@ -1,18 +1,15 @@ package com.bnyer.img.listener; -import com.alibaba.fastjson.JSON; import com.bnyer.common.core.domain.FhUser; import com.bnyer.common.core.domain.GoldLog; import com.bnyer.common.core.domain.TiktokUser; import com.bnyer.common.core.domain.WxUser; -import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.GoldEnum; import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.domain.img.GoldRewardMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; -import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.img.service.FhUserService; import com.bnyer.img.service.GoldLogService; import com.bnyer.img.service.TiktokUserService; @@ -23,8 +20,6 @@ import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * @author :penny * @Date :2023/05/17 @@ -33,7 +28,7 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.GOLD_REWARD_TOPIC,consumerGroup = RocketMqTopic.GOLD_REWARD_TOPIC) -public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class GoldRewardConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private TiktokUserService tiktokUserService; @@ -48,7 +43,7 @@ public class GoldRewardConsumer extends EnhanceMessageHandler i private GoldLogService goldLogService; @Override - public void onMessage(MqRecordMessage message) { + public void onMessage(BnyerMessage message) { super.dispatchMessage(message); GoldRewardMessage goldReward = message.getObject(GoldRewardMessage.class); if(StringUtils.isNotNull(goldReward.getPlatform())){ @@ -133,23 +128,15 @@ public class GoldRewardConsumer extends EnhanceMessageHandler i } @Override - protected void handleMessage(MqRecordMessage message) throws Exception { - //发送返回队列,告知已经处理成功,完成最终一致性 - //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 - // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 - super.sendReturnSuccessMessage(message); + protected void handleMessage(BnyerMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(MqRecordMessage message) { - //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录 - //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 - // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 - super.sendReturnFailsMessage(message); + protected void handleMaxRetriesExceeded(BnyerMessage message) { } @Override - protected boolean filter(MqRecordMessage message) { + protected boolean filter(BnyerMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java index d9584ec..ba1ef07 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/ImgReturnMessageConsumer.java @@ -1,7 +1,7 @@ package com.bnyer.img.listener; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.img.service.ImgMqMessageRecordService; import lombok.extern.slf4j.Slf4j; @@ -19,27 +19,27 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.IMG_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.IMG_RETURN_MSG_TOPIC) -public class ImgReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class ImgReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private ImgMqMessageRecordService imgMqMessageRecordService; @Override - public void onMessage(MqRecordMessage message) { + public void onMessage(BnyerMessage message) { super.dispatchMessage(message); imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); } @Override - protected void handleMessage(MqRecordMessage message) throws Exception { + protected void handleMessage(BnyerMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(MqRecordMessage message) { + protected void handleMaxRetriesExceeded(BnyerMessage message) { } @Override - protected boolean filter(MqRecordMessage message) { + protected boolean filter(BnyerMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java index 9673f03..f5c0e5d 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java @@ -3,7 +3,7 @@ package com.bnyer.img.listener; import com.bnyer.common.core.dto.AddUserVipRecordDto; import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.img.service.UserVipRecordService; @@ -21,13 +21,13 @@ import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.VIP_RECORD_CREATE_TOPIC,consumerGroup = RocketMqTopic.VIP_RECORD_CREATE_TOPIC) -public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipRecordCreateConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private UserVipRecordService userVipRecordService; @Override - public void onMessage(MqRecordMessage message) { + public void onMessage(BnyerMessage message) { super.dispatchMessage(message); AddUserVipRecordMessage addUserVipRecordMessage = message.getObject(AddUserVipRecordMessage.class); AddUserVipRecordDto addUserVipRecordDto = EntityConvertUtil.copy(addUserVipRecordMessage, AddUserVipRecordDto.class); @@ -36,23 +36,15 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler */ @NotNull - private MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { + private BnyerMessage getMqRecordMessage(String topic, String tag, T message) { String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); - MqRecordMessage mqRecordMessage = new MqRecordMessage(); - mqRecordMessage.setTopic(buildTopic); - mqRecordMessage.setTag(tag); - mqRecordMessage.setConsumerGroupName(buildTopic); - mqRecordMessage.setMessageKey(applicationName + "-"+ IdUtil.getSnowflakeNextIdStr()); - mqRecordMessage.setContent(JSON.toJSONString(message)); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(buildReturnTopic); - return mqRecordMessage; + BnyerMessage bnyerMessage = new BnyerMessage(); + bnyerMessage.setTopic(buildTopic); + bnyerMessage.setTag(tag); + bnyerMessage.setConsumerGroupName(buildTopic); + bnyerMessage.setMessageKey(applicationName + "-"+ IdUtil.getSnowflakeNextIdStr()); + bnyerMessage.setContent(JSON.toJSONString(message)); + bnyerMessage.setSource(applicationName); + bnyerMessage.setReturnTopic(buildReturnTopic); + return bnyerMessage; } /** @@ -97,17 +95,17 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(imgMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(imgMqMessageRecord.getId()); //发消息 - Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); + Message sendMessage = MessageBuilder.withPayload(bnyerMessage).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult)); } catch (Exception e) { - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); @@ -124,18 +122,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(imgMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(imgMqMessageRecord.getId()); //发消息 - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -152,18 +150,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(imgMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(imgMqMessageRecord.getId()); //发消息 - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -175,7 +173,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService * @param message */ @Transactional - public ImgMqMessageRecord saveMessageRecord(MqRecordMessage message){ + public ImgMqMessageRecord saveMessageRecord(BnyerMessage message){ ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); imgMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); @@ -216,7 +214,7 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService String content = imgMqMessageRecord.getContent(); JSONObject jsonObject = JSON.parseObject(content); String msg = jsonObject.getString("content"); - MqRecordMessage message = new MqRecordMessage(); + BnyerMessage message = new BnyerMessage(); message.setId(imgMqMessageRecord.getId()); message.setReturnTopic(imgMqMessageRecord.getReturnTopic()); message.setContent(msg); diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/BnyerOrderApplication.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/BnyerOrderApplication.java index 7d1d92e..563c331 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/BnyerOrderApplication.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/BnyerOrderApplication.java @@ -5,6 +5,7 @@ import com.bnyer.common.security.annotation.EnableRyFeignClients; import com.bnyer.common.swagger.annotation.EnableCustomSwagger2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; /** @@ -15,7 +16,8 @@ import org.springframework.scheduling.annotation.EnableAsync; @EnableCustomConfig @EnableCustomSwagger2 @EnableRyFeignClients -@SpringBootApplication(scanBasePackages = { "com.bnyer.common"}) +@ComponentScan(basePackages = "com.bnyer") +@SpringBootApplication @EnableAsync public class BnyerOrderApplication { diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java index f3b7de0..c40ead5 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java @@ -1,7 +1,7 @@ package com.bnyer.order.listener; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.order.service.OrderMqMessageRecordService; import lombok.extern.slf4j.Slf4j; @@ -19,29 +19,29 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.ORDER_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.ORDER_RETURN_MSG_TOPIC) -public class OrderReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class OrderReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private OrderMqMessageRecordService orderMqMessageRecordService; @Override - public void onMessage(MqRecordMessage message) { + public void onMessage(BnyerMessage message) { super.dispatchMessage(message); orderMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); } @Override - protected void handleMessage(MqRecordMessage message) throws Exception { + protected void handleMessage(BnyerMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(MqRecordMessage message) { + protected void handleMaxRetriesExceeded(BnyerMessage message) { } @Override - protected boolean filter(MqRecordMessage message) { + protected boolean filter(BnyerMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java index 9c708c2..abd06b2 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderCancelConsumer.java @@ -2,7 +2,7 @@ package com.bnyer.order.listener.vip; import com.bnyer.common.rocketmq.constant.RocketMqTag; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.domain.order.VipOrderCancelMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.order.bean.dto.CancelVipOrderDto; @@ -22,13 +22,13 @@ import org.springframework.stereotype.Component; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.ORDER_CANCEL_TOPIC,selectorExpression = RocketMqTag.ORDER_VIP_TAG,consumerGroup = RocketMqTopic.ORDER_CANCEL_TOPIC) -public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { @Autowired private VipOrderService vipOrderService; @Override - public void onMessage(MqRecordMessage message) { + public void onMessage(BnyerMessage message) { super.dispatchMessage(message); VipOrderCancelMessage orderCancelMessage = message.getObject(VipOrderCancelMessage.class); // 如果订单未支付的话,将订单设为取消状态 @@ -39,17 +39,17 @@ public class VipOrderCancelConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private VipOrderMapper vipOrderMapper; @@ -41,7 +39,7 @@ public class VipOrderPayNotifyConsumer extends EnhanceMessageHandler */ @NotNull - private MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { + private BnyerMessage getMqRecordMessage(String topic, String tag, T message) { String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); - MqRecordMessage mqRecordMessage = new MqRecordMessage(); - mqRecordMessage.setTopic(buildTopic); - mqRecordMessage.setTag(tag); - mqRecordMessage.setConsumerGroupName(buildTopic); - mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); - mqRecordMessage.setContent(JSON.toJSONString(message)); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(buildReturnTopic); - return mqRecordMessage; + BnyerMessage bnyerMessage = new BnyerMessage(); + bnyerMessage.setTopic(buildTopic); + bnyerMessage.setTag(tag); + bnyerMessage.setConsumerGroupName(buildTopic); + bnyerMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); + bnyerMessage.setContent(JSON.toJSONString(message)); + bnyerMessage.setSource(applicationName); + bnyerMessage.setReturnTopic(buildReturnTopic); + return bnyerMessage; } /** @@ -94,16 +94,16 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(mqRecordMessage); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(bnyerMessage); //发消息 - Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); + Message sendMessage = MessageBuilder.withPayload(bnyerMessage).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), JSONObject.toJSON(sendResult)); } catch (Exception e) { - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(bnyerMessage), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); @@ -120,18 +120,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(orderMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(orderMqMessageRecord.getId()); //发消息 - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -148,18 +148,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(orderMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(orderMqMessageRecord.getId()); //发消息 - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -172,7 +172,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ * @param */ @Transactional - public OrderMqMessageRecord saveMessageRecord(MqRecordMessage message){ + public OrderMqMessageRecord saveMessageRecord(BnyerMessage message){ OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord(); orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); orderMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); @@ -213,7 +213,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ String content = orderMqMessageRecord.getContent(); JSONObject jsonObject = JSON.parseObject(content); String msg = jsonObject.getString("content"); - MqRecordMessage message = new MqRecordMessage(); + BnyerMessage message = new BnyerMessage(); message.setId(orderMqMessageRecord.getId()); message.setReturnTopic(orderMqMessageRecord.getReturnTopic()); message.setContent(msg); diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/BnyerPayApplication.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/BnyerPayApplication.java index b3112d7..a6d90ed 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/BnyerPayApplication.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/BnyerPayApplication.java @@ -5,17 +5,19 @@ import com.bnyer.common.security.annotation.EnableRyFeignClients; import com.bnyer.common.swagger.annotation.EnableCustomSwagger2; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableAsync; /** * 支付模块 - * + * * @author penny */ @EnableCustomConfig @EnableCustomSwagger2 @EnableRyFeignClients -@SpringBootApplication(scanBasePackages = { "com.bnyer" }) +@ComponentScan(basePackages = "com.bnyer") +@SpringBootApplication @EnableAsync public class BnyerPayApplication { diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java index 0532ba8..f6c51f5 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/listener/PayReturnMessageConsumer.java @@ -1,7 +1,7 @@ package com.bnyer.pay.listener; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; import com.bnyer.pay.service.PayMqMessageRecordService; import lombok.extern.slf4j.Slf4j; @@ -19,29 +19,29 @@ import javax.annotation.Resource; @Slf4j @Component @RocketMQMessageListener(topic = RocketMqTopic.PAY_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.PAY_RETURN_MSG_TOPIC) -public class PayReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { +public class PayReturnMessageConsumer extends EnhanceMessageHandler implements RocketMQListener { @Resource private PayMqMessageRecordService payMqMessageRecordService; @Override - public void onMessage(MqRecordMessage message) { + public void onMessage(BnyerMessage message) { super.dispatchMessage(message); payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); } @Override - protected void handleMessage(MqRecordMessage message) throws Exception { + protected void handleMessage(BnyerMessage message) throws Exception { } @Override - protected void handleMaxRetriesExceeded(MqRecordMessage message) { + protected void handleMaxRetriesExceeded(BnyerMessage message) { } @Override - protected boolean filter(MqRecordMessage message) { + protected boolean filter(BnyerMessage message) { return super.handleMsgRepeat(message); } diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java index ff2a036..ba5f9df 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java @@ -2,7 +2,7 @@ package com.bnyer.pay.service; import com.bnyer.common.core.domain.PayMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; /** * @author :WXC @@ -47,7 +47,7 @@ public interface PayMqMessageRecordService { * @param message * @return */ - PayMqMessageRecord saveMessageRecord(MqRecordMessage message); + PayMqMessageRecord saveMessageRecord(BnyerMessage message); /** * 修改消息记录状态 diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java index 5b448f8..234ea03 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java @@ -10,7 +10,7 @@ import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.DateUtils; import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqTopic; -import com.bnyer.common.rocketmq.domain.MqRecordMessage; +import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; import com.bnyer.pay.mapper.PayMqMessageRecordMapper; import com.bnyer.pay.service.PayMqMessageRecordService; @@ -66,18 +66,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService * @param */ @NotNull - private MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { + private BnyerMessage getMqRecordMessage(String topic, String tag, T message) { String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); String buildReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); - MqRecordMessage mqRecordMessage = new MqRecordMessage(); - mqRecordMessage.setTopic(buildTopic); - mqRecordMessage.setTag(tag); - mqRecordMessage.setConsumerGroupName(buildTopic); - mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); - mqRecordMessage.setContent(JSON.toJSONString(message)); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(buildReturnTopic); - return mqRecordMessage; + BnyerMessage bnyerMessage = new BnyerMessage(); + bnyerMessage.setTopic(buildTopic); + bnyerMessage.setTag(tag); + bnyerMessage.setConsumerGroupName(buildTopic); + bnyerMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); + bnyerMessage.setContent(JSON.toJSONString(message)); + bnyerMessage.setSource(applicationName); + bnyerMessage.setReturnTopic(buildReturnTopic); + return bnyerMessage; } /** @@ -95,17 +95,17 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - PayMqMessageRecord payMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(payMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(payMqMessageRecord.getId()); //发消息 - Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); + Message sendMessage = MessageBuilder.withPayload(bnyerMessage).setHeader(RocketMQHeaders.KEYS, bnyerMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); } catch (Exception e) { - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(message), e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", bnyerMessage.getTopic(), bnyerMessage.getTag(), JSONObject.toJSON(message), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); @@ -122,18 +122,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - PayMqMessageRecord payMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(payMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(payMqMessageRecord.getId()); //发消息 - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -150,18 +150,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); - PayMqMessageRecord payMqMessageRecord = saveMessageRecord(mqRecordMessage); - mqRecordMessage.setId(payMqMessageRecord.getId()); + BnyerMessage bnyerMessage = getMqRecordMessage(topic, tag, message); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(bnyerMessage); + bnyerMessage.setId(payMqMessageRecord.getId()); //发消息 - rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { + rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(bnyerMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}", bnyerMessage.getTopic(), bnyerMessage.getTag(),JSON.toJSONString(bnyerMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -173,7 +173,7 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService * @param message */ @Transactional - public PayMqMessageRecord saveMessageRecord(MqRecordMessage message){ + public PayMqMessageRecord saveMessageRecord(BnyerMessage message){ PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord(); payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); payMqMessageRecord.setConsumerGroupName(message.getConsumerGroupName()); @@ -214,7 +214,7 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService String content = payMqMessageRecord.getContent(); JSONObject jsonObject = JSON.parseObject(content); String msg = jsonObject.getString("content"); - MqRecordMessage message = new MqRecordMessage(); + BnyerMessage message = new BnyerMessage(); message.setId(payMqMessageRecord.getId()); message.setReturnTopic(payMqMessageRecord.getReturnTopic()); message.setContent(msg);