怎么使用Redis实现一个延时队列?

目录
  1. 1. 1 回答
  2. 2. 2 代码实现
    1. 2.1. 2.1 利用 Redis 过期消息实现延时队列
      1. 2.1.1. 2.1.1 配置键空间通知
      2. 2.1.2. 2.1.2 应用程序订阅过期事件
    2. 2.2. 2.2 使用 Sorted Set 实现延时队列
      1. 2.2.1. 2.2.1 实现思路
      2. 2.2.2. 2.2.2 详细步骤
    3. 2.3. 2.3 Redisson 实现延迟队列
      1. 2.3.1. 2.3.1 添加Redisson依赖
      2. 2.3.2. 2.3.2 任务生产者类 TaskProducer
      3. 2.3.3. 2.3.3 创建任务消费者类 TaskConsumer
      4. 2.3.4. 2.3.4 测试类
  3. 3. 3 扩展
    1. 3.1. 3.1 Redis 键空间通知

1 回答

Redis 本身是不支持延时队列的,但是我们可以利用 Redis 一些特定的数据结构和特性来实现延时队列。

基于 Redis 目前有三种方式可以实现延时队列:

  1. 利用 Redis 过期消息实现延时队列
    Redis 允许我们为每一个 key 设置过期时间,在 key 过期时,Redis 可以配置为发送一个过期事件。在应用程序通过监听这个过期事件,就可以实现延迟队列了。

  2. 使用 Sorted Set 实现延时队列

  3. Redisson 实现延迟队列

2 代码实现

2.1 利用 Redis 过期消息实现延时队列

该方法是基于 Redis key 的过期通知特性。当一个 key 过期时,如果配置了Redis 的 key 过期通知功能,Redis 会发布一个消息到特定的 Channel,应用程序可以订阅这个 Channel 来接收过期事件,进而触发相应的业务处理逻辑。

实现步骤如下:

2.1.1 配置键空间通知

在Redis 配置文件redis.conf 中增加一条配置 notify-keyspace-events

2.1.2 应用程序订阅过期事件

应用程序使用 Redis 的发布/订阅功能订阅特定的过期事件通道:keyevent@0:expired。当 key 过期时,应用程序会接收到过期 key 的名称,我们就可以依据这个来执行对应的业务逻辑。

首先新建 RedisKeyExpirationEventMessageListener 继承 KeyExpirationEventMessageListener 实现它的 onMessage()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
@Slf4j
public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener {

public RedisKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
log.info("监听到过期key:{}", expiredKey);
}
}

