SpringBoot Redis消息队列(Stream实现)

目录
  1. 1. 基础概念介绍
  2. 2. 基础组件代码
  3. 3. 消息队列配置代码

基础概念介绍

stream:每个Stream都有唯一的名称,它就是Redis的key;

group:每个 Stream 都可以挂多个消费组;每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份Stream 内部的消息会被每个消费组都消费到;

consumer:同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,即一条消息仅可被一个组内的一个消费者消费。每个消费者有一个组内唯一名称。

基础组件代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* 该组件是对redis stream命令的一些实现,可单独使用
*/
@Component
public class RedisStream {

@Autowired
private StringRedisTemplate redisTemplate;

/**
* 创建消费组
* @param key
* @param group
* @return
*/
public String creartGroup(String key, String group){
return redisTemplate.opsForStream().createGroup(key, group);
}

/**
* 消费组信息
* @param key
* @param group
* @return
*/
public StreamInfo.XInfoConsumers consumers(String key, String group){
return redisTemplate.opsForStream().consumers(key, group);
}

/**
* 确认已消费
* @param key
* @param group
* @param recordIds
* @return
*/
public Long ack(String key, String group, String... recordIds){
return redisTemplate.opsForStream().acknowledge(key, group, recordIds);
}

/**
* 追加消息
* @param key
* @param field
* @param value
* @return
*/
public String add(String key, String field, Object value){
Map<String, Object> content = new HashMap<>(1);
content.put(field, value);
return add(key, content);
}

public String add(String key, Map<String, Object> content){
return redisTemplate.opsForStream().add(key, content).getValue();
}

/**
* 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
* 消息存储在stream的节点下,删除时仅对消息做删除标记,当一个节点下的所有条目都被标记为删除时,销毁节点
* @param key
* @param recordIds
* @return
*/
public Long del(String key, String... recordIds){
return redisTemplate.opsForStream().delete(key, recordIds);
}

/**
* 消息长度
* @param key
* @return
*/
public Long len(String key){
return redisTemplate.opsForStream().size(key);
}

/**
* 从开始读
* @param key
* @return
*/
public List<MapRecord<String, Object, Object>> read(String key){
return redisTemplate.opsForStream().read(StreamOffset.fromStart(key));
}

/**
* 从指定的ID开始读
* @param key
* @param recordId
* @return
*/
public List<MapRecord<String, Object, Object>> read(String key, String recordId){
return redisTemplate.opsForStream().read(StreamOffset.from(MapRecord.create(key, new HashMap<>(1)).withId(RecordId.of(recordId))));
}

}

消息队列配置代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

/**
* redis stream监听消息
*/
@Component
public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> {

@Override
public void onMessage(MapRecord<String, String, String> message) {
// 接收到消息
System.out.println("message id " + message.getId());
System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue());
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* redis stream监听消息
* 在消费完成后确认已消费
*/
@Component
public class ListenerMessage2 implements StreamListener<String, MapRecord<String, String, String>> {

@Autowired
private StringRedisTemplate redisTemplate;

@Autowired
private StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ?> streamMessageListenerContainerOptions;

@Override
public void onMessage(MapRecord<String, String, String> message) {
// 接收到消息
System.out.println("message id " + message.getId());
System.out.println("stream " + message.getStream());
System.out.println("body " + message.getValue());

// 消费完成后确认消费(ACK)
redisTemplate.opsForStream(streamMessageListenerContainerOptions.getRequiredHashMapper()).acknowledge(message.getStream(),"group2", message.getId());
}

}

以上是两个不同的消息监听器处理,配合以下的配置代码使用,具体使用需要根据场景选择:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.time.Duration;

/**
* redis stream 配置(redis5.0以上)
*/
@Configuration
public class RedisStreamConfig {

@Autowired
private ListenerMessage listenerMessage;

@Autowired
private ListenerMessage listenerMessage2;

@Bean
public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ?> streamMessageListenerContainerOptions(){
return StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.build();
}

@Bean
public StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory factory,
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ?> streamMessageListenerContainerOptions){
StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(factory,
streamMessageListenerContainerOptions);
listenerContainer.start();
return listenerContainer;
}

/**
* 订阅者1,消费组group1,收到消息后自动确认,与订阅者2为竞争关系,消息仅被其中一个消费
* @param streamMessageListenerContainer
* @return
*/
@Bean
public Subscription subscription(StreamMessageListenerContainer streamMessageListenerContainer){
Subscription subscription = streamMessageListenerContainer.receiveAutoAck(
Consumer.from("group1","name1"),
StreamOffset.create("stream1", ReadOffset.lastConsumed()),
listenerMessage
);
return subscription;
}

/**
* 订阅者2,消费组group1,收到消息后自动确认,与订阅者1为竞争关系,消息仅被其中一个消费
* @param streamMessageListenerContainer
* @return
*/
@Bean
public Subscription subscription2(StreamMessageListenerContainer streamMessageListenerContainer){
Subscription subscription = streamMessageListenerContainer.receiveAutoAck(
Consumer.from("group1","name2"),
StreamOffset.create("stream1", ReadOffset.lastConsumed()),
listenerMessage
);
return subscription;
}

/**
* 订阅者3,消费组group2,收到消息后不自动确认,需要用户选择合适的时机确认,与订阅者1和2非竞争关系,即使消息被订阅者1或2消费,亦可消费
*
* 当某个消息被ACK,PEL列表就会减少
* 如果忘记确认(ACK),则PEL列表会不断增长占用内存
* 如果服务器发生意外,重启连接后将再次收到PEL中的消息ID列表
* @param streamMessageListenerContainer
* @return
*/
@Bean
public Subscription subscription3(StreamMessageListenerContainer streamMessageListenerContainer){
Subscription subscription = streamMessageListenerContainer.receive(
Consumer.from("group2","name1"),
StreamOffset.create("stream1", ReadOffset.lastConsumed()),
listenerMessage2
);
return subscription;
}

}

扩展阅读:如何在Springboot中使用Redis5的Stream