spring boot redis 消息队列(pub/sub和stream)

目录
  1. 1. 1.发布订阅 (pub/sub)
    1. 1.1. 1.1 添加依赖
    2. 1.2. 1.2 添加一个消息监听
    3. 1.3. 1.3 配置订阅
    4. 1.4. 1.4 消息发布者
  2. 2. 2. stream
    1. 2.1. 2.1 消息对象
    2. 2.2. 2.2 同样的,需要创建一个消息的监听器
    3. 2.3. 2.3 配置订阅
    4. 2.4. 2.4 消息发布

在业务中,有很多情况我们需要用消息队列来处理异步。有很多专门的消息队列的应用,rabbimq,kafka等等。但是,这些都是要单独去部署的,如果我们就想要使用这个功能,又不想增加部署以及运维工作量怎么办呢?我们可以借助redis的相关功能来实现。

软件环境:spring boot:2.4.3

1.发布订阅 (pub/sub)

redis自带发布订阅的的功能,所有的订阅者都会收到消息。我们先看这个实现方式,得益于spring boot全家桶的便捷性,我们可以很方便的实现这个功能。

1.1 添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.72</version>
</dependency>

1.2 添加一个消息监听

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {

@Override public void onMessage(Message message, byte[] bytes) {
log.info(message.toString());
}
}

1.3 配置订阅

这里我们订阅名为 message.redis.pub的channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
@RequiredArgsConstructor
@Configuration
public class Config {
@Resource
RedisMessageListener messageListener;

@Bean
RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// container.addMessageListener(new MessageListenerAdapter(messageListener), new ChannelTopic("message.redis.pub"));
container.addMessageListener(messageListener, new ChannelTopic("message.redis.pub"));
return container;
}
}

1.4 消息发布者

1
2
3
4
5
@GetMapping(value = "pub")
public void send(){
Message message = Message.builder().senderId(1L).receiverId(2L).content("message").build();
stringRedisTemplate.convertAndSend("message.redis.pub", JSON.toJSONString(message));
}

这里的convertAndSend就是调用的pub/sub

执行控制台打印:

1
> 2021-04-14 16:29:43.662  INFO 85187 --- [edisContainer-2] com.ms.redisqueue.RedisMessageListener : {"content":"message","receiverId":2,"senderId":1}

这里我们可以看到利用redis的pub/sub模式,可以实现消息的异步发送,但是,这种模式有很多问题

  • 消息无法持久化,订阅者没收到就是没收到,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
  • 所有订阅这个channel的client 都会收到这个消息,如果用来做task的通知,还需要再加个分布式锁,去重校验什么的

2. stream

redis5.0支持了stream这种新的数据结构,而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

2.1 消息对象

1
2
3
4
5
6
7
8
9
@Data
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private Long senderId;
private Long receiverId;
private String content;
}

2.2 同样的,需要创建一个消息的监听器

1
2
3
4
5
6
7
8
@Slf4j
@Component()
public class RedisStreamListener implements StreamListener<String, ObjectRecord<String,Message>> {
@Override
public void onMessage(ObjectRecord<String, Message> message) {
log.info(message.toString());
}
}

2.3 配置订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j
@RequiredArgsConstructor
@Configuration
public class Config {

private final StringRedisTemplate redisTemplate;

private final StreamListener<String,ObjectRecord<String,Message>> streamListener;

@Bean
public Subscription subscription(RedisConnectionFactory factory){
checkGroup();
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String,Message>>
options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1)).targetType(Message.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String,Message>>
listenerContainer = StreamMessageListenerContainer.create(factory,options);
Subscription subscription = listenerContainer.receiveAutoAck(
Consumer.from("test","localhost"),
StreamOffset.create("message", ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
return subscription;
}

/**
* 由于订阅需要先有stream,先做下检查
*/
private void checkGroup(){
List<String> consumers = new ArrayList<>();
consumers.add("test");
StreamInfo.XInfoGroups infoGroups = null;
try{
infoGroups = redisTemplate.opsForStream().groups("message");
} catch (RedisSystemException | RedisException | InvalidDataAccessApiUsageException ex) {
log.error("group key not exist or commend error", ex);
}

for (String consumer: consumers){
boolean consumerExist = false;
if( Objects.nonNull(infoGroups) ){
if(infoGroups.stream().anyMatch(t->Objects.equals(consumer,t.groupName()))){
consumerExist = true;
}
}
if(!consumerExist){
redisTemplate.opsForStream().createGroup("message",consumer);
}
}

}

}

2.4 消息发布

1
2
3
4
5
6
@GetMapping(value = "stream/pub")
public void streamPub(){
Message message = Message.builder().senderId(1L).receiverId(2L).content("message").build();
Record<String,Message> record = StreamRecords.objectBacked(message).withStreamKey("message");
stringRedisTemplate.opsForStream().add(record);
}

调用发布接口,控制台输出:

1
> 2021-04-14 16:38:00.650  INFO 85349 --- [cTaskExecutor-1] com.ms.redisqueue.RedisStreamListener    : ObjectBackedRecord{recordId=1618389481872-0, value=Message(senderId=1, receiverId=2, content=message)}

总体来说,redis的消息队列功能还不算强大,跟专门的mq工具相比还是有所不足,但是应付一般的功能性需求,我觉得是足够了。毕竟降低了mq的部署和维护。在要求不高的情况下,用用还是比较方便的。

根据官网的说法:
Redis Stream support is currently only available through the Lettuce client as it is not yet supported by Jedis.

但是亲测下来 redission客户端也是照样没问题的。可以直接用redission客户端,这样的话,做分布式锁也就能直接用了。

redis 6+的版本比较稳定,redis 5还是各种有问题。