背景
最近在面试找工作吧,关于线程池的问题被面试官问得还是蛮多。但是发现大多面试官也问不出啥来,大都会问有哪几个核心参数,自己拉吧拉吧的讲12345个参数,然后就没然后了(就下一个问题了)。但不排除有些面试官确实可以,会针对你的理解,问你一些稍微源码级别或者给你设计一些场景给你回答(个人还是比较喜欢这种)。
核心参数
- corePoolSize 核心线程数
- maximumPoolSize 最大线程数
- keepAliveTime 线程空闲时间
- workQueue 任务缓冲队列
- RejectedExecutionHandler 拒绝策略,默认抛出异常(其他包括丢弃队列中旧的任务、丢弃当前任务、使用当前线程执行任务)
值得思考的问题
- keepAliveTime参数有什么用,体现在哪里?
- corePoolSize核心线程数内创建的线程会被回收?
- 我们知道当工作线程(worker)数大于等于 corePoolSize且缓存队列满了之后,会根据maximumPoolSize进行判断,如果小于,则开启一个线程去执行我们的任务。那有没有办法不用丢弃任何任务,也不用当前线程去执行呢?
- 为什么Worker要继承AbstractQueuedSynchronizer,其作用在哪里?
- shutdown()和shutdownNow()区别在哪里?
想到生活中的一些事
某外包公司老板最近接到一个小项目,项目经理(称A)把需求整理好,拆解任务,跟老板汇报保守要10个开发人员。嗯,二话不说立刻招人,干活。某一天,甲方提出新需求,A掐指一算,任务有点多,目前的开发都还在忙着,安排不上了,哎,先拆解任务存入需求库把。果然是甲方爸爸,没过几天,又提出新的需求,A也忍不住骂mmp了,因为需求库满了,只能跟老板汇报,老板摸了下头,当前开发人员还在我最大能接受(上限20个开发)范围内,二话不说立刻招人,干活。此时,老板跟A墨迹了下,如果继续提新需求,招的开发人员比我最大能接受数量还要多,直接跟他们摊牌把。
其实老板也有自己得小心思,招20个开发,也不是长久之计(程序员动不动就2w起步),等项目需求都做完了,把几个核心得留下就行了,裁掉那些几天都没活干的。
以上几个加粗文字,勉强能跟线程池的核心参数对得上把(大家自行对号入座哈),然后呢,针对此故事回答下前面得几个问题
回到前面的问题
- keepAliveTime:主要是用来回收那些空闲的线程,当线程创建之后,执行完任务,就立刻去队列中取任务,如果在指定时间内没有任务可取(也就是就是每个开发人员做完手上的任务,就要去需求库中领取任务,一直重复此操作),该线程就会被回收掉。当然,这个有一定的条件,比如:允许核心线程数超时或者工作线程数是大于配置的核心线程数。
- corePoolSize:核心线程数内创建的线程和后面创建的线程是一样的,在线程池中并没有所谓固定的核心线程。每个线程执行完任务,接着从队列中拉一个任务出来继续执行,当任务没有的时候,线程领取不到就会被回收掉(也就是你那10个员工先入职,不代表你可以永远在公司就职,如果后来的员工比你更能干活积极,照样把你开掉的)。
- 这里主要考察拒绝策略。我们可以自定义一个拒绝策略继续将我们的任务放到线程池,比如:新建一个队列用来存储那些触发拒绝策略的任务,再开一个线程从队列中取任务出来重新丢到线程池中。
- 有两个地方,需要使用worker进行加锁。分别是runWorker()和shutdown()函数。个人理解是执行runWorker函数时,如果取到任务时候加锁,作者不希望因为执行shutdown(主要是给工作线程打中断标记)而对线程正在执行的任务有任何影响(虽然给线程打中断标记,不会对线程有绝对的影响,主要还是看开发者怎么处理,这一点可具体看 interrupt 相关知识)。
- 执行shutdown(),会将线程池状态修改为SHUTDOWN,并且给工作线程打中断标记,此时不会接收新的任务,但,如果队列已有任务,则,会继续执行的。
执行shutdown(),线程池对应的状态是STOP,并强制给所有工作线程打中断标记,此时,同样不会接收新任务,如果队列中有任务,会把任务移除掉,不会执行。
总体来说,这两个函数对正在被执行的任务是没有影响的(这里排除你对中断异常做了其他处理)。
针对以上问题,我们分别看下源码:
线程执行逻辑:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//领取任务,领得到进入while逻辑,否则,处理后面回收的逻辑
//从这里可看出,没有所谓的固定核心线程,全靠抢,抢到就继续执行,抢不到就销毁
while (task != null || (task = getTask()) != null) {
//取到任务,加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//这里是我们具体的业务逻辑了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理回收线程逻辑
processWorkerExit(w, completedAbruptly);
}
}
获取队列中的任务逻辑:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//(1) allowCoreThreadTimeOut 默认是false
//主要看 wc > corePoolSize;
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(3)按理,如果此时没有任务加进来,这个条件是满足的,最终会走到 return null 返回,结束无限遍历
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//(2)当timed为true, 并且在指定时间keepAliveTime找不到任务会返回一 个 null
//由于在for(;;)内,会继续遍历回到(3)处
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
shutdown逻辑,处理线程池状态为SHUTDOWN,遍历工作线程,然后获取锁,成功则打中断标记
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//这里同样是尝试获取worker的锁,意味着如果线程正在执行任务,
//那会阻塞在这里的,也就是无法打标记
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow逻辑,处理线程池状态为STOP。遍历工作线程,给线程打中断标记。移除队列中的任务,并返回。
void interruptIfStarted() {
Thread t;
//这里跟shutdown不一样,,有点强制的意思
//因为getState() >= 0 基本能满足,尽管线程已经在执行任务了
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
//移除队列中的元素
q.drainTo(taskList);
if (!q.isEmpty()) {
//这里,主要是再次确认队列中是否还有元素,确保做到不留一个活口哈。
//因为有可能在q.drainTo(taskList)期间,用户线程继续往线程池丢任务。
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
写到最后
线程池不管平时工作还是面试,出现的频率还是比较多的。如果平时注意积累,使用时即可信手拈来。另外,以上仅是本人对线程池的初步了解,若有不对的地方,欢迎指出哈。