【安卓R 源码】native层媒体模块通信AHandler机制源码

2024-02-01 17:50

本文主要是介绍【安卓R 源码】native层媒体模块通信AHandler机制源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本章将分析native层媒体模块通信AHandler机制源码实现,类图关系如下:

类图

 from:Android native层媒体通信架构AHandler/ALooper机制实现源码分析【Part 1】_小白兔LHao的博客-CSDN博客

Native层和Java层的Handler 处理机制的不同

(1)native层Handler机制的Looper实现中会自动创建一个独立线程,而java层Handler需要应用层自己实现一个线程来完成,当然你也可以使用java层HandlerThread类更简单完成。
(2)一些类似相同作用的方法的属于实现者类有区别,即放置的地方不同如发送消息事件的方法,在java层位于Handler来完成发送消息并完成接收处理。而native层发送消息是由消息对象(AMessage)来完成发送,Handler来完成接收处理的。

frameworks/av/media/libstagefright/MediaClock.cpp

void MediaClock::processTimers_l() {int64_t nowMediaTimeUs;status_t status = getMediaTime_l(ALooper::GetNowUs(), &nowMediaTimeUs, false /* allowPastMaxTime */);。。。。。。。。// 1. 获取AMessage 对象sp<AMessage> msg = new AMessage(kWhatTimeIsUp, this);// 2. 设置值msg->setInt32("generation", mGeneration);// 3. 发送消息msg->post(nextLapseRealUs);
}
// [frameworks/av/media/libstagefright/include/media/stagefright/MediaClock.h]// MediaClock 继承了父类 AHandler 
struct MediaClock : public AHandler {virtual void onMessageReceived(const sp<AMessage> &msg);private:sp<ALooper> mLooper;
}

 Handler, Looper 和 Message 的关系

在这里插入图片描述

 

1. 获取AMessage 对象

new AMessage(kWhatTimeIsUp, this);

frameworks/av/media/libstagefright/foundation/AMessage.cpp

// 设置了 mWhat,传入 handler,其实handler 就是 MediaClock
// 最终还是会回到 MediaClock
AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler): mWhat(what) {setTarget(handler);
}----------
void AMessage::setTarget(const sp<const AHandler> &handler) {if (handler == NULL) {mTarget = 0;mHandler.clear();mLooper.clear();} else {mTarget = handler->id();mHandler = handler->getHandler();
// 获取loopermLooper = handler->getLooper();}
}

 2. 设置数据值

frameworks/av/media/libstagefright/foundation/include/media/stagefright/foundation/AMessage.h

struct AMessage : public RefBase {AMessage();// 主要实现的设置数据参数setXXX()方法有如下几个声明void setInt32(const char *name, int32_t value);void setInt64(const char *name, int64_t value);void setSize(const char *name, size_t value);void setFloat(const char *name, float value);void setDouble(const char *name, double value);void setPointer(const char *name, void *value);void setString(const char *name, const char *s, ssize_t len = -1);void setString(const char *name, const AString &s);void setObject(const char *name, const sp<RefBase> &obj);void setBuffer(const char *name, const sp<ABuffer> &buffer);void setMessage(const char *name, const sp<AMessage> &obj);void setRect(const char *name,int32_t left, int32_t top, int32_t right, int32_t bottom);

方法的实现是在宏定义中实现:

  • frameworks/av/media/libstagefright/foundation/AMessage.cpp
#define BASIC_TYPE(NAME,FIELDNAME,TYPENAME)                             \
void AMessage::set##NAME(const char *name, TYPENAME value) {            \// 1. 这里有个   allocateItem 方法,增加元素到mItems 数组 Item *item = allocateItem(name);                                    \if (item) {                                                         \item->mType = kType##NAME;                                      \item->u.FIELDNAME = value;                                      \}                                                                   \
}                                                                       \\
/* NOLINT added to avoid incorrect warning/fix from clang.tidy */       \
bool AMessage::find##NAME(const char *name, TYPENAME *value) const {  /* NOLINT */ \// 2. 找到对应的item,依据name 返回对应的元素const Item *item = findItem(name, kType##NAME);                     \if (item) {                                                         \*value = item->u.FIELDNAME;                                     \return true;                                                    \}                                                                   \return false;                                                       \
}BASIC_TYPE(Int32,int32Value,int32_t)
BASIC_TYPE(Int64,int64Value,int64_t)
BASIC_TYPE(Size,sizeValue,size_t)
BASIC_TYPE(Float,floatValue,float)
BASIC_TYPE(Double,doubleValue,double)
BASIC_TYPE(Pointer,ptrValue,void *)#undef BASIC_TYPE

