Java并发编程札记-总结

    • 一基础
      • 01基本概念
        • 并发
          • 什么是并发
          • 并发的优点
        • 多线程
        • 线程
        • 并发与并行
        • 线程和进程
      • 02创建线程
        • 创建线程的方式
        • Thread和Runnable该如何选择
        • run方法与start方法的区别
      • 03线程的生命周期
        • 线程的生命周期图
        • 线程的状态
      • 04Thread详解
        • 线程等待与唤醒waitnotifynotifyAll
        • 线程让步yield
        • 线程休眠sleep
        • 线程启动start
        • 中断线程interrupt
        • 线程优先级
        • 线程等待join
        • 守护线程
        • waitsleepyieldjoin区别
      • 05线程安全问题
        • 线程安全问题有哪些
        • 线程安全问题解决方法
      • 06synchronized
        • 修饰对象
        • 注意事项
      • 07volatile详解
        • 什么是volatile
        • 原子操作与原子性
        • 可见性
        • 有序性
        • synchronized锁与volatile变量的比较
    • 二JUC概述
        • atomic
        • locks
        • collections
        • threadPool
        • tools
    • 三JUC原子类
      • 01概述
        • 基本类型
        • 引用类型
        • 数组
        • 对象的属性
        • JDK18新增
        • 原子类可以替换锁吗
        • 原子类和javalangInteger等类的区别
        • 为什么只提供了intlongboolean这几种基本类型的原子类
        • 原子类的实现原理
      • 07CAS
    • 四JUC锁
      • 01概述
      • 02Lock与ReentrantLock
        • synchronized 与Lock比较
        • 特性
      • 03AQS
      • 04Condition简介
        • Condition地位
        • 与Object监视器监视器方法的比较
      • 05ReentrantReadWriteLock
      • 06LockSupport
        • 简介
        • 许可
        • blocker
        • 响应中断
      • 07读写锁的升级StampedLock
        • 为什么读写锁需要升级
        • 读写模式
      • 08CountDownLatch
      • 09CyclicBarrier
        • CyclicBarrier与CountDownLatch的比较
      • 10Semaphore简介
    • 五JUC容器
      • 01概述
        • 继承实现关系图
        • List
        • Set
        • Map
        • Queue
      • 02CopyOnWrite
        • 写时复制
        • 底层数组
        • 缺点
      • 03ConcurrentHashMap
        • 锁分段
        • JDK18结构
      • ConcurrentSkipListMap
        • ConcurrentSkipListMap与TreeMap底层结构的不同
        • skip list
      • 05ArrayBlockingQueue与LinkedBlockingQueue
      • 06LinkedBlockingDeque
      • 07ConcurrentLinkedQueue
    • 六线程池
      • 01概述
        • Executor
        • ExecutorService
        • AbstractExecutorService
        • ThreadPoolExecutor
        • ForkJoinPool
        • ScheduledThreadPoolExecutor
        • ExecutorCompletionService
        • CallableFuture
      • 02ThreadPoolExecutor
        • 为什么要使用线程池
        • 什么是线程池
        • 工作模型
        • Java中的线程池
        • 创建ThreadPoolExecutor线程池
        • 构造方法
          • corePoolSize与maximumPoolSize
          • workQueue
          • keepAliveTime
          • threadFactory
          • handler
        • 排队策略
        • 线程池状态
        • 执行任务
          • shutdown与shutdownNow区别
      • ForkJoin框架
        • 什么是ForkJoin框架
        • 工作窃取算法
        • ForkJoin框架的介绍
        • ForkJoin框架的异常处理
        • ForkJoin框架的实现原理
      • 多线程调度器ScheduledThreadPoolExecutor
        • Java中的多线程调度器
      • 05ExecutorCompletionService
      • 七JUC工具类
        • 01概述
        • CountDownLatch
        • CyclicBarrier
        • Semaphore
        • Executors
        • Exchanger

(一)基础

01基本概念

并发

什么是并发

并发是一种能力,是一种将程序分为几个片段,在单独的处理器上运行每个片段,而不影响最终结果的能力。

并发的优点

可以显著提高程序在多处理器和多核系统中的速度。

多线程

多线程就是达到并发目的的一种手段。多线程,是指从软件或者硬件上实现多个线程并发执行的技术。应用程序可以使用多线程将程序分割为多个子任务,并让底层体系结构管理线程如何运行,可以并发在一个内核上,也可以并行在多个内核上运行。

线程

线程是可以由调度程序独立管理的最小程序指令序列。

并发与并行

通俗的说,并发是多个任务交替执行,而并行是多个任务同时执行。两者的关键在于“同时”这个关键词。

线程和进程

在计算中,进程是正在执行的计算机程序的一个实例。线程是可以由调度程序独立管理的最小程序指令序列。一个进程可以由多个执行线程组成。

02创建线程

创建线程的方式

  • Runnable-定义无返回值的任务
  • Thread-线程构造器
  • Callable-定义可以返回值的任务

我认为Runnable和Callable的作用只是定义任务,创建线程还是需要Thread构造器来完成。

Thread和Runnable该如何选择?

  • 因为Java不支持多继承,但支持多实现。所以从这个方面考虑Runnable比Thread有优势。
  • Thread中提供了一些基本方法。而Runnable中只有run方法。如果只想重写run()方法,而不重写其他Thread方法,那么应使用Runnable接口。除非打算修改或增强Thread类的基本行为,否则应该选择Runnable。

从上面的分析可以看到,一般情况下Runnable更有优势。

run方法与start方法的区别