然后配置 Redis 监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class RedisListenerConfig {

/**
* redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
*
* @param connectionFactory
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}

我们设置几个配置过期时间的 key:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
class RedisDelayQueueApplicationTests {

@Resource
private StringRedisTemplate stringRedisTemplate;

// 通过监听过期时间
@Test
void redisDelayQueueMethod1() {
stringRedisTemplate.opsForValue().set("key_01", "value_01", 5, TimeUnit.SECONDS);
stringRedisTemplate.opsForValue().set("key_02", "value_02", 20, TimeUnit.SECONDS);
stringRedisTemplate.opsForValue().set("key_03", "value_03", 15, TimeUnit.SECONDS);
stringRedisTemplate.opsForValue().set("key_04", "value_04", 10, TimeUnit.SECONDS);
}
}

当这些 key 过期后,RedisKeyExpirationEventMessageListener 会收到相关通知:

至于 KeyExpirationEventMessageListener 的实现原理,其实是很简单的,直接看源码就行:

从源码中可以看出,它是监听 keyevent@*:expired 这个channel, __keyevent@*__:expired中的*代表监听所有的数据库。

这个方案是不是很简单?区区两步就实现了,但是 Redis 官方不推荐使用该方案,为什么?

key 的过期事件发布时机并不是当这个 key 的过期时间到了之后就发布,而是这个 key 在Redis中被清理之后,也就是真正被删除之后才会发布。但是 key 过期后并不是立刻删除的,它分为两种惰性删除和定期删除,所以这个有可能会存在延迟,不够精确,尤其是在高负载的情况下。

同时,事件也有可能会丢失,主要体现在若应用程序在接收事件通知之前就断开了连接,那么这些事件就会丢失,从而导致对应的任务无法执行。

同时,这个监听会导致所有的 key 过期后都会通知过来,如果我们要处理某一类型的 key 只能通过使用前缀来标识,非常麻烦。

2.2 使用 Sorted Set 实现延时队列

2.2.1 实现思路

redis作为一款高性能的NoSQL数据库,具备快熟读写,高并发,数据持久化等特点,非常适用与实现延迟队列 ,redis提供了丰富的数据结构.其中利用redis的ZSET集合 (有序集合)数据结构就可以实现一个简单的延迟队列

redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务执行时间(比如30min),任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小(即最早要执行的)的任务,如果该任务的score小于当前时间戳,则执行任务,否则等待一段时间再次检查,直到任务可以执行,执行任务后,通过Redis的remove命令删除已经成功执行的任务即可。

2.2.2 详细步骤

本文将介绍如何使用Redis的Sorted Set数据结构来实现延迟队列,并提供一个完整的示例代码。同时,我们还将会给出对应的测试用例和测试结果。如下我先给同学们概括下,针对Spring Boot项目,如何利用Redis实现延迟队列的一些实现步骤?

  1. 引入相关依赖 (集成redis)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<dependencies>
<!--web依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>


<!--测试类-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!--redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version> <!-- 版本号可能会有更新,请选择最新稳定版本 -->
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version> <!-- 版本号可能会有更新,请选择最新稳定版本 -->
</dependency>
</dependencies>
  1. 配置redis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#redis配置
Spring:
redis:
database: 0 #Redis数据库索引(默认为0)
host: 127.0.0.1 #redis服务器ip,由于我是搭建在本地,固指向本地ip
port: 6379 #redis服务器连接端口
password: #redis服务器连接密码(默认为空)
# 连接池配置
jedis.pool:
max-active: 20 #连接池最大连接数(使用负值表示没有限制)
max-wait: -1 #连接池最大阻塞等待时间(使用负值表示没有限制)
max-idle: 10 #连接池中的最大空闲连接
min-idle: 0 #连接池中的最小空闲连接
timeout: 1000 #连接超时时间(毫秒)。我设置的是1秒
  1. 创建redis配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class RedisConfig {

/**
* RedisTemplate配置
*/
@Bean("redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// 使用fastjson进行序列化处理,提高解析效率
FastJsonRedisSerializer<Object> serializer = new FastJsonRedisSerializer<Object>(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
// key的序列化采用StringRedisSerializer
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
// 使用fastjson时需设置此项,否则会报异常not support type
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
return template;
}
}
  1. 序列化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**    
* @Description:使用fastjson实现redis的序列化
*/
public class FastJsonRedisSerializer<T> implements RedisSerializer<T> {

public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

private Class<T> clazz;

public FastJsonRedisSerializer(Class<T> clazz) {
super();
this.clazz = clazz;
}

@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
}

@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null || bytes.length <= 0) {
return null;
}
String str = new String(bytes, DEFAULT_CHARSET);
return (T) JSON.parseObject(str, clazz);
}
}
  1. 创建消息类 DelayMessage
  • 这里定义一个消息类 , 包含消息的id,消息内容,以及到期时间(消息的执行时间) , 代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayMessage implements Serializable {

/**
* 切记实例化
*/
private static final long serialVersionUID = -7671756385477179547L;

/**
* 消息 id
*/
private String id;

/**
* 消息内容
*/
private String content;

/**
* 消息到期时间(指定当前消息在什么时间开始消费(时间戳))
*/
private long expireTime;
}
  1. 创建延迟队列类 DelayQueue
  • 创建一个延迟队列类 , 提供,添加消息,删除消息,和获取消息的方法 , 具体代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Component
public class DelayQueue {

private static final String KEY = "delay_queue:" + getHostAddress();

@Autowired
private RedisTemplate redisTemplate;

/**
* 添加消息到延时队列中
*/
public void put(DelayMessage message) {
redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime());
}

