RPC 框架 Thrift 的探索与使用

1. 概述

Thrift是跨语言的RPC框架,现在是一个Apache的顶级项目。Thrift通过一个中间语言–IDL接口定义语言,来定义RPC的接口和数据类型。使用Thrift的代码生成工具(thrift-0.10.0.exe编译器)读取IDL文件,生成不同语言的服务端与客户端代码,并由生成的代码负责RPC协议层和传输层的实现。目前支持语言C++,Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk

2. 下载使用

官网地址:thrift.apache.org

  1. 如果是Win OS , 在官网下载exe文件
  2. 然后编辑.thrift 文件,比如:
/**
* 实现功能:创建一个查询结果struct和一个服务接口service
* 基于:thrift-0.10.0
**/
namespace java com.thrift
struct QryResult {
        /**
        *返回码, 1成功,0失败
        */
        1:i32 code;
        /**
        *响应信息
        */
        2:string msg;
}
service TestQry{
        /**
        * 测试查询接口,当qryCode值为1时返回"成功"的响应信息,qryCode值为其他值时返回"失败"的响应信息
        * @param qryCode测试参数
        */
        QryResult qryTest(1:i32 qryCode)
}
  1. 进入命令行,执行thrift-0.10.0.exe --gen java TestQry.thrift,之后会生成基本代码.
  2. 在工程中引入thrift jar 包,把生成的文件引入工程中
<dependency>
  <groupId>org.apache.thrift</groupId>
  <artifactId>libthrift</artifactId>
  <version>0.10.0</version>
</dependency>

3. 基本概念

3.1 数据类型

基本类型:
bool:布尔值,true 或 false,对应 Java 的 boolean
byte:8 位有符号整数,对应 Java 的 byte
i16:16 位有符号整数,对应 Java 的 short
i32:32 位有符号整数,对应 Java 的 int
i64:64 位有符号整数,对应 Java 的 long
double:64 位浮点数,对应 Java 的 double
string:utf-8编码的字符串,对应 Java 的 String

结构体类型:
struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean

容器类型:
list:对应 Java 的 ArrayList
set:对应 Java 的 HashSet
map:对应 Java 的 HashMap

异常类型:
exception:对应 Java 的 Exception

服务类型:
service:对应服务的类

3.2 传输协议

Thrift 支持多种传输协议,用户可以根据实际需求选择合适的类型。Thrift 传输协议上总体可划分为文本 (text) 和二进制 (binary) 传输协议两大类,一般在生产环境中使用二进制类型的传输协议为多数(相对于文本和 JSON 具有更高的传输效率)。常用的协议包含:

TBinaryProtocol:是Thrift的默认协议,使用二进制编码格式进行数据传输,基本上直接发送原始数据
TCompactProtocol:压缩的、密集的数据传输协议,基于Variable-length quantity的zigzag 编码格式
TJSONProtocol:以JSON (JavaScript Object Notation)数据编码协议进行数据传输
TDebugProtocol:常常用以编码人员测试,以文本的形式展现方便阅读

3.3 服务

Thrift 包含三个主要的组件:protocol,transport 和 server。
其中,protocol 定义了消息是怎样序列化的;transport 定义了消息是怎样在客户端和服务器端之间通信的;server 用于从 transport 接收序列化的消息,根据 protocol 反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回 transport。
Thrift 模块化的结构使得它能提供各种 server 实现。下面列出了 Java 中可用的 server 实现:

TSimpleServer
TNonblockingServer
THsHaServer
TThreadedSelectorServer
TThreadPoolServer

3.4 编码步骤

3.4.1 服务端基本步骤

  1. 实现服务处理接口
  2. 创建TProcessor
  3. 创建TServerTransport
  4. 创建TProtocol
  5. 创建TServer
  6. 启动Server

3.4.2 客户端基本步骤

  1. 创建Transport
  2. 创建TProtocol
  3. 基于TTransport和TProtocol创建Client
  4. 调用Client的相应方法
  5. 服务方式的选择需要根据具体的业务需求.

4. 实例演示

4.1 一些说明

  1. 服务端采用的是多接口的实现,因此支持多个.thrift的文件的实现.对应的在客户端,也要使用多接口的方式实现
  2. 在实际运行中,会出现org.apache.thrift.transport.TTransportException: Frame size (40792739) larger than max length (16384000)!异常,所以在代码中会修改一次性传输的大小(1638400000),这个需在客户端和服务端同时设定.
  3. 服务模型的选择

