Spring boot整合Redis实现发布订阅(超详细)

article/2025/11/10 21:43:25

Redis发布订阅

  • 基础知识
  • 相关命令
    • 订阅者/等待接收消息
    • 发布者/发送消息
    • 订阅者/成功接收消息
    • 常用命令汇总
  • 原理
  • Spring boot整合redis
    • 导入依赖
    • Redis配置
    • 消息封装类(MessageDto)
    • Redis配置类
    • 测试类
    • 订阅方实现一:RedisMessageListener
    • 订阅方实现二:PrintMessageReceiver
    • MessageListenerAdapter源码分析


以下是Redis相关笔记总结,方便自己以后复习,同时也希望对大家有所帮助。

内容地址链接
Redis在Linux环境下的详细安装教程https://blog.csdn.net/BBQ__ZXB/article/details/123905061
Redis中五大基本数据类型和三种特殊数据类型https://blog.csdn.net/BBQ__ZXB/article/details/124169168
Redis中基本事务操作及乐观锁的实现https://blog.csdn.net/BBQ__ZXB/article/details/124192251
Java中使用JedisAPI操作Redis中五大基本数据类型https://blog.csdn.net/BBQ__ZXB/article/details/124194220
Spring boot整合Redis(入门教程)https://blog.csdn.net/BBQ__ZXB/article/details/124301234
Redis主从复制详解(入门教程)https://blog.csdn.net/BBQ__ZXB/article/details/124802602
Spring boot整合Redis实现发布订阅(超详细)https://blog.csdn.net/BBQ__ZXB/article/details/124980860
Redis执行save命令时报错ERRhttps://blog.csdn.net/BBQ__ZXB/article/details/123995289

基础知识

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。微信,微博,关注系统
Redis客户端可以订阅任意数量的频道

订阅/发布消息图:
在这里插入图片描述
剖析:
1.消息发送者,2.频道,3.消息订阅者

下图展示频道channel1,以及订阅这个频道的三个客户端–client2,client5和client1之间的关系
在这里插入图片描述
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端
在这里插入图片描述


相关命令

订阅者/等待接收消息

首先打开 Redis 客户端,然后订阅了一个名为“bbx”的 channel,使用如下命令:

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1

使用SUBSCRIBE命令订阅了名为 bbx 的 channel。命令执行后该客户端会出处于等待接收消息的阻塞状态。

发布者/发送消息

下面再启动一个 Redis 客户端,输入如下命令:

127.0.0.1:6379> PUBLISH bbx hello
(integer) 1
127.0.0.1:6379> PUBLISH bbx world
(integer) 1
127.0.0.1:6379> 

订阅者/成功接收消息

127.0.0.1:6379> SUBSCRIBE bbx
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "bbx"
3) (integer) 1
#等待读取推送消息
1) "message"	#消息
2) "bbx"		#频道
3) "hello"		#消息具体内容
1) "message"
2) "bbx"
3) "world"

常用命令汇总

命令说明
PSUBSCRIBE pattern [pattern …]订阅一个或多个符合指定模式的频道
PUBSUB subcommand [argument [argument …]]查看发布/订阅系统状态,可选参数
1) channel 返回在线状态的频道
2) numpat 返回指定模式的订阅者数量
3) numsub 返回指定频道的订阅者数量
PUBSUB subcommand [argument [argument …]]将信息发送到指定的频道
PUNSUBSCRIBE [pattern [pattern …]]退订所有指定模式的频道
SUBSCRIBE channel [channel …]订阅一个或者多个频道的消息
UNSUBSCRIBE [channel [channel …]]退订指定的频道

原理

Redis是使用C实现的,可以通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现

Redis通过PUBLISH,SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能

通过SUBSCRIBE命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定频道的订阅链表中。

通过PUBLISH命令向订阅者发送消息,redis-server会使用给定频道作为键,在它维护的频道字典中查找记录了订阅这个频道的所有客户端的链表,将消息发布给所有订阅者

