学习记录之消息队列
Dubbo远程调用的性能问题
Dubbo调用普遍存在于我们的微服务项目中
这些Dubbo调用全部是同步的操作
这里的"同步"指:消费者A调用生产者B之后,A的线程会进入阻塞状态,等待生产者B运行结束返回之后,A才能运行之后的代码

Dubbo消费者发送调用后进入阻塞状态,这个状态表示该线程仍占用内存资源,但是什么动作都不做
如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率会提升
实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情
这种情况下我们就要使用消息队列
什么是消息队列
消息队列(Message Queue)简称MQ
消息队列是采用"异步(两个微服务项目并不需要同时完成请求)"的方式来传递数据完成业务操作流程的业务处理方式
消息队列的特征

消息队列的特征(作用):
利用异步的特性,提高服务器的运行效率,减少因为远程调用出现的线程等待\阻塞
削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定
消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接受这种延迟,就不要使用消息队列

常见消息队列软件
Kafka:性能好\功能弱:适合大数据量,高并发的情况,大数据领域使用较多
RabbitMQ:功能强\性能一般:适合发送业务需求复杂的消息队列,java业务中使用较多
RocketMQ:阿里的
ActiveMQ:前几年流行的,老项目可能用到
消息队列异常处理:
问:消息队列运行时发生异常,没有完成业务,怎么通知消息的发送者,完成数据的回滚?
当消息队列中发生异常时,在异常处理的代码中,我们可以向消息的发送者发送消息,然后通知发送者处理,消息的发送者接收到消息后,一般要手写代码回滚,如果回滚代码过程中再发送异常
可以将这个信息直接发送给"死信队列"
死信队列没有任何处理者,通常情况下会有专人周期性的处理死信队列的消息
Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。
kafka软件结构
Kafka是一个结构相对简单的消息队列(MQ)软件
kafka软件结构图

Kafka Cluster(Kafka集群)
Partition(分片)
Producer:消息的发送方,也就是消息的来源,Kafka中的生产者
Consumer:消息的接收方,也是消息的目标,Kafka中的消费者
Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人
Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中
Kafka的特征与优势
Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大
Kafka将消息队列中的信息保存在硬盘中
Kafka对硬盘的读取规则进行优化后,效率能够接近内存
硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"
Kafka处理队列中数据的默认设置:
Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗
Kafka的安装和配置
必须将我们kafka软件的解压位置设置在一个根目录,文件夹名称尽量短(例如:kafka)
然后路径不要有空格和中文
我们要创建一个空目录用于保存Kafka运行过程中产生的数据
本次创建名称为data的空目录
下面进行Kafka启动前的配置
先到F:\kafka\config下配置有文件zookeeper.properties
找到dataDir属性修改如下
dataDir=F:/data
修改完毕之后要Ctrl+S进行保存,否则修改无效!!!!
注意F盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称
还要修改server.properties配置文件
log.dirs=F:/data
修改注意事项和上面相同
启动kafka
要想启动Kafka必须先启动Zookeeper
Zookeeper介绍
zoo:动物园
keeper:园长
可以引申为管理动物的人
Linux服务器中安装的各种软件,很多都是有动物动物形象的
如果这些软件在Linux中需要修改配置信息的话,就需要进入这个软件,去修改配置,每个软件都需要单独修改配置的话,工作量很大
我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统
在Zookeeper中,可以修改服务器系统中的所有软件配置
长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取
Kafka就是需要将配置编写在Zookeeper中的软件之一
Zookeeper启动
进入路径F:\kafka\bin\windows
输入cmd进入dos命令行
kafka启动
总体方式一样,输入不同指令
在启动kafka时有一个常见错误
wmic不是内部或外部命令
这样的提示,需要安装wmic命令,安装方式参考
https://zhidao.baidu.com/question/295061710.html
Kafka使用演示
启动的zookeeper和kafka的窗口不要关闭
我们在csmall项目中编写一个kafka使用的演示
csmall-cart-webapi模块
添加依赖
修改yml文件进行配置
在SpringBoot启动类中添加启动Kafka的注解
下面我们就可以实现周期性的向kafka发送消息并接收的操作了
编写消息的发送
创建一个叫Consumer的类来接收消息
RabbitMQ
什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ特征
1.可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2.灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3.消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
4.高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5.多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6.多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7.管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8.跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
9.插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
下载软件
RabbitMQ是Erlang语言开发的,所以要先安装Erlang语言的运行环境
下载Erlang的官方路径
https://erlang.org/download/otp_versions_tree.html

安装的话就是双击
安装过程中都可以使用默认设置,需要注意的是
不要安装在中文路径和有空格的路径下!!!
下载RabbitMQ的官方网址
https://www.rabbitmq.com/install-windows.html

安装也是双击即可
不要安装在中文路径和有空格的路径下!!!
RabbitMQ的结构
RabbitMQ软件支持很多种消息队列的发送方式的
使用的比较多的是路由模式

和Kafka不同,Kafka是使用话题名称来收发信息,结构简单
RabbitMQ是使用交换机\路由key指定要发送消息的队列
消息的发送者发送消息时,需要指定交换机和路由key名称
消息的接收方接收消息时,只需要指定队列的名称
在编写代码上,相比于Kafka,每个业务要编写一个配置类
这个配置类中要绑定交换机和路由key的关系,以及路由Key和队列的关系
配置Erlang的环境变量
要想运行RabbitMQ必须保证系统有Erlang的环境变量
配置Erlang环境变量
把安装Erlang的bin目录配置在环境变量Path的属性中

启动RabbitMQ
找到RabbitMQ的安装目录
可能是:
具体路径根据自己的情况寻找
地址栏运行cmd
输入启动指令如下
结果如下

运行完成后
可以在Window任务管理器中的服务选项卡里找到RabbitMQ的服务(Ctrl+Shift+ESC)
另外的验证方法:
因为RabbitMQ自带一个管理的界面,所以我们可以访问这个界面来验证它的运行状态
http://localhost:15672

登录界面用户名密码
guest
guest
登录成功后看到RabbitMQ运行的状态
如果启动失败,需要重新安装
参考路径如下
https://baijiahao.baidu.com/s?id=1720472084636520996&wfr=spider&for=pc
利用RabbitMQ完成消息的收发
csmall-stock-webapi项目中测试RabbitMQ
可以利用之前我们使用Quartz实现的每隔一段时间输出当前日期信息的方法改为发送消息
添加依赖
yml文件配置
交换机\路由Key\队列的配置类
RabbitMQ要求我们在java代码级别设置交换机\路由Key\队列的关系
我们在quartz包下,创建config包
包中创建配置信息类
RabbitMQ发送消息
我们在QuartzJob类中输出时间的代码后继续编写代码
实现RabbitMQ消息的发送
我们可以通过修改QuartzConfig类中的Cron表达式修改调用的周期
接收RabbitMQ的消息
quartz包下再创建一个新的类用于接收信息
RabbitMQConsumer代码如下
启动Nacos\RabbitMQ\Seata
启动stock-webapi
根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行
测试成功表示一切正常