/**
* 从延时队列中删除消息
*/
public Long remove(DelayMessage message) {
Long remove = redisTemplate.opsForZSet().remove(KEY, message);
return remove;
}

/**
* 获取延时队列中已到期的消息
*/
public List<DelayMessage> getExpiredMessages() {
// 1. 获取到开始时间
long minScore = 0;
// 2. 获取到结束时间
long maxScore = System.currentTimeMillis();
// 3. 获取到指定范围区间的数据列表
Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore);
if (messages == null || messages.isEmpty()) {
return Collections.emptyList();
}

// 4. 把对像进行封装,返回
List<DelayMessage> result = new ArrayList<>();
for (Object message : messages) {
DelayMessage delayMessage = JSONObject.parseObject(JSONObject.toJSONString(message), DelayMessage.class);
result.add(delayMessage);
}
return result;
}

/**
* 获取地址(服务器的内网地址)(内网ip)
*
* @return
*/
public static String getHostAddress() {
InetAddress localHost = null;
try {
localHost = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return localHost.getHostAddress();
}
}
  1. 创建 DelayMessageHandler 消息处理类
  • 创建一个消息处理累, 添加一个处理过期的消息,写个定时任务,间隔1s轮询延时队列中已到期的任务,如果获取不到为空, 则不进行消息处理的逻辑 , 反之继续轮询

  • 注意:需要在主启动类上加上@EnableScheduling注解开启@Scheduled定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Component
@Slf4j
public class DelayMessageHandler {

public static SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Autowired
private DelayQueue delayQueue;

/**
* 处理已到期的消息(轮询)
*/
@Scheduled(fixedDelay = 1000)
public void handleExpiredMessages() {
String currTime = getCurrTime();
// 1. 扫描任务,并将需要执行的任务加入到任务队列当中
List<DelayMessage> messages = delayQueue.getExpiredMessages();
List<DelayMessage> messages_2 = delayQueue.getExpiredMessages();
log.info(currTime + " 待处理消息数量: " + messages.size());
// 2. 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessage message : messages) {
log.info(message.getId() + "------------> 消息开始处理");
try {
// 2.1.1: 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
}catch (Exception e){
e.printStackTrace();
}
log.info(message.getId() + "------------> 消息处理结束");
// 2.2 处理完消息,删除消息
delayQueue.remove(message);
}
}
}

/**
* 获取到的当前时分秒
*
* @return
*/
public static String getCurrTime() {
return dateTimeFormat.format(System.currentTimeMillis());
}
}
  1. 生成测试数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
void redisDelayQueueMethod2() {
// 获取当前时间戳
long currentTime = System.currentTimeMillis();
System.out.println("当前时间戳:" + currentTime);
delayQueue.put(new DelayMessage("1", "westbrook", currentTime + 10000));
delayQueue.put(new DelayMessage("0", "love", currentTime + 10000));
delayQueue.put(new DelayMessage("6", "brook", currentTime + 10000));
delayQueue.put(new DelayMessage("7", "wade", currentTime + 10000));
delayQueue.put(new DelayMessage("8", "lebron", currentTime + 10000));
delayQueue.put(new DelayMessage("2", "james", currentTime + 20000));
delayQueue.put(new DelayMessage("3", "kobe", currentTime + 30000));
delayQueue.put(new DelayMessage("4", "curry", currentTime + 40000));
delayQueue.put(new DelayMessage("9", "durant", currentTime + 50000));
delayQueue.put(new DelayMessage("10", "paul", currentTime + 50000));
delayQueue.put(new DelayMessage("5", "durant", currentTime + 50000));
}

执行结果 : (我们可以看到 , 消息正在慢慢的被消费)

