欢迎光临散文网 会员登陆 & 注册

动力节点RabbitMQ教程|12小时学会rabbitmq消息中间件

2023-04-26 17:11 作者:山药当当  | 我要投稿

1. What is RabbitMQ?

1.1简介

RabbitMQ是一个广泛使用的消息服务器,采用Erlang语言编写,是一种开源的实现 AMQP(高级消息队列协议)的消息中间件;

RabbitMQ最初起源于金融系统,它的性能及稳定性都非常出色;

AMQP协议(http://www.amqp.org),即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计;

我们学的协议有哪些:(http、ftp)

1.2 相关网址

官网:https://www.rabbitmq.com 

Github:https://github.com/rabbitmq

1.3 消息中间件(MQ=Message Queue)

简单来说,消息中间件就是指保存数据的一个容器(服务器),可以用于两个系统之间的数据传递。

消息中间件一般有三个主要角色:生产者、消费者、消息代理(消息队列、消息服务器);

rabbitmq-java-client      rabbitmq-server         rabbitmq-java-client

生产者发送消息到消息服务器,然后消费者从消息代理(消息队列)中获取数据并进行处理;

1.4 常用的消息中间件

目前比较主流的几个消息中间件:

Ø RabbitMQ

Ø kafka(大数据领域)

Ø RocketMQ(阿里巴巴开源)献给Apache组织

Ø pulsar(最近一两年流行起来的)

1. MQ(Message Queue)的应用场景

1.1 异步处理

下订单:下订单--》加积分--》发红包--》发手机短信

下订单---向MQ 发消息--》积分系统,红包系统,手机短信系统接收消息

同步是阻塞的(会造成等待),异步是非阻塞的(不会等待);

大流量高并发请求、批量数据传递,就可以采用异步处理,提升系统吞吐量;

2.2系统解耦

多个系统之间,不需要直接交互,通过消息进行业务流转;

2.3 流量削峰

高负载请求/任务的缓冲处理;

2.4日志处理

主要是用kafka这个服务器来做;

日志处理是指将消息队列用于在日志处理中,比如Kafka解决大量日志传输的问题;

loger.info(.....)

ELK 日志处理解决方案:

loger.error(.....) -->logstash收集消息--> 发送消息的kafka --> elastic search (es) -->Kibana ELK日志处理平台

1. RabbitMQ运行环境搭建

RabbitMQ是使用Erlang语言开发的,所以要先下载安装Erlang

3.1 Erlang及RabbitMQ安装版本的选择

下载时一定要注意版本兼容性

版本兼容说明地址:https://www.rabbitmq.com/which-erlang.html

我们选择的版本

3.2下载Erlang

Erlang官网

https://www.erlang.org/

Linux下载:

wget https://github.com/erlang/otp/releases/download/OTP-25.1.1/otp_src_25.1.1.tar.gz

说明:wget 是linux命令,可以用来下载软件

3.3 安装Erlang

3.3.1 安装erlang前先安装Linux依赖库

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

说明:yum -y install 安装linux的一些依赖库的命令 ,-y表示自动确认;

3.3.2 解压erlang压缩包文件

tar -zxvf otp_src_25.1.1.tar.gz

3.3.3 配置

切换到解压的目录下,运行相应命令

cd otp_src_25.1.1

./configure

3.3.4 编译

make

3.3.5安装

make install

安装好了erlang后可以将解压的文件夹删除:

rm -rf otp_src_25.1.1

3.3.6 验证erlang是否安装成功

在命令行输入: erl 如果进入了编程命令行则表示安装成功,然后按ctrl + z 退出编程命令行;

3.4 下载RabbitMQ

从RabbitMQ官网https://www.rabbitmq.com找到下载链接

Linux:下载3.10.11

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.11/rabbitmq-server-generic-unix-3.10.11.tar.xz

generic 是通用的意思,这个版本也就是通用的unix版本

3.5 安装RabbitMQ

解压RabbitMQ的压缩包,即安装完成,无需再编译

tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz  -C  /usr/local/

说明 -C 选项是指定解压目录,如果不指定会解压到当前目录

此时rabbitmq就安装好了;

4. 启动及停止RabbitMQ

4.1启动RabbitMQ

切换到安装目录的sbin目录下:

#启动

./rabbitmq-server  -detached

说明:

-detached 将表示在后台启动运行rabbitmq;不加该参数表示前台启动;

rabbitmq的运行日志存放在安装目录的var目录下;

现在的目录是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq

4.2 查看RabbitMQ的状态

切换到sbin目录下执行:

./rabbitmqctl -n rabbit status

 说明:-n rabbit 是指定节点名称为rabbit,目前只有一个节点,节点名默认为rabbit

 此处-n rabbit 也可以省略

4.3 停止RabbitMQ

切换到sbin目录下执行:

./rabbitmqctl shutdown

4.4 配置path环境变量

vi /etc/profile

RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11

PATH=$PATH:$RABBIT_HOME/sbin

export RABBIT_HOME PATH

刷新环境变量,命令如下

source /etc/profile

5. RabbitMQ管理命令

./rabbitmqctl 是一个管理命令,可以管理rabbitmq的很多操作。

./rabbitmqctl help可以查看一下有哪些操作

查看具体子命令 可以使用 ./rabbitmqctl help 子命令名称

5.1 用户管理

用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。

这些操作都是通过rabbitmqctl管理命令来实现完成。

查看帮助:

rabbitmqctl add_user --help

相应的命令

(1) 查看当前用户列表


rabbitmqctl list_users


(2) 新增一个用户

语法:rabbitmqctl add_user Username Password

示例: rabbitmqctl add_user admin 123456

5.2 设置用户角色

rabbitmqctl set_user_tags User Tag

示例:rabbitmqctl set_user_tags admin administrator

说明:此处设置用户的角色为管理员角色

5.3 设置用户权限

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

说明:此操作是设置admin用户拥有操作虚拟主机/下的所有权限

查看用户权限

./rabbitmqctl list_permissions

2. web管理后台

Rabbitmq有一个web管理后台,这个管理后台是以插件的方式提供的,启动后台web管理功能,切换到sbin目录下执行:

6.1 启用管理后台

# 查看rabbitmq 的插件列表

./rabbitmq-plugins list

#启用

./rabbitmq-plugins enable rabbitmq_management 

#禁用

./rabbitmq-plugins disable rabbitmq_management

6.2防火墙操作

systemctl status firewalld --检查防火墙状态

systemctl stop firewalld --关闭防火墙,Linux重启之后会失效

systemctl disable firewalld --防火墙置为不可用,Linux重启后,防火墙服务不自动启动,依然是不可用

6.3 访问

http://192.168.131.131:15672 

用户名/密码为我们上面创建的admin/123456

注意上面改成你的虚拟主机的ip地址

备注:如果使用默认用户guest、密码guest登录,会提示User can only log in via localhost

说明guest用户只能从localhost本机登录,所以不要使用该用户。

6.4 通过web页面新建虚拟主机

建完后如下

7. RabbitMQ工作模型

broker 相当于mysql服务器,virtual host相当于数据库(可以有多个数据库)

queue相当于表,消息相当于记录。

消息队列有三个核心要素: 消息生产者消息队列消息消费者

生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)

