Java 线程池简单实现
线程池的作用
对于一些并发量不高,不要求服务器快速响应请求的任务,我们可以通过每接收一个任务,创建一个线程来执行任务的方式来完成,但是对于一些高并发下场景下,很容易就会性能瓶颈的。一个任务一个线程的话,系统会创建大量的线程,导致操作系统频繁地进行上下文切换,增加系统的负载,同时线程是一个很珍贵的资源,每一次线程的创建和销毁都会消耗系统资源。
我们可以通过创建线程池来解决问题,线程池通过预先创建一定数量的worker,不让用户直接创建线程来执行任务,通过重复的使用固定数目的worker来完成任务,可以方便管理线程的分配、监控和调优,同时减少了线程和销毁线程的系统资源开销,而且提高了任务的相应速度,不需要等到任务到来时才创建线程。
线程池的原理
线程池任务处理过程
当接收到一个任务请求的时候,线程池首先会判断线程池里的线程是否都在执行任务,如果有空闲线程的话,会调用空闲线程来执行任务,否则,就会将任务添加到工作队列,如果工作队列已满的话,就会将这个任务交给RejectExecutionHandler(饱和策略)来处理。
线程池简单实现(没有实现饱和策略)
package me.kuye.concurrent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
public class ThreadPool<Task extends Runnable> {
//最大线程数
private static final int MAX_WORKER_NUMBERS = 10;
//默认线程数
private static final int DEFAULT_WORKER_NUMBERS = 5;
//最小线程数
private static final int MIN_WORKER_NUMBERS = 1;
//工作队列
private BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();
//工作者队列
private List<Worker> workerList = Collections.synchronizedList(new ArrayList<>());
//当前线程数
private int workerNum = DEFAULT_WORKER_NUMBERS;
//
private AtomicLong threadNumber = new AtomicLong();
public ThreadPool() {
this(DEFAULT_WORKER_NUMBERS);
}
public ThreadPool(int number) {
this.workerNum = number > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS
: number < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : number;
init(workerNum);
}
//线程池预热,预先创建固定数目的线程
private void init(int workerNum) {
for (int i = 0; i < workerNum; i++) {
workerList.add(new Worker("worker-" + threadNumber.incrementAndGet()));
}
for (Worker worker : workerList) {
new Thread(worker).start();
}
}
//执行任务
public void execute(Task task) {
if (task != null) {
synchronized (taskQueue) {
taskQueue.add(task);
taskQueue.notify();
}
}
}
//关闭线程池
public void shutdown() {
for (Worker worker : workerList) {
worker.shutdown();
}
}
// 增加一定数目的工作者
public void addWorkers(int number) {
synchronized (workerList) {
if (number + this.workerNum > MAX_WORKER_NUMBERS) {
number = MAX_WORKER_NUMBERS - this.workerNum;
}
init(number);
this.workerNum += number;
}
}
//减少一定数目的工作者
public void removeWorkers(int number) {
synchronized (workerList) {
if (number > this.workerNum) {
throw new IllegalArgumentException("the number is more than the workerNum");
}
int count = 0;
while (count < number) {
Worker worker = workerList.get(count);
if (workerList.remove(worker)) {
worker.shutdown();
count++;
}
}
}
this.workerNum -= number;
}
// 工作者
private class Worker implements Runnable {
private volatile boolean isStop = false;
private String workerName = "";
public Worker(String workerName) {
this.workerName = workerName;
}
@Override
public void run() {
while (!isStop) {
Task task = null;
synchronized (taskQueue) {
while (taskQueue.isEmpty()) {
try {
taskQueue.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
task = taskQueue.poll();
}
if (task != null) {
try {
System.out.print(this.workerName+ " ");
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public void shutdown() {
this.isStop = true;
System.out.println(this.workerName + " is shutdown");
}
}
}