對(duì)于多線程有了一點(diǎn)了解之后,那么來(lái)看看java.lang.concurrent包下面的一些東西。在此之前,我們運(yùn)行一個(gè)線程都是顯式調(diào)用了 Thread的start()方法。我們用concurrent下面的類來(lái)實(shí)現(xiàn)一下線程的運(yùn)行,而且這將成為以后常用的方法或者實(shí)現(xiàn)思路。
看一個(gè)簡(jiǎn)單的例子:
Java代碼

- public class CacheThreadPool {
- public static void main(String[] args) {
- ExecutorService exec=Executors.newCachedThreadPool();
- for(int i=0;i<5;i++)
- exec.execute(new LiftOff());
- exec.shutdown();//并不是終止線程的運(yùn)行,而是禁止在這個(gè)Executor中添加新的任務(wù)
- }
- }
這個(gè)例子其實(shí)很容易看懂,ExecutorService中有一個(gè)execute方法,這個(gè)方法的參數(shù)是Runnable類型。也就是說(shuō),將一個(gè)實(shí)現(xiàn)了Runnable類型的類的實(shí)例作為參數(shù)傳入execute方法并執(zhí)行,那么線程就相應(yīng)的執(zhí)行了。
一、ExecutorService 先看看ExecutorService,這是一個(gè)接口,簡(jiǎn)單的列一下這個(gè)接口:
Java代碼

- public interface ExecutorService extends Executor {
-
- void shutdown();
-
- List<Runnable> shutdownNow();
-
- boolean isShutdown();
-
- boolean isTerminated();
-
- boolean awaitTermination(long timeout, TimeUnit unit)
-
- <T> Future<T> submit(Callable<T> task);
-
- <T> Future<T> submit(Runnable task, T result);
-
- Future<?> submit(Runnable task);
-
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- }
ExecuteService繼承了Executor,Executor也是一個(gè)接口,里面只有一個(gè)方法:
Java代碼

- void execute(Runnable command)
二、Executors Executors是一個(gè)類,直接援引JDK文檔的說(shuō)明來(lái)說(shuō)一下這個(gè)類的作用:
Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
- Methods that create and return an ExecutorService set up with commonly useful configuration settings.
- Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
- Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
- Methods that create and return a ThreadFactory that sets newly created threads to a known state.
- Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
在上面的例子中,我們用到了newCachedThreadPool()方法。看一下這個(gè)方法:
Java代碼

- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
在源碼中我們可以知道兩點(diǎn),1、這個(gè)方法返回類型是ExecutorService;2、此方法返回值實(shí)際是另一個(gè)類的實(shí)例。看一下這個(gè)類的信息:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private final BlockingQueue<Runnable> workQueue;//這個(gè)變量在下面會(huì)提到
- ..........
- }
ThreadPoolExecutor繼承了AbstractExecutorService,而AbstractExecutorService又實(shí)現(xiàn) 了ExecutorService接口。所以,根據(jù)多態(tài),ThreadPoolExecutor可以看作是ExecutorService類型。
線程執(zhí)行的最關(guān)鍵的一步是執(zhí)行了executor方法,根據(jù)java的動(dòng)態(tài)綁定,實(shí)際執(zhí)行的是ThreadPoolExecutor所實(shí)現(xiàn)的executor方法。看看源碼:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
- }
- }
- ..........
- }
根據(jù)程序正常執(zhí)行的路線來(lái)看,這個(gè)方法中比較重要的兩個(gè)地方分別是:
1、workQueue.offer(command)
workQueue在上面提到過(guò),是BlockingQueue<Runnable>類型的變量,這條語(yǔ)句就是將Runnable類型的實(shí)例加入到隊(duì)列中。
2、ensureQueuedTaskHandled(command)
這個(gè)是線程執(zhí)行的關(guān)鍵語(yǔ)句。看看它的源碼:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private void ensureQueuedTaskHandled(Runnable command) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- boolean reject = false;
- Thread t = null;
- try {
- int state = runState;
- if (state != RUNNING && workQueue.remove(command))
- reject = true;
- else if (state < STOP &&
- poolSize < Math.max(corePoolSize, 1) &&
- !workQueue.isEmpty())
- t = addThread(null);
- } finally {
- mainLock.unlock();
- }
- if (reject)
- reject(command);
- else if (t != null)
- t.start();
- }
- ..........
- }
在這里我們就可以看到最終執(zhí)行了t.start()方法來(lái)運(yùn)行線程。在這之前的重點(diǎn)是t=addThread(null)方法,看看addThread方法的源碼:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private Thread addThread(Runnable firstTask) {
- Worker w = new Worker(firstTask);
- Thread t = threadFactory.newThread(w);
- if (t != null) {
- w.thread = t;
- workers.add(w);
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- }
- return t;
- }
- ..........
- }
這里兩個(gè)重點(diǎn),很明顯:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是個(gè)什么結(jié)構(gòu):
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private final class Worker implements Runnable {
- ..........
- Worker(Runnable firstTask) {
- this.firstTask = firstTask;
- }
-
- private Runnable firstTask;
- ..........
-
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this);
- }
- }
- }
-
- Runnable getTask() {
- for (;;) {
- try {
- int state = runState;
- if (state > SHUTDOWN)
- return null;
- Runnable r;
- if (state == SHUTDOWN) // Help drain queue
- r = workQueue.poll();
- else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
- r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
- else
- r = workQueue.take();
- if (r != null)
- return r;
- if (workerCanExit()) {
- if (runState >= SHUTDOWN) // Wake up others
- interruptIdleWorkers();
- return null;
- }
- // Else retry
- } catch (InterruptedException ie) {
- // On interruption, re-check runState
- }
- }
- }
- }
- ..........
- }
Worker是一個(gè)內(nèi)部類。根據(jù)之前可以知道,傳入addThread的參數(shù)是null,也就是說(shuō)Work中firstTask為null。
在看看newThread是一個(gè)什么方法:
Java代碼

- public class Executors {
- ..........
- static class DefaultThreadFactory implements ThreadFactory {
- ..........
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- ..........
- }
- ..........
- }
通過(guò)源碼可以得知threadFactory的實(shí)際類型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一個(gè)嵌套內(nèi)部類。
之前我們提到了t.start()這個(gè)方法執(zhí)行了線程。那么現(xiàn)在從頭順一下,看看到底是執(zhí)行了誰(shuí)的run方法。首先知 道,t=addThread(null),而addThread內(nèi)部執(zhí)行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;這里兩個(gè)t是一致的。
從這里可以看出,t.start()實(shí)際上執(zhí)行的是Worker內(nèi)部的run方法。run()內(nèi)部會(huì)在if條件里面使用“短路”:判斷firstTask 是否為null,若不是null則直接執(zhí)行firstTask的run方法;如果是null,則調(diào)用getTask()方法來(lái)獲取Runnable類型實(shí) 例。從哪里獲取呢?workQueue!在execute方法中,執(zhí)行ensureQueuedTaskHandled(command)之前就已經(jīng)把 Runnable類型實(shí)例放入到workQueue中了,所以這里可以從workQueue中獲取到。
轉(zhuǎn)自:
http://paddy-w.iteye.com/blog/1039530
posted on 2012-10-30 21:56
小果子 閱讀(689)
評(píng)論(0) 編輯 收藏 引用 所屬分類:
Android & Ios