一. 背景
总前提:队列无论是在生产者声明还是在消费者声明,只有声明了,才能在RabbitMQ的管理界面看到该队列
生产者直接发送消息到队列,消费者直接消费队列中的消息,而不用指定exchange并绑定。这种需求下,分三种情况:① 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列;② 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息;③ 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。下面分别进行说明:
1. 生产者声明队列(指定队列名称),消费者不指定队列,而是直接消费生产者指定的队列,但是此时,声明队列的一方要先运行,否则消费者连不上队列,要报错
① 生产者代码
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE1"; 8 9 public static void main(String[] args) throws IOException { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 factory.setUsername("guest"); 13 factory.setPassword("guest"); 14 factory.setPort(5672); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 // 声明队列 19 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 20 String message = "Hello World!"; 21 22 // 发行消息到队列 23 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 24 System.out.println(" [x] Sent '" + message + "'"); 25 26 channel.close(); 27 connection.close(); 28 } 29 }
2. 消费者
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE1"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setUsername("guest"); 14 factory.setPassword("guest"); 15 factory.setHost("localhost"); 16 factory.setPort(5672); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 22 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 25 // 消费者不声明队列,直接从队列中消费 26 channel.basicConsume(QUEUE_NAME, true, consumer); 27 while(true){ 28 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 29 String message = new String(delivery.getBody(),"UTF-8"); 30 System.out.println(" 【[x] Received 】:" + message); 31 } 32 } 33 }
2. 生产者声指定队列,但不声明队列,而是直接将消息发送到该队列,消费生声明该队列,并从该队列接收消息,生产者可先运行)(不报错),但是发的消息无效(被丢弃),只有声明队列的一方运行后,在管理界面才能看到该队列
① 生产者
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE2"; 8 9 public static void main(String[] args) throws IOException { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 factory.setUsername("guest"); 13 factory.setPassword("guest"); 14 factory.setPort(5672); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 String message = "Hello World!"; 19 20 // 发行消息到队列 21 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 22 System.out.println(" [x] Sent '" + message + "'"); 23 24 channel.close(); 25 connection.close(); 26 } 27 }
② 消费者
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE2"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setUsername("guest"); 14 factory.setPassword("guest"); 15 factory.setHost("localhost"); 16 factory.setPort(5672); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 // 声明队列 22 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 23 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 24 25 QueueingConsumer consumer = new QueueingConsumer(channel); 26 27 // 消费者不声明队列,直接从队列中消费 28 channel.basicConsume(QUEUE_NAME, true, consumer); 29 while(true){ 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody(),"UTF-8"); 32 System.out.println(" 【[x] Received 】:" + message); 33 } 34 } 35 }
3. 生产者声明队列并将消息发送到该队列,消费者也声明该队列,并从该队列消费消息,但是:生产者和消费者声明队列时指定的参数要一致,否则会报错。
① 生产者
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE2"; 8 9 public static void main(String[] args) throws IOException { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 factory.setUsername("guest"); 13 factory.setPassword("guest"); 14 factory.setPort(5672); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 // 声明队列 19 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 20 String message = "Hello World!"; 21 22 // 发行消息到队列 23 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 24 System.out.println(" [x] Sent '" + message + "'"); 25 26 channel.close(); 27 connection.close(); 28 } 29 }
② 消费者
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE2"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setUsername("guest"); 14 factory.setPassword("guest"); 15 factory.setHost("localhost"); 16 factory.setPort(5672); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 // 声明队列 22 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 23 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 24 25 QueueingConsumer consumer = new QueueingConsumer(channel); 26 27 // 消费者不声明队列,直接从队列中消费 28 channel.basicConsume(QUEUE_NAME, true, consumer); 29 while(true){ 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody(),"UTF-8"); 32 System.out.println(" 【[x] Received 】:" + message); 33 } 34 } 35 }