欢迎光临散文网 会员登陆 & 注册

动力节点RocketMQ全套视频教程-5小时学会rocketmq消息队列

2023-05-16 11:17 作者:摸鱼协会秘书长  | 我要投稿

20. RocketMQ消息重复消费问题

20.1 为什么会出现重复消费问题呢?

BROADCASTING(广播)模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。

CLUSTERING(负载均衡)模式下,如果一个topic被多个consumerGroup消费,也会重复消费。

即使是在CLUSTERING模式下,同一个consumerGroup下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把offset提交给broker,那么新的消费者可能会重新消费一次。虽然orderly模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起concurrently要严格了,但是加锁的线程和提交offset的线程不是同一个,所以还是会出现极端情况下的重复消费。

还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是msgId也可以是你自定义的唯一的key,这样就可以去重了

20.2 解决方案

使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。

想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

我们可以选择布隆过滤器(BloomFilter)

布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。

在hutool的工具中我们可以直接使用,当然你自己使用redis的bitmap类型手写一个也是可以的 https://hutool.cn/docs/#/bloomFilter/%E6%A6%82%E8%BF%B0 

20.5 测试消费者

/**

 * 在boot项目中可以使用@Bean在整个容器中放置一个单利对象

 */

public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100);


@Test

public void testRepeatConsumer() throws Exception {

    // 创建默认消费者组

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");

    consumer.setMessageModel(MessageModel.BROADCASTING);

    // 设置nameServer地址

    consumer.setNamesrvAddr("localhost:9876");

    // 订阅一个主题来消费   表达式,默认是*

    consumer.subscribe("TopicTest", "*");

    // 注册一个消费监听 MessageListenerConcurrently是并发消费

    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()

    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override

        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,

                                                        ConsumeConcurrentlyContext context) {

            // 拿到消息的key

            MessageExt messageExt = msgs.get(0);

            String keys = messageExt.getKeys();

            // 判断是否存在布隆过滤器中

            if (bloomFilter.contains(keys)) {

                // 直接返回了 不往下处理业务

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }

            // 这个处理业务,然后放入过滤器中

            // do sth...

            bloomFilter.add(keys);

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

        }

    });

    consumer.start();

    System.in.read();

}

21. RocketMQ集成SpringBoot

21.1 搭建rocketmq-producer(消息生产者)


21.1.1 创建项目,完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.6.3</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>

    <groupId>com.powernode</groupId>

    <artifactId>01-rocketmq-producer</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <name>rocketmq-producer</name>

    <description>Demo project for Spring Boot</description>

    <properties>

        <java.version>1.8</java.version>

    </properties>

    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <!-- rocketmq的依赖 -->

        <dependency>

            <groupId>org.apache.rocketmq</groupId>

            <artifactId>rocketmq-spring-boot-starter</artifactId>

            <version>2.2.2</version>

        </dependency>


        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

            <optional>true</optional>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

        </dependency>

    </dependencies>


    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

                <configuration>

                    <excludes>

                        <exclude>

                            <groupId>org.projectlombok</groupId>

                            <artifactId>lombok</artifactId>

                        </exclude>

                    </excludes>

                </configuration>

            </plugin>

        </plugins>

    </build>


</project>

 

21.1.2 修改配置文件application.yml

spring:

    application:

        name: rocketmq-producer

rocketmq:

    name-server: 127.0.0.1:9876     # rocketMq的nameServer地址

    producer:

        group: powernode-group        # 生产者组别

        send-message-timeout: 3000  # 消息发送的超时时间

        retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数

        max-message-size: 4194304       # 消息的最大长度

 21.1.3 我们在测试类里面测试发送消息

往powernode主题里面发送一个简单的字符串消息

/**

 * 注入rocketMQTemplate,我们使用它来操作mq

 */

@Autowired

private RocketMQTemplate rocketMQTemplate;


/**

 * 测试发送简单的消息

 *

 * @throws Exception

 */

@Test

public void testSimpleMsg() throws Exception {

    // 往powernode的主题里面发送一个简单的字符串消息

    SendResult sendResult = rocketMQTemplate.syncSend("powernode", "我是一个简单的消息");

    // 拿到消息的发送状态

    System.out.println(sendResult.getSendStatus());

    // 拿到消息的id

    System.out.println(sendResult.getMsgId());

}

 

运行后查看控制台

21.1.4 查看rocketMq的控制台

查看消息细节


21.2 搭建rocketmq-consumer(消息消费者)


21.2.1 创建项目,完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.6.3</version>

        <relativePath/> <!-- lookup parent from repository -->

    </parent>

    <groupId>com.powernode</groupId>

    <artifactId>02-rocketmq-consumer</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <name>rocketmq-consumer</name>

    <description>Demo project for Spring Boot</description>

    <properties>

        <java.version>1.8</java.version>

    </properties>

    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <!-- rocketmq的依赖 -->

        <dependency>

            <groupId>org.apache.rocketmq</groupId>

            <artifactId>rocketmq-spring-boot-starter</artifactId>

            <version>2.2.2</version>

        </dependency>

        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

            <optional>true</optional>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

            <scope>test</scope>

        </dependency>

    </dependencies>


    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

                <configuration>

                    <excludes>

                        <exclude>

                            <groupId>org.projectlombok</groupId>

                            <artifactId>lombok</artifactId>

                        </exclude>

                    </excludes>

                </configuration>

            </plugin>

        </plugins>

    </build>


</project>

 

21.2.2 修改配置文件application.yml

spring:

    application:

        name: rocketmq-consumer

rocketmq:

    name-server: 127.0.0.1:9876

21.2.3 添加一个监听的类SimpleMsgListener

消费者要消费消息,就添加一个监听

package com.powernode.listener;


import org.apache.rocketmq.spring.annotation.MessageModel;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

import org.apache.rocketmq.spring.core.RocketMQListener;

import org.springframework.stereotype.Component;


/**

 * 创建一个简单消息的监听

 * 1.类上添加注解@Component和@RocketMQMessageListener

 *      @RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group")

 *      topic指定消费的主题,consumerGroup指定消费组,一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费

 * 2.实现RocketMQListener接口,注意泛型的使用,可以为具体的类型,如果想拿到消息

 * 的其他参数可以写成MessageExt

 */

@Component

@RocketMQMessageListener(topic = "powernode", consumerGroup = "powernode-group",messageModel = MessageModel.CLUSTERING)

public class SimpleMsgListener implements RocketMQListener<String> {


    /**

     * 消费消息的方法

     *

     * @param message

     */

    @Override

    public void onMessage(String message) {

        System.out.println(message);

    }

}

21.2.4 启动rocketmq-consumer

查看控制台,发现我们已经监听到消息了

动力节点RocketMQ全套视频教程-5小时学会rocketmq消息队列的评论 (共 条)

分享到微博请遵守国家法律