Browse Source

bug修复

feature-1.1
wuxicheng 3 years ago
parent
commit
0dc846ed95
  1. 6
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java
  2. 17
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  3. 7
      bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java
  4. 3
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java
  5. 9
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/mapper/OrderMqMessageRecordMapper.java
  6. 9
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java
  7. 14
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java
  8. 8
      bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml

6
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/domain/order/OrderMqLocalRecordMessage.java

@ -30,10 +30,4 @@ public class OrderMqLocalRecordMessage extends BaseMessage {
*/ */
private String content; private String content;
/**
* 创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
} }

17
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -1,10 +1,14 @@
package com.bnyer.common.rocketmq.handle; package com.bnyer.common.rocketmq.handle;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.rocketmq.constant.RocketMqConstant; import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement; import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
import com.bnyer.common.rocketmq.persist.IPersist; import com.bnyer.common.rocketmq.persist.IPersist;
import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy; import com.bnyer.common.rocketmq.strategy.RepeatConsumerStrategy;
import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy; import com.bnyer.common.rocketmq.strategy.NormalRepeatStrategy;
@ -208,4 +212,17 @@ public abstract class EnhanceMessageHandler<T extends BaseMessage> {
return message.getMessageKey(); return message.getMessageKey();
} }
/**
* 构建返回队列通知
* @param success
* @param message
*/
protected void buildRerunMessage(EnumMessageStatus success, OrderMqLocalRecordMessage message) {
OrderMqLocalRecordMessage orderMqLocalRecordMessage = new OrderMqLocalRecordMessage();
orderMqLocalRecordMessage.setStatus(success);
orderMqLocalRecordMessage.setId(message.getId());
orderMqLocalRecordMessage.setSource(ServiceNameConstants.IMG_SERVICE);
orderMqLocalRecordMessage.setMessageKey(IdUtil.randomUUID());
}
} }

7
bnyer-services/bnyer-img/src/main/java/com/bnyer/img/listener/VipRecordCreateConsumer.java

@ -1,9 +1,12 @@
package com.bnyer.img.listener; package com.bnyer.img.listener;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.bnyer.common.core.constant.ServiceNameConstants;
import com.bnyer.common.core.dto.AddUserVipRecordDto; import com.bnyer.common.core.dto.AddUserVipRecordDto;
import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.bean.EntityConvertUtil; import com.bnyer.common.core.utils.bean.EntityConvertUtil;
import com.bnyer.common.core.utils.uuid.IdUtils;
import com.bnyer.common.rocketmq.constant.RocketMqTopic; import com.bnyer.common.rocketmq.constant.RocketMqTopic;
import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage; import com.bnyer.common.rocketmq.domain.img.AddUserVipRecordMessage;
import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage; import com.bnyer.common.rocketmq.domain.order.OrderMqLocalRecordMessage;
@ -49,7 +52,7 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalR
//发送返回队列,告知已经处理成功,完成最终一致性 //发送返回队列,告知已经处理成功,完成最终一致性
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
message.setStatus(EnumMessageStatus.SUCCESS); super.buildRerunMessage(EnumMessageStatus.SUCCESS, message);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message); rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message);
} }
@ -58,7 +61,7 @@ public class VipRecordCreateConsumer extends EnhanceMessageHandler<OrderMqLocalR
//发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录 //发送返回队列,告知已经超过最大重试次数,消费失败,需人工处理该记录
//TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免 //TODO: 2023/05/20 为避免这里消息通知失败,定时补偿的时候需要根据创建时间,判断如果超过固定时间比如30分钟,就直接设置成作废,避免
// 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的 // 定时任务一直补偿,因为这里不做补偿机制,是不会重发消息的
message.setStatus(EnumMessageStatus.FAILS); super.buildRerunMessage(EnumMessageStatus.FAILS, message);
rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message); rocketMQEnhanceTemplate.sendAsyncMsg(RocketMqTopic.ORDER_RETURN_MSG_TOPIC,null,message);
} }

3
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/OrderReturnMessageConsumer.java

