一. Looperprepare --- 準備和初始化
public static void prepare() { prepare(true); } public static void prepareMainLooper() { prepare(false); synchronized (Looper.class) { if (sMainLooper != null) { throw new IllegalStateException("The main Looper has already been prepared."); } sMainLooper = myLooper(); } }
private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
loop --- 消息循環
public static void loop() { // 獲取tls的唯一Looper final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn"t called on this thread."); } // 獲取Looper中的消息隊列 final MessageQueue queue = me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); // 進入消息泵循環體 for (;;) { // 獲取一個待處理的消息,有可能會阻塞,后面分析MessageQueue的時候回闡述 Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable, in case a UI event sets the logger final Printer logging = me.mLogging; if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } // 跟蹤消息 final long traceTag = me.mTraceTag; if (traceTag != 0 && Trace.isTagEnabled(traceTag)) { Trace.traceBegin(traceTag, msg.target.getTraceName(msg)); } try { // 處理消息的派發 msg.target.dispatchMessage(msg); } finally { // 結束跟蹤 if (traceTag != 0) { Trace.traceEnd(traceTag); } } if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn"t corrupted. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } // 回收處理后的消息,將其放入消息池中,準備復用 msg.recycleUnchecked(); } }
quit --- 退出消息泵
public void quit() { mQueue.quit(false); } public void quitSafely() { mQueue.quit(true); }
二. Handlerhandler我們最普通的用法就是new出來之后,重載handleMessage方法,來等待消息觸發并在這里寫下處理。之后無非就是在合適的時候調用sendMessage發送消息了。
Handler --- 構造
public Handler(Callback callback, boolean async) { if (FIND_POTENTIAL_LEAKS) { final Class extends Handler> klass = getClass(); if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) && (klass.getModifiers() & Modifier.STATIC) == 0) { Log.w(TAG, "The following Handler class should be static or leaks might occur: " + klass.getCanonicalName()); } } mLooper = Looper.myLooper(); if (mLooper == null) { throw new RuntimeException( "Can"t create handler inside thread that has not called Looper.prepare()"); } mQueue = mLooper.mQueue; mCallback = callback; mAsynchronous = async; } public Handler(Looper looper, Callback callback, boolean async) { mLooper = looper; mQueue = looper.mQueue; mCallback = callback; mAsynchronous = async; }
dispatchMessage --- 消息派發
public interface Callback { public boolean handleMessage(Message msg); } public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } private static void handleCallback(Message message) { message.callback.run(); }
sendMessage --- 發送消息
public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMillis < 0) { delayMillis = 0; } return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); } public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
public final boolean post(Runnable r) { return sendMessageDelayed(getPostMessage(r), 0); } private static Message getPostMessage(Runnable r) { Message m = Message.obtain(); m.callback = r; return m; }
三. Message既然message是載體,那么先來看看數據內容:
// 消息的唯一key public int what; // 消息支持的2個參數,都是int類型 public int arg1; public int arg2; // 消息內容 public Object obj; // 這個是一個應答的信使,其實是和信使服務有關系的一個東西,這里暫時不做解釋 public Messenger replyTo; // 消息觸發的時間 /*package*/ long when; // 消息相應的handler /*package*/ Handler target; // 消息回調 /*package*/ Runnable callback; // 本消息的下一個 /*package*/ Message next; // 消息池,其實就是第一個消息 private static Message sPool; // 消息池當前的大小 private static int sPoolSize = 0;
obtain --- 獲取消息
public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; // clear in-use flag sPoolSize--; return m; } } return new Message(); }
recycle --- 回收消息
public void recycle() { if (isInUse()) { if (gCheckRecycle) { throw new IllegalStateException("This message cannot be recycled because it " + "is still in use."); } return; } recycleUnchecked(); } void recycleUnchecked() { // Mark the message as in use while it remains in the recycled object pool. // Clear out all other details. flags = FLAG_IN_USE; what = 0; arg1 = 0; arg2 = 0; obj = null; replyTo = null; sendingUid = -1; when = 0; target = null; callback = null; data = null; synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this; sPoolSize++; } } }
四. MessageQueueMessageQueue里會涉及到c層,也就是native層的內容,其實他大部分核心內容都是在c層完成的。java層是個銜接部分。
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast(nativeMessageQueue); }
獲取隊列消息 --- next
Message next() { // 拿到初始化時候保存的地址,即是c層NativeMessageQueue對象的地址 final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; // 進入循環,為了獲取到消息 for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } // 阻塞,有超時 nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; // 當消息的handler為Null,找下一個異步的消息 if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. // 如果消息的觸發時間大于當前時鐘,則設置下一次阻塞等待超時為這個差值 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // 得到并返回一個message,這里是個鏈表操作 mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // 退出情況的判斷 if (mQuitting) { dispose(); return null; } // 空閑時候的idlerHandler處理 if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
五. c層運轉初始化
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s", strerror(errno)); AutoMutex _l(mLock); rebuildEpollLocked(); } void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno)); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno)); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", request.fd, strerror(errno)); } } }
讀取消息 --- nativePollOnce
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; } }
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { // 處理每個response里的request,如果沒有回調,直接返回 while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
...... // Poll. int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. mPolling = true; // 最大處理16個fd struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 等待事件發生或超時 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mPolling = false; // Acquire lock. mLock.lock(); // 如果需要進行重建epoll if (mEpollRebuildRequired) { mEpollRebuildRequired = false; rebuildEpollLocked(); goto Done; } // <0錯誤處理,直接跳轉到Done if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error: %s", strerror(errno)); result = POLL_ERROR; goto Done; } // 超時,跳轉到Done if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = POLL_TIMEOUT; goto Done; } ...... // 循環處理獲取到的event for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) { // 如果是喚醒的fd,執行喚醒處理 if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { // 否則,處理每個request ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { // 創建新的events,并通過pushResponse生成新的response,push int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; // 處理堆積未處理的事件 while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sphandler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); // 處理每個response for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = POLL_CALLBACK; } } return result;
struct Request { int fd; int ident; int events; int seq; spcallback; void* data; void initEventItem(struct epoll_event* eventItem) const; }; struct Response { int events; Request request; };
struct MessageEnvelope { MessageEnvelope() : uptime(0) { } MessageEnvelope(nsecs_t u, const sph, const Message& m) : uptime(u), handler(h), message(m) { } nsecs_t uptime; sp handler; Message message; };
處理消息 --- handler的調用
class?LooperCallback?:?public?virtual?RefBase?{ protected: ????virtual?~LooperCallback(); public: ????/** ?????*?Handles?a?poll?event?for?the?given?file?descriptor. ?????*?It?is?given?the?file?descriptor?it?is?associated?with, ?????*?a?bitmask?of?the?poll?events?that?were?triggered?(typically?EVENT_INPUT), ?????*?and?the?data?pointer?that?was?originally?supplied. ?????* ?????*?Implementations?should?return?1?to?continue?receiving?callbacks,?or?0 ?????*?to?have?this?file?descriptor?and?callback?unregistered?from?the?looper. ?????*/ ????virtual?int?handleEvent(int?fd,?int?events,?void*?data)?=?0; };
int?NativeMessageQueue::handleEvent(int?fd,?int?looperEvents,?void*?data)?{ ????int?events?=?0; ????if?(looperEvents?&?Looper::EVENT_INPUT)?{ ????????events?|=?CALLBACK_EVENT_INPUT; ????} ????if?(looperEvents?&?Looper::EVENT_OUTPUT)?{ ????????events?|=?CALLBACK_EVENT_OUTPUT; ????} ????if?(looperEvents?&?(Looper::EVENT_ERROR?|?Looper::EVENT_HANGUP?|?Looper::EVENT_INVALID))?{ ????????events?|=?CALLBACK_EVENT_ERROR; ????} ????int?oldWatchedEvents?=?reinterpret_cast(data); ????int?newWatchedEvents?=?mPollEnv->CallIntMethod(mPollObj, ????????????gMessageQueueClassInfo.dispatchEvents,?fd,?events); ????if?(!newWatchedEvents)?{ ????????return?0;?//?unregister?the?fd ????} ????if?(newWatchedEvents?!=?oldWatchedEvents)?{ ????????setFileDescriptorEvents(fd,?newWatchedEvents); ????} ????return?1; }
private?int?dispatchEvents(int?fd,?int?events)?{ ????????//?Get?the?file?descriptor?record?and?any?state?that?might?change. ????????final?FileDescriptorRecord?record; ????????final?int?oldWatchedEvents; ????????final?OnFileDescriptorEventListener?listener; ????????final?int?seq; ????????synchronized?(this)?{ ????????????record?=?mFileDescriptorRecords.get(fd); ????????????if?(record?==?null)?{ ????????????????return?0;?//?spurious,?no?listener?registered ????????????} ????????????oldWatchedEvents?=?record.mEvents; ????????????events?&=?oldWatchedEvents;?//?filter?events?based?on?current?watched?set ????????????if?(events?==?0)?{ ????????????????return?oldWatchedEvents;?//?spurious,?watched?events?changed ????????????} ????????????listener?=?record.mListener; ????????????seq?=?record.mSeq; ????????} ????????//?Invoke?the?listener?outside?of?the?lock. ????????int?newWatchedEvents?=?listener.onFileDescriptorEvents( ????????????????record.mDescriptor,?events); ????????if?(newWatchedEvents?!=?0)?{ ????????????newWatchedEvents?|=?OnFileDescriptorEventListener.EVENT_ERROR; ????????} ????????//?Update?the?file?descriptor?record?if?the?listener?changed?the?set?of ????????//?events?to?watch?and?the?listener?itself?hasn"t?been?updated?since. ????????if?(newWatchedEvents?!=?oldWatchedEvents)?{ ????????????synchronized?(this)?{ ????????????????int?index?=?mFileDescriptorRecords.indexOfKey(fd); ????????????????if?(index?>=?0?&&?mFileDescriptorRecords.valueAt(index)?==?record ????????????????????????&&?record.mSeq?==?seq)?{ ????????????????????record.mEvents?=?newWatchedEvents; ????????????????????if?(newWatchedEvents?==?0)?{ ????????????????????????mFileDescriptorRecords.removeAt(index); ????????????????????} ????????????????} ????????????} ????????} ????????//?Return?the?new?set?of?events?to?watch?for?native?code?to?take?care?of. ????????return?newWatchedEvents; ????}
