Browse Source

mq统一改为异步发送

feature-1.1
wuxicheng 3 years ago
parent
commit
f2da6fccbe
  1. 38
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java
  2. 26
      bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java
  3. 51
      bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java

38
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/listener/vip/VipOrderPayNotifyConsumer.java

@ -4,18 +4,15 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.bnyer.common.core.domain.VipOrder;
import com.bnyer.common.core.enums.ResponseEnum;
import com.bnyer.common.core.exception.ServiceException;
import com.bnyer.common.core.utils.SpringUtils;
import com.bnyer.common.rocketmq.config.RocketMqConstant;
import com.bnyer.order.mapper.VipOrderMapper;
import com.bnyer.order.service.VipOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -55,11 +52,32 @@ public class VipOrderPayNotifyConsumer implements RocketMQListener<String> {
vipOrderService.updateByToPaySuccess(vipOrder);
//发消息,添加用户会员记录
String msg = buildVipRecordMsg(vipOrder);
SendStatus sendStatus = vipRecordMqTemplate.syncSend(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, new GenericMessage<>(msg)).getSendStatus();
if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) {
// 消息发不出去就抛异常
throw new ServiceException(ResponseEnum.SERVER_ERROR);
}
sendMsg(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, msg);
// SendStatus sendStatus = vipRecordMqTemplate.syncSend(RocketMqConstant.VIP_RECORD_CREATE_TOPIC, new GenericMessage<>(msg)).getSendStatus();
// if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) {
// // 消息发不出去就抛异常
// throw new ServiceException(ResponseEnum.SERVER_ERROR);
// }
// TODO: 2023/05/17 发消息添加其他奖励
}
/**
* 发送消息
* @param topic
* @param msg
* @param <T>
*/
public <T> void sendMsg(String topic, T msg) {
vipRecordMqTemplate.asyncSend(topic, msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息发送成功,result:{}",topic,JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.info("topic:{}消息发送失败,error:{}",topic,e.getMessage());
}
});
}
/**

26
bnyer-services/bnyer-order/src/main/java/com/bnyer/order/service/impl/VipOrderServiceImpl.java

@ -27,6 +27,8 @@ import com.bnyer.common.core.enums.EnumVipOrderStatus;
import com.bnyer.order.mapper.VipOrderMapper;
import com.bnyer.order.service.VipOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@ -105,13 +107,23 @@ public class VipOrderServiceImpl extends ServiceImpl<VipOrderMapper, VipOrder> i
vipOrderMapper.insert(vipOrder);
String orderNo = vipOrder.getOrderNo();
//发送消息,如果三十分钟后没有支付,则取消订单
SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus();
if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) {
// 消息发不出去就抛异常,发的出去无所谓
throw new ServiceException(ResponseEnum.SERVER_ERROR);
}else {
log.info("消息发送成功,topic:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC);
}
orderCancelMqTemplate.asyncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, orderNo,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息发送成功,result:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.info("topic:{}消息发送失败,error:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC,e.getMessage());
}
},RocketMqConstant.TIMEOUT);
// SendStatus sendStatus = orderCancelMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_CANCEL_TOPIC, new GenericMessage<>(orderNo), RocketMqConstant.TIMEOUT, RocketMqConstant.CANCEL_ORDER_DELAY_LEVEL).getSendStatus();
// if (!Objects.equals(sendStatus,SendStatus.SEND_OK)) {
// // 消息发不出去就抛异常,发的出去无所谓
// throw new ServiceException(ResponseEnum.SERVER_ERROR);
// }else {
// log.info("消息发送成功,topic:{}",RocketMqConstant.VIP_ORDER_CANCEL_TOPIC);
// }
return orderNo;
}

51
bnyer-services/bnyer-pay/src/main/java/com/bnyer/pay/service/impl/PayInfoServiceImpl.java

@ -5,7 +5,6 @@ import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.bnyer.common.core.domain.PayInfo;
import com.bnyer.common.core.enums.EnumPayStatus;
import com.bnyer.common.core.enums.EnumSceneCode;
import com.bnyer.common.core.enums.ResponseEnum;
import com.bnyer.common.core.exception.ServiceException;
@ -14,14 +13,14 @@ import com.bnyer.common.rocketmq.config.RocketMqConstant;
import com.bnyer.pay.bean.dto.AddPayInfoDto;
import com.bnyer.pay.bean.dto.EditPayInfoNotifyDto;
import com.bnyer.pay.bean.dto.EditPayInfoSingleDto;
import com.bnyer.pay.bean.vo.PayInfoDetailsVo;
import com.bnyer.pay.mapper.PayInfoMapper;
import com.bnyer.pay.service.PayInfoService;
import com.bnyer.pay.bean.vo.PayInfoDetailsVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -86,26 +85,36 @@ public class PayInfoServiceImpl extends ServiceImpl<PayInfoMapper, PayInfo> impl
vipRechargeMsgObj.put("orderNo",orderNo);
String vipRechargeMsgStr = JSON.toJSONString(vipRechargeMsgObj);
// TODO: 2023/04/23可优化为:添加一张消息日志表,字段:topicName,消息内容,消息状态(发送中、成功、失败),错误信息
// vipOrderPayNotifyMqTemplate.asyncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,vipRechargeMsgStr,new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
// log.info("topic:{}消息发送成功",RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC);
// }
// @Override
// public void onException(Throwable e) {
// log.info("topic:{}消息发送失败,error:{}",RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,e.getMessage());
// }
// });
SendStatus sendStatus = vipOrderPayNotifyMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,
new GenericMessage<>(vipRechargeMsgStr)).getSendStatus();
if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) {
// 消息发不出去就抛异常,因为订单回调会有多次,几乎不可能每次都无法发送出去,发的出去无所谓
throw new ServiceException(ResponseEnum.SERVER_ERROR);
}
break;
sendMsg(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,vipRechargeMsgStr);
// SendStatus sendStatus = vipOrderPayNotifyMqTemplate.syncSend(RocketMqConstant.VIP_ORDER_PAY_NOTIFY_TOPIC,
// new GenericMessage<>(vipRechargeMsgStr)).getSendStatus();
// if (!Objects.equals(sendStatus, SendStatus.SEND_OK)) {
// // 消息发不出去就抛异常,因为订单回调会有多次,几乎不可能每次都无法发送出去,发的出去无所谓
// throw new ServiceException(ResponseEnum.SERVER_ERROR);
// }
// break;
}
}
/**
* 发送消息
* @param topic
* @param msg
* @param <T>
*/
public <T> void sendMsg(String topic, T msg) {
vipOrderPayNotifyMqTemplate.asyncSend(topic, msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("topic:{}消息发送成功,result:{}",topic,JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.info("topic:{}消息发送失败,error:{}",topic,e.getMessage());
}
});
}
/**
* 构建修改支付单信息实体
* @param editPayInfoNotifyDto

Loading…
Cancel
Save