RPC框架的TCP常连接管理 JAVA实现

ConnectManage

客户端使用一个ConnectManage,管理多个netty常连接。连接为tcp常连接,保存其handler,供发送协议包的对象调用。
ConnectManage写成单例模式,节约开销。双重检查平衡其线程安全。

单例:

单例线程安全的方法:写为静态变量(变量后要跟new,不能跟null),包在静态内部类中,双重检查。
饥饿模式是典型的空间换时间。双重检查是一种懒汉模式。
若直接对getInstance方法加锁,则判断实例是否为空,初始化均进行加锁,性能低下。
改进为双检查,第一次判空是不加锁的,非空才进行加锁初始化。(是否可以借鉴jdk7的段初始化进行优化),一定程度避免加锁。
第二次检查在锁中,是线程安全的。由于加锁的过程时间较长,可能发生状态变化,与第一次检查并不等同,还需要再检查一次。类似于wait()用while()包住的道理。
还要注意,由于第一次判空并不在锁中,需要对实例加volatile,确保其并发读可见性。volatile解决了并发读的问题,但不保证并发写。
另外由于volatile还有禁止重排序(优化)的语义,如没有特别需要,不建议使用。

单例实现2:利用类级内部类和多线程缺省同步
隐式同步的情况:
静态字段和static{}块的初始化过程。
final字段。
线程创建之前创建的对象时。
包在静态内部类,与直接写成静态字段的区别为,它是延迟加载且线程安全。因为只能内部类使用时才会创建。

单例实现3:枚举
public enum Singleton{instance;}

变量及方法解释

多个netty连接共用EventLoopGroup,指定线程数。
建立tcp连接的线程池,注意不是异步调用的线程池(放在ClientHandler中),之前的理解有误。为计算密集型,线程数16,队列最好改为LinkedlistBlockingQueue。
保存RpcClientHandler的队列为CopyOnWriteArrayList。
保存RpcClientHandler的哈希为ConcurrentHashMap。
tcp连接超时时间connectTimeoutMillis为常量。(最好配在spring框架的常量文件中)
变量为AtomicInteger roundRobin。调用getAndAdd()方法改值。
变量volatile boolean isRuning。结束标志位,只调用一次,只要保证并发读可见即可。
单例一般都要求线程安全,不仅要实例线程安全,也要保证其中的字段也是线程安全的。以上皆为具体实践。

方法解释:
节点调用算法chooseHandler()
连接节点connectServerNode(final InetSocketAddress remotePeer)
重连reconnect(final RpcClientHandler handler, final SocketAddress remotePeer)不能反查,由参数传入需要删除的原RpcClientHandler
与注册中心对应的updateConnectedServer(List<String> allServerAddress)
条件锁方法waitingForHandler()和signalAvailableHandler()
stop()方法,关handler前刷内存,唤醒条件锁,关异步回调的业务线程池,关netty的线程池。

线程池调参,详见:CompetableFuture组合式异步编程。
ConcurrentHashMap详见:ImportNew中的ConcurrentHashmap总结。

CopyOnWriteArrayList
有用的几行代码:
List<Integer> list = Arrays.asList(new Integer[]{1,2});
CopyOnWriteArrayList<Integer> copyList = new CopyOnWriteArrayList<>();
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new ReadThread(copyList));
CopyOnWriteArrayList使用了写时复制,当需要添加元素时,先把原有数组拷贝出来,然后
在新的数组上作写操作,再将原数组指针指向新数组。
写操作是在锁保护下进行的,这样做避免多线程并发执行操作时,复制出多个副本来。
如果有多线程并发写,则使用锁来控制。并发读写有3种情况。对于写操作完成,还未指向>新数组的情况,仍读取原数据。
使用场景:
写操作需要消耗内存,原数组过大会导致gc。
只能做到最终一致性,不适用于实时读的场景。
适用于读多写少,数据也不能太多。
用到的思想:
读写分享
最终一致性
在新空间执行写操作,解决并发冲突。

条件锁的实践

先补上之前的一个信号量的条件锁实践:

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition readCondition = lock.newCondition();
    final Condition writeCondition = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr,takeptr,count;

public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                writeCondition.await();
            items[putptr++] = x;
            count++;
    
            if (putptr == items.length)
                putptr = 0;
            readCondition.signal();
        }finally {
            lock.unlock();
        }
    }
    
    public Object take() throws InterruptedException {
        lock.lock();
        try{
            while (count == 0)
                readCondition.await();
            Object x = items[takeptr++];
            count--;
            if (takeptr == items.length)
                takeptr = 0;
            writeCondition.signal();
            return x;
        }finally {
            lock.unlock();
        }
}
}
    原文作者:fjxCode
    原文地址: https://www.jianshu.com/p/cc2346c9388b
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