|
|
|
@ -1,5 +1,6 @@ |
|
|
|
package com.bnyer.common.rocketmq.template; |
|
|
|
|
|
|
|
import cn.hutool.core.util.IdUtil; |
|
|
|
import com.alibaba.fastjson.JSON; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.bnyer.common.rocketmq.config.RocketEnhanceProperties; |
|
|
|
@ -11,9 +12,11 @@ import org.apache.rocketmq.client.producer.*; |
|
|
|
import org.apache.rocketmq.spring.core.RocketMQTemplate; |
|
|
|
import org.apache.rocketmq.spring.support.RocketMQHeaders; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.core.env.Environment; |
|
|
|
import org.springframework.messaging.Message; |
|
|
|
import org.springframework.messaging.support.MessageBuilder; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.Resource; |
|
|
|
|
|
|
|
/** |
|
|
|
@ -30,6 +33,16 @@ public class RocketMQEnhanceTemplate { |
|
|
|
@Resource |
|
|
|
private RocketEnhanceProperties rocketEnhanceProperties; |
|
|
|
|
|
|
|
@Resource |
|
|
|
private Environment env; |
|
|
|
|
|
|
|
private String applicationName; |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init(){ |
|
|
|
this.applicationName = env.getProperty("spring.application.name"); |
|
|
|
} |
|
|
|
|
|
|
|
public RocketMQTemplate getTemplate() { |
|
|
|
return template; |
|
|
|
} |
|
|
|
@ -67,6 +80,7 @@ public class RocketMQEnhanceTemplate { |
|
|
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { |
|
|
|
// 设置业务键,此处根据公共的参数进行处理
|
|
|
|
// 更多的其它基础业务处理...
|
|
|
|
buildBaseMessage(topic,tag,message); |
|
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); |
|
|
|
SendResult sendResult; |
|
|
|
try { |
|
|
|
@ -89,6 +103,7 @@ public class RocketMQEnhanceTemplate { |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { |
|
|
|
buildBaseMessage(topic,tag,message); |
|
|
|
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getMessageKey()).build(); |
|
|
|
SendResult sendResult; |
|
|
|
try { |
|
|
|
@ -105,6 +120,7 @@ public class RocketMQEnhanceTemplate { |
|
|
|
* (适合对响应时间敏感的业务场景) |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message) { |
|
|
|
buildBaseMessage(topic,tag,message); |
|
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(SendResult sendResult) { |
|
|
|
@ -124,6 +140,7 @@ public class RocketMQEnhanceTemplate { |
|
|
|
* (适合对响应时间敏感的业务场景) |
|
|
|
*/ |
|
|
|
public <T extends BaseMessage> void sendAsyncMsg(String topic, String tag, T message, int delayLevel) { |
|
|
|
buildBaseMessage(topic,tag,message); |
|
|
|
template.asyncSend(buildDestination(topic,tag), MessageBuilder.withPayload(message).build(), new SendCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(SendResult sendResult) { |
|
|
|
@ -153,5 +170,20 @@ public class RocketMQEnhanceTemplate { |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 构建消息体基本信息 |
|
|
|
* @param topic |
|
|
|
* @param tag |
|
|
|
* @param message |
|
|
|
* @param <T> |
|
|
|
*/ |
|
|
|
private<T extends BaseMessage> void buildBaseMessage(String topic,String tag,T message){ |
|
|
|
String buildTopic = reBuildTopic(topic); |
|
|
|
message.setSource(applicationName); |
|
|
|
message.setMessageKey(applicationName + "-" + IdUtil.getSnowflakeNextIdStr()); |
|
|
|
message.setTopic(buildTopic); |
|
|
|
message.setTag(tag); |
|
|
|
message.setConsumerGroupName(buildTopic); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|