线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数超过了最大数量超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

# 一、什么是线程池

线程池Thread Pool是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来4个好处:
1)、降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2)、提高响应速度。当任务达到时,任务可以不需要等到线程创建就能立即执行。
3)、提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。
4)、提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

# 二、线程池解决的问题是什么

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
【1】频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大;
【2】对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险;
【3】系统无法合理管理内部的资源分布,会降低系统的稳定性;

为解决资源分配这个问题,线程池采用了“池化”Pooling思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。除了线程池,还有其他比较典型的几种使用策略包括:
【1】内存池Memory Pooling:预先申请内存,提升申请内存速度,减少内存碎片。
【2】连接池Connection Pooling:预先申请数据库连接,提升申请连接的速度,降低系统的开销。
【3】实例池Object Pooling:循环使用对象,减少资源在初始化和释放时的昂贵损耗。

# 三、线程池核心设计与实现

Java中的线程池核心实现类是ThreadPoolExecutor,还有一个工具类Excutors。本章基于JDK 1.8的源码来分析Java线程池的核心设计与实现。我们首先来看一下ThreadPoolExecutorUML类图,了解下ThreadPoolExecutor的继承关系。

UML类图 UML类图

ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器Executor中,由 Executor框架完成线程的调配和任务的执行部分。

ExecutorService接口增加了一些能力: 1)扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法;2)提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService 则是上层的抽象类,使用模板模式将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分。

ThreadPoolExecutor 维护自身的生命周期,同时管理线程和任务,使两者良好的结合从而执行并行任务。

# 四、线程池生命周期

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部维护。线程池内部使用一个32位的整数维护两个值:运行状态runState和线程数量workerCount两个参数维护在一起,其中高 3位用于存放线程池状态,低29位表示线程数CAPACITY。如下代码所示:

// 状态 RUNNING 线程数 = 0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大线程数是 2^29-1=536870911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
//将CAPACITY取非后和c进行取与运算,可以得到高3位的值,即线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//将c和CAPACITY取与运算,可以得到低29位的值,即线程池的个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

ThreadPoolExecutor的运行状态有5种,其生命周期转换如下入所示:

ThreadPoolExecutor

【1】RUNNING:能接收新提交的任务,也能处理阻塞队列中的任务;
【2】SHUTDOWN:关闭状态,不再接收新提交的任务,但却可以继续处理阻塞队列中已保存的消息;
【3】STOP:不接受任务,也不处理阻塞队列中的消息,会中断正在执行的任务;
【4】TIDYING:所有的任务已终止,workerCount(有效线程数为0);
【5】TERMINATED:在terminated()方法执行完后进入该状态;

# 五、线程池处理任务的流程

ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务的呢?线程池处理任务的流程图如下:

流程图

【从图中可以看出,当提交一个任务到线程池时,线程池处理流程如下】:
 1)、首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
 2)、线程池判断核心线程池里的线程数是否已达到上限,如果没有,就创建新线程执行任务。如果已达到上限,进入步骤3。
 3)、线程池判断工作队列是否已满。如果工作队列没满,就将任务存储在队列中。如果队列满了,则进入步骤4。
 4)、线程池判断线程池中的线程数是否已达上限,没有就创建新线程执行任务,如果已满则根据饱和策略处理此任务。默认的处理方式是直接抛异常。

ThreadPoolExecutor执行execute()方法的示意图,如下所示:

execute

ThreadPoolExecutor执行execute方法分下面4中情况:
1)、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(执行需要获取全局锁)。
2)、如果运行的线程等于或大余corePoolSize,则将任务加入BlockingQueue
3)、如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获取全局锁)
4)、如果创建新线程将使当前运行的线程超过maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecutor()方法。

TIP

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

# 六、源码分析