启动线程的方法是start方法。线程t启动后,t从新建状态转为就绪状态, 但并没有运行。 t获取CPU权限后才会运行,运行时执行的方法就是run方法。此时有t和主线程两个线程在运行,如果t阻塞,可以直接继续执行主线程中的代码。
直接运行run方法也是合法的,但此时并没有新启动一个线程,run方法是在主线程中执行的。此时只有主线程在运行,必须等到run方法中的代码执行完后才可以继续执行主线程中的代码。

03线程的生命周期

线程的生命周期图

《Java并发编程札记-总结》
此图是根据自己的了解画的,如果有不足或错误欢迎指正。

线程的状态

Java中线程有哪些状态在Thread.State枚举中的介绍得很清楚。六种状态分别是NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED。

04Thread详解

线程等待与唤醒(wait()、notify()/notifyAll())

wait()作用是在其他线程调用此对象的notify()方法或notifyAll()方法前,或者其他某个线程中断当前线程前,导致当前线程等待。wait(long timeout)、wait(long timeout, int nanos)作用是在已超过某个实际时间量前,或者其他某个线程中断当前线程前,导致当前线程等待。使用wait方法有个条件:当前线程必须持有对象的锁。执行wait后,当前线程会失去对象的锁,状态变为WAITING或者TIMED_WAITING状态。
notify()可以随机唤醒正在等待的多个线程中的一个。被唤醒的线程并不能马上参与对锁的竞争,必须等调用notify的线程释放锁后才能参与对锁的竞争。而且被唤醒的线程在竞争锁时没有任何优势。
同wait方法一样,使用notify方法有个条件:线程必须持有对象的锁。执行notify方法后,线程会继续执行,并不会马上释放对象的锁。所以才有了上文中的“被唤醒的线程并不能马上参与对锁的竞争,必须等调用notify的线程释放锁后才能参与对锁的竞争。”。
notifyAll()与notify()类似,区别是它可以唤醒在此对象监视器上等待的所有线程。

线程让步(yield())

API中对yield()的介绍是可以暂停当前正在执行的线程对象,并执行其他线程。“暂停”代表着让出CPU,但不会释放锁。执行yield()后,当前线程由运行状态变为就绪状态。但不能保证在当前线程调用yield()之后,其它具有相同优先级的线程就一定能获得执行权,也有可能是当前线程又进入到运行状态继续运行!
yield()与无参的wait()的区别:

  • 执行yield()后,当前线程由运行状态变为就绪状态。执行wait后,当前线程会失去对象的锁,状态变为WAITING状态。
  • 执行yield()后,当前线程不会释放锁。执行wait后,当前线程会释放锁。

线程休眠(sleep())

线程的休眠(暂停执行)与sleep(long millis)和sleep(long millis, int nanos)有关。API中的介绍是sleep(long millis) 方法可以在指定的毫秒数内让当前正在执行的线程休眠;sleep(long millis, int nanos) 可以在指定的毫秒数加指定的纳秒数内让当前正在执行的线程休眠。该线程不丢失任何监视器的所属权。简单来说就是sleep方法可以使正在执行的线程让出CPU,但不会释放锁。执行sleep方法后,当前线程由运行状态变为TIMED_WAITING状态。

sleep()与有参的wait()的区别是:

  • 执行sleep()后,当前线程不会释放锁。执行有参的wait()后,当前线程会释放锁。

sleep()与yield()的区别是:

  • 执行sleep后,当前线程状态变为TIMED_WAITING状态。执行yield()后,当前线程由运行状态变为WAITING状态。

线程启动(start())

中断线程(interrupt())

interrupt()常常被用来中断处于阻塞状态的线程。
interrupted()与isInterrupted()都可以测试当前线程是否已经中断。区别在于线程的中断状态由interrupted()清除。换句话说,如果连续两次调用interrupted(),则第二次调用将返回false。

线程优先级

每个线程都有一个优先级,高优先级线程的执行优先于低优先级线程。java 中的线程优先级的范围是1~10,默认的优先级是5。
setPriority(int newPriority)和getPriority()分别用来更改线程的优先级和返回线程的优先级。

线程等待(join())

join()的作用是等待该线程终止,常常被用来让主线程在子线程运行结束之后才能继续运行。如在主线程main中调用了thread.join(),那么主线程会等待thread执行完后才继续执行。join(long millis)、join(long millis , int nanos)功能与join()类似,但限定了等待时间,join(long millis)意味着等待该线程终止的时间最长为millis毫秒,join(long millis , int nanos)意味着等待该线程终止的时间最长为millis毫秒 + nanos 纳秒,超时将主线程将继续执行。join()等价于join(0),即超时为0,意味着要一直等下去。
从源码中可以了解到,join()的实现依赖于wait方法,所以join()释放锁。

守护线程

Java中有两种线程:用户线程和守护线程。守护线程是一种特殊的线程,它的作用是为其他线程提供服务。例如GC线程就是守护线程。当正在运行的线程都是守护线程时,Java虚拟机退出,守护线程自动销毁。
setDaemon(boolean on)用于将该线程标记为守护线程或用户线程。该方法必须在启动线程前调用。isDaemon()用于测试该线程是否为守护线程。

wait(),sleep(),yield(),join()区别

  • wait()方法在Object中定义的,其他只有Thread中有。
  • wait(),join()会释放锁,sleep(),yield()不会。
  • sleep()可以有参数,yield()没有。

05线程安全问题

线程安全问题有哪些

在多线程编程中,可能会出现多个线程访问一个资源的情况,资源可以是同一内存区(变量,数组,或对象)、系统(数据库,web services等)或文件等等。如果不对这样的访问做控制,就可能出现不可预知的结果。这就是线程安全问题,常见的情况是“丢失修改”、“不可重复读”、“读‘脏’数据”等等。

