接上一篇《java并发编程(二)对象的共享》
客户端加锁:
java为线程安全提供了一套安全的集合类操作 在java.util.Collections中,在多线程下能 安全的CRUD,但是应该注意,仅是对几何类进行同步,下面看一个实例。
若没有则添加实例:
public class ListHelper<E> {
public List<E> list = Collections.synchronizedList(new ArrayList<E>());
public synchronized boolean putIfAbsent(E x) {
boolean absent = !list.contains(x); //①
if (absent)
list.add(x);
return absent;
}
public static void main(String[] args) {
ListHelper<String> helper = new ListHelper<String>();
new Thread(new Runnable() {
@Override
public void run() {
helper.list.add("a"); //②
}
}).start();
helper.putIfAbsent("a");
}
}
我们在
①
②
处下断点调试后发现断点1处拿到的锁 是ListHelper 而断点2的锁是 由java虚拟机维护的,不是同一个锁,导致添加和判断不能同步,而在java 的API中也给出例子,要锁list例子
Returns a synchronized (thread-safe) list backed by the specified list. In order to guarantee serial access, it is critical that all access to the backing list is accomplished through the returned list.
It is imperative that the user manually synchronize on the returned list when iterating over it:
List list = Collections.synchronizedList(new ArrayList());
…
synchronized (list) {
Iterator i = list.iterator(); // Must be in synchronized block
while (i.hasNext())
foo(i.next());
}
一定要明确要保证谁是锁,不然加锁知识看似线程安全。
组合:
当为现在的类添加一个原子操作时,有一个种更好的方法:组合。我们不关心源是不是线程安全的,通过继承活着实现,给类添加同步实现线程安全(最好时实现借口,继承需要重写所有方法,并加同步)
public class ArrayListHelper<E> extends ArrayList<E>{
@Override
public synchronized int size() {
// TODO Auto-generated method stub
return super.size();
}
<span style="white-space:pre"> </span>...
}
这只是一种方案,当然java提供了一套安全的集合类操作。
当然通过使用java 提供的安全集合类也有可能引发安全性或者异常错误
例1:(ArrayIndexOutOfException)
public static Object getLast(Vector list) {
int lastIndex = list.size();
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size()-1;
list.remove(lastIndex);
}
虽然 Vector 是线程安全的,但是getLast 和 deleteLast 可能引发异常,因为当 getLast 要执行 return 时,另一个线程执行 deleteLast 执行完成,就会导致 ArrayIndexOutOfBoundsException。所以在加锁的时候,一定要明确要通过谁来加锁。
修改方法:
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size();
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
例2:(ConcurrentModificationException)
非常常见的一个异常,当在遍历集合的时候,修改了集合 就会抛出异常,记得以前做一个项目就出现这个异常,当时只知道百度后用了java 提供的安全类,但是印象不深,最近专门研究并发,更深入的理解了这个异常以及处理办法。看错误的案例:
List<String> stringList = Collections.synchronizedList(new ArrayList<String>());
for (String string : stringList) {
dosomething(string);
}
在遍历的过程中,如果有其他线程对stringList进行修改,就会抛出异常,当然我们根据例1的教训很快就想到了解决方案。
synchronized (stringList) {
for (String string : stringList) {
dosomething(string);
}
}
当然 我们也可以通过在克隆stringList,遍历时拿到克隆的对象进行遍历(加锁),但是如果stringList 过大,会消耗性能。因为这个问题经常要解决,所以java又提供了一个CopyOnWriteArrayList来处理,具体在后面介绍。
例3:(ConcurrentModificationException)
为什么例3依旧是ConcurrentModificationException,因为这个例子很容易被忽略,而且不容易被发现。我们通过例子加深印象。
public class HiddenIterator {
private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {set.add(i);}
public synchronized void remove(Integer i) {set.remove(i);}
public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++) {
add(r.nextInt());
System.out.println("DEBUG:added ten elements to"+set);
}
}
}
也许我们的项目运行一段时间是没问题的,但是这个为什么会抛出ConcurrentModificationException呢?原因在于
System.
out
.println(
“DEBUG:added ten elements to”
+
set
);这行,因为编译器将字符串的连接操作转换成StringBuilder.append(object),而这个方法又会调用容器的toString方法,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生产内容的格式化表示。(这个才知道,赶紧补上~~)。查看String 源码 其中一段说明:
The Java language provides special support for the string concatenation operator ( + ), and for conversion of other objects to strings. String concatenation is implemented through the StringBuilder(or StringBuffer) class and its append method. String conversions are implemented through the method toString, defined by Object and inherited by all classes in Java. For additional information on string concatenation and conversion, see Gosling, Joy, and Steele, The Java Language Specification.
大概意思是:java提供了一个特殊的操作符(+)来支持字符串的连接和将一个其他对象转换成string类型。String的连接时通过实现StringBuilder(或StringBuffer) 类和他的append方法。String 的转换时通过toString实现的,在java 中,通过定义Object和继承所有类。连接和转换的额外的的信息,看
《java语言规范》。
当然容器的hashCode 和 equals 等方法也会间接的执行迭代操作,当容器作为另一个容器的元素或键值时就会出现这个问题。当然
containAll
removeAll
retrainAll的等方法,也会对容器进行迭代,需要加同步。
并发容器:
由于这些问题经常遇到,而且不易解决,java提供可一套并发的容器类型,来确保线程的安全性。同时增加了ConcurrentHasMap,用来替换同步并基于散列Map、CopyOnWriteArrayList用于遍历操作作为主要操作的情况下代替List。并增加常见的复合操作,比如“若没有则添加”。
java5.0 增加了两种新的容器类型:Queue和BlockingQueue。
Queue:用来临时保存一组等待处理的元素。它提供了几种实现:
1.ConcurrentLinkedQueue 传统的先进先出队列
2.PriorityQueue 非并发的有线队列。
Queue上的操作不会阻塞,如果队列为空,返回空值。
BlockingQueue:扩展了Queue,增加了可阻塞的插入和获取等操作。
java6.0引入了ConcurrentSkipListMap ConcurrentSkipListSet作为同步的SortedMap SortedSet的并发替代品。
ConcurrentHashMap
和HashMap一样,是一个基于散列的Map,它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。它并不是将所有的方法都在同一个锁上,而是使用一种粒度更细的加锁机制来实现更大程度上的共享。任意数量的读取线程可以并发地访问Map,执行读取操作的线程和执行写入的线程可以并发地访问Map。
它不会抛出ConcurrentModificationException,因此迭代时可以不加锁,ConcurrentHashMap返回的迭代器具有弱一致性,弱一执行的迭代器可以容忍并发的修改,当创建构造器时,会遍历已有的元素。
CopyOnWriteArrayList
写入时复制容器时线程安全的,返回的 迭代器不会抛出异常,而且元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。
CopyOnWriteArrayList 在每次add的时候都会copy一下,然后加在尾部。看源码:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
由于每次add都会复制底层数组,所有,CopyOnWriteArrayList多用于,少添加 多迭代的情况。多用在通知系统里面(观察者模式?),在分发通知时需要迭代已注册监听的列表。
BlockingQueue
类库中包含了多个实现:
LinkedBlockingQueue & ArrayBlockingQueue
都是FIFO队列,两者和LinkedList 、 ArrayList 区别类似。但LinkedBlockingQueue有更好的并发性,
PriorityBlockingQueue
一个按优先级排序的队列。
SynchronousQueue
它实际不是一个真正的队列,因为他不会为队列中元素维护存储空间,他维护一组线程,这些线程在等待着把元素加入或移出队列。类似于 一手交钱 一手交货的
public class SynchronousTest {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
new Customer(queue).start();
new Pruduct(queue).start();
}
}
class Customer extends Thread {
private SynchronousQueue queue;
public Customer(SynchronousQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
System.out.println("消费了一个产品:" + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------------------------------------");
}
}
}
class Pruduct extends Thread {
private SynchronousQueue queue;
public Pruduct(SynchronousQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
int rand = new Random().nextInt(1000);
System.out.println("生产了一个产品:" + rand);
System.out.println("等待三秒后运送出去...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
queue.offer(rand);
}
}
}
生产了一个产品:435
等待三秒后运送出去...
生产了一个产品:35
等待三秒后运送出去...
消费了一个产品:435
------------------------------------------
消费了一个产品:35
------------------------------------------
生产了一个产品:851
等待三秒后运送出去...
生产了一个产品:890
等待三秒后运送出去...
消费了一个产品:851
------------------------------------------