Android多線程分析之四:MessageQueue的實(shí)現(xiàn)
羅朝輝 (http://www.shnenglu.com/kesalin/)
在前面兩篇文章《Android多線程分析之二:Thread的實(shí)現(xiàn)》,《Android多線程分析之三:Handler,Looper的實(shí)現(xiàn)》中分別介紹了 Thread 的創(chuàng)建,運(yùn)行,銷毀的過程以及 Thread與 Handler,Looper 之間的關(guān)聯(lián):Thread 在其 run() 方法中創(chuàng)建和運(yùn)行消息處理循環(huán) Looper,而 Looper::loop() 方法不斷地從 MessageQueue 中獲取消息,并由 Handler 分發(fā)處理該消息。接下來就來介紹 MessageQueue 的運(yùn)作機(jī)制,MessageQueue。
參考源碼:
android/framework/base/core/java/android/os/MessageQueue.java
android/framework/base/core/java/android/os/Message.java
android/frameworks/base/core/jni/android_os_MessageQueue.h
android/frameworks/base/core/jni/android_os_MessageQueue.cpp
先來看 MessageQueue 的構(gòu)造函數(shù)以及重要的成員變量:
// True if the message queue can be quit.
private final boolean mQuitAllowed;
private int mPtr; // used by native code
Message mMessages;
private boolean mQuiting;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
mQuitAllowed: 其含義與 Looper.prepare(boolean quitAllowed) 中參數(shù)含義一直,是否允許中止;mPtr:Android MessageQueue 是通過調(diào)用 C++ native MessageQueue 實(shí)現(xiàn)的,這個 mPtr 就是指向 native MessageQueue;mMessages:Message 是鏈表結(jié)構(gòu)的,因此這個變量就代表 Message 鏈表;mQuiting:是否終止了;mBlocked:是否正在等待被激活以獲取消息;
MessageQueue 的構(gòu)造函數(shù)很簡單: MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
nativeInit();
}
它通過轉(zhuǎn)調(diào) native 方法 nativeInit() 實(shí)現(xiàn)的,后者是定義在 android_os_MessageQueue.cpp 中:static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return;
}
nativeMessageQueue->incStrong(env);
android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}
static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
NativeMessageQueue* nativeMessageQueue) {
env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
reinterpret_cast<jint>(nativeMessageQueue));
}
nativeInit() 方法創(chuàng)建 NativeMessageQueue 對象,并將這個對象的指針復(fù)制給 Android MessageQueue 的 mPtr。NativeMessageQueue 的定義如下:class MessageQueue : public RefBase {
public:
/* Gets the message queue's looper. */
inline sp<Looper> getLooper() const {
return mLooper;
}
bool raiseAndClearException(JNIEnv* env, const char* msg);
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) = 0;
protected:
MessageQueue();
virtual ~MessageQueue();
protected:
sp<Looper> mLooper;
};
class NativeMessageQueue : public MessageQueue {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
void pollOnce(JNIEnv* env, int timeoutMillis);
void wake();
private:
bool mInCallback;
jthrowable mExceptionObj;
};
其中值得關(guān)注的是 NativeMessageQueue 的構(gòu)造以及pollOnce,wake 兩個方法,它們是Java MessageQueue 中 nativePollOnce 和 nativeWake 的 native 方法:NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
mInCallback = true;
mLooper->pollOnce(timeoutMillis);
mInCallback = false;
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
在 NativeMessageQueue 的構(gòu)造函數(shù)中,會獲取當(dāng)前線程的 Looper(注意這是 C++ Looper,定義在frameworks/native/libs/utils/Looper.h 中),如果當(dāng)前線程還沒有 Looper,就創(chuàng)建一個,并保存在線程的 TLS 中。pollOnce 和 wake 最終都是通過 Linux 的 epoll 模型來實(shí)現(xiàn)的。pollOnce() 通過等待被激活,然后從消息隊(duì)列中獲取消息;wake() 則是激活處于等待狀態(tài)的消息隊(duì)列,通知它有消息到達(dá)了。這是典型的生產(chǎn)者-消費(fèi)者模型。
對于Android MessageQueue 來說,其主要的工作就是:接收投遞進(jìn)來的消息,獲取下一個需要處理的消息。這兩個功能是通過 enqueueMessage() 和 next() 方法實(shí)現(xiàn)的。next() 在前一篇文章介紹 Looper.loop() 時(shí)提到過。
在分析這兩個函數(shù)之前,先來介紹一下 Message:前面說過 Message 是完備的,即它同時(shí)帶有消息內(nèi)容和處理消息的 Handler 或 callback。下面列出它的主要成員變量:public int what; // 消息 id
public int arg1; // 消息參數(shù)
public int arg2; // 消息參數(shù)
public Object obj; // 消息參數(shù)
long when; // 處理延遲時(shí)間,由 Handler 的 sendMessageDelayed/postDelayed 設(shè)置
Handler target; // 處理消息的 Handler
Runnable callback; // 處理消息的回調(diào)
Message next; // 鏈表結(jié)構(gòu),指向下一個消息
Message 有一些名為 obtain 的靜態(tài)方法用于創(chuàng)建 Message,通常我們都是通過 Handler 的 obtain 靜態(tài)方法轉(zhuǎn)調(diào) Message 的靜態(tài)方法來創(chuàng)建新的 Message。
接下來分析 enqueueMessage: final boolean enqueueMessage(Message msg, long when) {
if (msg.isInUse()) {
throw new AndroidRuntimeException(msg + " This message is already in use.");
}
if (msg.target == null) {
throw new AndroidRuntimeException("Message must have a target.");
}
boolean needWake;
synchronized (this) {
if (mQuiting) {
return false;
}
msg.when = when;
Message p = mMessages;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
}
if (needWake) {
nativeWake(mPtr);
}
return true;
}
首先檢測消息的合法性:是否已經(jīng)在處理中和是否有處理它的Handler,然后判斷 mQuiting 是否中止了,如果沒有則根據(jù)消息處理時(shí)間排序?qū)⑾⒉迦腈湵碇械暮线m位置。在這其中作了一些減少同步操作的優(yōu)化,即使當(dāng)前消息隊(duì)列已經(jīng)處于 Blocked 狀態(tài),且隊(duì)首是一個消息屏障(和內(nèi)存屏障的理念一樣,這里是通過 p.target == null 來判斷隊(duì)首是否是消息屏障),并且要插入的消息是所有異步消息中最早要處理的才會 needwake 激活消息隊(duì)列去獲取下一個消息。Handler 的 post/sendMessage 系列方法最后都是通過轉(zhuǎn)調(diào) MessageQueue 的 enqueueMessage 來實(shí)現(xiàn)的,比如: public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
其實(shí) Handler 中與Message 相關(guān)的靜態(tài)方法都是通過 MessageQueue 的對應(yīng)的靜態(tài)方法實(shí)現(xiàn)的,比如 removeMessages, hasMessages, hasCallbacks 等等,這里就不一一詳述了。至此,已經(jīng)完整地分析了如何通過 Handler 提交消息到 MessageQueue 中了。
下面來分析如何從 MessageQueue 中獲取合適的消息, 這是 next() 要做的最主要的事情,next() 方法還做了其他一些事情,這些其它事情是為了提高系統(tǒng)效果,利用消息隊(duì)列在空閑時(shí)通過 idle handler 做一些事情,比如 gc 等等。但它們和獲取消息關(guān)系不大,所以這部分將從略介紹。 final Message next() {
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(mPtr, nextPollTimeoutMillis);
synchronized (this) {
if (mQuiting) {
return null;
}
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
隊(duì)列被激活之后,首先判斷隊(duì)首是不是消息屏障,如果是則跳過所有的同步消息,查找最先要處理的異步消息。如果第一個待處理的消息還沒有到要處理的時(shí)機(jī)則設(shè)置激活等待時(shí)間;否則這個消息就是需要處理的消息,將該消息設(shè)置為 inuse,并將隊(duì)列設(shè)置為非 blocked 狀態(tài),然后返回該消息。next() 方法是在 Looper.loop() 中被調(diào)用的,Looper 在獲得要處理的消息之后就會調(diào)用和消息關(guān)聯(lián)的 Handler 來分發(fā)消息,這里再回顧一下: public static void loop() {
final Looper me = myLooper();
if (me ==
null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
for (;;) {
Message msg = queue.next();
// might block
if (msg ==
null) {
// No message indicates that the message queue is quitting.
return;
}
msg.target.dispatchMessage(msg);
msg.recycle();
}
}
如果隊(duì)列中沒有消息或者第一個待處理的消息時(shí)機(jī)未到,且也沒有其他利用隊(duì)列空閑要處理的事務(wù),則將隊(duì)列設(shè)置為設(shè)置 blocked 狀態(tài),進(jìn)入等待狀態(tài);否則就利用隊(duì)列空閑處理其它事務(wù)。
至此,已經(jīng)對 Android 多線程相關(guān)的主要概念 Thread, HandlerThread, Handler, Looper, Message, MessageQueue 作了一番介紹,下一篇就要講講 AsyncTask,這是為了簡化 UI 多線程編程為提供的一個便利工具類。