SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic

一丶简介

Topic Exchange 
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。

业务场景:

1.日志服务器记录用户服务、商品服务、订单服务三个服务。

2.日志服务器有三个日志服务:INFO日志处理服务、ERROR日志处理服务、全日志处理服务。

3.使用Topic交换器处理日志,匹配规则依次为:*.log.info、*.log.error和*.log.*。

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

二丶配置文件

还是创建两个项目,一个作为生产者一个作为消费者。

生产者配置:

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

server.port=8883

spring.application.name=hello-world

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.thymeleaf.cache=false

设置交换器名称
mq.config.exchange=log.topic

View Code

消费者配置:

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

server.port=8884

spring.application.name=lesson1

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#设置交换器名称
mq.config.exchange=log.topic
#info队列名称
mq.config.queue.info=log.info
#error队列名称
mq.config.queue.error=log.error
#log队列名称
mq.config.queue.logs=log.all

View Code

三丶创建生产者

1.订单服务

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:模拟订单服务发送消息
 */
@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange 交换器
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * 发送消息的方法
     * @param msg
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数1:队列名称
        //参数2:消息
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        //向消息队列发送消息
        //参数1:交换器名称
        //参数2:路由键
        //参数3:消息
        this.amqpTemplate.convertAndSend(exChange,"order.log.debug","order.log.debug-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"order.log.info","order.log.info-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"order.log.warn","order.log.warn-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"order.log.error","order.log.error-"+msg);

    }
}

View Code

2.商品服务

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:模拟商品服务发送消息
 */
@Component
public class ProductSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange 交换器
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * 发送消息的方法
     * @param msg
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数1:队列名称
        //参数2:消息
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        //向消息队列发送消息
        //参数1:交换器名称
        //参数2:路由键
        //参数3:消息
        this.amqpTemplate.convertAndSend(exChange,"product.log.debug","product.log.debug-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"product.log.info","product.log.info-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"product.log.warn","product.log.warn-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"product.log.error","product.log.error-"+msg);

    }
}

View Code

3.用户服务

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

package com.example.amqptopicprovider;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:模拟用户服务发送消息
 */
@Component
public class UserSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    //exChange 交换器
    @Value("${mq.config.exchange}")
    private String exChange;

    /**
     * 发送消息的方法
     * @param msg
     */
    public void send(String msg){
        //向消息队列发送消息
        //参数1:队列名称
        //参数2:消息
        // this.amqpTemplate.convertAndSend("hello-queue",msg);

        //向消息队列发送消息
        //参数1:交换器名称
        //参数2:路由键
        //参数3:消息
        this.amqpTemplate.convertAndSend(exChange,"user.log.debug","user.log.debug-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"user.log.info","user.log.info-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"user.log.warn","user.log.warn-"+msg);
        this.amqpTemplate.convertAndSend(exChange,"user.log.error","user.log.error-"+msg);

    }
}

View Code

四丶创建消费者

1.ERROR日志处理服务

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:消息接收者
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                  exchange:配置交换器
 * @Queue : value:配置队列名称
 *          autoDelete:是否是一个可删除的临时队列
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.error"
        )
)
public class TopicErrorReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("error-receiver:"+msg);
    }
}

View Code

2.INFO日志处理服务

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

package com.ant.amqptopicconsumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * Author:aijiaxiang
 * Date:2020/4/26
 * Description:消息接收者
 * @RabbitListener bindings:绑定队列
 * @QueueBinding  value:绑定队列的名称
 *                  exchange:配置交换器
 * @Queue : value:配置队列名称
 *          autoDelete:是否是一个可删除的临时队列
 * @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                key = "*.log.info"
        )
)
public class TopicInfoReceiver {

    /**
     * 接收消息的方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg){
        System.out.println("info-receiver:"+msg);
    }
}

View Code

3.全日志处理服务

《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》
《SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic》

点赞