通过上面流程图的分析直观的了解了线程池的工作原理,下面就通过源码看看是如何实现的,方法如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
      // 如果当前的线程数小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 调用addWorker新建一个线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
    // 校验当前线程状态是RUNNING,并将command入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 如果不是运行状态,那么移除队列,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
        // 防止任务提交到队列中了,但是线程都关闭了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
      // 到这里说明队列已经满了,所以新建一个线程,如果新建的线程数已经超过了maximumPoolSize,那么执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

我们下面看一下addWorker是如何创建线程的:

代码中的retry: 就是一个标记,标记对一个循环方法的操作continuebreak处理点,功能类似于goto,所以retry一般都是伴随着for循环出现,retry:标记的下一行就是for循环,在for循环里面调用continue或者break再紧接着retry标记时,就表示从这个地方开始执行continue或者break操作

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 获取当前线程池状态
        int rs = runStateOf(c);

        // 1.仅在必要时检查队列是否为空。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
               //2. 校验传入的线程数是否超过了容量大小, 或者是否超过了corePoolSize或maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
              //到了这里说明线程数没有超,那么就用CAS将线程池的个数加1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
              //3 说明有其他的线程抢先更新了状态,继续下一轮的循环,跳到外层循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
          //创建一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                  //4 如果线程是没有问题的话,那么将worker加入到队列中
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于记录 workers 中的个数的最大值
                    // 因为 workers 是不断增加减少的,通过这个值可以知道线程池的大小曾经达到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
               //如果worker入队成功,那么启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
          //如果worker启动失败,那么就回滚woker线程创建的状态
        if (! workerStarted)
            addWorkerFailed(w);
    }
      // 返回线程是否启动成功
    return workerStarted;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

这里主要是列举了几个条件不能创建新的worker的情况:
【1】线程池状态大于SHUTDOWN,其实也就是STOP,TIDYING, 或TERMINATED
【2】firstTask != null
【3】workQueue.isEmpty()如果线程池处于SHUTDOWN,但是firstTasknull,且workQueue非空,那么是允许创建worker的;

如果传入的core参数是true代表使用核心线程数corePoolSize作为创建线程的界限,也就说创建这个线程的时候,如果线程池中的线程总数已经达到corePoolSize,那么不能响应这次创建线程的请求;如果是false,代表使用最大线程数maximumPoolSize作为界限;

如果CAS失败并不是因为有其他线程在操作导致的,那么就直接在里层循环继续下一次的循环就好了,如果是因为其他线程的操作,导致线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环;

如果是小于SHUTTDOWN那就是RUNNING,则继续往下继续,或者状态是SHUTDOWN但是传入的firstTask为空,代表继续处理队列中的任务

addWorkerFailed:将workers集合里面的worker移除,然后count减1,

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

# 七、工作线程

线程池创建线程时,会将线程封装成工作线程WorkerWorker是继承AQS对象的,在创建Worker对象的时候会传入一个Runnable对象,并设置AQSstate状态为-1,并从线程工厂中新建一个线程。调用thread.start方法会调用到Workerrun方法中。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
      ....
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。

public void run() {
    runWorker(this);
}
1
2
3