Item 是个结构体

    struct Item {// 联合体union {int32_t int32Value;int64_t int64Value;size_t sizeValue;float floatValue;double doubleValue;void *ptrValue;RefBase *refValue;AString *stringValue;Rect rectValue;} u;const char *mName;size_t      mNameLength;Type mType;void setName(const char *name, size_t len);Item() : mName(nullptr), mNameLength(0), mType(kTypeInt32) { }Item(const char *name, size_t length);};enum {kMaxNumItems = 256};// mItems 是一个数组std::vector<Item> mItems;

有6种 setXXX() 设置数据参数方法通过宏定义来完成实现,那么也肯定会有对应的6种 findXXX() 查询获取对应数据参数值的方法。
并且存储的每一个参数数据都是通过一个创建一个Key值名称为(name)参数名的数据项对象来缓存的,并且最终会将整个AMessage中的每个参数数据都添加到数据项 Item 类型的数组参数集中。【注意:该数组只允许最多携带64个数据项数据,若超出则程序将抛错并停止运行】
 

 // 1. 这里有个   allocateItem 方法,增加元素到mItems 数组 

frameworks/av/media/libstagefright/foundation/AMessage.cpp

AMessage::Item *AMessage::allocateItem(const char *name) {size_t len = strlen(name);size_t i = findItemIndex(name, len);Item *item;if (i < mItems.size()) {item = &mItems[i];freeItemValue(item);} else {//默认是 kTypeInt32,4个字节,kMaxNumItems为256,默认是64个数据CHECK(mItems.size() < kMaxNumItems);i = mItems.size();// place a 'blank' item at the end - this is of type kTypeInt32// 传入的参数为如下:
// Item() : mName(nullptr), mNameLength(0), mType(kTypeInt32) 默认是 kTypeInt32
// mName为字符串指针mItems.emplace_back(name, len);// 返回item 的指针item = &mItems[i];}return item;
}

// 2. 找到对应的item,依据name 返回对应的元素

const AMessage::Item *AMessage::findItem(const char *name, Type type) const {/// 找到数据的索引值size_t i = findItemIndex(name, strlen(name));if (i < mItems.size()) {// 依据索引值获取对应 item const Item *item = &mItems[i];
// item 的数据类型return item->mType == type ? item : NULL;}return NULL;
}

3. 线程启动,开启循环消息队列流程

以NuPlayerDriver 的构造函数看下:

  • /frameworks/av/media/libmediaplayerservice/nuplayer/NuPlayerDriver.cpp
NuPlayerDriver::NuPlayerDriver(pid_t pid): mState(STATE_IDLE),mIsAsyncPrepare(false),mAsyncResult(UNKNOWN_ERROR),mSetSurfaceInProgress(false),mDurationUs(-1),mPositionUs(-1),mSeekInProgress(false),mPlayingTimeUs(0),mRebufferingTimeUs(0),mRebufferingEvents(0),mRebufferingAtExit(false),// 初始化Looper 对象mLooper(new ALooper),mMediaClock(new MediaClock),mPlayer(new NuPlayer(pid, mMediaClock)),mPlayerFlags(0),mMetricsItem(NULL),mClientUid(-1),mAtEOS(false),mLooping(false),mAutoLoop(false) {ALOGD("NuPlayerDriver(%p) created, clientPid(%d)", this, pid);mLooper->setName("NuPlayerDriver Looper");mMediaClock->init();// set up an analytics recordmMetricsItem = mediametrics::Item::create(kKeyPlayer);// 启动线程mLooper->start(// 下列2 个参数,1 不运行在本地,即创建一个线程;2.可以被java端调用false, /* runOnCallingThread */true,  /* canCallJava */PRIORITY_AUDIO);mLooper->registerHandler(mPlayer);mPlayer->init(this);
}

 

runOnCallingThread 决定了当调用线程调用Alooper::start函数后,取消息,发送消息的工作在当前线程执行,还是子线程执行。区别在于:

  • 如果runOnCallingThread = true:那么当前线程不会再做其它工作,陷入一个死循环。用于循环执行loop()函数。
  • 如果runOnCallingThread = false:会创建一个子线程,并将loop()逻辑放到这个特定子线程中处理。

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority) {// 如果runOnCallingThread 为true,则在调用的线程中轮询if (runOnCallingThread) {{Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}mRunningLocally = true;}do {} while (loop());return OK;}// 如果runOnCallingThread 为false,则创建一个线程Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}// 创建 LooperThread对象mThread = new LooperThread(this, canCallJava);// 启动这个线程status_t err = mThread->run(mName.empty() ? "ALooper" : mName.c_str(), priority);if (err != OK) {mThread.clear();}return err;
}

