|
|
@ -5,11 +5,14 @@ import com.alibaba.fastjson.JSON; |
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; |
|
|
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; |
|
|
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
|
|
import com.bnyer.common.rocketmq.constant.RocketMqConstant; |
|
|
import com.bnyer.common.rocketmq.domain.BaseMessage; |
|
|
import com.bnyer.common.rocketmq.domain.MqRecordMessage; |
|
|
import lombok.RequiredArgsConstructor; |
|
|
import lombok.RequiredArgsConstructor; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
import org.apache.rocketmq.client.producer.*; |
|
|
import org.apache.rocketmq.client.producer.SendCallback; |
|
|
|
|
|
import org.apache.rocketmq.client.producer.SendResult; |
|
|
|
|
|
import org.apache.rocketmq.client.producer.SendStatus; |
|
|
|
|
|
import org.apache.rocketmq.client.producer.TransactionSendResult; |
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|
|
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
|
|
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
@ -78,11 +81,11 @@ public class RocketMQEnhanceTemplate { |
|
|
* @return |
|
|
* @return |
|
|
* @param <T> |
|
|
* @param <T> |
|
|
*/ |
|
|
*/ |
|
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { |
|
|
public <T> SendResult send(String topic, String tag, T message) { |
|
|
// 设置业务键,此处根据公共的参数进行处理
|
|
|
// 设置业务键,此处根据公共的参数进行处理
|
|
|
// 更多的其它基础业务处理...
|
|
|
// 更多的其它基础业务处理...
|
|
|
buildBaseMessage(topic,tag,message); |
|
|
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); |
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); |
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); |
|
|
SendResult sendResult; |
|
|
SendResult sendResult; |
|
|
try { |
|
|
try { |
|
|
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); |
|
|
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage); |
|
|
@ -90,7 +93,7 @@ public class RocketMQEnhanceTemplate { |
|
|
throw new RuntimeException(e); |
|
|
throw new RuntimeException(e); |
|
|
} |
|
|
} |
|
|
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
|
|
|
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
|
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|
|
log.info("topic:[{}]tag:[{}]同步消息[{}]发送结果[{}]", topic,tag, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); |
|
|
return sendResult; |
|
|
return sendResult; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -103,16 +106,16 @@ public class RocketMQEnhanceTemplate { |
|
|
* @return |
|
|
* @return |
|
|
* @param <T> |
|
|
* @param <T> |
|
|
*/ |
|
|
*/ |
|
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { |
|
|
public <T> SendResult send(String topic, String tag, T message, int delayLevel) { |
|
|
buildBaseMessage(topic,tag,message); |
|
|
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); |
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); |
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, mqRecordMessage.getMessageKey()).build(); |
|
|
SendResult sendResult; |
|
|
SendResult sendResult; |
|
|
try { |
|
|
try { |
|
|
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel); |
|
|
sendResult = template.syncSend(buildDestination(topic,tag), sendMessage, RocketMqConstant.TIME_OUT, delayLevel); |
|
|
} catch (Exception e) { |
|
|
} catch (Exception e) { |
|
|
throw new RuntimeException(e); |
|
|
throw new RuntimeException(e); |
|
|
} |
|
|
} |
|
|
log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); |
|
|
log.info("topic:[{}]tag:[{}]延迟等级[{}]消息[{}]发送结果[{}]", topic,tag, delayLevel, JSONObject.toJSON(mqRecordMessage), JSONObject.toJSON(sendResult)); |
|
|
return sendResult; |
|
|
return sendResult; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -120,18 +123,18 @@ public class RocketMQEnhanceTemplate { |
|
|
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
* (适合对响应时间敏感的业务场景) |
|
|
* (适合对响应时间敏感的业务场景) |
|
|
*/ |
|
|
*/ |
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message) { |
|
|
public <T> void sendAsyncMsg(String topic, String tag, T message) { |
|
|
buildBaseMessage(topic,tag,message); |
|
|
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); |
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
@Override |
|
|
@Override |
|
|
public void onSuccess(SendResult sendResult) { |
|
|
public void onSuccess(SendResult sendResult) { |
|
|
// 处理消息发送成功逻辑
|
|
|
// 处理消息发送成功逻辑
|
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message), JSON.toJSONString(sendResult)); |
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage), JSON.toJSONString(sendResult)); |
|
|
} |
|
|
} |
|
|
@Override |
|
|
@Override |
|
|
public void onException(Throwable throwable) { |
|
|
public void onException(Throwable throwable) { |
|
|
// 处理消息发送失败逻辑
|
|
|
// 处理消息发送失败逻辑
|
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); |
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); |
|
|
} |
|
|
} |
|
|
}); |
|
|
}); |
|
|
} |
|
|
} |
|
|
@ -140,17 +143,17 @@ public class RocketMQEnhanceTemplate { |
|
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
* 发送异步延时消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑) |
|
|
* (适合对响应时间敏感的业务场景) |
|
|
* (适合对响应时间敏感的业务场景) |
|
|
*/ |
|
|
*/ |
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|
|
public <T> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|
|
buildBaseMessage(topic,tag,message); |
|
|
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); |
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
@Override |
|
|
@Override |
|
|
public void onSuccess(SendResult sendResult) { |
|
|
public void onSuccess(SendResult sendResult) { |
|
|
// 处理消息发送成功逻辑
|
|
|
// 处理消息发送成功逻辑
|
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(message),JSON.toJSONString(sendResult)); |
|
|
log.info("消息发送成功,topic:{},tag:{},message:{},result:{}",topic,tag,JSON.toJSONString(mqRecordMessage),JSON.toJSONString(sendResult)); |
|
|
} |
|
|
} |
|
|
@Override |
|
|
@Override |
|
|
public void onException(Throwable throwable) { |
|
|
public void onException(Throwable throwable) { |
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(message),throwable.getMessage()); |
|
|
log.error("消息发送失败,topic:{},tag:{},message:{},error:{}",topic,tag,JSON.toJSONString(mqRecordMessage),throwable.getMessage()); |
|
|
} |
|
|
} |
|
|
}, RocketMqConstant.TIME_OUT,delayLevel); |
|
|
}, RocketMqConstant.TIME_OUT,delayLevel); |
|
|
} |
|
|
} |
|
|
@ -163,10 +166,10 @@ public class RocketMQEnhanceTemplate { |
|
|
* @return 发送结果 |
|
|
* @return 发送结果 |
|
|
* @param <T> |
|
|
* @param <T> |
|
|
*/ |
|
|
*/ |
|
|
public <T extends BaseMessage> boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { |
|
|
public <T> boolean sendTransactionalMsg(String topic, String tag,String arg, T message) { |
|
|
buildBaseMessage(topic,tag,message); |
|
|
MqRecordMessage mqRecordMessage = buildBaseMessage(topic, tag, message); |
|
|
String destination = buildDestination(topic, tag); |
|
|
String destination = buildDestination(topic, tag); |
|
|
TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(message).build(), arg); |
|
|
TransactionSendResult sendResult = template.sendMessageInTransaction(destination, MessageBuilder.withPayload(mqRecordMessage).build(), arg); |
|
|
log.info("Send transaction msg result: " + sendResult); |
|
|
log.info("Send transaction msg result: " + sendResult); |
|
|
return sendResult.getSendStatus() == SendStatus.SEND_OK; |
|
|
return sendResult.getSendStatus() == SendStatus.SEND_OK; |
|
|
} |
|
|
} |
|
|
@ -179,13 +182,16 @@ public class RocketMQEnhanceTemplate { |
|
|
* @param message |
|
|
* @param message |
|
|
* @param <T> |
|
|
* @param <T> |
|
|
*/ |
|
|
*/ |
|
|
private <T extends BaseMessage> void buildBaseMessage(String topic,String tag,T message){ |
|
|
private <T> MqRecordMessage buildBaseMessage(String topic,String tag,T message){ |
|
|
|
|
|
MqRecordMessage mqRecordMessage = new MqRecordMessage(); |
|
|
String buildTopic = reBuildTopic(topic); |
|
|
String buildTopic = reBuildTopic(topic); |
|
|
message.setSource(applicationName); |
|
|
mqRecordMessage.setSource(applicationName); |
|
|
message.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); |
|
|
mqRecordMessage.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); |
|
|
message.setTopic(buildTopic); |
|
|
mqRecordMessage.setTopic(buildTopic); |
|
|
message.setTag(tag); |
|
|
mqRecordMessage.setTag(tag); |
|
|
message.setConsumerGroupName(buildTopic); |
|
|
mqRecordMessage.setConsumerGroupName(buildTopic); |
|
|
|
|
|
mqRecordMessage.setContent(JSON.toJSONString(message)); |
|
|
|
|
|
return mqRecordMessage; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
|