Workerrun方法会调用到ThreadPoolExecutorrunWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
          //如果task为空,那么就从workQueue里面获取task
        while (task != null || (task = getTask()) != null) {
            w.lock();
              // 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                  // 这是一个钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                       //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                       // 这是一个钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                   // 置空 task,准备 getTask 获取下一个任务
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
          //异常情况或getTask获取不到任务时会执行关闭
        processWorkerExit(w, completedAbruptly);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

传入一个Worker首先去校验firstTask是不是null,如果是那么就调用getTask方法从workQueue队列里面获取,然后判断一下当前的线程是否需要中断,如需要的话执行钩子方法,然后调用taskrun方法执行task; 如果while循环里面getTask获取不到任务的话,就结束循环调用processWorkerExit方法执行关闭;如果是异常原因导致的while循环退出,那么会调用processWorkerExit并传入为true

ThreadPoolExecutor中线程执行任务的示意图如下:

ThreadPoolExecutor

线程池中的线程执行分为两种情况,如下:
【1】在execute()方法中创建一个线程时,会让这个线程执行当前任务。
【2】这个线程完成1的任务后,会反复从BlockingQueue获取任务来执行。

Java线程池中的核心线程是如何被重复利用的】:看一下runWorker()方法的代码,有一个while循环,当执行完firstTasktask==null了,那么就会执行判断条件(task = getTask()) != null,我们假设这个条件成立的话,那么这个线程就不止只执行一个任务了,可以执行多个任务了,也就实现了重复利用了。答案呼之欲出了,接着看getTask()方法。

// 为分析而简化后的代码
private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (timed && timedOut) {
            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,
            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
            if (compareAndDecrementWorkerCount(c))
            return null;
            continue;
        }

        try {
            Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();

            // 注意workQueue中的poll()方法与take()方法的区别
            //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
            //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止

            if (r != null)
            return r;
            timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

从以上代码可以看出,getTask()的作用:如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

上面方法返回null有如下几种情况:
【1】当前状态是SHUTDOWN并且workQueue队列为空;
【2】当前状态是STOP及以上;
【3】池中有大于maximumPoolSizeworkers存在(通过调用setMaximumPoolSize进行设置);

processWorkerExit方法源码:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
      //如果是异常原因中断,那么需要将运行线程数减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
          //设置完成任务数
        completedTaskCount += w.completedTasks;
          //将worker从集合里移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
      //判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程
    tryTerminate();

    int c = ctl.get();
      //如果是RUNNING或SHUTDOWN则会进入这个方法
    if (runStateLessThan(c, STOP)) {
          //如不是以外中断则会往下走
        if (!completedAbruptly) {
              //判断是否保留最少核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
          //如果当前运行的Worker数比当前所需要的Worker数少的话,那么就会调用addWorker,添加新的Worker
        addWorker(null, false);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

【1】判断是否是意外退出的,如果是意外退出的话,那么就需要把WorkerCount–
【2】加完锁后,同步将completedTaskCount进行增加,表示总共完成的任务数,并且从WorkerSet中将对应的Worker移除;
【3】调用tryTemiate,进行判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程;
【4】判断当前的线程池状态,如果当前线程池状态比 STOP大的话,就不处理;
【5】判断是否是意外退出,如果不是意外退出的话,那么就会判断最少要保留的核心线程数,如果allowCoreThreadTimeOut被设置为true的话,那么说明核心线程在设置的KeepAliveTime之后,也会被销毁;
【6】如果最少保留的Worker数为0的话,那么就会判断当前的任务队列是否为空,如果任务队列不为空的话而且线程池没有停止,那么说明至少还需要1个线程继续将任务完成;
【7】判断当前的Worker是否大于min,也就是说当前的 Worker总数大于最少需要的Worker数的话,那么就直接返回,因为剩下的Worker会继续从WorkQueue中获取任务执行;
【8】如果当前运行的Worker数比当前所需要的Worker数少的话,那么就会调用addWorker,添加新的Worker,也就是新开启线程继续处理任务;

# 八、拒绝策略

当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现RejectedExecutionHandler接口,并实现rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法。不过Executors框架已经为我们实现了4种拒绝策略:
【1】AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。及时反馈程序运行状态。如果是比较关键的业务,推荐使用该策略。这样在系统不能承载更大的并发量时,能够及时的通过异常发现。
【2】CallerRunsPolicy:由调用线程处理该任务。 这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须让每个任务都执行完毕。 【3】DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。可能会使我们无法发现系统的异常状态。建议一些无关紧要的业务采用此策略。
【4】DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。

# 九、线程池业务选型

【1】快速响应类型: 对于业务的并行操作,服务追求的时响应时间。比如客人信息和机票信息等聚合展示。从客户体验角度而言,响应越快越好,所以不应该设置队列去缓冲并发任务,调高corePoolSizemaxPoolSize去尽可能创造多的线程快速执行任务。
【2】快速处理批量任务: 离线的大量任务,需要快速的执行,比如数据迁移/清洗工作。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时完成,而是关注如何使用有限的资源,在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize,如果设置线程数过多可能引发线程上下文切换频繁的问题,反而降低处理任务的速度和吞掉量。

# 十、动态化线程池

线程池执行情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大,业界没有一些成熟的经验策略帮助开发人员参考。当调用量发生变化时,导致最大核心数设置偏小,大量抛出RejectedExecutorException触发服务降级条件。或者因为队列设置过长,最大线程数失效,导致执行时间过长,最总导致下游服务的大量调用超时失败等问题。

动态化线程池功能如下:
【1】动态调参: 线程池构造参数有8个,但是最核心的是3个:corePoolSizemaximumPoolSizeworkQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。并发性的场景主要分为两种:并行执行子任务,提高响应速度并行执行大批次任务,提升吞吐量,这种情况会使用有界队列去缓冲大批量的任务。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择等,参数修改后及时生效。
【2】任务监控: 支持应用粒度、线程池粒度、任务粒度的Transaction监控,可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。
【3】负载告警: 线程池任务队列积压到一定值的时候会通过Trip告知应用开发负责人。
【4】操作监控: 创建/修改和删除线程池都会通知到应用开发负责人。
【5】操作日志: 可以查看线程池的修改记录,具体的修改时间和修改前后的参数等。
【6】权限校验: 只有应用负责人才能够修改应用的线程池配置。

参数动态化: JDK原生线程池ThreadPoolExecutor提供了几个setter方法;

setCorePoolSize(int): void
setKeepAliveTime(long, TimeUnit): void
setMaximumPoolSize(int): void
setRejectedExecutionHandler(RejectedExecutionHandler): void
setThreadFactory(ThreadFactory): void
1
2
3
4
5

JDK允许通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,当线程池运行期间调用setCorePoolSize方法时,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。
【1】当前值 < 原始值: 说明有多余的worker线程,此时会向当前idleworker线程发起中断请求,实现回收,多余的worker在下次idel的时候也会被回收。
【2】当前值 > 原始值: 判断阻塞队列中是否有待执行的任务,有则创建新的worker线程来执行任务,没有则暂不创建worker

线程池内部会处理好当前状态做到平滑修改,基于上述的public方法,我们只需要维护ThreadPoolExecutor的实例,并且在需要修改的时候拿到实例修改其参数即可。我们搭建了“配置管理平台”可以修改和配置线程池参数,页面如下:

线程池

用户可以在管理平台上通过线程池的名字找到指定的线程池,然后对其参数进行修改,保存后会实时生效。

# 十一、线程池监控

为了更好地使用线程池,需要对线程池的运行状况有感知,比如线程池的任务执行情况怎么样?是长任务还是短任务?长任务是否可以使用虚拟线程进行替换改造?分配的资源够不够用?基于对这些问题的思考,动态化线程池提供了多个维度的监控和告警能力,包括:线程池活跃度、任务的执行Transaction(频率、耗时)、Reject异常、线程池内部统计信息等,既能多维度分析线程池,又能第一时间通知用户,从而避免故障或加速故障恢复。

【1】负载监控和告警: 线程池负载关注的核心问题是基于当前线程池参数分配的资源够不够。来让用户在发生Reject异常之前能够感知线程池的负载问题,线程池的活跃度=activeCount/maximumPoolSize。代表当前活跃度线程数趋于maximumPoolSize的时候,代表线程负载趋高。也可以从发生Reject异常,或者阻塞队列中的等待任务超过阈值的时候发生触发告警,告警信息通过Trip等推送给项目负责组织。告警信息如下:

线程池告警:  
应用AppId: 1000xxxx
告警原因:activeCount/maximumPoolSize值为100触达活跃告警阈值80
          线程池中出现 RejectedExecutionException(66)
线程池参数:
[poolName]: RebookInitQuery
[corePoolSize]: 4
[maximumPoolSize]: 8
[poolSize]: 8
[activeCount]: 8
[queueType]: SynchronousQueue
[queueCapacity]: 0
[queueSize]: 0
[queueRemainingCapacity]: 0
[completedTaskCount]: 0
[largestPoolSize]: 5
[rejectCount]: 45
[线程池配置链接]:httpxxxxxxxxx
[业务负责人]: xxx
[告警间隔时间] 5分钟
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

【2】任务级精细化监控: 传统的线程池的任务执行情况对于用户来说是透明的,也就是说当一个线程池同时执行两种任务时,实际执行的频率和时长对于用户来说没有一个直观的感觉,很可能这两类任务不适合共享一个线程池,但是由于用户无法感知,因此也无法优化。动态化线程池内部实现了任务级别的埋点,且允许为不同的业务任务指定具有业务含义的名称,线程池内部基于这个名称做Transaction打点,基于这个功能,用户可以看到线程池内部任务级别的执行情况,且区分业务,任务监控示意图如下:

           Name        Total   Failure   Failure%  Log View  Max  Avg  90Line  95Line  99Line
[:show::]  TOTAL        1,787       0    0.0000%    L  S     102  31.2   0.0     0.0     0.0
[:show::]  XXXX[任务名]   627        0    0.0000%    L  S     102  45.6  54.2    60.0    75.2
[:show::]  XXXX[任务名]   582        0    0.0000%    L  S      87  34.2  48.1    54.2    69.3
[:show::]  XXXX[任务名]   578        0    0.0000%    L  S      34   5.8   3.2     6.1    10.4
1
2
3
4
5

【3】运行时状态实时常看: 通过ThreadPoolExecutor提供的原生getter方法,可以读取到当前线程池的运行状态以及参数:

getActiveCount(): int
getCompletedTaskCount(): long
getCorePoolSize(): int
getKeepAliveTime(TimeUnit): long
getLargestPoolSize(): int
getMaximumPoolSize(): int
getPoolSize(): int
getQueue(): BlockQueue<Runnable>
getRejectExecutionHandler(): RejectedExecutionHandler
getTask(): Runnable
getTaskCount(): long
getThreadFactory(): ThreadFactory
1
2
3
4
5
6
7
8
9
10
11
12

动态化线程池基于这几个接口封装了运行时状态实时查看的功能,用户基于这个功能可以了解线程池的实时状态,比如当前线程有多少个工作线程,执行了多少个任务,队列中等待的任务数等等。

线程池

成本与收益:成本在于实现动态化以及监控的成本不高,收益在于不改变原有线程池使用方式的基础上,从降低线程池参数修改的成本以及多维度监控这两个方面降低了故障发生的概率。

# 十二、线程池的种类

除了自定义之外,线程池为我们也提供了7种常见的线程池,可以用于调试等使用,不建议直接使用,因为大多数阻塞队列都是无界的,容易OOM

【1】newSingleThreadExecutor():它的特点在于工作线程数目被限制为1,操作一个无界的工作队列,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目;
【2】newCachedThreadPool():它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程。如果线程闲置的时间超过60秒,则被终止并移出缓存。长时间闲置时,这种线程池,不会消耗什么资源。其内部使用SynchronousQueue作为工作队列;
【3】newFixedThreadPool(int nThreads):重用指定数目nThreads的线程,其背后使用的是无界的工作队列,任何时候最多有nThreads个工作线程是活动的。这意味着,如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现。如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目nThreads;
【4】newSingleThreadScheduledExecutor():创建单线程池返回ScheduledExecutorService,可以进行定时或周期性的工作调度;
【5】newScheduledThreadPool(int corePoolSize)newSingleThreadScheduledExecutor()类似创建的是个ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程;
【6】newWorkStealingPool(int parallelism):这是一个经常被人忽略的线程池,Java8才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序;
【7】ThreadPoolExecutor():是最原始的线程池创建,上面1-3创建方式都是对ThreadPoolExecutor的封装。

#

(adsbygoogle = window.adsbygoogle || []).push({});