线程安全问题解决方法

  • 内部锁(Synchronized)和显式锁(Lock)。这两种方式是重量级的多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性、有序性和原子性。
  • volatile:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性、有序性保证,不提供原子性。
  • CAS原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性、有序性和原子化更新保证。

06synchronized

修饰对象

  • 方法。作用范围是整个方法,作用的对象是调用这个方法的对象;
  • 代码块。作用范围是大括号{}括起来的代码,作用的对象是调用这个代码块的对象;
  • 静态方法。作用范围是整个静态方法,作用的对象是这个类的所有对象;
  • 类。作用范围是synchronized后面括号括起来的部分,作用的对象是这个类的所有对象。

注意事项

  • 将域设置为private。在使用并发时,要将域设置为private,否则synchronized就不能阻止其他任务直接访问域,这样可能会产生不可预知的结果。
  • 一个任务可以多次获得对象的锁。如果一个任务在同一个对象上调用了第二个方法,后者又调用了同一个对象上的第三个方法,这个任务就会多次获取这个对象的锁。每当任务执行所有的方法,锁才被完全释放。
  • 每个访问临界资源的方法都必须被同步。如果在你的类中有超过一个方法在处理临界数据,那么必须同步所有的方法。如果只同步一个方法,其他方法可以忽略这个锁。所以,每个访问临界资源的方法都必须被同步。
  • 异常自动释放锁 。当一个线程执行的代码出现异常时,其所持有的锁会自动释放。

07volatile详解

什么是volatile

Java中的volatile可以看做是“轻量级的synchronized”。synchronized可能会引起上下文切换和线程调度,同时保证可见性、有序性和原子性。volatile不会引起上下文切换和线程调度,但仅提供可见性和有序性保证,不提供原子性保证。

原子操作与原子性

如果一系列(或者一个)操作是不可中断的,要么都执行,要么不执行,就称操作是原子操作,具有原子性。
拿移动支付举例,A用户向B用户付款100元,其中包含两个操作:A用户账户扣减100元,B用户账户增加100元。如果这两个操作不是原子操作就可能会出错,比如A账户账户扣减100元,但B用户账户并没有增加100元。

可见性

可见性指的是多个线程对共享资源的可见性。当一个线程修改了某一资源,其他线程能够看到修改结果。

有序性

有效性指程序按照代码的先后顺序执行。

synchronized锁与volatile变量的比较

  • volatile变量最大的优点是使用方便。在某些情形下,使用volatile变量要比使用相应的synchronized锁简单得多。
  • 某些情况下,volatile变量同步机制的性能要优于synchronized锁。
  • volatile变量不会像synchronized锁一样造成阻塞。
  • volatile变量最大的缺点在于使用范围有限,而且容易出错。

总的来说volatile变量使用范围有限,不能替代synchronized,但在某些场景下,使用volatile更好。

(二)JUC概述

从今天开始学习JUC。JUC是java.util.concurrent包的简称。下图是JUC的整体结构。参考JDK1.8的java.util.concurrent,画出下图。

《Java并发编程札记-总结》

atomic

以下是JUC中的原子类。
《Java并发编程札记-总结》

locks

以下是JUC中的锁,也称显示锁。

《Java并发编程札记-总结》

collections

以下是JUC中的集合。

《Java并发编程札记-总结》

threadPool

以下是JUC中与线程池有关的类。

《Java并发编程札记-总结》

tools

以下是JUC中的工具类。

《Java并发编程札记-总结》

(三)JUC原子类

01概述

参考JDK1.8的java.util.concurrent.atomic包,画出如下图:
《Java并发编程札记-总结》

可以将包中的类分为五类:

  • 基本类型:AtomicBoolean、AtomicInteger、AtomicLong
  • 引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference
  • 数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
  • 对象的属性:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
  • JDK1.8新增:DoubleAccumulator、LongAccumulator、DoubleAdder、LongAdder

基本类型

AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference的实例各自提供对相应类型单个变量的原子方式访问和更新功能。例如AtomicBoolean提供对int类型单个变量的原子方式访问和更新功能。
每个类也为该类型提供适当的实用工具方法。例如,类AtomicLong和AtomicInteger提供了原子增量方法,可以用于生成序列号。

引用类型

AtomicStampedRerence维护带有整数“标志”的对象引用,可以用原子方式对其进行更新。AtomicMarkableReference维护带有标记位的对象引用,可以原子方式对其进行更新。

数组

AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。例如AtomicIntegerArray是可以用原子方式更新其元素的int数组。

对象的属性

AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater和AtomicLongFieldUpdater是基于反射的实用工具,可以提供对关联字段类型的访问。例如AtomicIntegerFieldUpdater可以对指定类的指定volatile int字段进行原子更新。

JDK1.8新增

DoubleAccumulator、LongAccumulator、DoubleAdder、LongAdder是JDK1.8新增的部分,是对AtomicLong等类的改进。比如LongAccumulator与LongAdder在高并发环境下比AtomicLong更高效。

原子类可以替换锁吗?

原子类不是锁的常规替换方法。仅当对象的重要更新限定于单个变量时才应用它。

原子类和java.lang.Integer等类的区别

原子类不提供诸如hashCode和compareTo之类的方法。因为原子变量是可变的。

为什么只提供了int、long、boolean这几种基本类型的原子类?

待补充。

原子类的实现原理

原子类是基于CAS实现的。

07CAS