@ -29,8 +29,7 @@ public class OrderReturnMessageConsumer extends EnhanceMessageHandler<OrderMqLoc
@Override @Override
public void onMessage(OrderMqLocalRecordMessage message) { public void onMessage(OrderMqLocalRecordMessage message) {
super.dispatchMessage(message); super.dispatchMessage(message);
OrderMqMessageRecord orderMqMessageRecord = EntityConvertUtil.copy(message, OrderMqMessageRecord.class); orderMqMessageRecordService.editMessageRecordStatus(message.getId(),message.getStatus(),null);
orderMqMessageRecordService.editMessageRecord(orderMqMessageRecord);
} }
@Override @Override

9
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/mapper/OrderMqMessageRecordMapper.java

@ -2,7 +2,9 @@ package com.bnyer.order.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/** /**
* @author :WXC * @author :WXC
@ -10,5 +12,10 @@ import org.apache.ibatis.annotations.Mapper;
*/ */
@Mapper @Mapper
public interface OrderMqMessageRecordMapper extends BaseMapper<OrderMqMessageRecord> { public interface OrderMqMessageRecordMapper extends BaseMapper<OrderMqMessageRecord> {
/**
* 更新状态
* @param status
* @return
*/
Integer updateStatusByStatus(@Param("id") Long id, @Param("status") EnumMessageStatus status, @Param("errMsg") String errMsg);
} }

9
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/OrderMqMessageRecordService.java

@ -1,6 +1,7 @@
package com.bnyer.order.service; package com.bnyer.order.service;
import com.bnyer.common.core.domain.OrderMqMessageRecord; import com.bnyer.common.core.domain.OrderMqMessageRecord;
import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.BaseMessage;
/** /**
@ -40,9 +41,11 @@ public interface OrderMqMessageRecordService {
<T> OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message); <T> OrderMqMessageRecord saveMessageRecord(String topic, String tag, T message);
/** /**
* 修改消息记录 * 修改消息记录状态
* @param orderMqMessageRecord * @param id
* @param status
* @param errMsg
*/ */
void editMessageRecord(OrderMqMessageRecord orderMqMessageRecord); void editMessageRecordStatus(Long id, EnumMessageStatus status,String errMsg);
} }

14
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/OrderMqMessageRecordServiceImpl.java

@ -65,7 +65,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
orderMqMessageRecord.setErrMsg(throwable.getMessage()); orderMqMessageRecord.setErrMsg(throwable.getMessage());
editMessageRecord(orderMqMessageRecord); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),orderMqMessageRecord.getErrMsg());
} }
}); });
} }
@ -93,8 +93,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
public void onException(Throwable throwable) { public void onException(Throwable throwable) {
log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage()); log.error("消息发送失败,topic:{},tag:{},error:{}",topic,tag,throwable.getMessage());
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿");
orderMqMessageRecord.setErrMsg(throwable.getMessage()); editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage());
editMessageRecord(orderMqMessageRecord);
} }
},TIME_OUT,delayLevel); },TIME_OUT,delayLevel);
} }
@ -120,12 +119,13 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ
} }
/** /**
* 修改消息记录 * 修改消息记录状态
* @param orderMqMessageRecord * @param id
* @param status
*/ */
@Transactional @Transactional
public void editMessageRecord(OrderMqMessageRecord orderMqMessageRecord){ public void editMessageRecordStatus(Long id,EnumMessageStatus status,String errMsg){
orderMqMessageRecordMapper.updateById(orderMqMessageRecord); orderMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg);
} }
} }

8
bnyer-services/bnyer-order/src/main/resources/com/bnyer/order/mapper/OrderMqMessageRecordMapper.xml

@ -25,5 +25,13 @@
t.content, t.content,
t.create_time, t.create_time,
</sql> </sql>
<update id="updateStatusByStatus">
update order_mq_message_record
set status = #{status}
<if test="errMsg != null and errMsg != ''">
,err_msg = #{errMsg}
</if>
where id = #{id}
</update>
</mapper> </mapper>

Loading…
Cancel
Save