创建 LooperThread对象并启动线程:

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// LooperThread 继承了 Thread,调用了父类的run 方法
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}virtual status_t readyToRun() {mThreadId = androidGetThreadId();return Thread::readyToRun();}// 重写了父类的方法virtual bool threadLoop() {return mLooper->loop();}bool isCurrentThread() const {return mThreadId == androidGetThreadId();}

  • /system/core/libutils/Threads.cpp
status_t Thread::run(const char* name, int32_t priority, size_t stack)
{LOG_ALWAYS_FATAL_IF(name == nullptr, "thread name not provided to Thread::run");Mutex::Autolock _l(mLock);if (mRunning) {// thread already startedreturn INVALID_OPERATION;}// reset status and exitPending to their default value, so we can// try again after an error happened (either below, or in readyToRun())mStatus = OK;mExitPending = false;mThread = thread_id_t(-1);// hold a strong reference on ourselfmHoldSelf = this;mRunning = true;bool res;// 可以被java 调用,走到如下if (mCanCallJava) {
// 有传入方法:_threadLoopres = createThreadEtc(_threadLoop,this, name, priority, stack, &mThread);} else {//其实此流程也是相似的res = androidCreateRawThreadEtc(_threadLoop,this, name, priority, stack, &mThread);}if (res == false) {mStatus = UNKNOWN_ERROR;   // something happened!mRunning = false;mThread = thread_id_t(-1);mHoldSelf.clear();  // "this" may have gone away after this.return UNKNOWN_ERROR;}// Do not refer to mStatus here: The thread is already running (may, in fact// already have exited with a valid mStatus result). The OK indication// here merely indicates successfully starting the thread and does not// imply successful termination/execution.return OK;// Exiting scope of mLock is a memory barrier and allows new thread to run
}-------------
createThreadEtc 在头文件中
/system/core/libutils/include/utils/AndroidThreads.hinline bool createThreadEtc(thread_func_t entryFunction,void *userData,const char* threadName = "android:unnamed_thread",int32_t threadPriority = PRIORITY_DEFAULT,size_t threadStackSize = 0,thread_id_t *threadId = nullptr)
{return androidCreateThreadEtc(entryFunction, userData, threadName,threadPriority, threadStackSize, threadId) ? true : false;
}

androidCreateThreadEtc 方法

static android_create_thread_fn gCreateThreadFn = androidCreateRawThreadEtc;int androidCreateThreadEtc(android_thread_func_t entryFunction,void *userData,const char* threadName,int32_t threadPriority,size_t threadStackSize,android_thread_id_t *threadId)
{return gCreateThreadFn(entryFunction, userData, threadName,threadPriority, threadStackSize, threadId);
}-----------------
int androidCreateRawThreadEtc(android_thread_func_t entryFunction,void *userData,const char* threadName __android_unused,int32_t threadPriority,size_t threadStackSize,android_thread_id_t *threadId)
{pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);#if defined(__ANDROID__)  /* valgrind is rejecting RT-priority create reqs */if (threadPriority != PRIORITY_DEFAULT || threadName != NULL) {// Now that the pthread_t has a method to find the associated// android_thread_id_t (pid) from pthread_t, it would be possible to avoid// this trampoline in some cases as the parent could set the properties// for the child.  However, there would be a race condition because the// child becomes ready immediately, and it doesn't work for the name.// prctl(PR_SET_NAME) only works for self; prctl(PR_SET_THREAD_NAME) was// proposed but not yet accepted.thread_data_t* t = new thread_data_t;t->priority = threadPriority;t->threadName = threadName ? strdup(threadName) : NULL;t->entryFunction = entryFunction;t->userData = userData;// 这里对 entryFunction重新进行赋值了,其实就是调用对应函数,传入参数entryFunction = (android_thread_func_t)&thread_data_t::trampoline;userData = t;}
#endifif (threadStackSize) {pthread_attr_setstacksize(&attr, threadStackSize);}errno = 0;pthread_t thread;// 创建线程,然后执行 entryFunctionint result = pthread_create(&thread, &attr,(android_pthread_entry)entryFunction, userData);pthread_attr_destroy(&attr);if (result != 0) {ALOGE("androidCreateRawThreadEtc failed (entry=%p, res=%d, %s)\n""(android threadPriority=%d)",entryFunction, result, strerror(errno), threadPriority);return 0;}// Note that *threadID is directly available to the parent only, as it is// assigned after the child starts.  Use memory barrier / lock if the child// or other threads also need access.if (threadId != nullptr) {*threadId = (android_thread_id_t)thread; // XXX: this is not portable}return 1;
}