CAS,compare and swap的缩写,意为比较并交换。CAS操作包含三个操作数:内存值(V),预期值(A)、新值(B)。如果内存值与预期值相同,就将内存值修改为新值,否则不做任何操作。

java.util.concurrent.atomic是建立在CAS之上的。下面以AtomicLong为例看下是如何使用CAS的。
下面看下AtomicLong的compareAndSet方法。

// Java不能直接访问操作系统底层,所以使用Unsafe类提供硬件级别的原子操作。
//Unsafe.compareAndSwapLong是CAS操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

/** * 如果当前内存值等于预期值,原子更新当前值为新值value * * @param expect 预期值 * @param update 新值 * @return {@code true} 如果成功,则返回 true。返回 false 指示实际值与预期值不相等。 */
public final boolean compareAndSet(long expect, long update) {
    //使用unsafe来实现CAS
    return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}

从源码中可以看到,AtomicLong.compareAndSet利用unsafe..compareAndSwapLong(this, valueOffset, expect, update)实现CAS操作,而unsafe通过调用JNI来完成CPU指令的操作。JNI是Java Native Interface的缩写,允许Java调用其他语言,unsafe.compareAndSwapLong方法就是借助C来调用CPU底层指令实现。其他的原子类中也大量使用了类似unsafe..compareAndSwap×××的方式。

CAS缺点

  • ABA问题:CAS操作先取出内存值,然后才将内存值与期望值比较。这当中可能会出现某些问题,比如,thread1获得了内存值V,thread2也从内存中取出V,并且thread2进行了一些操作将内存值修改为B,然后two又将内存值修改为V,当前线程的CAS操作无法分辨内存值V是否发生过变化。尽管CAS成功,但可能存在潜在的问题。举个生活中的例子,你倒了一杯水,然后有事离开,回来后看到还是一杯水,但你不能确定是不是有人已经把这杯水喝掉然后又给你倒了一杯水。尽管还是一杯水,但已经不一样了,而且可能存在潜在的问题。解决问题的方法是对“这杯水”设置一个标记,这样回来时就可以判断“这杯水”是不是被动过。AtomicStampedReference和AtomicMarkableReference可以实现标记的功能。

(四)JUC锁

01概述

JUC锁位于java.util.concurrent.locks包下,为锁和等待条件提供一个框架的接口和类,它不同于内置同步和监视器。参考JDK1.8的java.util.concurrent.locks包,画出如下图:
《Java并发编程札记-总结》
CountDownLatch,CyclicBarrier和Semaphore不在包中,但也是通过AQS来实现的。因此,我也将它们归纳到JUC锁中进行介绍。

Lock
Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。

ReentrantLock
一个可重入的互斥锁,它具有与隐式锁synchronized相同的一些基本行为和语义,但功能更强大。

AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer就是被称之为AQS的类,为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。
AbstractQueuedLongSynchronizer以long形式维护同步状态的一个AbstractQueuedSynchronizer版本。
AbstractQueuedSynchronizer与AbstractQueuedLongSynchronizer都继承了AbstractOwnableSynchronizer。AbstractOwnableSynchronizer是可以由线程以独占方式拥有的同步器。

Condition
Condition又称等待条件,它实现了对锁更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和synchronized组合使用的;而Condition需要与Lock组合使用。

ReentrantReadWriteLock
ReentrantReadWriteLock维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。

LockSupport
用来创建锁和其他同步类的基本线程阻塞原语。

CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。

Semaphore
一个计数信号量。从概念上讲,信号量维护了一个许可集。Semaphore通常用于限制可以访问某些资源的线程数目。

02Lock与ReentrantLock

synchronized 与Lock比较

  • 与synchronized 相比Lock的使用更灵活。Lock接口的实现允许锁在不同的范围内获取和释放,并支持以任何顺序获取和释放多个锁。
  • ReentrantLock具有与使用 synchronized 相同的一些基本行为和语义,但功能更强大。包括提供了一个非块结构的获取锁尝试 (tryLock())、一个获取可中断锁的尝试 (lockInterruptibly()) 和一个获取超时失效锁的尝试 (tryLock(long, TimeUnit))。
  • ReentrantLock具有synchronized所没有的许多特性,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者轮询锁。
  • ReentrantLock可伸缩性强,应当在高度争用的情况下使用它。

特性

  • ReentrantLock是一个可重入的互斥锁。
  • ReentrantLock既可以是公平锁又可以是非公平锁。当此类的构造方法ReentrantLock(boolean fair) 接收true作为参数时,ReentrantLock就是公平锁。

03AQS

AQS,AbstractQueuedSynchronizer的缩写,是JUC中非常重要的一个类。javadoc中对其的介绍是:

为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。

AbstractQueuedSynchronizer是CountDownLatch、ReentrantLock、RenntrantReadWriteLock、Semaphore等类实现的基础。
待补充。。。

04Condition简介

Condition地位

在任务协作中,关键问题是任务之间的通信。握手可以通过Object的监视器方法(wait()和notify()/notifyAll())和synchronized方法和语句来安全地实现。Java SE5的JUC提供了具有await()和signal()/signalAll()方法的Condition和Lock来实现。其中,Lock代替了synchronized方法和语句,Condition代替了Object的监视器方法。与Object相比,Condition可以更精细地控制线程的休眠与唤醒。

与Object监视器监视器方法的比较

对比项ConditionObject监视器备注
使用条件获取锁获取锁,创建Condition对象
等待队列的个数一个多个
是否支持通知指定等待队列支持不支持
是否支持当前线程释放锁进入等待状态支持支持
是否支持当前线程释放锁并进入超时等待状态支持支持
是否支持当前线程释放锁并进入等待状态直到指定最后期限支持不支持
是否支持唤醒等待队列中的一个任务支持支持
是否支持唤醒等待队列中的全部任务支持支持

