RabbitMQ笔记二十二 :异步RPC之一(java client实现RPC功能)

异步RPC(Remote procedure call)

《RabbitMQ笔记二十二 :异步RPC之一(java client实现RPC功能)》 模型图

Server:提供服务的服务,即RPC模型中的Server。
Client:调用服务的服务,即RPC模型中的client。

Client发送消息给服务端,消息中包含二个属性,一个是reply_to属性(value是一个队列名,客户端服务一直监听这个队列),一个是correkation_id(本次请求的唯一标识),发送消息调用Server服务之后,服务端将调用结果发送到reply_to指定的队列中。当前也会根据客户端得到correlation_id,并指明这次结果与correlation_id对应。

java client实现RPC功能

服务端代码,监听sms队列(客户端请求消息发送到的队列)

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consume {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        System.out.println(channel.queueDeclare().getQueue());

        channel.basicConsume("sms",true,new SimpleConsumer(channel));
        System.out.println("短信服务已经启动");
        TimeUnit.SECONDS.sleep(60);

        channel.close();
        connection.close();
    }
}

接收到客户端的消息之后,调用服务接口sendSMS方法,然后得到请求消息中的reply_tocorrelation_id属性,将调用接口的结果发送到reply_to属性的队列中,指定correlation_id属性。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.zhihao.miao.rpc.server.SendSMSTool;

import java.io.IOException;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("进入RPC方法调用");
        String phone =properties.getHeaders().get("phone").toString();
        String content = new String(body);
        //调用服务
        boolean result = SendSMSTool.sendSMS(phone,content);
        System.out.println("消息处理成功");

        String reply = properties.getReplyTo();
        String id = properties.getCorrelationId();

        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(id).build();
        this.getChannel().basicPublish("",reply,props,(result+"").getBytes());
        System.out.println("消息回复成功");
    }
}

服务接口:

public class SendSMSTool {

    public static boolean sendSMS(String phone,String content){
        System.out.println("发送短信内容:【"+content+"】到手机号:"+phone);
        return phone.length() > 6;
    }
}

总结
RPC Server步骤
1.创建服务
2.监听一个队列(sms),监听客户端发送的消息
3.收到消息之后,调用服务,得到调用结果
4.从消息属性中,获取reply_to, correlation_id属性,把调用结果发送给reply_to指定的队列中,发送的消息属性要带上reply_to。
5.一次调用处理成功

《RabbitMQ笔记二十二 :异步RPC之一(java client实现RPC功能)》 Exchange绑定sms队列

客户端代码
消息端先监听一个队列sms.reply,这个队列是客户端返回结果的队列,然后发送消息指定请求参数(参数头和参数内容),指定correlationIdreplyTo属性。

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class Send {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String correlationId = UUID.randomUUID().toString();
        String replyTo = "sms.reply";
        //自动删除属性为true,当connection关闭后会自动删除该队列
        //channel.queueDeclare(replyTo,true,true,true,new HashMap<>());

        channel.basicConsume(replyTo,true,new SimpleConsumer(channel));

        Map<String,Object> headers = new HashMap<>();
        headers.put("phone","15790934342");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).replyTo(replyTo).correlationId(correlationId).deliveryMode(2).
                contentEncoding("UTF-8").build();

        channel.basicPublish("send","sms",true,properties,"周年庆6折大促销,只剩三天。详情登录****,去了解详情。".getBytes());

        TimeUnit.SECONDS.sleep(20);

        channel.close();
        connection.close();
    }
}

接收到rpc调用返回接口的处理逻辑,

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;


public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("====收到RPC调用 回复了=====");
        System.out.println(properties.getCorrelationId()+",短信发送结果:"+new String(body));


    }
}

总结
RPC Client步骤:
1.监听reply_to对应的队列(RPC调用结果发送指定的队列)
2.发送消息,消息属性需要带上reply_to,correlation_id属性
3.服务端处理完成之后,reply_to对应的队列就会收到异步处理结果消息
4.收到消息之后,进行处理,根据消息属性的correlation_id找到对应的请求
5.一次客户端调用就完成了。

说一下逻辑:
就是客户端往一个队列sms发送消息,服务端监听这个队列,接收到消息之后服务端在消息处理方法中调用服务,并把调用的服务结果返回给客户端,怎么返回呢(通过往一个队列里发送(sms.reply),并把这次调用的id也返回给客户端,这个队列名是reply_to属性,当前的返回id是correlation_id,都是在客户端往sms队列发送消息的时候带到服务端的),服务端将返回的结果发送到指定的队列,而客户端一直在监听这个队列拿到返回值,即完成了一次异步的rpc调用。

