自定义线程池内置线程池的使用 ThreadPoolExecutor和Executorservice 示例与注意事项

文章目录

线程池介绍

自JDK1.5起,utils包提供了ExecutorService线程池的实现,主要目的是为了重复利用线程,提高系统效率。我们知道Thread是一个重量级的资源,创建、启动以及销毁都是比较耗费系统资源的,因此对线程的重复利用一种是非常好的程序设计习惯,加之系统中可创建的线程数量是有限的,线程数量和系统性能是一种抛物线的关系,也就是说当线程数量达到某个数值的时候,性能反倒会降低很多,因此对线程的管理,尤其是数量的控制更能直接决定程序的性能。

所谓线程池,通俗的理解就是有一个池子,里面存放着已经创建好的线程,当有任务提交给线程池执行时,池子中的某个线程会主动执行该任务。如果池子中的线程数量不够应付数量众多的任务时,则需要自动扩充新的线程到池子中,但是该数量是有限的,就好比池塘的水界线一样。当任务比较少的时候,池子中的线程能够自动回收,释放资源。
为了能够异步地提交任务和缓存未被处理的任务,需要有一个任务队列,
《自定义线程池内置线程池的使用 ThreadPoolExecutor和Executorservice 示例与注意事项》

通过上面的描述可知,一个完整的线程池应该具备如下要素。
任务队列:用于缓存提交的任务。
线程数量管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数来实现,比如创建线程池时初始的线程数量init;线程池自动扩充时最大的线程数量max;在线程池空闲时需要释放线程但是也要维护一定数量的活跃数量或者核心数量core。
有了这三个参数,就能够很好地控制线程池中的线程数量,将其维护在一个合理的范围之内,
三者之间的关系是init<=core<=max。

任务拒绝策略:如果线程数量已达到上限且任务队列已达到上限且任务队列已满,则需要有相应的拒绝策略来通知任务提交者。

线程工厂:主要用于个性化定制线程,比如将线程设置为守护线程以及设置线程名称等。

QueueSize:任务队列主要存放提交的Runnable,但是为了防止内存溢出,需要有limit数量对其进行控制。

Keepedalive时间:该时间主要决定线程各个重要参数自动维护的时间间隔。

自己设计一个线程池

在这里实现一个比较简单的ThreadPool,虽然比较简单,但是该有的功能基本上都具备,对读者学习和掌握JUC中的ExecutorService也有一定的帮助

线程池工作过程

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

1.设计ThreadPool类:



public interface ThreadPool
{ 
    //提交任务到线程池
    void execute(Runnable runnable);

    //关闭线程池
    void shutdown();

    //获取线程池的初始化大小
    int getInitSize();

    //获取线程池最大的线程数
    int getMaxSize();

    //获取线程池的核心线程数量
    int getCoreSize();

    //获取线程池中用于缓存任务队列的大小
    int getQueueSize();

    //获取线程池中活跃线程的数量
    int getActiveCount();

    //查看线程池是否已经被shutdown
    boolean isShutdown();
}

ThreadFactory提供了创建线程的接口,以便于个性化地定制Thread,比如Thread应该被加到哪个Group中,优先级、线程名字以及是否为守护线程等,

2.设计工作队列

RunanbleQueue主要用于存放提交的Runnable,该Runnable是一个BlockedQueue,并且有limit的限制

package com.wangwenjun.concurrent.chapter08;

//任务队列,主要用于缓存提交到线程池中的任务
public interface RunnableQueue
{ 
    //当有新的任务进来时首先会offer到队列中
    void offer(Runnable runnable);

    //工作线程通过take方法获取Runnable
    Runnable take();

    //获取任务队列中任务的数量
    int size();
}

自定义阻塞队列LinkedRunnableQueue的示例:


import java.util.LinkedList;

public class LinkedRunnableQueue implements RunnableQueue
{ 
    //任务队列的最大容量,在构造时传入
    private final int limit;

    //若任务队列中的任务已经满了,则需要执行拒绝策略
private final DenyPolicy denyPolicy;

    //存放任务的队列
    private final LinkedList<Runnable> runnableList = new LinkedList<>();

    private final ThreadPool threadPool;
    public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool)
    { 
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
    }
}
// 在LinkedRunnableQueue中有几个重要的属性,第一个是limit,也就是Runnable队列的上限;当提交的Runnable数量达到limit上限时,则会调用DenyPolicy的reject方法;runnableList是一个双向循环列表,用于存放Runnable任务
  @Override
