From f907768ceb0cb0a0dc098a8a5bf2e3ec4184aab9 Mon Sep 17 00:00:00 2001 From: wuxicheng <1441859745@qq.com> Date: Sun, 21 May 2023 20:33:31 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handle/EnhanceMessageHandler.java | 15 +++-- .../service/ImgMqMessageRecordService.java | 4 +- .../impl/ImgMqMessageRecordServiceImpl.java | 62 +++++++++++------- .../service/OrderMqMessageRecordService.java | 4 +- .../impl/OrderMqMessageRecordServiceImpl.java | 61 +++++++++++------- .../service/PayMqMessageRecordService.java | 4 +- .../impl/PayMqMessageRecordServiceImpl.java | 63 ++++++++++++------- 7 files changed, 133 insertions(+), 80 deletions(-) diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 36c1327..35dd15e 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -216,19 +216,18 @@ public abstract class EnhanceMessageHandler { /** * 构建返回队列通知消息 - * @param oldMessage + * @param message * @param status * @return */ - protected MqRecordMessage buildReturnMessage(MqRecordMessage oldMessage,EnumMessageStatus status){ + protected MqRecordMessage buildReturnMessage(MqRecordMessage message,EnumMessageStatus status){ MqRecordMessage mqRecordMessage = new MqRecordMessage(); - String orderReturnTopic = rocketMQEnhanceTemplate.reBuildTopic(oldMessage.getReturnTopic()); mqRecordMessage.setMessageKey(IdUtil.randomUUID()); mqRecordMessage.setSource(applicationName); - mqRecordMessage.setTopic(orderReturnTopic); - mqRecordMessage.setConsumerGroupName(orderReturnTopic); + mqRecordMessage.setTopic(message.getReturnTopic()); + mqRecordMessage.setConsumerGroupName(message.getReturnTopic()); mqRecordMessage.setStatus(status); - mqRecordMessage.setId(oldMessage.getId()); + mqRecordMessage.setId(message.getId()); return mqRecordMessage; } @@ -238,7 +237,7 @@ public abstract class EnhanceMessageHandler { */ protected void sendReturnSuccessMessage(MqRecordMessage message){ MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.SUCCESS); - rocketMQEnhanceTemplate.sendAsyncMsg(returnMessage.getReturnTopic(),null,returnMessage); + rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); } /** @@ -247,7 +246,7 @@ public abstract class EnhanceMessageHandler { */ protected void sendReturnFailsMessage(MqRecordMessage message){ MqRecordMessage returnMessage = buildReturnMessage(message, EnumMessageStatus.FAILS); - rocketMQEnhanceTemplate.sendAsyncMsg(returnMessage.getReturnTopic(),null,returnMessage); + rocketMQEnhanceTemplate.sendAsyncMsg(message.getReturnTopic(),null,returnMessage); } } \ No newline at end of file diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java index 3a1d838..d7e0ef4 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/ImgMqMessageRecordService.java @@ -2,6 +2,7 @@ package com.bnyer.img.service; import com.bnyer.common.core.domain.ImgMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; /** * @author :WXC @@ -47,9 +48,8 @@ public interface ImgMqMessageRecordService { * @param tag * @param message * @return - * @param */ - ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message); + ImgMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message); /** * 修改消息记录状态 diff --git a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java index 77a1437..80bd290 100644 --- a/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java +++ b/bnyer-services/bnyer-img/src/main/java/com/bnyer/img/service/impl/ImgMqMessageRecordServiceImpl.java @@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.core.domain.ImgMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; -import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqTopic; @@ -17,6 +16,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.jetbrains.annotations.NotNull; import org.springframework.core.env.Environment; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -53,6 +53,28 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService this.applicationName = env.getProperty("spring.application.name"); } + /** + * 构建消息内容 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + @NotNull + private MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { + String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); + MqRecordMessage mqRecordMessage = new MqRecordMessage(); + mqRecordMessage.setTopic(buildTopic); + mqRecordMessage.setTag(tag); + mqRecordMessage.setConsumerGroupName(buildTopic); + mqRecordMessage.setMessageKey(IdUtils.randomUUID()); + mqRecordMessage.setContent(JSON.toJSONString(message)); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); + return mqRecordMessage; + } + /** * 发送同步步消息 * @param topic @@ -68,18 +90,17 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(imgMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); - Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build(); + Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); } catch (Exception e) { - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); @@ -96,19 +117,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(imgMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -125,19 +145,18 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(imgMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(imgMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.IMG_RETURN_MSG_TOPIC); rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -147,10 +166,9 @@ public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService /** * 添加消息记录 * @param message - * @param */ @Transactional - public ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message){ + public ImgMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){ topic = rocketMQEnhanceTemplate.reBuildTopic(topic); ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java index 2135902..4dd955e 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java @@ -2,6 +2,7 @@ package com.bnyer.order.service; import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; /** * @author :WXC @@ -47,9 +48,8 @@ public interface OrderMqMessageRecordService { * @param tag * @param message * @return - * @param */ - OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message); + OrderMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message); /** * 修改消息记录状态 diff --git a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java index 98e19bf..74f8c7c 100644 --- a/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java +++ b/bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java @@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; -import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.uuid.IdUtils; import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqTopic; @@ -17,6 +16,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.jetbrains.annotations.NotNull; import org.springframework.core.env.Environment; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -52,6 +52,28 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ this.applicationName = env.getProperty("spring.application.name"); } + /** + * 构建消息内容 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + @NotNull + private MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { + String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); + MqRecordMessage mqRecordMessage = new MqRecordMessage(); + mqRecordMessage.setTopic(buildTopic); + mqRecordMessage.setTag(tag); + mqRecordMessage.setConsumerGroupName(buildTopic); + mqRecordMessage.setMessageKey(IdUtils.randomUUID()); + mqRecordMessage.setContent(JSON.toJSONString(message)); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); + return mqRecordMessage; + } + /** * 发送同步步消息 * @param topic @@ -67,18 +89,16 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); - Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, orderMqMessageRecord.getMessageKey()).build(); + Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); } catch (Exception e) { - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); @@ -95,19 +115,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(orderMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -124,19 +143,18 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + OrderMqMessageRecord orderMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(orderMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(orderMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.ORDER_RETURN_MSG_TOPIC); rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -149,8 +167,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ * @param */ @Transactional - public OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message){ - topic = rocketMQEnhanceTemplate.reBuildTopic(topic); + public OrderMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){ OrderMqMessageRecord orderMqMessageRecord = new OrderMqMessageRecord(); orderMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); orderMqMessageRecord.setConsumerGroupName(topic); diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java index 99dac52..feed491 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/PayMqMessageRecordService.java @@ -2,6 +2,7 @@ package com.bnyer.pay.service; import com.bnyer.common.core.domain.PayMqMessageRecord; import com.bnyer.common.core.enums.EnumMessageStatus; +import com.bnyer.common.rocketmq.domain.MqRecordMessage; /** * @author :WXC @@ -47,9 +48,8 @@ public interface PayMqMessageRecordService { * @param tag * @param message * @return - * @param */ - PayMqMessageRecord saveMessageRecord(String topic, String tag, T message); + PayMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message); /** * 修改消息记录状态 diff --git a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java index 3ccf2cc..1dc97cd 100644 --- a/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java +++ b/bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayMqMessageRecordServiceImpl.java @@ -17,6 +17,7 @@ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.jetbrains.annotations.NotNull; import org.springframework.core.env.Environment; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -53,6 +54,28 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService } + /** + * 构建消息内容 + * @param topic + * @param tag + * @param message + * @return + * @param + */ + @NotNull + private MqRecordMessage getMqRecordMessage(String topic, String tag, T message) { + String buildTopic = rocketMQEnhanceTemplate.reBuildTopic(topic); + MqRecordMessage mqRecordMessage = new MqRecordMessage(); + mqRecordMessage.setTopic(buildTopic); + mqRecordMessage.setTag(tag); + mqRecordMessage.setConsumerGroupName(buildTopic); + mqRecordMessage.setMessageKey(IdUtils.randomUUID()); + mqRecordMessage.setContent(JSON.toJSONString(message)); + mqRecordMessage.setSource(applicationName); + mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); + return mqRecordMessage; + } + /** * 发送同步步消息 * @param topic @@ -68,18 +91,17 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(payMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); - Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build(); + Message sendMessage = MessageBuilder.withPayload(mqRecordMessage).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); SendResult sendResult; try { sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); } catch (Exception e) { - log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); + log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(message), e.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); // throw new RuntimeException(e); @@ -96,19 +118,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(payMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -125,19 +146,18 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); //保存消息记录 log.info("消息发送中,开始入库本地消息记录表"); - PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); + MqRecordMessage mqRecordMessage = getMqRecordMessage(topic, tag, message); + PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, mqRecordMessage); + mqRecordMessage.setId(payMqMessageRecord.getId()); //发消息 - MqRecordMessage mqRecordMessage = EntityConvertUtil.copy(payMqMessageRecord, MqRecordMessage.class); - mqRecordMessage.setSource(applicationName); - mqRecordMessage.setReturnTopic(RocketMqTopic.PAY_RETURN_MSG_TOPIC); rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { - log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); + log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { - log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); + log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); } @@ -147,11 +167,9 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService /** * 添加消息记录 * @param message - * @param */ @Transactional - public PayMqMessageRecord saveMessageRecord(String topic, String tag, T message){ - topic = rocketMQEnhanceTemplate.reBuildTopic(topic); + public PayMqMessageRecord saveMessageRecord(String topic, String tag, MqRecordMessage message){ PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord(); payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); payMqMessageRecord.setConsumerGroupName(topic); @@ -174,4 +192,5 @@ public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService payMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); } + }