Java核心基础加强系列-Java线程池详解(源码解析)

作为Java开发者,多线程是一个绕不开的主题,Java作为一门支持多线程的编程语言,我们必须要去理解和使用多线程技术,除此之外,多线程也是面试的一大重点,提到多线程,不可避免地就要用到线程池技术。

如何开启一个线程执行任务

在讨论线程池是什么之前,我们先来看看如何开启一个线程去执行一个任务,你是不是会写出以下代码呢?

new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("开启一个线程执行咯。。。");
            }
        }).start();

或者优雅一点,你可能写出以下的代码。

new Thread(()->{
            System.out.println("开启一个线程执行咯。。。");
        }).start();

你会说,这种方式没有任何问题啊,开启一个线程去执行一个任务,当然啦,这种方式用在我们自己的项目中是没有任何问题的,但是如果你在公司的项目中写出这样的代码,那么你得考虑一下了,说不定会挨揍。

说说为啥会挨揍?首先,我们知道线程是非常宝贵的资源,新建线程和销毁线程都是非常消耗系统资源的,如果使用以上的方式去开启线程,那么系统每次都会新建一个线程,任务执行完毕后马上销毁线程。如果程序中多次使用到这种方式,每次都会重复新建和销毁的动作,这对系统资源的消耗是非常巨大的,而且这种方式也不便于对任务进行管理,当开启后,就失去了对其的控制权。所以,切记:公司项目中不要写出这样的代码哦!别问笔者为何知道,说多都是泪。

什么是线程池

上面讲了为什么不能使用传统的方式去开启一个线程,主要的问题包括以下几点:

  • 线程的新建和销毁极其消耗系统资源。
  • 不便于统一管理

那么如何解决这个问题呢?类比数据库连接,很容易想到的就是 “池化技术”,我们可以在程序启动时,一次性申请多个线程资源,并将其放到一个池子中,每次需要使用时,去池子中取,用完将其还回到池子中,这样就省去了很多线程新建时的消耗,在关闭程序时,再一次性将线程销毁,这样既提高了线程的复用率,还能提高系统的响应率。

如何使用线程池

大家可以想想我们是如何使用线程池的?如果是使用的Spring全家桶系列,那么是不是会有一个ThreadPoolTaskExecutor对象呢?其实Spring的线程池只是对JDK自带的ThreadPoolExecutor进行了再一次的封装,其核心还是JDK中JUC包下的ThreadPoolExecutor类。

再来说说JDK中的几类线程池:

Executors.newCachedThreadPool();
Executors.newFixedThreadPool(10);
Executors.newSingleThreadExecutor();
Executors.newScheduledThreadPool(10);

其实我们稍微翻一下源码,这几类线程池也只是对最底层的ThreadPoolExecutor进行了一些特殊的配置而已,所以,怎么都逃不过JDK中JUC包下的ThreadPoolExecutor类。下面我们就来见识见识这个类到底是何方神圣?最后我们再来看看上面这几种线程池到底进行了怎么样特殊的配置。

核心的ThreadPoolExecutor类

翻一翻源码,发现其顶级接口是Executor,其中只有一个接收参数类型为Runnable的execute方法。接下来就是非常繁杂的继承体系。到ThreadPoolExecutor这一层就已经是具体的实现了,我们可以向其提交任务,就等待CPU去调度执行了。

ThreadPoolExecutor类的核心参数

打开源码,就很轻松地发现其中有多个参数配置:

public ThreadPoolExecutor(
        int corePoolSize,                   //核心线程数
        int maximumPoolSize,                //最大线程数
        long keepAliveTime,                 //空闲线程最大存活时间
        TimeUnit unit,                      //存活时间单位
        BlockingQueue<Runnable> workQueue,  //任务阻塞队列
        ThreadFactory threadFactory,        //线程工厂
        RejectedExecutionHandler handler)   //任务拒绝策略

