|
|
|
@ -101,9 +101,9 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ |
|
|
|
SendResult sendResult; |
|
|
|
try { |
|
|
|
sendResult = rocketMQTemplate.syncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), sendMessage); |
|
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); |
|
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); |
|
|
|
} catch (Exception e) { |
|
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),tag, JSONObject.toJSON(mqRecordMessage), e.getMessage()); |
|
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", mqRecordMessage.getTopic(),mqRecordMessage.getTag(), JSONObject.toJSON(mqRecordMessage), e.getMessage()); |
|
|
|
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|
|
|
editMessageRecordStatus(orderMqMessageRecord.getId(), orderMqMessageRecord.getStatus(),e.getMessage()); |
|
|
|
// throw new RuntimeException(e);
|
|
|
|
@ -127,11 +127,11 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ |
|
|
|
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(SendResult sendResult) { |
|
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); |
|
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); |
|
|
|
} |
|
|
|
@Override |
|
|
|
public void onException(Throwable throwable) { |
|
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); |
|
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); |
|
|
|
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|
|
|
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); |
|
|
|
} |
|
|
|
@ -155,11 +155,11 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ |
|
|
|
rocketMQTemplate.asyncSend(rocketMQEnhanceTemplate.buildDestination(topic,tag), MessageBuilder.withPayload(mqRecordMessage).build(), new SendCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(SendResult sendResult) { |
|
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); |
|
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); |
|
|
|
} |
|
|
|
@Override |
|
|
|
public void onException(Throwable throwable) { |
|
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); |
|
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",mqRecordMessage.getTopic(),mqRecordMessage.getTag(),JSON.toJSONString(mqRecordMessage),throwable.getMessage()); |
|
|
|
log.info("消息发送失败,更新消息记录,等待定时任务进行消息补偿"); |
|
|
|
editMessageRecordStatus(orderMqMessageRecord.getId(),orderMqMessageRecord.getStatus(),throwable.getMessage()); |
|
|
|
} |
|
|
|
@ -218,7 +218,7 @@ public class OrderMqMessageRecordServiceImpl implements OrderMqMessageRecordServ |
|
|
|
message.setReturnTopic(orderMqMessageRecord.getReturnTopic()); |
|
|
|
message.setContent(msg); |
|
|
|
//少于30分钟,发送消息
|
|
|
|
rocketMQEnhanceTemplate.sendAsyncMsg(orderMqMessageRecord.getTopic(),null,message); |
|
|
|
rocketMQEnhanceTemplate.sendAsyncMsg(orderMqMessageRecord.getTopic(),orderMqMessageRecord.getTag(),message); |
|
|
|
}else{ |
|
|
|
//超过30分钟,修改状态为废弃
|
|
|
|
orderMqMessageRecord.setStatus(EnumMessageStatus.INVALID); |
|
|
|
|