本文主要是介绍Android Media Player 框架分析-AHandler AMessage ALooper,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在之前一篇中简单的介绍了一下setDataSource的流程,其中遇到了一个新的消息机制AHandler,其实这东西本来不需要介绍,因为消息机制原本就是一个很成熟和常见的技术技巧,这玩意里面包含了计算机哲学和计算机玄学的双学位问题,听起来牛逼轰轰,其实也就那回事了。为了这次文档的完整性,再一个后面可能要整理到公司的文档库中,所以在此介绍一下,熟悉的同学直接飘过了。
先说说为什么要用消息机制,在我们的软件开发中经常会遇到多线程的问题,由此便带来了线程同步的问题,比如各种锁,最常见的就是Mutex,而线程同步通常是有代价的,这个代价一方面来自于锁本身,而另一方面来自于程序设计,在一个有众多线程的进程中使用同步锁常常造成各种死锁问题,当然你可以说你水平高超没有这个问题,而通常解决锁之间的竞争往往在效率上也要付出相应的代价,当然了你也可以说你连这也能解决,那我也只能膜拜了。然而通常还有另一个问题,便是线程间的执行顺序问题,如果使用锁和条件变量来控制,在线程众多的情况下,可控性只会越来越差,另外通常来讲CPU的利用率也不高。那么消息机制是如何来解决这些问题的呢?
第一个问题是什么是消息机制,这里先介绍一个同步/异步的的概念,举例来说明,
比如说肚子饿了去饭店吃饭,点好菜后你坐那老老实实等着服务员上菜,什么时候菜做了好你才能结束等待,那这就是同步。
而异步就不用干等了,掏出手机,打开外卖软件,下单,OK,这时你就可以去干别的事情了,应该没有人死盯着手机干等吧,到时候饭店做好饭,小哥自然送上门。
听起来异步比同步是先进很多了,但在现实中事情往往没有想象的那么容易,首先你用外卖软件,如果我是老板,餐厅已经爆满了,顾客在饭店里已经骂娘了,哪还顾得上你,第二你点完得等送餐吧,又多了一段路程,而同步也有自己的缺点,坐在餐馆等饭的时间对于你而言算是浪费了,比如你还有其他事情没做此时也只能乖乖干坐着了。
好吧,扯太远了,回到消息机制上来。
当我们想要我们的程序执行一些事情的时候要么同步,要么异步,而消息机制便是一种异步的方式,首先创建一个消息队列,这个消息队列在一个单独的线程中轮询,一旦有人发送消息给它,它就会将消息取出执行,执行后又回到等待状态直到有新的消息到来。
如下图:
消息被投递到一个消息队列中去,而Looper会持续不断的从Message Queue中拿出消息,再投递给合适的Handler去处理。
好了,开始对Android的这个消息队列进行分析吧,依旧从使用上入手,通常在使用的开始会创建一个ALooper对象,然后会实现一个AHandler的子类,最后创建消息进行投递进入Message Queue。
首先我们先看一下AHandler的定义。代码不多直接贴上来。
namespace android {struct AMessage;
struct AHandler : public RefBase {AHandler(): mID(0),mVerboseStats(false),mMessageCounter(0) {}ALooper::handler_id id() const {return mID;}sp<ALooper> looper() const {return mLooper.promote();}wp<ALooper> getLooper() const {return mLooper;}wp<AHandler> getHandler() const {// allow getting a weak reference to a const handlerreturn const_cast<AHandler *>(this);}protected:virtual void onMessageReceived(const sp<AMessage> &msg) = 0;private:friend struct AMessage; // deliverMessage()friend struct ALooperRoster; // setID()ALooper::handler_id mID;wp<ALooper> mLooper;inline void setID(ALooper::handler_id id, wp<ALooper> looper) {mID = id;mLooper = looper;}bool mVerboseStats;uint32_t mMessageCounter;KeyedVector<uint32_t, uint32_t> mMessages;void deliverMessage(const sp<AMessage> &msg);DISALLOW_EVIL_CONSTRUCTORS(AHandler);
};} // namespace android
cpp文件:
namespace android {void AHandler::deliverMessage(const sp<AMessage> &msg) {onMessageReceived(msg);mMessageCounter++;if (mVerboseStats) {uint32_t what = msg->what();ssize_t idx = mMessages.indexOfKey(what);if (idx < 0) {mMessages.add(what, 1);} else {mMessages.editValueAt(idx)++;}}
}
} // namespace android
由于该类为基类,在使用时需要自己定义子类去继承重新类中的方法,所先以该类的Cpp文件相对比较简单,只定义了一个deliverMessage方法,
先看看.h文件中该类的构成,其中有一个容易混淆的成员mMessages被定义为KeyedVector类型,该并不是真正的Message Queue,相反该对象并没有什么太大的实际作用,只有在verbose模式下才存储一个消息被传递的次数,对我们分析没什么帮助。剩下的就是一个id值,和一个ALooper的弱引用。刚刚我们已经说过了,Looper负责将消息源源不断的发送给Handler来处理,所以两者是必然有关联的。另外还有一个mVerboseStats的bool值,这玩意实际上没啥用,先不看了。之后看看一个重要的纯虚函数onMessageReceived,我们定义的子类最重要的就是实现这个方法。之后会通过调用把Message发给这个函数,而这个函数会根据这个Message中的what值来判断需要进行什么操作,习惯用法是通过一个switch来处理。
namespace android {struct AHandler;
struct AMessage;
struct AReplyToken;struct ALooper : public RefBase {typedef int32_t event_id;typedef int32_t handler_id;ALooper();// Takes effect in a subsequent call to start().void setName(const char *name);handler_id registerHandler(const sp<AHandler> &handler);void unregisterHandler(handler_id handlerID);status_t start(bool runOnCallingThread = false,bool canCallJava = false,int32_t priority = PRIORITY_DEFAULT);status_t stop();static int64_t GetNowUs();const char *getName() const {return mName.c_str();}protected:virtual ~ALooper();private:friend struct AMessage; // post()struct Event {int64_t mWhenUs;sp<AMessage> mMessage;};Mutex mLock;Condition mQueueChangedCondition;AString mName;List<Event> mEventQueue;struct LooperThread;sp<LooperThread> mThread;bool mRunningLocally;// use a separate lock for reply handling, as it is always on another thread// use a central lock, however, to avoid creating a mutex for each replyMutex mRepliesLock;Condition mRepliesCondition;// START --- methods used only by AMessage// posts a message on this looper with the given timeoutvoid post(const sp<AMessage> &msg, int64_t delayUs);// creates a reply token to be used with this loopersp<AReplyToken> createReplyToken();// waits for a response for the reply token. If status is OK, the response// is stored into the supplied variable. Otherwise, it is unchanged.status_t awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response);// posts a reply for a reply token. If the reply could be successfully posted,// it returns OK. Otherwise, it returns an error value.status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);// END --- methods used only by AMessagebool loop();DISALLOW_EVIL_CONSTRUCTORS(ALooper);
};
} // namespace android
.cpp文件:
namespace android {ALooperRoster gLooperRoster;
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}virtual status_t readyToRun() {mThreadId = androidGetThreadId();return Thread::readyToRun();}virtual bool threadLoop() {return mLooper->loop();}bool isCurrentThread() const {return mThreadId == androidGetThreadId();}protected:virtual ~LooperThread() {}private:ALooper *mLooper;android_thread_id_t mThreadId;DISALLOW_EVIL_CONSTRUCTORS(LooperThread);
};// static
int64_t ALooper::GetNowUs() {return systemTime(SYSTEM_TIME_MONOTONIC) / 1000ll;
}ALooper::ALooper(): mRunningLocally(false) {// clean up stale AHandlers. Doing it here instead of in the destructor avoids// the side effect of objects being deleted from the unregister function recursively.gLooperRoster.unregisterStaleHandlers();
}ALooper::~ALooper() {stop();// stale AHandlers are now cleaned up in the constructor of the next ALooper to come along
}void ALooper::setName(const char *name) {mName = name;
}ALooper::handler_id ALooper::registerHandler(const sp<AHandler> &handler) {return gLooperRoster.registerHandler(this, handler);
}void ALooper::unregisterHandler(handler_id handlerID) {gLooperRoster.unregisterHandler(handlerID);
}status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority) {if (runOnCallingThread) {{Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}mRunningLocally = true;}do {} while (loop());return OK;}Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}mThread = new LooperThread(this, canCallJava);status_t err = mThread->run(mName.empty() ? "ALooper" : mName.c_str(), priority);if (err != OK) {mThread.clear();}return err;
}status_t ALooper::stop() {sp<LooperThread> thread;bool runningLocally;{Mutex::Autolock autoLock(mLock);thread = mThread;runningLocally = mRunningLocally;mThread.clear();mRunningLocally = false;}if (thread == NULL && !runningLocally) {return INVALID_OPERATION;}if (thread != NULL) {thread->requestExit();}mQueueChangedCondition.signal();{Mutex::Autolock autoLock(mRepliesLock);mRepliesCondition.broadcast();}if (!runningLocally && !thread->isCurrentThread()) {// If not running locally and this thread _is_ the looper thread,// the loop() function will return and never be called again.thread->requestExitAndWait();}return OK;
}void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {Mutex::Autolock autoLock(mLock);int64_t whenUs;if (delayUs > 0) {whenUs = GetNowUs() + delayUs;} else {whenUs = GetNowUs();}List<Event>::iterator it = mEventQueue.begin();while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {++it;}Event event;event.mWhenUs = whenUs;event.mMessage = msg;if (it == mEventQueue.begin()) {mQueueChangedCondition.signal();}mEventQueue.insert(it, event);
}bool ALooper::loop() {Event event;{Mutex::Autolock autoLock(mLock);if (mThread == NULL && !mRunningLocally) {return false;}if (mEventQueue.empty()) {mQueueChangedCondition.wait(mLock);return true;}int64_t whenUs = (*mEventQueue.begin()).mWhenUs;int64_t nowUs = GetNowUs();if (whenUs > nowUs) {int64_t delayUs = whenUs - nowUs;mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);return true;}event = *mEventQueue.begin();mEventQueue.erase(mEventQueue.begin());}event.mMessage->deliver();// NOTE: It's important to note that at this point our "ALooper" object// may no longer exist (its final reference may have gone away while// delivering the message). We have made sure, however, that loop()// won't be called again.return true;
}// to be called by AMessage::postAndAwaitResponse only
sp<AReplyToken> ALooper::createReplyToken() {return new AReplyToken(this);
}// to be called by AMessage::postAndAwaitResponse only
status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {// return status in case we want to handle an interrupted waitMutex::Autolock autoLock(mRepliesLock);CHECK(replyToken != NULL);while (!replyToken->retrieveReply(response)) {{Mutex::Autolock autoLock(mLock);if (mThread == NULL) {return -ENOENT;}}mRepliesCondition.wait(mRepliesLock);}return OK;
}status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {Mutex::Autolock autoLock(mRepliesLock);status_t err = replyToken->setReply(reply);if (err == OK) {mRepliesCondition.broadcast();}return err;
}} // namespace android
Mutex mLock;Condition mQueueChangedCondition;Mutex mRepliesLock;Condition mRepliesCondition;
struct Item {union {int32_t int32Value;int64_t int64Value;size_t sizeValue;float floatValue;double doubleValue;void *ptrValue;RefBase *refValue;AString *stringValue;Rect rectValue;} u;const char *mName;size_t mNameLength;Type mType;void setName(const char *name, size_t len);};
这个类型所定义的就是一个key-value,被命名为Item, 其中使用联合体这段空间来存储value,使用mName来作为key。mNameLength来存储key值得长度,这个长度在这里还挺重要的,后面我们再看,mType是一个枚举值,保存了所存数据是什么类型的,枚举体定义如下:
enum Type {kTypeInt32,kTypeInt64,kTypeSize,kTypeFloat,kTypeDouble,kTypePointer,kTypeString,kTypeObject,kTypeMessage,kTypeRect,kTypeBuffer,};
之后AMessage定义了一个数组Item mItems[kMaxNumItems];保存所有的Item不过这样确定不浪费空间吗?另外便定义了一堆的find和set函数,都是用来设置这些键值对的。一个AMessage最多携带kMaxNumItems个Item,默认为64.
wp<AHandler> mHandler;wp<ALooper> mLooper;
这两个猜都知道啦,一个是处理该消息的Handler一个是处理消息队列轮询的Looper。
uint32_t mWhat;
因为一个Handler不是给一个Message专用的,可能会处理很多种的消息,那么为了区分便加入一个what值来告诉Handler现在的消息是哪种,一般Handler会使用一个switch来进行处理,这个源码中例子多的不要不要的,自己看吧。 AMessage();
AMessage(uint32_t what, const sp<const AHandler> &handler);AMessage::AMessage(void): mWhat(0),mTarget(0),mNumItems(0) {
}AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler): mWhat(what),mNumItems(0) {setTarget(handler);
}
一般使用第二种了,传入消息的what值和handler,mNumItems赋0表示目前还没有携带的附加信息,也就是键值对Item啦。之后调用setTarget
void AMessage::setTarget(const sp<const AHandler> &handler) {if (handler == NULL) {mTarget = 0;mHandler.clear();mLooper.clear();} else {mTarget = handler->id();mHandler = handler->getHandler();mLooper = handler->getLooper();}
}
mTarge是handler的一个id值,这个id是累加生成的,回头再说吧,暂时用不到,剩下的就没什么,简单赋值。另外为啥要调用handler的getHandler的方法而不直接赋值呢?主要是因为有可能AHandler是一个const的指针,getHandler方法会将其通过const_cast转换,那为啥要搞成非const的呢,因为要用wp,那为啥用wp就要非const呢。。。不想回答你。。。
status_t AMessage::post(int64_t delayUs) {sp<ALooper> looper = mLooper.promote();if (looper == NULL) {ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);return -ENOENT;}looper->post(this, delayUs);return OK;
}
实现挺简单的吧,直接调用ALooper的post方法。ALooper的post我们已经说过了加入到消息队列就没了。之后loop函数会把消息取出来调用Message的deliver方法,我们来看看这个deliver方法是干嘛的。
void AMessage::deliver() {sp<AHandler> handler = mHandler.promote();if (handler == NULL) {ALOGW("failed to deliver message as target handler %d is gone.", mTarget);return;}handler->deliverMessage(this);
}
直接拿出handler调用了deliverMessage方法,这个方法我们刚才说过了。会不会想为啥AMessage的post方法不直接调用deliver方法呢,如果有这个问题证明你压根没看懂,再看一遍吧,当ALooper通过loop函数拿出消息调用deliver的时候,这个deliver才会在Looper的线程里面执行,而如果直接调用那就成了当前线程同步调用了。
struct AReplyToken : public RefBase {AReplyToken(const sp<ALooper> &looper): mLooper(looper),mReplied(false) {}private:friend struct AMessage;friend struct ALooper;wp<ALooper> mLooper;sp<AMessage> mReply;bool mReplied;sp<ALooper> getLooper() const {return mLooper.promote();}// if reply is not set, returns false; otherwise, it retrieves the reply and returns truebool retrieveReply(sp<AMessage> *reply) {if (mReplied) {*reply = mReply;mReply.clear();}return mReplied;}// sets the reply for this token. returns OK or errorstatus_t setReply(const sp<AMessage> &reply);
};
我们还是从源头上来看,这种情形通常是从AMessage的对象调用postAndAwaitResponse开始的,这个函数有一个参数是sp<AMessage>*类型了,sp包含的已经是指针了,那这里显然是一个out型的参数,也就是在调用结束后这个参数会被赋值。我们先看看这个函数的定义。
status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {sp<ALooper> looper = mLooper.promote();if (looper == NULL) {ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);return -ENOENT;}sp<AReplyToken> token = looper->createReplyToken();if (token == NULL) {ALOGE("failed to create reply token");return -ENOMEM;}setObject("replyID", token);looper->post(this, 0 /* delayUs */);return looper->awaitResponse(token, response);}
// to be called by AMessage::postAndAwaitResponse only
status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {// return status in case we want to handle an interrupted waitMutex::Autolock autoLock(mRepliesLock);CHECK(replyToken != NULL);while (!replyToken->retrieveReply(response)) {{Mutex::Autolock autoLock(mLock);if (mThread == NULL) {return -ENOENT;}}mRepliesCondition.wait(mRepliesLock);}return OK;
}
status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {Mutex::Autolock autoLock(mRepliesLock);status_t err = replyToken->setReply(reply);if (err == OK) {mRepliesCondition.broadcast();}return err;
}
status_t AReplyToken::setReply(const sp<AMessage> &reply) {if (mReplied) {ALOGE("trying to post a duplicate reply");return -EBUSY;}CHECK(mReply == NULL);mReply = reply;mReplied = true;return OK;
}
做了两件事情,给成员mReply赋值为参数,mReplied赋值为ture,好像看不出来啥用。我们接着看看上面的postReply既然broadcast已经发了,那么之前在ALooper::awaitResponse函数中while循环也得跳出吧,现在是时候看看这个while中的retrieveReply返回啥玩意了。 bool retrieveReply(sp<AMessage> *reply) {if (mReplied) {*reply = mReply;mReply.clear();}return mReplied;}// sets the reply for this token. returns OK or errorstatus_t setReply(const sp<AMessage> &reply);
status_t AMessage::postReply(const sp<AReplyToken> &replyToken) {if (replyToken == NULL) {ALOGW("failed to post reply to a NULL token");return -ENOENT;}sp<ALooper> looper = replyToken->getLooper();if (looper == NULL) {ALOGW("failed to post reply as target looper is gone.");return -ENOENT;}return looper->postReply(replyToken, this);
}
bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) {sp<RefBase> tmp;bool found = findObject("replyID", &tmp);if (!found) {return false;}*replyToken = static_cast<AReplyToken *>(tmp.get());tmp.clear();setObject("replyID", tmp);// TODO: delete Object instead of setting it to NULLreturn *replyToken != NULL;
}
sp<AReplyToken> replyID;CHECK(msg->senderAwaitsResponse(&replyID));sp<AMessage> response = new AMessage;response->postReply(replyID);
data:image/s3,"s3://crabby-images/01307/01307d3f3b9738d0dfdc2d8074b40af320b3cb18" alt="大笑"
这篇关于Android Media Player 框架分析-AHandler AMessage ALooper的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!