java 并发(concurrent)包源码分析

参考连接:

http://www.cnblogs.com/luoxn28/p/6059881.html

http://www.cnblogs.com/java-zhao/p/5140158.html

持续更新中。。。。。

并发是一种能并行运行多个程序或并行运行一个程序中多个部分的能力。如果程序中一个耗时的任务能以异步或并行的方式运行,那么整个程序的吞吐量和可交互性将大大改善。现代的PC都有多个CPU或一个CPU中有多个核,是否能合理运用多核的能力将成为一个大规模应用程序的关键。

  Java多线程相关类的实现都在Java的并发包concurrent,concurrent包主要包含3部分内容,第一个是atomic包,里面主要是一些原子类,比如AtomicInteger、AtomicIntegerArray等;第二个是locks包,里面主要是锁相关的类,比如ReentrantLock、Condition等;第三个就是属于concurrent包的内容,主要包括线程池相关类(Executors)、阻塞集合类(BlockingQueue)、并发Map类(ConcurrentHashMap)等。

《java 并发(concurrent)包源码分析》

/*************************************************************************************************************************************************************/

atomic包

/*************************************************************************************************************************************************************/

        JDK1.5中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS的操作,并且JVM把它们编译为底层硬件提供的最有效的方法,在运行CAS的平台上,运行时把它们编译为相应的机器指令。在java.util.concurrent.atomic包下面的所有的原子变量类型中,比如AtomicInteger,都使用了这些底层的JVM支持为数字类型的引用类型提供一种高效的CAS操作。

  Unsafe中的操作一般都是基于CAS来实现的,CAS就是Compare and Swap的意思,比较并操作。很多的cpu直接支持CAS指令。CAS是一项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。CAS有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。

        下面就以AtomicInteger为例。

        在没有AtomicInteger之前,对于一个Integer的线程安全操作,是需要使用同步锁来实现的,当然现在也可以通过ReentrantLock来实现,但是最好最方便的实现方式是采用AtomicInteger。