Pub和Sub从字面上理解就是发布(Publish)和订阅(Subscribe),在redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的信息,这一功能最明显的用法就是实时消息系统,比如普通的即时聊天,群聊等功能。


Spring boot整合redis

导入依赖

 <!--操作redis--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

Redis配置

#SpringBoot 所有的配置类,都有一个自动配置类 RedisAutoConfiguration
#自动配置类都每绑定一个properties配置文件  RedisProperties#配置redis
spring.redis.host=localhost
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=*****
#默认是数据库0
spring.redis.database= 0# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0

消息封装类(MessageDto)

@AllArgsConstructor
@NoArgsConstructor
@Data
public class MessageDto implements Serializable {private String data;private String title;private String content;
}

Redis配置类

@Configuration
public class RedisConfig {//编写配置类,可模仿RedisAutoConfiguration配置类,该类在开发中可直接使用@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {//由于源码autoConfig中是<Object, Object>,开发中一般直接使用<String,Object>RedisTemplate<String, Object> template = new RedisTemplate();template.setConnectionFactory(factory);//Json序列化配置Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);//String的序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();//key采用string的序列化template.setKeySerializer(stringRedisSerializer);//hash的key采用string的序列化template.setHashKeySerializer(stringRedisSerializer);//value序列化采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);//hash的value序列化方式采用jacksontemplate.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}/*** Redis消息监听器容器* 这个容器加载了RedisConnectionFactory和消息监听器* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理** @param redisConnectionFactory 连接工厂* @param adapter                适配器* @return redis消息监听容器*/@Bean@SuppressWarnings("all")public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,RedisMessageListener listener,MessageListenerAdapter adapter) {final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题RedisMessageListenerContainer container = new RedisMessageListenerContainer();// 监听所有库的key过期事件container.setConnectionFactory(redisConnectionFactory);// 所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(TOPIC_NAME1)表示发布的主题信息// 可以添加多个 messageListener,配置不同的通道container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));/*** 设置序列化对象* 特别注意:1. 发布的时候需要设置序列化;订阅方也需要设置序列化*         2. 设置序列化对象必须放在[加入消息监听器]这一步后面,否则会导致接收器接收不到消息*/Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);container.setTopicSerializer(seria);return container;}/*** 这个地方是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”* 也有好几个重载方法,这边默认调用处理器的方法 叫OnMessage** @param printMessageReceiver* @return*/@Beanpublic MessageListenerAdapter listenerAdapter(PrintMessageReceiver printMessageReceiver) {MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "receiveMessage");Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper = new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);seria.setObjectMapper(objectMapper);receiveMessage.setSerializer(seria);return receiveMessage;}
}

该类中,可以通过调用消息接收容器(container)的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;在这里我们分别使用两种实现方式去订阅两个不通的频道(channel)。

  1. RedisMessageListener 通过实现MessageListener接口,从而实现该接口中的onMessage(Message message, byte[] pattern)方法。
  2. MessageListenerAdapter 通过适配器的方式,自定义一个消息接收类PrintMessageReceiver和接收消息的方法
  container.addMessageListener(listener, new PatternTopic(TOPIC_NAME1));container.addMessageListener(adapter, new PatternTopic(TOPIC_NAME2));

分别使用listener去订阅主题TOPIC_NAME1,adapter去订阅TOPIC_NAME2。
接下来分别探讨测试这两种方式。

测试类

@Slf4j
@SpringBootTest
public class RedisMessageTest {@Autowiredprivate RedisUtils redisUtils;@Testpublic void test(){final String TOPIC_NAME1 = "TEST_TOPIC1"; // 订阅主题final String TOPIC_NAME2 = "TEST_TOPIC2"; // 订阅主题// 发布消息MessageDto dto = new MessageDto();LocalDateTime now = LocalDateTime.now();DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");dto.setData(timeFormatter.format(now));dto.setTitle("日常信息");dto.setContent("hello world!");redisUtils.convertAndSend(TOPIC_NAME1, dto);}
}

该类中的RedisUtils是之前自己封装的一个工具类,在该类中新增convertAndSend()方法。
RedisUtils中其他方法可跳转此连接查看