05ReentrantReadWriteLock

ReentrantReadWriteLock是一种共享锁。ReadWriteLock维护了两个锁,读锁和写锁,所以一般称其为读写锁。写锁是独占的。读锁是共享的,如果没有写锁,读锁可以由多个线程共享。与互斥锁相比,虽然一次只能有一个写线程可以修改共享数据,但大量读线程可以同时读取共享数据,所以,在共享数据很大,且读操作远多于写操作的情况下,读写锁值得一试。
ReentrantReadWriteLock具有以下特性:(有待详细介绍)

  • 公平性
  • 重入性
  • 锁降级
  • 锁获取中断
  • 支持Condition
  • 检测系统状态

优点
与互斥锁相比,虽然一次只能有一个写线程可以修改共享数据,但大量读线程可以同时读取共享数据。在共享数据很大,且读操作远多于写操作的情况下,ReentrantReadWriteLock值得一试。

缺点
只有当前没有线程持有读锁或者写锁时才能获取到写锁,这可能会导致写线程发生饥饿现象,即读线程太多导致写线程迟迟竞争不到锁而一直处于等待状态。StampedLock可以解决这个问题,解决方法是如果在读的过程中发生了写操作,应该重新读而不是直接阻塞写线程。

06LockSupport

简介

LockSupport是JUC锁中比较基础的类,用来创建锁和其他同步类的基本线程阻塞原语。比如,在AQS中就使用LockSupport作为基本线程阻塞原语。它的park()和unpark()方法分别用于阻塞线程和解除阻塞线程。与Thread.suspend()相比,它没有由于resume()在前发生,导致线程无法继续执行的问题。和Object.wait()对比,它不需要先获得某个对象的锁,能够响应中断请求(中断状态被设置成true),也不会抛出InterruptException异常。

许可

此类以及每个使用它的线程与一个许可关联。如果许可可用,当前线程可获取许可执行。如果许可不可用,调用park()后,当前线程阻塞,等待获取许可。unpark(Thread thread)可使指定线程的许可可用。这与Semaphore相似,但LockSupport最多只能有一个许可。

blocker

三种形式的park(park(Object blocker)、parkNanos(Object blocker, long nanos)、parkUntil(Object blocker, long deadline))都支持blocker对象参数。此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。监视工具和诊断工具可以使用方法getBlocker(java.lang.Thread)访问 blocker。建议最好使用这些形式,而不是不带此参数的原始形式。在锁实现中提供的作为blocker的普通参数是this。

响应中断

Object.wait()对比,LockSupport.park();不需要先获得某个对象的锁,能够响应中断请求(中断状态被设置成true),也不会抛出InterruptException异常。

07读写锁的升级—StampedLock

为什么读写锁需要升级

StampedLock是JDK1.8新增的一个锁,是对读写锁ReentrantReadWriteLock的改进。前面已经学习了ReentrantReadWriteLock,我们了解到,在共享数据很大,且读操作远多于写操作的情况下,ReentrantReadWriteLock值得一试。但要注意的是,只有当前没有线程持有读锁或者写锁时才能获取到写锁,这可能会导致写线程发生饥饿现象,即读线程太多导致写线程迟迟竞争不到锁而一直处于等待状态。StampedLock可以解决这个问题,解决方法是如果在读的过程中发生了写操作,应该重新读而不是直接阻塞写线程。

读/写模式

  • 写。独占锁,只有当前没有线程持有读锁或者写锁时才能获取到该锁。方法writeLock()返回一个可用于unlockWrite(long)释放锁的方法的戳记。tryWriteLock()提供不计时和定时的版本。
  • 读。共享锁,如果当前没有线程持有写锁即可获取该锁,可以由多个线程获取到该锁。方法readLock()返回可用于unlockRead(long)释放锁的方法的戳记。tryReadLock()也提供不计时和定时的版本。
  • 乐观读。方法tryOptimisticRead()仅当锁定当前未处于写入模式时,方法才会返回非零戳记。返回戳记后,需要调用validate(long stamp)方法验证戳记是否可用。也就是看当调用tryOptimisticRead返回戳记后到到当前时间是否有其他线程持有了写锁,如果有,返回false,否则返回true,这时就可以使用该锁了。

08CountDownLatch

CountDownLatch是一个通用同步器,用于同步一个或多个任务。在完成一组正在其他线程中执行的任务之前,它允许一个或多个线程一直等待。
可以用一个初始计数值来初始化CountDownLatch对象,任何在这个对象上调用await()的方法都将阻塞,直至计数值到达0。每完成一个任务,都可以在这个对象上调用countDown()减少计数值。当计数值减为0,所有等待的线程都会被释放。CountDownLatch的计数值不能重置。如果需要重置计数器,请考虑使用CyclicBarrier。

09CyclicBarrier

CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点。如果你希望一组并行的任务在下个步骤之前相互等待,直到所有的任务都完成了下个步骤前的所有操作,才继续向前执行,那么CyclicBarrier很合适。

CyclicBarrier与CountDownLatch的比较

看过CyclicBarrier的方法列表后,有没有发现CyclicBarrier与CountDownLatch比较像。它们之间最大的区别在于CyclicBarrier的计数器可以重置,相当于可以循环使用。cyclic,意为可循环的,barrier,意为屏障,刚好映照了CyclicBarrier的两个特点。

10Semaphore简介