消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)

代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;

连接(Connection):连接RabbitMQ服务器的TCP长连接;

信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道进行的;

虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual host创建exchange/queue等;(分类比较清晰、相互隔离)

交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用;

路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址;

绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);

队列(Queue):存储消息的缓存;

消息(Message):由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,字符串、user对象,json串等等)

8. RabbitMQ交换机类型

Exchange(X) 可翻译成交换机/交换器/路由器

8.1 RabbitMQ交换器 (Exchange)类型

1、Fanout Exchange(扇形)

2、Direct Exchange(直连)

3、Topic Exchange(主题)

4、Headers Exchange(头部)

8.2 Fanout Exchange

8.2.1 介绍

Fanout 扇形的,散开的; 扇形交换机

投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

8.2.2 示例

8.3 Direct Exchange

8.3.1 介绍

根据路由键精确匹配(一模一样)进行路由消息队列;

8.3.2 示例

8.4 Topic Exchange

8.4.1 介绍

通配符匹配,相当于模糊匹配;

#匹配多个单词,用来表示任意数量(零个或多个)单词

*匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词:

beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx

beijing.* == beijing.queue, beijing.xyz

发送时指定的路由键:lazy.orange.rabbit

8.4.2 示例

8.5 Headers Exchange (用的比较少)

8.5.1 介绍

基于消息内容中的headers属性进行匹配;

8.5.2 示例

绑定参考代码:

Map<String, Object> headerValues = new HashMap<>();

headerValues.put("type", "m");

headerValues.put("status", 1);

return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();

发送参考代码

  MessageProperties messageProperties = new MessageProperties();

        messageProperties.setHeader("type", "m");

        messageProperties.setHeader("status", 1);

        Message message = new Message(msg.getBytes(), messageProperties);

//        void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

        amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE, null, message);

学习它的目的是:发消息时可以指定消息属性(MessageProperties)

9. RabbitMQ过期消息

过期消息也叫TTL消息,TTL:Time To Live 

消息的过期时间有两种设置方式:(过期消息)

9.1 设置单条消息的过期时间

参考代码

MessageProperties messageProperties = new MessageProperties();

messageProperties.setExpiration("15000"); // 设置过期时间,单位:毫秒

Message message = new Message(json.getBytes(), messageProperties);

//发送消息

amqpTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, message);

System.out.println("发送完毕:" + new Date());

单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久;

9.2 通过队列属性设置消息过期时间

@Bean

public Queue directQueue() {

    Map<String, Object> arguments = new HashMap<>();

    arguments.put("x-message-ttl", 10000);

    return new Queue(DIRECT_QUEUE, true, false, false, arguments);

}

队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久;

注意事项:

如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。

10. RabbitMQ死信队列

也有叫 死信交换机、死信邮箱等说法;

DLX: Dead-Letter-Exchange 死信交换器,死信邮箱

如下情况下一个消息会进入DLX(Dead Letter Exchange)死信交换机。

