比如一个日志系统,之前的处理方式呢,是各种类型(info,error,warning)的消息都发给订阅者,可是实际情况上不一定都需要。可能A需要error,其他的都不需要。那么就引入了今天的处理方式--路由(直接交换)
(兔子的官网真心良心,图文并茂,通俗易懂)这种处理方式你只需记住一个字:有选择的接受消息
首先,我们将消息绑定在不同的路由键上,然后消费者根据需要绑定对应的路由键即可收到消息。路由键随便取名字
生产者代码:
package com.example.demo;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 有选择的接受消息 */public class RoutingSend { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 连接工厂 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 获取连接 Channel channel = connection.createChannel(); // 当我们发送时,需要一个路由密钥,这里选择直接交换 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String[] msg = {"错误","信息","警告"}; // 第二个参数为路由密钥 channel.basicPublish(EXCHANGE_NAME, "error", null, msg[0].getBytes()); channel.basicPublish(EXCHANGE_NAME, "info", null, msg[1].getBytes()); channel.basicPublish(EXCHANGE_NAME, "warning", null, msg[2].getBytes()); System.out.println("PS-Send:" + msg.toString()); channel.close(); connection.close(); }}
消费者代码:
package com.example.demo;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class RoutingReceive { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 连接工厂 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 获取连接 Channel channel = connection.createChannel(); // 声明一个direct交换类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称 String queueA = channel.queueDeclare().getQueue(); String queueB = channel.queueDeclare().getQueue(); System.out.println("临时队列:" + queueA); System.out.println("临时队列:" + queueB); // 第三个参数为“绑定建” channel.queueBind(queueA, EXCHANGE_NAME, "error"); channel.queueBind(queueB, EXCHANGE_NAME, "info"); channel.queueBind(queueB, EXCHANGE_NAME, "warning"); Consumer consumerA = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("Direct-Receive-A:" + recv); } }; Consumer consumerB = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("Direct-Receive-B:" + recv); } }; channel.basicConsume(queueA, true, consumerA); channel.basicConsume(queueB, true, consumerB); }}
先启动消费者:再启动生产者,查看控制台:
..