一般的锁在任意时刻只允许一个线程访问一项资源,而计数信号量允许n个任务同时访问一项资源。我们可以将信号量看做一个许可集,可以向线程分发使用资源的许可证。获得资源前,线程调用acquire()从许可集中获取许可。该线程结束后,通过release()将许可还给许可集。

(五)JUC容器

01概述

继承实现关系图

JUC提供了用于多线程上下文中的Collection实现与高效的、可伸缩的、线程安全的非阻塞FIFO队列。参考JDK1.8,画出下图。
《Java并发编程札记-总结》

List

JUC容器中List的实现只有CopyOnWriteArrayList。CopyOnWriteArrayList相当于线程安全的ArrayList。

Set

JUC容器中Set的实现有CopyOnWriteArraySet与ConcurrentSkipListSet。CopyOnWriteArraySet相当于线程安全的HashSet,ConcurrentSkipListSet相当于线程安全的TreeSet。当set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突时,CopyOnWriteArraySet优于同步的HashSet。ConcurrentSkipListSet是一个基于 ConcurrentSkipListMap 的可缩放并发 NavigableSet 实现。set的元素可以根据它们的自然顺序进行排序,也可以根据创建set时所提供的 Comparator 进行排序,具体取决于使用的构造方法。CopyOnWriteArraySet的实现依赖于CopyOnWriteArrayList。ConcurrentSkipListSet的实现依赖于ConcurrentSkipListMap。所以CopyOnWriteArraySet会在CopyOnWriteArrayList之后学习,ConcurrentSkipListSet会在ConcurrentSkipListMap之后学习。

Map

JUC容器中Map的实现只有ConcurrentHashMap和ConcurrentSkipListMap。
ConcurrentHashMap是线程安全的哈希表,相当于线程安全的HashMap。ConcurrentSkipListMap是线程安全的有序的哈希表,相当于线程安全的TreeMap。

Queue

JUC容器中Queue的常用实现有ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeque、ConcurrentLinkedDeque和ConcurrentLinkedQueue。

  • ArrayBlockingQueue是一个由基于数组的、线程安全的、有界阻塞队列。
  • LinkedBlockingQueue是一个基于单向链表的、可指定大小的阻塞队列。
  • LinkedBlockingDeque是一个基于单向链表的、可指定大小的双端阻塞队列。
  • ConcurrentLinkedDeque是一个基于双向链表的、无界的队列。
  • ConcurrentLinkedQueue是一个基于单向链表的、无界的队列。

02CopyOnWrite

写时复制

CopyOnWrite,简称COW。所谓写时复制,即读操作时不加锁以保证性能不受影响,写操作时加锁,复制资源的一份副本,在副本上执行写操作,写操作完成后将资源的引用指向副本。高并发环境下,当读操作次数远远大于写操作次数时这种做法可以大大提高读操作的效率。

写操作时使用的锁是ReentrantLock。

底层数组

CopyOnWriteArrayList底层仍是数组。为了当写操作改变了底层数组array时,读操作可以得知这个消息,需要使用volatile来保证array的可见性。

缺点

有利就有弊,写时复制提高了读操作的性能,但写操作时内存中会同时存在资源和资源的副本,可能会占用大量的内存。

03ConcurrentHashMap

锁分段

《Java并发编程札记-总结》
在JDK1.7中,ConcurrentHashMap通过“锁分段”来实现线程安全。ConcurrentHashMap将哈希表分成许多片段(segments),每一个片段(table)都类似于HashMap,它有一个HashEntry数组,数组的每项又是HashEntry组成的链表。每个片段都是Segment类型的,Segment继承了ReentrantLock,所以Segment本质上是一个可重入的互斥锁。这样每个片段都有了一个锁,这就是“锁分段”。

JDK1.8结构

《Java并发编程札记-总结》
在JDK1.8中,ConcurrentHashMap放弃了“锁分段”,取而代之的是类似于HashMap的数组+链表+红黑树结构,使用CAS算法和synchronized实现线程安全。

ConcurrentSkipListMap

ConcurrentSkipListMap与TreeMap底层结构的不同

ConcurrentSkipListMap是线程安全的有序的哈希表。与同是有序的哈希表TreeMap相比,ConcurrentSkipListMap是线程安全的,TreeMap则不是,且ConcurrentSkipListMap是通过跳表(skip list)实现的,而TreeMap是通过红黑树实现的。至于为什么ConcurrentSkipListMap不像TreeMap一样使用红黑树结构,在ConcurrentSkipListMap源码中Doug Lea已经给出解释:

The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees.

有必要详细了解下skip list。

skip list

引用作者William Pugh的一句话:

Skip lists are a probabilistic data structure that seem likely to supplant balanced trees as the implementation method of choice for many applications. Skip list algorithms have the same asymptotic expected time bounds as balanced trees and are simpler, faster and use less space.

大意为跳过列表是一种概率数据结构,可能取代平衡树作为许多应用程序的实现方法。跳过列表算法具有与平衡树相同的渐近期望时间界限,并且更简单,更快速并且使用更少的空间。下图是skip list的数据结构示意图。
《Java并发编程札记-总结》
从图中可以看出跳表主要有以下几个成员构成:

  • Node:节点,保存map元素值。有三个属性,key、value、指向下个node的指针next。
  • Index:索引节点。有三个属性,指向最底层node的指针、指向下一层的index的指针down、指向本层中的下个index的指针right。
  • HeadIndex:索引头节点。除了有Index的所有属性外,还有一个表示此索引所在层的属性。
  • NULL:表尾,全部为NULL。

05ArrayBlockingQueue与LinkedBlockingQueue

06LinkedBlockingDeque