此处我们会发现一个问题 , @Scheduled 注解是轮询执行的 , 如果上一个任务没执行完毕 , 定时器会等待 , 等待上一次执行完毕也就是说 ,@Scheduled注解表示同步执行的 , 那么就会出现一个问题 , 每一个消息处理都会耗时3秒,假设有 A B 两条消息 , 消息的过期时间是一致的 , 那么这两个消息会被同时从缓存中取出准备消费 ,假设A消息第一个开始消费 ,那么B消息,就要等待3秒 , 等A消息执行完成,才开始消费B消息 , 那么就会出现消息堆积,延迟消费的情况 , 本来14:00就要消费的消息,等到了 14:10 才开始消费(可能会更晚) ,如果消息量足够大的情况下 , 就会出现问题 , 内存泄漏 , 消息堆积 , 延迟消费等情况

解决办法 : 开线程去执行 (使用线程池) , 使用以下代码 , 我们消费一条消息,就需要创建一个线程去后台消费 , 就会解决了上面的问题 ,(这里需要用到线程池,我为了偷懒 ,就简单模拟了一下)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Scheduled(fixedDelay = 1000)
public void handleExpiredMessages() {
String currentTime = getCurrTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中
List<DelayMessage> messages = delayQueue.getExpiredMessages();
System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessage message : messages) {
// 2.1 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
new Thread(() -> {
System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
// 2.2 : 处理完消息,删除消息
delayQueue.remove(message);
}).start();
}
}
}

执行结果 : 开启线程异步执行消息

我们使用了开启新线程的方式来消费消息 , 消息延迟的问题解决了 , 但是又出现了新的问题 , 消息会出现重复消费的情况

问题的原因 : 我们第一次定时 , 取出了符合条件的4条过期的消息 , 我们开启了4个线程去执行 , 当第二秒 , 我们又获取了符合条件的消息 ,因为第一次获取的消息执行需要时间 , 那么我们第二次拿消息的时候 , 就会有可能把第一次的4条消息 , 也拿出来 , 然后开线程再次消费 , 就会出现重复消费的情况了

解决方案 :

这个问题出现原因是 , 当前线程不知道这个消息已经被其他线程正在处理了 ,只要解决这个问题 ,当前线程开始处理这个消息,先判断当前消息有没有被其他线程处理 , 如果正在处理,则不进行处理了 , 如果没处理,则开始进行处理

