动力节点RocketMQ全套视频教程-5小时学会rocketmq消息队列

20. RocketMQ消息重复消费问题
20.1 为什么会出现重复消费问题呢?
BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费。
即使是在CLUSTERING模式下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。
还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。
那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是msgId也可以是你自定义的唯一的key,这样就可以去重了
20.2 解决方案
使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。
想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?
我们可以选择布隆过滤器(BloomFilter)
布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。
在hutool的工具中我们可以直接使用,当然你自己使用redis的bitmap类型手写一个也是可以的 https://hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0

20.5 测试消费者
/**
* 在boot项目中可以使用@Bean在整个容器中放置一个单利对象
*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);
@Test
public void testRepeatConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置nameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个主题来消费 表达式,默认是*
consumer.subscribe("TopicTest", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 拿到消息的key
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
// 判断是否存在布隆过滤器中
if (bloomFilter.contains(keys)) {
// 直接返回了 不往下处理业务
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 这个处理业务,然后放入过滤器中
// do sth...
bloomFilter.add(keys);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
21. RocketMQ集成SpringBoot
21.1 搭建rocketmq-producer(消息生产者)


21.1.1 创建项目,完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.powernode</groupId>
<artifactId>01-rocketmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketmq的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
21.1.2 修改配置文件application.yml
spring:
application:
name: rocketmq-producer
rocketmq:
name-server: 127.0.0.1:9876 # rocketMq的nameServer地址
producer:
group: powernode-group # 生产者组别
send-message-timeout: 3000 # 消息发送的超时时间
retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数
max-message-size: 4194304 # 消息的最大长度
21.1.3 我们在测试类里面测试发送消息
往powernode主题里面发送一个简单的字符串消息
/**
* 注入rocketMQTemplate,我们使用它来操作mq
*/
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 测试发送简单的消息
*
* @throws Exception
*/
@Test
public void testSimpleMsg() throws Exception {
// 往powernode的主题里面发送一个简单的字符串消息
SendResult sendResult = rocketMQTemplate.syncSend("powernode", "我是一个简单的消息");
// 拿到消息的发送状态
System.out.println(sendResult.getSendStatus());
// 拿到消息的id
System.out.println(sendResult.getMsgId());
}
运行后查看控制台

21.1.4 查看rocketMq的控制台

查看消息细节

21.2 搭建rocketmq-consumer(消息消费者)


21.2.1 创建项目,完整的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.powernode</groupId>
<artifactId>02-rocketmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rocketmq的依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
21.2.2 修改配置文件application.yml
spring:
application:
name: rocketmq-consumer
rocketmq:
name-server: 127.0.0.1:9876
21.2.3 添加一个监听的类SimpleMsgListener
消费者要消费消息,就添加一个监听
package com.powernode.listener;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 创建一个简单消息的监听
* 1.类上添加注解@Component和@RocketMQMessageListener
* @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")
* topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
* 2.实现RocketMQListener接口,注意泛型的使用,可以为具体的类型,如果想拿到消息
* 的其他参数可以写成MessageExt
*/
@Component
@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group",messageModel = MessageModel.CLUSTERING)
public class SimpleMsgListener implements RocketMQListener<String> {
/**
* 消费消息的方法
*
* @param message
*/
@Override
public void onMessage(String message) {
System.out.println(message);
}
}