對于多線程有了一點了解之后,那么來看看java.lang.concurrent包下面的一些東西。在此之前,我們運行一個線程都是顯式調用了 Thread的start()方法。我們用concurrent下面的類來實現一下線程的運行,而且這將成為以后常用的方法或者實現思路。
看一個簡單的例子:
Java代碼

- public class CacheThreadPool {
- public static void main(String[] args) {
- ExecutorService exec=Executors.newCachedThreadPool();
- for(int i=0;i<5;i++)
- exec.execute(new LiftOff());
- exec.shutdown();//并不是終止線程的運行,而是禁止在這個Executor中添加新的任務
- }
- }
這個例子其實很容易看懂,ExecutorService中有一個execute方法,這個方法的參數是Runnable類型。也就是說,將一個實現了Runnable類型的類的實例作為參數傳入execute方法并執行,那么線程就相應的執行了。
一、ExecutorService 先看看ExecutorService,這是一個接口,簡單的列一下這個接口:
Java代碼

- public interface ExecutorService extends Executor {
-
- void shutdown();
-
- List<Runnable> shutdownNow();
-
- boolean isShutdown();
-
- boolean isTerminated();
-
- boolean awaitTermination(long timeout, TimeUnit unit)
-
- <T> Future<T> submit(Callable<T> task);
-
- <T> Future<T> submit(Runnable task, T result);
-
- Future<?> submit(Runnable task);
-
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- }
ExecuteService繼承了Executor,Executor也是一個接口,里面只有一個方法:
Java代碼

- void execute(Runnable command)
二、Executors Executors是一個類,直接援引JDK文檔的說明來說一下這個類的作用:
Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
- Methods that create and return an ExecutorService set up with commonly useful configuration settings.
- Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
- Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
- Methods that create and return a ThreadFactory that sets newly created threads to a known state.
- Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
在上面的例子中,我們用到了newCachedThreadPool()方法。看一下這個方法:
Java代碼

- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
在源碼中我們可以知道兩點,1、這個方法返回類型是ExecutorService;2、此方法返回值實際是另一個類的實例。看一下這個類的信息:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private final BlockingQueue<Runnable> workQueue;//這個變量在下面會提到
- ..........
- }
ThreadPoolExecutor繼承了AbstractExecutorService,而AbstractExecutorService又實現 了ExecutorService接口。所以,根據多態,ThreadPoolExecutor可以看作是ExecutorService類型。
線程執行的最關鍵的一步是執行了executor方法,根據java的動態綁定,實際執行的是ThreadPoolExecutor所實現的executor方法。看看源碼:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
- if (runState == RUNNING && workQueue.offer(command)) {
- if (runState != RUNNING || poolSize == 0)
- ensureQueuedTaskHandled(command);
- }
- else if (!addIfUnderMaximumPoolSize(command))
- reject(command); // is shutdown or saturated
- }
- }
- ..........
- }
根據程序正常執行的路線來看,這個方法中比較重要的兩個地方分別是:
1、workQueue.offer(command)
workQueue在上面提到過,是BlockingQueue<Runnable>類型的變量,這條語句就是將Runnable類型的實例加入到隊列中。
2、ensureQueuedTaskHandled(command)
這個是線程執行的關鍵語句。看看它的源碼:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private void ensureQueuedTaskHandled(Runnable command) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- boolean reject = false;
- Thread t = null;
- try {
- int state = runState;
- if (state != RUNNING && workQueue.remove(command))
- reject = true;
- else if (state < STOP &&
- poolSize < Math.max(corePoolSize, 1) &&
- !workQueue.isEmpty())
- t = addThread(null);
- } finally {
- mainLock.unlock();
- }
- if (reject)
- reject(command);
- else if (t != null)
- t.start();
- }
- ..........
- }
在這里我們就可以看到最終執行了t.start()方法來運行線程。在這之前的重點是t=addThread(null)方法,看看addThread方法的源碼:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private Thread addThread(Runnable firstTask) {
- Worker w = new Worker(firstTask);
- Thread t = threadFactory.newThread(w);
- if (t != null) {
- w.thread = t;
- workers.add(w);
- int nt = ++poolSize;
- if (nt > largestPoolSize)
- largestPoolSize = nt;
- }
- return t;
- }
- ..........
- }
這里兩個重點,很明顯:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是個什么結構:
Java代碼

- public class ThreadPoolExecutor extends AbstractExecutorService {
- ..........
- private final class Worker implements Runnable {
- ..........
- Worker(Runnable firstTask) {
- this.firstTask = firstTask;
- }
-
- private Runnable firstTask;
- ..........
-
- public void run() {
- try {
- Runnable task = firstTask;
- firstTask = null;
- while (task != null || (task = getTask()) != null) {
- runTask(task);
- task = null;
- }
- } finally {
- workerDone(this);
- }
- }
- }
-
- Runnable getTask() {
- for (;;) {
- try {
- int state = runState;
- if (state > SHUTDOWN)
- return null;
- Runnable r;
- if (state == SHUTDOWN) // Help drain queue
- r = workQueue.poll();
- else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
- r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
- else
- r = workQueue.take();
- if (r != null)
- return r;
- if (workerCanExit()) {
- if (runState >= SHUTDOWN) // Wake up others
- interruptIdleWorkers();
- return null;
- }
- // Else retry
- } catch (InterruptedException ie) {
- // On interruption, re-check runState
- }
- }
- }
- }
- ..........
- }
Worker是一個內部類。根據之前可以知道,傳入addThread的參數是null,也就是說Work中firstTask為null。
在看看newThread是一個什么方法:
Java代碼

- public class Executors {
- ..........
- static class DefaultThreadFactory implements ThreadFactory {
- ..........
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r,
- namePrefix + threadNumber.getAndIncrement(),
- 0);
- if (t.isDaemon())
- t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
- t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- ..........
- }
- ..........
- }
通過源碼可以得知threadFactory的實際類型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一個嵌套內部類。
之前我們提到了t.start()這個方法執行了線程。那么現在從頭順一下,看看到底是執行了誰的run方法。首先知 道,t=addThread(null),而addThread內部執行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;這里兩個t是一致的。
從這里可以看出,t.start()實際上執行的是Worker內部的run方法。run()內部會在if條件里面使用“短路”:判斷firstTask 是否為null,若不是null則直接執行firstTask的run方法;如果是null,則調用getTask()方法來獲取Runnable類型實 例。從哪里獲取呢?workQueue!在execute方法中,執行ensureQueuedTaskHandled(command)之前就已經把 Runnable類型實例放入到workQueue中了,所以這里可以從workQueue中獲取到。
轉自:
http://paddy-w.iteye.com/blog/1039530
posted on 2012-10-30 21:56
小果子 閱讀(672)
評論(0) 編輯 收藏 引用 所屬分類:
Android & Ios