Java并发和多线程3:线程调度和有条件取消调度

在第1篇中“并发框架基本演示样例”。提到了Executors和ThreadPool。

当中。还有个“定时调度”的方法。Executors.newScheduledThreadPool(10)。

// 可运行调度命令(定时+周期性)的线程池,拥有固定的线程数
	// 反复运行,无穷尽
	public static void scheduledThreadPool() {
		int initialDelay = 10;
		int period = 10;
		Executor executor = Executors.newScheduledThreadPool(10);
		ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
		scheduler.scheduleAtFixedRate(task, initialDelay, period,
				TimeUnit.SECONDS);
	}

这段代码。会一直反复运行,是一种常见的场景。

可是,想到一种“仅仅调度N次”的需求。看了下API,没有提供。

就网上搜索“Java 取消线程调度”,找到了1个问答,就研究了下。

代码演示样例的关键是,使用了线程安全的AtomicInteger和通用同步工具CountDownLatch。

核心逻辑就是,当超过了最大任务数N的时候,取消Future中的任务,countDownLatch中的计数器-1,变为0,“countDownLatch.await()”线程堵塞结束

countDownLatch.countDown();

关于CountDownLatch的进一步介绍,请參考第4篇。

package cn.fansunion.executorframework;


import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


//有条件地。取消调度
public class ConditionCancelSchedulerDemo {


	public static Runnable task = new Runnable() {
		@Override
		public void run() {
			System.out.println("Execute a task");
		}
	};


	// 可运行调度命令(定时+周期性)的线程池,拥有固定的线程数
	// 反复运行,无穷尽
	public static void scheduledThreadPool() {
		int initialDelay = 10;
		int period = 10;
		Executor executor = Executors.newScheduledThreadPool(10);
		ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
		scheduler.scheduleAtFixedRate(task, initialDelay, period,
				TimeUnit.SECONDS);
	}


	public static void main(String[] args) throws Exception {
		scheduledThreadPool();
		conditionCancelScheduler();
	}


	private static void conditionCancelScheduler() throws InterruptedException {
		final String jobID = "my_job_1";
		final AtomicInteger count = new AtomicInteger(0);
		final Map<String, Future> futures = new HashMap<>();
		// 最多运行10个任务
		final int maxTaskSize = 10;
		// CountDownLatch的很多其它使用方法,请參考CountDownLatchDemo
		final CountDownLatch countDownLatch = new CountDownLatch(1);
		ScheduledExecutorService scheduler = Executors
				.newSingleThreadScheduledExecutor();


		Future future = scheduler.scheduleWithFixedDelay(new Runnable() {
			@Override
			public void run() {
				System.out.println(count.getAndIncrement());
				// 当调度运行,第maxTaskSize+1个任务的时候,取消Future中的任务。第11个任务
				if (count.get() > maxTaskSize) {
					System.out.println("a");
					Future f = futures.get(jobID);
					if (f != null) {
						f.cancel(true);
					}
					// countDownLatch中的计数器-1,变为0
					// “countDownLatch.await()”线程堵塞结束
					countDownLatch.countDown();
				}
			}
		}, 0, 1, TimeUnit.SECONDS);


		futures.put(jobID, future);
		countDownLatch.await();


		scheduler.shutdown();
	}
}

很多其它代码演示样例:

http://git.oschina.net/fansunion/Concurrent(逐步更新中)

參考资料:

java并发编程-Executor框架

http://www.iteye.com/topic/366591

有条件地终止 ScheduledExecutorService 中运行的定时任务

http://www.oschina.net/question/1158769_119659?sort=time

JDK API 文档

    原文作者:jzdwajue
    原文地址: http://www.cnblogs.com/jzdwajue/p/7010134.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