ConcurrentHashMap:
- 底层结构和HashMap是相同的。继承自ConcurrentMap,AbstractMap以及序列化接口内部类主要有Node类以及Traverser类,CollectionView类。Node用于存储键值对,有子类ForwardingNode以及TreeBin类,ReservationNode类以及TreeBin类。Traverser类主要用于遍历操作。CollectionView的子类keySetView,ValueSetView分别用来表示键视图以及值视图
- ConcurrenthashMap在进行putVal的时候会先去单独的桶中的节点进行加锁,而不是多整个map结构来进行加锁。这样锁的粒度就变得比较小,不同桶之间的操作互补影响,其速度比上Hashtable以及通过Collections.SynchronizedMap生成的同步集合要快上不少
- ConcurrentHashMap在进行节点操作以及相应的put之后增加节点数量统计的操作的时候使用到了大量的CAS操作。对摸一个桶的加锁操作通过Synchronized()块来实现
ConcurrentSkipListMap
- 底层树结构实现是跳表,跳表的插入和删除操作时间复杂度都是O(lgn),其操作方式类似于二分查找。
内部类主要分为Index,HeadIndex以及Node这三个类。HeadIndex是跳表的第一行,Index在中间,都拥有向下以及向右节点。ConcurrentSkipListMap在添加或者删除右侧,下侧元素的时候使用了CAS操作。
Node节点位域最底层,只有向后的连接点,用于组成单链表。 - 跳表中大量使用了CAS操作,在存放一个值的时候,首先通过跳表向右或者向下找到待插入的前面节点,找到之后通过CAS插入,如果前去节点不是之前记录的前去节点,说明在待插入的过程中又有其他的的节点插入到前驱节点之后。那么重新开始查询前驱节点。插入之后来检测是否需要进行层级的增加。
跳表的删除操作并不是立即删除的,其做法是在待删除节点后面添加一个Mark节点,然后将该节点从主链上断开。下次遍历的时候凭借这个mark节点来删除这个节点。
ArrayBlockingQueue
- ArrayBlockingQueue的底层数据结构是数组, 对数组的访问添加了锁的机制,使其能够支持多线程并发。
继承了AbstractQueue抽象类,继承 队列的基本操作,然后实现了BlockingQueue接口。
内部实际上是由ReenTranLock以及两个
LinkedBlockingQueue
底层采用的是链表结构,继承了AbstractQueue,实现了BlockingQueue接口,BlockingQueue表示阻塞型的队列,其对队列的操作可能会抛出异常;同时也实现了Searializable接口,表示可以被序列化。他继承的东西和ArrayBlockingQueue相同
内部包含两把ReenTranlock,用于实现读写分离。非别有fullCondition以及emptyCondition.
队列如果容量达到了最大值,put函数会在条件notFull上等待,否则执行完成之后会唤醒在notFull条件上等待的线程。由于分别使用两把Reentranlock,一把用于读,一把用于写,对队列当前元素数量的统计使用AtomicInteger来实现
一般有4个操作,put,take,offer,poll,后两者的特点是不会抛出中断异常。
当需要删除节点的时候,会首先获取读写锁(防止此时出入队列),然后遍历,寻找到指定的节点删除。
ConcurrentLinkedQueue
ConcurrentLinkedQueue不允许有null元素,与LinkBlockingQueue使用的都是相同的,都是链表结构。
同样拥有内部节点类Node,通过cas操作保证操作的原子性。
poll以及offer函数都是使用cas操作来保证原子性,(相当于乐观锁)。remove(删除摸个节点)也是相同的
注意ConcurrentLinkedQueue为空的时候,取元素操作不会被阻塞,而是直接返回一个null值。
CopyOnWriteArrayList
CopyOnWriteArrayList是ArrayList 的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。其底层使用数组来存放元素
COWIterator是CopyOnWriteArrayList的携带起,其内部有一个Object类型的数组作为CopyOnWriteArrayList数组的快照,这种快照风格的迭代器方法在创建迭代器时使用了对当时数组状态的引用。此数组在迭代器的生存期内不会更改,因此不可能发生冲突,
创建迭代器以后,迭代器就不会反映列表的添加、移除或者更改。在迭代器上进行的元素更改操作(remove、set 和 add)不受支持
添加删除操作是通过写时复制实现的,add先加锁,然后复制一份长度为len+1的数组,然后将elements添加到最后的位子,再将CopyOnWriteArrayList的数组用这个新数组替换掉。set(设置某个Index上的元素)一样,也通过写时复制实现
当一边网COWArrayList中添加元素,一边通过迭代遍历的时候,实际上不会访问真正当前的数组,而只是访问迭代器中的快照。
CopyOnWriteArraySet
底层实际上采用的是前面的CopyOnWriteArrayList,最为一个内部类
内部所有的操作都转换为CowArrayList操作实现,通过其addAbsent来实现,也就是不允许元素的重复
ConcurrentSkipListSet
底层基于ConcurrentSkipListMap来实现
并且迭代器是弱一致性的,即在迭代的过程中,可以有其他修改ConcurrentSkipListSet的操作,不会抛出ConcurrentModificationException异常
ThreadPoolExecutor
ThreadPoolExecutor内部主要由BlockingQueue和AbstractQueuedSynchronizer提供支持,继承自AbstractExecutorService,AbstractExecuetorService提供了ExecutorService执行方法的默认实现。
内部的主要类包括一个Worker类(继承自AQS,实现了Runnable),内部包含Runnable对象以及Thread属性。thread属性的通过ThreadFactory来对其初始化,然后通过AQS来设置独占线程。
ThreadPoolExecutor有一个Atomic的ctl属性,高三位表明了线程池的状态,低29位表示了运行的worker数量。
线程池在饱和情况或者是shutDown情况下的时候实际上会拒绝任务的,然后会调用构造线程池时候的饱和函数。线程池会不断的将用户的任务封装成worker,然后将worker添加到worker集合中去。
每次运行一个Worker的时候,实际上会对会将WOrker加锁锁起来
线程被submit到线程池的时候就会被提交运行,当递交的RUnnable数量唱过线程池容量,Runnable会进入阻塞队列,LinkedBlockingQueue,当线程池Worker线程有空闲的时候会去除阻塞队列中的worker运行
Worker线程没没取得对应的Runable线程的时候,对LinkedBlockingQueue的take操作(也就是取用户任务)会阻塞住。运行线程池的shutDown会中断这些阻塞的Worker线程。
corePoolSize运行的线程少于 corePoolSize,则创建==新线程==来处理请求,即使其他已创建的线程是空闲的。
maxPoolSize:用户任务(Runnable)阻塞队列的大小。
表示曾经同时存在在线程池的worker的大小,为workers集合的最大大小。
当阻塞队列无法添加Runnable的时候,线程池会拒绝任务,调用拒绝任务处理函数
shutdown会设置线程池的运行状态为SHUTDOWN,并且中断所有空闲的worker,由于worker运行时会进行相应的检查,所以之后会退出线程池,并且其会继续运行之前提交到阻塞队列中的任务,不再接受新任务。shutdownNow则会设置线程池的运行状态为STOP,并且中断所有的线程(包括空闲和正在运行的线程),在阻塞队列中的任务将不会被运行,并且会将其转化为List返回给调用者
线程池通过减少每次做任务的时候产生的性能消耗来优化执行大量的异步任务的时候的系统性能。线程池还提供了限制和管理批量任务被执行的时候消耗的资源、线程的方法。另外ThreadPoolExecutor还提供了简单的统计功能,比如当前有多少任务被执行完了。
ThreadPoolExecutor提供了protected类型可以被覆盖的钩子方法,允许用户在任务执行之前会执行之后做一些事情。我们可以通过它来实现比如初始化ThreadLocal、收集统计信息、如记录日志等操作。这类Hook如beforeExecute和afterExecute。另外还有一个Hook可以用来在任务被执行完的时候让用户插入逻辑,如rerminated。
偏向锁
AQS的CAS操作实际上是乐观锁,而偏向锁实际上会偏向于获取他的线程,其在无竞争的情况下把整个同步都消除掉,连CAS操作都不做了。锁偏向于第一个获得它的线程。如果在接下来的执行过程中,该锁没有被其他的线程获取,则持有偏向锁的线程将永远不需要再进行同步。
关于无锁
讲到无锁,必然是Disruptor并发框架,Disruptor底层依赖一个RingBuffer来进行线程之间的数据交换。多线程对RingBuffer的读和写不会涉及到锁,然而因为RingBuffer满或者RingBuffer中没有可消费内容引发的线程等待,那就要另当别论了。
简单几句介绍下无锁原理,RingBuffer维护者可读和可写的指针,也叫游标,它指向生产者或消费者需要写或读的位置,而对于指针的更新是由CAS来完成的,这个过程中我们不需要加锁/解锁的过程。
使用到Future或者FutureTask应该注意到的问题:
任务超时时间,一方面我们不能够无限期的占用线程资源,另一方面我们不能够让外部无限期的等待,因此timeout变得尤为重要。
主动取消任务,假如我们觉得timeout不够灵活,通常场景是当我们在timeout之前已经知道FutureTask不需要再继续为我们工作的时候,我们可以先判断任务是否已经done(isDone),如果没有done,我们可以主动的将任务取消掉,这个时候Future定义的cancel可以派上用场。
任务异常信息,还记得我们最初提交的Runnable和Callable么,当任务抛出了异常我们如何get到异常信息呢,FutureTask其实是代理了Runnable和Callable的执行,捕获异常并将异常信息交给outcome,因此通过FutureTask,我们同样可以获得任务内部抛出的异常信息。