juc并发包,用过哪些? 参考 并发编程的艺术 方腾飞
- 先来一道面试题
- 关于java并发包
- 1、concurrentHashmap
- 2、线程池 Executor/ThreadPoolExecute
- 3、阻塞队列 blockingqueue
- 4、locks 同步锁(多线程的锁机制) ****
- 5、CopyOnWriteArrayList
- 6、copyonwriteArrayset
- 7、Atomic类 (多线程的原子性操作提供的工具类)
- 8、volatile关键字 详解可以看多线程部分
- 9、cas乐观锁:(非阻塞算法)
- java中的并发工具类 10/11/12/13
先来一道面试题
面试题1:如何使线程按序交替?
- 实现方式:先做一个标记number=1;定义三个方法,先判断是否number为1,阻塞其他线程,打印当前线程,唤醒2号线程
关于java并发包
java1.5提供此包,加了一些在并发编程中常用的工具类,用于定义类似线程的自定义子系统,包括线程池,异步io,轻量级任务框架等
1、concurrentHashmap
一种线程安全的hash表,对于多线程的操作,性能介于hashmap和hashtable之间
1.1、不同版本的并发hashmap区别
版本 | 数据结构 |
---|---|
jdk1.7 | 内部采用了锁分段机制来替代hashtable的独占锁,从而提高了性能 segment[]数组和HashEntry[]数组 |
jdk1.8 | 进行put操作时:上面的segment采用的是cas机制来保证线程安全的。底层还是用 数组+链表–>红黑树 实现(下面这块还是使用synchronized锁) |
1.2、属性
initialCapacity初始容量 | 16 |
---|---|
loadFactor 加载因子 | 0.75 |
concurrencyLevel 并发级别 | 16 |
table | 默认为null,初始化发生在第一次put操作,默认大小为16的数组 |
nextTable | 默认为null,扩容时新生成的数组,其大小为原数组的两倍 |
sizeCtl | 默认为0,用来控制table的初始化和扩容操作(-1代表table正在初始化,-N表示n-1个线程正在扩容操作) |
Node | 保存key,value及key的hash值的数据结构(Node:保存key,value及key的hash值的数据结构) |
Segment的数组大小一定是2的次幂?
主要是便于通过按位与的散列算法来定位Segment的index,保证存储空间的充分利用。
1.3、concurrentHashmap组成
由Segment的数组结构(一种可重入的reentrantLock)和HashEntry数组结构(存储键值对数据)组成,对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁
1.4、put操作
1、假设table已经初始化完成,put操作采用CAS+synchronized实现并发插入或更新操作
2、首先对key.hashcode进行hash操作,得到key的hash值,定位索引位置,index = hash &(length-1)
3、获取table中对应索引的对象f,node类型:Unsafe.getObjectVolatile来获取 tabAt[index],获取指定内存的数据,保证每次拿到数据都是最新
4、如果f为null,说明table中该位置第一次插入元素,利用CAS方法插入Node节点
- CAS成功,说明Node节点已经插入,随后检查是否需要进行扩容
- CAS失败,说明有其它线程提前插入了节点,自旋重新尝试插入节点
如果f的hash值为-1,意味有其它线程正在扩容,则一起进行扩容操作
5、f不为null的其余情况把新的Node节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发(Synchronized锁住Node,减少了锁粒度)
- 在节点f上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改
- 如果f.hash >= 0,说明f是链表结构的头结点,遍历链表,如果找到对应的node节点,则修改value,否则在链表尾部加入节点
- 如果f是TreeBin类型节点if(f instanceof TreeBin),说明f是红黑树根节点,则在树结构上遍历元素,更新或增加节点
- 如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构
1.5、get(key)
无需加锁,涉及的共享变量都使用volatile修饰,volatile保证内存可见性。
获取索引的对象f=tabAt[index],遍历key,找到相等的,cas来保证变量的原子性读取
1.6、扩容时要注意的?
元素数量达到容量阈值sizeCtl(长度*0.75),扩容分为两部分:
1、构建一个nextTable,大小为table的两倍
2、Unsafe.compareAndSwapInt修改sizeCtl值-1,保证只有一个线程初始化,扩容后的数组长度为原来的两倍,但是容量是原来的1.5
3、把table的数据复制到nextTable中:扩容操作支持并发插入,支持节点的并发复制
1.7、为什么使用并发hashmap
因为hashmap的线程不安全,在并发环境下,可能会形成环状链表(扩容时造成的,与transfer函数有关),导致get操作时,cpu空转。
而hashtable实现线程安全的代价太大了,get/put所有相关操作都是基于synchronized的(全表锁)
2、线程池 Executor/ThreadPoolExecute
提供了一个线程队列,队列中保存着所有等待状态的线程,避免了创建和销毁的额外开销,提高了程序响应的速度
下一篇文章:java中的线程池会详细讲到
callble和runnable的区别
创建线程的4种方式
3、阻塞队列 blockingqueue
当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空
具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查
组别 | 插入 | 移除 | 检查 | 处理 |
---|---|---|---|---|
第一组 | 插入 add(e) | 移除 remove() | 检查 element() | 处理方式:抛出异常 |
第二组 | 插入 offer(e) | 移除 poll() | 检查 peek() | 处理方式:返回特殊值 |
第三组 | 插入 put(e) | 移除take() | 检查 不可用 | 处理方式: 一直阻塞 |
第四组 | 插入 offer(e,time,unit) | 移除 poll(time,unit) | 检查 不可用 | 处理方式:超时退出 |
对四组不同的行为方式解释:
抛出异常 | 如果试图的操作无法立即执行,抛一个异常 |
---|---|
特定值 | 如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。 |
阻塞 | 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。 |
超时 | 如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false) |
注意:无法向一个BlockingQueue中插入 null。如果你试图插入null,BlockingQueue将会抛出一个NullPointerException
3.1、实现类:(共6种)
1、ArrayBlockingQueue 有界的阻塞队列 其内部实现是将对象放到一个数组里
细节:
1 | 内部有个数组 items 用来存放队列元素; |
---|---|
2 | putindex 下标标示入队元素下标 |
3 | takeIndex是出队下标,count统计队列元素个数 |
4 | 独占锁lock用来对出入队操作加锁 |
5 | notEmpty,notFull条件变量用来进行出入队的同步 |
方法:
1 | offer方法 | 在队尾插入元素,如果队列满则返回false,否者入队返回true |
---|---|---|
2 | Put操作 | 在队列尾部添加元素,如果队列满则等待队列有空位置插入后返回 |
3 | Poll操作 | 从队头获取并移除元素,队列为空,则返回null |
4 | Take操作 | 从队头获取元素,如果队列为空则阻塞直到队列有元素。 (当前线程会被挂起放到 notEmpty 的条件队列里面,直到入队操作执行调用notEmpty.signal后当前线程才会被激活,) |
5 | Peek操作 | 返回队列头元素但不移除该元素,队列为空,返回null |
6 | Size操作 | 获取队列元素个数,非常精确因为计算size时候加了独占锁,其他线程不能入队或者出队或者删除元素 |
总结 锁的粒度比较大 类似在方法上添加synchronized
2、DelayQueue 延迟无界阻塞队列
只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素
运用:缓存系统的设计,缓存中的对象,超过了空闲时间,需要从缓存中移出
1、take()和offer()都是lock了重入锁,按照synchronized的公平锁,两个方法是互斥
2、take()方法需要等待1个小时才能返回,offer()需要马上提交一个10秒后运行的任务,此时offer()可以插队获取锁
3、原理:A执行时,B lock()锁,并休眠;当锁被A释放处于可用状态时,B线程却还处于被唤醒的过程中,此时C线程请求锁,可以优先C得到锁
3、LinkedBlockingQueue 有界/无界链表阻塞队列(线程池默认使用)
内部以一个链式结构(链接节点)对其元素进行存储,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限
细节:
1 | 两个 Node 分别用来存放首尾节点 |
---|---|
2 | 初始值为 0 的原子变量 count用来记录队列元素个数 |
3 | 两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁(takeLock取元素,putLock添加元素) |
4 | notEmpty和notFull用来实现入队和出队的同步 |
5 | 可以同时又一个线程入队和一个线程出队 |
方法:
1 | 带时间的Offer操作-生产者 |
---|---|
2 | 带时间的poll操作-消费者 (获取并移除队首元素,在指定的时间内去轮询队列看有没有首元素有则返回,否者超时后返回null) |
3 | put操作-生产者 (与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被唤醒) |
4 | take操作-消费者 (与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才会被唤醒) |
5 | size操作-消费者 (当前队列元素个数,如代码直接使用原子变量count获取) |
6 | peek操作 (获取但是不移除当前队列的头元素,没有则返回null。 ) |
7 | remove操作 删除队列里面的一个元素,有则删除返回true,没有则返回false |
4、PriorityBlockingQueue 无界的并发队列
可以实现comparable接口中的方法来排序队列中的元素 //是二叉树最小堆的实现
5、synchronizedQueue 不存储元素的BlockingQueue
每一个put操作必须要等待一个take操作,否则不能继续添加元素;适合做交换工作
面试题2:
在多线程操作下,一个数组中最多只能存入 3 个元素。多放入不可以存入数组,或等待某线程对数组中某个元素取走才能放入,要求使用java的多线程来实现。
使用阻塞队列中的ArrayBlockingQueue队列来实现。
存数据使用put(e)方法,取数据使用take()方法。处理方式: 一直阻塞
面试题3:
如果提交任务时,线程池队列已满,这时会发生什么?
1、如果使用的是LinkedBlockingQueue,也就是无界队列,可以继续添加任务到阻塞队列中等待执行,可以无限存放任务;
2、如果使用的是有界队列,如ArrayBlockingQueue,任务首先会被添加到ArrayBlockingQueue中,满了后,会使用拒绝策略rejectedExecutionHandler处理满了的任务,
Abort(直接抛出rejectedExecutionException)、 默认
Discard(按照LIFO丢弃)、
DiscardOldest(按照LRU丢弃)、
CallsRun(主线程执行)
4、locks 同步锁(多线程的锁机制) ****
多线程安全的解决方案 同步代码块,同步方法,同步锁lock(是一个显示锁,必须通过unlock方法进行释放锁,放在finally操作中)
重点:消费者/生产者问题
解决方案:为了避免虚假唤醒问题,应该把代码放在循环中。
可以使用synchronized和wait,notifyall机制,也可以使用lock锁加上condition(控制线程通信)机制。
condition接口:
描述了可能会与锁有关的条件变量,这些变量在用法上与使用Object描述了可能会与锁有关的条件变量,这些变量在用法上与使用Object.wait访问的隐式监视器类似,但提供了更强大的功能。await,signal,signalall
5、CopyOnWriteArrayList
写入并复制,不适合添加操作多的场景,每次添加都会进行复制,开销大。适合并发迭代操作多的场景
特点:
1、线程安全版本的ArrayList,每次增加的时候,需要新创建一个比原来容量+1大小的数组
2、拷贝原来的元素到新的数组中,同时将新插入的元素放在最末端
3、然后切换引用
4、迭代时生成快照数组;适合读多写少
6、copyonwriteArrayset
- 基于CopyOnWriteArrayList实现
- 不能插入重复数据,每次add的时候都要遍历数据,性能略低于CopyOnWriteArrayList
CopyOnWrite特点
- 添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器
- 如果读的时候有多个线程正在向ArrayList添加数据,读还是会读到旧的数据
优点 | 1、对CopyOnWrite容器进行并发的读,而不需要加锁,是一种读写分离的思想 2、适合读多写少的并发场景。比如白名单,黑名单 |
---|---|
缺点 | 1、内存占用问题 写操作的时候,内存里会同时驻扎两个对象的内存 2、数据一致性问题 CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性 |
7、Atomic类 (多线程的原子性操作提供的工具类)
如atomicinteger,atomicboolean i++变成原子操作,底层是cas,别的线程自旋到该方法执行完成
详解请看java多线程
8、volatile关键字 详解可以看多线程部分
作用:保证内存的可见性,线程每次都从主存中读取数据。
缺点:不具备“互斥性”:多个线程能同时读写主存,不能保证变量的“原子性”:
(i++不能作为一个整体,分为3个步骤读-改-写),可以使用cas算法保证数据可原子性。
9、cas乐观锁:(非阻塞算法)
是一种硬件对并发的支持,用于管理对共享数据的访问。相当于是无锁的非阻塞实现。
包含三个操作数,内存值V,预估值A,更新值B,当且仅当V==A,V=B;否则,不做任何操作。
悲观锁:
假定并发环境是悲观的,如果发生并发冲突,就会破坏一致性,所以要通过独占锁彻底禁止冲突发生
乐观锁:(锁的粒度小)
假定并发环境是乐观的,即,虽然会有并发冲突,但冲突可发现且不会造成损害,所以,可以不加任何保护,等发现并发冲突后再决定放弃操作还是重试。
CAS算法 :
乐观锁的实现往往需要硬件的支持,多数处理器都都实现了一个CAS指令,实现“Compare And Swap”的语义
CAS包含3个操作数:
1 需要读写的内存位置V
2 进行比较的值A
3 拟写入的新值B
当且仅当位置 V 的值等于 A 时,CAS 才会通过原子方式用新值 B 来更新位置 V 的值;否则不会执行任何操作
CAS乐观锁的缺点
ABA问题:如果另一个线程修改V值假设原来是A,先修改成B,再修改回成A。当前线程的CAS操作无法分辨当前V值是否发生过变化
如何解决ABA问题:用另一个标识判断某值是否有改变过。
乐观锁的业务场景及实现方式 20181222 以后再补
java中的并发工具类 10/11/12/13
10、闭锁countdownlatch (等待多线程完成)实现了join的功能
10.1、概念:
在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行,
可以用于统计多线程执行的时间。
可被多个线程并发的实现减1操作,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协作
10.2、可以实现的需求
现需要解析一个excel里的多个sheet数据,使用多线程,每个线程解析其中一个sheet的数据,等到所有sheet解析完,程序提示解析完成构造函数传入int型参数做改为计数器,countDown被调用,计数器减1,await会一直阻塞程序,直至计数为0.
如果某个sheet解析较慢,可以使用带时间参数的await方法,到时间后,不再阻塞当前线程
10.3、分析CountDownLatch的实现原理
1、在AQS队列中,将线程包装为Node.SHARED节点,即标志位共享锁
2、当头节点获得共享锁后,唤醒下一个共享类型结点的操作
- 1、头节点node1,调用unparkSuccessor()方法唤醒了Node2,并且调用tryAcquireShared方法,检查下一个节点是共享节点
- 2、如果是,更改头结点,重复以上步骤,以实现节点自身获取共享锁成功后,唤醒下一个共享类型结点的操作
10.5、什么是AQS?******
1、提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架
使用方式是继承:
子类通过继承同步器并需要实现它的方法来管理其状态,管理方式是通过acquire和release方式操纵状态
在多线程环境中对状态的操作必须保证原子性,需要使用这个同步器提供的以下三个方法对状态进行操作
1、AbstractQueuedSynchronizer.getState()
2、AbstractQueuedSynchronizer.setState(int)
3、AbstractQueuedSynchronizer.compareAndSetState(int, int)
同步器是实现锁的关键
同步器面向的是线程访问和资源控制,他定义了线程对资源是否能够获取以及线程的排队等操作。
依赖于FIFO队列,队列中的node就是保存着线程引用和线程状态的容器。
对于一个排它锁的获取和释放
//获取:
while(获取锁){
if(获取到)
退出while循环
else{
if(当前线程没有入队)
入队
阻塞当前线程
}
}
//释放:
if(释放成功){
删除头结点
激活原头结点的后继结点
};
10.6、AQS与锁(如LOCK)的对比
1、锁是面向使用者的,定义了用户调用的接口,隐藏了实现细节;
2、AQS是锁的实现者,屏蔽了同步状态管理,线程的排队,等待唤醒的底层操作;
3、锁是面向使用者,AQS是锁的具体实现者
10.7、CountDownLatch中的方法?
1、countDownLatch.await()发生什么?
直接调用了AQS的acquireSharedInterruptibly
当前线程就会进入了一个死循环当中,在这个死循环里面,会不断的进行判断,通过调用tryAcquireShared方法,如果值为0(说明共享锁没有了),会跳出循环
2、释放操作 //countDown操作实际就是释放锁的操作,每调用一次,计数值减少1
3、限定时间的await方法 await(long timeout, TimeUnit unit)
spinForTimeoutThreshold写死了1000ns,这就是所谓的自旋操作,让线程在循环中自旋,否则阻塞线程
代码:参考并发编程的艺术第8章 8.1/8.2
11、cyclicBarrier(同步屏障)
11.1、cyclicBarrier是什么?
让一组线程到达一个屏障(也称同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作
- 线程进入屏障通过cyclicBarrier的await()方法
11.2、cyclicBarrier底层原理?
由ReentrantLock可重入锁和Condition共同实现的
注意:
1、前面四个线程等待最后一个线程超时了,这个时候这四个线程不会再等待最后一个线程写入完毕,而是直接抛出BrokenBarrierException异常,继续执行后续的动作。最后一个线程完成写入数据操作后也继续了后续的动作
2、构造函数的参数表示屏障拦截的线程数量,还有一个高级的构造函数CyclicBarrier(int parties,Runnable barrierAction):在线程达到屏障后,优先执行barrierAction
11.3、cyclicBarrier的应用场景
用于多线程计算数据,最后合并计算结果的场景
例如:excel保存了用户所有的银行流水,每一个sheet保存一个账户近一年的每笔流水,现需要统计用户的日均银行流水,先用多线程处理每个sheet的银行流水,再用barrierAction计算最后结果,计算整个日均流水
11.4、cyclicBarrier和countDownLatch的区别
CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同
1、CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行
2、CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的
代码:参考并发编程8.2 P191
12、Semaphore详解(控制并发线程数)
Semaphore可以控同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而release()可以释放一个许可
Semaphore其实和锁有点类似,它一般用于控制对某组资源的访问权限 //操作系统中讲过
应用场景:
semaphore可以用作流量控制,特别是公用资源有限的应用场景,如数据库连接。现在要读取几万个文件的数据,IO密集型任务,可以启动几十个线程去并发读取,读到内存后,还需要保存到数据库中,而数据库的连接数只有10个,这时必须并发控制10个线程同时获取数据库连接,来保存数据,否则报错无法获取数据库连接
方法:
1、semaphore(int permits) //许可证数量
2、acquire()/tryAcquire() //获取许可证
3、release() //归还许可证
4、intavailablePermits() //返回此信号量中当前可用的许可证数
5、intgetQueueLength() //返回正在等待获取许可证的线程数
代码:参考并发编程P196
13、Exchanger详解(线程间交换数据)
提供了在线程间交换数据的一种手段,它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,他会一直等待第二个线程也执行此方法,当两个线程都到达同步点时,这两个线程就交换数据。
应用场景:
1、用于遗传算法:选两个人作为交配对象,需要交换两人的数据,并使用交叉规则得出2个交配结果
2、用于校对工作:我们需要将纸质银行流水通过人功能的方式录入成电子银行流水,为避免错误,采用AB岗录入,对两个excel数据进行校对,看是否录入一致
可以使用exchange(V x,long timeout,TimeUnit unit) //设置最大等待时长
代码:参考并发编程P198
14、公平锁和非公平锁
公平锁:
就是在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程线程是等待队列的第一个,就占有锁
否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己
非公平锁:
比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式
15、并发队列-非阻塞队列
非阻塞队列使用的是CAS(compare and swap)来实现线程执行的非阻塞
入队:
1、add():底层调用offer();
2、offer():Queue接口继承下来的方法,实现队列的入队操作,不会阻碍线程的执行,插入成功返回true
出对:
1、poll():移动头结点指针,返回头结点元素,并将头结点元素出队;队列为空,则返回null
2、peek():移动头结点指针,返回头结点元素,并不会将头结点元素出队;队列为空,则返回null