public void offer(Runnable runnable)
{ 
    synchronized (runnableList)
{ 
        if (runnableList.size() >= limit)
        { 
            //无法容纳新的任务时执行拒绝策略
            denyPolicy.reject(runnable, threadPool);
        } else
        { 
            //将任务加入到队尾,并且唤醒阻塞中的线程
            runnableList.addLast(runnable);
            runnableList.notifyAll();
        }
    }
}
 //offer方法是一个同步方法,如果队列数量达到了上限,则会执行拒绝策略,否则会将runnable存放至队列中,同时唤醒take任务的线程: @Override
public Runnable take() throws InterruptedException
{ 
    synchronized (runnableList)
    { 
        while (runnableList.isEmpty())
        { 
            try
            { 
                //如果任务队列中没有可执行任务,则当前线程将会挂起,进入runnableList关联的monitor waitset中等待唤醒(新的任务加入)
                runnableList.wait();
            } catch (InterruptedException e)
            { 
                //被中断时需要将该异常抛出
                throw e;
            }
        }
        //从任务队列头部移除一个任务
        return runnableList.removeFirst();
    }
}

 //take方法也是同步方法,线程不断从队列中获取Runnable任务,当队列为空的时候工作线程会陷入阻塞,有可能在阻塞的过程中被中断,为了传递中断信号需要在catch语句块中将异常抛出以通知上游(InternalTask),示例代码如下: 
 @Override
public int size()
{ 
    synchronized (runnableList)
    { 
        //返回当前任务队列中的任务数
        return runnableList.size();
    }
//其中,size方法用于返回runnableList的任务个数。
}

3.实现自己设计的线程池

public class BasicThreadPool extends Thread implements ThreadPool
{ 
    //初始化线程数量
    private final int initSize;

    //线程池最大线程数量
    private final int maxSize;

    //线程池核心线程数量
    private final int coreSize;

    //当前活跃的线程数量
    private int activeCount;
    //创建线程所需的工厂
    private final ThreadFactory threadFactory;

    //任务队列
    private final RunnableQueue runnableQueue;

    //线程池是否已经被shutdown
    private volatile boolean isShutdown = false;

    //工作线程队列
    private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();

    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();

    private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();

    private final long keepAliveTime;

    private final TimeUnit timeUnit;

    //构造时需要传递的参数:初始的线程数量,最大的线程数量,核心线程数量,任务队列的最大数量
    public BasicThreadPool(int initSize, int maxSize, int coreSize,
                           int queueSize)
    { 
        this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY,
                queueSize, DEFAULT_DENY_POLICY, 10, TimeUnit.SECONDS);
    }

    //构造线程池时需要传入的参数,该构造函数需要的参数比较多
    public BasicThreadPool(int initSize, int maxSize, int coreSize,ThreadFactory threadFactory, int queueSize,
DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit)
    { 
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSize;
        this.threadFactory = threadFactory;
            this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);
        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;
        this.init();
    }
    //初始化时,先创建initSize个线程
    private void init()
    { 
        start();
        for (int i = 0; i < initSize; i++)
        { 
            newThread();
        }
    }
//提交任务非常简单,只是将Runnable插入runnableQueue中即可
 @Override
public void execute(Runnable runnable)
{ 
    if (this.isShutdown)
        throw new IllegalStateException("The thread pool is destroy");
   //提交任务只是简单地往任务队列中插入Runnable
   this.runnableQueue.offer(runnable);
}
,线程池自动维护代码如下: private void newThread()
{ 
    //创建任务线程,并且启动
    InternalTask internalTask = new InternalTask(runnableQueue);
    Thread thread = this.threadFactory.createThread(internalTask);
    ThreadTask threadTask = new ThreadTask(thread, internalTask);
    threadQueue.offer(threadTask);
    this.activeCount++;
    thread.start();}