我们知道 redis删除元素的 remove() 方法 , 有一个返回值 , 表示删除的状态 ,我们可以在消息处理前 , 先 remove() 这个消息 , 如果 remove()成功,则表示当前消息没有被消费 , 如果 remove()失败,则表示该消息已经被消费了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Scheduled(fixedDelay = 1000)
public void handleExpiredMessages() {
String currentTime = getCurrTime();
// 1 : 扫描任务,并将需要执行的任务加入到任务队列中
List<DelayMessage> messages = delayQueue.getExpiredMessages();
System.out.println(currentTime + " 待处理消息数量:" + messages.size());
// 2 : 开始处理消息
if (!messages.isEmpty()) {
for (DelayMessage message : messages) {
// 2.1 : 处理消息:先删除消息,获取当前消息是否已经被其他人消费
Long remove = delayQueue.remove(message);
if (remove > 0) {
// 2.2 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
new Thread(() -> {
System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
try {
// 2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
}).start();
}
}
}
}
  • 执行结果 : 我们会发现 , 重复消费的问题 , 解决了

但是还会出现问题 , 如果服务重启 , 或者服务宕机 , 那么当前执行中的消息 , 在下次服务启动的时候 , 就会出现消息丢失的情况

我给出的解决方案就是 : 创建一张临时数据表 , 当消息开始消费的时候 ,在表中添加一条记录,当消息消费成功,则把临时表中的记录删除当服务重启 , 则把临时表中的记录,读到延迟队列中 , 就解决了消息丢失的情况

关键点

  1. 使用 缓存的key带内网ip的方式,解决了集群,多机器会出现的所有问题.
  2. 使用 后台线程,线程池,解决了消息堆积,延迟消费的问题.
  3. 使用 先删除key的方法 , 解决了消息重复消费的问题.
  4. 把当前处理的消息进行持久化,解决了消息丢失的问题.

2.3 Redisson 实现延迟队列

首先 zset 是一个不错的方案,但是我们自己实现起来稍微复杂了点,需要考虑的问题较多,但是如果有别人帮我们实现好了呢?Redisson 恰好提供了这个功能。

Redisson 提供的延迟队列(Delayed Queue)是它基于 Redis 的发布/订阅机制zset 实现的一种高级数据结构,用于处理需要延迟执行的任务。

其实现原理是:Redisson 将任务按照预定的执行时间存储在 zset 中,任务的执行时间作为 score,任务本身序列化后的数据作为 member。同时,Redisson 会在后台持续监控这个 ZSET,一旦发现有符合执行条件(当前时间 >= score)的任务,就会自动将这些任务转移到另一个 RQueue(Redisson 的队列实现)中,应用程序只需要从 RQueue 中取出任务执行即可

下面将演示下如何利用 Redisson 来实现延迟队列。这里的关键是需要确保任务生产者和消费者共享同一个 Redisson 延迟队列实例。

2.3.1 添加Redisson依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.3</version> <!-- Redisson 的版本号,选择最新稳定版本 -->
</dependency>

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version> <!-- Redisson 的版本号,选择最新稳定版本 -->
</dependency>

2.3.2 任务生产者类 TaskProducer

负责添加任务到延迟队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class TaskProducer implements Runnable {

private final RDelayedQueue<String> delayedQueue;

public TaskProducer(RDelayedQueue<String> delayedQueue) {
this.delayedQueue = delayedQueue;
}

@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
String task = "task-" + i;
delayedQueue.offer(task, 5 * i, TimeUnit.SECONDS);
log.info("任务{}已放入队列,将在{}秒后执行", task, i * 5);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

2.3.3 创建任务消费者类 TaskConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
public class TaskConsumer implements Runnable{

private final RBlockingQueue<String> blockingDeque;

public TaskConsumer(RBlockingQueue<String> blockingDeque) {
this.blockingDeque = blockingDeque;
}

@Override
public void run() {
try {
// 阻塞等待并执行
while (true){
String task = blockingDeque.take();
log.info("消费任务:{}",task);
}
}catch (InterruptedException e){
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}

2.3.4 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void redisDelayQueueMethod3() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);

RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("delayQueue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);

Thread producerThread = new Thread(new TaskProducer(delayedQueue));
Thread consumerThread = new Thread(new TaskConsumer(blockingQueue));

producerThread.start();
consumerThread.start();
}

运行结果:

这种方案利用了 Redis 的高性能和持久性,使延迟队列的实现既高效又可靠,同时 Redisson 提供的延迟队列抽象了底层的实现细节,使用简单方便。

3 扩展

3.1 Redis 键空间通知

Redis 键空间通知是 Redis 提供的一种机制,它允许客户端(应用程序)订阅相关事件,这些事件与 Redis 中的keys 的变化有关,当某个 key 发生了变化(如键过期、键被删除、值被修改等)时,应用程序会得到通知,从而可以执行相关的业务逻辑。

由于该操作会消耗一些性能,所以,默认情况下,键空间通知是禁用的,如果要使用该功能,我们需要在 Redis 的配置文件添加配置 notify-keyspace-events *,或者通过 CONFIG SET 命令动态启用,如下:

1
CONFIG SET notify-keyspace-events KEA

KEA 是一个参数,代表我们想要订阅的事件类型:

  • K :键空间通知,事件以 __keyspace@<db>__: 为前缀。
  • E :键事件通知,事件以__keyevent@<db>__:为前缀。
  • A :所有类型的通知。

除了上面这三个参数,还可以选择以下参数来订阅特定类型的事件:

  • g - 通用命令(如 DEL,EXPIRE,RENAME 等)。
  • $ - 字符串命令。
  • l - 列表命令。
  • s - 集合命令。
  • h - 哈希命令。
  • z - 有序集合命令。
  • x - 过期事件:当某个键过期并被删除时触发。
  • e - 驱逐事件:当某个键因为 maxmemory 策略而被删除时触发。

代码地址:https://github.com/huang-hanson/java_integration/tree/main/redis-delayQueue
转自:https://blog.csdn.net/weixin_45683778/article/details/140359695