调用 _threadLoop 方法

int Thread::_threadLoop(void* user)
{Thread* const self = static_cast<Thread*>(user);sp<Thread> strong(self->mHoldSelf);wp<Thread> weak(strong);self->mHoldSelf.clear();#if defined(__ANDROID__)// this is very useful for debugging with gdbself->mTid = gettid();
#endifbool first = true;// while 循环do {bool result;if (first) {first = false;// 第一次循环才调用 readyToRun方法
// 先调用子类的 readyToRun方法,然后调用父类的,返回OKself->mStatus = self->readyToRun();result = (self->mStatus == OK);if (result && !self->exitPending()) {// Binder threads (and maybe others) rely on threadLoop// running at least once after a successful ::readyToRun()// (unless, of course, the thread has already been asked to exit// at that point).// This is because threads are essentially used like this://   (new ThreadSubclass())->run();// The caller therefore does not retain a strong reference to// the thread and the thread would simply disappear after the// successful ::readyToRun() call instead of entering the// threadLoop at least once.// 调用子类的 threadLoop 方法result = self->threadLoop();}} else {result = self->threadLoop();}// establish a scope for mLock{Mutex::Autolock _l(self->mLock);if (result == false || self->mExitPending) {self->mExitPending = true;self->mRunning = false;// clear thread ID so that requestExitAndWait() does not exit if// called by a new thread using the same thread ID as this one.self->mThread = thread_id_t(-1);// note that interested observers blocked in requestExitAndWait are// awoken by broadcast, but blocked on mLock until break exits scopeself->mThreadExitedCondition.broadcast();break;}}// Release our strong reference, to let a chance to the thread// to die a peaceful death.strong.clear();// And immediately, re-acquire a strong reference for the next loopstrong = weak.promote();} while(strong != nullptr);return 0;
}

