Java并发Executor框架

1 Executor框架简介

从JDK5开始,工作单元和执行机制隔离开来,工作单元包括Runnable和Callable,执行机制由Executor提供。

调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干任务,使用用户级的调度器(Executor框架)将任务映射为固定数量的线程,底层,操作系统吧、内核将这些线程映射到硬件处理器上。

2.EXecutor结构成员

《Java并发Executor框架》

Executor是一个接口,它将任务的提交与任务的执行分离开来。

ThreadPoolExecutor是线程池的核心实现类,执行被提交的任务

ScheduledThreadPoolExecutor是一个实现类,在给定的延迟后运行或定期执行命令

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

Runable接口和Callable接口的实现类,可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行

2.1 使用流程

(1)创建任务对象

创建实现Runnable接口或Callable接口的任务对象。Runnable不会返回结果,Callable可以返回结果

可以使用工厂类Executors把Runnable封装成一个Callable

public static Callable<Object> callable(Runnable task)

Executors.callable(Runnable task)或Executors.callable(Runnable task,Object result)

(2)对象提交执行

把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command); ExecutorService.submit(Runnable task);ExecutorService.submit(Callable<T> task))。execute()方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功。submit()用于提交需要返回值的任务。

(3)返回值

如果执行ExecutorService.submit(…),将返回一个实现Future接口的对象FutureTask对象,主线程可以执行FutureTask.get()方法等待任务执行完成,也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消任务的执行。

2.2 框架成员

ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。

2.2.1 线程池介绍

public  ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize:线程池的基本大小,新任务到来时会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,当需要执行的任务数大于线程池基本大小则不创建。如果调用线程池的prestartAllCoreThread()方法,线程池会提前创建并启动所有基本线程。

maximumPoolSize:线程池最大数量,线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数则创建新线程,对于无界队列该参数无效。

KeepAliveTime:线程活动保持时间,工作线程空闲后保持存活的时间,如果任务多每个任务执行时间短可以调大时间,提高线程利用率

TimeUnit:线程活动保持时间单位,可选单位,DAYS,HOURS,MINUTES,MILLISECONDS,MICROSECONDS,NANOSECONDS

BlockingQueue<Runnable>:任务队列,等待执行任务的阻塞队列,ArrayBlockingQueue:基于数组结构的有界阻塞队列,按照FIFO排序,LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序,吞吐量高于前者,Executors.newFixedThreadPool()使用该队列。SysnchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于前者,Executors.newCachedThreadPool()使用该队列,PriorityBlockingQueue:一个具有优先级的无线阻塞队列。

ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个线程设置有意义的命名,可默认设置。

RejectedExecutionHandler:饱和策略,当队列和线程池满了,必须采取一种策略处理新提交的任务。默认AbortPolicy:直接抛出异常。CallerRunsPolicy:只用调用者所在线程来运行任务;DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务;DiscardPolicy:不处理,丢弃掉。

2.2.1.1 线程池处理流程

1)线程池判断核心线程池里的线程是否都在执行任务。否,创建一个新的线程执行任务,如果都在执行任务进入下一步;

2)线程池判断工作队列是否已经满了,没有则新提交的任务存储在工作队列里,满了进入下一步;

3)线程池判断线程池的线程是否处于工作状态。没有,创建新的工作线程来执行任务,满了交给饱和策略处理。

ThreadPoolExecutor执行execute()方法流程:

1)如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(该步需要获取全局锁)

2)如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue;

3)如果队列已满无法将任务加入队列,则创建新的线程来处理任务(该步骤获取全局锁)

4)如果创建新线程将使得当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandle.rejectedExecution()方法。

2.2.2 ThreadPoolExecutor

FixedThreadPool,创建使用固定线程数,为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适用于负载比较重的服务器

public static ExecutorService newFixedThreadPool(int nThreads) {

  return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

SingleThreadPool,创建使用单个线程,适用于保证顺序执行各个任务,并且在任意时间点都不会有多个线程活动的场景,可用于处理共享资源问题

public static ExecutorService newSingleThreadExecutor(){

  return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

CachedThreadPool,创建会根据需要创建新线程,初始化一定数量的线程,当新任务到来时没有空闲线程则创建新线程,有空闲重用以前的线程

public static ExecutorService newCachedThreadPool(){

  return new ThreadPoolExecutor(0,Integer.Max_VALUE,60L,TimeUnit.SECONDS,new SysnchronousQueue<Runnable>());

}超过60秒空闲线程将会终止

2.2.3 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承ThreadPoolExecutor,用于在给定的延迟以后执行任务或者定期执行任务,比Timer灵活,Timer对应单个后台线程而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

ScheduledExecutorService中至少有2个方法可用于周期性执行任务。

scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

我们可以使用该方法延迟执行任务,设置任务的执行周期。时间周期从线程池中首先开始执行的线程算起,所以假设period为1s,线程执行了5s,那么下一个线程在第一个线程运行完后会很快被执行。

比如下面的代码

for (int i = 0; i < 3; i++) {
    Thread.sleep(1000);
    WorkerThread worker = new WorkerThread("do heavy processing");
    // schedule task to execute at fixed rate
    scheduledThreadPool.scheduleAtFixedRate(worker, 0, 10,
            TimeUnit.SECONDS);

输出

Current Time = Tue Oct 29 16:10:00 IST 2013
pool-1-thread-1 Start. Time = Tue Oct 29 16:10:01 IST 2013
pool-1-thread-2 Start. Time = Tue Oct 29 16:10:02 IST 2013
pool-1-thread-3 Start. Time = Tue Oct 29 16:10:03 IST 2013
pool-1-thread-1 End. Time = Tue Oct 29 16:10:06 IST 2013
pool-1-thread-2 End. Time = Tue Oct 29 16:10:07 IST 2013
pool-1-thread-3 End. Time = Tue Oct 29 16:10:08 IST 2013
pool-1-thread-1 Start. Time = Tue Oct 29 16:10:11 IST 2013
pool-1-thread-4 Start. Time = Tue Oct 29 16:10:12 IST 2013

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) 该方法可被用于延迟周期性执行任务,delaytime是线程停止执行到下一次开始执行之间的延迟时间,假设有下面的代码

for (int i = 0; i < 3; i++) {
    Thread.sleep(1000);
    WorkerThread worker = new WorkerThread("do heavy processing");
    scheduledThreadPool.scheduleWithFixedDelay(worker, 0, 1,
            TimeUnit.SECONDS);
}

输出结果

Current Time = Tue Oct 29 16:14:13 IST 2013
pool-1-thread-1 Start. Time = Tue Oct 29 16:14:14 IST 2013
pool-1-thread-2 Start. Time = Tue Oct 29 16:14:15 IST 2013
pool-1-thread-3 Start. Time = Tue Oct 29 16:14:16 IST 2013
pool-1-thread-1 End. Time = Tue Oct 29 16:14:19 IST 2013
pool-1-thread-2 End. Time = Tue Oct 29 16:14:20 IST 2013
pool-1-thread-1 Start. Time = Tue Oct 29 16:14:20 IST 2013
pool-1-thread-3 End. Time = Tue Oct 29 16:14:21 IST 2013
pool-1-thread-4 Start. Time = Tue Oct 29 16:14:21 IST 2013

 

    原文作者:yanko_lin
    原文地址: http://www.cnblogs.com/yanko/p/5414804.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