下面我们来一一讲解。

  • corePoolSize 核心线程的最大值。

    线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程,核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。

    如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(keepAliveTime参数决定),就会被销毁掉。

  • maximumPoolSize 线程池中最大线程数。

    线程最大数 = 核心线程数 + 非核心线程数。

  • keepAliveTime 空闲线程最大存活时间。

    一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉。

    如果设置参数allowCoreThreadTimeOut = true,则会作用于核心线程。

  • TimeUnit unit:keepAliveTime的单位。

    TimeUnit是一个枚举类型,其包括:

    NANOSECONDS : 1微毫秒 = 1微秒 / 1000

    MICROSECONDS : 1微秒 = 1毫秒 / 1000

    MILLISECONDS : 1毫秒 = 1秒 /1000

    SECONDS : 秒

    MINUTES : 分

    HOURS : 小时

    DAYS : 天

  • BlockingQueue workQueue 该线程池中的任务队列:维护着等待执行的Runnable对象

    当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。

    maximumPoolSize - corePoolSize之间就是可以新建的非核心线程数的最大值。

    当任务队列已满(由任务队列种类决定),且线程池中的线程总数没超过maximumPoolSize,线程池将会新建“非核心线程”去临时帮助执行任务,在这种情况下,如果一个非核心线程处于“闲置状态”,超过keepAliveTime后将会被销毁,直至线程池中的线程数重新达到corePoolSize。

    总结

    1. 如果运行的线程少于 corePoolSize,线程池始终首选添加新的线程去执行任务,而不进行排队。
    2. 如果运行的线程等于或多于 corePoolSize,线程池始终首选将请求加入队列,而不添加新的线程。
    3. 如果无法将请求加入队列,则创建新的线程(非核心线程),除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

如下是几种常用的workQueue类型:

有限队列

SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大,具体可以参见CachedThreadPool。

ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则执行拒绝策略。

无限队列

LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为创建的线程就不会超过 corePoolSize。

DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

  • RejectedExecutionHandler 拒绝任务处理器。

    任务的数量达到了 corePoolSize,任务入队成功,以此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,大家可以看看execute 方法的源码,看看是怎么进到第一个 reject(command) 里面的。

    任务的数量大于等于 corePoolSize,准备入队,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。

    JDK中自带的四种拒绝策略,大家自行研究吧,我没研究过,一般用默认的:

    ThreadPoolExecutor.AbortPolicy 用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException。

    ThreadPoolExecutor.CallerRunsPolicy 用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。

    ThreadPoolExecutor.DiscardOldestPolicy 用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。

    ThreadPoolExecutor.DiscardPolicy 用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

    四种类型的线程池

    讲完了核心的参数,我们再来看看之前提到过的四种线程池:

    Executors.newSingleThreadExecutor();        单个线程线程池 (Core和Max数量都是1)
    Executors.newFixedThreadPool(10);           固定大小的线程池(Core = Max)
    Executors.newCachedThreadPool();            无限线程池(Core=0 ,Max=无限大)
    Executors.newScheduledThreadPool(10);       (Max=无限大,任务队列为DelayedWorkQueue)       
    

再详细的大家可以自己看看源码。

如何优雅关闭线程池

无非就是两个方法:shutdown()和shutdownNow()。

shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。

shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

以下的一种关闭方式仅供参考:

   long start = System.currentTimeMillis();
  
   for (int i = 0; i <= 5; i++) {
       pool.execute(new Job());
    }
    
   pool.shutdown();
  
   while (!pool.awaitTermination(1, TimeUnit.SECONDS){
           LOGGER.info("线程还在执行。。。");
        }
        
   long end = System.currentTimeMillis();
  
   LOGGER.info("一共处理了【{}】", (end - start));

pool.awaitTermination(1, TimeUnit.SECONDS) 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就表明线程池中的任务已经完全终止了。

源码解析

最近有一些面试,被问到了Java线程池核心的实现,我们就来分析一下源码吧。

暂无评论
发表新评论