|
|
|
@ -30,11 +30,11 @@ public class RedisPersist implements IPersist { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public <T extends BaseMessage> boolean setConsumingIfNX(RepeatElement repeatElement, long dedupProcessingExpireMilliSeconds) { |
|
|
|
public <T extends BaseMessage> boolean setConsumingIfNX(RepeatElement repeatElement, long processingExpireMilliSeconds) { |
|
|
|
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); |
|
|
|
//setnx, 成功就可以消费
|
|
|
|
Boolean execute = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> redisConnection.set(repeatKey.getBytes(), |
|
|
|
(CONSUME_STATUS_CONSUMING).getBytes(), Expiration.milliseconds(dedupProcessingExpireMilliSeconds), RedisStringCommands.SetOption.SET_IF_ABSENT)); |
|
|
|
(CONSUME_STATUS_CONSUMING).getBytes(), Expiration.milliseconds(processingExpireMilliSeconds), RedisStringCommands.SetOption.SET_IF_ABSENT)); |
|
|
|
if (execute == null) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
@ -48,9 +48,9 @@ public class RedisPersist implements IPersist { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public <T extends BaseMessage> void markConsumed(RepeatElement repeatElement, long dedupRecordReserveMinutes) { |
|
|
|
public <T extends BaseMessage> void markConsumed(RepeatElement repeatElement, long recordReserveMinutes) { |
|
|
|
String repeatKey = buildRepeatMessageRedisKey(repeatElement.getApplicationName(), repeatElement.getTopic(), repeatElement.getTag(), repeatElement.getMessageKey()); |
|
|
|
redisTemplate.opsForValue().set(repeatKey, CONSUME_STATUS_SUCCESS, dedupRecordReserveMinutes, TimeUnit.MINUTES); |
|
|
|
redisTemplate.opsForValue().set(repeatKey, CONSUME_STATUS_SUCCESS, recordReserveMinutes, TimeUnit.MINUTES); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
|