测试代码:

 1 package com.collection.test;
 2 
 3 import java.util.concurrent.atomic.AtomicInteger;
 4 
 5 /**
 6  * 原子类的测试
 7  */
 8 public class AtomicTest {
 9     private static AtomicInteger atomicInteger = new AtomicInteger();
10     
11     //获取当前值
12     public static void getCurrentValue(){
13         System.out.println(atomicInteger.get());//-->0
14     }
15     
16     //设置value值
17     public static void setValue(){
18         atomicInteger.set(12);//直接用12覆盖旧值
19         System.out.println(atomicInteger.get());//-->12
20     }
21     
22     //根据方法名称getAndSet就知道先get,则最后返回的就是旧值,如果get在后,就是返回新值
23     public static void getAndSet(){
24         System.out.println(atomicInteger.getAndSet(15));//-->12
25     }
26     
27     public static void getAndIncrement(){
28         System.out.println(atomicInteger.getAndIncrement());//-->15
29     }
30     
31     public static void getAndDecrement(){
32         System.out.println(atomicInteger.getAndDecrement());//-->16
33     }
34     
35     public static void getAndAdd(){
36         System.out.println(atomicInteger.getAndAdd(10));//-->15
37     }
38     
39     public static void incrementAndGet(){
40         System.out.println(atomicInteger.incrementAndGet());//-->26
41     }
42     
43     public static void decrementAndGet(){
44         System.out.println(atomicInteger.decrementAndGet());//-->25
45     }
46     
47     public static void addAndGet(){
48         System.out.println(atomicInteger.addAndGet(20));//-->45
49     }
50     
51     public static void main(String[] args) {
52         AtomicTest test = new AtomicTest();
53         test.getCurrentValue();
54         test.setValue();
55         //返回旧值系列
56         test.getAndSet();
57         test.getAndIncrement();
58         test.getAndDecrement();
59         test.getAndAdd();
60         //返回新值系列
61         test.incrementAndGet();
62         test.decrementAndGet();
63         test.addAndGet();
64         
65     }
66 }
AtomicInteger类的源代码
  1 private volatile int value;// 初始化值
  2 
  3     /**
  4      * 创建一个AtomicInteger,初始值value为initialValue
  5      */
  6     public AtomicInteger(int initialValue) {
  7         value = initialValue;
  8     }
  9 
 10     /**
 11      * 创建一个AtomicInteger,初始值value为0
 12      */
 13     public AtomicInteger() {
 14     }
 15 
 16     /**
 17      * 返回value
 18      */
 19     public final int get() {
 20         return value;
 21     }
 22 
 23     /**
 24      * 为value设值(基于value),而其他操作是基于旧值<--get()
 25      */
 26     public final void set(int newValue) {
 27         value = newValue;
 28     }
 29 
 30     public final boolean compareAndSet(int expect, int update) {
 31         return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
 32     }
 33     
 34     /**
 35      * 基于CAS为旧值设定新值,采用无限循环,直到设置成功为止
 36      * 
 37      * @return 返回旧值
 38      */
 39     public final int getAndSet(int newValue) {
 40         for (;;) {
 41             int current = get();// 获取当前值(旧值)
 42             if (compareAndSet(current, newValue))// CAS新值替代旧值
 43                 return current;// 返回旧值
 44         }
 45     }
 46 
 47     /**
 48      * 当前值+1,采用无限循环,直到+1成功为止
 49      * @return the previous value 返回旧值
 50      */
 51     public final int getAndIncrement() {
 52         for (;;) {
 53             int current = get();//获取当前值
 54             int next = current + 1;//当前值+1
 55             if (compareAndSet(current, next))//基于CAS赋值
 56                 return current;
 57         }
 58     }
 59 
 60     /**
 61      * 当前值-1,采用无限循环,直到-1成功为止 
 62      * @return the previous value 返回旧值
 63      */
 64     public final int getAndDecrement() {
 65         for (;;) {
 66             int current = get();
 67             int next = current - 1;
 68             if (compareAndSet(current, next))
 69                 return current;
 70         }
 71     }
 72 
 73     /**
 74      * 当前值+delta,采用无限循环,直到+delta成功为止 
 75      * @return the previous value  返回旧值
 76      */
 77     public final int getAndAdd(int delta) {
 78         for (;;) {
 79             int current = get();
 80             int next = current + delta;
 81             if (compareAndSet(current, next))
 82                 return current;
 83         }
 84     }
 85 
 86     /**
 87      * 当前值+1, 采用无限循环,直到+1成功为止
 88      * @return the updated value 返回新值
 89      */
 90     public final int incrementAndGet() {
 91         for (;;) {
 92             int current = get();
 93             int next = current + 1;
 94             if (compareAndSet(current, next))
 95                 return next;//返回新值
 96         }
 97     }
 98 
 99     /**
100      * 当前值-1, 采用无限循环,直到-1成功为止 
101      * @return the updated value 返回新值
102      */
103     public final int decrementAndGet() {
104         for (;;) {
105             int current = get();
106             int next = current - 1;
107             if (compareAndSet(current, next))
108                 return next;//返回新值
109         }
110     }
111 
112     /**
113      * 当前值+delta,采用无限循环,直到+delta成功为止  
114      * @return the updated value 返回新值
115      */
116     public final int addAndGet(int delta) {
117         for (;;) {
118             int current = get();
119             int next = current + delta;
120             if (compareAndSet(current, next))
121                 return next;//返回新值
122         }
123     }
124 
125     /**
126      * 获取当前值
127      */
128     public int intValue() {
129         return get();
130     }

注意:

  • value是volatile的,关于volatile的相关内容见:http://www.cnblogs.com/java-zhao/p/5125698.html
  • 单步操作:例如set()是直接对value进行操作的,不需要CAS,因为单步操作就是原子操作。
  • 多步操作:例如getAndSet(int newValue)是两步操作–>先获取值,在设置值,所以需要原子化,这里采用CAS实现。
  • 对于方法是返回旧值还是新值,直接看方法是以get开头(返回旧值)还是get结尾(返回新值)就好
  • CAS:比较CPU内存上的值是不是当前值current,如果是就换成新值update,如果不是,说明获取值之后到设置值之前,该值已经被别人先一步设置过了,此时如果自己再设置值的话,需要在别人修改后的值的基础上去操作,否则就会覆盖别人的修改,所以这个时候会直接返回false,再进行无限循环,重新获取当前值,然后再基于CAS进行加减操作。
  • 如果还是不懂CAS,类比数据库的乐观锁
 1 // setup to use Unsafe.compareAndSwapInt for updates
 2     private static final Unsafe unsafe = Unsafe.getUnsafe();
 3     private static final long valueOffset;
 4 
 5     static {
 6         try {
 7             valueOffset = unsafe.objectFieldOffset
 8                 (AtomicInteger.class.getDeclaredField("value"));
 9         } catch (Exception ex) { throw new Error(ex); }
10     }
11 
12     private volatile int value;

