欢迎光临散文网 会员登陆 & 注册

RabbitMQ教程 | RabbitMQ的发布订阅模式Publish/Subscribe精讲

2023-11-01 15:54 作者:袁庭新  | 我要投稿

各位小伙伴很久不见了,袁老师今儿又要给大家分享干货了。我们知道RabbitMQ有简单模式、工作队列模式、发布订阅模式、路由模式、主题模式、远程过程调用模式、发布者确认模式等。这么多模式,你可能一下子很难全部吸收,今天袁老师主要给大家介绍发布订阅模式Publish/Subscribe的相关内容。

SpringBoot整合RabbitMQ中间件实现消息服务,主要围绕3个部分的工作进行展开:定制中间件、消息发送者发送消息、消息消费者接收消息。其中,定制中间件是比较麻烦的工作,且必须预先定制。

下面袁老师以用户注册成功后,同时发送邮件通知和短信通知这一场景为例进行功能介绍, 分别使用基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合。

01 基于API的方式

基于API的方式,是指使用Spring框架提供的API管理类AmqpAdmin定制消息发送组件,并进行消息的发送。这种定制消息发送组件的方式,与在RabbitMQ可视化界面上通过对应面板进行组件操作的实现基本一样,都是通过管理员的身份,预先手动声明交换器、队列、路由键等,然后组装消息队列供应用程序调用,从而实现消息服务。下面我们就对这种基于API的方式进行讲解和演示。

1.1 使用AmqpAdmin定制消息发送组件

我们先打开chapter08项目的测试类Chapter08ApplicationTests,在该测试类中先引入AmqpAdmin管理类定制Publish/Subscribe工作模式所需的消息组件。

执行上述单元测试方法amqpAdmin(),验证RabbitMQ消息组件的定制效果。单元测试方法执行成功后,通过RabbitMQ可视化管理页面的Exchanges面板查看效果。

从上图可以看出,在RabbitMQ可视化管理页面的Exchanges面板中,新出现了一个名称为fanout_exchange的交换器(其他7个交换器是RabbitMQ自带的),且其类型是我们设置的fanout类型。我们可以单击fanout_exchange交换器进入查看。

从上图可以看出,在fanout_exchange交换器详情页面中展示有该交换器的具体信息,还有与之绑定的fanout_queue_email和fanout_queue_sms两个消息队列,并且与程序中设置的绑定规则一致。切换到Queues面板页面,查看定制生成的消息队列信息。

从上图可以看出,在Queues队列面板页面中,展示有定制的消息队列信息,这与程序中定制的消息队列一致,我们可以单击消息队列名称查看每个队列的详情。

通过上述操作可以发现,在管理页面中提供了消息组件交换器、队列的定制功能。在程序中使用Spring框架提供的管理员API组件AmqpAdmin,定制消息组件和在管理页面上手动定制消息组件的本质是一样的。

1.2 消息发送着发送消息

完成消息组件的定制工作后,创建消息发送者发送消息到消息队列中。发送消息时,我们可以借助一个实体类传递消息,需要预先创建一个实体类对象。

首先,在chapter08项目中创建名为com.ytx.domain的包,并在该包下创建一个实体类User。

其次,我们在项目测试类Chapter08ApplicationTests中,使用Spring框架提供的RabbitTemplate模板类实现消息发送。

上述代码中,我们先使用@Autowired注解,引入消息中间件管理的RabbitTemplate组件对象,然后使用该模板工具类的convertAndSend(String exchange, String routingKey, Object object)方法进行消息发布。此方法中的第1个参数表示发送消息的交换器,这个参数值要与之前定制的交换器名称一致;第2个参数表示路由键,因为实现的是Publish/Subscribe工作模式,所以不需要指定;第3个参数是发送的消息内容,接收Object类型。

然后,执行上述消息发送的测试方法subPublisher(),控制台执行效果见下图所示。

从上图可以看出,发送实体类对象消息时程序发生异常,从异常信息“SimpleMessageConverter only supports String, byte[] and Serializable payloads”可以看出,消息发送过程中默认使用了SimpleMessageConverter转换器进行消息转换存储,该转换器只支持字符串或实体类对象序列化后的消息。而测试类中发送的是User实体类对象消息,所以发生异常。

如果要解决上述消息中间件发送实体类消息出现的异常,我们通常可以采用两种解决方案:第一种是执行JDK自带的Serializable序列化接口;第二种是定制其他类型的消息转化器。两种实现方式都可行,相对于第二种实现方式而言,第一种方式实现后的可视化效果较差,转换后的消息无法辨识,所以一般使用第二种方式。

接着我们在chapter08项目中创建名为com.ytx.config的包,并在该包下创建一个RabbitMQ消息配置类RabbitMQConfig。

代码中创建一个RabbitMQ消息配置类RabbitMQConfig,并在该配置类中通过@Bean注解自定义一个Jackson2JsonMessageConverter类型的消息转换器组件,该组件的返回值必须为MessageConverter类型。

再次执行subPublisher()方法,该方法执行成功后,查看RabbitMQ可视化管理页面Queues面板信息。

从上图可以看出,消息发送完成后,Publish/Subscribe工作模式下绑定的两个消息队列中各自拥有一条待接收的消息, 由于目前尚未提供消息消费者,所以刚才测试类发送的消息会暂存在队列中。进入队列详情页面查看消息。