10.1 消息过期

参考代码

MessageProperties messageProperties=new MessageProperties();

//设置此条消息的过期时间为10秒

messageProperties.setExpiration("10000");


1.2 队列过期

参考代码

 Map<String, Object> arguments =new HashMap<>();

 //指定死信交换机,通过x-dead-letter-exchange 来设置

 arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);

 //设置死信路由key,value 为死信交换机和死信队列绑定的key,要一模一样,因为死信交换机是直连交换机

 arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);

 //队列的过期时间

 arguments.put("x-message-ttl",10000);

return  new Queue(QUEUE_NORMAL,true,false,false,arguments);

TTL: Time to Live的简称,过期时间


1.3 队列达到最大长度(先入队的消息会被发送到DLX)

Map<String, Object> arguments = new HashMap<String, Object>();

//设置队列的最大长度 ,对头的消息会被挤出变成死信

arguments.put("x-max-length", 5);


1.4 消费者拒绝消息不进行重新投递

从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。

application.yml 启动手动确认

spring:

  rabbitmq:

    listener:

      simple:

        acknowledge-mode: manual

参考代码

 /**

     * 监听正常的那个队列的名字,不是监听那个死信队列

     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列

     *

     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)

     */

    @RabbitListener(queues = {RabbitConfig.QUEUE})

    public void process(Message message, Channel channel) {

        System.out.println("接收到的消息:" + message);

        //对消息不确认, ack单词是 确认 的意思

        // void basicNack(long deliveryTag, boolean multiple, boolean requeue)

        // deliveryTag:消息的一个数字标签

        // multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack

        // requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列

        try {

            System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());

            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }


1.5 消费者拒绝消息

开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列

参考代码:

    /**

     * 监听正常的那个队列的名字,不是监听那个死信队列

     * 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列

     *

     * channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)

     */

    @RabbitListener(queues = {RabbitConfig.QUEUE})

    public void process(Message message, Channel channel) {

        System.out.println("接收到的消息:" + message);

        //对消息不确认, ack单词是 确认 的意思

        // void basicNack(long deliveryTag, boolean multiple, boolean requeue)

        // deliveryTag:消息的一个数字标签

        // multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack

        // requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列

        try {

            System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());

            //要开启rabbitm消息消费的手动确认模式,然后才这么写代码;

            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

}


11. RabbitMQ延迟队列

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;

11.1 定时任务方式

每隔3秒扫描一次数据库,查询过期的订单然后进行处理;

优点:

简单,容易实现;

缺点:

1、存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;

2、性能较差,每次扫描数据库,如果订单量很大

1. 被动取消

当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);

优点:

对服务器而言,压力小;

缺点:

1、用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;

2、用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;

11.2 JDK延迟队列(单体应用,不能分布式下)

DelayedQueue

无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素

优点:

实现简单,任务延迟低;

缺点:

服务重启、宕机,数据丢失;

只适合单机版,不适合集群;

订单量大,可能内存不足而发生异常; oom

11.3 采用消息中间件(rabbitmq)

1、RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

代码:正常延迟

//问题? 如果先发送的消息,消息延迟时间长,会影响后面的 延迟时间段的消息的消费;

//解决:不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样

代码 延迟问题

14.4 使用rabbitmq-delayed-message-exchange 延迟插件

11.4.1 下载

选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,下载地址:http://www.rabbitmq.com/community-plugins.html 

2、插件拷贝到 RabbitMQ 服务器plugins目录下

11.4.2 解压

unzip rabbitmq_delayed_message_exchange-3.10.2.ez

如果unzip 没有安装,先安装一下

yum install unzip -y

11.4.3 启用插件

./rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件;

11.4.4 查询安装情况

./rabbitmq-plugins list 查询安装的所有插件;

重启rabbitmq使其生效;(此处也可以不重启)

消息发送后不会直接投递到队列,而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);

延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现

解决了消息过期时间不一致出现的问题。

参考代码:

@Component

@Slf4j

public class RabbitConfig {

    public static final String EXCHANGE = "exchange:plugin";

    public static final String QUEUE = "queue.plugin";

    public static final String KEY = "plugin";

    

    @Bean

    public CustomExchange customExchange() {

        Map<String, Object> arguments = new HashMap<>();

        arguments.put("x-delayed-type", "direct");

        // CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)

        return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, arguments);

    }


    @Bean

    public Queue queue() {

        return QueueBuilder.durable(QUEUE).build();

    }


    @Bean

    public Binding binding(CustomExchange customExchange, Queue queue) {

        return BindingBuilder.bind(queue).to(customExchange).with(KEY).noargs();

    }

}

发消息参考代码

MessageProperties messageProperties=new MessageProperties();

messageProperties.setHeader("x-delay",16000);

String msg = "hello world";

Message message=new Message(msg.getBytes(),messageProperties);

rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "plugin", message);

log.info("发送完毕,发送时间为:{}",new Date());



动力节点RabbitMQ教程|12小时学会rabbitmq消息中间件的评论 (共 条)

分享到微博请遵守国家法律