这是AtomicInteger的所有属性,其中value存的是当前值,而当前值存放的内存地址可以通过valueOffset来确定。实际上是“value字段相对Java对象的起始地址的偏移量”

1 public final boolean compareAndSet(int expect, int update) {
2          return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
3      }

CAS方法:通过对比“valueOffset上的value”与expect是否相同,来决定是否修改value值为update值。

/*************************************************************************************************************************************************************/

lock包

/*************************************************************************************************************************************************************/

 

 

 

 

/*************************************************************************************************************************************************************/

concurrent源生包

/*************************************************************************************************************************************************************/

BlockingQueue 

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。下面的源码以ArrayBlockingQueue和LinkedBlockingDeque为例。

1.ArrayBlockingQueue

BlockingQueue内部有一个ReentrantLock,其生成了两个Condition,在ArrayBlockingQueue的属性声明中可以看见:

 1 /** Main lock guarding all access */
 2 final ReentrantLock lock;
 3 /** Condition for waiting takes */
 4 private final Condition notEmpty;
 5 /** Condition for waiting puts */
 6 private final Condition notFull;
 7 
 8 ...
 9 
10 public ArrayBlockingQueue(int capacity, boolean fair) {
11     if (capacity <= 0)
12         throw new IllegalArgumentException();
13     this.items = new Object[capacity];
14     lock = new ReentrantLock(fair);
15     notEmpty = lock.newCondition();
16     notFull =  lock.newCondition();
17 }

而如果能把notEmpty、notFull、put线程、take线程拟人的话,那么我想put与take操作可能会是下面这种流程:

put(e) 
《java 并发(concurrent)包源码分析》

take() 
《java 并发(concurrent)包源码分析》

其中ArrayBlockingQueue.put(E e)源码如下(其中中文注释为自定义注释,下同):

 1 /**
 2  * Inserts the specified element at the tail of this queue, waiting
 3  * for space to become available if the queue is full.
 4  *
 5  * @throws InterruptedException {@inheritDoc}
 6  * @throws NullPointerException {@inheritDoc}
 7  */
 8 public void put(E e) throws InterruptedException {
 9     checkNotNull(e);
10     final ReentrantLock lock = this.lock;
11     lock.lockInterruptibly();
12     try {
13         while (count == items.length)
14             notFull.await(); // 如果队列已满,则等待
15         insert(e);
16     } finally {
17         lock.unlock();
18     }
19 }
20 
21 /**
22  * Inserts element at current put position, advances, and signals.
23  * Call only when holding lock.
24  */
25 private void insert(E x) {
26     items[putIndex] = x;
27     putIndex = inc(putIndex);
28     ++count;
29     notEmpty.signal(); // 有新的元素被插入,通知等待中的取走元素线程
30 }

ArrayBlockingQueue.take()源码如下:

 1 public E take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();
 4     try {
 5         while (count == 0)
 6             notEmpty.await(); // 如果队列为空,则等待
 7         return extract();
 8     } finally {
 9         lock.unlock();
10     }
11 }
12 
13 /**
14  * Extracts element at current take position, advances, and signals.
15  * Call only when holding lock.
16  */
17 private E extract() {
18     final Object[] items = this.items;
19     E x = this.<E>cast(items[takeIndex]);
20     items[takeIndex] = null;
21     takeIndex = inc(takeIndex);
22     --count;
23     notFull.signal(); // 有新的元素被取走,通知等待中的插入元素线程
24     return x;
25 }

可以看见,put(E)与take()是同步的,在put操作中,当队列满了,会阻塞put操作,直到队列中有空闲的位置。而在take操作中,当队列为空时,会阻塞take操作,直到队列中有新的元素。

而这里使用两个Condition,则可以避免调用signal()时,会唤醒相同的put或take操作。

以上。

    原文作者:java源码分析
    原文地址: http://www.cnblogs.com/nsxqf/p/7076857.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