private void removeThread()
{ 
    //从线程池中移除某个线程
    ThreadTask threadTask = threadQueue.remove();
    threadTask.internalTask.stop();
    this.activeCount--;
}
@Override
public void run()
{ 
    //run方法继承自Thread,主要用于维护线程数量,比如扩容、回收等工作
    while (!isShutdown && !isInterrupted())
    { 
        try
        { 
            timeUnit.sleep(keepAliveTime);
        } catch (InterruptedException e)
        { 
            isShutdown = true;
            break;
        }
        synchronized (this)
        { 
            if (isShutdown)
                break;
            //当前的队列中有任务尚未处理,并且activeCount< coreSize则继续扩容
            if (runnableQueue.size() > 0&& activeCount < coreSize)
            { 
                for (int i = initSize; i < coreSize; i++)
                { 
                    newThread();
                }
                //continue的目的在于不想让线程的扩容直接达到maxsize
                continue;
            }
//当前的队列中有任务尚未处理,并且activeCount< maxSize则继续扩容
            if (runnableQueue.size() > 0&& activeCount < maxSize)
            { 
                for (int i = coreSize; i < maxSize; i++)
                { 
                    newThread();
                }
            }
            //如果任务队列中没有任务,则需要回收,回收至coreSize即可
            if (runnableQueue.size()==0&& activeCount > coreSize)
            { 
                for (int i = coreSize; i < activeCount; i++)
                { 
                    removeThread();
                }
            }
        }
    }
}

//ThreadTask只是InternalTask和Thread的一个组合
private static class ThreadTask
{ 
    public ThreadTask(Thread thread, InternalTask internalTask)
    { 
        this.thread = thread;
        this.internalTask = internalTask;
    }
    Thread thread;
    InternalTask internalTask;
}

自己设计的线程池测试代码

import java.util.concurrent.TimeUnit;

public class ThreadPoolTest
{ 
    public static void main(String[] args) throws InterruptedException
    { 
//定义线程池,初始化线程数为2,核心线程数为4,最大线程数为6,任务队列最多允许1000个任务
                for (int i = 0; i < 20; i++)
            threadPool.execute(() ->
            { 
                try
                { 
                    TimeUnit.SECONDS.sleep(10);
System.out.println(Thread.currentThread().getName() + " is running and done.");
                } catch (InterruptedException e)
                { 
                    e.printStackTrace();
                }
            });

        for (; ; )
        { 
            //不断输出线程池的信息
            System.out.println("getActiveCount:" + threadPool.getActiveCount());
            System.out.println("getQueueSize:" + threadPool.getQueueSize());
            System.out.println("getCoreSize:" + threadPool.getCoreSize());
            System.out.println("getMaxSize:" + threadPool.getMaxSize());
            System.out.println("======================================");
            TimeUnit.SECONDS.sleep(5);
        }
    }
}


用java的ThreadPoolExecutor自定义线程池

自定义线程池需要用到ThreadPoolExecutor,这个类提供ExecutorService执行方法的默认实现。 此类使用newTaskFor返回的RunnableFuture实现submit 、 invokeAny和invokeAll方法,默​​认为该包中提供的FutureTask类。

2.1.1:ThreadPoolExecutor部分源码

 构造方法:
  public ThreadPoolExecutor(int corePoolSize,/核心线程数量int maximumPoolSize,//
  最大线程数
  long keepAliveTime,/
  最大空闲时间
  TimeUnit unit,
  时间单位
  BlockingQueue<Runnable>workQueue,/任务队列ThreadFactory threadFactory,/线程工厂
  RejectedExecutionHandler handler/∥饱和处理机制)
  { }

自定义线程池-参数设计分析

◆通过观察Java中的内置线程池参数讲解和线程池工作流程总结我们不难发现要设计一个好的线程池,就必须合理的设置线程池的4个参数那到底该如何合理的设计4个参数的值呢?我们起往下看.

4个参数的设计
1:核心线程数( corepoolsize)
核心线程数的设计需要依据任务的处理时间可和每秒产生的任务数量来确定例如执行个任务常要0.1秒系统百分之80的时间每秒都会产生100个任务那么要想在1秒内处理完这100个任务就需要10个线程此时我们就可以设计核心线程数为10,当然实时情兄不可能这么平均所以我们般按照8020原则设计即可,既技照百分之80的情况设计核心线程数剩下的百分之20可以利用最大线程数处理

2:任务队列长度( workqueue)
任务队列长度一般设计为线程数/单个任务执行时可2即可,如上面的场中核心线程数设计为10单个任务执行时可为
0.1秒.则队列长度可以设计为200

3:最大线程数(maximumPoolSize)
  最大线程数的设计除了需要参照核心线程数的条件外,还需要参照系统每秒产生的最大任务数决定例如:上述环境中,如果系统每秒最大产生的任务是1000个,那么最大线程数=(最大任务数-任务队列长度)*单个任务执行时间:既:最大线程数=(1000-200)*0.1=80个:

4:最大空闲时间(keepAliveTime)
  这个参数的设计完全参考系统运行环境和硬件压力设定没有固定的参考值用户可以根据经验和系统产生任务的时间间隔合理设置一个值即可;

自定义线程池-实现步骤示例

1编写任务类( My Task),实现 Runnable接口
2编写线程类( My Worker)用于执行任务需要持有所有任务;
3编写线程池类( Mythread Pool),包含提交任务执行任务的能力;
4编写测试类( Mytest)创建线程池对象提交多个任务

例子:

public static void main(String[] args)
        throws ExecutionException, InterruptedException
{ 
    // ① 创建ThreadPoolExecutor,7个构造参数
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 30,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardPolicy());

    // ② 提交执行异步任务,不关注返回值
executor.execute(() -> System.out.println(" execute the runnable task"));

    // ③ 提交执行异步任务,关注返回值
Future<String> future = executor.submit(() -> " Execute the callable task and this is the result");

    // ④获取并输出callable任务的返回值
    System.out.println(future.get());
} 

Exectors创建内置线程池

注:《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。

ExecutorService介绍和示例

Executors其实是个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的线程池实例。 ThreadPoolExecutor继承了AbstractExecutorService,成员变量ctl是一个Integer的原子变量,用来记录线程池状态和线程池中线程个数,类似于ReentrantReadWriteLock使用一个变量来保存两种信息。
  
  获取ExecutorServicei可以利用JDK中的Executors类中的静态方法,常用获取方式如下:
static ExecutorService newCachedThreadPool(创建一个默认的线程池对象,里面的线程可重用,且在第一次使用时才创建static ExecutorService ,最多线程个数为Integer.MAX_VALUE,并且阻塞队列为同步队列。keeyAliveTime=60说明只要当前线程在60s内空闲则回收。这个类型的特殊之处在于,加入同步队列的任务会被马上执行,同步队列里面最多只有一个任务

static ExecutorService newFixedThreadPool(int n Threads)创建一个可重用固定线程数的线程池并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。
static ExecutorService newFixedThreadPool(int n

static ExecutorService newSingleThreadExecutor()创建一个使用单个worker线程的Executor,以无界队列方式来运行该线程。并且阻塞队列长度为Integer.MAX_VALUE。keeyAliveTime=0说明只要线程个数比核心线程个数多并且当前空闲则回收。

上面三个构造方法都有带ThreadFactory的重载方法,用于自定义线程创建的方式。

例子:

//创建一个线程池
ExecutorService pool = Executors.newFixedThreadPool(taskSize);
// 创建多个有返回值的任务
List<Future> list = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) { 
Callable c = new MyCallable(i + " ");
// 执行任务并获取Future 对象
Future f = pool.submit(c);
list.add(f);
}
// 关闭线程池
pool.shutdown();
// 获取所有并发任务的运行结果
for (Future f : list) { 
// 从Future 对象上获取任务的返回值,并输出到控制台
System.out.println("res:" + f.get().toString());

Scheduledexecutorservice

Scheduledexecutorservice,是 ExecutorService的子接口,具备了延迟运行或定期执行任务的能力,常用获取方式如下
static Scheduledexecutorservice newscheduled Threadpool(int corepoolsize创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务
static Scheduledexecutorservice newscheduledthread Pool(int corepoolsize, Threadfactory threadfactory)
创建一个可重用固定线程数的线程池且线程池中的所有线程都使用 Thread Factory来创建,且允许延迟运行或定期执行任务;

static Scheduledexecutorservice newsinglethreadscheduledexecutor(Threadfactory threadfactory)创建一个单线程执行程序,它可安排在给定延退后运行命令或者定期地执行。

例子:

ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3); 

scheduledThreadPool.schedule(newRunnable(){ 
 @Override
  public void run() 
  {  
System.out.println("延迟三秒"); 
} 
}, 3, 
TimeUnit.SECONDS); 
scheduledThreadPool.scheduleAtFixedRate(newRunnable(){  @Override 
public void run() {  
System.out.println("延迟1秒后每三秒执行一次");
 } },1,3,TimeUnit.SECONDS);
    原文作者:march of Time
    原文地址: https://blog.csdn.net/qq_41358574/article/details/121852746
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