// 调用子类的 threadLoop 方法

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}virtual status_t readyToRun() {mThreadId = androidGetThreadId();return Thread::readyToRun();}virtual bool threadLoop() {return mLooper->loop();}--------------------
// 读取消息队列,做对应的操作
bool ALooper::loop() {Event event;{Mutex::Autolock autoLock(mLock);if (mThread == NULL && !mRunningLocally) {return false;}// 消息队列是空的,当前线程等待,直到被唤醒发回true
// post 会发signal,唤醒if (mEventQueue.empty()) {mQueueChangedCondition.wait(mLock);return true;}// 获取消息队列第一条消息的发送时间【即包含有延时发消息的情况】
// 在post 会设置 mWhenUsint64_t whenUs = (*mEventQueue.begin()).mWhenUs;int64_t nowUs = GetNowUs();if (whenUs > nowUs) {int64_t delayUs = whenUs - nowUs;if (delayUs > INT64_MAX / 1000) {delayUs = INT64_MAX / 1000;}// 如果第一条消息还没有到发送时间,则等待whenUs - nowUs后唤醒线程返回truemQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);return true;}// 取出队列的头节点,将其删除出队列,然后发送消息event = *mEventQueue.begin();mEventQueue.erase(mEventQueue.begin());}event.mMessage->deliver();// NOTE: It's important to note that at this point our "ALooper" object// may no longer exist (its final reference may have gone away while// delivering the message). We have made sure, however, that loop()// won't be called again.return true;
}

loop函数,总共做了以下几件事情:

  • 条件判断:判断是否初始化线程,并且线程是否在本地运行,如果否则返回false,使可能存在的循环停止。
  • 消息队列判断:判断消息队列中是否有消息,没有的话,让线程进入等待,直到有消息入队后被唤醒。
  • 消息发送判断:判断队列中,第一条小时发送时间是否满足,满足则发送消息,并将消息移出队列。否则让线程等待,一定时间(当前时间和发送时间的时间差)后,自动唤醒线程。

4. 发送消息

4.1 Post 发送消息

msg->post(nextLapseRealUs)

  • frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AMessage::post(int64_t delayUs) {sp<ALooper> looper = mLooper.promote();if (looper == NULL) {ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);return -ENOENT;}looper->post(this, delayUs);return OK;
}
  • frameworks/av/media/libstagefright/foundation/ALooper.cpp
void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {Mutex::Autolock autoLock(mLock);int64_t whenUs;if (delayUs > 0) {int64_t nowUs = GetNowUs();whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);} else {whenUs = GetNowUs();}List<Event>::iterator it = mEventQueue.begin();while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {++it;}Event event;event.mWhenUs = whenUs;event.mMessage = msg;if (it == mEventQueue.begin()) {mQueueChangedCondition.signal();}mEventQueue.insert(it, event);
}

其实在 MediaClock的构造函数中,有启动 looper

  • frameworks/av/media/libstagefright/MediaClock.cpp
MediaClock::MediaClock(): mAnchorTimeMediaUs(-1),mAnchorTimeRealUs(-1),mMaxTimeMediaUs(INT64_MAX),mStartingTimeMediaUs(-1),mPlaybackRate(1.0),mGeneration(0) {mLooper = new ALooper;
// 设置名称mLooper->setName("MediaClock");
// start loopermLooper->start(false /* runOnCallingThread */,false /* canCallJava */,ANDROID_PRIORITY_AUDIO);
}-----------
// 在初始化中,有向looper 注册handler 为 this
void MediaClock::init() {mLooper->registerHandler(this);
}------------
frameworks/av/media/libstagefright/foundation/ALooperRoster.cppALooper::handler_id ALooperRoster::registerHandler(const sp<ALooper> &looper, const sp<AHandler> &handler) {Mutex::Autolock autoLock(mLock);if (handler->id() != 0) {CHECK(!"A handler must only be registered once.");return INVALID_OPERATION;}HandlerInfo info;info.mLooper = looper;info.mHandler = handler;ALooper::handler_id handlerID = mNextHandlerID++;mHandlers.add(handlerID, info);// 这里又调用 handler 去设置了id,一个id 对应了一个looperhandler->setID(handlerID, looper);return handlerID;
}

// start looper
    mLooper->start(

status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority) {if (runOnCallingThread) {{Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}mRunningLocally = true;}do {
// loop 循环,从消息队列中获取message} while (loop());return OK;}Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}// 创建了 LooperThread线程对象mThread = new LooperThread(this, canCallJava);// 运行这个线程status_t err = mThread->run(mName.empty() ? "ALooper" : mName.c_str(), priority);if (err != OK) {mThread.clear();}return err;
}
// 该线程继承了 Thread 
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}

loop()

bool ALooper::loop() {Event event;{Mutex::Autolock autoLock(mLock);if (mThread == NULL && !mRunningLocally) {return false;}
// 如果消息队列是空,则等待if (mEventQueue.empty()) {mQueueChangedCondition.wait(mLock);return true;}
// 看是否消息队列有设置延时int64_t whenUs = (*mEventQueue.begin()).mWhenUs;int64_t nowUs = GetNowUs();if (whenUs > nowUs) {int64_t delayUs = whenUs - nowUs;if (delayUs > INT64_MAX / 1000) {delayUs = INT64_MAX / 1000;}mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);return true;}
// 获取消息队列的头部event,并且将其移出队列event = *mEventQueue.begin();mEventQueue.erase(mEventQueue.begin());}
// 调用message 去发送消息event.mMessage->deliver();// NOTE: It's important to note that at this point our "ALooper" object// may no longer exist (its final reference may have gone away while// delivering the message). We have made sure, however, that loop()// won't be called again.return true;
}

// 调用message 去发送消息

  • frameworks/av/media/libstagefright/foundation/AMessage.cpp
void AMessage::deliver() {sp<AHandler> handler = mHandler.promote();if (handler == NULL) {ALOGW("failed to deliver message as target handler %d is gone.", mTarget);return;}handler->deliverMessage(this);
}
  • frameworks/av/media/libstagefright/foundation/AHandler.cpp
void AHandler::deliverMessage(const sp<AMessage> &msg) {
// 应为 MediaClock继承了 AHandler
// 所以执行 MediaClock方法onMessageReceived(msg);mMessageCounter++;if (mVerboseStats) {uint32_t what = msg->what();ssize_t idx = mMessages.indexOfKey(what);if (idx < 0) {mMessages.add(what, 1);} else {mMessages.editValueAt(idx)++;}}
}
  • frameworks/av/media/libstagefright/MediaClock.cpp
void MediaClock::onMessageReceived(const sp<AMessage> &msg) {switch (msg->what()) {case kWhatTimeIsUp:{int32_t generation;CHECK(msg->findInt32("generation", &generation));Mutex::Autolock autoLock(mLock);if (generation != mGeneration) {break;}processTimers_l();break;}default:TRESPASS();break;}
}

4.2 postAndAwaitResponse 和 postReply

postAndAwaitResponse 和 postReply 是成对使用的,postAndAwaitResponse是作为消息发送端,用于接收到接收端的消息的返回。即,在返回应答值之前一直wait等待响应返回值(即AMessage消息)。postReply 是接收端返回消息给发送端。

以  MediaCodec设置回调为例子:

  • /frameworks/av/media/libstagefright/MediaCodec.cpp
status_t MediaCodec::setCallback(const sp<AMessage> &callback) {// 设置消息为:kWhatSetCallbacksp<AMessage> msg = new AMessage(kWhatSetCallback, this);// 设置消息的key,value值msg->setMessage("callback", callback);sp<AMessage> response;return PostAndAwaitResponse(msg, &response);
}-----------------
status_t MediaCodec::PostAndAwaitResponse(const sp<AMessage> &msg, sp<AMessage> *response) {// 调用 postAndAwaitResponse 方法status_t err = msg->postAndAwaitResponse(response);if (err != OK) {return err;}if (!(*response)->findInt32("err", &err)) {err = OK;}return err;
}
  • /frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {// 获取looper 对象sp<ALooper> looper = mLooper.promote();if (looper == NULL) {ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);return -ENOENT;}// 获取tokensp<AReplyToken> token = looper->createReplyToken();if (token == NULL) {ALOGE("failed to create reply token");return -ENOMEM;}setObject("replyID", token);// 1. post 到消息队列中,然后调用AHandler子类 onMessageReceived处理looper->post(this, 0 /* delayUs */);// 2. 返回值,下列方法线程阻塞,需要等待消息接收方回复return looper->awaitResponse(token, response);
}

 关于 AReplyToken 回复令牌:

  1. AReplyToken:意味消息的回复令牌
  2. AReplyToken中包含消息是否已经被处理过的字段mReplied,如果处理过,mReplied字段被置为true。
  3. AReplyToken中包含了回复消息本身,体现在mReply字段。

// 1. post 到消息队列中,然后调用AHandler子类 onMessageReceived处理

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// 增加到消息队列中
void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {Mutex::Autolock autoLock(mLock);int64_t whenUs;if (delayUs > 0) {int64_t nowUs = GetNowUs();whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);} else {whenUs = GetNowUs();}List<Event>::iterator it = mEventQueue.begin();while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {++it;}Event event;event.mWhenUs = whenUs;event.mMessage = msg;if (it == mEventQueue.begin()) {mQueueChangedCondition.signal();}mEventQueue.insert(it, event);
}// 线程loop 进行处理
bool ALooper::loop() {Event event;{

// 2. 返回值,下列方法线程阻塞,需要等待消息接收方回复

looper->awaitResponse(token, response);

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// to be called by AMessage::postAndAwaitResponse only
status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {// return status in case we want to handle an interrupted waitMutex::Autolock autoLock(mRepliesLock);CHECK(replyToken != NULL);// retrieveReply() 方法返回false,则还没有获取到接收端的消息,则轮询while (!replyToken->retrieveReply(response)) {{Mutex::Autolock autoLock(mLock);if (mThread == NULL) {return -ENOENT;}}// 消息发送端条件变量wait等待唤醒(获取锁)mRepliesCondition.wait(mRepliesLock);}return OK;
}==========
/frameworks/av/media/libstagefright/foundation/include/media/stagefright/foundation/AMessage.hbool retrieveReply(sp<AMessage> *reply) {// 需要接收端去设置mReplied 为trueif (mReplied) {*reply = mReply;
// 清除这个 responsemReply.clear();}return mReplied;}

用于同步的对象Condition,为Android中特有。它的函数有:

 

看下 onMessageReceived 处理:

void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {switch (msg->what()) {case kWhatSetCallback:{sp<AReplyToken> replyID;// 1. 消息接收端响应 senderAwaitsResponseCHECK(msg->senderAwaitsResponse(&replyID));if (mState == UNINITIALIZED|| mState == INITIALIZING|| isExecuting()) {// callback can't be set after codec is executing,// or before it's initialized (as the callback// will be cleared when it goes to INITIALIZED)PostReplyWithError(replyID, INVALID_OPERATION);break;}sp<AMessage> callback;CHECK(msg->findMessage("callback", &callback));mCallback = callback;if (mCallback != NULL) {ALOGI("MediaCodec will operate in async mode");mFlags |= kFlagIsAsync;} else {mFlags &= ~kFlagIsAsync;}sp<AMessage> response = new AMessage;// 2. 应答消息发送端消息response->postReply(replyID);break;}

// 1. 消息接收端响应 senderAwaitsResponse

msg->senderAwaitsResponse(&replyID)

bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) {sp<RefBase> tmp;// 上述有 setObject,这里去获取 tmpbool found = findObject("replyID", &tmp);if (!found) {return false;}*replyToken = static_cast<AReplyToken *>(tmp.get());// 清除 replyTokentmp.clear();// 设置为空setObject("replyID", tmp);// TODO: delete Object instead of setting it to NULLreturn *replyToken != NULL;
}

// 2. 应答消息发送端消息

response->postReply(replyID)

status_t AMessage::postReply(const sp<AReplyToken> &replyToken) {if (replyToken == NULL) {ALOGW("failed to post reply to a NULL token");return -ENOENT;}sp<ALooper> looper = replyToken->getLooper();if (looper == NULL) {ALOGW("failed to post reply as target looper is gone.");return -ENOENT;}return looper->postReply(replyToken, this);
}
  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {Mutex::Autolock autoLock(mRepliesLock);
// 设置replystatus_t err = replyToken->setReply(reply);if (err == OK) {// 若设置应答数据成功,则唤醒在该条件变量上正在等待【mRepliesLock】锁的其他线程mRepliesCondition.broadcast();}return err;
}--------------
/frameworks/av/media/libstagefright/foundation/AMessage.cppstatus_t AReplyToken::setReply(const sp<AMessage> &reply) {if (mReplied) {ALOGE("trying to post a duplicate reply");return -EBUSY;}CHECK(mReply == NULL);mReply = reply;// 这里设置了 mReplied  为truemReplied = true;return OK;
}

然后:status_t MediaCodec::setCallback 方法返回 OK

由此可知:

postAndAwaitResponse()和postReply()必须出成对出现的。其实这就是一个线程间异步通信的过程。

这篇关于【安卓R 源码】native层媒体模块通信AHandler机制源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/667999

相关文章

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

Redis主从/哨兵机制原理分析

《Redis主从/哨兵机制原理分析》本文介绍了Redis的主从复制和哨兵机制,主从复制实现了数据的热备份和负载均衡,而哨兵机制可以监控Redis集群,实现自动故障转移,哨兵机制通过监控、下线、选举和故... 目录一、主从复制1.1 什么是主从复制1.2 主从复制的作用1.3 主从复制原理1.3.1 全量复制

Redis缓存问题与缓存更新机制详解

《Redis缓存问题与缓存更新机制详解》本文主要介绍了缓存问题及其解决方案,包括缓存穿透、缓存击穿、缓存雪崩等问题的成因以及相应的预防和解决方法,同时,还详细探讨了缓存更新机制,包括不同情况下的缓存更... 目录一、缓存问题1.1 缓存穿透1.1.1 问题来源1.1.2 解决方案1.2 缓存击穿1.2.1

Java如何通过反射机制获取数据类对象的属性及方法

《Java如何通过反射机制获取数据类对象的属性及方法》文章介绍了如何使用Java反射机制获取类对象的所有属性及其对应的get、set方法,以及如何通过反射机制实现类对象的实例化,感兴趣的朋友跟随小编一... 目录一、通过反射机制获取类对象的所有属性以及相应的get、set方法1.遍历类对象的所有属性2.获取

MySQL中的锁和MVCC机制解读

《MySQL中的锁和MVCC机制解读》MySQL事务、锁和MVCC机制是确保数据库操作原子性、一致性和隔离性的关键,事务必须遵循ACID原则,锁的类型包括表级锁、行级锁和意向锁,MVCC通过非锁定读和... 目录mysql的锁和MVCC机制事务的概念与ACID特性锁的类型及其工作机制锁的粒度与性能影响多版本

多模块的springboot项目发布指定模块的脚本方式

《多模块的springboot项目发布指定模块的脚本方式》该文章主要介绍了如何在多模块的SpringBoot项目中发布指定模块的脚本,作者原先的脚本会清理并编译所有模块,导致发布时间过长,通过简化脚本... 目录多模块的springboot项目发布指定模块的脚本1、不计成本地全部发布2、指定模块发布总结多模

Python中构建终端应用界面利器Blessed模块的使用

《Python中构建终端应用界面利器Blessed模块的使用》Blessed库作为一个轻量级且功能强大的解决方案,开始在开发者中赢得口碑,今天,我们就一起来探索一下它是如何让终端UI开发变得轻松而高... 目录一、安装与配置:简单、快速、无障碍二、基本功能:从彩色文本到动态交互1. 显示基本内容2. 创建链

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一