1.3 消息消费者接收消息

在chapter08项目中创建名为com.ytx.service的包,并在该包下创建一个针对RabbitMQ消息中间件进行消息接收和处理的业务类RabbitMQService。

上述代码中,创建了一个接收处理RabbitMQ消息的业务处理类RabbitMQService,在该类中使用Spring框架提供的@RabbitListener注解,我们可以监听队列名称为fanout_queue_email和fanout_queue_sms的消息,监听的这两个队列是前面指定发送并存储消息的消息队列。

需要说明的是,使用@RabbitListener注解监听队列消息后,一旦服务启动且监听到指定的队列中有消息存在(目前两个队列中各有一条相同的消息),对应注解的方法就会立即接收并消费队列中的消息。另外,在接收消息的方法中,参数类型可以与发送的消息类型保持一致,或者使用Object类型和Message类型。如果使用与消息类型对应的参数接收消息的话,只能够得到具体的消息体信息;如果使用Object或者Message类型参数接收消息的话,还可以获得除了消息体外的消息参数信息MessageProperties。

启动chapter08项目,控制台显示的消息消费效果如下图所示。

从上图可以看出,项目启动成功后,消息消费者监听到消息队列中存在的两条消息,并进行了各自的消费。与此同时,通过RabbitMQ可视化管理页面的Queues面板查看队列消息情况,会发现两个队列中存储的消息已经被消费。至此,一条完整的消息发送、 消息中间件存储、消息消费的Publish/Subscribe(发布订阅模式)工作模式的业务案例已经实现。

注意,如果没有引入Spring Web模块的依赖,启动chapter08项目时,消息消费者接收消息会报以下错误。

小提示:

上述代码中,使用的是开发中常用的@RabbitListener注解,来监听指定名称队列的消息情况,这种方式会在监听到指定队列存在消息后立即进行消费处理。除此之外,我们还可以使用RabbitTemplate模板类的receiveAndConvert(String queueName)方法手动消费指定队列中的消息。

02 基于配置类的方式

基于配置类的方式,主要讲的是使用SpringBoot框架提供的@Configuration注解,配置定制消息发送组件,并进行消息发送。下面我们来对这种基于配置类的方式进行讲解和演示。

打开RabbitMQ消息配置类RabbitMQConfig,在该配置类中使用基于配置类的方式定制消息发送相关组件。

上述代码中,使用@Bean注解定制了3种类型的Bean组件,这3种组件分别表示交换器、消息队列和消息队列与交换器的绑定。这种基于配置类方式定制的消息组件,其实现和基于API方式定制的消息组件完全一样,只不过是实现方式不同而已。

按照消息服务整合实现步骤,完成消息组件的定制后,还需要编写消息发送者和消息消费者,而在基于API的方式中已经实现了消息发送者和消息消费者,并且基于配置类方式定制的消息组件名称,和之前测试用的消息发送和消息消费组件名称都是一致的,所以这里我们可以直接重复使用。

重新运行消息发送者测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

03 基于注解的方式

基于注解的方式指的是使用Spring框架的@RabbitListener注解定制消息发送组件并发送消息。

在消息接收和处理的业务类RabbitMQService中,将针对邮件业务和短信业务处理的消息消费者方法进行注释,使用@RabbitListener注解及其相关属性定制消息发送组件。

上述代码中,使用@RabbitListener注解及其相关属性定制了两个消息组件的消费者,这两个消费者都接收实体类User并消费。在@RabbitListener注解中,bindings属性用于创建并绑定交换器和消息队列组件,需要注意的是,为了能使两个消息组件的消费者接收到实体类User,需要我们在定制交换器时将交换器类型type设置为fanout。另外,bindings属性的@QueueBinding注解除了有value、exchange属性外,还有key属性用于定制路由键routingKey(当前发布订阅模式不需要)。

重启测试方法subPublisher(),消息消费者可以自动监听并消费消息队列中存在的消息,效果与基于API的方式测试效果一样。

至此,我们就在SpringBoot中完成了基于API、基于配置类和基于注解这3种方式,来实现Publish/Subscribe工作模式的整合讲解。在这3种实现消息服务的方式中,基于API的方式相对简单、直观,但容易与业务代码产生耦合;基于配置类的方式相对隔离、容易统一管理、符合Spring Boot框架思想;基于注解的方式清晰明了、方便各自管理,但是也容易与业务代码产生耦合。

在实际开发中,使用基于配置类的方式和基于注解的方式较为常见,基于API的方式则偶尔使用,当然大家要根据实际情况进行具体选择。

04 总结回顾

今天袁老师给大家分别介绍了基于API方式、配置类方式和注解的3种消息队列,并教会了大家实现发布订阅Publish/Subscribe模式的整合及代码实现,要求大家重点掌握基于注解方式的实现哦。有关RabbitMQ的其他内容,袁老师会在后续的文章中不断进行更新,欢迎大家持续关注。

今天的内容,袁老师就给大家介绍到这里,现在你学会了吗?关注「袁庭新」,干货天天都不断!


RabbitMQ教程 | RabbitMQ的发布订阅模式Publish/Subscribe精讲的评论 (共 条)

分享到微博请遵守国家法律