欢迎光临散文网 会员登陆 & 注册

【RabbitMQ】消息队列

2022-11-06 19:37 作者:阿提艾斯  | 我要投稿

1、认识RabbitMQ

MQ: Message Queue,是一种应用程序对应用程序的通信方法,可以理解为进程间通信的一种方法,它不限编程语言,各个语言都可以使用RabbitMQ。MQ是一种生产-消费者模型,一端不断往消息队列中写消息,另一端则可以读取队列中的消息进行处理。

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦异步消息流量消锋等问题,实现高性能、高可用、可伸缩和最终一致性架构。

解耦和异步消息主要是将消息队列用在系统内部,不同系统进程之间通过消息队列达到高内聚低耦合的目的。

流量消锋主要是将消息队列放在请求进入系统内部之前,比如双十一用户请求系统量增加,就可以在请求进入系统之前,把这些请求放入消息队列里,然后系统再从消息队列里获取请求,防止系统被海量请求冲垮。


目前使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。

Kafka: 主要解决日志方面的问题。

RabbitMQ是由Erlang语言基于AMQP协议开发的消息中间件.


2、rabbitmq安装

因为目的是学习rabbitmq逻辑思想,所以使用了在虚拟机(ubuntu)上使用docker安装的方式,非常简单直观。

2.1 下载rabbitmq镜像

如果不指定rabbitmq,默认下载的最新版本rabbitmq。

下载rabbitmq最新版本


2.2 运行rabbitmq容器

 从下图中也能看到rabbitmq中的一些基础信息,比如rabbitmq版本,erlang版本等。

运行rabbitmq

查看docker下载的rabbitmq版本,从下图中可以看到下载的版本是3.9.11

查看docker下载的rabbitmq版本


2.3 安装rabbitmq插件

rabbitmq_management插件提供了一个基于HTTP的API,用于管理和监控您的RabbitMQ服务器,以及基于浏览器的UI和命令行工具。

安装rabbitmq_management插件

那现在就可以在浏览器里访问http://127.0.0.1:15672查看rabbitmq情况了

网页登陆查看rabbitmq web服务

如果在2.2中运行容器的时候没有指定rabbitmq启动的用户名和密码的话,这里登陆的默认用户名和密码是:guest/guest,下图是登陆之后的网页样式,很简洁。

登陆rabbitmq 网页

2.4 python代码验证是否能联通rabbitmq服务

也可以换做其他语言连接rabbitmq服务,就像连接mysql一样。

前提是需要安装python库pika

生产者的一个样例:

注意连接rabbitmq的时候,ip换成虚拟机的ip,以下是验证结果,打印信息走到了,说明之前的连接是正常的:

验证

 管理界面查看下推送的消息是否存在。

管理界面查看消息

如果连接失败,会报如下错误:

连接rabbitmq失败的情况

3、rabbitmq的2种模式

3.1 简单模式

生产者做的三件事

(1) 连接rabbitmq

(2) 创建rabbitmq消息队列

(3) 向消息队列种插入数据

消费者做的三件事

(1)连接rabbitmq

(2)监听模式

(3)确定回调函数


启动生产者和消费者程序,查看运行情况:

生产者运行结果:

生产者运行结果

消费者运行结果:发现消费者一直处于hang状态,只要生产者发送一个消息,消费者就会接收到并打印出来。

消费者运行结果


注意,有几个参数需要解释下:

(1) 应答参数

默认应答

消费者监听队列代码中,有一个参数auto_ack=True,表示设置为默认应答。

这个参数设置后,有什么效果呢?消费者从消息队列中取走数据之后,消息队列中就没有这个数据了。可以看如下图截图。

默认应答

手动应答

那上面的自动应答就有一个问题,如果消费者的回调函数报错了,修复问题后,我们想重新从消息队列中取报错的那个消息就取不到了,所以为了解决这个问题,有了一个手动应答的参数。

