背景
最近在面试找工作吧,关于线程池的问题被面试官问得还是蛮多。但是发现大多面试官也问不出啥来,大都会问有哪几个核心参数,自己拉吧拉吧的讲12345个参数,然后就没然后了(就下一个问题了)。但不排除有些面试官确实可以,会针对你的理解,问你一些稍微源码级别或者给你设计一些场景给你回答(个人还是比较喜欢这种)。
核心参数
- corePoolSize 核心线程数
- maximumPoolSize 最大线程数
- keepAliveTime 线程空闲时间
- workQueue 任务缓冲队列
- RejectedExecutionHandler 拒绝策略,默认抛出异常(其他包括丢弃队列中旧的任务、丢弃当前任务、使用当前线程执行任务)
值得思考的问题
- keepAliveTime参数有什么用,体现在哪里?
- corePoolSize核心线程数内创建的线程会被回收?
- 我们知道当工作线程(worker)数大于等于 corePoolSize且缓存队列满了之后,会根据maximumPoolSize进行判断,如果小于,则开启一个线程去执行我们的任务。那有没有办法不用丢弃任何任务,也不用当前线程去执行呢?
- 为什么Worker要继承AbstractQueuedSynchronizer,其作用在哪里?
- shutdown()和shutdownNow()区别在哪里?
想到生活中的一些事
某外包公司老板最近接到一个小项目,项目经理(称A)把需求整理好,拆解任务,跟老板汇报保守要10个开发人员。嗯,二话不说立刻招人,干活。某一天,甲方提出新需求,A掐指一算,任务有点多,目前的开发都还在忙着,安排不上了,哎,先拆解任务存入需求库把。果然是甲方爸爸,没过几天,又提出新的需求,A也忍不住骂mmp了,因为需求库满了,只能跟老板汇报,老板摸了下头,当前开发人员还在我最大能接受(上限20个开发)范围内,二话不说立刻招人,干活。此时,老板跟A墨迹了下,如果继续提新需求,招的开发人员比我最大能接受数量还要多,直接跟他们摊牌把。
其实老板也有自己得小心思,招20个开发,也不是长久之计(程序员动不动就2w起步),等项目需求都做完了,把几个核心得留下就行了,裁掉那些几天都没活干的。
以上几个加粗文字,勉强能跟线程池的核心参数对得上把(大家自行对号入座哈),然后呢,针对此故事回答下前面得几个问题
回到前面的问题
- keepAliveTime:主要是用来回收那些空闲的线程,当线程创建之后,执行完任务,就立刻去队列中取任务,如果在指定时间内没有任务可取(也就是就是每个开发人员做完手上的任务,就要去需求库中领取任务,一直重复此操作),该线程就会被回收掉。当然,这个有一定的条件,比如:允许核心线程数超时或者工作线程数是大于配置的核心线程数。
- corePoolSize:核心线程数内创建的线程和后面创建的线程是一样的,在线程池中并没有所谓固定的核心线程。每个线程执行完任务,接着从队列中拉一个任务出来继续执行,当任务没有的时候,线程领取不到就会被回收掉(也就是你那10个员工先入职,不代表你可以永远在公司就职,如果后来的员工比你更能干活积极,照样把你开掉的)。
- 这里主要考察拒绝策略。我们可以自定义一个拒绝策略继续将我们的任务放到线程池,比如:新建一个队列用来存储那些触发拒绝策略的任务,再开一个线程从队列中取任务出来重新丢到线程池中。
- 有两个地方,需要使用worker进行加锁。分别是runWorker()和shutdown()函数。个人理解是执行runWorker函数时,如果取到任务时候加锁,作者不希望因为执行shutdown(主要是给工作线程打中断标记)而对线程正在执行的任务有任何影响(虽然给线程打中断标记,不会对线程有绝对的影响,主要还是看开发者怎么处理,这一点可具体看 interrupt 相关知识)。
- 执行shutdown(),会将线程池状态修改为SHUTDOWN,并且给工作线程打中断标记,此时不会接收新的任务,但,如果队列已有任务,则,会继续执行的。
执行shutdown(),线程池对应的状态是STOP,并强制给所有工作线程打中断标记,此时,同样不会接收新任务,如果队列中有任务,会把任务移除掉,不会执行。
总体来说,这两个函数对正在被执行的任务是没有影响的(这里排除你对中断异常做了其他处理)。
针对以上问题,我们分别看下源码:
线程执行逻辑:
final void runWorker(Worker w) { | |
Thread wt = Thread.currentThread(); | |
Runnable task = w.firstTask; | |
w.firstTask = null; | |
w.unlock(); // allow interrupts | |
boolean completedAbruptly = true; | |
try { | |
//领取任务,领得到进入while逻辑,否则,处理后面回收的逻辑 | |
//从这里可看出,没有所谓的固定核心线程,全靠抢,抢到就继续执行,抢不到就销毁 | |
while (task != null || (task = getTask()) != null) { | |
//取到任务,加锁 | |
w.lock(); | |
// If pool is stopping, ensure thread is interrupted; | |
// if not, ensure thread is not interrupted. This | |
// requires a recheck in second case to deal with | |
// shutdownNow race while clearing interrupt | |
if ((runStateAtLeast(ctl.get(), STOP) || | |
(Thread.interrupted() && | |
runStateAtLeast(ctl.get(), STOP))) && | |
!wt.isInterrupted()) | |
wt.interrupt(); | |
try { | |
beforeExecute(wt, task); | |
Throwable thrown = null; | |
try { | |
//这里是我们具体的业务逻辑了 | |
task.run(); | |
} catch (RuntimeException x) { | |
thrown = x; throw x; | |
} catch (Error x) { | |
thrown = x; throw x; | |
} catch (Throwable x) { | |
thrown = x; throw new Error(x); | |
} finally { | |
afterExecute(task, thrown); | |
} | |
} finally { | |
task = null; | |
w.completedTasks++; | |
w.unlock(); | |
} | |
} | |
completedAbruptly = false; | |
} finally { | |
//处理回收线程逻辑 | |
processWorkerExit(w, completedAbruptly); | |
} | |
} |
获取队列中的任务逻辑:
private Runnable getTask() { | |
boolean timedOut = false; // Did the last poll() time out? | |
for (;;) { | |
int c = ctl.get(); | |
int rs = runStateOf(c); | |
// Check if queue empty only if necessary. | |
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { | |
decrementWorkerCount(); | |
return null; | |
} | |
int wc = workerCountOf(c); | |
//(1) allowCoreThreadTimeOut 默认是false | |
//主要看 wc > corePoolSize; | |
// Are workers subject to culling? | |
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; | |
//(3)按理,如果此时没有任务加进来,这个条件是满足的,最终会走到 return null 返回,结束无限遍历 | |
if ((wc > maximumPoolSize || (timed && timedOut)) | |
&& (wc > 1 || workQueue.isEmpty())) { | |
if (compareAndDecrementWorkerCount(c)) | |
return null; | |
continue; | |
} | |
try { | |
//(2)当timed为true, 并且在指定时间keepAliveTime找不到任务会返回一 个 null | |
//由于在for(;;)内,会继续遍历回到(3)处 | |
Runnable r = timed ? | |
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : | |
workQueue.take(); | |
if (r != null) | |
return r; | |
timedOut = true; | |
} catch (InterruptedException retry) { | |
timedOut = false; | |
} | |
} | |
} |
shutdown逻辑,处理线程池状态为SHUTDOWN,遍历工作线程,然后获取锁,成功则打中断标记
private void interruptIdleWorkers(boolean onlyOne) { | |
final ReentrantLock mainLock = this.mainLock; | |
mainLock.lock(); | |
try { | |
for (Worker w : workers) { | |
Thread t = w.thread; | |
//这里同样是尝试获取worker的锁,意味着如果线程正在执行任务, | |
//那会阻塞在这里的,也就是无法打标记 | |
if (!t.isInterrupted() && w.tryLock()) { | |
try { | |
t.interrupt(); | |
} catch (SecurityException ignore) { | |
} finally { | |
w.unlock(); | |
} | |
} | |
if (onlyOne) | |
break; | |
} | |
} finally { | |
mainLock.unlock(); | |
} | |
} |
shutdownNow逻辑,处理线程池状态为STOP。遍历工作线程,给线程打中断标记。移除队列中的任务,并返回。
void interruptIfStarted() { | |
Thread t; | |
//这里跟shutdown不一样,,有点强制的意思 | |
//因为getState() >= 0 基本能满足,尽管线程已经在执行任务了 | |
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { | |
try { | |
t.interrupt(); | |
} catch (SecurityException ignore) { | |
} | |
} | |
} | |
private List<Runnable> drainQueue() { | |
BlockingQueue<Runnable> q = workQueue; | |
ArrayList<Runnable> taskList = new ArrayList<Runnable>(); | |
//移除队列中的元素 | |
q.drainTo(taskList); | |
if (!q.isEmpty()) { | |
//这里,主要是再次确认队列中是否还有元素,确保做到不留一个活口哈。 | |
//因为有可能在q.drainTo(taskList)期间,用户线程继续往线程池丢任务。 | |
for (Runnable r : q.toArray(new Runnable[0])) { | |
if (q.remove(r)) | |
taskList.add(r); | |
} | |
} | |
return taskList; | |
} |
写到最后
线程池不管平时工作还是面试,出现的频率还是比较多的。如果平时注意积累,使用时即可信手拈来。另外,以上仅是本人对线程池的初步了解,若有不对的地方,欢迎指出哈。