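[Android R source code] The AHandler mechanism for native-layer media module communication
This chapter analyzes the source code of the AHandler mechanism that the native-layer media modules use to communicate.
[Figure: class diagram]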
Source: "Android native层媒体通信架构AHandler/ALooper机制实现源码分析【Part 1】", 小白兔LHao's blog on CSDN
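Differences between the native-layer and Java-layer Handler mechanisms
(1) In the native layer, ALooper can create its own dedicated thread when it is started (start() with runOnCallingThread = false). At the Java layer the application has to provide the thread that runs the Looper, although the Java HandlerThread class makes this simpler.
(2) Methods with equivalent roles live on different classes. At the Java layer, Handler both sends a message and receives and handles it. At the native layer, the message object (AMessage) posts itself, and the handler (AHandler) only receives and handles it.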
frameworks/av/media/libstagefright/MediaClock.cpp
void MediaClock::processTimers_l() {
    int64_t nowMediaTimeUs;
    status_t status = getMediaTime_l(
            ALooper::GetNowUs(), &nowMediaTimeUs, false /* allowPastMaxTime */);
    ......
    // 1. Create the AMessage object
    sp<AMessage> msg = new AMessage(kWhatTimeIsUp, this);
    // 2. Set a value
    msg->setInt32("generation", mGeneration);
    // 3. Post the message
    msg->post(nextLapseRealUs);
}

// [frameworks/av/media/libstagefright/include/media/stagefright/MediaClock.h]
// MediaClock inherits from AHandler
struct MediaClock : public AHandler {
    virtual void onMessageReceived(const sp<AMessage> &msg);
private:
    sp<ALooper> mLooper;
};
The relationship between Handler, Looper and Message
1. Creating the AMessage object
new AMessage(kWhatTimeIsUp, this);
frameworks/av/media/libstagefright/foundation/AMessage.cpp
// The constructor stores mWhat and takes the handler, which here is the MediaClock itself,
// so the message is eventually delivered back to MediaClock
AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler)
    : mWhat(what) {
    setTarget(handler);
}
----------
void AMessage::setTarget(const sp<const AHandler> &handler) {
    if (handler == NULL) {
        mTarget = 0;
        mHandler.clear();
        mLooper.clear();
    } else {
        mTarget = handler->id();
        mHandler = handler->getHandler();
        // Get the looper
        mLooper = handler->getLooper();
    }
}
2. Setting data values
frameworks/av/media/libstagefright/foundation/include/media/stagefright/foundation/AMessage.h
struct AMessage : public RefBase {
    AMessage();
    // The main setXXX() methods for setting data parameters are declared as follows
    void setInt32(const char *name, int32_t value);
    void setInt64(const char *name, int64_t value);
    void setSize(const char *name, size_t value);
    void setFloat(const char *name, float value);
    void setDouble(const char *name, double value);
    void setPointer(const char *name, void *value);
    void setString(const char *name, const char *s, ssize_t len = -1);
    void setString(const char *name, const AString &s);
    void setObject(const char *name, const sp<RefBase> &obj);
    void setBuffer(const char *name, const sp<ABuffer> &buffer);
    void setMessage(const char *name, const sp<AMessage> &obj);
    void setRect(
            const char *name,
            int32_t left, int32_t top, int32_t right, int32_t bottom);
    ......
};
The basic-type methods are implemented through a macro:
- frameworks/av/media/libstagefright/foundation/AMessage.cpp
#define BASIC_TYPE(NAME,FIELDNAME,TYPENAME) \
void AMessage::set##NAME(const char *name, TYPENAME value) { \
    /* 1. allocateItem() adds an element to the mItems array */ \
    Item *item = allocateItem(name); \
    if (item) { \
        item->mType = kType##NAME; \
        item->u.FIELDNAME = value; \
    } \
} \
 \
/* NOLINT added to avoid incorrect warning/fix from clang.tidy */ \
bool AMessage::find##NAME(const char *name, TYPENAME *value) const { /* NOLINT */ \
    /* 2. findItem() looks the item up by name and returns it if the type matches */ \
    const Item *item = findItem(name, kType##NAME); \
    if (item) { \
        *value = item->u.FIELDNAME; \
        return true; \
    } \
    return false; \
}

BASIC_TYPE(Int32,int32Value,int32_t)
BASIC_TYPE(Int64,int64Value,int64_t)
BASIC_TYPE(Size,sizeValue,size_t)
BASIC_TYPE(Float,floatValue,float)
BASIC_TYPE(Double,doubleValue,double)
BASIC_TYPE(Pointer,ptrValue,void *)

#undef BASIC_TYPE
Item is a struct:
struct Item {
    // Union holding the value
    union {
        int32_t int32Value;
        int64_t int64Value;
        size_t sizeValue;
        float floatValue;
        double doubleValue;
        void *ptrValue;
        RefBase *refValue;
        AString *stringValue;
        Rect rectValue;
    } u;
    const char *mName;
    size_t mNameLength;
    Type mType;
    void setName(const char *name, size_t len);
    Item() : mName(nullptr), mNameLength(0), mType(kTypeInt32) { }
    Item(const char *name, size_t length);
};

enum {
    kMaxNumItems = 256
};
// mItems is a vector of Item
std::vector<Item> mItems;
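Six setXXX() methods are implemented through this macro, so there are six matching findXXX() methods for reading the values back.
Each parameter is stored in an Item whose key is the parameter name (name), and every parameter of an AMessage ends up in its mItems vector. [Note: the vector holds at most kMaxNumItems = 256 items; exceeding that fails the CHECK in allocateItem() and aborts the program.]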
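The name-keyed, type-tagged storage described above can be sketched outside of AOSP. The following is a minimal illustration, not the AMessage implementation: MiniMessage, MINI_BASIC_TYPE and the use of std::string keys and an exception (instead of CHECK) are all assumptions made for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for AMessage's item storage.
class MiniMessage {
    enum class Type { kInt32, kInt64, kDouble };

    struct Item {
        std::string name;
        Type type;
        union {
            std::int32_t int32Value;
            std::int64_t int64Value;
            double doubleValue;
        } u;
        // A new item starts out "blank" with type kInt32.
        explicit Item(const std::string &n) : name(n), type(Type::kInt32) {
            u.int64Value = 0;
        }
    };

public:
    static constexpr std::size_t kMaxNumItems = 256;  // same limit as AMessage

// One macro expansion generates a setter/finder pair per basic type.
#define MINI_BASIC_TYPE(NAME, FIELD, TYPE)                          \
    void set##NAME(const std::string &name, TYPE value) {           \
        Item *item = allocateItem(name);                            \
        item->type = Type::k##NAME;                                 \
        item->u.FIELD = value;                                      \
    }                                                               \
    bool find##NAME(const std::string &name, TYPE *value) const {   \
        assert(value != nullptr);                                   \
        const Item *item = findItem(name, Type::k##NAME);           \
        if (item == nullptr) return false;                          \
        *value = item->u.FIELD;                                     \
        return true;                                                \
    }

    MINI_BASIC_TYPE(Int32, int32Value, std::int32_t)
    MINI_BASIC_TYPE(Int64, int64Value, std::int64_t)
    MINI_BASIC_TYPE(Double, doubleValue, double)
#undef MINI_BASIC_TYPE

    std::size_t size() const { return items_.size(); }

private:
    // Returns items_.size() when the name is not present.
    std::size_t findItemIndex(const std::string &name) const {
        std::size_t i = 0;
        while (i < items_.size() && items_[i].name != name) ++i;
        return i;
    }

    // Reuses the item with this name, or appends a new one (up to the limit).
    Item *allocateItem(const std::string &name) {
        std::size_t i = findItemIndex(name);
        if (i == items_.size()) {
            if (items_.size() >= kMaxNumItems) {
                throw std::length_error("too many items");  // AMessage aborts via CHECK
            }
            items_.push_back(Item(name));
        }
        return &items_[i];
    }

    // A lookup only succeeds when both the name and the stored type match.
    const Item *findItem(const std::string &name, Type type) const {
        std::size_t i = findItemIndex(name);
        if (i == items_.size()) return nullptr;
        return items_[i].type == type ? &items_[i] : nullptr;
    }

    std::vector<Item> items_;
};
```

Setting the same name twice overwrites the existing item rather than adding a second one, and a findInt64() on a value stored with setInt32() fails because the type tag does not match.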
// 1. allocateItem() adds an element to the mItems array
frameworks/av/media/libstagefright/foundation/AMessage.cpp
AMessage::Item *AMessage::allocateItem(const char *name) {
    size_t len = strlen(name);
    size_t i = findItemIndex(name, len);
    Item *item;
    if (i < mItems.size()) {
        item = &mItems[i];
        freeItemValue(item);
    } else {
        // At most kMaxNumItems (256) items are allowed
        CHECK(mItems.size() < kMaxNumItems);
        i = mItems.size();
        // place a 'blank' item at the end - this is of type kTypeInt32
        // emplace_back(name, len) uses the Item(const char *name, size_t length) constructor;
        // mName is a C-string pointer
        mItems.emplace_back(name, len);
        // Get a pointer to the new item
        item = &mItems[i];
    }
    return item;
}
// 2. findItem() looks the item up by name and returns it if the type matches
const AMessage::Item *AMessage::findItem(const char *name, Type type) const {
    // Find the index of the item
    size_t i = findItemIndex(name, strlen(name));
    if (i < mItems.size()) {
        // Use the index to get the item
        const Item *item = &mItems[i];
        // Return it only if the stored type matches
        return item->mType == type ? item : NULL;
    }
    return NULL;
}
3. Starting the thread and the message-queue loop
Take the NuPlayerDriver constructor as an example:
- /frameworks/av/media/libmediaplayerservice/nuplayer/NuPlayerDriver.cpp
NuPlayerDriver::NuPlayerDriver(pid_t pid)
    : mState(STATE_IDLE),
      mIsAsyncPrepare(false),
      mAsyncResult(UNKNOWN_ERROR),
      mSetSurfaceInProgress(false),
      mDurationUs(-1),
      mPositionUs(-1),
      mSeekInProgress(false),
      mPlayingTimeUs(0),
      mRebufferingTimeUs(0),
      mRebufferingEvents(0),
      mRebufferingAtExit(false),
      // Create the ALooper object
      mLooper(new ALooper),
      mMediaClock(new MediaClock),
      mPlayer(new NuPlayer(pid, mMediaClock)),
      mPlayerFlags(0),
      mMetricsItem(NULL),
      mClientUid(-1),
      mAtEOS(false),
      mLooping(false),
      mAutoLoop(false) {
    ALOGD("NuPlayerDriver(%p) created, clientPid(%d)", this, pid);
    mLooper->setName("NuPlayerDriver Looper");

    mMediaClock->init();

    // set up an analytics record
    mMetricsItem = mediametrics::Item::create(kKeyPlayer);

    // Start the looper thread. The first two arguments mean:
    // 1. do not run on the calling thread, i.e. create a new thread;
    // 2. the new thread is allowed to call into Java
    mLooper->start(
            false, /* runOnCallingThread */
            true,  /* canCallJava */
            PRIORITY_AUDIO);

    mLooper->registerHandler(mPlayer);

    mPlayer->init(this);
}
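runOnCallingThread decides whether, after a thread calls ALooper::start(), the work of dequeuing and delivering messages runs on that calling thread or on a new thread:
- runOnCallingThread = true: the calling thread does nothing else. It stays inside start(), calling loop() repeatedly until loop() returns false.
- runOnCallingThread = false: a new thread is created and the loop() logic runs on that thread.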
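The two modes can be illustrated with standard C++ threads. This is a minimal sketch under stated assumptions, not ALooper itself: MiniLooper, its iteration counter (which stands in for "loop() returns false when the looper is stopped") and the use of std::thread instead of Android's Thread class are all made up for the example.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for ALooper::start(); loop() here just counts down.
class MiniLooper {
public:
    explicit MiniLooper(int iterations) : remaining_(iterations) {
        assert(iterations > 0);
    }

    ~MiniLooper() { join(); }

    // Returns false when already started (ALooper returns INVALID_OPERATION).
    bool start(bool runOnCallingThread) {
        if (started_.exchange(true)) {
            return false;
        }
        if (runOnCallingThread) {
            // The caller is blocked here until loop() returns false.
            loopThreadId_ = std::this_thread::get_id();
            while (loop()) {
            }
            return true;
        }
        // Otherwise the same loop runs on a newly created thread.
        thread_ = std::thread([this] {
            loopThreadId_ = std::this_thread::get_id();
            while (loop()) {
            }
        });
        return true;
    }

    // Waits for the looper thread, if one was created.
    void join() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    // Only meaningful once the loop has finished (after start(true) or join()).
    std::thread::id loopThreadId() const { return loopThreadId_; }
    int remaining() const { return remaining_; }

private:
    // One unit of work; returning false ends the loop.
    bool loop() { return --remaining_ > 0; }

    std::atomic<int> remaining_;
    std::atomic<bool> started_{false};
    std::thread::id loopThreadId_;
    std::thread thread_;
};
```

With start(true) the recorded thread id equals the caller's id and start() only returns once the loop has ended. With start(false) the call returns immediately and the loop runs under a different thread id.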
- /frameworks/av/media/libstagefright/foundation/ALooper.cpp
status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority) {
    // If runOnCallingThread is true, loop on the calling thread
    if (runOnCallingThread) {
        {
            Mutex::Autolock autoLock(mLock);

            if (mThread != NULL || mRunningLocally) {
                return INVALID_OPERATION;
            }

            mRunningLocally = true;
        }

        do {
        } while (loop());

        return OK;
    }

    // If runOnCallingThread is false, create a thread
    Mutex::Autolock autoLock(mLock);

    if (mThread != NULL || mRunningLocally) {
        return INVALID_OPERATION;
    }

    // Create the LooperThread object
    mThread = new LooperThread(this, canCallJava);

    // Start the thread
    status_t err = mThread->run(mName.empty() ? "ALooper" : mName.c_str(), priority);
    if (err != OK) {
        mThread.clear();
    }

    return err;
}
Creating the LooperThread object and starting the thread:
- /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// LooperThread inherits from Thread, so run() is the parent class's method
struct ALooper::LooperThread : public Thread {
    LooperThread(ALooper *looper, bool canCallJava)
        : Thread(canCallJava),
          mLooper(looper),
          mThreadId(NULL) {
    }

    virtual status_t readyToRun() {
        mThreadId = androidGetThreadId();

        return Thread::readyToRun();
    }

    // Overrides the parent class's method
    virtual bool threadLoop() {
        return mLooper->loop();
    }

    bool isCurrentThread() const {
        return mThreadId == androidGetThreadId();
    }
    ......
};
- /system/core/libutils/Threads.cpp
status_t Thread::run(const char* name, int32_t priority, size_t stack)
{
    LOG_ALWAYS_FATAL_IF(name == nullptr, "thread name not provided to Thread::run");

    Mutex::Autolock _l(mLock);

    if (mRunning) {
        // thread already started
        return INVALID_OPERATION;
    }

    // reset status and exitPending to their default value, so we can
    // try again after an error happened (either below, or in readyToRun())
    mStatus = OK;
    mExitPending = false;
    mThread = thread_id_t(-1);

    // hold a strong reference on ourself
    mHoldSelf = this;

    mRunning = true;

    bool res;
    // If the thread is allowed to call into Java, take this branch
    if (mCanCallJava) {
        // _threadLoop is passed in as the entry function
        res = createThreadEtc(_threadLoop,
                this, name, priority, stack, &mThread);
    } else {
        // This path works in much the same way
        res = androidCreateRawThreadEtc(_threadLoop,
                this, name, priority, stack, &mThread);
    }

    if (res == false) {
        mStatus = UNKNOWN_ERROR;   // something happened!
        mRunning = false;
        mThread = thread_id_t(-1);
        mHoldSelf.clear();  // "this" may have gone away after this.

        return UNKNOWN_ERROR;
    }

    // Do not refer to mStatus here: The thread is already running (may, in fact
    // already have exited with a valid mStatus result). The OK indication
    // here merely indicates successfully starting the thread and does not
    // imply successful termination/execution.
    return OK;

    // Exiting scope of mLock is a memory barrier and allows new thread to run
}
-------------
createThreadEtc is defined in a header file:
- /system/core/libutils/include/utils/AndroidThreads.h
inline bool createThreadEtc(thread_func_t entryFunction,
                            void *userData,
                            const char* threadName = "android:unnamed_thread",
                            int32_t threadPriority = PRIORITY_DEFAULT,
                            size_t threadStackSize = 0,
                            thread_id_t *threadId = nullptr)
{
    return androidCreateThreadEtc(entryFunction, userData, threadName,
        threadPriority, threadStackSize, threadId) ? true : false;
}
The androidCreateThreadEtc function:
static android_create_thread_fn gCreateThreadFn = androidCreateRawThreadEtc;

int androidCreateThreadEtc(android_thread_func_t entryFunction,
                            void *userData,
                            const char* threadName,
                            int32_t threadPriority,
                            size_t threadStackSize,
                            android_thread_id_t *threadId)
{
    return gCreateThreadFn(entryFunction, userData, threadName,
        threadPriority, threadStackSize, threadId);
}
-----------------
int androidCreateRawThreadEtc(android_thread_func_t entryFunction,
                               void *userData,
                               const char* threadName __android_unused,
                               int32_t threadPriority,
                               size_t threadStackSize,
                               android_thread_id_t *threadId)
{
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

#if defined(__ANDROID__)  /* valgrind is rejecting RT-priority create reqs */
    if (threadPriority != PRIORITY_DEFAULT || threadName != NULL) {
        // Now that the pthread_t has a method to find the associated
        // android_thread_id_t (pid) from pthread_t, it would be possible to avoid
        // this trampoline in some cases as the parent could set the properties
        // for the child.  However, there would be a race condition because the
        // child becomes ready immediately, and it doesn't work for the name.
        // prctl(PR_SET_NAME) only works for self; prctl(PR_SET_THREAD_NAME) was
        // proposed but not yet accepted.
        thread_data_t* t = new thread_data_t;
        t->priority = threadPriority;
        t->threadName = threadName ? strdup(threadName) : NULL;
        t->entryFunction = entryFunction;
        t->userData = userData;
        // entryFunction is reassigned to the trampoline here; the trampoline
        // later calls the original function with the original userData
        entryFunction = (android_thread_func_t)&thread_data_t::trampoline;
        userData = t;
    }
#endif

    if (threadStackSize) {
        pthread_attr_setstacksize(&attr, threadStackSize);
    }

    errno = 0;
    pthread_t thread;
    // Create the thread, which then executes entryFunction
    int result = pthread_create(&thread, &attr,
                    (android_pthread_entry)entryFunction, userData);
    pthread_attr_destroy(&attr);
    if (result != 0) {
        ALOGE("androidCreateRawThreadEtc failed (entry=%p, res=%d, %s)\n"
             "(android threadPriority=%d)",
            entryFunction, result, strerror(errno), threadPriority);
        return 0;
    }

    // Note that *threadID is directly available to the parent only, as it is
    // assigned after the child starts.  Use memory barrier / lock if the child
    // or other threads also need access.
    if (threadId != nullptr) {
        *threadId = (android_thread_id_t)thread; // XXX: this is not portable
    }
    return 1;
}
The new thread calls _threadLoop:
int Thread::_threadLoop(void* user)
{
    Thread* const self = static_cast<Thread*>(user);

    sp<Thread> strong(self->mHoldSelf);
    wp<Thread> weak(strong);
    self->mHoldSelf.clear();

#if defined(__ANDROID__)
    // this is very useful for debugging with gdb
    self->mTid = gettid();
#endif

    bool first = true;

    // The thread's main loop
    do {
        bool result;
        if (first) {
            first = false;
            // readyToRun() is only called on the first iteration.
            // The subclass's readyToRun() runs first, then the parent's, which returns OK
            self->mStatus = self->readyToRun();
            result = (self->mStatus == OK);

            if (result && !self->exitPending()) {
                // Binder threads (and maybe others) rely on threadLoop
                // running at least once after a successful ::readyToRun()
                // (unless, of course, the thread has already been asked to exit
                // at that point).
                // This is because threads are essentially used like this:
                //   (new ThreadSubclass())->run();
                // The caller therefore does not retain a strong reference to
                // the thread and the thread would simply disappear after the
                // successful ::readyToRun() call instead of entering the
                // threadLoop at least once.
                // Call the subclass's threadLoop()
                result = self->threadLoop();
            }
        } else {
            result = self->threadLoop();
        }

        // establish a scope for mLock
        {
        Mutex::Autolock _l(self->mLock);
        if (result == false || self->mExitPending) {
            self->mExitPending = true;
            self->mRunning = false;
            // clear thread ID so that requestExitAndWait() does not exit if
            // called by a new thread using the same thread ID as this one.
            self->mThread = thread_id_t(-1);
            // note that interested observers blocked in requestExitAndWait are
            // awoken by broadcast, but blocked on mLock until break exits scope
            self->mThreadExitedCondition.broadcast();
            break;
        }
        }

        // Release our strong reference, to let a chance to the thread
        // to die a peaceful death.
        strong.clear();
        // And immediately, re-acquire a strong reference for the next loop
        strong = weak.promote();
    } while(strong != nullptr);

    return 0;
}
// Calling the subclass's threadLoop()
- /frameworks/av/media/libstagefright/foundation/ALooper.cpp
struct ALooper::LooperThread : public Thread {
    LooperThread(ALooper *looper, bool canCallJava)
        : Thread(canCallJava),
          mLooper(looper),
          mThreadId(NULL) {
    }

    virtual status_t readyToRun() {
        mThreadId = androidGetThreadId();

        return Thread::readyToRun();
    }

    virtual bool threadLoop() {
        return mLooper->loop();
    }
    ......
};
--------------------
// Read the message queue and act on it
bool ALooper::loop() {
    Event event;

    {
        Mutex::Autolock autoLock(mLock);
        if (mThread == NULL && !mRunningLocally) {
            return false;
        }
        // The queue is empty: wait until woken up, then return true.
        // post() signals the condition, which wakes this thread
        if (mEventQueue.empty()) {
            mQueueChangedCondition.wait(mLock);
            return true;
        }
        // Get the due time of the first message in the queue (this covers delayed messages).
        // post() is where mWhenUs is set
        int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
        int64_t nowUs = GetNowUs();

        if (whenUs > nowUs) {
            int64_t delayUs = whenUs - nowUs;
            if (delayUs > INT64_MAX / 1000) {
                delayUs = INT64_MAX / 1000;
            }
            // The first message is not due yet: wait for whenUs - nowUs
            // (or until signalled), then return true
            mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);

            return true;
        }
        // Take the head event, remove it from the queue, then deliver the message
        event = *mEventQueue.begin();
        mEventQueue.erase(mEventQueue.begin());
    }

    event.mMessage->deliver();

    // NOTE: It's important to note that at this point our "ALooper" object
    // may no longer exist (its final reference may have gone away while
    // delivering the message). We have made sure, however, that loop()
    // won't be called again.

    return true;
}
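In summary, loop() does the following:
- Precondition check: if no looper thread has been created and the looper is not running on the calling thread, it returns false, which stops any surrounding loop.
- Queue check: if the message queue is empty, the thread waits until a newly posted message wakes it up.
- Due-time check: if the first message in the queue is due, it is removed from the queue and delivered. Otherwise the thread waits for the remaining time (due time minus current time) and is then woken automatically.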
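The three steps above can be reproduced with the C++ standard library. The following is a rough sketch, not the ALooper code: TimedLoop, loopOnce() and the use of std::function callbacks in place of AMessage are assumptions made for the example, and std::condition_variable stands in for Android's Condition.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical miniature of the ALooper event loop.
class TimedLoop {
public:
    using Clock = std::chrono::steady_clock;

    ~TimedLoop() { stop(); }

    // Runs loopOnce() on a new thread until it returns false.
    void start() {
        assert(!thread_.joinable());
        thread_ = std::thread([this] {
            while (loopOnce()) {
            }
        });
    }

    // Asks the loop to end and waits for the thread; pending events are dropped.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        cond_.notify_all();
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    // Queues fn to run after delay, keeping the queue sorted by due time.
    void post(std::function<void()> fn,
              Clock::duration delay = Clock::duration::zero()) {
        assert(fn != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        const Clock::time_point when = Clock::now() + delay;
        auto it = queue_.begin();
        while (it != queue_.end() && it->when <= when) {
            ++it;
        }
        if (it == queue_.begin()) {
            cond_.notify_one();  // the head changed, so wake the loop
        }
        queue_.insert(it, Event{when, std::move(fn)});
    }

private:
    struct Event {
        Clock::time_point when;
        std::function<void()> fn;
    };

    // One pass of the loop; returns false once the loop should end.
    bool loopOnce() {
        Event event;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if (!running_) {
                return false;  // step 1: not running any more
            }
            if (queue_.empty()) {
                cond_.wait(lock);  // step 2: sleep until post() or stop()
                return true;
            }
            const Clock::time_point when = queue_.front().when;
            if (when > Clock::now()) {
                cond_.wait_until(lock, when);  // step 3: head is not due yet
                return true;
            }
            event = std::move(queue_.front());
            queue_.pop_front();
        }
        event.fn();  // deliver outside the lock
        return true;
    }

    std::mutex mutex_;
    std::condition_variable cond_;
    std::list<Event> queue_;
    bool running_ = true;
    std::thread thread_;
};
```

As in loop(), every wait simply returns true and lets the next pass re-check the queue, so spurious wake-ups and newly posted earlier events are handled by the same code path.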
4. Sending messages
4.1 Sending a message with post
msg->post(nextLapseRealUs)
- frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AMessage::post(int64_t delayUs) {
    sp<ALooper> looper = mLooper.promote();
    if (looper == NULL) {
        ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);
        return -ENOENT;
    }

    looper->post(this, delayUs);
    return OK;
}
- frameworks/av/media/libstagefright/foundation/ALooper.cpp
void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
    Mutex::Autolock autoLock(mLock);

    int64_t whenUs;
    if (delayUs > 0) {
        int64_t nowUs = GetNowUs();
        whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);
    } else {
        whenUs = GetNowUs();
    }

    List<Event>::iterator it = mEventQueue.begin();
    while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
        ++it;
    }

    Event event;
    event.mWhenUs = whenUs;
    event.mMessage = msg;

    if (it == mEventQueue.begin()) {
        mQueueChangedCondition.signal();
    }

    mEventQueue.insert(it, event);
}
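Two details of ALooper::post() are easy to miss: the due time is computed with a saturating addition, and a message is inserted after all queued messages with the same or an earlier due time, so equal due times keep their posting order. The sketch below isolates both as plain functions. TimedEvent, dueTimeUs and insertSorted are hypothetical names, and std::list replaces Android's List.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <list>

// Hypothetical queue entry: a due time plus an id standing in for the message.
struct TimedEvent {
    std::int64_t whenUs;
    int id;
};

// Due time for a message posted at nowUs with delayUs; saturates instead of overflowing.
std::int64_t dueTimeUs(std::int64_t nowUs, std::int64_t delayUs) {
    assert(nowUs >= 0);  // keeps kMax - nowUs from overflowing
    if (delayUs <= 0) {
        return nowUs;
    }
    const std::int64_t kMax = std::numeric_limits<std::int64_t>::max();
    return delayUs > kMax - nowUs ? kMax : nowUs + delayUs;
}

// Inserts event so the queue stays sorted by whenUs.
// Returns true when the event became the new head, which is the case
// in which ALooper::post() signals the condition to wake the loop.
bool insertSorted(std::list<TimedEvent> &queue, const TimedEvent &event) {
    auto it = queue.begin();
    while (it != queue.end() && it->whenUs <= event.whenUs) {
        ++it;
    }
    const bool becameHead = (it == queue.begin());
    queue.insert(it, event);
    return becameHead;
}
```

Signalling only when the head changes is sufficient because the loop thread is either waiting on an empty queue or waiting for the current head's due time; an event inserted further back cannot change when the loop needs to wake up.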
The looper is in fact started in the MediaClock constructor:
- frameworks/av/media/libstagefright/MediaClock.cpp
MediaClock::MediaClock()
    : mAnchorTimeMediaUs(-1),
      mAnchorTimeRealUs(-1),
      mMaxTimeMediaUs(INT64_MAX),
      mStartingTimeMediaUs(-1),
      mPlaybackRate(1.0),
      mGeneration(0) {
    mLooper = new ALooper;
    // Set the name
    mLooper->setName("MediaClock");
    // start looper
    mLooper->start(false /* runOnCallingThread */,
                   false /* canCallJava */,
                   ANDROID_PRIORITY_AUDIO);
}
-----------
// init() registers this object with the looper as a handler
void MediaClock::init() {
    mLooper->registerHandler(this);
}
------------
- frameworks/av/media/libstagefright/foundation/ALooperRoster.cpp
ALooper::handler_id ALooperRoster::registerHandler(
        const sp<ALooper> &looper, const sp<AHandler> &handler) {
    Mutex::Autolock autoLock(mLock);

    if (handler->id() != 0) {
        CHECK(!"A handler must only be registered once.");
        return INVALID_OPERATION;
    }

    HandlerInfo info;
    info.mLooper = looper;
    info.mHandler = handler;
    ALooper::handler_id handlerID = mNextHandlerID++;
    mHandlers.add(handlerID, info);
    // The handler is then given its id and looper; each handler id maps to one looper
    handler->setID(handlerID, looper);

    return handlerID;
}
// start looper
mLooper->start(...) is the ALooper::start() already listed in section 3. Because runOnCallingThread is false here, it creates a LooperThread (a subclass of Thread) and runs it. That thread keeps calling ALooper::loop(), which waits while the queue is empty or while the head message is not yet due, then removes the head event from the queue and calls event.mMessage->deliver().
// The message delivers itself to its handler
- frameworks/av/media/libstagefright/foundation/AMessage.cpp
void AMessage::deliver() {
    sp<AHandler> handler = mHandler.promote();
    if (handler == NULL) {
        ALOGW("failed to deliver message as target handler %d is gone.", mTarget);
        return;
    }

    handler->deliverMessage(this);
}
- frameworks/av/media/libstagefright/foundation/AHandler.cpp
void AHandler::deliverMessage(const sp<AMessage> &msg) {
    // Because MediaClock inherits from AHandler,
    // this runs MediaClock::onMessageReceived()
    onMessageReceived(msg);
    mMessageCounter++;

    if (mVerboseStats) {
        uint32_t what = msg->what();
        ssize_t idx = mMessages.indexOfKey(what);
        if (idx < 0) {
            mMessages.add(what, 1);
        } else {
            mMessages.editValueAt(idx)++;
        }
    }
}
- frameworks/av/media/libstagefright/MediaClock.cpp
void MediaClock::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatTimeIsUp:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            Mutex::Autolock autoLock(mLock);
            if (generation != mGeneration) {
                break;
            }
            processTimers_l();
            break;
        }

        default:
            TRESPASS();
            break;
    }
}
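4.2 postAndAwaitResponse and postReply
postAndAwaitResponse and postReply are used as a pair. postAndAwaitResponse is the sending side: it posts the message and then blocks until the receiver's reply (itself an AMessage) comes back. postReply is the receiving side returning that reply to the sender.
Take MediaCodec::setCallback as an example: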
- /frameworks/av/media/libstagefright/MediaCodec.cpp
status_t MediaCodec::setCallback(const sp<AMessage> &callback) {
    // The message type is kWhatSetCallback
    sp<AMessage> msg = new AMessage(kWhatSetCallback, this);
    // Set the key/value pair carried by the message
    msg->setMessage("callback", callback);

    sp<AMessage> response;
    return PostAndAwaitResponse(msg, &response);
}
-----------------
status_t MediaCodec::PostAndAwaitResponse(
        const sp<AMessage> &msg, sp<AMessage> *response) {
    // Call postAndAwaitResponse()
    status_t err = msg->postAndAwaitResponse(response);

    if (err != OK) {
        return err;
    }

    if (!(*response)->findInt32("err", &err)) {
        err = OK;
    }

    return err;
}
- /frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {
    // Get the looper object
    sp<ALooper> looper = mLooper.promote();
    if (looper == NULL) {
        ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);
        return -ENOENT;
    }
    // Get a reply token
    sp<AReplyToken> token = looper->createReplyToken();
    if (token == NULL) {
        ALOGE("failed to create reply token");
        return -ENOMEM;
    }
    setObject("replyID", token);
    // 1. Post to the message queue; the AHandler subclass's onMessageReceived() then handles it
    looper->post(this, 0 /* delayUs */);
    // 2. Return value: this call blocks the thread until the receiver replies
    return looper->awaitResponse(token, response);
}
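About the AReplyToken reply token:
- AReplyToken represents the token used to reply to a message.
- Its mReplied field records whether the message has already been replied to; once a reply is set, mReplied becomes true.
- Its mReply field holds the reply message itself.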
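The token and the blocking wait built on it can be sketched with standard C++. This is an illustration under assumptions, not the AOSP code: ReplyToken, ReplyChannel and requestReply are hypothetical names, a std::string stands in for the reply AMessage, and the check ALooper::awaitResponse() makes for a looper thread that has gone away is left out.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical analogue of AReplyToken: a "replied" flag plus the reply itself.
struct ReplyToken {
    bool replied = false;
    std::unique_ptr<std::string> reply;
};

// Hypothetical analogue of the reply lock and condition held by ALooper.
class ReplyChannel {
public:
    // Receiver side, like ALooper::postReply(): store the reply once, wake waiters.
    bool postReply(ReplyToken &token, std::string reply) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (token.replied) {
            return false;  // a duplicate reply is rejected (setReply() returns -EBUSY)
        }
        assert(token.reply == nullptr);  // mirrors CHECK(mReply == NULL)
        token.reply.reset(new std::string(std::move(reply)));
        token.replied = true;
        cond_.notify_all();
        return true;
    }

    // Sender side, like ALooper::awaitResponse(): block until a reply was stored.
    std::unique_ptr<std::string> awaitResponse(ReplyToken &token) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [&token] { return token.replied; });
        return std::move(token.reply);  // hands the reply over and clears the token's copy
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
};

// Usage sketch: the receiver runs on its own thread and replies once.
std::string requestReply(const std::string &answer) {
    ReplyChannel channel;
    ReplyToken token;
    std::thread receiver([&] { channel.postReply(token, answer); });
    std::unique_ptr<std::string> reply = channel.awaitResponse(token);  // blocks
    receiver.join();
    return reply ? *reply : std::string();
}
```

Because the flag is checked under the same mutex that postReply() holds while setting it, the sender cannot miss a reply that arrives before it starts waiting.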
// 1. Post to the message queue; the AHandler subclass's onMessageReceived() then handles it
- /frameworks/av/media/libstagefright/foundation/ALooper.cpp
looper->post(this, 0 /* delayUs */) is the ALooper::post() listed in section 4.1: it inserts the message into the queue, and the looper thread's loop() then takes it off the queue and delivers it.
// 2. Return value: this call blocks the thread until the receiver replies
looper->awaitResponse(token, response);
- /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// to be called by AMessage::postAndAwaitResponse only
status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
    // return status in case we want to handle an interrupted wait
    Mutex::Autolock autoLock(mRepliesLock);
    CHECK(replyToken != NULL);
    // While retrieveReply() returns false, the receiver has not replied yet, so keep waiting
    while (!replyToken->retrieveReply(response)) {
        {
            Mutex::Autolock autoLock(mLock);
            if (mThread == NULL) {
                return -ENOENT;
            }
        }
        // The sender waits on the condition variable; mRepliesLock is released
        // while waiting and re-acquired on wake-up
        mRepliesCondition.wait(mRepliesLock);
    }
    return OK;
}
==========
- /frameworks/av/media/libstagefright/foundation/include/media/stagefright/foundation/AMessage.h
    bool retrieveReply(sp<AMessage> *reply) {
        // The receiver must have set mReplied to true
        if (mReplied) {
            *reply = mReply;
            // Clear the stored reply
            mReply.clear();
        }
        return mReplied;
    }
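The synchronization object used here, Condition, is Android's own condition-variable class. The functions of it that appear in this article are wait(), waitRelative(), signal() and broadcast().
Now look at how onMessageReceived handles the message: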
void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatSetCallback:
        {
            sp<AReplyToken> replyID;
            // 1. The receiver fetches the reply token with senderAwaitsResponse()
            CHECK(msg->senderAwaitsResponse(&replyID));

            if (mState == UNINITIALIZED
                    || mState == INITIALIZING
                    || isExecuting()) {
                // callback can't be set after codec is executing,
                // or before it's initialized (as the callback
                // will be cleared when it goes to INITIALIZED)
                PostReplyWithError(replyID, INVALID_OPERATION);
                break;
            }

            sp<AMessage> callback;
            CHECK(msg->findMessage("callback", &callback));

            mCallback = callback;

            if (mCallback != NULL) {
                ALOGI("MediaCodec will operate in async mode");
                mFlags |= kFlagIsAsync;
            } else {
                mFlags &= ~kFlagIsAsync;
            }

            sp<AMessage> response = new AMessage;
            // 2. Send the reply back to the sender
            response->postReply(replyID);
            break;
        }
        ......
    }
}
// 1. The receiver fetches the reply token with senderAwaitsResponse()
msg->senderAwaitsResponse(&replyID)
bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) {
    sp<RefBase> tmp;
    // postAndAwaitResponse() stored the token with setObject(); fetch it here
    bool found = findObject("replyID", &tmp);

    if (!found) {
        return false;
    }

    *replyToken = static_cast<AReplyToken *>(tmp.get());
    // Clear tmp
    tmp.clear();
    // Overwrite the "replyID" entry with NULL
    setObject("replyID", tmp);
    // TODO: delete Object instead of setting it to NULL

    return *replyToken != NULL;
}
// 2. Send the reply back to the sender
response->postReply(replyID)
status_t AMessage::postReply(const sp<AReplyToken> &replyToken) {
    if (replyToken == NULL) {
        ALOGW("failed to post reply to a NULL token");
        return -ENOENT;
    }
    sp<ALooper> looper = replyToken->getLooper();
    if (looper == NULL) {
        ALOGW("failed to post reply as target looper is gone.");
        return -ENOENT;
    }
    return looper->postReply(replyToken, this);
}
- /frameworks/av/media/libstagefright/foundation/ALooper.cpp
status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
    Mutex::Autolock autoLock(mRepliesLock);
    // Store the reply in the token
    status_t err = replyToken->setReply(reply);
    if (err == OK) {
        // If the reply was stored, wake the threads waiting on this
        // condition variable (in awaitResponse())
        mRepliesCondition.broadcast();
    }
    return err;
}
--------------
- /frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AReplyToken::setReply(const sp<AMessage> &reply) {
    if (mReplied) {
        ALOGE("trying to post a duplicate reply");
        return -EBUSY;
    }
    CHECK(mReply == NULL);
    mReply = reply;
    // mReplied is set to true here
    mReplied = true;
    return OK;
}
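After that, MediaCodec::setCallback returns OK.
From this we can see:
postAndAwaitResponse() and postReply() must be used as a pair. Together they form a blocking request/reply exchange between two threads: the sender blocks in postAndAwaitResponse() until the handler thread calls postReply().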