java多线程之JUC
1. 简介
在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,
用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中 的 Collection
实现等;
2. volatile 关键字
2-1问题重现
package com.main.juc;
/**使用 volatile 之前
* 执行下列代码 会发现控制台打印出了flag=true,程序没有停止。
*/
public class TestVolatile {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while (true) {
if (td.isFlag()) {
System.out.println("########");
break;
}
}
}
}
class ThreadDemo implements Runnable {
private boolean flag = false;
public void run() {
try {
// 该线程 sleep(200), 导致了程序无法执行成功
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
flag = true;
System.out.println("flag=" + isFlag());
}
public boolean isFlag() {
System.out.println("q");
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
2-2原因分析:
出现了可见性错误。main线程读的一直是缓存。
内存可见性(Memory Visibility)是指当某个线程正在使用对象状态 而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。
可见性错误是指当读操作与写操作在不同的线程中执行时,我们无 法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。
我们可以通过同步来保证对象被安全地发布。除此之外我们也可以 使用一种更加轻量级的 volatile 变量
2-3解决方法:
// 解决问题方式一: 同步锁
// 但是,效率太低
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
// 使用同步锁
synchronized(td){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
}
// 解决方式二: 使用 volatile 关键字
public class TestVolatile{
public static void main(String[] args){
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while(true){
if(td.isFlag()){
System.out.println("########");
break;
}
}
}
}
class ThreadDemo implements Runnable{
private volatile boolean flag = false;
同上(略)
}
Java 提供了一种稍弱的同步机制,即 volatile 变 量,用来确保将变量的更新操作通知到其他线程。 可以将 volatile
看做一个轻量级的锁,但是又与 锁有些不同:
对于多线程,不是一种互斥关系(不具备互斥性)
不能保证变量状态的“原子性操作”(不具备原子性)
3.i++ 的原子性问题
int i = 10;
i = i++; // 此时, i=10
执行步骤:
int temp = i;
i = i + 1;
i = temp;
// 测试类
public class TestAtomicDemo{
public static void main(String[] args){
AtomicDemo ad = new AtomicDemo();
for(int i=0; i < 10; i++){
new Thread(ad).start();
}
}
}
class AtomicDemo implements Runnable{
private int serialNumber = 0;
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber());
}
public int getSerialNumber(){
return serialNumber++;
}
}
i++的操作实际上分为三个步骤: “读-改-写”;
原子性: 就是”i++”的”读-改-写”是不可分割的三个步骤;
原子变量: JDK1.5以后, java.util.concurrent.atomic包下,提供了常用的原子变量;原子变量中的值,使用 volatile 修饰,保证了内存可见性; CAS(Compare-And-Swap) 算法保证数据的原子性;
// 改进: 使用原子变量
class AtomicDemo implements Runnable{
private AtomicInteger serialNumber = new AtomicInteger();
public void run(){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+":"+getSerialNumber());
}
public int getSerialNumber(){
// 自增运算
return serialNumber.getAndIncrement();
}
}
原子变量
类的小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可
将 volatile 值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类。
类 AtomicBoolean、 AtomicInteger、 AtomicLong 和 AtomicReference 的实例各自提供对
相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。
AtomicIntegerArray、 AtomicLongArray 和 AtomicReferenceArray 类进一步扩展了原子操
作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方
面也引人注目,这对于普通数组来说是不受支持的。
核心方法: boolean compareAndSet(expectedValue, updateValue)
java.util.concurrent.atomic 包下提供了一些原子操作的常用类:
AtomicBoolean 、 AtomicInteger 、 AtomicLong 、 AtomicReference
AtomicIntegerArray 、 AtomicLongArray
AtomicMarkableReference
AtomicReferenceArray
AtomicStampedReference
CAS算法
CAS (Compare-And-Swap) 是一种硬件对并发的支持,针对多处理器
操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并
发访问。
CAS 是一种无锁的非阻塞算法的实现。
CAS 包含了 3 个操作数:
需要读写的内存值 V
进行比较的值 A
拟写入的新值 B
当且仅当 V 的值等于 A 时, CAS 通过原子方式用新值 B 来更新 V 的
值,否则不会执行任何操作
粗略地模拟CAS算法:
// 模拟CAS 算法
class CompareAndSwap{
private int value;
// 获取内存值
public synchronized int get(){
return value;
}
/**
// 无论更新成功与否,都返回修改之前的内存值
//expectedValue 预估值
//newValue 新值
*/
public synchronized int compareAndSwap(int expectedValue, int newValue){
// 获取旧值
int oldValue = value;
if(oldValue == expectedValue){
this.value = newValue;
}
// 返回修改之前的值
return oldValue;
}
// 判断是否设置成功
public synchronized boolean compareAndSet(int expectedValue, int newValue){
return expectedValue == compareAndSwap(expectedValue, newValue);
}
}
public class TestCompareAndSwap{
public static void main(String[] args){
final CopareAndSwap cas = new CompareAndSwap();
for(int i=0; i<10; i++){
// 创建10个线程,模拟多线程环境
new Thead(new Runnable(){
public void run(){
int expectedValue = cas.get();
boolean b = cas.compareAndSet(expectedValue, (int)(Math.random()*100));
System.out.println(b);
}
}).start();
}
}
}
4.同步容器类
ConcurrentHashMap及“锁分段”机制
Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器
的性能。
ConcurrentHashMap 同步容器类是Java 5 增加的一个线程安全的哈希表。对
与多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用“锁分段”
机制替代 Hashtable 的独占锁。进而提高性能。
此包还提供了设计用于多线程上下文中的 Collection 实现:
ConcurrentHashMap、 ConcurrentSkipListMap、 ConcurrentSkipListSet、
CopyOnWriteArrayList 和 CopyOnWriteArraySet。当期望许多线程访问一个给
定 collection 时, ConcurrentHashMap 通常优于同步的 HashMap,
ConcurrentSkipListMap 通常优于同步的 TreeMap。当期望的读数和遍历远远
大于列表的更新数时, CopyOnWriteArrayList 优于同步的
CountDownLatch(闭锁)
Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器
的性能。
CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作
之前,它允许一个或多个线程一直等待。
闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活
动直到其他活动都完成才继续执行:
确保某个计算在其需要的所有资源都被初始化之后才继续执行;
确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
等待直到某个操作所有参与者都准备就绪再继续执行。
// 测试类: 计算多线程的执行时间
public class TestCountDownLatch{
public static void main(String[] args){
final CountDownLatch latch = new CountDownLatch(10);
LatchDemo ld = new LatchDemo(latch);
long start = System.currentTimeMillis();
// 创建10个线程
for(int i=0; i<10; i++){
new Thread(ld).start();
}
try{
latch.await();
}catch(InterruptedException e){
}
long end = System.currentTimeMillis();
System.out.println("耗费时间为:"+(end - start));
}
}
class LatchDemo implements Runnable{
private CountDownLatch latch;
// 有参构造器
public LatchDemo(CountDownLatch latch){
this.latch = latch;
}
public void run(){
synchronized(this){
try{
// 打印50000以内的偶数
for(int i=0; i<50000; i++){
if(i % 2 == 0){
System.out.println(i);
}
}
}finally{
// 线程数量递减
latch.countDown();
}
}
}
}
5.实现 Callable 接口
这是创建一个线程的第三种方法(前两种分别为继承Thread和实现Runnable)
这是Java 5.0 在 java.util.concurrent 提供的一个新的创建执行 线程的方式: Callable 接口
Callable 接口类似于 Runnable,两者都是为那些其实例可 能被另一个线程执行的类设计的。但是 Runnable 不会返 回结果,并且无法抛出经过检查的异常。Callable可以抛出异常,返回结果。
Callable 需要依赖FutureTask , FutureTask 也可以用作闭 锁。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class TestCallable {
public static void main(String[] args) {
ThreadDemo2 td = new ThreadDemo2();
// 执行Callable 方式 需要FutureTask实现类的支持 ,用于接收运算结果 。
// FutureTask是Future接口的实现类
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
System.out.println("------------------------------");
//接收线程运算后的结果
try {
Integer sum =result.get();//FutureTask 可用于闭锁
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class ThreadDemo2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
System.out.println(i);
sum += i;
}
return sum;
}
}
6.Lock 同步锁
显示锁 Lock
在 Java 5.0 之前,协调共享对象的访问时可以使用的机 制只有 synchronized 和 volatile 。 Java 5.0
后增加了一些 新的机制,但并不是一种替代内置锁的方法,而是当内 置锁不适用时,作为一种可选择的高级功能。
ReentrantLock 实现了 Lock 接口,并提供了与 synchronized 相同的互斥性和内存可见性。但相较于
synchronized 提供了更高的处理锁的灵活性。
// 测试类: 以卖票为例
// 使用 lock 之前,存在线程安全问题
public class TestLock{
public static void main(String[] args){
Ticket ticket = new Ticket();
new Thread(ticket,"1号窗口").start();
new Thread(ticket,"2号窗口").start();
new Thread(ticket,"3号窗口").start();
}
}
class Ticket implements Runnable{
private int tick = 100;
public void run(){
while(true){
if(tick > 0){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+"完成售票,余票为: "+ --tick);
}
}
}
}
// 使用 Lock
class Ticket implements Runnable{
private int tick = 100;
private Lock lock = new ReentrantLock();
public void run(){
while(true){
lock.lock(); // 上锁
try{
if(tick > 0){
try{
Thread.sleep(200);
}catch(InterruptedException e){
}
System.out.println(Thread.currentThread().getName()+"完成售票,余票为: "+ --tick);
}
}finally{
lock.unlock(); // 释放锁
}
}
}
}
Condition 控制线程通信Condition
Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用 法上与使用 Object.wait
访问的隐式监视器类似,但提供了更强大的 功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关 联。为了避免兼容性问题, Condition 方法的名称与对应的 Object 版 本中的不同。
在 Condition 对象中,与 wait、 notify 和 notifyAll 方法对应的分别是 await、 signal 和 signalAll。
Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得 Condition 实例,请使用其
newCondition() 方法
class Resource
{
private String name;
private int count = 1; // 记录烤鸭的编号
private boolean flag = false;
// 创建一个锁对象
Lock lock = new ReentrantLock();
// 通过已有的锁获取两组监视器, 一组监视生产者, 一组监视消费者
Condition producer_con = lock.newCondition();
Condition consumer_con = lock.newCondition();
public void set(String name)
{
lock.lock(); //获取锁
try
{
while(flag)
try{producer_con.wait();}catch(InterruptedException e){}
this.name = name + count;
count++;
System.out.println(Thread.currentThread().getName()+"..生产者.."+this.name);
flag = true;
consumer_con.signal();
}
finally
{
lock.unlock(); //释放锁
}
}
public void out()
{
lock.lock(); //获取锁
try
{
while(!flag)
try{consumer_con.wait();}catch(InterruptedException e){}
Sytem.out.println(Thread.currentThread().getName()+ "...消费者.."+ this.name);
flag = false;
producer_con.signal();
}
finally
{
lock.unlock(); //释放锁
}
}
}
class Producer implements Runnable
{
Resource r;
Producer(Resource r)
{
this.r = r;
}
public void run()
{
while(true)
{
r.set("烤鸭");
}
}
}
class Consumer implements Runnable
{
Resource r;
Consumer(Resource r)
{
this.r = r;
}
public void run()
{
while(true)
{
r.out();
}
}
}
class ProducerConsumerDemo
{
public static void main(String[] args)
{
// 创建资源
Resource r = new Resource();
// 创建任务
Producer pro = new Producer(r);
Consumer con = new Consumer(r);
// 多生产者
Thread t0 = new Thread(pro);
Thread t1 = new Thread(pro);
// 多消费者
Thread t2 = new Thread(con);
Thread t3 = new Thread(con);
Thread t4 = new Thread(con);
Thread t5 = new Thread(con);
// 开启线程
t0.start();
t1.start();
t2.start();
t3.start();
t4.start();
t5.start();
}
}
// 练习: 程序按序交替
// 编写一个程序,开启3个线程,这三个线程的 ID 分别为 A, B, C, 每个线程将自己的 ID 在屏幕上打印10遍,
// 要求输出的结果必须按顺序显示:
// 如: ABCABCABC... 依次递归
public class TestABCAlternate{
public static void main(String[] args){
AlternateDemo ad = new AlternateDemo();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopA(i);
}
}
},"A").start();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopB(i);
}
}
},"B").start();
new Thread(new Runnable(){
public void run(){
for(int i=1; i<20; i++){
ad.loopC(i);
System.out.println("--------------------");
}
}
},"C").start();
}
}
class AlternateDemo{
private int number = 1; // 当前正在执行线程的标记
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
// totalLoop 表示循环第几轮
// 线程A
public void loopA(int totalLoop){
// 上锁
lock.lock();
try{
// 1. 判断
if(number != 1){
condition1.await();
}
// 2. 打印
for(int i=1; i <= 5; i++){
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
// 3. 唤醒线程B
number = 2;
condition2.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
// 释放锁
lock.unlock();
}
}
// 线程B
public void loopB(int totalLoop){
// 上锁
lock.lock();
try{
// 1. 判断
if(number != 2){
condition2.await();
}
// 2. 打印
for(int i=1; i <= 15; i++){
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
// 3. 唤醒线程C
number = 3;
condition3.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
// 释放锁
lock.unlock();
}
}
// 线程C
public void loopC(int totalLoop){
// 上锁
lock.lock();
try{
// 1. 判断
if(number != 3){
condition3.await();
}
// 2. 打印
for(int i=1; i <= 20; i++){
System.out.println(Thread.currentThread().getName()+"\t"+i+"\t"+totalLoop);
}
// 3. 唤醒线程A
number = 1;
condition1.signal();
}catch(Exception e){
e.printStackTrace();
}finally{
// 释放锁
lock.unlock();
}
}
}
7.ReadWriteLock 读写锁
ReadWriteLock 维护了一对相关的锁,一个用于只读操作, 另一个用于写入操作。只要没有 writer,读取锁可以由 多个 reader 线程同时保持。写入锁是独占的。
ReadWriteLock 读取操作通常不会改变共享资源,但执行 写入操作时,必须独占方式来获取锁。对于读取操作占 多数的数据结构。ReadWriteLock 能提供比独占锁更高 的并发性。而对于只读的数据结构,其中包含的不变性 可以完全不需要考虑加锁操作。
//读与写、 写与写需要互斥
//读与读则不需要互斥
//所以我们用ReadWriteLock
package com.main.juc;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestReadWriteLock {
public static void main(String[] args){
ReadWriteLockDemo rw = new ReadWriteLockDemo();
// 一个线程进行写
new Thread(new Runnable(){
public void run(){
rw.set((int)(Math.random()*100));
}
},"Write:").start();
// 100个线程进行读操作
for(int i=0; i<100; i++){
new Thread(new Runnable(){
public void run(){
rw.get();
}
},"Read:").start();
}
}
}
class ReadWriteLockDemo {
private int number = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 读
public void get() {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " : " + number);
} finally {
lock.readLock().unlock();
}
}
// 写
public void set(int number) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName());
this.number = number;
} finally {
lock.writeLock().unlock();
}
}
}
线程八锁(重要)
// 测试类
public class Test{
public static void main(String[] args){
Demo demo = new Demo();
Demo demo2 = new Demo();
new Thread(new Runnable(){
public void run(){
demo.getOne();
}
}).start();
new Thread(new Runnable(){
public void run(){
// demo2.getTwo();
demo.getTwo();
}
}).start();
}
}
class Demo{
public synchronized void getOne(){
try{
Thread.sleep(3000);
}catch(InterruptedException e){
}
System.out.println("one");
}
public synchronized void getTwo(){
System.out.println("two");
}
}
/*
* 1. 两个普通同步方法,两个线程,标准打印, 打印输出: one two
* 2. 新增 Thread.sleep() 给 getOne(), 打印输出: one two
* 3. 新增普通方法 getThree(), 打印输出: three one two
* 4. 两个普通同步方法,两个Demo对象, 两个线程,打印输出: two one
* 5. 修改 getOne() 为静态同步方法, 一个Demo对象, 打印输出: two one
* 6. 修改两个方法都为静态同步方法, 一个 Demo 对象, 打印输出: one two
* 7. 修改 getone() 为静态同步方法, 两个 Demo 对象, 打印输出: two one
* 8. 两个均为静态同步方法,两个 Demo 对象,打印输出: one two
*/
// 总结:
// 1. 非静态方法的锁默认为 this, 静态方法的锁为 "对应的Class实例";
// 2. 在某一个时刻内,同一把锁只能被一个线程持有,无论几个方法;
一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用
其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻
内,只能有唯一一个线程去访问这些synchronized方法
锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的 synchronized方法
加个普通方法后发现和同步锁无关
换成两个对象后,不是同一把锁了,情况立刻变化。
都换成静态同步方法后,情况又变化
所有的非静态同步方法用的都是同一把锁——实例对象本身,也就是说如果一个实
例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获
取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实
例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非 静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对
象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个
静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取
锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同 步方法之间,只要它们同一个类的实例对象!
8.线程池—-第四种获取线程的方法
第四种获取线程的方法:线程池,一个 ExecutorService,它使用可能的几个池线程之 一执行每个提交的任务,通常使用Executors 工厂方法配置。
线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在
执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行 任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数 据,如完成的任务数。
- 线程池提供了一个线程队列,队列中保存着所有等待状态的线程;
- 避免了创建与销毁线程的额外开销,提高了响应速度;
为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但 是,强烈建议程序员使用较为方便的 Executors
工厂方法 :Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收,可以根据需求自动更改数量) Executors.newFixedThreadPool(int)(固定大小线程池) Executors.newSingleThreadExecutor()(单个后台线程,线程池中只有一个线程) ScheduledExecutorService .newScheduledThreadPool 创建固定大小的线程池,可以延迟或者定时执行任务
它们均为大多数使用场景预定义了设置。
- 线程池的体系结构
- java.util.concurrent.Executor: 负责线程的使用和调度的根接口;
- ExecutorService: 子接口,线程池的主要接口;
- ThreadPoolExecutor: 线程池的实现类;
- ScheduledExecutorService: 子接口,负责线程的调度;
- ScheduledThreadPoolExecutor: 继承了线程池的实现类(ThreadPoolExecutor),实现了负责线程调度的子接口(ScheduledExecutorService);
package com.main.juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool{
public static void main(String[] args){
// 1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);
ThreadPoolDemo tpd = new ThreadPoolDemo();
// 2. 为线程池中线程分配任务
// submit(Callable<T> task)
// submit(Runnable task)
for(int i=0; i<10; i++){
pool.submit(tpd);
}
// 3. 关闭线程池
pool.shutdown();
}
}
class ThreadPoolDemo implements Runnable{
private int i=0;
public void run(){
while(i <= 100){
System.out.println(Thread.currentThread().getName()+" : "+ i++);
}
}
}
线程调度
package com.main.juc;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPool {
public static void main(String[] args) throws Exception {
ScheduledExecutorService pool= Executors.newScheduledThreadPool(5);
for (int i=0;i<5;i++) {
Future<Integer> future = pool.schedule(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int number = new Random().nextInt(100);//获取随机数
System.out.println(Thread.currentThread().getName()+" : "+number);
return number;
}
}, 3, TimeUnit.SECONDS);//延迟三秒,执行
System.out.println(future.get());
}
pool.shutdown();//关闭线程池
}
}
9.Fork/Join框架
采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加 到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队 列中。
相对于一般的线程池实现, fork/join框架的优势体现在对其中包含的任务 的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些 原因无法继续运行, 那么该线程会处于等待状态。 而在fork/join框架实现中, 如果某个子问题由于等待另外一个子问题的完成而无法继续运行。 那么处理 该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了 线程的等待时间, 提高了性能。
//以一个从1加到五百亿的计算作为例子
//由于例子较为简单,可能运算节省时间上没有优势。这里仅作为例子讲解
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
import org.junit.Test;
public class TestForkJoinPool {
//Fork/join的方式
@Test
public void TestByForkJoinPool(){
Instant start = Instant.now();//开始时间
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(0L, 50000000000L);//0加到5百亿
Long sum =pool.invoke(task);
System.out.println(sum);
Instant end =Instant.now();//结束时间
System.out.println("耗费时间:"+Duration.between(start, end).toMillis());//
}
//传统的方法
@Test
public void testSum(){
Instant start = Instant.now();//开始时间
long sum=0L;
for(long i=0L;i<=50000000000L;i++){
sum+=i;
}
System.out.println(sum);
Instant end =Instant.now();//结束时间
System.out.println("耗费时间:"+Duration.between(start, end).toMillis());//
}
//java8新特性
@Test
public void testSum2(){
Instant start = Instant.now();//开始时间
long sum=LongStream.rangeClosed(0L, 50000000000L)
.parallel()
.reduce(0L, Long::sum);
System.out.println(sum);
Instant end =Instant.now();//结束时间
System.out.println("耗费时间:"+Duration.between(start, end).toMillis());//
}
}
/**
* 计算 从 一个数加到另一个数
*/
class ForkJoinSumCalculator extends RecursiveTask<Long> {
private static final long serialVersionUID = 5835377731077414709L;
private long start;//求和的起点
private long end;//求和的终点
private static final long THURSHOLD = 10000L;// 临界值
public ForkJoinSumCalculator(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length <= THURSHOLD) {
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinSumCalculator left = new ForkJoinSumCalculator(start, middle);
left.fork();// 进行拆分,同时压入线程队列
ForkJoinSumCalculator right = new ForkJoinSumCalculator(middle, end);
right.fork();// 进行拆分,同时压入线程队列
return left.join() + right.join();
}
}
}
转自https://www.cnblogs.com/linkworld/p/7819270.html