[Android from the Source] 03: The Android MessageQueue message loop mechanism (epoll implementation)
1 enqueueMessage
A handler sends a message:
- mHandler.sendEmptyMessage(1);
Handler.java
- public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
- MessageQueue queue = mQueue;
- if (queue == null) {
- RuntimeException e = new RuntimeException(
- this + " sendMessageAtTime() called with no mQueue");
- Log.w("Looper", e.getMessage(), e);
- return false;
- }
- return enqueueMessage(queue, msg, uptimeMillis);
- }
This eventually reaches Handler's private enqueueMessage function, which assigns the Handler object to msg.target and then calls queue.enqueueMessage.
Handler.java
- private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
- msg.target = this;
- if (mAsynchronous) {
- msg.setAsynchronous(true);
- }
- return queue.enqueueMessage(msg, uptimeMillis);
- }
The core code is below. After validating the message, it takes the synchronization lock.
MessageQueue.java
- boolean enqueueMessage(Message msg, long when) {
- if (msg.isInUse()) {
- throw new AndroidRuntimeException(msg + " This message is already in use.");
- }
- if (msg.target == null) {
- throw new AndroidRuntimeException("Message must have a target.");
- }
- synchronized (this) {
- if (mQuitting) {
- RuntimeException e = new RuntimeException(
- msg.target + " sending message to a Handler on a dead thread");
- Log.w("MessageQueue", e.getMessage(), e);
- return false;
- }
- msg.when = when;
- Message p = mMessages;
- boolean needWake;
- if (p == null || when == 0 || when < p.when) {
- // New head, wake up the event queue if blocked.
- msg.next = p;
- mMessages = msg;
- needWake = mBlocked;
- } else {
- // Inserted within the middle of the queue. Usually we don't have to wake
- // up the event queue unless there is a barrier at the head of the queue
- // and the message is the earliest asynchronous message in the queue.
- needWake = mBlocked && p.target == null && msg.isAsynchronous();
- Message prev;
- for (;;) {
- prev = p;
- p = p.next;
- if (p == null || when < p.when) {
- break;
- }
- if (needWake && p.isAsynchronous()) {
- needWake = false;
- }
- }
- msg.next = p; // invariant: p == prev.next
- prev.next = msg;
- }
- // We can assume mPtr != 0 because mQuitting is false.
- if (needWake) {
- nativeWake(mPtr);
- }
- }
- return true;
- }
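enqueueMessage first takes the queue's own lock with synchronized (this), then compares the trigger time (when) of the new msg with that of the head Message of the MessageQueue.
If the queue is empty, when is 0, or the new message is due before the current head, the new Message becomes the head of the MessageQueue; if the consuming thread is blocked (mBlocked), it is woken up right away so it can process the message.
Otherwise the message is inserted in the middle of the queue, keeping the list ordered by trigger time. In this branch a wake-up is needed only when the head is a barrier and the new message is the earliest asynchronous message in the queue.
Finally, if a wake-up is needed, nativeWake is called.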
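The ordered insert described above can be sketched in isolation. This is a minimal illustration and not the framework code: SortedQueueSketch, Msg, setBlocked and order are hypothetical names, the blocked flag stands in for mBlocked, and the barrier / asynchronous-message case is left out on purpose.

```java
// Hypothetical stand-in for the linked list kept by MessageQueue.
final class SortedQueueSketch {
    static final class Msg {
        final String name;
        final long when;   // trigger time, like Message.when
        Msg next;

        Msg(String name, long when) {
            this.name = name;
            this.when = when;
        }
    }

    private Msg head;          // plays the role of mMessages
    private boolean blocked;   // plays the role of mBlocked

    void setBlocked(boolean blocked) {
        this.blocked = blocked;
    }

    /** Inserts msg in trigger-time order; returns true if the consumer should be woken. */
    synchronized boolean enqueue(Msg msg) {
        Msg p = head;
        boolean needWake;
        if (p == null || msg.when == 0 || msg.when < p.when) {
            // New head: wake the consumer only if it is currently blocked.
            msg.next = p;
            head = msg;
            needWake = blocked;
        } else {
            // Middle of the queue: no wake-up in this simplified sketch
            // (the real code also checks for a barrier at the head).
            needWake = false;
            Msg prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || msg.when < p.when) {
                    break;
                }
            }
            msg.next = p;
            prev.next = msg;
        }
        return needWake;
    }

    /** Returns the message names in queue order, comma separated. */
    synchronized String order() {
        StringBuilder sb = new StringBuilder();
        for (Msg m = head; m != null; m = m.next) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(m.name);
        }
        return sb.toString();
    }
}
```

Because the scan only stops at a node with a strictly later trigger time, messages with equal trigger times stay in the order they were enqueued.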
The nativeWake function is defined in android_os_MessageQueue.cpp:
- static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
- NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
- return nativeMessageQueue->wake();
- }
This in turn calls mLooper->wake();
android_os_MessageQueue.cpp
- void NativeMessageQueue::wake() {
- mLooper->wake();
- }
frameworks/base/libs/utils/Looper.cpp
- void Looper::wake() {
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ wake", this);
- #endif
- #ifdef LOOPER_STATISTICS
- // FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
- if (mPendingWakeCount++ == 0) {
- mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
- }
- #endif
- ssize_t nWrite;
- do {
- nWrite = write(mWakeWritePipeFd, "W", 1);
- } while (nWrite == -1 && errno == EINTR);
- if (nWrite != 1) {
- if (errno != EAGAIN) {
- LOGW("Could not write wake signal, errno=%d", errno);
- }
- }
- }
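Look familiar? This is essentially the wake function from the epoll prototype in the previous installment: it writes one byte to mWakeWritePipeFd, which wakes up the epoll_wait that is blocked listening on the mWakeReadPipeFd file descriptor.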
2 dequeueMessage
First of all, dequeueMessage is just a name I made up. Once the Java-level Looper is running loop(), it is already continuously reading Messages out of the MessageQueue.
Looper.java
- public static void loop() {
- final Looper me = myLooper();
- if (me == null) {
- throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
- }
- final MessageQueue queue = me.mQueue;
- // Make sure the identity of this thread is that of the local process,
- // and keep track of what that identity token actually is.
- Binder.clearCallingIdentity();
- final long ident = Binder.clearCallingIdentity();
- for (;;) {
- Message msg = queue.next(); // might block
- if (msg == null) {
- // No message indicates that the message queue is quitting.
- return;
- }
- // This must be in a local variable, in case a UI event sets the logger
- Printer logging = me.mLogging;
- if (logging != null) {
- logging.println(">>>>> Dispatching to " + msg.target + " " +
- msg.callback + ": " + msg.what);
- }
- msg.target.dispatchMessage(msg);
- if (logging != null) {
- logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
- }
- // Make sure that during the course of dispatching the
- // identity of the thread wasn't corrupted.
- final long newIdent = Binder.clearCallingIdentity();
- if (ident != newIdent) {
- Log.wtf(TAG, "Thread identity changed from 0x"
- + Long.toHexString(ident) + " to 0x"
- + Long.toHexString(newIdent) + " while dispatching to "
- + msg.target.getClass().getName() + " "
- + msg.callback + " what=" + msg.what);
- }
- msg.recycle();
- }
- }
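loop() calls queue.next() to read the next message (on the thread that called loop()); when it gets one, it calls msg.target.dispatchMessage(msg).
Now let's look at how queue.next() is implemented.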
MessageQueue.java
- Message next() {
- int pendingIdleHandlerCount = -1; // -1 only during first iteration
- int nextPollTimeoutMillis = 0;
- for (;;) {
- if (nextPollTimeoutMillis != 0) {
- Binder.flushPendingCommands();
- }
- // We can assume mPtr != 0 because the loop is obviously still running.
- // The looper will not call this method after the loop quits.
- nativePollOnce(mPtr, nextPollTimeoutMillis);
- synchronized (this) {
- // Try to retrieve the next message. Return if found.
- final long now = SystemClock.uptimeMillis();
- Message prevMsg = null;
- Message msg = mMessages;
- if (msg != null && msg.target == null) {
- // Stalled by a barrier. Find the next asynchronous message in the queue.
- do {
- prevMsg = msg;
- msg = msg.next;
- } while (msg != null && !msg.isAsynchronous());
- }
- if (msg != null) {
- if (now < msg.when) {
- // Next message is not ready. Set a timeout to wake up when it is ready.
- nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
- } else {
- // Got a message.
- mBlocked = false;
- if (prevMsg != null) {
- prevMsg.next = msg.next;
- } else {
- mMessages = msg.next;
- }
- msg.next = null;
- if (false) Log.v("MessageQueue", "Returning message: " + msg);
- msg.markInUse();
- return msg;
- }
- } else {
- // No more messages.
- nextPollTimeoutMillis = -1;
- }
- // Process the quit message now that all pending messages have been handled.
- if (mQuitting) {
- dispose();
- return null;
- }
- // If first time idle, then get the number of idlers to run.
- // Idle handles only run if the queue is empty or if the first message
- // in the queue (possibly a barrier) is due to be handled in the future.
- if (pendingIdleHandlerCount < 0
- && (mMessages == null || now < mMessages.when)) {
- pendingIdleHandlerCount = mIdleHandlers.size();
- }
- if (pendingIdleHandlerCount <= 0) {
- // No idle handlers to run. Loop and wait some more.
- mBlocked = true;
- continue;
- }
- if (mPendingIdleHandlers == null) {
- mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
- }
- mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
- }
- // Run the idle handlers.
- // We only ever reach this code block during the first iteration.
- for (int i = 0; i < pendingIdleHandlerCount; i++) {
- final IdleHandler idler = mPendingIdleHandlers[i];
- mPendingIdleHandlers[i] = null; // release the reference to the handler
- boolean keep = false;
- try {
- keep = idler.queueIdle();
- } catch (Throwable t) {
- Log.wtf("MessageQueue", "IdleHandler threw exception", t);
- }
- if (!keep) {
- synchronized (this) {
- mIdleHandlers.remove(idler);
- }
- }
- }
- // Reset the idle handler count to 0 so we do not run them again.
- pendingIdleHandlerCount = 0;
- // While calling an idle handler, a new message could have been delivered
- // so go back and look again for a pending message without waiting.
- nextPollTimeoutMillis = 0;
- }
- }
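First, next() is package-private, so the Looper object in the same package (android.os) can call it.
nativePollOnce(mPtr, nextPollTimeoutMillis); is expanded further down; what it does is call epoll_wait from the previous installment.
nextPollTimeoutMillis is the timeout, that is, the delay from now until the next Message is due. With a positive value the call blocks until it is woken or the timeout expires; if there are no messages, the value is set to -1 and the call blocks until something wakes it.
After waking up, let's ignore barrier Messages for now (they relate to Choreographer, a feature added in Android 4.1; see http://blog.csdn.net/innost/article/details/8272867).
If the head msg is not null, next() checks whether the trigger time of that msg has arrived.
If it has not, nextPollTimeoutMillis is set to the difference between the time the message is due and the current time, to be used by the next nativePollOnce call in the for loop.
If the time has arrived or already passed, the msg is unlinked from the queue and returned, which exits the for loop and hands the msg to the handler in loop() above for dispatch.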
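The three timeout cases described above can be condensed into one small function. This is only a sketch under assumptions: PollTimeoutSketch and its method are hypothetical names, headWhen stands for the when field of the head message (null meaning an empty queue), and a return value of 0 here stands for "the message is due, return it now", whereas the real next() returns the message directly in that case.

```java
// Hypothetical helper that mirrors how next() chooses the poll timeout.
final class PollTimeoutSketch {
    static final int BLOCK_UNTIL_WOKEN = -1; // no messages queued
    static final int MESSAGE_DUE = 0;        // head message can be returned now

    /**
     * @param headWhen trigger time of the head message, or null if the queue is empty
     * @param now      current uptime in milliseconds
     */
    static int nextPollTimeoutMillis(Long headWhen, long now) {
        if (headWhen == null) {
            return BLOCK_UNTIL_WOKEN;
        }
        if (now < headWhen) {
            // Not due yet: sleep for the remaining time, clamped to int range.
            return (int) Math.min(headWhen - now, Integer.MAX_VALUE);
        }
        return MESSAGE_DUE;
    }
}
```

For example, with a head message due at uptime 150 and a current uptime of 100, the function returns 50, so the next poll would sleep for at most 50 ms unless a wake-up arrives first.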
So how exactly is nativePollOnce implemented?
android_os_MessageQueue.cpp
- static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
- jint ptr, jint timeoutMillis) {
- NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
- nativeMessageQueue->pollOnce(timeoutMillis);
- }
android_os_MessageQueue.cpp
- void NativeMessageQueue::pollOnce(int timeoutMillis) {
- mLooper->pollOnce(timeoutMillis);
- }
Likewise, in frameworks/base/libs/utils/Looper.cpp:
- int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
- int result = 0;
- for (;;) {
- while (mResponseIndex < mResponses.size()) {
- const Response& response = mResponses.itemAt(mResponseIndex++);
- if (! response.request.callback) {
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ pollOnce - returning signalled identifier %d: "
- "fd=%d, events=0x%x, data=%p", this,
- response.request.ident, response.request.fd,
- response.events, response.request.data);
- #endif
- if (outFd != NULL) *outFd = response.request.fd;
- if (outEvents != NULL) *outEvents = response.events;
- if (outData != NULL) *outData = response.request.data;
- return response.request.ident;
- }
- }
- if (result != 0) {
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ pollOnce - returning result %d", this, result);
- #endif
- if (outFd != NULL) *outFd = 0;
- if (outEvents != NULL) *outEvents = NULL;
- if (outData != NULL) *outData = NULL;
- return result;
- }
- result = pollInner(timeoutMillis);
- }
- }
This calls pollInner:
frameworks/base/libs/utils/Looper.cpp
- int Looper::pollInner(int timeoutMillis) {
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
- #endif
- int result = ALOOPER_POLL_WAKE;
- mResponses.clear();
- mResponseIndex = 0;
- #ifdef LOOPER_STATISTICS
- nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
- #endif
- #ifdef LOOPER_USES_EPOLL
- struct epoll_event eventItems[EPOLL_MAX_EVENTS];
- int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
- bool acquiredLock = false;
- #else
- // Wait for wakeAndLock() waiters to run then set mPolling to true.
- mLock.lock();
- while (mWaiters != 0) {
- mResume.wait(mLock);
- }
- mPolling = true;
- mLock.unlock();
- size_t requestedCount = mRequestedFds.size();
- int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
- #endif
- if (eventCount < 0) {
- if (errno == EINTR) {
- goto Done;
- }
- LOGW("Poll failed with an unexpected error, errno=%d", errno);
- result = ALOOPER_POLL_ERROR;
- goto Done;
- }
- if (eventCount == 0) {
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ pollOnce - timeout", this);
- #endif
- result = ALOOPER_POLL_TIMEOUT;
- goto Done;
- }
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
- #endif
- #ifdef LOOPER_USES_EPOLL
- for (int i = 0; i < eventCount; i++) {
- int fd = eventItems[i].data.fd;
- uint32_t epollEvents = eventItems[i].events;
- if (fd == mWakeReadPipeFd) {
- if (epollEvents & EPOLLIN) {
- awoken();
- } else {
- LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
- }
- } else {
- if (! acquiredLock) {
- mLock.lock();
- acquiredLock = true;
- }
- ssize_t requestIndex = mRequests.indexOfKey(fd);
- if (requestIndex >= 0) {
- int events = 0;
- if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
- if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
- if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
- if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
- pushResponse(events, mRequests.valueAt(requestIndex));
- } else {
- LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
- "no longer registered.", epollEvents, fd);
- }
- }
- }
- if (acquiredLock) {
- mLock.unlock();
- }
- Done: ;
- #else
- for (size_t i = 0; i < requestedCount; i++) {
- const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
- short pollEvents = requestedFd.revents;
- if (pollEvents) {
- if (requestedFd.fd == mWakeReadPipeFd) {
- if (pollEvents & POLLIN) {
- awoken();
- } else {
- LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
- }
- } else {
- int events = 0;
- if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
- if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
- if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
- if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
- if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
- pushResponse(events, mRequests.itemAt(i));
- }
- if (--eventCount == 0) {
- break;
- }
- }
- }
- Done:
- // Set mPolling to false and wake up the wakeAndLock() waiters.
- mLock.lock();
- mPolling = false;
- if (mWaiters != 0) {
- mAwake.broadcast();
- }
- mLock.unlock();
- #endif
- #ifdef LOOPER_STATISTICS
- nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
- mSampledPolls += 1;
- if (timeoutMillis == 0) {
- mSampledZeroPollCount += 1;
- mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
- } else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
- mSampledTimeoutPollCount += 1;
- mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
- - milliseconds_to_nanoseconds(timeoutMillis);
- }
- if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
- LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
- 0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
- 0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
- mSampledPolls = 0;
- mSampledZeroPollCount = 0;
- mSampledZeroPollLatencySum = 0;
- mSampledTimeoutPollCount = 0;
- mSampledTimeoutPollLatencySum = 0;
- }
- #endif
- for (size_t i = 0; i < mResponses.size(); i++) {
- const Response& response = mResponses.itemAt(i);
- if (response.request.callback) {
- #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
- LOGD("%p ~ pollOnce - invoking callback: fd=%d, events=0x%x, data=%p", this,
- response.request.fd, response.events, response.request.data);
- #endif
- int callbackResult = response.request.callback(
- response.request.fd, response.events, response.request.data);
- if (callbackResult == 0) {
- removeFd(response.request.fd);
- }
- result = ALOOPER_POLL_CALLBACK;
- }
- }
- return result;
- }
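Focus on the #ifdef LOOPER_USES_EPOLL part:
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
This waits for events on every file descriptor attached to mEpollFd. When a wake signal (or any other event) arrives, or the timeout expires, execution continues; until then the thread stays blocked.
Then comes the second #ifdef LOOPER_USES_EPOLL part: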
- #ifdef LOOPER_USES_EPOLL
- for (int i = 0; i < eventCount; i++) {
- int fd = eventItems[i].data.fd;
- uint32_t epollEvents = eventItems[i].events;
- if (fd == mWakeReadPipeFd) {
- if (epollEvents & EPOLLIN) {
- awoken();
- } else {
- LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
- }
- } else {
- if (! acquiredLock) {
- mLock.lock();
- acquiredLock = true;
- }
- ssize_t requestIndex = mRequests.indexOfKey(fd);
- if (requestIndex >= 0) {
- int events = 0;
- if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
- if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
- if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
- if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
- pushResponse(events, mRequests.valueAt(requestIndex));
- } else {
- LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
- "no longer registered.", epollEvents, fd);
- }
- }
- }
- if (acquiredLock) {
- mLock.unlock();
- }
- Done: ;
This iterates over the events that epoll_wait returned for mEpollFd; if an event's file descriptor is mWakeReadPipeFd, it calls awoken().
frameworks/base/libs/utils/Looper.cpp
- void Looper::awoken() {
- #if DEBUG_POLL_AND_WAKE
- LOGD("%p ~ awoken", this);
- #endif
- #ifdef LOOPER_STATISTICS
- if (mPendingWakeCount == 0) {
- LOGD("%p ~ awoken: spurious!", this);
- } else {
- mSampledWakeCycles += 1;
- mSampledWakeCountSum += mPendingWakeCount;
- mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
- mPendingWakeCount = 0;
- mPendingWakeTime = -1;
- if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
- LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
- 0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
- float(mSampledWakeCountSum) / mSampledWakeCycles);
- mSampledWakeCycles = 0;
- mSampledWakeCountSum = 0;
- mSampledWakeLatencySum = 0;
- }
- }
- #endif
- char buffer[16];
- ssize_t nRead;
- do {
- nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
- } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
- }
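awoken() is the same awoken() function as in the previous installment. It reads mWakeReadPipeFd until it is empty, because mWakeWritePipeFd may have been written to several times.
Draining the pipe is what tells epoll in the kernel that the event on mWakeReadPipeFd has been handled, so the next epoll_wait blocks until mWakeWritePipeFd is written to again.
If the pipe were not drained, epoll_wait would keep reporting the same event (it would return immediately every time instead of waiting for a new write), which is how epoll's default level-triggered mode behaves.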
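The wake-then-drain handshake can be tried out from plain Java. The sketch below is an analogue and not the native Looper: it assumes java.nio's Pipe and Selector as stand-ins for the pipe file descriptors and epoll, and WakePipeSketch, wake and pollOnce are hypothetical names chosen to echo the native functions.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical Java analogue of Looper's wake pipe.
final class WakePipeSketch {
    private final Selector selector;           // stands in for mEpollFd
    private final Pipe.SourceChannel readEnd;  // stands in for mWakeReadPipeFd
    private final Pipe.SinkChannel writeEnd;   // stands in for mWakeWritePipeFd

    WakePipeSketch() throws IOException {
        selector = Selector.open();
        Pipe pipe = Pipe.open();
        readEnd = pipe.source();
        writeEnd = pipe.sink();
        readEnd.configureBlocking(false);
        writeEnd.configureBlocking(false);
        readEnd.register(selector, SelectionKey.OP_READ);
    }

    /** Like Looper::wake(): write a single byte to the pipe. */
    void wake() throws IOException {
        writeEnd.write(ByteBuffer.wrap(new byte[] {'W'}));
    }

    /**
     * Like pollInner() plus awoken(): wait for the wake byte, then drain the pipe.
     * timeoutMillis < 0 blocks until woken, 0 returns immediately.
     */
    boolean pollOnce(long timeoutMillis) throws IOException {
        if (timeoutMillis < 0) {
            selector.select();
        } else if (timeoutMillis == 0) {
            selector.selectNow();
        } else {
            selector.select(timeoutMillis);
        }
        boolean woken = !selector.selectedKeys().isEmpty();
        selector.selectedKeys().clear();
        if (woken) {
            drain();
        }
        return woken;
    }

    /** Read until the pipe is empty, since wake() may have been called several times. */
    private void drain() throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        while (readEnd.read(buffer) > 0) {
            buffer.clear();
        }
    }
}
```

Calling wake() twice and then pollOnce once should consume both bytes, so an immediately following pollOnce(0) reports nothing pending. Skipping the drain step would leave the read end readable, and every later poll would return at once.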
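3 Summary
That covers both enqueueMessage and what I have been calling dequeueMessage. It all suddenly makes sense, doesn't it?
The next installment covers the thread message loop of a native app (mainly a reading of android_native_app_glue.c).
Corrections are welcome!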
4 References
Android SDK source code
Android framework source code