欢迎光临散文网 会员登陆 & 注册

消息队列rabbitmq使用

2022-05-13 14:45 作者:wulizhao1  | 我要投稿

1、docker启动

docker search rabbitmq:management 搜索

docker pull rabbitmq:management  拉取镜像

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management 启动

docker logs rabbitmq  打印日志

此时,可以在浏览器登录,地址http://localhost:15672,用户名密码都为guest


2、使用

2.1编写生产者类


public class Producer {

    private final static String QUEUE_NAME = "Test";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        factory.setHost("localhost");

        factory.setPort(5672);

        factory.setVirtualHost("/");


        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        /**

         * 生成一个队列

         * 1、队列名称 QUEUE_NAME

         * 2、队列里面的消息是否持久化(默认消息存储在内存中)

         * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费

         * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除

         * 5、其他参数

         */

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        String message = "Hello world!";

        /**

         * 发送消息

         * 1、发送到哪个exchange交换机,2、路由的key,3、其他的参数信息,4、消息体

         */

        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("send '"+message+"'");


        channel.close();

        connection.close();

    }

}


2.2编写消费者类

public class Receiver {

    private final static String QUEUE_NAME = "Test";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("guest");

        factory.setPassword("guest");

        factory.setHost("localhost");

        factory.setPort(5672);

        factory.setVirtualHost("/");

        factory.setConnectionTimeout(600000);//milliseconds

        factory.setRequestedHeartbeat(60);//seconds

        factory.setHandshakeTimeout(6000);//milliseconds

        factory.setRequestedChannelMax(5);

        factory.setNetworkRecoveryInterval(500);


        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();


        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        System.out.println("Waiting for messages. ");


        Consumer consumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {

                String message = new String(body, "UTF-8");

                System.out.println(" received '" + message + "'");

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }

}

2.3工具类

public class RabbitMqUtils {

    public static Channel getChannel() throws Exception{

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        factory.setUsername("guest");

        factory.setPassword("guest");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        return channel;

    }

}

2.4 设置手动应答

rabbitmq将message发送给消费者后,就会将该消息标记为删除。如果消费者在处理message过程中宕机,没有来得及处理消息,则会导致消息的丢失。
因此,需要设置手动应答。代码如下:


2.4.1 生产者

public class Task {

    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{

        try(Channel channel = RabbitMqUtils.getChannel()){

            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);

            Scanner scanner = new Scanner(System.in);

            System.out.println("请输入信息");

            while(scanner.hasNext()){

                String message = scanner.nextLine();

                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());

                System.out.println("生产者发出消息"+ message);

            }

        }

    }

}


2.4.1 消费者

public class Work {

    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{

        Channel channel = RabbitMqUtils.getChannel();

        DeliverCallback deliverCallback = (consumerTag,delivery)->{

            String message = new String(delivery.getBody());

            SleepUtils.sleep(1);

            System.out.println("接收到消息:"+message);

            /**

             * 1、消息的标记tag

             * 2、是否批量应答

             */

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag)->{

            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");

        };

        //采用手动应答

        boolean autoAck = false;

        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);

    }

}

工具类SleepUtils :

public class SleepUtils {

    public static void sleep(int second){

        try{

            Thread.sleep(1000*second);

        }catch (InterruptedException _ignored){

            Thread.currentThread().interrupt();

        }

    }

}

2.5 不公平分发

rabbitmq默认是轮询分发:生产者依次向消费者按顺序发送消息。但是当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。需要设置为不公平分发:

int prefetchCount = 10;

channel.basicQos(prefetchCount);

通过此配置,rabbitmq会优先将message分发给空闲消费者

消息队列rabbitmq使用的评论 (共 条)

分享到微博请遵守国家法律