diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java index 8828869..704b733 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/handle/EnhanceMessageHandler.java @@ -5,7 +5,6 @@ import com.bnyer.common.core.enums.EnumMessageStatus; import com.bnyer.common.core.utils.StringUtils; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; import com.bnyer.common.rocketmq.constant.RocketMqConstant; -import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.BnyerMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; import com.bnyer.common.rocketmq.persist.IPersist; @@ -22,7 +21,7 @@ import org.springframework.data.redis.core.StringRedisTemplate; import javax.annotation.PostConstruct; import javax.annotation.Resource; -import java.util.function.Function; +import java.util.Objects; /** * @author :WXC @@ -48,6 +47,10 @@ public abstract class EnhanceMessageHandler { private RepeatConsumerConfig repeatConsumerConfig; + private IPersist persist = null; + + private RepeatElement repeatElement = null; + @Resource private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; @@ -58,6 +61,10 @@ public abstract class EnhanceMessageHandler { public void init(){ this.applicationName = env.getProperty("spring.application.name"); this.repeatConsumerConfig = RepeatConsumerConfig.enableRedisConfig(applicationName,stringRedisTemplate); + this.persist = this.repeatConsumerConfig.getPersist(); + if (Objects.isNull(persist)){ + throw new RuntimeException("消息去重持久层配置初始化失败..."); + } } /** @@ -138,10 +145,6 @@ public abstract class EnhanceMessageHandler { sendReturnFailsMessage(message); return; } - IPersist persist = repeatConsumerConfig.getPersist(); - RepeatElement repeatElement = new RepeatElement(repeatConsumerConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() - , message.getTag()==null ? "" : message.getTag() - , repeatMessageKey(message)); //消费消息,末尾消费失败会删除消费记录,消费成功则更新消费状态 try { long now = System.currentTimeMillis(); @@ -208,13 +211,17 @@ public abstract class EnhanceMessageHandler { * @return */ protected Boolean handleMsgRepeat(final BnyerMessage message) { + //构建重复消息对象 + this.repeatElement = new RepeatElement(this.repeatConsumerConfig.getApplicationName() + , repeatMessageKey(message), message.getConsumerGroupName(),message.getTopic() + , message.getTag()==null ? "" : message.getTag()); RepeatConsumerStrategy strategy = new NormalRepeatStrategy(); - Function repeatKeyFunction = baseMessage -> repeatMessageKey(message); + //Function repeatKeyFunction = bnyerMessage -> repeatMessageKey(message); if (repeatConsumerConfig.getRepeatStrategy() == RepeatConsumerConfig.REPEAT_STRATEGY_CONSUME_LATER) { - strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatKeyFunction); + strategy = new RedisRepeatStrategy(repeatConsumerConfig, repeatElement); } //调用对应的去重策略 - return strategy.invoke(message); + return strategy.invoke(); } /** diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java index 9cd49cc..2f86415 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/NormalRepeatStrategy.java @@ -1,7 +1,6 @@ package com.bnyer.common.rocketmq.strategy; -import com.bnyer.common.rocketmq.domain.BaseMessage; import lombok.extern.slf4j.Slf4j; /** * @author :WXC @@ -12,8 +11,8 @@ import lombok.extern.slf4j.Slf4j; public class NormalRepeatStrategy implements RepeatConsumerStrategy { @Override - public boolean invoke(T message) { + public boolean invoke() { return false; } -} \ No newline at end of file +} diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java index 4cdd394..e754913 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RedisRepeatStrategy.java @@ -2,14 +2,11 @@ package com.bnyer.common.rocketmq.strategy; import com.bnyer.common.rocketmq.config.RepeatConsumerConfig; -import com.bnyer.common.rocketmq.domain.BaseMessage; import com.bnyer.common.rocketmq.domain.RepeatElement; import com.bnyer.common.rocketmq.persist.IPersist; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import java.util.function.Function; - /** * @author :WXC @@ -22,37 +19,37 @@ public class RedisRepeatStrategy implements RepeatConsumerStrategy { private final RepeatConsumerConfig repeatConfig; /** - * 获取去重键的函数 + * 重复消息元素 */ - private final Function repeatMessageKeyFunction; + private final RepeatElement repeatElement; - public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function repeatMessageKeyFunction) { + public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, RepeatElement repeatElement) { this.repeatConfig = repeatConfig; - this.repeatMessageKeyFunction = repeatMessageKeyFunction; + this.repeatElement = repeatElement; } + //public RedisRepeatStrategy(RepeatConsumerConfig repeatConfig, Function repeatMessageKeyFunction) { + // this.repeatConfig = repeatConfig; + // this.repeatMessageKeyFunction = repeatMessageKeyFunction; + //} @Override - public boolean invoke(T message) { - return doInvoke(message); + public boolean invoke() { + return doInvoke(); } - private boolean doInvoke(T message) { + private boolean doInvoke() { IPersist persist = repeatConfig.getPersist(); - RepeatElement repeatElement = new RepeatElement(repeatConfig.getApplicationName(), message.getConsumerGroupName(),message.getTopic() - , message.getTag()==null ? "" : message.getTag() - , repeatMessageKeyFunction.apply(message)); + //RepeatElement repeatElement = new RepeatElement(repeatConfig.getApplicationName(),repeatMessageKeyFunction.apply(message) + // , message.getConsumerGroupName(),message.getTopic() + // , message.getTag()==null ? "" : message.getTag()); boolean shouldConsume = true; if (StringUtils.isNotBlank(repeatElement.getMessageKey())) { shouldConsume = persist.setConsumingIfNX(repeatElement, repeatConfig.getProcessingExpireMilliSeconds()); } //设置成功,证明没有消费过 - if (shouldConsume) { - return false; - } else { - return true; - } + return !shouldConsume; } } diff --git a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java index 39506b3..ff95817 100644 --- a/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java +++ b/bnyer-common/bnyer-common-rocketmq/src/main/java/com/bnyer/common/rocketmq/strategy/RepeatConsumerStrategy.java @@ -1,8 +1,6 @@ package com.bnyer.common.rocketmq.strategy; -import com.bnyer.common.rocketmq.domain.BaseMessage; - /** * @author :WXC * @Date :2023/05/20 @@ -10,7 +8,7 @@ import com.bnyer.common.rocketmq.domain.BaseMessage; */ public interface RepeatConsumerStrategy { - boolean invoke(T message); + boolean invoke(); }