	/*** 向通道发布消息*/public boolean convertAndSend(String channel, Object message) {if (!StringUtils.hasText(channel)) {return false;}try {redisTemplate.convertAndSend(channel, message);log.info("发送消息成功,channel:{},message:{}", channel, message);return true;} catch (Exception e) {log.info("发送消息失败,channel:{},message:{}", channel, message);e.printStackTrace();}return false;}

订阅方实现一:RedisMessageListener

@Slf4j
@Component
public class RedisMessageListener implements MessageListener {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {// 接收的topiclog.info("channel:" + new String(pattern));//序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)MessageDto messageDto = (MessageDto) redisTemplate.getValueSerializer().deserialize(message.getBody());log.info(messageDto.getData()+","+messageDto.getContent());}
}

对使用RedisMessageListener 进行接收消息测试。
测试结果:
在这里插入图片描述

订阅方实现二:PrintMessageReceiver

@Slf4j
@Component
public class PrintMessageReceiver {@Autowiredprivate RedisTemplate redisTemplate;public void receiveMessage(MessageDto messageDto , String channel) {// 接收的topiclog.info("channel:" + channel);log.info("message:" + messageDto.getTitle());}
}

注意:该方法的接收参数类型以及顺序,查阅源码得知,该方法的参数可以是一个(只有消息message),也可是两个(message,channel)并且顺序不能变。
在测试类中将 redisUtils.convertAndSend(TOPIC_NAME1, dto);中的TOPIC_NAME1改为TOPIC_NAME2,
测试结果:
在这里插入图片描述

MessageListenerAdapter源码分析