LinkedBlockingDeque是一个基于链表的、可指定大小的阻塞双端队列。“双端队列”意味着可以操作队列的头尾两端,所以LinkedBlockingDeque既支持FIFO,也支持FILO。
可选的容量范围构造方法参数是一种防止过度膨胀的方式。如果未指定容量,那么容量将等于 Integer.MAX_VALUE。只要插入元素不会使双端队列超出容量,每次插入后都将动态地创建链接节点。

07ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链表的、无界的、线程安全的队列。此队列按照FIFO原则对元素进行排序。此队列不允许使用null元素,采用了有效的“无等待(wait-free)”算法(CAS算法)。
与大多数collection不同,size方法不是一个固定时间操作。由于这些队列的异步特性,确定当前元素的数量需要遍历这些元素。

(六)线程池

01概述

前面的例子中总是需要线程时就创建,不需要就销毁它。但频繁创建和销毁线程是很耗资源的,在并发量较高的情况下频繁创建和销毁线程会降低系统的效率。线程池可以通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
参考JDK1.8中的相关类,画出下图。
《Java并发编程札记-总结》
(此图不是十分准确,有些类实现了两个接口,这里只展示出了一个)
本章只是简单地介绍下它们,在以后的文章中会选一些最重要的来学习。

Executor

此接口提供一种将任务提交与每个任务将如何运行的机制分离开来的方法。它只提供了execute(Runnable)这么一个方法,用于执行已提交的Runnable任务。

ExecutorService

继承了Executor接口,用于提交一个用于执行的Runnable任务、试图停止所有正在执行的活动任务,暂停处理正在等待的任务、执行给定的任务。

AbstractExecutorService

提供了ExecutorService的默认实现。

ThreadPoolExecutor

提供一个可扩展的线程池实现,是最出名的“线程池”。

ForkJoinPool

JDK1.7中新增的一个线程池,与ThreadPoolExecutor一样,同样继承了AbstractExecutorService。ForkJoinPool是Fork/Join框架的两大核心类之一。与其它类型的ExecutorService相比,其主要的不同在于采用了工作窃取算法(work-stealing):所有池中线程会尝试找到并执行已被提交到池中的或由其他线程创建的任务。这样很少有线程会处于空闲状态,非常高效。这使得能够有效地处理以下情景:大多数由任务产生大量子任务的情况;从外部客户端大量提交小任务到池中的情况。

ScheduledThreadPoolExecutor

ScheduledExecutorService继承了ExecutorService,可安排在给定的延迟后运行或定期执行命令。
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,实现了ScheduledExecutorService。

ExecutorCompletionService

CompletionService接口是将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者利用submit()提交要执行的任务。使用者利用take()获取并移除已完成的任务的返回值,并按照完成这些任务的顺序处理它们的结果。
通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。此类将那些完成时提交的任务放置在可使用take()访问的队列上。

Callable&Future

Callable接口类似于Runnable,两者作用都是定义任务。不同的是,被线程执行后,Callable可以返回结果或抛出异常。而Runnable不可以。Callable的返回值可以通过Future来获取。

02ThreadPoolExecutor

为什么要使用线程池

许多服务器都面临着处理大量客户端远程请求的压力,如果每收到一个请求,就创建一个线程来处理,表面看是没有问题的,但实际上存在着很严重的缺陷。服务器应用程序中经常出现的情况是请求处理的任务很简单但客户端的数目却是庞大的,这种情况下如果还是每收到一个请求就创建一个线程来处理它,服务器在创建和销毁线程所花费的时间和资源可能比处理客户端请求处理的任务花费的时间和资源更多。为了缓解服务器压力,需要解决频繁创建和销毁线程的问题。线程池可以实现这个需求。

什么是线程池

线程池可以看做是许多线程的集合。在没有任务时线程处于空闲状态,当请求到来,线程池给这个请求分配一个空闲的线程,任务完成后回到线程池中等待下次任务。这样就实现了线程的重用。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。

工作模型

《Java并发编程札记-总结》
工作模型中一共有三种队列:正在执行的任务队列,等待被执行的阻塞队列,等待被commit进阻塞队列中的任务队列。

Java中的线程池

Java中常用的线程池有三个,最出名的当然是ThreadPoolExecutor,除此之外还有ScheduledThreadPoolExecutor、ForkJoinPool。

创建ThreadPoolExecutor线程池

  • 强烈推荐使用Executors工厂方法创建线程池,如Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和 Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。
  • 通过构造方法手动配置线程池

构造方法

ThreadPoolExecutor一共有四个构造方法,其他三个构造方法都是通过上述的构造方法来实现的。毫无疑问手动配置线程池的关键就是学好构造方法中的几个参数如何设置。这几个参数对应着ThreadPoolExecutor中的几个成员属性。

corePoolSize与maximumPoolSize

corePoolSize与maximumPoolSize分别是核心池大小与最大池大小。在源码中的声明为
private volatile int corePoolSize;private volatile int maximumPoolSize;
当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

workQueue

workQueue是线程池工作模型中的阻塞队列,用于传输和保持提交的任务。在源码中的声明为private final BlockingQueue workQueue;。

keepAliveTime

