28 changed files with 933 additions and 59 deletions
@ -0,0 +1,19 @@ |
|||
package com.bnyer.common.core.domain; |
|||
|
|||
import com.baomidou.mybatisplus.annotation.TableName; |
|||
import lombok.Getter; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/18 |
|||
* @description :图片服务本地消息表 |
|||
*/ |
|||
@Getter |
|||
@Setter |
|||
@NoArgsConstructor |
|||
@TableName(value = "img_mq_message_record") |
|||
public class ImgMqMessageRecord extends BaseMqMessage{ |
|||
|
|||
} |
|||
@ -0,0 +1,19 @@ |
|||
package com.bnyer.common.core.domain; |
|||
|
|||
import com.baomidou.mybatisplus.annotation.TableName; |
|||
import lombok.Getter; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/18 |
|||
* @description :支付服务本地消息表 |
|||
*/ |
|||
@Getter |
|||
@Setter |
|||
@NoArgsConstructor |
|||
@TableName(value = "pay_mq_message_record") |
|||
public class PayMqMessageRecord extends BaseMqMessage{ |
|||
|
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
package com.bnyer.common.rocketmq.domain.img; |
|||
|
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import com.bnyer.common.rocketmq.domain.BaseMessage; |
|||
import lombok.Getter; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/18 |
|||
* @description :图片服务本地消息表记录 |
|||
*/ |
|||
@Getter |
|||
@Setter |
|||
@NoArgsConstructor |
|||
public class ImgMqLocalRecordMessage extends BaseMessage { |
|||
/** |
|||
* 主键id |
|||
*/ |
|||
private Long id; |
|||
|
|||
/** |
|||
* 消息状态 |
|||
*/ |
|||
private EnumMessageStatus status; |
|||
|
|||
/** |
|||
* 消息内容 |
|||
*/ |
|||
private String content; |
|||
|
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
package com.bnyer.common.rocketmq.domain.pay; |
|||
|
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import com.bnyer.common.rocketmq.domain.BaseMessage; |
|||
import lombok.Getter; |
|||
import lombok.NoArgsConstructor; |
|||
import lombok.Setter; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/18 |
|||
* @description :支付服务本地消息表记录 |
|||
*/ |
|||
@Getter |
|||
@Setter |
|||
@NoArgsConstructor |
|||
public class PayMqLocalRecordMessage extends BaseMessage { |
|||
/** |
|||
* 主键id |
|||
*/ |
|||
private Long id; |
|||
|
|||
/** |
|||
* 消息状态 |
|||
*/ |
|||
private EnumMessageStatus status; |
|||
|
|||
/** |
|||
* 消息内容 |
|||
*/ |
|||
private String content; |
|||
|
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
package com.bnyer.img.listener; |
|||
|
|||
import com.bnyer.common.core.constant.ServiceNameConstants; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import com.bnyer.common.rocketmq.constant.RocketMqTopic; |
|||
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; |
|||
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; |
|||
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; |
|||
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; |
|||
import com.bnyer.img.service.ImgMqMessageRecordService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
|||
import org.apache.rocketmq.spring.core.RocketMQListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/20 |
|||
* @description : |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
@RocketMQMessageListener(topic = RocketMqTopic.IMG_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.IMG_RETURN_MSG_TOPIC) |
|||
public class ImgReturnMessageConsumer extends EnhanceMessageHandler<ImgMqLocalRecordMessage> implements RocketMQListener<ImgMqLocalRecordMessage> { |
|||
|
|||
@Resource |
|||
private ImgMqMessageRecordService imgMqMessageRecordService; |
|||
|
|||
@Override |
|||
public void onMessage(ImgMqLocalRecordMessage message) { |
|||
super.dispatchMessage(message); |
|||
imgMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); |
|||
} |
|||
|
|||
@Override |
|||
protected void handleMessage(ImgMqLocalRecordMessage message) throws Exception { |
|||
} |
|||
|
|||
@Override |
|||
protected void handleMaxRetriesExceeded(ImgMqLocalRecordMessage message) { |
|||
} |
|||
|
|||
@Override |
|||
protected boolean filter(ImgMqLocalRecordMessage message) { |
|||
return super.handleMsgRepeat(message); |
|||
} |
|||
|
|||
@Override |
|||
protected boolean isRetry() { |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
protected boolean throwException() { |
|||
return false; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
package com.bnyer.img.mapper; |
|||
|
|||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
|||
import com.bnyer.common.core.domain.ImgMqMessageRecord; |
|||
import com.bnyer.common.core.domain.OrderMqMessageRecord; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import org.apache.ibatis.annotations.Mapper; |
|||
import org.apache.ibatis.annotations.Param; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @description : |
|||
*/ |
|||
@Mapper |
|||
public interface ImgMqMessageRecordMapper extends BaseMapper<ImgMqMessageRecord> { |
|||
/** |
|||
* 更新状态 |
|||
* @param status |
|||
* @return |
|||
*/ |
|||
Integer updateStatusByStatus(@Param("id") Long id, @Param("status") EnumMessageStatus status, @Param("errMsg") String errMsg); |
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
package com.bnyer.img.service; |
|||
|
|||
import com.bnyer.common.core.domain.ImgMqMessageRecord; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/20 |
|||
* @description :图片服务本地消息service层 |
|||
*/ |
|||
public interface ImgMqMessageRecordService { |
|||
|
|||
/** |
|||
* 发送同步步消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
<T> void send(String topic, String tag, T message); |
|||
|
|||
/** |
|||
* 发送异步消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
<T> void sendAsyncMsg(String topic, String tag, T message); |
|||
|
|||
/** |
|||
* 发送异步延时消息 |
|||
* 废弃:延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了 |
|||
* 所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param delayLevel |
|||
* @param <T> |
|||
*/ |
|||
@Deprecated |
|||
<T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel); |
|||
|
|||
/** |
|||
* 保存消息记录 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @return |
|||
* @param <T> |
|||
*/ |
|||
<T> ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message); |
|||
|
|||
/** |
|||
* 修改消息记录状态 |
|||
* @param id |
|||
* @param status |
|||
* @param errMsg |
|||
*/ |
|||
void editMessageRecordStatus(Long id, EnumMessageStatus status,String errMsg); |
|||
|
|||
} |
|||
@ -0,0 +1,164 @@ |
|||
package com.bnyer.img.service.impl; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.bnyer.common.core.constant.ServiceNameConstants; |
|||
import com.bnyer.common.core.domain.ImgMqMessageRecord; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import com.bnyer.common.core.utils.bean.EntityConvertUtil; |
|||
import com.bnyer.common.core.utils.uuid.IdUtils; |
|||
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
|||
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; |
|||
import com.bnyer.common.rocketmq.domain.img.ImgMqLocalRecordMessage; |
|||
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; |
|||
import com.bnyer.img.mapper.ImgMqMessageRecordMapper; |
|||
import com.bnyer.img.service.ImgMqMessageRecordService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.producer.SendCallback; |
|||
import org.apache.rocketmq.client.producer.SendResult; |
|||
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|||
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
|||
import org.springframework.messaging.Message; |
|||
import org.springframework.messaging.support.MessageBuilder; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.util.Date; |
|||
|
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/19 |
|||
* @description : |
|||
*/ |
|||
@Slf4j |
|||
@Service |
|||
public class ImgMqMessageRecordServiceImpl implements ImgMqMessageRecordService { |
|||
|
|||
@Resource |
|||
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; |
|||
|
|||
@Resource |
|||
private ImgMqMessageRecordMapper imgMqMessageRecordMapper; |
|||
|
|||
/** |
|||
* 发送同步步消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
@Transactional |
|||
@Override |
|||
public <T> void send(String topic, String tag, T message) { |
|||
// 设置业务键,此处根据公共的参数进行处理
|
|||
// 更多的其它基础业务处理...
|
|||
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); |
|||
//保存消息记录
|
|||
log.info("消息发送中,开始入库本地消息记录表"); |
|||
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); |
|||
//发消息
|
|||
ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); |
|||
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); |
|||
Message<ImgMqLocalRecordMessage> sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, imgMqMessageRecord.getMessageKey()).build(); |
|||
SendResult sendResult; |
|||
try { |
|||
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); |
|||
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|||
} catch (Exception e) { |
|||
editMessageRecordStatus(imgMqMessageRecord.getId(), imgMqMessageRecord.getStatus(),e.getMessage()); |
|||
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); |
|||
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|||
// throw new RuntimeException(e);
|
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|||
* (适合对响应时间敏感的业务场景) |
|||
*/ |
|||
@Transactional |
|||
@Override |
|||
public <T> void sendAsyncMsg(String topic, String tag, T message) { |
|||
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); |
|||
//保存消息记录
|
|||
log.info("消息发送中,开始入库本地消息记录表"); |
|||
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); |
|||
//发消息
|
|||
ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); |
|||
mqLocalMessage.setSource(ServiceNameConstants.IMG_SERVICE); |
|||
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { |
|||
@Override |
|||
public void onSuccess(SendResult sendResult) { |
|||
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); |
|||
} |
|||
@Override |
|||
public void onException(Throwable throwable) { |
|||
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); |
|||
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|||
imgMqMessageRecord.setErrMsg(throwable.getMessage()); |
|||
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),imgMqMessageRecord.getErrMsg()); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
/** |
|||
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|||
* (适合对响应时间敏感的业务场景) |
|||
*/ |
|||
@Transactional |
|||
@Override |
|||
public <T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|||
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); |
|||
//保存消息记录
|
|||
log.info("消息发送中,开始入库本地消息记录表"); |
|||
ImgMqMessageRecord imgMqMessageRecord = saveMessageRecord(topic,tag, message); |
|||
//发消息
|
|||
ImgMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(imgMqMessageRecord, ImgMqLocalRecordMessage.class); |
|||
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); |
|||
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { |
|||
@Override |
|||
public void onSuccess(SendResult sendResult) { |
|||
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); |
|||
} |
|||
@Override |
|||
public void onException(Throwable throwable) { |
|||
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); |
|||
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|||
editMessageRecordStatus(imgMqMessageRecord.getId(),imgMqMessageRecord.getStatus(),throwable.getMessage()); |
|||
} |
|||
}, RocketMqConstant.TIME_OUT,delayLevel); |
|||
} |
|||
|
|||
/** |
|||
* 添加消息记录 |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
@Transactional |
|||
public <T> ImgMqMessageRecord saveMessageRecord(String topic, String tag, T message){ |
|||
topic = rocketMQEnhanceTemplate.reBuildTopic(topic); |
|||
ImgMqMessageRecord imgMqMessageRecord = new ImgMqMessageRecord(); |
|||
imgMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); |
|||
imgMqMessageRecord.setConsumerGroupName(topic); |
|||
imgMqMessageRecord.setTopic(topic); |
|||
imgMqMessageRecord.setTag(tag); |
|||
imgMqMessageRecord.setCreateTime(new Date()); |
|||
imgMqMessageRecord.setMessageKey(IdUtils.randomUUID()); |
|||
imgMqMessageRecord.setContent(JSON.toJSONString(message)); |
|||
imgMqMessageRecordMapper.insert(imgMqMessageRecord); |
|||
return imgMqMessageRecord; |
|||
} |
|||
|
|||
/** |
|||
* 修改消息记录状态 |
|||
* @param id |
|||
* @param status |
|||
*/ |
|||
@Transactional |
|||
public void editMessageRecordStatus(Long id,EnumMessageStatus status,String errMsg){ |
|||
imgMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
|||
<mapper namespace="com.bnyer.img.mapper.ImgMqMessageRecordMapper"> |
|||
<resultMap id="BaseResultMap" type="com.bnyer.common.core.domain.ImgMqMessageRecord"> |
|||
<!--order_mq_message--> |
|||
<id column="id" jdbcType="BIGINT" property="id" /> |
|||
<result column="message_key" jdbcType="VARCHAR" property="messageKey" /> |
|||
<result column="topic" jdbcType="VARCHAR" property="topic" /> |
|||
<result column="tag" jdbcType="VARCHAR" property="tag" /> |
|||
<result column="consumer_group_name" jdbcType="VARCHAR" property="consumerGroupName" /> |
|||
<result column="status" jdbcType="VARCHAR" property="status" /> |
|||
<result column="err_msg" jdbcType="VARCHAR" property="errMsg" /> |
|||
<result column="content" jdbcType="VARCHAR" property="content" /> |
|||
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" /> |
|||
</resultMap> |
|||
|
|||
<sql id="Base_Column_List"> |
|||
t.id, |
|||
t.message_key, |
|||
t.topic, |
|||
t.tag, |
|||
t.consumer_group_name, |
|||
t.status, |
|||
t.err_msg, |
|||
t.content, |
|||
t.create_time, |
|||
</sql> |
|||
<update id="updateStatusByStatus"> |
|||
update img_mq_message_record |
|||
set status = #{status} |
|||
<if test="errMsg != null and errMsg != ''"> |
|||
,err_msg = #{errMsg} |
|||
</if> |
|||
where id = #{id} |
|||
</update> |
|||
|
|||
</mapper> |
|||
@ -0,0 +1,58 @@ |
|||
package com.bnyer.pay.listener; |
|||
|
|||
import com.bnyer.common.rocketmq.constant.RocketMqTopic; |
|||
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; |
|||
import com.bnyer.common.rocketmq.handle.EnhanceMessageHandler; |
|||
import com.bnyer.pay.service.PayMqMessageRecordService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; |
|||
import org.apache.rocketmq.spring.core.RocketMQListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.annotation.Resource; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/20 |
|||
* @description : |
|||
*/ |
|||
@Slf4j |
|||
@Component |
|||
@RocketMQMessageListener(topic = RocketMqTopic.PAY_RETURN_MSG_TOPIC,consumerGroup = RocketMqTopic.PAY_RETURN_MSG_TOPIC) |
|||
public class PayReturnMessageConsumer extends EnhanceMessageHandler<PayMqLocalRecordMessage> implements RocketMQListener<PayMqLocalRecordMessage> { |
|||
|
|||
@Resource |
|||
private PayMqMessageRecordService payMqMessageRecordService; |
|||
|
|||
@Override |
|||
public void onMessage(PayMqLocalRecordMessage message) { |
|||
super.dispatchMessage(message); |
|||
payMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null); |
|||
} |
|||
|
|||
@Override |
|||
protected void handleMessage(PayMqLocalRecordMessage message) throws Exception { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
protected void handleMaxRetriesExceeded(PayMqLocalRecordMessage message) { |
|||
|
|||
} |
|||
|
|||
@Override |
|||
protected boolean filter(PayMqLocalRecordMessage message) { |
|||
return super.handleMsgRepeat(message); |
|||
} |
|||
|
|||
@Override |
|||
protected boolean isRetry() { |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
protected boolean throwException() { |
|||
return false; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
package com.bnyer.pay.mapper; |
|||
|
|||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
|||
import com.bnyer.common.core.domain.OrderMqMessageRecord; |
|||
import com.bnyer.common.core.domain.PayMqMessageRecord; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import org.apache.ibatis.annotations.Mapper; |
|||
import org.apache.ibatis.annotations.Param; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @description : |
|||
*/ |
|||
@Mapper |
|||
public interface PayMqMessageRecordMapper extends BaseMapper<PayMqMessageRecord> { |
|||
/** |
|||
* 更新状态 |
|||
* @param status |
|||
* @return |
|||
*/ |
|||
Integer updateStatusByStatus(@Param("id") Long id, @Param("status") EnumMessageStatus status, @Param("errMsg") String errMsg); |
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
package com.bnyer.pay.service; |
|||
|
|||
import com.bnyer.common.core.domain.PayMqMessageRecord; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/20 |
|||
* @description :支付服务本地消息service层 |
|||
*/ |
|||
public interface PayMqMessageRecordService { |
|||
|
|||
/** |
|||
* 发送同步步消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
<T> void send(String topic, String tag, T message); |
|||
|
|||
/** |
|||
* 发送异步消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
<T> void sendAsyncMsg(String topic, String tag, T message); |
|||
|
|||
/** |
|||
* 发送异步延时消息 |
|||
* 废弃:延时消息不走消息表,因为延时消息最少支持秒级延时,定时任务不可能对消息表进行秒级扫描,那样性能损耗太大了 |
|||
* 所以确保消息一定发送成功,走同步发送,如果发送失败直接抛异常,确保本地事物回滚 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param delayLevel |
|||
* @param <T> |
|||
*/ |
|||
@Deprecated |
|||
<T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel); |
|||
|
|||
/** |
|||
* 保存消息记录 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @return |
|||
* @param <T> |
|||
*/ |
|||
<T> PayMqMessageRecord saveMessageRecord(String topic, String tag, T message); |
|||
|
|||
/** |
|||
* 修改消息记录状态 |
|||
* @param id |
|||
* @param status |
|||
* @param errMsg |
|||
*/ |
|||
void editMessageRecordStatus(Long id, EnumMessageStatus status,String errMsg); |
|||
|
|||
} |
|||
@ -0,0 +1,162 @@ |
|||
package com.bnyer.pay.service.impl; |
|||
|
|||
import com.alibaba.fastjson.JSON; |
|||
import com.alibaba.fastjson.JSONObject; |
|||
import com.bnyer.common.core.constant.ServiceNameConstants; |
|||
import com.bnyer.common.core.domain.PayMqMessageRecord; |
|||
import com.bnyer.common.core.enums.EnumMessageStatus; |
|||
import com.bnyer.common.core.utils.bean.EntityConvertUtil; |
|||
import com.bnyer.common.core.utils.uuid.IdUtils; |
|||
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
|||
import com.bnyer.common.rocketmq.domain.pay.PayMqLocalRecordMessage; |
|||
import com.bnyer.common.rocketmq.template.RocketMQEnhanceTemplate; |
|||
import com.bnyer.pay.mapper.PayMqMessageRecordMapper; |
|||
import com.bnyer.pay.service.PayMqMessageRecordService; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.rocketmq.client.producer.SendCallback; |
|||
import org.apache.rocketmq.client.producer.SendResult; |
|||
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|||
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
|||
import org.springframework.messaging.Message; |
|||
import org.springframework.messaging.support.MessageBuilder; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
|
|||
import javax.annotation.Resource; |
|||
import java.util.Date; |
|||
|
|||
/** |
|||
* @author :WXC |
|||
* @Date :2023/05/19 |
|||
* @description : |
|||
*/ |
|||
@Slf4j |
|||
@Service |
|||
public class PayMqMessageRecordServiceImpl implements PayMqMessageRecordService { |
|||
|
|||
@Resource |
|||
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; |
|||
|
|||
@Resource |
|||
private PayMqMessageRecordMapper payMqMessageRecordMapper; |
|||
|
|||
/** |
|||
* 发送同步步消息 |
|||
* @param topic |
|||
* @param tag |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
@Transactional |
|||
@Override |
|||
public <T> void send(String topic, String tag, T message) { |
|||
// 设置业务键,此处根据公共的参数进行处理
|
|||
// 更多的其它基础业务处理...
|
|||
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); |
|||
//保存消息记录
|
|||
log.info("消息发送中,开始入库本地消息记录表"); |
|||
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); |
|||
//发消息
|
|||
PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); |
|||
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); |
|||
Message<PayMqLocalRecordMessage> sendMessage = MessageBuilder.withPayload(mqLocalMessage).setHeader(RocketMQHeaders.KEYS, payMqMessageRecord.getMessageKey()).build(); |
|||
SendResult sendResult; |
|||
try { |
|||
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); |
|||
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|||
} catch (Exception e) { |
|||
editMessageRecordStatus(payMqMessageRecord.getId(), payMqMessageRecord.getStatus(),e.getMessage()); |
|||
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), e.getMessage()); |
|||
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|||
// throw new RuntimeException(e);
|
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|||
* (适合对响应时间敏感的业务场景) |
|||
*/ |
|||
@Transactional |
|||
@Override |
|||
public <T> void sendAsyncMsg(String topic, String tag, T message) { |
|||
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); |
|||
//保存消息记录
|
|||
log.info("消息发送中,开始入库本地消息记录表"); |
|||
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); |
|||
//发消息
|
|||
PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); |
|||
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); |
|||
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { |
|||
@Override |
|||
public void onSuccess(SendResult sendResult) { |
|||
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); |
|||
} |
|||
@Override |
|||
public void onException(Throwable throwable) { |
|||
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); |
|||
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|||
payMqMessageRecord.setErrMsg(throwable.getMessage()); |
|||
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),payMqMessageRecord.getErrMsg()); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
/** |
|||
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|||
* (适合对响应时间敏感的业务场景) |
|||
*/ |
|||
@Transactional |
|||
@Override |
|||
public <T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|||
RocketMQTemplate rocketMQTemplate = rocketMQEnhanceTemplate.getTemplate(); |
|||
//保存消息记录
|
|||
log.info("消息发送中,开始入库本地消息记录表"); |
|||
PayMqMessageRecord payMqMessageRecord = saveMessageRecord(topic,tag, message); |
|||
//发消息
|
|||
PayMqLocalRecordMessage mqLocalMessage = EntityConvertUtil.copy(payMqMessageRecord, PayMqLocalRecordMessage.class); |
|||
mqLocalMessage.setSource(ServiceNameConstants.ORDER_SERVICE); |
|||
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqLocalMessage).build(), new SendCallback() { |
|||
@Override |
|||
public void onSuccess(SendResult sendResult) { |
|||
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); |
|||
} |
|||
@Override |
|||
public void onException(Throwable throwable) { |
|||
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); |
|||
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|||
editMessageRecordStatus(payMqMessageRecord.getId(),payMqMessageRecord.getStatus(),throwable.getMessage()); |
|||
} |
|||
}, RocketMqConstant.TIME_OUT,delayLevel); |
|||
} |
|||
|
|||
/** |
|||
* 添加消息记录 |
|||
* @param message |
|||
* @param <T> |
|||
*/ |
|||
@Transactional |
|||
public <T> PayMqMessageRecord saveMessageRecord(String topic, String tag, T message){ |
|||
topic = rocketMQEnhanceTemplate.reBuildTopic(topic); |
|||
PayMqMessageRecord payMqMessageRecord = new PayMqMessageRecord(); |
|||
payMqMessageRecord.setStatus(EnumMessageStatus.PROCESS); |
|||
payMqMessageRecord.setConsumerGroupName(topic); |
|||
payMqMessageRecord.setTopic(topic); |
|||
payMqMessageRecord.setTag(tag); |
|||
payMqMessageRecord.setCreateTime(new Date()); |
|||
payMqMessageRecord.setMessageKey(IdUtils.randomUUID()); |
|||
payMqMessageRecord.setContent(JSON.toJSONString(message)); |
|||
payMqMessageRecordMapper.insert(payMqMessageRecord); |
|||
return payMqMessageRecord; |
|||
} |
|||
|
|||
/** |
|||
* 修改消息记录状态 |
|||
* @param id |
|||
* @param status |
|||
*/ |
|||
@Transactional |
|||
public void editMessageRecordStatus(Long id,EnumMessageStatus status,String errMsg){ |
|||
payMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
<?xml version="1.0" encoding="UTF-8"?> |
|||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
|||
<mapper namespace="com.bnyer.pay.mapper.PayMqMessageRecordMapper"> |
|||
<resultMap id="BaseResultMap" type="com.bnyer.common.core.domain.PayMqMessageRecord"> |
|||
<!--order_mq_message--> |
|||
<id column="id" jdbcType="BIGINT" property="id" /> |
|||
<result column="message_key" jdbcType="VARCHAR" property="messageKey" /> |
|||
<result column="topic" jdbcType="VARCHAR" property="topic" /> |
|||
<result column="tag" jdbcType="VARCHAR" property="tag" /> |
|||
<result column="consumer_group_name" jdbcType="VARCHAR" property="consumerGroupName" /> |
|||
<result column="status" jdbcType="VARCHAR" property="status" /> |
|||
<result column="err_msg" jdbcType="VARCHAR" property="errMsg" /> |
|||
<result column="content" jdbcType="VARCHAR" property="content" /> |
|||
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" /> |
|||
</resultMap> |
|||
|
|||
<sql id="Base_Column_List"> |
|||
t.id, |
|||
t.message_key, |
|||
t.topic, |
|||
t.tag, |
|||
t.consumer_group_name, |
|||
t.status, |
|||
t.err_msg, |
|||
t.content, |
|||
t.create_time, |
|||
</sql> |
|||
<update id="updateStatusByStatus"> |
|||
update pay_mq_message_record |
|||
set status = #{status} |
|||
<if test="errMsg != null and errMsg != ''"> |
|||
,err_msg = #{errMsg} |
|||
</if> |
|||
where id = #{id} |
|||
</update> |
|||
|
|||
</mapper> |
|||
Loading…
Reference in new issue