1 回答 Redis 本身是不支持延时队列的,但是我们可以利用 Redis 一些特定的数据结构和特性来实现延时队列。
基于 Redis 目前有三种方式可以实现延时队列:
利用 Redis 过期消息实现延时队列 Redis 允许我们为每一个 key 设置过期时间,在 key 过期时,Redis 可以配置为发送一个过期事件。在应用程序通过监听这个过期事件,就可以实现延迟队列了。
使用 Sorted Set 实现延时队列
Redisson 实现延迟队列
2 代码实现 2.1 利用 Redis 过期消息实现延时队列 该方法是基于 Redis key 的过期通知特性。当一个 key 过期时,如果配置了Redis 的 key 过期通知功能,Redis 会发布一个消息到特定的 Channel,应用程序可以订阅这个 Channel 来接收过期事件,进而触发相应的业务处理逻辑。
实现步骤如下:
2.1.1 配置键空间通知 在Redis 配置文件redis.conf
中增加一条配置 notify-keyspace-events
。
2.1.2 应用程序订阅过期事件 应用程序使用 Redis 的发布/订阅功能订阅特定的过期事件通道:keyevent@0:expired
。当 key 过期时,应用程序会接收到过期 key 的名称,我们就可以依据这个来执行对应的业务逻辑。
首先新建 RedisKeyExpirationEventMessageListener
继承 KeyExpirationEventMessageListener
实现它的 onMessage()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Service @Slf4j public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationEventMessageListener (RedisMessageListenerContainer listenerContainer) { super (listenerContainer); } @Override public void onMessage (Message message, byte [] pattern) { String expiredKey = message.toString(); log.info("监听到过期key:{}" , expiredKey); } }
然后配置 Redis 监听器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class RedisListenerConfig { @Bean RedisMessageListenerContainer container (RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer (); container.setConnectionFactory(connectionFactory); return container; } }
我们设置几个配置过期时间的 key:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @SpringBootTest class RedisDelayQueueApplicationTests { @Resource private StringRedisTemplate stringRedisTemplate; @Test void redisDelayQueueMethod1 () { stringRedisTemplate.opsForValue().set("key_01" , "value_01" , 5 , TimeUnit.SECONDS); stringRedisTemplate.opsForValue().set("key_02" , "value_02" , 20 , TimeUnit.SECONDS); stringRedisTemplate.opsForValue().set("key_03" , "value_03" , 15 , TimeUnit.SECONDS); stringRedisTemplate.opsForValue().set("key_04" , "value_04" , 10 , TimeUnit.SECONDS); } }
当这些 key 过期后,RedisKeyExpirationEventMessageListener
会收到相关通知:
至于 KeyExpirationEventMessageListener
的实现原理,其实是很简单的,直接看源码就行:
从源码中可以看出,它是监听 keyevent@*:expired
这个channel,__keyevent@*__:expired
中的*
代表监听所有的数据库。
这个方案是不是很简单?区区两步就实现了,但是 Redis 官方不推荐使用该方案,为什么?
key 的过期事件发布时机并不是当这个 key 的过期时间到了之后就发布,而是这个 key 在Redis中被清理之后,也就是真正被删除之后才会发布。但是 key 过期后并不是立刻删除的,它分为两种惰性删除和定期删除,所以这个有可能会存在延迟,不够精确,尤其是在高负载的情况下。
同时,事件也有可能会丢失,主要体现在若应用程序在接收事件通知之前就断开了连接,那么这些事件就会丢失,从而导致对应的任务无法执行。
同时,这个监听会导致所有的 key 过期后都会通知过来,如果我们要处理某一类型的 key 只能通过使用前缀来标识,非常麻烦。
2.2 使用 Sorted Set 实现延时队列 2.2.1 实现思路 redis作为一款高性能的NoSQL数据库,具备快速读写,高并发,数据持久化等特点,非常适用与实现延迟队列,redis提供了丰富的数据结构.其中利用redis的ZSET
集合 (有序集合)数据结构就可以实现一个简单的延迟队列
redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。
2.2.2 详细步骤 本文将介绍如何使用Redis的Sorted Set数据结构来实现延迟队列,并提供一个完整的示例代码。同时,我们还将会给出对应的测试用例和测试结果。如下我先给同学们概括下,针对Spring Boot项目,如何利用Redis实现延迟队列的一些实现步骤?
引入相关依赖 (集成redis)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > 1.18.22</version > <scope > provided</scope > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.78</version > </dependency > </dependencies >
配置redis
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Spring: redis: database: 0 host: 127.0 .0 .1 port: 6379 password: jedis.pool: max-active: 20 max-wait: -1 max-idle: 10 min-idle: 0 timeout: 1000
创建redis配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Configuration public class RedisConfig { @Bean("redisTemplate") public RedisTemplate<Object, Object> redisTemplate (RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate <>(); template.setConnectionFactory(redisConnectionFactory); FastJsonRedisSerializer<Object> serializer = new FastJsonRedisSerializer <Object>(Object.class); template.setValueSerializer(serializer); template.setHashValueSerializer(serializer); template.setKeySerializer(new StringRedisSerializer ()); template.setHashKeySerializer(new StringRedisSerializer ()); template.setConnectionFactory(redisConnectionFactory); ParserConfig.getGlobalInstance().setAutoTypeSupport(true ); return template; } }
序列化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class FastJsonRedisSerializer <T> implements RedisSerializer <T> { public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8" ); private Class<T> clazz; public FastJsonRedisSerializer (Class<T> clazz) { super (); this .clazz = clazz; } @Override public byte [] serialize(T t) throws SerializationException { if (t == null ) { return new byte [0 ]; } return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET); } @Override public T deserialize (byte [] bytes) throws SerializationException { if (bytes == null || bytes.length <= 0 ) { return null ; } String str = new String (bytes, DEFAULT_CHARSET); return (T) JSON.parseObject(str, clazz); } }
创建消息类 DelayMessage
这里定义一个消息类,包含消息的id,消息内容,以及到期时间(消息的执行时间),代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Data @AllArgsConstructor @NoArgsConstructor public class DelayMessage implements Serializable { private static final long serialVersionUID = -7671756385477179547L ; private String id; private String content; private long expireTime; }
创建延迟队列类 DelayQueue
创建一个延迟队列类,提供,添加消息,删除消息,和获取消息的方法,具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 @Component public class DelayQueue { private static final String KEY = "delay_queue:" + getHostAddress(); @Autowired private RedisTemplate redisTemplate; public void put (DelayMessage message) { redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime()); } public Long remove (DelayMessage message) { Long remove = redisTemplate.opsForZSet().remove(KEY, message); return remove; } public List<DelayMessage> getExpiredMessages () { long minScore = 0 ; long maxScore = System.currentTimeMillis(); Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore); if (messages == null || messages.isEmpty()) { return Collections.emptyList(); } List<DelayMessage> result = new ArrayList <>(); for (Object message : messages) { DelayMessage delayMessage = JSONObject.parseObject(JSONObject.toJSONString(message), DelayMessage.class); result.add(delayMessage); } return result; } public static String getHostAddress () { InetAddress localHost = null ; try { localHost = InetAddress.getLocalHost(); } catch (UnknownHostException e) { e.printStackTrace(); } return localHost.getHostAddress(); } }
创建 DelayMessageHandler 消息处理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Component @Slf4j public class DelayMessageHandler { public static SimpleDateFormat dateTimeFormat = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" ); @Autowired private DelayQueue delayQueue; @Scheduled(fixedDelay = 1000) public void handleExpiredMessages () { String currTime = getCurrTime(); List<DelayMessage> messages = delayQueue.getExpiredMessages(); List<DelayMessage> messages_2 = delayQueue.getExpiredMessages(); log.info(currTime + " 待处理消息数量: " + messages.size()); if (!messages.isEmpty()) { for (DelayMessage message : messages) { log.info(message.getId() + "------------> 消息开始处理" ); try { Thread.sleep(3000 ); }catch (Exception e){ e.printStackTrace(); } log.info(message.getId() + "------------> 消息处理结束" ); delayQueue.remove(message); } } } public static String getCurrTime () { return dateTimeFormat.format(System.currentTimeMillis()); } }
生成测试数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Test void redisDelayQueueMethod2 () { long currentTime = System.currentTimeMillis(); System.out.println("当前时间戳:" + currentTime); delayQueue.put(new DelayMessage ("1" , "westbrook" , currentTime + 10000 )); delayQueue.put(new DelayMessage ("0" , "love" , currentTime + 10000 )); delayQueue.put(new DelayMessage ("6" , "brook" , currentTime + 10000 )); delayQueue.put(new DelayMessage ("7" , "wade" , currentTime + 10000 )); delayQueue.put(new DelayMessage ("8" , "lebron" , currentTime + 10000 )); delayQueue.put(new DelayMessage ("2" , "james" , currentTime + 20000 )); delayQueue.put(new DelayMessage ("3" , "kobe" , currentTime + 30000 )); delayQueue.put(new DelayMessage ("4" , "curry" , currentTime + 40000 )); delayQueue.put(new DelayMessage ("9" , "durant" , currentTime + 50000 )); delayQueue.put(new DelayMessage ("10" , "paul" , currentTime + 50000 )); delayQueue.put(new DelayMessage ("5" , "durant" , currentTime + 50000 )); }
执行结果 : (我们可以看到,消息正在慢慢的被消费)
此处我们会发现一个问题, @Scheduled
注解是轮询执行的,如果上一个任务没执行完毕,定时器会等待,等待上一次执行完毕也就是说,@Scheduled
注解表示同步执行的,那么就会出现一个问题,每一个消息处理都会耗时3秒,假设有 A B 两条消息,消息的过期时间是一致的,那么这两个消息会被同时从缓存中取出准备消费,假设A消息第一个开始消费,那么B消息,就要等待3秒,等A消息执行完成,才开始消费B消息,那么就会出现消息堆积,延迟消费的情况,本来14:00就要消费的消息,等到了 14:10 才开始消费(可能会更晚),如果消息量足够大的情况下,就会出现问题,内存泄漏,消息堆积,延迟消费等情况
解决办法 : 开线程去执行 (使用线程池),使用以下代码,我们消费一条消息,就需要创建一个线程去后台消费,就会解决了上面的问题,(这里需要用到线程池,我为了偷懒,就简单模拟了一下)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Scheduled(fixedDelay = 1000) public void handleExpiredMessages () { String currentTime = getCurrTime(); List<DelayMessage> messages = delayQueue.getExpiredMessages(); System.out.println(currentTime + " 待处理消息数量:" + messages.size()); if (!messages.isEmpty()) { for (DelayMessage message : messages) { new Thread (() -> { System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理" ); try { Thread.sleep(3000 ); } catch (Exception e) { e.printStackTrace(); } System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束" ); delayQueue.remove(message); }).start(); } } }
执行结果 : 开启线程异步执行消息
我们使用了开启新线程的方式来消费消息,消息延迟的问题解决了,但是又出现了新的问题,消息会出现重复消费的情况
问题的原因 : 我们第一次定时,取出了符合条件的4条过期的消息,我们开启了4个线程去执行,当第二秒,我们又获取了符合条件的消息,因为第一次获取的消息执行需要时间,那么我们第二次拿消息的时候,就会有可能把第一次的4条消息,也拿出来,然后开线程再次消费,就会出现重复消费的情况了
解决方案 :
这个问题出现原因是,当前线程不知道这个消息已经被其他线程正在处理了,只要解决这个问题,当前线程开始处理这个消息,先判断当前消息有没有被其他线程处理,如果正在处理,则不进行处理了,如果没处理,则开始进行处理
我们知道 redis删除元素的 remove() 方法,有一个返回值,表示删除的状态,我们可以在消息处理前,先 remove() 这个消息,如果 remove()成功,则表示当前消息没有被消费,如果 remove()失败,则表示该消息已经被消费了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Scheduled(fixedDelay = 1000) public void handleExpiredMessages () { String currentTime = getCurrTime(); List<DelayMessage> messages = delayQueue.getExpiredMessages(); System.out.println(currentTime + " 待处理消息数量:" + messages.size()); if (!messages.isEmpty()) { for (DelayMessage message : messages) { Long remove = delayQueue.remove(message); if (remove > 0 ) { new Thread (() -> { System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理" ); try { Thread.sleep(3000 ); } catch (Exception e) { e.printStackTrace(); } System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束" ); }).start(); } } } }
但是还会出现问题,如果服务重启,或者服务宕机,那么当前执行中的消息,在下次服务启动的时候,就会出现消息丢失的情况
我给出的解决方案就是 : 创建一张临时数据表,当消息开始消费的时候,在表中添加一条记录,当消息消费成功,则把临时表中的记录删除当服务重启,则把临时表中的记录,读到延迟队列中,就解决了消息丢失的情况
关键点
使用 缓存的key带内网ip的方式,解决了集群,多机器会出现的所有问题.
使用 后台线程,线程池,解决了消息堆积,延迟消费的问题.
使用 先删除key的方法,解决了消息重复消费的问题.
把当前处理的消息进行持久化,解决了消息丢失的问题.
2.3 Redisson 实现延迟队列 首先 zset 是一个不错的方案,但是我们自己实现起来稍微复杂了点,需要考虑的问题较多,但是如果有别人帮我们实现好了呢?Redisson 恰好提供了这个功能。
Redisson 提供的延迟队列(Delayed Queue
)是它基于 Redis 的发布/订阅机制
和 zset
实现的一种高级数据结构,用于处理需要延迟执行的任务。
其实现原理是:Redisson 将任务按照预定的执行时间存储在 zset 中,任务的执行时间作为 score,任务本身序列化后的数据作为 member。同时,Redisson 会在后台持续监控这个 ZSET,一旦发现有符合执行条件(当前时间 >= score)的任务,就会自动将这些任务转移到另一个 RQueue(Redisson 的队列实现)中,应用程序只需要从 RQueue 中取出任务执行即可
。
下面将演示下如何利用 Redisson 来实现延迟队列。这里的关键是需要确保任务生产者和消费者共享同一个 Redisson 延迟队列实例。
2.3.1 添加Redisson依赖 1 2 3 4 5 6 7 8 9 10 11 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.16.3</version > </dependency > <dependency > <groupId > org.redisson</groupId > <artifactId > redisson-spring-boot-starter</artifactId > <version > 3.16.3</version > </dependency >
2.3.2 任务生产者类 TaskProducer 负责添加任务到延迟队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j public class TaskProducer implements Runnable { private final RDelayedQueue<String> delayedQueue; public TaskProducer (RDelayedQueue<String> delayedQueue) { this .delayedQueue = delayedQueue; } @Override public void run () { try { for (int i = 0 ; i < 5 ; i++) { String task = "task-" + i; delayedQueue.offer(task, 5 * i, TimeUnit.SECONDS); log.info("任务{}已放入队列,将在{}秒后执行" , task, i * 5 ); } } catch (Exception e) { e.printStackTrace(); } } }
2.3.3 创建任务消费者类 TaskConsumer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j public class TaskConsumer implements Runnable { private final RBlockingQueue<String> blockingDeque; public TaskConsumer (RBlockingQueue<String> blockingDeque) { this .blockingDeque = blockingDeque; } @Override public void run () { try { while (true ){ String task = blockingDeque.take(); log.info("消费任务:{}" ,task); } }catch (InterruptedException e){ Thread.currentThread().interrupt(); e.printStackTrace(); } } }
2.3.4 测试类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Test void redisDelayQueueMethod3 () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6379" ); RedissonClient redisson = Redisson.create(config); RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayQueue" ); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue); Thread producerThread = new Thread (new TaskProducer (delayedQueue)); Thread consumerThread = new Thread (new TaskConsumer (blockingQueue)); producerThread.start(); consumerThread.start(); }
运行结果:
这种方案利用了 Redis 的高性能和持久性,使延迟队列的实现既高效又可靠,同时 Redisson 提供的延迟队列抽象了底层的实现细节,使用简单方便。
3 扩展 3.1 Redis 键空间通知 Redis 键空间通知是 Redis 提供的一种机制,它允许客户端(应用程序)订阅相关事件,这些事件与 Redis 中的keys 的变化有关,当某个 key 发生了变化(如键过期、键被删除、值被修改等)时,应用程序会得到通知,从而可以执行相关的业务逻辑。
由于该操作会消耗一些性能,所以,默认情况下,键空间通知是禁用的,如果要使用该功能,我们需要在 Redis 的配置文件添加配置 notify-keyspace-events *
,或者通过 CONFIG SET
命令动态启用,如下:
1 CONFIG SET notify-keyspace-events KEA
KEA
是一个参数,代表我们想要订阅的事件类型:
K
:键空间通知,事件以 __keyspace@<db>__:
为前缀。
E
:键事件通知,事件以__keyevent@<db>__:
为前缀。
A
:所有类型的通知。
除了上面这三个参数,还可以选择以下参数来订阅特定类型的事件:
g
- 通用命令(如 DEL,EXPIRE,RENAME 等)。
$
- 字符串命令。
l
- 列表命令。
s
- 集合命令。
h
- 哈希命令。
z
- 有序集合命令。
x
- 过期事件:当某个键过期并被删除时触发。
e
- 驱逐事件:当某个键因为 maxmemory 策略而被删除时触发。
代码地址:https://github.com/huang-hanson/java_integration/tree/main/redis-delayQueue 转自:https://blog.csdn.net/weixin_45683778/article/details/140359695