消息队列rabbitmq使用
1、docker启动
docker search rabbitmq:management 搜索
docker pull rabbitmq:management 拉取镜像
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management 启动
docker logs rabbitmq 打印日志
此时,可以在浏览器登录,地址http://localhost:15672,用户名密码都为guest
2、使用
2.1编写生产者类
public class Producer {
private final static String QUEUE_NAME = "Test";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1、队列名称 QUEUE_NAME
* 2、队列里面的消息是否持久化(默认消息存储在内存中)
* 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
* 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
* 5、其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "Hello world!";
/**
* 发送消息
* 1、发送到哪个exchange交换机,2、路由的key,3、其他的参数信息,4、消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("send '"+message+"'");
channel.close();
connection.close();
}
}
2.2编写消费者类
public class Receiver {
private final static String QUEUE_NAME = "Test";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setConnectionTimeout(600000);//milliseconds
factory.setRequestedHeartbeat(60);//seconds
factory.setHandshakeTimeout(6000);//milliseconds
factory.setRequestedChannelMax(5);
factory.setNetworkRecoveryInterval(500);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("Waiting for messages. ");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
2.3工具类
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2.4 设置手动应答
rabbitmq将message发送给消费者后,就会将该消息标记为删除。如果消费者在处理message过程中宕机,没有来得及处理消息,则会导致消息的丢失。
因此,需要设置手动应答。代码如下:
2.4.1 生产者
public class Task {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
try(Channel channel = RabbitMqUtils.getChannel()){
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
System.out.println("请输入信息");
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
System.out.println("生产者发出消息"+ message);
}
}
}
}
2.4.1 消费者
public class Work {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:"+message);
/**
* 1、消息的标记tag
* 2、是否批量应答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
//采用手动应答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
工具类SleepUtils :
public class SleepUtils {
public static void sleep(int second){
try{
Thread.sleep(1000*second);
}catch (InterruptedException _ignored){
Thread.currentThread().interrupt();
}
}
}
2.5 不公平分发
rabbitmq默认是轮询分发:生产者依次向消费者按顺序发送消息。但是当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。需要设置为不公平分发:
int prefetchCount = 10;
channel.basicQos(prefetchCount);
通过此配置,rabbitmq会优先将message分发给空闲消费者