publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); if (workerCountOf(c) < corePoolSize) { //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务 if (addWorker(command, true)) return; c = ctl.get(); } // double check: c, recheck // 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); // recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command) //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务 if (! isRunning(recheck) && remove(command)) reject(command); //线程池处于running状态,但是没有线程,则创建线程 elseif (workerCountOf(recheck) == 0) addWorker(null, false); } // 往线程池中创建新的线程失败,则reject任务 elseif (!addWorker(command, false)) reject(command); }
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
for (;;) { intwc= workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { // 线程池重入锁 finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intrs= runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); workers.add(w); ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 线程启动,执行任务(Worker.thread(firstTask).start()); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker 类的 runWorker 方法
1 2 3 4 5 6 7 8 9 10 11 12
privatefinalclassWorkerextendsAbstractQueuedSynchronizerimplementsRunnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); // 创建线程 } /** Delegates main run loop to outer runWorker */ publicvoidrun() { runWorker(this); } // ... }
继承了 AQS 类,可以方便的实现工作线程的中止操作;
实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;
当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;
一些属性还有构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker final Thread thread; //当一个worker刚创建的时候,就先尝试执行这个任务 Runnable firstTask; //记录完成任务的数量 volatilelong completedTasks;
public class FutureTask<V> implements RunnableFuture<V> 可以将 FutureTask 提交至线程池中等待被执行(通过 FutureTask 的 run 方法来执行)
内部状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/* The run state of this task, initially NEW. * ... * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ privatevolatileint state; privatestaticfinalintNEW=0; privatestaticfinalintCOMPLETING=1; privatestaticfinalintNORMAL=2; privatestaticfinalintEXCEPTIONAL=3; privatestaticfinalintCANCELLED=4; privatestaticfinalintINTERRUPTING=5; privatestaticfinalintINTERRUPTED=6;
内部状态通过 sun.misc.Unsafe 进行修改。
get 方法
1 2 3 4 5 6
/**
 * Returns the outcome produced by {@code report} for this task's state.
 * While the state is still {@code NEW} or {@code COMPLETING}, it first calls
 * {@code awaitDone(false, 0L)} and uses the state that call returns.
 *
 * @return whatever {@code report(s)} returns for the resulting state
 * @throws InterruptedException declared by this method; presumably raised by
 *         {@code awaitDone} when the waiting thread is interrupted (confirm)
 * @throws ExecutionException declared by this method; presumably raised by
 *         {@code report} when the task failed (confirm)
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING) {
        s = awaitDone(false, 0L);
    }
    return report(s);
}
publicvoidrun() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts ints= state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }