java线程池的原理与实现

简单介绍

创建线程有两种方式:继承Thread或实现Runnable。Thread实现了Runnable接口,提供了一个空的run()

方法,所以不论是继承Thread还是实现Runnable,都要有自己的run()方法。

一个线程创建后就存在,调用start()方法就开始运行(执行run()方法),调用wait进入等待或调用sleep

进入休眠期,顺利运行完毕或休眠被中断或运行过程中出现异常而退出。

wait和sleep比较:

sleep方法有:

sleep(long millis)

sleep(long millis, long nanos)

调用sleep方法后,当前线程进入休眠期,暂停执行,但该线程继续拥有监视资源的所有权。到达休眠时

间后线程将继续执行,直到完成。若在休眠期另一线程中断该线程,则该线程退出。

wait方法有:

wait()

wait(long timeout)

wait(long timeout, long nanos)

调用wait方法后,该线程放弃监视资源的所有权进入等待状态;

wait():等待有其它的线程调用notify()或notifyAll()进入调度状态,与其它线程共同争夺监视。wait()相当于wait(0),wait(0, 0)。

wait(long timeout):当其它线程调用notify()或notifyAll(),或时间到达timeout亳秒,或有其它某线程中断该线程,则该线程进入调度状态。

wait(long timeout, long nanos):相当于wait(1000000*timeout + nanos),只不过时间单位为纳秒。

线程池:

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

    

假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

一个线程池包括以下四个基本组成部分:

1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;

2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;

3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;

4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

                

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安

排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会

有T1,T3的开销了。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数

一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些

请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建

50000而在处理请求时浪费时间,从而提高效率。

/** 线程池类,工作线程作为其内部类 **/

package org.ymcn.util;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

/**
 * 线程池 创建线程池,销毁线程池,添加新任务
 * 
 * @author obullxl
 */
public final class ThreadPool {
	private static Logger logger = Logger.getLogger(ThreadPool.class);
	private static Logger taskLogger = Logger.getLogger("TaskLogger");

	private static boolean debug = taskLogger.isDebugEnabled();
	// private static boolean debug = taskLogger.isInfoEnabled();
	/* 单例 */
	private static ThreadPool instance = ThreadPool.getInstance();

	public static final int SYSTEM_BUSY_TASK_COUNT = 150;
	/* 默认池中线程数 */
	public static int worker_num = 5;
	/* 已经处理的任务数 */
	private static int taskCounter = 0;

	public static boolean systemIsBusy = false;

	private static List<Task> taskQueue = Collections
			.synchronizedList(new LinkedList<Task>());
	/* 池中的所有线程 */
	public PoolWorker[] workers;

	private ThreadPool() {
		workers = new PoolWorker[5];
		for (int i = 0; i < workers.length; i++) {
			workers[i] = new PoolWorker(i);
		}
	}

	private ThreadPool(int pool_worker_num) {
		worker_num = pool_worker_num;
		workers = new PoolWorker[worker_num];
		for (int i = 0; i < workers.length; i++) {
			workers[i] = new PoolWorker(i);
		}
	}

	public static synchronized ThreadPool getInstance() {
		if (instance == null)
			return new ThreadPool();
		return instance;
	}

	/**
	 * 增加新的任务 每增加一个新任务,都要唤醒任务队列
	 * 
	 * @param newTask
	 */
	public void addTask(Task newTask) {
		synchronized (taskQueue) {
			newTask.setTaskId(++taskCounter);
			newTask.setSubmitTime(new Date());
			taskQueue.add(newTask);
			/* 唤醒队列, 开始执行 */
			taskQueue.notifyAll();
		}
		logger.info("Submit Task<" + newTask.getTaskId() + ">: "
				+ newTask.info());
	}