Direct reply-to

Improve performance and simplicity of RPC clients by sending replies direct to a waiting channel.
通过直接将RPC的回复发送到等待的Channel中而提供RPC客户端的性能。

动机(原因)
RPC is a popular pattern to implement with a messaging broker like RabbitMQ. The typical way to do this is for RPC clients to send requests to a long lived server queue. The RPC server(s) read requests from this queue and then send replies to each client using the queue named by the client in the reply-to header.

But where does the client’s queue come from? The client can declare a single-use queue for each request-response pair. But this is inefficient; even a transient unmirrored queue can be expensive to create and then delete (compared with the cost of sending a message). This is especially true in a cluster as all cluster nodes need to agree that the queue has been created, even if it is unmirrored.

So alternatively the client can create a long-lived queue for its replies. But this can be fiddly to manage, especially if the client itself is not long-lived.

The direct reply-to feature allows RPC clients to receive replies directly from their RPC server, without going through a reply queue. (“Directly” here still means going through AMQP and the RabbitMQ server; there is no separate network connection between RPC client and RPC server.)

大意就是使用RabbitMQ实现异步RPC,通常的做法就是客户端将请求发送到长期存在的服务器队列。RPC服务器读取接收此队列的请求,然后将RPC调用结果返回到客户端在发送请求时请求头中的reply_to属性中的队列。完成一次异步的RPC调用。
那么请求头中的队列哪里来呢?客户端可以声明一个针对每一对请求的临时队列。但是开销太大(相比于发送消息本身),在集群环境中更是如此。
如果创建一个长期的队列呢?如果客户端本身并不是长期的那么这个长期的队列就很难管理。

Direct reply-to就是解决这个问题的,不需要创建这样一个回复的队列。

示列

服务端
服务启动类,监听sms队列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.concurrent.TimeUnit;

public class Consume {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        System.out.println(channel.queueDeclare().getQueue());

        channel.basicConsume("sms",true,new SimpleConsumer(channel));
        System.out.println("短信服务已经启动");
        TimeUnit.SECONDS.sleep(60);

        channel.close();
        connection.close();
    }
}

服务端监听消息处理器,接收到请求参数,调用服务接口,返回参数

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        System.out.println("进入RPC方法调用");
        String phone =properties.getHeaders().get("phone").toString();
        String content = new String(body);
        //调用服务
        boolean result = SendSMSTool.sendSMS(phone,content);
        System.out.println("消息处理成功");

        String reply = properties.getReplyTo();
        String id = properties.getCorrelationId();

        BasicProperties props = new BasicProperties.Builder().correlationId(id).build();
        this.getChannel().basicPublish("",reply,props,(result+"").getBytes());
        System.out.println("消息回复成功");
    }
}

服务接口

public class SendSMSTool {

    public static boolean sendSMS(String phone,String content){
        System.out.println("发送短信内容:【"+content+"】到手机号:"+phone);
        return phone.length() > 6;
    }
}

客户端
改变的是客户端代码,直接定义replyToamq.rabbitmq.reply-to

import com.rabbitmq.client.*;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class Send {
    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String correlationId = UUID.randomUUID().toString();

        String replyTo ="amq.rabbitmq.reply-to";

        channel.basicConsume(replyTo,true,new SimpleConsumer(channel));

        Map<String,Object> headers = new HashMap<>();
        headers.put("phone","15790934342");

        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).replyTo(replyTo).correlationId(correlationId).deliveryMode(2).
                contentEncoding("UTF-8").build();

        channel.basicPublish("send","sms",true,properties,"周年庆6折大促销,只剩三天。详情登录****,去了解详情。".getBytes());

        TimeUnit.SECONDS.sleep(20);

        channel.close();
        connection.close();
    }
}

定义消息监听消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class SimpleConsumer extends DefaultConsumer {

    public SimpleConsumer(Channel channel){
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("====收到RPC调用恢复了=====");
        System.out.println(properties.getCorrelationId()+",短信发送结果:"+new String(body));


    }
}

管控台中也不会创建amq.rabbitmq.reply-to这个队列,而是直接将返回的消息发到等到的Channel中,提升异步rpc调用的性能,amq.rabbitmq.reply-to也叫做伪队列。

参考资料

Remote procedure call (RPC)
Direct reply-to

    原文作者:二月_春风
    原文地址: https://www.jianshu.com/p/7ee7c7037b52
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