Browse Source

优化mq重复消费处理

feature-1.1
wuxicheng 3 years ago
parent
commit
86087442e6
  1. 25
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java
  2. 3
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java
  3. 33
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java
  4. 4
      bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java

25
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java

@ -5,7 +5,6 @@ import com.bnyer.common.core.enums.EnumMessageStatus;
import com.bnyer.common.core.utils.StringUtils;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.rocketmq.constant.RocketMqConstant;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.BnyerMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.persist.IPersist;
@ -22,7 +21,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.function.Function;
import java.util.Objects;
/**
* @author :WXC
@ -48,6 +47,10 @@ public abstract class EnhanceMessageHandler {
private RepeatConsumerConfig repeatConsumerConfig;
private IPersist persist = null;
private RepeatElement repeatElement = null;
@Resource
private RocketMQEnhanceTemplate rocketMQEnhanceTemplate;
@ -58,6 +61,10 @@ public abstract class EnhanceMessageHandler {
public void init(){
this.applicationName = env.getProperty("spring.application.name");
this.repeatConsumerConfig = RepeatConsumerConfig.enableRedisConfig(applicationName,stringRedisTemplate);
this.persist = this.repeatConsumerConfig.getPersist();
if (Objects.isNull(persist)){
throw new RuntimeException("消息去重持久层配置初始化失败...");
}
}
/**
@ -138,10 +145,6 @@ public abstract class EnhanceMessageHandler {
sendReturnFailsMessage(message);
return;
}
IPersist persist = repeatConsumerConfig.getPersist();
RepeatElement repeatElement = new RepeatElement(repeatConsumerConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag()
, repeatMessageKey(message));
//消费消息,末尾消费失败会删除消费记录,消费成功则更新消费状态
try {
long now = System.currentTimeMillis();
@ -208,13 +211,17 @@ public abstract class EnhanceMessageHandler {
* @return
*/
protected Boolean handleMsgRepeat(final BnyerMessage message) {
//构建重复消息对象
this.repeatElement = new RepeatElement(this.repeatConsumerConfig.getApplicationName()
, repeatMessageKey(message), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag());
RepeatConsumerStrategy strategy = new NormalRepeatStrategy();
Function<BaseMessage, String> repeatKeyFunction = baseMessage -> repeatMessageKey(message);
//Function<BnyerMessage, String> repeatKeyFunction = bnyerMessage -> repeatMessageKey(message);
if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) {
strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatKeyFunction);
strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatElement);
}
//调用对应的去重策略
return strategy.invoke(message);
return strategy.invoke();
}
/**

3
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java

@ -1,7 +1,6 @@
package com.bnyer.common.rocketmq.strategy;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import lombok.extern.slf4j.Slf4j;
/**
* @author :WXC
@ -12,7 +11,7 @@ import lombok.extern.slf4j.Slf4j;
public class NormalRepeatStrategy implements RepeatConsumerStrategy {
@Override
public <T extends BaseMessage> boolean invoke(T message) {
public boolean invoke() {
return false;
}

33
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java

@ -2,14 +2,11 @@ package com.bnyer.common.rocketmq.strategy;
import com.bnyer.common.rocketmq.config.RepeatConsumerConfig;
import com.bnyer.common.rocketmq.domain.BaseMessage;
import com.bnyer.common.rocketmq.domain.RepeatElement;
import com.bnyer.common.rocketmq.persist.IPersist;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.function.Function;
/**
* @author :WXC
@ -22,37 +19,37 @@ public class RedisRepeatStrategy implements RepeatConsumerStrategy {
private final RepeatConsumerConfig repeatConfig;
/**
* 获取去重键的函数
* 重复消息元素
*/
private final Function<BaseMessage, String> repeatMessageKeyFunction;
private final RepeatElement repeatElement;
public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function<BaseMessage, String> repeatMessageKeyFunction) {
public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, RepeatElement repeatElement) {
this.repeatConfig = repeatConfig;
this.repeatMessageKeyFunction = repeatMessageKeyFunction;
this.repeatElement = repeatElement;
}
//public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function<BnyerMessage, String> repeatMessageKeyFunction) {
// this.repeatConfig = repeatConfig;
// this.repeatMessageKeyFunction = repeatMessageKeyFunction;
//}
@Override
public <T extends BaseMessage> boolean invoke(T message) {
return doInvoke(message);
public boolean invoke() {
return doInvoke();
}
private <T extends BaseMessage> boolean doInvoke(T message) {
private boolean doInvoke() {
IPersist persist = repeatConfig.getPersist();
RepeatElement repeatElement = new RepeatElement(repeatConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic()
, message.getTag()==null ? "" : message.getTag()
, repeatMessageKeyFunction.apply(message));
//RepeatElement repeatElement = new RepeatElement(repeatConfig.getApplicationName(),repeatMessageKeyFunction.apply(message)
// , message.getConsumerGroupName(),message.getTopic()
// , message.getTag()==null ? "" : message.getTag());
boolean shouldConsume = true;
if (StringUtils.isNotBlank(repeatElement.getMessageKey())) {
shouldConsume = persist.setConsumingIfNX(repeatElement, repeatConfig.getProcessingExpireMilliSeconds());
}
//设置成功,证明没有消费过
if (shouldConsume) {
return false;
} else {
return true;
}
return !shouldConsume;
}
}

4
bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java

@ -1,8 +1,6 @@
package com.bnyer.common.rocketmq.strategy;
import com.bnyer.common.rocketmq.domain.BaseMessage;
/**
* @author :WXC
* @Date :2023/05/20
@ -10,7 +8,7 @@ import com.bnyer.common.rocketmq.domain.BaseMessage;
*/
public interface RepeatConsumerStrategy {
<T extends BaseMessage> boolean invoke(T message);
boolean invoke();
}

Loading…
Cancel
Save