	/**
	 * 批量增加新任务
	 * 
	 * @param taskes
	 */
	public void batchAddTask(Task[] taskes) {
		if (taskes == null || taskes.length == 0) {
			return;
		}
		synchronized (taskQueue) {
			for (int i = 0; i < taskes.length; i++) {
				if (taskes[i] == null) {
					continue;
				}
				taskes[i].setTaskId(++taskCounter);
				taskes[i].setSubmitTime(new Date());
				taskQueue.add(taskes[i]);
			}
			/* 唤醒队列, 开始执行 */
			taskQueue.notifyAll();
		}
		for (int i = 0; i < taskes.length; i++) {
			if (taskes[i] == null) {
				continue;
			}
			logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
					+ taskes[i].info());
		}
	}

	/**
	 * 线程池信息
	 * 
	 * @return
	 */
	public String getInfo() {
		StringBuffer sb = new StringBuffer();
		sb.append("\nTask Queue Size:" + taskQueue.size());
		for (int i = 0; i < workers.length; i++) {
			sb.append("\nWorker " + i + " is "
					+ ((workers[i].isWaiting()) ? "Waiting." : "Running."));
		}
		return sb.toString();
	}

	/**
	 * 销毁线程池
	 */
	public synchronized void destroy() {
		for (int i = 0; i < worker_num; i++) {
			workers[i].stopWorker();
			workers[i] = null;
		}
		taskQueue.clear();
	}

	/**
	 * 池中工作线程
	 * 
	 * @author obullxl
	 */
	private class PoolWorker extends Thread {
		private int index = -1;
		/* 该工作线程是否有效 */
		private boolean isRunning = true;
		/* 该工作线程是否可以执行新任务 */
		private boolean isWaiting = true;

		public PoolWorker(int index) {
			this.index = index;
			start();
		}

		public void stopWorker() {
			this.isRunning = false;
		}

		public boolean isWaiting() {
			return this.isWaiting;
		}

		/**
		 * 循环执行任务 这也许是线程池的关键所在
		 */
		public void run() {
			while (isRunning) {
				Task r = null;
				synchronized (taskQueue) {
					while (taskQueue.isEmpty()) {
						try {
							/* 任务队列为空,则等待有新任务加入从而被唤醒 */
							taskQueue.wait(20);
						} catch (InterruptedException ie) {
							logger.error(ie);
						}
					}
					/* 取出任务执行 */
					r = (Task) taskQueue.remove(0);
				}
				if (r != null) {
					isWaiting = false;
					try {
						if (debug) {
							r.setBeginExceuteTime(new Date());
							taskLogger.debug("Worker<" + index
									+ "> start execute Task<" + r.getTaskId()
									+ ">");
							if (r.getBeginExceuteTime().getTime()
									- r.getSubmitTime().getTime() > 1000)
								taskLogger.debug("longer waiting time. "
										+ r.info()
										+ ",<"
										+ index
										+ ">,time:"
										+ (r.getFinishTime().getTime() - r
												.getBeginExceuteTime()
												.getTime()));
						}
						/* 该任务是否需要立即执行 */
						if (r.needExecuteImmediate()) {
							new Thread(r).start();
						} else {
							r.run();
						}
						if (debug) {
							r.setFinishTime(new Date());
							taskLogger.debug("Worker<" + index
									+ "> finish task<" + r.getTaskId() + ">");
							if (r.getFinishTime().getTime()
									- r.getBeginExceuteTime().getTime() > 1000)
								taskLogger.debug("longer execution time. "
										+ r.info()
										+ ",<"
										+ index
										+ ">,time:"
										+ (r.getFinishTime().getTime() - r
												.getBeginExceuteTime()
												.getTime()));
						}
					} catch (Exception e) {
						e.printStackTrace();
						logger.error(e);
					}
					isWaiting = true;
					r = null;
				}
			}
		}
	}
}

/** 任务接口类 **/

package org.ymcn.util;

import java.util.Date;

/**
 * 所有任务接口 其他任务必须继承访类
 * 
 * @author obullxl
 */
public abstract class Task implements Runnable {
	// private static Logger logger = Logger.getLogger(Task.class);
	/* 产生时间 */
	private Date generateTime = null;
	/* 提交执行时间 */
	private Date submitTime = null;
	/* 开始执行时间 */
	private Date beginExceuteTime = null;
	/* 执行完成时间 */
	private Date finishTime = null;

	private long taskId;

	public Task() {
		this.generateTime = new Date();
	}

	/**
	 * 任务执行入口
	 */
	public void run() {
		/**
		 * 相关执行代码
		 * 
		 * beginTransaction();
		 * 
		 * 执行过程中可能产生新的任务 subtask = taskCore();
		 * 
		 * commitTransaction();
		 * 
		 * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
		 */
	}

	/**
	 * 所有任务的核心 所以特别的业务逻辑执行之处
	 * 
	 * @throws Exception
	 */
	public abstract Task[] taskCore() throws Exception;

	/**
	 * 是否用到数据库
	 * 
	 * @return
	 */
	protected abstract boolean useDb();

	/**
	 * 是否需要立即执行
	 * 
	 * @return
	 */
	protected abstract boolean needExecuteImmediate();

	/**
	 * 任务信息
	 * 
	 * @return String
	 */
	public abstract String info();

	public Date getGenerateTime() {
		return generateTime;
	}

	public Date getBeginExceuteTime() {
		return beginExceuteTime;
	}

	public void setBeginExceuteTime(Date beginExceuteTime) {
		this.beginExceuteTime = beginExceuteTime;
	}

	public Date getFinishTime() {
		return finishTime;
	}

	public void setFinishTime(Date finishTime) {
		this.finishTime = finishTime;
	}

	public Date getSubmitTime() {
		return submitTime;
	}

	public void setSubmitTime(Date submitTime) {
		this.submitTime = submitTime;
	}

	public long getTaskId() {
		return taskId;
	}

	public void setTaskId(long taskId) {
		this.taskId = taskId;
	}

}

转自:http://hi.baidu.com/obullxl/item/e5b0095c7fc97aced3e10c36

    原文作者:java线程池
    原文地址: https://blog.csdn.net/u011742227/article/details/42496555
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