手动应答就是把auto_ack参数设置为False,同时手动在回调函数里加一个数据已处理完的应答,通知rabbitmq,可以把消息队列中的这个数据删掉了。

为了验证手动应答生效,修改消费者应答为手动应答,并且在回调函数里写一个异常,查看产生异常之后,数据是否还在消息队列中,如果还在就表明手动应答生效了:

很明显,消费者收到消息之后,肯定会报错,因为回调函数里发生了除0操作。

回调函数构造异常

查看消息队列中报错对应的那条消息是否还存在,从下图rabbitmq管理界面看出消息果然还在,修复回调函数,依旧能正常取出堆积的消息,手动应答至此验证结束。

验证手动应答

(2) 持久化参数

持久化就是把数据保存在硬盘里,防止rabbitmq服务突然断掉了,内存里的消息都不见了。方法是:

创建队列的时候,指定该队列为可持久化队列;

插入消息时,指定该消息需要被持久化。

(3)分发参数

当有多个消费者时,默认采用的是轮询分发,也就是当生产者产生多个消息,会轮着给不同的消费者,这就是轮询分发。


但是轮询分发会有一个问题,消费者A拿到消息需要处理2小时,消费者B拿到消息需要处理3分钟,如果再来一个消息,按照轮询分发机制,是肯定要分发给消费者A的,那就得等消费者A把前一个消息处理完才会接着处理下一条消息,这就导致了时间上的浪费。因此引出了公平分发机制。


公平分发机制的目的就是哪个消费者先运行结束,就可以紧接着读取下一条消息进行处理,在时间上有了很大的节省。代码中操作起来起始很方便,就是在消费者中指定分发机制:

3.2 交换机模式

3.2.1 发布订阅模式

如下图所示,发布订阅模式生产者和消费者分别做的事情为:

生产者:

(1) 创建交换机,并向交换机中插入消息,这就相当于发布

消费者:

(1)创建消息队列

(2)监听交换机,这样交换机中一有数据,就会向监听它的消息队列中同步数据,这就相当于订阅。每个消费者接收到的数据都是相同的。

发布订阅模式原理图

生产者关键代码:

交换机类型设置为fanout

消费者关键代码


3.2.2 关键字模式

有些场景并不需要消费者创建的消息队列全部接受交换机发布的数据,比如消费者1只想处理日志级别为info的信息,消费者2只想处理日志级别为warning的信息,消费者3想处理日志级别为info、warning、error的信息。rabbitmq正好提供了一个关键字模式,可以实现上述目的。


生产者关键代码:

交换机类型设置为direct,同时插入交换机的数据添加关键字routing_key,值可以自定义。

消费者关键代码:

接受消息指定关键字routing_key,值可自定义,但是要想接收到生产者发布的数据,就必须要和生产者发布消息时指定的关键字保持一直。

3.2.3 通配符模式

关键字模式要求生产者插入数据的关键和消费者接受数据的关键字保持一致,一个字符都不可以差。通配符模式则可以进行模糊匹配,类似于正则。

通配符模式将消息的传输类型的key更加细化,以”key1.key2.key3...“的模式来指定信息传输的key的大类型和小类型,让消费者可以更加精细的确认想要获取的信息类型。

通配符模式中两个符号含义需要明确:

“#”匹配一个或多个词,例如a.#可以匹配到a.b.c.d;

“**仅匹配一个词,例如a.*只能匹配到a.b,匹配不到a.b.c。


生产者关键代码:

交换机类型指定为topic,同时关键字可以指定的更精细一些。

消费者关键代码:

绑定的关键字routing_key使用通配符匹配一类消息。


最后通过rabbitmq管理界面再查看下生成的交换机:

交换机


参考资料:

官方文档:https://www.rabbitmq.com/documentation.html

docker hub官网:https://hub.docker.com/

个人仓:https://gitee.com/atiaisi/python_rabbitmq




【RabbitMQ】消息队列的评论 (共 条)

分享到微博请遵守国家法律