服务器利用线程技术响应客户请求已经很常见,这的确提高了服务器的响应处理能力,但通常服务器端并不会为每个客户端请求分配一个新的线程对象,在大量的客户端的请求下,这种处理方式将会创建大量的线程来处理请求,这对内存和CPU都是很不利的,于是需要一种更合理的线程技术,这就是线程池技术。在日常开发中,也或多或少会接触到线程池,比较Servlet容器处理浏览器请求的线程池,数据库连接池等,本文将理解一番JDK本身的线程池(ThreadPool)实现,与其他线程池相比,也会大同小异。
线程池,从字面含义来看,是指管理一组同构工作线程的资源池。其由工作线程(Worker Thread),任务队列(Task Queue),任务(Task)等核心组件组成,如下图所示:
合理利用线程池能够带来三个好处:
JDK中通过ThreadPoolExecutor来抽象通用线程池,可看其继承树:
Executor接口定义了提交任务(Submit Task)的功能,任务(Task)是一个Runnable,Executor需要实现使用何种策略执行任务,可以通过Runnable.run()同步执行,或者通过new Thread(runnbale).start()异步执行,又或者通过某些调度算法Runnable等,并且Executor也保证了一定的内存语义:提交Runnable之前的动作Happens-Before该Runnable的开始执行。
public interface Executor {
/**
* 提交任务
*/
void execute(Runnable command);
}
ExecutorService对Executor进行了一些扩展,增加了一些管理任务执行的功能,如终止,跟踪任务等:
public interface ExecutorService extends Executor {
/**
* 关闭线程池服务,不再接受新的任务,并且不会等待之前的任务执行完
*/
void shutdown();
/**
* 尝试停止所有正在执行和等待执行的任务,并且不会等待之前的任务执行完
* @return 正在等待执行的任务列表
*/
List<Runnable> shutdownNow();
/**
* 线程池是否已经关闭
*/
boolean isShutdown();
/**
* 线程池关闭后,是否所有任务都已完成
*/
boolean isTerminated();
/**
* 关闭线程池后,等待所有任务完成
* @param timeout 最大等待时间
* @param unit 时间单位
* @return 若等待到所有任务已完成,返回true,若等待超时,返回false
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个Callable任务,可以获取返回值
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个Runnable任务,可以获取返回值
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个Runnable任务,可以获取返回值
*/
Future<?> submit(Runnable task);
/**
* 执行一些列任务(完成的任务可以是正常执行完或抛出异常)
* @return 结果集
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行一些列任务,超时后未完成的任务将被取消
* @return 结果集
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 执行一些列任务
* @return 返回其中一个完成的任务结果,未完成的任务将被取消
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 执行一些列任务,超时后若没有一个任务执行完会抛出TimeoutException
* @return 返回其中一个完成的任务结果,未完成的任务将被取消
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService中出现两个新的接口,Callable和Future。Callable类似于Runnable,但其可以获取执行的结果。Future代表了一种异步执行的结果,可以对执行做取消,检查,获取结果等操作。
AbstractExecutorService作为ExecutorService的默认实现,实现了submit,invokeAny,invokeAll等方法,并且提供了构造任务的方法newTaskFor,最终通过FutureTask来抽象线程池中的任务。
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 创建一个任务
* @param runnable Runnable任务
* @param 默认值
* @return 新任务
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* 创建一个任务
* @param callable Callable任务
* @return 新任务
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/**
* 提交任务
*/
public Future<?> submit(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* invokeAny的主要逻辑
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null){
throw new NullPointerException();
}
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
// 构造一个ExecutorCompletionService对象:
// 1. 该CompletionService内部包装了当前的Executor对象,用于执行任务
// 2. 并且通过LinkedBlockingQueue对象保存已经完成的任务
// 3. 在提交任务时,将创建的RunnableFuture有包装为QueueingFuture,QueueingFuture会在任务完成时,将该RunnableFuture添加到LinkedBlockingQueue
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
try {
// 记录异常
ExecutionException ee = null;
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 先提交一个任务
futures.add(ecs.submit(it.next()));
// 待提交的任务数
--ntasks;
// 已提交的任务
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
// 没有完成的任务,增量提交一个
if (f == null) {
if (ntasks > 0) {
// 还有任务未提交
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
} else if (active == 0){
// 已经有完成的任务正在等待返回结果
break;
} else if (timed) {
// 有超时限制
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
} else {
f = ecs.take();
}
}
// 已经有完成的任务,其结果
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 取消所有任务
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
// 轮询提交任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 轮询获取任务结果
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks){
futures.add(newTaskFor(t));
}
long lastTime = System.nanoTime();
Iterator<Future<T>> it = futures.iterator();
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0){
// 超时直接返回futures
return futures;
}
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0){
// 超时直接返回futures
return futures;
}
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
// 超时直接返回futures
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done){
// 未完成则取消任务
for (Future<T> f : futures){
f.cancel(true);
}
}
}
}
}
ThreadPoolExecutor作为基础的线程池实现,通过线程池中的某个线程来执行任务。其不但提升了在执行大量异步任务时的性能,还提供了管理资源的手段。ThreadPoolExecutor中有几个很关键的属性,用于调优线程池行为:
属性名 | 描述 |
---|---|
corePoolSize | 线程池基本大小。当提交一个新任务到线程池时,只要线程池当前工作线程数小于corePoolSize,线程池就会创建新的工作线程来执行该任务(即使有其他空闲的工作线程能够执行该任务)。默认情况下,线程池只会在有任务提交的情况才会创建工作线程,不过可以通过prestartCoreThread()或prestartAllCoreThreads()事先创建好基本的工作线程。 |
maximumPoolSize | 线程池最大大小。当线程池的任务队列是有界的时,如果任务队列满了,并且已创建的工作线程数小于maximumPoolSize,则线程池会再创建新的工作线程执行新任务。对于无界的任务队列,该参数无效。 |
keepAliveTime | 空闲工作线程存活时间。如果线程池当前的工作线程数超过corePoolSize,超出的这些工作线程若处于空闲的时间大于keepAliveTime,这些线程将被销毁,以释放不必要的资源。另外,如果设置了allowCoreThreadTimeOut为true,那么基本大小内的工作线程在空闲时间大于keepAliveTime后,也会被销毁。 |
workQueue |
任务队列。用于保存等待执行的任务的阻塞队列,通常有几种常用的阻塞队列:
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)排序元素。
LinkedBlockingQueue:是一个基于链表结构的的无界阻塞队列,此队列按FIFO(先进先出)排序元素。
SynchronousQueue:一个不存储元素的阻塞队列每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
PriorityBlockingQueue:一个具有优先级的无界阻塞队列。
|
ThreadFactory | 用于设置创建线程的工厂:可以通过线程工厂给每个创建出来的线程设置更有意义的名字,这样有利于开发人员进行必要时排查跟踪。 |
RejectedExecutionHandler |
拒绝任务的策略处理器:当线程池中的工作线程数大于maximumPoolSize且任务队列已满时,线程池将不再继续执行新提交的任务,而是交由RejectedExecutionHandler处理。ThreadPoolExecutor中提供了4种简单的策略:
AbortPolicy:直接抛出异常。
CallerRunsPolicy:使用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最旧的一个任务,并执行当前任务。
DiscardPolicy:直接丢弃掉提交的任务。
|
ThreadPoolExecutor中有几个重要的状态,如下图:
比较巧妙的是,ThreadPoolExecutor的实现中,通过一个int字段来保存了workerCount和线程池状态:
// 用于保存线程池状态和工作线程数int变量,
// 线程池初始为RUNNING状态,0个工作线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 用29位表示工作线程数,能达到500百万个线程
private static final int COUNT_BITS = Integer.SIZE - 3;
// 00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// ====== 线程池状态枚举,可见其是单调递增的 ======
// 11111111 11111111 11111111 11111111 << 29 = 11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00000000 00000000 00000000 << 29 = 00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00000000 00000000 00000000 00000001 << 29 = 00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 00000000 00000000 00000000 00000010 << 29 = 01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 00000000 00000000 00000000 00000011 << 29 = 01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取线程池当前状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
// 获取线程池工作线程数
private static int workerCountOf(int c) {
return c & CAPACITY;
}
// 获取int变量值
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// c状态是否在s状态之前
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// c状态是否在s状态之后
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 线程池是否处于运行状态,即小于SHUTDOWN
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 增加工作线程数
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 减少工作线程数
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 减少工作线程数,仅当一个线程突然终止时调用,可见processWorkerExit()
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
ThreadPoolExecutor中其它一些重要属性:
/**
* 任务队列
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 访问工作线程和相关统计操作时,使用到的锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 工作线程集,仅当获取到mainLock时可访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* awaitTermination()时的等待条件
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪线程池达到的最大工作线程数(仅在获取到mainLock后可访问)
*/
private int largestPoolSize;
/**
* 统计完成的任务数,仅当工作线程终止后更新(仅在获取到mainLock后可访问)
*/
private long completedTaskCount;
/**
* 线程工厂,用于创建Worker
*/
private volatile ThreadFactory threadFactory;
/**
* 任务拒绝策略处理器
*/
private volatile RejectedExecutionHandler handler;
/**
* 超过corePoolSize的线程的空闲存活时间
*/
private volatile long keepAliveTime;
/**
* 基本线程是否会空闲超时
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 线程池基本大小
*/
private volatile int corePoolSize;
/**
* 线程池最大线程数
*/
private volatile int maximumPoolSize;
/**
* 默认的任务拒绝策略处理器
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
/**
* 调用shutdown和shutdownNow时,所需要的权限
*/
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
ThreadPoolExecutor实现了execute()方法,完成任务的提交逻辑:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 如果线程数小于corePoolSize,则创建新的Worker,将该任务作为Worker的初始任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 线程池处于运行中,且新任务入队成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)){
// 线程池未运行,则拒绝任务
reject(command);
} else if (workerCountOf(recheck) == 0){
// 线程池处于运行,且工作线程数为0
addWorker(null, false);
}
// 3. 若队列已满,尝试创建新的Worker
} else if (!addWorker(command, false)){
// 线程池已经关闭,或数量已达到maximumPoolSize,则拒绝任务
reject(command);
}
}
在ThreadPoolExecutor中,将工作者线程抽象为Worker。Worker实现了Runnable,即将作为线程启动,同时也继承了AbstractQueuedSynchronizer, AbstractQueuedSynchronizer 是一个基于队列的同步器,其作为Java并发的核心基础类,这里先简单知道Worker是一个同步器,具有非重入的互斥锁的功能:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/**
* 当前Worker所使用的Thread,由ThreadFactory创建
*/
final Thread thread;
/**
* 初始任务,可以为null
*/
Runnable firstTask;
/**
* 完成的任务数
*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 禁止中断,直到runWorker
setState(-1);
// 初始任务
this.firstTask = firstTask;
// 创建工作线程
this.thread = getThreadFactory().newThread(this);
}
/**
* 启动Worker
*/
public void run() {
runWorker(this);
}
// 以下实现AbstractQueuedSynchronizer
// 0 解锁状态.
// 1 加锁状态.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断工作线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
看看Worker如何被创建:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 线程池当前状态
int rs = runStateOf(c);
// 下面3个条件中一个满足,则拒绝创建新的Worker
// 1. 线程池处于STOP,TIDYING或TERMINATED
// 2. 线程池状态 >= SHUTDOWN,且有新任务提交(firstTask != null)
// 3. 线程池状态 >= SHUTDOWN,没有新任务且任务队列为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty())) // rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
return false;
// 线程池处于RUNNING
for (;;) {
// 线程池线程数
int wc = workerCountOf(c);
// 超出容量,或超过了基本大小/最大大小,则拒绝创建新的Worker
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 利用CAS增加线程池线程数
if (compareAndIncrementWorkerCount(c))
break retry; // 跳出retry
c = ctl.get();
// 线程池状态发生,重新执行retry
if (runStateOf(c) != rs)
continue retry;
// 由于workerCount发生改变,导致compareAndIncrementWorkerCount失败,再次循环执行
}
}
// 线程池线程数+1完成
// Worker是否已启动
boolean workerStarted = false;
// Workder是否已添加
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
// 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
// 线程创建成功
if (t != null) {
// 下面访问workers之前,需先持有mainLock锁
mainLock.lock();
try {
// 在锁住之后再重新检测一下状态
int c = ctl.get();
int rs = runStateOf(c);
// 1. 线城池处于RUNNING状态
// 2. 线程池处于SHUTDOWN状态,且没有新任务提交(firstTask = null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程已激活
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加Worker
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放mainLock锁
mainLock.unlock();
}
if (workerAdded) {
// 启动Worker
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted){
// 启动工作线程失败时
// 1. 移除workers.add(w)此处添加的workder
// 2. 减少worker数量
// 3. 尝试中止线程池
addWorkerFailed(w);
}
}
return workerStarted;
}
当创建好Worker后,即启动Worker,Worker本质是一个Runnable,其运行逻辑由自己实现:
// Worker.run()
public void run() {
runWorker(this);
}
// ThreadPoolExecutor.runWorker()
final void runWorker(Worker w) {
// 获取当前工作线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
// 置空工作线程的初始任务
w.firstTask = null;
// 允许工作线程被中断
w.unlock();
// 标志该Worker是否异常中断
boolean completedAbruptly = true;
try {
// 初始任务不为空,或尝试获取一个任务
while (task != null || (task = getTask()) != null) {
// 锁住当前工作线程
w.lock();
// 以下情况会中断工作线程:
// 1. 线程池状态 >=STOP
// 2. 线程池还处于RUNNING或SHUTDOWN状态,且当前线程已经被中断,重新检查一下线程池状态,若处于STOP状态并且当前线程没有被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断工作线程
wt.interrupt();
try {
// 执行任务前回调
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 执行任务后回调
afterExecute(task, thrown);
}
} finally {
task = null;
// 增加该工作线程完成的任务数
w.completedTasks++;
// 解锁该工作线程
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理工作线程销毁的逻辑
// 1. 更新线程池完成的任务数
// 2. 减少线程池Worker数
// 3. 移除Worker
processWorkerExit(w, completedAbruptly);
}
}
再看看Worker如何去获取一个任务:
/**
* 该方法返回null,意味着该Worker要正常退出,有几种情况:
* 1. 由于调用了setMaximumPoolSize(),导致worker数超过maximumPoolSize;
* 2. 线程池状态处于STOP
* 3. 线程池处于SHUTDOWN,且任务队列为空
* 4. 获取任务超时
*/
private Runnable getTask() {
// 是否超时
boolean timedOut = false;
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池状态处于STOP 或
// 线程池处于SHUTDOWN,且任务队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// Worker正常退出时,在这里减少Worker数,非正常退出时,在processWorkerExit()中减少Worker数
decrementWorkerCount();
return null;
}
// 标志从该Worker是否有可能被回收
boolean timed;
for (;;) {
// 获取Worker数
int wc = workerCountOf(c);
// 两种情况下,标志该Worker有可能被回收:
// 1. 允许超时回收core线程;
// 2. 线程数已超过corePoolSize
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
// 获取任务超时,减少Worker数
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get();
// 线程池状态发生改变,重试
if (runStateOf(c) != rs)
continue retry;
// 由于workerCount发生改变,导致compareAndIncrementWorkerCount失败,再次循环执行
}
try {
// 从任务队列获取任务,有可能回收则用poll超时获取任务,否则用take永久等待任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null){
// 返回获取的任务
return r;
}
// 获取任务超时
timedOut = true;
} catch (InterruptedException retry) {
// 被中断,不算超时
timedOut = false;
}
}
}
看看Worker是如何被回收的:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// Worker异常中断时,减少Worker数
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计线程池完成的任务数
completedTaskCount += w.completedTasks;
// 移除Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 线程池尝试终止自己
tryTerminate();
// 下面尝试是否需要创建新的Worker
int c = ctl.get();
// 若线程池状态 < STOP
if (runStateLessThan(c, STOP)) {
// Worker正常退出
if (!completedAbruptly) {
// 线程池应该保持的最小Worker数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
// 没有必要新创建Worker
return;
}
// 有必要创建Worker
addWorker(null, false);
}
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 三种情况不用终止线程池
// 1. 线程池处于RUNNING状态
// 2. 线程池状态 >= TIDYING,即已经调用过tryTerminate(),并调用过ctl.compareAndSet(c, ctlOf(TIDYING, 0))
// 3. 线程池处于SHUTDOWN,且任务队列不为空,这时需要等待任务队列中的任务执行完
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// Worker数不为0
if (workerCountOf(c) != 0) {
// 尝试中断一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 回调子类terminated()方法
terminated();
} finally {
// 设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 通知所有等待条件
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// ctl.compareAndSet(c, ctlOf(TIDYING, 0))时失败,则循环重试
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 在runWorker()中,每当Worker执行任务前会先lock自己,只有任务执行完在unlock自己(闲置自己),期间处于空闲
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断工作线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
开发人员可以通过shutdown()或shutdownNow()来关闭线程池:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态置为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲Worker
interruptIdleWorkers();
// 回调,ScheduledThreadPoolExecutor会用
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态置为STOP
advanceRunState(STOP);
// 中断所有Worker
interruptWorkers();
// 获取任务队列中的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
return tasks;
}
以上则是线程池基本的工作原理,主要包括任务提交,启动工作线程,回收工作线程,线程池状态维护等。
日常开发中使用线程池时,常常使用Executors来创建自己的线程池,Executors提供了几个方便的工厂方法:
线程池类型 | 描述 |
---|---|
newFixedThreadPool |
|
newSingleThreadExecutor |
|
newCachedThreadPool |
|
newScheduledThreadPool |
|
在使用线程池时,还有一个比较关键的地方是配置合理的线程池大小,通常会有一些基本原则:
当然,上面只是一些原则性问题,实际生产是应该作一些测试来进行更精确地验证得。
以上则是Java线程池相关的部分实现。明白了实现细节,不仅有利于对线程池运行机制有所理解,也会其他线程池的工作原理能触类旁通。