publicvoidexecute(Runnable command){ if (command == null) thrownew NullPointerException(); /* * 分三步执行 * 1. 现在的线程池的线程数量小于corePoolSize值, 则创建一个新的线程, 用新的线程执行command. * 在调用addWorker()时会自动检测runState和workerCount, 如果发现添加失败的话, 会返回false. * 2. 如果能将command成功地加入到任务队列里, 在接下来的执行中无论是否新建工作线程都要进行对线程池状态 * 进行Double check, 因为 existing ones died since last checking 或者线程池恰巧在这时关闭了. * So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none. * 3. 如果不能将command入列到任务队列的话,就尝试启动一个新的线程来运行它. 如果仍然失败就需要reject任务了. */ int c = ctl.get(); // 第一步 : // 计算线程池中运行的线程数量, 如果当前运行的线程数量小于corePoolSize, 就增加一个worker. if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 因为在addWorker时会改变ctl的值, 因此重新获取一下 c = ctl.get(); } // 第二步: // 工作线程已经达到corePoolSize数量或者添加worker失败, 将command加入到任务队列里面去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果线程池不再处于运行状态且能成功从任务队列里将删除删除掉, 就reject任务 if (! isRunning(recheck) && remove(command)) reject(command); // 如果worker数量为0的话,就新建一个worker, 执行刚刚添加到任务队列里的任务 elseif (workerCountOf(recheck) == 0) addWorker(null, false); } // 第三步: // 如果任务队列已经满了, 则尝试新建一个worker用来执行command elseif (!addWorker(command, false)) reject(command); }
privatebooleanaddWorker(Runnable firstTask, boolean core){ // 下面整个循环都是为了 改变ctl中工作线程worker的数量. retry: for (;;) { int c = ctl.get(); // 计算线程池状态. int rs = runStateOf(c);
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) ) // 当线程池处于非运行状态, returnfalse;
// 开始增加ctl中的线程数量 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果线程池的状态发生了改变则继续执行retry循环, 进行Check if queue empty only if necessary 检测. // 没有线程池状态没有发生变化的话, 则继续执行ctl数量增加操作 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
// ok,到现在ctl中的worker数量已经改变完成, 开始真正的创建worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownew IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly){ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();
int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 如果线程池还能执行任务队列里的任务(Runnable, SHUTDOWN状态),就继续执行任务 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }