Research on the Message Handling Mechanism in Android, Part 1

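Overview of the message mechanism

Every thread in an Android application can create a message queue when it starts and then enter an infinite loop that keeps checking the queue for new messages. When a message is available, the thread takes it out of the queue and handles it; otherwise the thread sleeps until a new message arrives. In this way, messages drive the execution of an Android application.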
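The idea of a thread that sleeps until a message arrives can be sketched in plain Java, without any Android classes. This is only an illustration under stated assumptions: the class MessageLoopSketch, the runLoop method, and the QUIT sentinel are hypothetical names, and a LinkedBlockingQueue stands in for the real message queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: a worker thread driven entirely by messages.
class MessageLoopSketch {
    // Sentinel that tells the loop to exit (an assumption of this sketch).
    private static final String QUIT = "__quit__";

    // Starts a worker, sends it the given messages, and returns what it handled.
    static List<String> runLoop(List<String> toSend) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>();

        Thread worker = new Thread(() -> {
            try {
                for (;;) {
                    // take() sleeps while the queue is empty and wakes on a new message.
                    String msg = queue.take();
                    if (msg.equals(QUIT)) {
                        return;
                    }
                    handled.add("handled " + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            for (String msg : toSend) {
                queue.put(msg);
            }
            queue.put(QUIT);
            worker.join(); // after join(), the worker's writes to 'handled' are visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runLoop(Arrays.asList("a", "b", "c")));
    }
}
```

The worker never polls in a busy loop: it is parked inside take() whenever the queue is empty, which is the same behaviour the article describes for an Android thread waiting on its message queue.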

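Components of the message mechanism

Android implements the application message handling mechanism mainly through three classes, MessageQueue, Looper, and Handler:

1. MessageQueue: describes the message queue

2. Looper: creates the message queue and enters the message loop

3. Handler: sends messages and handles messages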

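Message handling flow

1. When the program starts, the main thread creates a Looper object. The Looper initializes a MessageQueue and then calls loop() to read messages in a loop.
2. When a Handler is initialized, its constructor obtains the current thread's Looper and, through it, the MessageQueue. A Handler is therefore bound to the thread whose Looper it holds, so a Handler that updates the UI must be created on the main thread (or be given the main thread's Looper explicitly). Creating a Handler on a thread that has no Looper fails with: "Can't create handler inside thread that has not called Looper.prepare()"
3. When one of the Handler's send methods is called, it obtains a Message object, points the message's target at the current Handler, and puts the message into the message queue.
4. While loop() is running, it takes messages from the queue one at a time and calls dispatchMessage(msg) on each message's target Handler to dispatch it.
5. Message maintains an internal message pool that recycles and caches Message objects.

6. The Looper is like an engine, the MessageQueue is like a conveyor belt, each Message is an item on the belt, and the Handler is the worker.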
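Steps 3 to 5 can be sketched in plain Java as follows. This is a simplified, single-threaded illustration and not the framework code: the classes DispatchSketch, Msg, and Handler are hypothetical stand-ins, the queue is an ordinary ArrayDeque, and the pool has no size limit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded sketch of steps 3 to 5; not the framework classes.
class DispatchSketch {
    static class Msg {
        int what;
        Handler target;   // the handler that will process this message
        Msg next;         // link used only while the message sits in the pool
        private static Msg pool;
        private static int poolSize;

        // Step 5: reuse a pooled instance when one is available.
        static Msg obtain() {
            if (pool != null) {
                Msg m = pool;
                pool = m.next;
                m.next = null;
                poolSize--;
                return m;
            }
            return new Msg();
        }

        // Clear the fields and push the instance back onto the pool.
        void recycle() {
            what = 0;
            target = null;
            next = pool;
            pool = this;
            poolSize++;
        }

        static int poolSize() {
            return poolSize;
        }
    }

    static class Handler {
        private final Queue<Msg> queue;
        final List<Integer> seen = new ArrayList<>();

        Handler(Queue<Msg> queue) {
            this.queue = queue;
        }

        // Step 3: stamp the message with its target, then enqueue it.
        void sendEmptyMessage(int what) {
            Msg m = Msg.obtain();
            m.what = what;
            m.target = this;
            queue.add(m);
        }

        void dispatchMessage(Msg m) {
            seen.add(m.what);
        }
    }

    // Step 4: take messages one by one and hand each to its own target.
    static void drain(Queue<Msg> queue) {
        Msg m;
        while ((m = queue.poll()) != null) {
            m.target.dispatchMessage(m);
            m.recycle();
        }
    }

    public static void main(String[] args) {
        Queue<Msg> queue = new ArrayDeque<>();
        Handler h = new Handler(queue);
        h.sendEmptyMessage(1);
        h.sendEmptyMessage(2);
        drain(queue);
        System.out.println(h.seen + " pooled=" + Msg.poolSize());
    }
}
```

Because each message carries its own target, one queue can serve any number of handlers: the loop does not need to know who sent a message, only whom to hand it to.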


Message queue creation process

1. Create the Java-layer Looper object:

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed)); // create the Looper object
}
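The one-Looper-per-thread rule rests on ThreadLocal, which gives every thread its own slot. A minimal plain-Java sketch of that rule, with the hypothetical names PrepareSketch, FakeLooper, and secondPrepareFails standing in for the real classes, might look like this:

```java
// Hypothetical sketch of the one-Looper-per-thread rule using ThreadLocal.
class PrepareSketch {
    static final class FakeLooper {
        final Thread thread = Thread.currentThread();
    }

    private static final ThreadLocal<FakeLooper> sThreadLocal = new ThreadLocal<>();

    // Mirrors the check in prepare(): a thread may install a looper only once.
    static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new FakeLooper());
    }

    // Returns the calling thread's looper, or null if prepare() was never called on it.
    static FakeLooper myLooper() {
        return sThreadLocal.get();
    }

    // Returns true when another prepare() on an already prepared thread is rejected.
    static boolean secondPrepareFails() {
        try {
            prepare();
            return false;
        } catch (RuntimeException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        prepare();
        System.out.println("second call rejected: " + secondPrepareFails());

        // A different thread has its own, still empty, ThreadLocal slot.
        Thread other = new Thread(() ->
                System.out.println("other thread has looper: " + (myLooper() != null)));
        other.start();
        other.join();
    }
}
```

The second thread sees no looper because the value stored by the first thread is invisible to it, which is also why a Handler created on a thread that never called prepare() cannot find a Looper.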

2. The Looper object creates a MessageQueue object (mQueue) internally:

private Looper(boolean quitAllowed) {
    // A Looper creates a MessageQueue object when it is constructed
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

3. The MessageQueue constructor calls nativeInit to create a NativeMessageQueue object:

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit(); // create the native queue and keep its address
}

4. nativeInit returns the address of the NativeMessageQueue, which is stored in mPtr:

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    // Create a NativeMessageQueue object in the C++ layer
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
    nativeMessageQueue->incStrong(env);
    // Return the address of nativeMessageQueue to the Java layer
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

5. When the NativeMessageQueue is created, it creates a C++-layer (native) Looper object internally:

NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    // Reuse this thread's native Looper if one exists; otherwise create one
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

6. When the native Looper is created, it creates a pipe and manages it through two file descriptors:

Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mResponseIndex(0) {
     int wakeFds[2];
     int result = pipe(wakeFds); // create a pipe
     LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);

     mWakeReadPipeFd = wakeFds[0]; // read-end file descriptor
     mWakeWritePipeFd = wakeFds[1]; // write-end file descriptor

     result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",errno);

     result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
     LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",errno);

#ifdef LOOPER_USES_EPOLL
     // Allocate the epoll instance and register the wake pipe.
     mEpollFd = epoll_create(EPOLL_SIZE_HINT); // create the epoll instance
     LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);

     struct epoll_event eventItem;
     memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
     eventItem.events = EPOLLIN;
     eventItem.data.fd = mWakeReadPipeFd;
     // register the read end of the pipe with epoll so that epoll watches it
     result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); 
     LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",errno);
#else
     // Add the wake pipe to the head of the request list with a null callback.
     struct pollfd requestedFd;
     requestedFd.fd = mWakeReadPipeFd;
     requestedFd.events = POLLIN;
     mRequestedFds.push(requestedFd);

     Request request;
     request.fd = mWakeReadPipeFd;
     request.callback = NULL;
     request.ident = 0;
     request.data = NULL;
     mRequests.push(request);

     mPolling = false;
     mWaiters = 0;
#endif

#ifdef LOOPER_STATISTICS
    mPendingWakeTime = -1;
    mPendingWakeCount = 0;
    mSampledWakeCycles = 0;
    mSampledWakeCountSum = 0;
    mSampledWakeLatencySum = 0;

    mSampledPolls = 0;
    mSampledZeroPollCount = 0;
    mSampledZeroPollLatencySum = 0;
    mSampledTimeoutPollCount = 0;
    mSampledTimeoutPollLatencySum = 0;
#endif
}
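The same arrangement (a pipe whose read end is watched by a multiplexer, so that writing a byte wakes a sleeping thread) can be approximated in Java with NIO. This is an analogy only, not the native implementation: java.nio.channels.Pipe stands in for pipe(), Selector stands in for the epoll instance, and the class WakePipeSketch with its wake, poll, and demo methods is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical Java NIO analogue of the wake pipe; not the native implementation.
class WakePipeSketch {
    private final Pipe pipe;
    private final Selector selector;

    WakePipeSketch() throws IOException {
        pipe = Pipe.open();                      // plays the role of pipe(wakeFds)
        pipe.source().configureBlocking(false);  // read end, non-blocking
        pipe.sink().configureBlocking(false);    // write end, non-blocking
        selector = Selector.open();              // plays the role of epoll_create
        // Watch the read end for readability, like epoll_ctl(EPOLL_CTL_ADD) with EPOLLIN.
        pipe.source().register(selector, SelectionKey.OP_READ);
    }

    // Write one byte so that a thread blocked in poll() returns.
    void wake() throws IOException {
        pipe.sink().write(ByteBuffer.wrap(new byte[] {'W'}));
    }

    // Wait up to timeoutMillis (must be > 0 here) for the read end to become readable.
    // Returns the number of ready channels; 0 means the wait timed out.
    int poll(long timeoutMillis) throws IOException {
        int ready = selector.select(timeoutMillis);
        selector.selectedKeys().clear();
        return ready;
    }

    void close() throws IOException {
        selector.close();
        pipe.sink().close();
        pipe.source().close();
    }

    // Returns {result of polling before wake(), result of polling after wake()}.
    static int[] demo() {
        try {
            WakePipeSketch s = new WakePipeSketch();
            int idle = s.poll(50);     // nothing written yet, so this times out
            s.wake();
            int woken = s.poll(1000);  // the wake byte makes the read end readable
            s.close();
            return new int[] {idle, woken};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before wake: " + r[0] + ", after wake: " + r[1]);
    }
}
```

The content of the byte is irrelevant; the pipe is used purely as a signal, which is why the native constructor only needs the two descriptors and an EPOLLIN registration on the read end.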


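Message loop process

1. The Looper obtains the current thread's MessageQueue and repeatedly calls its next method to check whether there is a new message to handle: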

public static void loop() {
    final Looper me = myLooper(); // get the current thread's Looper
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue; // get the current thread's MessageQueue

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    for (;;) { // keep checking for new messages to handle
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging;
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        // msg.target refers to a Handler; call its dispatchMessage method to dispatch the message
        msg.target.dispatchMessage(msg);

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        msg.recycleUnchecked();
    }
}

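2. MessageQueue's next method calls nativePollOnce to check whether the current thread's message queue has a new message to handle. If there is one, it is found in mMessages and returned for handling: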

Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0; // how long the current thread should sleep while waiting
        for (;;) { // keep calling nativePollOnce to check the queue for new messages
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            nativePollOnce(ptr, nextPollTimeoutMillis); // may block for up to the timeout

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages; // head of the queue: the next message to handle
                if (msg != null && msg.target == null) {
                   // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                       nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                       if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1; // no messages: sleep until woken
                }

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
}
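The core decision in next() is how long to sleep: forever when the queue is empty, not at all when the head message is due, and otherwise exactly until the head message's delivery time. The sketch below isolates that decision in plain Java. It is a simplified illustration under stated assumptions: NextTimeoutSketch, Msg, pollTimeout, and pollDue are hypothetical names, time is passed in as a parameter instead of being read from a clock, and barriers, asynchronous messages, and idle handlers are left out.

```java
// Hypothetical sketch of the timeout decision made inside next().
class NextTimeoutSketch {
    static final class Msg {
        final long when;    // absolute delivery time in milliseconds
        final String name;
        Msg next;

        Msg(long when, String name) {
            this.when = when;
            this.name = name;
        }
    }

    private Msg head; // earliest message first, in the role of mMessages

    // Insert so that the list stays sorted by 'when'; ties keep insertion order.
    void enqueue(long when, String name) {
        Msg m = new Msg(when, name);
        if (head == null || when < head.when) {
            m.next = head;
            head = m;
            return;
        }
        Msg prev = head;
        while (prev.next != null && prev.next.when <= when) {
            prev = prev.next;
        }
        m.next = prev.next;
        prev.next = m;
    }

    // -1: queue is empty, sleep until woken; 0: head is due now; > 0: sleep that long.
    int pollTimeout(long now) {
        if (head == null) {
            return -1;
        }
        if (now < head.when) {
            return (int) Math.min(head.when - now, Integer.MAX_VALUE);
        }
        return 0;
    }

    // Remove and return the head message's name if it is due, otherwise return null.
    String pollDue(long now) {
        if (head == null || now < head.when) {
            return null;
        }
        Msg m = head;
        head = m.next;
        m.next = null;
        return m.name;
    }

    public static void main(String[] args) {
        NextTimeoutSketch q = new NextTimeoutSketch();
        System.out.println(q.pollTimeout(0));   // empty queue
        q.enqueue(300, "late");
        q.enqueue(100, "early");
        System.out.println(q.pollTimeout(40));  // time left until "early"
        System.out.println(q.pollDue(100));     // "early" is due at time 100
    }
}
```

Keeping the list sorted by delivery time means only the head ever needs to be inspected, so the timeout handed to the native poll is always the time until the earliest pending message.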

3. nativePollOnce calls pollOnce on the NativeMessageQueue, which in turn calls the native Looper's pollOnce to check whether there is a new message to handle:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
    // Find the NativeMessageQueue through ptr
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    // Call the NativeMessageQueue's pollOnce to check whether the current thread has a new message
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

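4. pollOnce calls pollInner, which returns a non-zero result code once the wait ends (because of a wake-up, a callback, a timeout, or an error):

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // Call the native Looper's pollOnce to check whether the current thread has a new message to handle
    mLooper->pollOnce(timeoutMillis);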
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

The native Looper's pollOnce function:

int Looper::pollOnce(int timeoutMillis, int* outFd,int* outEvents, void** outData) {
    int result = 0;
    for (;;) { // keep calling pollInner until it reports a result
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            if (! response.request.callback) {
#if DEBUG_POLL_AND_WAKE
                LOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p", this,
                        response.request.ident, response.request.fd,
                        response.events, response.request.data);
#endif
                if (outFd != NULL) *outFd = response.request.fd;
                if (outEvents != NULL) *outEvents = response.events;
                if (outData != NULL) *outData = response.request.data;
                return response.request.ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            LOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        result = pollInner(timeoutMillis); // non-zero once the wait has ended
    }
}

5. pollInner calls awoken to clear the old data out of the pipe:

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif	

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

#ifdef LOOPER_STATISTICS
    nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif

#ifdef LOOPER_USES_EPOLL
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    bool acquiredLock = false;
#else
    // Wait for wakeAndLock() waiters to run then set mPolling to true.
    mLock.lock();
    while (mWaiters != 0) {
        mResume.wait(mLock);
    }
    mPolling = true;
    mLock.unlock();

    size_t requestedCount = mRequestedFds.size();
    int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif

    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }

        LOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        LOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

#ifdef LOOPER_USES_EPOLL
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;

        // check whether the fd with the I/O event is the read end of this thread's wake pipe (mWakeReadPipeFd)
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
              LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            if (! acquiredLock) {
                mLock.lock();
                acquiredLock = true;
            }

            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
             LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    if (acquiredLock) {
        mLock.unlock();
    }
Done: ;
#else
    for (size_t i = 0; i < requestedCount; i++) {
        const struct pollfd& requestedFd = mRequestedFds.itemAt(i);

        short pollEvents = requestedFd.revents;
        if (pollEvents) {
            if (requestedFd.fd == mWakeReadPipeFd) {
                if (pollEvents & POLLIN) {
                    awoken();
                } else {
                 LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
                }
            } else {
                int events = 0;
                if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
                if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
                if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
                if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
                pushResponse(events, mRequests.itemAt(i));
            }
            if (--eventCount == 0) {
                break;
            }
        }
    }

Done:
    // Set mPolling to false and wake up the wakeAndLock() waiters.
    mLock.lock();
    mPolling = false;
    if (mWaiters != 0) {
        mAwake.broadcast();
    }
    mLock.unlock();
#endif

#ifdef LOOPER_STATISTICS
    nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
    mSampledPolls += 1;
    if (timeoutMillis == 0) {
        mSampledZeroPollCount += 1;
        mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
    } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
        mSampledTimeoutPollCount += 1;
        mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
                - milliseconds_to_nanoseconds(timeoutMillis);
    }
    if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
        LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
       0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
        mSampledPolls = 0;
        mSampledZeroPollCount = 0;
        mSampledZeroPollLatencySum = 0;
        mSampledTimeoutPollCount = 0;
        mSampledTimeoutPollLatencySum = 0;
    }
#endif

    for (size_t i = 0; i < mResponses.size(); i++) {
        const Response& response = mResponses.itemAt(i);
        if (response.request.callback) {
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
           response.request.fd, response.events, response.request.data);
#endif
            int callbackResult = response.request.callback(
              response.request.fd, response.events, response.request.data);
            if (callbackResult == 0) {
                removeFd(response.request.fd);
            }

            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}

6. Implementation of awoken:

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    LOGD("%p ~ awoken", this);
#endif

#ifdef LOOPER_STATISTICS
    if (mPendingWakeCount == 0) {
        LOGD("%p ~ awoken: spurious!", this);
    } else {
        mSampledWakeCycles += 1;
        mSampledWakeCountSum += mPendingWakeCount;
        mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
        mPendingWakeCount = 0;
        mPendingWakeTime = -1;
        if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
           LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
              0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
                    float(mSampledWakeCountSum) / mSampledWakeCycles);
            mSampledWakeCycles = 0;
            mSampledWakeCountSum = 0;
            mSampledWakeLatencySum = 0;
        }
    }
#endif

    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); // read the data out of the pipe
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
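Why the pipe has to be emptied can be seen in a small Java NIO analogue: as long as unread wake bytes remain, the read end stays readable and every later wait would return immediately. The sketch below is an illustration, not the native code. DrainSketch, drain, and demo are hypothetical names, Pipe and Selector stand in for the pipe and the epoll instance, and the drain loop simply reads until a non-blocking read returns nothing, whereas the native loop stops as soon as a read does not fill the 16-byte buffer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical Java NIO analogue of awoken(): drain the wake pipe after waking up.
class DrainSketch {
    // Read until a non-blocking read reports that nothing is left.
    static int drain(Pipe.SourceChannel source) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16); // same size as the native buffer
        int total = 0;
        int n;
        while ((n = source.read(buffer)) > 0) {
            total += n;
            buffer.clear();
        }
        return total;
    }

    // Returns {ready channels before the drain, bytes drained, ready channels after}.
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            pipe.source().register(selector, SelectionKey.OP_READ);

            // Three wake-ups arrive before the loop thread gets to run.
            for (int i = 0; i < 3; i++) {
                pipe.sink().write(ByteBuffer.wrap(new byte[] {'W'}));
            }

            int before = selector.select(1000); // readable: wake bytes are pending
            selector.selectedKeys().clear();
            int drained = drain(pipe.source());
            int after = selector.selectNow();   // nothing pending any more

            pipe.sink().close();
            pipe.source().close();
            return new int[] {before, drained, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before=" + r[0] + " drained=" + r[1] + " after=" + r[2]);
    }
}
```

Several wake-ups collapse into a single pass through the loop: however many bytes were written, one drain consumes them all, and the thread then decides what to do by inspecting its message queue rather than by counting bytes.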

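Related reading: Research on the Message Handling Mechanism in Android, Part 2

This article comes from the NetEase Practitioner Community (网易实践者社区) and is published with the authorization of its author, Cao Xing (曹兴).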