RPC-03-Thrift简单连接池实现

声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87

在上一节中,介绍了Thrift框架的基本情况和使用,本节针对上节中最后实现的四则运算服务实现一个简单的连接池。

1、Apache Common-pool2简介

Apache Common-pool2包提供了一个通用对象池实现,可方便地基于此实现对象池。对象的创建和销毁在一定程度上会消耗系统的资源,虽然JVM想性能得到了很大的提升,对于多数对象来说,没必要利用对象池进行对象的创建和管理,但是对于线程、TCP连接、数据库连接等对象,其创建与销毁的代价是很大的,因此对象池技术还是有其存在的意义。

Common-pool2由三大模块组成:ObjectPool、PooledObject和PooledObjectFactory。

  • ObjectPool:提供所有对象的存取管理。
  • PooledObject:池化的对象,是对对象的一个包装,加上了对象的一些其他信息,包括对象的状态(已用、空闲),对象的创建时间等。
  • PooledObjectFactory:工厂类,负责池化对象的创建,对象的初始化,对象状态的销毁和对象状态的验证。

ObjectPool会持有PooledObjectFactory,将具体的对象的创建、初始化、销毁等任务交给它处理,其操作对象是PooledObject,即具体的Object的包装类。

2、 Thrift连接池实现

2.1 Node节点

用于保存服务端的IP、Port等信息

public class Node {
    private static final Logger logger = LogManager.getLogger(Node.class);

    private String ip;
    private int port;

    public Node() {
    }

    public Node(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public static Logger getLogger() {
        return logger;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public String toString() {
        return "Node{" +
                "ip='" + ip + '\'' +
                ", port=" + port +
                '}';
    }
}
2.2 Factory

回调。用于对象创建,销毁,验证,激活、钝化等。

public class ConnectionFactory implements PooledObjectFactory<TServiceClient> {
    private static final Logger logger = LogManager.getLogger(ConnectionFactory.class);
    private Node node;
    private int timeout;

    public ConnectionFactory(Node node, int timeout) {
        this.node = node;
        this.timeout = timeout;
    }

    public PooledObject<TServiceClient> makeObject() throws Exception {
        try {
            TTransport tTransport = new TFramedTransport(new TSocket(node.getIp(), node.getPort(), timeout));
            tTransport.open();
            // 具体的连接池
            ComputeServer.Client client = new ComputeServer.Client(new TBinaryProtocol(tTransport));
            logger.info("connect to server success.");
            return new DefaultPooledObject<TServiceClient>(client);
        } catch (Exception e) {
            logger.error("connect to server failed.", e);
            throw e;
        }
    }

    public boolean validateObject(PooledObject<TServiceClient> pooledObject) {
        TServiceClient client = pooledObject.getObject();
        TTransport transport = client.getInputProtocol().getTransport();
        return transport.isOpen();
    }

    public void activateObject(PooledObject<TServiceClient> pooledObject) throws Exception {
        TServiceClient client = pooledObject.getObject();
        TTransport transport = client.getInputProtocol().getTransport();
        if (!transport.isOpen()) {
            logger.info("transport is closed, reopen");
            transport.open();
        }
    }

    public void destroyObject(PooledObject<TServiceClient> pooledObject) throws Exception {
        TServiceClient client = pooledObject.getObject();
        TTransport transport = client.getInputProtocol().getTransport();
        if (transport.isOpen()) {
            transport.close();
            logger.info("close renode connection." + node.toString());
        }
    }

    public void passivateObject(PooledObject<TServiceClient> pooledObject) throws Exception {
        TServiceClient client = pooledObject.getObject();
        TTransport transport = client.getInputProtocol().getTransport();
        if (!transport.isOpen()) {
            logger.info("transport is closed, reopen");
            transport.open();
        }
    }
}

备注:其中需要注意的地方是,在makeObject()中,根据不同的Client将进行创建。

2.3 代理

用于获取连接和归还连接。

public class ThriftClientProxy {
    private static final Logger logger = LogManager.getLogger(ThriftClientProxy.class);

    private static final int WAIT_TIME_MS = 20; // 等待时间
    GenericObjectPoolConfig poolConfig;         // 连接池配置

    // Thrift客户端连接池,ConcurrentHashMap用于存储所有的对象(不含销毁的对象)
    // LinkedBlockingDeque用于存储空闲的对象
    GenericObjectPool<TServiceClient> pool;
    ConnectionFactory connectionFactory;        // 连接
    Node node;                                  // 节点

    /* 构造连接池 */
    public ThriftClientProxy(Node node, int timeout, int minPoolSize, int maxPoolSize) {
        this.node = node;
        poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(maxPoolSize);
        poolConfig.setMinIdle(minPoolSize);

        connectionFactory = new ConnectionFactory(this.node, timeout);
        pool = new GenericObjectPool<TServiceClient>(connectionFactory, poolConfig);
    }

    /* 连接池初始化 */
    public boolean connect() {
        try {
            pool.preparePool();
        } catch (Exception e) {
            logger.error("prepare client connection pool failed.", e);
            close();
            return false;
        }
        return true;
    }
    /* 关闭连接池 */
    public void close() {
        pool.close();
    }

    /* 获取连接池中的连接 */
    public TServiceClient takeConnection() {
        try {
            return pool.borrowObject(WAIT_TIME_MS);
        } catch (Exception e) {
            logger.error("take connection from pool failed.", e);
            return null;
        }
    }

    /* 归还连接 */
    public void returnConnection(TServiceClient client) {
        if (client == null)
            return;
        pool.returnObject(client);
    }
}
2.4 Main

用于测试连接池的功能。

public class Main {
    private static final Logger logger = LogManager.getLogger(Main.class);

    public static void main(String[] args) {
        // 根据节点创建连接池
        Node node = new Node("127.0.0.1", 9000);
        ThriftClientProxy thriftClientProxy = new ThriftClientProxy(node, 200, 5, 5);

        if (!thriftClientProxy.connect()) {
            logger.error("connect to server failed.{}", node.toString());
            return;
        }
        // 获取连接并使用
        ComputeServer.Client computeClient = (ComputeServer.Client) thriftClientProxy.takeConnection();
        ComputeRequest request = new ComputeRequest();
        request.setX(1);
        request.setY(2);
        request.setComputeType(ComputeType.ADD);
        try {
            ComputeResponse response = computeClient.getComputeResult(request);
            if (response != null) {
                System.out.println(response.toString());
            }
        } catch (Exception e) {
            logger.error(e);
        } finally {
            // 归还连接
            thriftClientProxy.returnConnection(computeClient);
        }
    }
}
2.5 代码结构

《RPC-03-Thrift简单连接池实现》 包结构

2.6 测试

《RPC-03-Thrift简单连接池实现》 测试

引用
http://www.open-open.com/lib/view/open1415453575730.html
http://www.cnblogs.com/jinzhiming/p/5120623.html
http://commons.apache.org/proper/commons-pool/api-2.4.2/index.html

    原文作者:唐影若凡
    原文地址: https://www.jianshu.com/p/596779686ee0
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