在业务中,有很多情况我们需要用消息队列来处理异步。有很多专门的消息队列的应用,rabbimq,kafka等等。但是,这些都是要单独去部署的,如果我们就想要使用这个功能,又不想增加部署以及运维工作量怎么办呢?我们可以借助redis的相关功能来实现。
软件环境:spring boot:2.4.3
1.发布订阅 (pub/sub)
redis自带发布订阅的的功能,所有的订阅者都会收到消息。我们先看这个实现方式,得益于spring boot全家桶的便捷性,我们可以很方便的实现这个功能。
1.1 添加依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.72</version> </dependency>
|
1.2 添加一个消息监听
1 2 3 4 5 6 7 8
| @Slf4j @Component public class RedisMessageListener implements MessageListener {
@Override public void onMessage(Message message, byte[] bytes) { log.info(message.toString()); } }
|
1.3 配置订阅
这里我们订阅名为 message.redis.pub
的channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Slf4j @RequiredArgsConstructor @Configuration public class Config { @Resource RedisMessageListener messageListener;
@Bean RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(messageListener, new ChannelTopic("message.redis.pub")); return container; } }
|
1.4 消息发布者
1 2 3 4 5
| @GetMapping(value = "pub") public void send(){ Message message = Message.builder().senderId(1L).receiverId(2L).content("message").build(); stringRedisTemplate.convertAndSend("message.redis.pub", JSON.toJSONString(message)); }
|
这里的convertAndSend就是调用的pub/sub
执行控制台打印:
1
| > 2021-04-14 16:29:43.662 INFO 85187 --- [edisContainer-2] com.ms.redisqueue.RedisMessageListener : {"content":"message","receiverId":2,"senderId":1}
|
这里我们可以看到利用redis的pub/sub模式,可以实现消息的异步发送,但是,这种模式有很多问题
- 消息无法持久化,订阅者没收到就是没收到,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
- 所有订阅这个channel的client 都会收到这个消息,如果用来做task的通知,还需要再加个分布式锁,去重校验什么的
2. stream
redis5.0支持了stream这种新的数据结构,而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
Redis Stream 的结构是一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
pending_ids :消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。
2.1 消息对象
1 2 3 4 5 6 7 8 9
| @Data @SuperBuilder @AllArgsConstructor @NoArgsConstructor public class Message { private Long senderId; private Long receiverId; private String content; }
|
2.2 同样的,需要创建一个消息的监听器
1 2 3 4 5 6 7 8
| @Slf4j @Component() public class RedisStreamListener implements StreamListener<String, ObjectRecord<String,Message>> { @Override public void onMessage(ObjectRecord<String, Message> message) { log.info(message.toString()); } }
|
2.3 配置订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Slf4j @RequiredArgsConstructor @Configuration public class Config {
private final StringRedisTemplate redisTemplate;
private final StreamListener<String,ObjectRecord<String,Message>> streamListener;
@Bean public Subscription subscription(RedisConnectionFactory factory){ checkGroup(); StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String,Message>> options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)).targetType(Message.class) .build(); StreamMessageListenerContainer<String, ObjectRecord<String,Message>> listenerContainer = StreamMessageListenerContainer.create(factory,options); Subscription subscription = listenerContainer.receiveAutoAck( Consumer.from("test","localhost"), StreamOffset.create("message", ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; }
private void checkGroup(){ List<String> consumers = new ArrayList<>(); consumers.add("test"); StreamInfo.XInfoGroups infoGroups = null; try{ infoGroups = redisTemplate.opsForStream().groups("message"); } catch (RedisSystemException | RedisException | InvalidDataAccessApiUsageException ex) { log.error("group key not exist or commend error", ex); }
for (String consumer: consumers){ boolean consumerExist = false; if( Objects.nonNull(infoGroups) ){ if(infoGroups.stream().anyMatch(t->Objects.equals(consumer,t.groupName()))){ consumerExist = true; } } if(!consumerExist){ redisTemplate.opsForStream().createGroup("message",consumer); } }
}
}
|
2.4 消息发布
1 2 3 4 5 6
| @GetMapping(value = "stream/pub") public void streamPub(){ Message message = Message.builder().senderId(1L).receiverId(2L).content("message").build(); Record<String,Message> record = StreamRecords.objectBacked(message).withStreamKey("message"); stringRedisTemplate.opsForStream().add(record); }
|
调用发布接口,控制台输出:
1
| > 2021-04-14 16:38:00.650 INFO 85349 --- [cTaskExecutor-1] com.ms.redisqueue.RedisStreamListener : ObjectBackedRecord{recordId=1618389481872-0, value=Message(senderId=1, receiverId=2, content=message)}
|
总体来说,redis的消息队列功能还不算强大,跟专门的mq工具相比还是有所不足,但是应付一般的功能性需求,我觉得是足够了。毕竟降低了mq的部署和维护。在要求不高的情况下,用用还是比较方便的。
根据官网的说法:
Redis Stream support is currently only available through the Lettuce client as it is not yet supported by Jedis.
但是亲测下来 redission客户端也是照样没问题的。可以直接用redission客户端,这样的话,做分布式锁也就能直接用了。
redis 6+的版本比较稳定,redis 5还是各种有问题。