《RPC 框架 Thrift 的探索与使用》 Thrift的TThreadedSelectorServer 服务模式.png

采用Thrift的TThreadedSelectorServer 服务模式,提高并发请求的响应.TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况;Thrift的TThreadedSelectorServer,用业务之外的小demo进行测试,并发提高能很快返回结果。

4.2 接口的实现

public class QueryImp implements TestQry.Iface{
    public QryResult qryTest(int qryCode) throws TException {
        QryResult result = new QryResult();
        if(qryCode == 1){
            result.code = 1;
            result.msg = "success";
        }else {
            result.code = 0;
            result.msg = "fail";
        }
        for(int i=0;i<10000;i++){
               System.out.println("2");
        }
        return result;
    }
}

4.3 服务端的实现

private final static int DEFAULT_PORT = 30002;
    private static TServer server = null;
    public static void main(String[] args) {
        TNonblockingServerSocket socket = null;
        try {
            socket = new TNonblockingServerSocket(DEFAULT_PORT);
        } catch (TTransportException e) {
            e.printStackTrace();
        }
        //多接口的实现
        TProcessor  tProcessor1 = new TestQry.Processor<TestQry.Iface>(new QueryImp());
        TProcessor tProcessor2 = new TestQry1.Processor<TestQry1.Iface>(new QueryImp1());
        TThreadedSelectorServer.Args arg = new TThreadedSelectorServer.Args(socket);
        TMultiplexedProcessor multiplexedProcessor = new TMultiplexedProcessor();
        multiplexedProcessor.registerProcessor("processor1",tProcessor1);
        multiplexedProcessor.registerProcessor("processor2",tProcessor2);
        arg.processor(multiplexedProcessor);
        arg.protocolFactory(new TCompactProtocol.Factory());
        //如果传输数据量过大,需要修改这个地方的参数,默认16M
        arg.transportFactory(new TFramedTransport.Factory(1638400000));
        arg.processorFactory(new TProcessorFactory(multiplexedProcessor));
        //监听线程数
        arg.selectorThreads(10);
        //工作线程数
        ExecutorService pool = Executors.newFixedThreadPool(100);
        arg.executorService(pool);
        arg.getExecutorService();
        server = new TThreadedSelectorServer(arg);
        System.out.println("Starting server on port " + DEFAULT_PORT + "......");
        server.serve();
    }

4.4 客户端实现

private final static int DEFAULT_QRY_CODE = 1;
    public void startClient() {
        TTransport tTransport = null;
        try {
            tTransport = getTTransport();
        } catch (Exception e) {
            e.printStackTrace();
        }
        TProtocol protocol = new TCompactProtocol(tTransport);
        //对应的客户端也要用多接口的方式实现
        TMultiplexedProtocol q1 = new TMultiplexedProtocol(protocol,"processor1");
        TMultiplexedProtocol q2 = new TMultiplexedProtocol(protocol,"processor2");
        TestQry.Client client1 = new TestQry.Client(q1);
        TestQry1.Client client2 = new TestQry1.Client(q2);
        try {
            QryResult result = client1.qryTest(DEFAULT_QRY_CODE);
            System.out.println("code="+result.code+" msg="+result.msg);
            close(tTransport);
        } catch (TException e) {
            e.printStackTrace();
        }


    }

    private static TTransport getTTransport() throws Exception{
        TTransport tTransport = getTTransport("127.0.0.1",30002,300000);
        if(tTransport != null && !tTransport.isOpen()){
            tTransport.open();
        }
        return tTransport;
    }

    private static TTransport getTTransport(String host, int port, int timeout) {
        final TSocket tSocket = new TSocket(host,port,timeout);
        final TTransport tTransport = new TFramedTransport(tSocket,1638400000);
        return tTransport;
    }
    private void close(TTransport transport){
        if(transport !=null && transport.isOpen()){
            transport.close();
        }
    }

5. 参考资料

  1. 官网
  2. Thrift入门及Java实例演示
  3. 和 Thrift 的一场美丽邂逅
  4. 由浅入深了解Thrift(三)——Thrift server端的几种工作模式分析
  5. Thrift框架服务端并发处理模式的java示例
  6. Thrift多接口支持例子
    原文作者:奔跑的Libra
    原文地址: https://www.jianshu.com/p/222f7f584c1a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