|
|
|
@ -1,10 +1,14 @@ |
|
|
|
package com.bnyer.order.service.impl; |
|
|
|
|
|
|
|
import cn.hutool.core.collection.CollectionUtil; |
|
|
|
import cn.hutool.core.util.IdUtil; |
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
|
|
|
import com.bnyer.common.core.domain.ImgMqMessageRecord; |
|
|
|
import com.bnyer.common.core.domain.OrderMqMessageRecord; |
|
|
|
import com.bnyer.common.core.enums.EnumMessageStatus; |
|
|
|
import com.bnyer.common.core.utils.DateUtils; |
|
|
|
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
|
|
|
import com.bnyer.common.rocketmq.constant.RocketMqTopic; |
|
|
|
import com.bnyer.common.rocketmq.domain.MqRecordMessage; |
|
|
|
@ -26,6 +30,7 @@ import org.springframework.transaction.annotation.Transactional; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.Resource; |
|
|
|
import java.util.Date; |
|
|
|
import java.util.List; |
|
|
|
|
|
|
|
/** |
|
|
|
* @author :WXC |
|
|
|
@ -41,12 +46,12 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ |
|
|
|
|
|
|
|
@Resource |
|
|
|
private OrderMqMessageRecordMapper orderMqMessageRecordMapper; |
|
|
|
|
|
|
|
|
|
|
|
@Resource |
|
|
|
private Environment env; |
|
|
|
|
|
|
|
|
|
|
|
private String applicationName; |
|
|
|
|
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init(){ |
|
|
|
this.applicationName = env.getProperty("spring.application.name"); |
|
|
|
@ -191,4 +196,33 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ |
|
|
|
orderMqMessageRecordMapper.updateStatusByStatus(id,status,errMsg); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void orderMessageCompensation() { |
|
|
|
log.info("==============order服务消费补偿任务开始!==============="); |
|
|
|
long startTime = System.currentTimeMillis(); |
|
|
|
//获取全表状态为process处理中的数据
|
|
|
|
LambdaQueryWrapper<OrderMqMessageRecord> wrapper = new LambdaQueryWrapper<>(); |
|
|
|
wrapper.eq(OrderMqMessageRecord::getStatus, EnumMessageStatus.PROCESS); |
|
|
|
List<OrderMqMessageRecord> orderMqMessageRecords = orderMqMessageRecordMapper.selectList(wrapper); |
|
|
|
Date now = new Date(); |
|
|
|
System.out.println(now); |
|
|
|
if(CollectionUtil.isNotEmpty(orderMqMessageRecords)){ |
|
|
|
for (OrderMqMessageRecord orderMqMessageRecord : orderMqMessageRecords) { |
|
|
|
//判断消息是否超过30分钟,超过则改状态为INVALID废弃,否则调用发送消息方法
|
|
|
|
if(DateUtils.isLessThan30Min(orderMqMessageRecord.getCreateTime())){ |
|
|
|
String content = orderMqMessageRecord.getContent(); |
|
|
|
JSONObject jsonObject = JSON.parseObject(content); |
|
|
|
String msg = jsonObject.getString("content"); |
|
|
|
//少于30分钟,发送消息
|
|
|
|
sendAsyncMsg(orderMqMessageRecord.getTopic(),null,msg); |
|
|
|
}else{ |
|
|
|
//超过30分钟,修改状态为废弃
|
|
|
|
orderMqMessageRecord.setStatus(EnumMessageStatus.INVALID); |
|
|
|
orderMqMessageRecordMapper.updateById(orderMqMessageRecord); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
log.info("==============order服务消费补偿任务完成,耗时【{}】毫秒!===============",System.currentTimeMillis() - startTime); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|