  1. 构造函数
public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {this(delegate);setDefaultListenerMethod(defaultListenerMethod);}

其中this()方法中,初始化了序列化方式,该适配器默认的序列化方式是UTF-8的字符串序列化。
在这里插入图片描述
在这里插入图片描述
2.onMessage()

@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {try {// Check whether the delegate is a MessageListener impl itself.// In that case, the adapter will simply act as a pass-through.if (delegate != this) {if (delegate instanceof MessageListener) {((MessageListener) delegate).onMessage(message, pattern);return;}}// Regular case: find a handler method reflectively.Object convertedMessage = extractMessage(message);String convertedChannel = stringSerializer.deserialize(pattern);// Invoke the handler method with appropriate arguments.Object[] listenerArguments = new Object[] { convertedMessage, convertedChannel };invokeListenerMethod(invoker.getMethodName(), listenerArguments);} catch (Throwable th) {handleListenerException(th);}}

该方法当订阅频道有消息时默认执行,首先,if(delegate instanceof MessageListener)判断该对象的类是不是实现了MessageListener接口,如果是,就会执行它实现的onMessage()。很显然,我们是自定义的
PrintMessageReceiver 对象,,所以接着往下看。
Object convertedMessage = extractMessage(message);会将message反序列化,如未自定义序列化方式,就会用使用默认的字符串序列化,这就是为什么我们在RedisConfig类中注入listenerAdapter对象时,自定义了Jackson2JsonRedisSerializer 。
在这里插入图片描述
3. invokeListenerMethod(invoker.getMethodName(), listenerArguments);
通过反射查找定义对象中处理消息的方法。我们会看到如下的方法实现。

void invoke(Object[] arguments) throws InvocationTargetException, IllegalAccessException {Object[] message = new Object[] { arguments[0] };for (Method m : methods) {Class<?>[] types = m.getParameterTypes();Object[] args = //types.length == 2 //&& types[0].isInstance(arguments[0]) //&& types[1].isInstance(arguments[1]) ? arguments : message;if (!types[0].(args[0])) {continue;}m.invoke(delegate, args);return;}}

在这里插入图片描述
从而得知我们自定义方法中参数个数可以是一个也可以是两个,如两个参数时,第一个参数接收消息(message),第二个参数接收频道(channel),也可得知为什么自定义方法中,接收消息参数类型我们可以直接写MessageDto。


以上内容如有不对之处,还望不吝赐教。


http://chatgpt.dhexx.cn/article/e9krswFe.shtml

相关文章

【redis】发布和订阅消息

1.说明 在Redis2版本之后支持发布订阅功能&#xff0c;发布者创建一个频道&#xff0c;并在上面发送消息&#xff0c;所有订阅该频道的客户端都能收到消息&#xff08;不出意外的情况下&#xff0c;但实际不一定&#xff09;&#xff0c;发布订阅的好处是减少不必要的轮询&…

Redis的发布订阅模式

本文源码参看&#xff1a;https://github.com/duktig666/learn-example/tree/5586febea31c2fb368e19fbdba11ed08afd463e0/Redis/src/main/java/cn/duktig/pubsub Redis发布订阅概述 Redis 发布订阅 (publish/subscribe) 是一种消息通信模式&#xff1a;发送者 (pub) 发送消息…

Redis订阅和发布

1.发布和订阅 1.1什么是发布和订阅 发布订阅是一种应用程序&#xff08;系统&#xff09;之间通讯&#xff0c;传递数据的技术手段。特别是在异构&#xff08;不 同语言&#xff09;系统之间作用非常明显。发布订阅可以是实现应用&#xff08;系统&#xff09;之间的解耦合。…

Redis数据库的订阅发布

大家好&#xff0c;今天分享一下redis的订阅发布 Redis 发布订阅 (pub/sub) 是一种消息通信模式&#xff1a;发送者 (pub) 发送消息&#xff0c;订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 &#xff08;比如说&#xff0c;你在一个一个网站上面可以关注…

Redis订阅和发布(实操教学)

什么是Redis发布订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式&#xff1a;发送者 (pub) 发送消息&#xff0c;订阅者 (sub) 接收消息。Redis 客户端可以订阅任意数量的频道。 例1&#xff1a;client2,client5,client1订阅了频道channel1 例2&#xff1a;有新消息通过p…

Redis消息订阅发布

Redis的发布订阅(pub/sub)是一种 消息通信模式 : 发送者(pub)发送消息,订阅者(sub)接收消息 redis客户端可以订阅任意数量的频道 消息发送者 频道 消息接收者 redis频道 channel1 以及订阅这个频道的客户端client2 ; client5 ; client1 之间的关系 当有新消息通过PUBLISH 命…

Redis:发布订阅机制

参考资料&#xff1a; 《Redis进阶——发布订阅详解》 《Redis 发布订阅》 《Redis进阶 - 消息传递&#xff1a;发布订阅模式详解》 写在开头&#xff1a;本文为学习后的总结&#xff0c;可能有不到位的地方&#xff0c;错误的地方&#xff0c;欢迎各位指正。 目录 一、什…

redis发布订阅

目录 一、概要 二、特点 三、发布及订阅功能 四、Redis发布订阅命令 五、php实现redis发布-订阅 1、消息发布端 2、消息订阅端 六、订阅发布使用场景 七、在订阅时遇到错误 八、模式匹配&#xff08;正则匹配&#xff09;订阅 一、概要 Redis发布订阅(pub/sub)是一种…

springboot 整合使用redis发布订阅功能

前言 发布订阅作为一种设计思想在很多开源组件中都有体现,比如大家熟知的消息中间件等,可谓把发布订阅这一思想体现的淋漓尽致了; 一、redis发布订阅简介 Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收信息。可以参考下面两张图进…

Redis进阶——发布订阅详解

什么是发布订阅&#xff1f; Redis 发布订阅(pub/sub)是一种消息通信模式&#xff1a;发送者(pub)发送消息&#xff0c;订阅者(sub)接收消息。 Redis 的 subscribe 命令可以让客户端订阅任意数量的频道&#xff0c; 每当有新信息发送到被订阅的频道时&#xff0c; 信息就会被发…

Redis 的发布与订阅

3、Redis 的发布与订阅 3.1、发布与订阅简述 Redis提供了基于“发布/订阅”模式的消息机制。此种模式下&#xff0c;消息发布者和订阅者不进行直接通信&#xff0c;发布者客户端向指定的频道&#xff08;channel&#xff09; 发布消息&#xff0c;订阅该频道的每个客户端都可…

6. Redis 发布与订阅

文章目录 6. Redis 发布与订阅为什么需要发布、订阅发布/订阅如何使用&#xff1f; :one:基于频道发布SUBSCRIBE&#xff1a;频道订阅PUBLISH&#xff1a;向频道发送消息UNSUBSCRIBE&#xff1a;退订频道 :two: 基于模式(pattern)的发布/订阅PSUBSCRIBE&#xff1a;模式订阅PUB…

Redis的发布订阅

Redis的发布订阅&#xff08;pub/sub&#xff09;是一种消息通信模式&#xff0c;发送者&#xff08;pub&#xff09;发送信息&#xff0c;订阅者&#xff08;sub&#xff09;接收信息。Redis客户端可以订阅任意数量的频道。Pub/Sub 从字面上理解就是发布&#xff08;Publish&a…

Redis数据库的发布与订阅(详细讲解)

一、实验目的 了解Redis数据库的发布与订阅 二、发布与订阅 1.1什么是发布和订阅 发布订阅是一对多的关系&#xff0c;需要有信息的发布者和消息的收听者。 发布者&#xff1a;提供某个内容或主题&#xff0c;把内容信息发送给多个对此内容感兴趣的订阅者 订阅者&#xff…

Redis发布和订阅

一、什么是Redis发布和订阅 Redis的发布(pub)和订阅(sub)是一种消息通信模式。它包含有三个角色分别是&#xff1a;发送者、订阅者、频道。 Redis客户端可以订阅多个任意的频道。 Redis发布和订阅的结构图&#xff1a; 发送者&#xff1a;用于发送消息 订阅者&#xff1a;订阅…

Redis--发布订阅--原理/使用场景

原文网址&#xff1a;Redis--发布订阅--原理/使用场景_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Redis的发布订阅功能。 Redis发布订阅简述 Redis提供了基于“发布/订阅”模式的消息机制。此种模式下&#xff0c;消息发布者和订阅者不进行直接通信&#xff0c;发布者客户端…

php查netstat,netstat怎么查看端口状态

netstat查看端口状态的方法&#xff1a;首先打开终端命令窗口&#xff1b;然后通过命令“netstat -ntlp”查看当前所有tcp端口&#xff1b;最后通过“netstat -ntulp | grep 80”命令查看所有80端口使用情况即可。 本教程操作环境&#xff1a;linux5.9.8系统&#xff0c;DELL G…

Linux系统使用ss命令查看端口状态

Linux系统使用ss命令查看端口状态 Linux系统使用ss命令查看端口状态 目录 1.可用工具 2.ss帮助 2.1 选项分类说明 2.2 过滤选项family 2.3 过滤选项state 2.4 状态之间的关系 3.ss的使用 3.1 使用示例 3.2 过滤 3.2.1 状态过滤 3.2.2 通过family过滤 3.2.3 使用地址和端口过滤 …

Win7怎样查看端口状态

在Win7系统中&#xff0c;查看端口状态可以采用以下两种方法&#xff1a; 1.netstat 先单击“开始”&#xff0c;再单击“运行”&#xff0c;输入"cmd"&#xff0c;进入DOS窗口。输入命令"netstat -na"&#xff0c;按回车&#xff0c;就会显示本机连接情况…

Linux查看端口状态

在Linux使用过程中&#xff0c;需要了解当前系统开放了哪些端口&#xff0c;并且要查看开放这些端口的具体进程和用户&#xff0c;可以通过netstat命令进行简单查询 netstat命令各个参数说明如下&#xff1a; -t : 指明显示TCP端口   -u : 指明显示UDP端口   -l : 仅显示…