说一下这个面试,关于线程池的那些事

背景

最近在面试找工作吧,关于线程池的问题被面试官问得还是蛮多。但是发现大多面试官也问不出啥来,大都会问有哪几个核心参数,自己拉吧拉吧的讲12345个参数,然后就没然后了(就下一个问题了)。但不排除有些面试官确实可以,会针对你的理解,问你一些稍微源码级别或者给你设计一些场景给你回答(个人还是比较喜欢这种)。

核心参数

  1. corePoolSize 核心线程数
  2. maximumPoolSize 最大线程数
  3. keepAliveTime 线程空闲时间
  4. workQueue 任务缓冲队列
  5. RejectedExecutionHandler 拒绝策略,默认抛出异常(其他包括丢弃队列中旧的任务、丢弃当前任务、使用当前线程执行任务)

值得思考的问题

  1. keepAliveTime参数有什么用,体现在哪里?
  2. corePoolSize核心线程数内创建的线程会被回收?
  3. 我们知道当工作线程(worker)数大于等于 corePoolSize且缓存队列满了之后,会根据maximumPoolSize进行判断,如果小于,则开启一个线程去执行我们的任务。那有没有办法不用丢弃任何任务,也不用当前线程去执行呢?
  4. 为什么Worker要继承AbstractQueuedSynchronizer,其作用在哪里?
  5. shutdown()和shutdownNow()区别在哪里?

想到生活中的一些事

某外包公司老板最近接到一个小项目,项目经理(称A)把需求整理好,拆解任务,跟老板汇报保守要10个开发人员。嗯,二话不说立刻招人,干活。某一天,甲方提出新需求,A掐指一算,任务有点多,目前的开发都还在忙着,安排不上了,哎,先拆解任务存入需求库把。果然是甲方爸爸,没过几天,又提出新的需求,A也忍不住骂mmp了,因为需求库满了,只能跟老板汇报,老板摸了下头,当前开发人员还在我最大能接受(上限20个开发)范围内,二话不说立刻招人,干活。此时,老板跟A墨迹了下,如果继续提新需求,招的开发人员比我最大能接受数量还要多,直接跟他们摊牌把。

其实老板也有自己得小心思,招20个开发,也不是长久之计(程序员动不动就2w起步),等项目需求都做完了,把几个核心得留下就行了,裁掉那些几天都没活干的。

以上几个加粗文字,勉强能跟线程池的核心参数对得上把(大家自行对号入座哈),然后呢,针对此故事回答下前面得几个问题

回到前面的问题

  1. keepAliveTime:主要是用来回收那些空闲的线程,当线程创建之后,执行完任务,就立刻去队列中取任务,如果在指定时间内没有任务可取(也就是就是每个开发人员做完手上的任务,就要去需求库中领取任务,一直重复此操作),该线程就会被回收掉。当然,这个有一定的条件,比如:允许核心线程数超时或者工作线程数是大于配置的核心线程数。
  2. corePoolSize:核心线程数内创建的线程和后面创建的线程是一样的,在线程池中并没有所谓固定的核心线程。每个线程执行完任务,接着从队列中拉一个任务出来继续执行,当任务没有的时候,线程领取不到就会被回收掉(也就是你那10个员工先入职,不代表你可以永远在公司就职,如果后来的员工比你更能干活积极,照样把你开掉的)。
  3. 这里主要考察拒绝策略。我们可以自定义一个拒绝策略继续将我们的任务放到线程池,比如:新建一个队列用来存储那些触发拒绝策略的任务,再开一个线程从队列中取任务出来重新丢到线程池中。
  4. 有两个地方,需要使用worker进行加锁。分别是runWorker()和shutdown()函数。个人理解是执行runWorker函数时,如果取到任务时候加锁,作者不希望因为执行shutdown(主要是给工作线程打中断标记)而对线程正在执行的任务有任何影响(虽然给线程打中断标记,不会对线程有绝对的影响,主要还是看开发者怎么处理,这一点可具体看 interrupt 相关知识)。
  5. 执行shutdown(),会将线程池状态修改为SHUTDOWN,并且给工作线程打中断标记,此时不会接收新的任务,但,如果队列已有任务,则,会继续执行的。
    执行shutdown(),线程池对应的状态是STOP,并强制给所有工作线程打中断标记,此时,同样不会接收新任务,如果队列中有任务,会把任务移除掉,不会执行。
    总体来说,这两个函数对正在被执行的任务是没有影响的(这里排除你对中断异常做了其他处理)。

针对以上问题,我们分别看下源码:

线程执行逻辑:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //领取任务,领得到进入while逻辑,否则,处理后面回收的逻辑
            //从这里可看出,没有所谓的固定核心线程,全靠抢,抢到就继续执行,抢不到就销毁
            while (task != null || (task = getTask()) != null) {
                //取到任务,加锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //这里是我们具体的业务逻辑了
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理回收线程逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

获取队列中的任务逻辑:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //(1) allowCoreThreadTimeOut 默认是false 
            //主要看  wc > corePoolSize;
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

           //(3)按理,如果此时没有任务加进来,这个条件是满足的,最终会走到 return null 返回,结束无限遍历
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //(2)当timed为true, 并且在指定时间keepAliveTime找不到任务会返回一 个 null
                //由于在for(;;)内,会继续遍历回到(3)处
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

shutdown逻辑,处理线程池状态为SHUTDOWN,遍历工作线程,然后获取锁,成功则打中断标记

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //这里同样是尝试获取worker的锁,意味着如果线程正在执行任务,
                //那会阻塞在这里的,也就是无法打标记
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

shutdownNow逻辑,处理线程池状态为STOP。遍历工作线程,给线程打中断标记。移除队列中的任务,并返回。

void interruptIfStarted() {
            Thread t;
            //这里跟shutdown不一样,,有点强制的意思
            //因为getState() >= 0 基本能满足,尽管线程已经在执行任务了
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        } 

private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        //移除队列中的元素
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            //这里,主要是再次确认队列中是否还有元素,确保做到不留一个活口哈。
            //因为有可能在q.drainTo(taskList)期间,用户线程继续往线程池丢任务。
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

写到最后

线程池不管平时工作还是面试,出现的频率还是比较多的。如果平时注意积累,使用时即可信手拈来。另外,以上仅是本人对线程池的初步了解,若有不对的地方,欢迎指出哈。

正文完