keepAliveTime是池中线程空闲时的活动时间。如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止(参见 getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0, allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

threadFactory

threadFactory是一个线程集合。线程池可以使用ThreadFactory创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

handler

handler是线程池拒绝策略,RejectedExecutionHandler类型的对象。当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下, execute 方法都将调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:

  • ThreadPoolExecutor.AbortPolicy ,默认策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
  • ThreadPoolExecutor.CallerRunsPolicy,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
  • ThreadPoolExecutor.DiscardPolicy,不能执行的任务将被删除。
  • ThreadPoolExecutor.DiscardOldestPolicy,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

排队策略

排队有三种通用策略:

  • SynchronousQueue。它将任务直接传输给工作队列workers,而不保持任务。如果不存在空闲线程,则会新建一个线程来执行任务。比如,在Executors.newCachedThreadPool()方法中使用的就是此策略。
  • LinkedBlockingQueue。无界队列,使用此队列会导致在所有corePoolSize线程都忙时新任务在队列中等待。这样,创建的线程就不会超过corePoolSize。比如,在Executors.newFixedThreadPool()和Executors.newSingleThreadExecutor()方法中使用的就是此策略。
  • ArrayBlockingQueue 。有界队列,没见到在哪里用到了这种策略。

线程池状态

源码已经告诉了我们线程池有几个状态。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

可以看出,一共有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。ctl对象一共32位,高3位保存线程池状态信息,后29位保存线程池容量信息。线程池的初始化状态是RUNNING,在源码中体现为private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

状态高三位工作队列workers中的任务阻塞队列workQueue中的任务未添加的任务
RUNNING111继续处理继续处理添加
SHUTDOWN000继续处理继续处理不添加
STOP001尝试中断不处理不添加
TIDYING010处理完了如果由SHUTDOWN – TIDYING ,那就是处理完了;如果由STOP – TIDYING ,那就是不处理不添加
TERMINATED011同TIDYING同TIDYING同TIDYING

各个状态的转换图如下所示
《Java并发编程札记-总结》

执行任务

execute()分三种情况处理任务
case1:如果线程池中运行的线程数量<corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
case2:如果线程池中运行的线程数量>=corePoolSize,且线程池处于RUNNING状态,且把提交的任务成功放入阻塞队列中,就再次检查线程池的状态,1.如果线程池不是RUNNING状态,且成功从阻塞队列中删除任务,则该任务由当前 RejectedExecutionHandler 处理。2.否则如果线程池中运行的线程数量为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。
case3:如果以上两种case不成立,即没能将任务成功放入阻塞队列中,且addWoker新建线程失败,则该任务由当前 RejectedExecutionHandler 处理。

submit()方法是通过调用execute(Runnable)实现的。

shutdown()与shutdownNow()区别
  • 调用shutdown()后,线程池状态立刻变为SHUTDOWN,而调用shutdownNow(),线程池状态立刻变为STOP。
  • shutdown()通过中断空闲线程、不接受新任务的方式按过去执行已提交任务的顺序发起一个有序的关闭,shutdownNow()无差别地停止所有的活动执行任务,暂停等待任务的处理。也就是说,shutdown()等待任务执行完才中断线程,而shutdownNow()不等任务执行完就中断了线程。

Fork/Join框架

1. 什么是Fork/Join框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:
《Java并发编程札记-总结》

2. 工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:
《Java并发编程札记-总结》
那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

3. Fork/Join框架的介绍

我们已经很清楚Fork/Join框架的需求了,那么我们可以思考一下,如果让我们来设计一个Fork/Join框架,该如何设计?这个思考有助于你理解Fork/Join框架的设计。

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

5. Fork/Join框架的异常处理

ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

6. Fork/Join框架的实现原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

多线程调度器(ScheduledThreadPoolExecutor)

Java中的多线程调度器

我们如果要用java默认的线程池来做调度器,一种选择就是Timer和TimerTask的结合。一个Timer为一个单独的线程,虽然一个Timer可以调度多个TimerTask,但是对于一个Timer来讲是串行的。多线程调度器ScheduledThreadPoolExecutor,也就是定时任务,基于多线程调度完成,当然你可以为了完成多线程使用多个Timer,只是这些Timer的管理需要你来完成,不是一个框架体系,而ScheduleThreadPoolExecutor提供了这个功能。

05ExecutorCompletionService

CompletionService接口是将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者利用submit()提交要执行的任务。使用者利用take()获取并移除已完成的任务的返回值,并按照完成这些任务的顺序处理它们的结果。

通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。此类将那些完成时提交的任务放置在可使用take()访问的队列上。

(七)JUC工具类。

01概述

《Java并发编程札记-总结》

CountDownLatch

CountDownLatch是一个通用同步器,用于同步一个或多个任务。在完成一组正在其他线程中执行的任务之前,它允许一个或多个线程一直等待。

CyclicBarrier

CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点。如果你希望一组并行的任务在下个步骤之前相互等待,直到所有的任务都完成了下个步骤前的所有操作,才继续向前执行,那么CyclicBarrier很合适。

Semaphore

一般的锁在任意时刻只允许一个线程访问一项资源,而计数信号量允许n个任务同时访问一项资源。我们可以将信号量看做一个许可集,可以向线程分发使用资源的许可证。获得资源前,线程调用acquire()从许可集中获取许可。该线程结束后,通过release()将许可还给许可集。

Executors

配置线程池是比较复杂的工作,为了方便用户使用,Executors中为创建Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable对象提供了便捷的静态工厂方法。比如

  • callable(Runnable task, T result) :返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。
  • defaultThreadFactory() :返回用于创建新线程的默认线程工厂。
  • newCachedThreadPool() :创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
  • newFixedThreadPool(int nThreads) :创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
  • newScheduledThreadPool(int corePoolSize) :创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
  • newSingleThreadExecutor() :创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
  • newSingleThreadScheduledExecutor() :创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
  • newWorkStealingPool():创建work-stealing线程池,JDK1.8新增

Exchanger

Exchanger用于在线程之间传输数据。

    原文作者:java并发
    原文地址: http://blog.csdn.net/panweiwei1994/article/details/79038846
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