【安卓R 源码】native层媒体模块通信AHandler机制源码

2024-02-01 17:50

本文主要是介绍【安卓R 源码】native层媒体模块通信AHandler机制源码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本章将分析native层媒体模块通信AHandler机制源码实现,类图关系如下:

类图

 from:Android native层媒体通信架构AHandler/ALooper机制实现源码分析【Part 1】_小白兔LHao的博客-CSDN博客

Native层和Java层的Handler 处理机制的不同

(1)native层Handler机制的Looper实现中会自动创建一个独立线程,而java层Handler需要应用层自己实现一个线程来完成,当然你也可以使用java层HandlerThread类更简单完成。
(2)一些类似相同作用的方法的属于实现者类有区别,即放置的地方不同如发送消息事件的方法,在java层位于Handler来完成发送消息并完成接收处理。而native层发送消息是由消息对象(AMessage)来完成发送,Handler来完成接收处理的。

frameworks/av/media/libstagefright/MediaClock.cpp

void MediaClock::processTimers_l() {int64_t nowMediaTimeUs;status_t status = getMediaTime_l(ALooper::GetNowUs(), &nowMediaTimeUs, false /* allowPastMaxTime */);。。。。。。。。// 1. 获取AMessage 对象sp<AMessage> msg = new AMessage(kWhatTimeIsUp, this);// 2. 设置值msg->setInt32("generation", mGeneration);// 3. 发送消息msg->post(nextLapseRealUs);
}
// [frameworks/av/media/libstagefright/include/media/stagefright/MediaClock.h]// MediaClock 继承了父类 AHandler 
struct MediaClock : public AHandler {virtual void onMessageReceived(const sp<AMessage> &msg);private:sp<ALooper> mLooper;
}

 Handler, Looper 和 Message 的关系

在这里插入图片描述

 

1. 获取AMessage 对象

new AMessage(kWhatTimeIsUp, this);

frameworks/av/media/libstagefright/foundation/AMessage.cpp

// 设置了 mWhat,传入 handler,其实handler 就是 MediaClock
// 最终还是会回到 MediaClock
AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler): mWhat(what) {setTarget(handler);
}----------
void AMessage::setTarget(const sp<const AHandler> &handler) {if (handler == NULL) {mTarget = 0;mHandler.clear();mLooper.clear();} else {mTarget = handler->id();mHandler = handler->getHandler();
// 获取loopermLooper = handler->getLooper();}
}

 2. 设置数据值

frameworks/av/media/libstagefright/foundation/include/media/stagefright/foundation/AMessage.h

struct AMessage : public RefBase {AMessage();// 主要实现的设置数据参数setXXX()方法有如下几个声明void setInt32(const char *name, int32_t value);void setInt64(const char *name, int64_t value);void setSize(const char *name, size_t value);void setFloat(const char *name, float value);void setDouble(const char *name, double value);void setPointer(const char *name, void *value);void setString(const char *name, const char *s, ssize_t len = -1);void setString(const char *name, const AString &s);void setObject(const char *name, const sp<RefBase> &obj);void setBuffer(const char *name, const sp<ABuffer> &buffer);void setMessage(const char *name, const sp<AMessage> &obj);void setRect(const char *name,int32_t left, int32_t top, int32_t right, int32_t bottom);

方法的实现是在宏定义中实现:

  • frameworks/av/media/libstagefright/foundation/AMessage.cpp
#define BASIC_TYPE(NAME,FIELDNAME,TYPENAME)                             \
void AMessage::set##NAME(const char *name, TYPENAME value) {            \// 1. 这里有个   allocateItem 方法,增加元素到mItems 数组 Item *item = allocateItem(name);                                    \if (item) {                                                         \item->mType = kType##NAME;                                      \item->u.FIELDNAME = value;                                      \}                                                                   \
}                                                                       \\
/* NOLINT added to avoid incorrect warning/fix from clang.tidy */       \
bool AMessage::find##NAME(const char *name, TYPENAME *value) const {  /* NOLINT */ \// 2. 找到对应的item,依据name 返回对应的元素const Item *item = findItem(name, kType##NAME);                     \if (item) {                                                         \*value = item->u.FIELDNAME;                                     \return true;                                                    \}                                                                   \return false;                                                       \
}BASIC_TYPE(Int32,int32Value,int32_t)
BASIC_TYPE(Int64,int64Value,int64_t)
BASIC_TYPE(Size,sizeValue,size_t)
BASIC_TYPE(Float,floatValue,float)
BASIC_TYPE(Double,doubleValue,double)
BASIC_TYPE(Pointer,ptrValue,void *)#undef BASIC_TYPE

Item 是个结构体

    struct Item {// 联合体union {int32_t int32Value;int64_t int64Value;size_t sizeValue;float floatValue;double doubleValue;void *ptrValue;RefBase *refValue;AString *stringValue;Rect rectValue;} u;const char *mName;size_t      mNameLength;Type mType;void setName(const char *name, size_t len);Item() : mName(nullptr), mNameLength(0), mType(kTypeInt32) { }Item(const char *name, size_t length);};enum {kMaxNumItems = 256};// mItems 是一个数组std::vector<Item> mItems;

有6种 setXXX() 设置数据参数方法通过宏定义来完成实现,那么也肯定会有对应的6种 findXXX() 查询获取对应数据参数值的方法。
并且存储的每一个参数数据都是通过一个创建一个Key值名称为(name)参数名的数据项对象来缓存的,并且最终会将整个AMessage中的每个参数数据都添加到数据项 Item 类型的数组参数集中。【注意:该数组只允许最多携带64个数据项数据,若超出则程序将抛错并停止运行】
 

 // 1. 这里有个   allocateItem 方法,增加元素到mItems 数组 

frameworks/av/media/libstagefright/foundation/AMessage.cpp

AMessage::Item *AMessage::allocateItem(const char *name) {size_t len = strlen(name);size_t i = findItemIndex(name, len);Item *item;if (i < mItems.size()) {item = &mItems[i];freeItemValue(item);} else {//默认是 kTypeInt32,4个字节,kMaxNumItems为256,默认是64个数据CHECK(mItems.size() < kMaxNumItems);i = mItems.size();// place a 'blank' item at the end - this is of type kTypeInt32// 传入的参数为如下:
// Item() : mName(nullptr), mNameLength(0), mType(kTypeInt32) 默认是 kTypeInt32
// mName为字符串指针mItems.emplace_back(name, len);// 返回item 的指针item = &mItems[i];}return item;
}

// 2. 找到对应的item,依据name 返回对应的元素

const AMessage::Item *AMessage::findItem(const char *name, Type type) const {/// 找到数据的索引值size_t i = findItemIndex(name, strlen(name));if (i < mItems.size()) {// 依据索引值获取对应 item const Item *item = &mItems[i];
// item 的数据类型return item->mType == type ? item : NULL;}return NULL;
}

3. 线程启动,开启循环消息队列流程

以NuPlayerDriver 的构造函数看下:

  • /frameworks/av/media/libmediaplayerservice/nuplayer/NuPlayerDriver.cpp
NuPlayerDriver::NuPlayerDriver(pid_t pid): mState(STATE_IDLE),mIsAsyncPrepare(false),mAsyncResult(UNKNOWN_ERROR),mSetSurfaceInProgress(false),mDurationUs(-1),mPositionUs(-1),mSeekInProgress(false),mPlayingTimeUs(0),mRebufferingTimeUs(0),mRebufferingEvents(0),mRebufferingAtExit(false),// 初始化Looper 对象mLooper(new ALooper),mMediaClock(new MediaClock),mPlayer(new NuPlayer(pid, mMediaClock)),mPlayerFlags(0),mMetricsItem(NULL),mClientUid(-1),mAtEOS(false),mLooping(false),mAutoLoop(false) {ALOGD("NuPlayerDriver(%p) created, clientPid(%d)", this, pid);mLooper->setName("NuPlayerDriver Looper");mMediaClock->init();// set up an analytics recordmMetricsItem = mediametrics::Item::create(kKeyPlayer);// 启动线程mLooper->start(// 下列2 个参数,1 不运行在本地,即创建一个线程;2.可以被java端调用false, /* runOnCallingThread */true,  /* canCallJava */PRIORITY_AUDIO);mLooper->registerHandler(mPlayer);mPlayer->init(this);
}

 

runOnCallingThread 决定了当调用线程调用Alooper::start函数后,取消息,发送消息的工作在当前线程执行,还是子线程执行。区别在于:

  • 如果runOnCallingThread = true:那么当前线程不会再做其它工作,陷入一个死循环。用于循环执行loop()函数。
  • 如果runOnCallingThread = false:会创建一个子线程,并将loop()逻辑放到这个特定子线程中处理。

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority) {// 如果runOnCallingThread 为true,则在调用的线程中轮询if (runOnCallingThread) {{Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}mRunningLocally = true;}do {} while (loop());return OK;}// 如果runOnCallingThread 为false,则创建一个线程Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}// 创建 LooperThread对象mThread = new LooperThread(this, canCallJava);// 启动这个线程status_t err = mThread->run(mName.empty() ? "ALooper" : mName.c_str(), priority);if (err != OK) {mThread.clear();}return err;
}

创建 LooperThread对象并启动线程:

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// LooperThread 继承了 Thread,调用了父类的run 方法
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}virtual status_t readyToRun() {mThreadId = androidGetThreadId();return Thread::readyToRun();}// 重写了父类的方法virtual bool threadLoop() {return mLooper->loop();}bool isCurrentThread() const {return mThreadId == androidGetThreadId();}

  • /system/core/libutils/Threads.cpp
status_t Thread::run(const char* name, int32_t priority, size_t stack)
{LOG_ALWAYS_FATAL_IF(name == nullptr, "thread name not provided to Thread::run");Mutex::Autolock _l(mLock);if (mRunning) {// thread already startedreturn INVALID_OPERATION;}// reset status and exitPending to their default value, so we can// try again after an error happened (either below, or in readyToRun())mStatus = OK;mExitPending = false;mThread = thread_id_t(-1);// hold a strong reference on ourselfmHoldSelf = this;mRunning = true;bool res;// 可以被java 调用,走到如下if (mCanCallJava) {
// 有传入方法:_threadLoopres = createThreadEtc(_threadLoop,this, name, priority, stack, &mThread);} else {//其实此流程也是相似的res = androidCreateRawThreadEtc(_threadLoop,this, name, priority, stack, &mThread);}if (res == false) {mStatus = UNKNOWN_ERROR;   // something happened!mRunning = false;mThread = thread_id_t(-1);mHoldSelf.clear();  // "this" may have gone away after this.return UNKNOWN_ERROR;}// Do not refer to mStatus here: The thread is already running (may, in fact// already have exited with a valid mStatus result). The OK indication// here merely indicates successfully starting the thread and does not// imply successful termination/execution.return OK;// Exiting scope of mLock is a memory barrier and allows new thread to run
}-------------
createThreadEtc 在头文件中
/system/core/libutils/include/utils/AndroidThreads.hinline bool createThreadEtc(thread_func_t entryFunction,void *userData,const char* threadName = "android:unnamed_thread",int32_t threadPriority = PRIORITY_DEFAULT,size_t threadStackSize = 0,thread_id_t *threadId = nullptr)
{return androidCreateThreadEtc(entryFunction, userData, threadName,threadPriority, threadStackSize, threadId) ? true : false;
}

androidCreateThreadEtc 方法

static android_create_thread_fn gCreateThreadFn = androidCreateRawThreadEtc;int androidCreateThreadEtc(android_thread_func_t entryFunction,void *userData,const char* threadName,int32_t threadPriority,size_t threadStackSize,android_thread_id_t *threadId)
{return gCreateThreadFn(entryFunction, userData, threadName,threadPriority, threadStackSize, threadId);
}-----------------
int androidCreateRawThreadEtc(android_thread_func_t entryFunction,void *userData,const char* threadName __android_unused,int32_t threadPriority,size_t threadStackSize,android_thread_id_t *threadId)
{pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);#if defined(__ANDROID__)  /* valgrind is rejecting RT-priority create reqs */if (threadPriority != PRIORITY_DEFAULT || threadName != NULL) {// Now that the pthread_t has a method to find the associated// android_thread_id_t (pid) from pthread_t, it would be possible to avoid// this trampoline in some cases as the parent could set the properties// for the child.  However, there would be a race condition because the// child becomes ready immediately, and it doesn't work for the name.// prctl(PR_SET_NAME) only works for self; prctl(PR_SET_THREAD_NAME) was// proposed but not yet accepted.thread_data_t* t = new thread_data_t;t->priority = threadPriority;t->threadName = threadName ? strdup(threadName) : NULL;t->entryFunction = entryFunction;t->userData = userData;// 这里对 entryFunction重新进行赋值了,其实就是调用对应函数,传入参数entryFunction = (android_thread_func_t)&thread_data_t::trampoline;userData = t;}
#endifif (threadStackSize) {pthread_attr_setstacksize(&attr, threadStackSize);}errno = 0;pthread_t thread;// 创建线程,然后执行 entryFunctionint result = pthread_create(&thread, &attr,(android_pthread_entry)entryFunction, userData);pthread_attr_destroy(&attr);if (result != 0) {ALOGE("androidCreateRawThreadEtc failed (entry=%p, res=%d, %s)\n""(android threadPriority=%d)",entryFunction, result, strerror(errno), threadPriority);return 0;}// Note that *threadID is directly available to the parent only, as it is// assigned after the child starts.  Use memory barrier / lock if the child// or other threads also need access.if (threadId != nullptr) {*threadId = (android_thread_id_t)thread; // XXX: this is not portable}return 1;
}

调用 _threadLoop 方法

int Thread::_threadLoop(void* user)
{Thread* const self = static_cast<Thread*>(user);sp<Thread> strong(self->mHoldSelf);wp<Thread> weak(strong);self->mHoldSelf.clear();#if defined(__ANDROID__)// this is very useful for debugging with gdbself->mTid = gettid();
#endifbool first = true;// while 循环do {bool result;if (first) {first = false;// 第一次循环才调用 readyToRun方法
// 先调用子类的 readyToRun方法,然后调用父类的,返回OKself->mStatus = self->readyToRun();result = (self->mStatus == OK);if (result && !self->exitPending()) {// Binder threads (and maybe others) rely on threadLoop// running at least once after a successful ::readyToRun()// (unless, of course, the thread has already been asked to exit// at that point).// This is because threads are essentially used like this://   (new ThreadSubclass())->run();// The caller therefore does not retain a strong reference to// the thread and the thread would simply disappear after the// successful ::readyToRun() call instead of entering the// threadLoop at least once.// 调用子类的 threadLoop 方法result = self->threadLoop();}} else {result = self->threadLoop();}// establish a scope for mLock{Mutex::Autolock _l(self->mLock);if (result == false || self->mExitPending) {self->mExitPending = true;self->mRunning = false;// clear thread ID so that requestExitAndWait() does not exit if// called by a new thread using the same thread ID as this one.self->mThread = thread_id_t(-1);// note that interested observers blocked in requestExitAndWait are// awoken by broadcast, but blocked on mLock until break exits scopeself->mThreadExitedCondition.broadcast();break;}}// Release our strong reference, to let a chance to the thread// to die a peaceful death.strong.clear();// And immediately, re-acquire a strong reference for the next loopstrong = weak.promote();} while(strong != nullptr);return 0;
}

// 调用子类的 threadLoop 方法

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}virtual status_t readyToRun() {mThreadId = androidGetThreadId();return Thread::readyToRun();}virtual bool threadLoop() {return mLooper->loop();}--------------------
// 读取消息队列,做对应的操作
bool ALooper::loop() {Event event;{Mutex::Autolock autoLock(mLock);if (mThread == NULL && !mRunningLocally) {return false;}// 消息队列是空的,当前线程等待,直到被唤醒发回true
// post 会发signal,唤醒if (mEventQueue.empty()) {mQueueChangedCondition.wait(mLock);return true;}// 获取消息队列第一条消息的发送时间【即包含有延时发消息的情况】
// 在post 会设置 mWhenUsint64_t whenUs = (*mEventQueue.begin()).mWhenUs;int64_t nowUs = GetNowUs();if (whenUs > nowUs) {int64_t delayUs = whenUs - nowUs;if (delayUs > INT64_MAX / 1000) {delayUs = INT64_MAX / 1000;}// 如果第一条消息还没有到发送时间,则等待whenUs - nowUs后唤醒线程返回truemQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);return true;}// 取出队列的头节点,将其删除出队列,然后发送消息event = *mEventQueue.begin();mEventQueue.erase(mEventQueue.begin());}event.mMessage->deliver();// NOTE: It's important to note that at this point our "ALooper" object// may no longer exist (its final reference may have gone away while// delivering the message). We have made sure, however, that loop()// won't be called again.return true;
}

loop函数,总共做了以下几件事情:

  • 条件判断:判断是否初始化线程,并且线程是否在本地运行,如果否则返回false,使可能存在的循环停止。
  • 消息队列判断:判断消息队列中是否有消息,没有的话,让线程进入等待,直到有消息入队后被唤醒。
  • 消息发送判断:判断队列中,第一条小时发送时间是否满足,满足则发送消息,并将消息移出队列。否则让线程等待,一定时间(当前时间和发送时间的时间差)后,自动唤醒线程。

4. 发送消息

4.1 Post 发送消息

msg->post(nextLapseRealUs)

  • frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AMessage::post(int64_t delayUs) {sp<ALooper> looper = mLooper.promote();if (looper == NULL) {ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);return -ENOENT;}looper->post(this, delayUs);return OK;
}
  • frameworks/av/media/libstagefright/foundation/ALooper.cpp
void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {Mutex::Autolock autoLock(mLock);int64_t whenUs;if (delayUs > 0) {int64_t nowUs = GetNowUs();whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);} else {whenUs = GetNowUs();}List<Event>::iterator it = mEventQueue.begin();while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {++it;}Event event;event.mWhenUs = whenUs;event.mMessage = msg;if (it == mEventQueue.begin()) {mQueueChangedCondition.signal();}mEventQueue.insert(it, event);
}

其实在 MediaClock的构造函数中,有启动 looper

  • frameworks/av/media/libstagefright/MediaClock.cpp
MediaClock::MediaClock(): mAnchorTimeMediaUs(-1),mAnchorTimeRealUs(-1),mMaxTimeMediaUs(INT64_MAX),mStartingTimeMediaUs(-1),mPlaybackRate(1.0),mGeneration(0) {mLooper = new ALooper;
// 设置名称mLooper->setName("MediaClock");
// start loopermLooper->start(false /* runOnCallingThread */,false /* canCallJava */,ANDROID_PRIORITY_AUDIO);
}-----------
// 在初始化中,有向looper 注册handler 为 this
void MediaClock::init() {mLooper->registerHandler(this);
}------------
frameworks/av/media/libstagefright/foundation/ALooperRoster.cppALooper::handler_id ALooperRoster::registerHandler(const sp<ALooper> &looper, const sp<AHandler> &handler) {Mutex::Autolock autoLock(mLock);if (handler->id() != 0) {CHECK(!"A handler must only be registered once.");return INVALID_OPERATION;}HandlerInfo info;info.mLooper = looper;info.mHandler = handler;ALooper::handler_id handlerID = mNextHandlerID++;mHandlers.add(handlerID, info);// 这里又调用 handler 去设置了id,一个id 对应了一个looperhandler->setID(handlerID, looper);return handlerID;
}

// start looper
    mLooper->start(

status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority) {if (runOnCallingThread) {{Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}mRunningLocally = true;}do {
// loop 循环,从消息队列中获取message} while (loop());return OK;}Mutex::Autolock autoLock(mLock);if (mThread != NULL || mRunningLocally) {return INVALID_OPERATION;}// 创建了 LooperThread线程对象mThread = new LooperThread(this, canCallJava);// 运行这个线程status_t err = mThread->run(mName.empty() ? "ALooper" : mName.c_str(), priority);if (err != OK) {mThread.clear();}return err;
}
// 该线程继承了 Thread 
struct ALooper::LooperThread : public Thread {LooperThread(ALooper *looper, bool canCallJava): Thread(canCallJava),mLooper(looper),mThreadId(NULL) {}

loop()

bool ALooper::loop() {Event event;{Mutex::Autolock autoLock(mLock);if (mThread == NULL && !mRunningLocally) {return false;}
// 如果消息队列是空,则等待if (mEventQueue.empty()) {mQueueChangedCondition.wait(mLock);return true;}
// 看是否消息队列有设置延时int64_t whenUs = (*mEventQueue.begin()).mWhenUs;int64_t nowUs = GetNowUs();if (whenUs > nowUs) {int64_t delayUs = whenUs - nowUs;if (delayUs > INT64_MAX / 1000) {delayUs = INT64_MAX / 1000;}mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);return true;}
// 获取消息队列的头部event,并且将其移出队列event = *mEventQueue.begin();mEventQueue.erase(mEventQueue.begin());}
// 调用message 去发送消息event.mMessage->deliver();// NOTE: It's important to note that at this point our "ALooper" object// may no longer exist (its final reference may have gone away while// delivering the message). We have made sure, however, that loop()// won't be called again.return true;
}

// 调用message 去发送消息

  • frameworks/av/media/libstagefright/foundation/AMessage.cpp
void AMessage::deliver() {sp<AHandler> handler = mHandler.promote();if (handler == NULL) {ALOGW("failed to deliver message as target handler %d is gone.", mTarget);return;}handler->deliverMessage(this);
}
  • frameworks/av/media/libstagefright/foundation/AHandler.cpp
void AHandler::deliverMessage(const sp<AMessage> &msg) {
// 应为 MediaClock继承了 AHandler
// 所以执行 MediaClock方法onMessageReceived(msg);mMessageCounter++;if (mVerboseStats) {uint32_t what = msg->what();ssize_t idx = mMessages.indexOfKey(what);if (idx < 0) {mMessages.add(what, 1);} else {mMessages.editValueAt(idx)++;}}
}
  • frameworks/av/media/libstagefright/MediaClock.cpp
void MediaClock::onMessageReceived(const sp<AMessage> &msg) {switch (msg->what()) {case kWhatTimeIsUp:{int32_t generation;CHECK(msg->findInt32("generation", &generation));Mutex::Autolock autoLock(mLock);if (generation != mGeneration) {break;}processTimers_l();break;}default:TRESPASS();break;}
}

4.2 postAndAwaitResponse 和 postReply

postAndAwaitResponse 和 postReply 是成对使用的,postAndAwaitResponse是作为消息发送端,用于接收到接收端的消息的返回。即,在返回应答值之前一直wait等待响应返回值(即AMessage消息)。postReply 是接收端返回消息给发送端。

以  MediaCodec设置回调为例子:

  • /frameworks/av/media/libstagefright/MediaCodec.cpp
status_t MediaCodec::setCallback(const sp<AMessage> &callback) {// 设置消息为:kWhatSetCallbacksp<AMessage> msg = new AMessage(kWhatSetCallback, this);// 设置消息的key,value值msg->setMessage("callback", callback);sp<AMessage> response;return PostAndAwaitResponse(msg, &response);
}-----------------
status_t MediaCodec::PostAndAwaitResponse(const sp<AMessage> &msg, sp<AMessage> *response) {// 调用 postAndAwaitResponse 方法status_t err = msg->postAndAwaitResponse(response);if (err != OK) {return err;}if (!(*response)->findInt32("err", &err)) {err = OK;}return err;
}
  • /frameworks/av/media/libstagefright/foundation/AMessage.cpp
status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {// 获取looper 对象sp<ALooper> looper = mLooper.promote();if (looper == NULL) {ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);return -ENOENT;}// 获取tokensp<AReplyToken> token = looper->createReplyToken();if (token == NULL) {ALOGE("failed to create reply token");return -ENOMEM;}setObject("replyID", token);// 1. post 到消息队列中,然后调用AHandler子类 onMessageReceived处理looper->post(this, 0 /* delayUs */);// 2. 返回值,下列方法线程阻塞,需要等待消息接收方回复return looper->awaitResponse(token, response);
}

 关于 AReplyToken 回复令牌:

  1. AReplyToken:意味消息的回复令牌
  2. AReplyToken中包含消息是否已经被处理过的字段mReplied,如果处理过,mReplied字段被置为true。
  3. AReplyToken中包含了回复消息本身,体现在mReply字段。

// 1. post 到消息队列中,然后调用AHandler子类 onMessageReceived处理

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// 增加到消息队列中
void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {Mutex::Autolock autoLock(mLock);int64_t whenUs;if (delayUs > 0) {int64_t nowUs = GetNowUs();whenUs = (delayUs > INT64_MAX - nowUs ? INT64_MAX : nowUs + delayUs);} else {whenUs = GetNowUs();}List<Event>::iterator it = mEventQueue.begin();while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {++it;}Event event;event.mWhenUs = whenUs;event.mMessage = msg;if (it == mEventQueue.begin()) {mQueueChangedCondition.signal();}mEventQueue.insert(it, event);
}// 线程loop 进行处理
bool ALooper::loop() {Event event;{

// 2. 返回值,下列方法线程阻塞,需要等待消息接收方回复

looper->awaitResponse(token, response);

  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
// to be called by AMessage::postAndAwaitResponse only
status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {// return status in case we want to handle an interrupted waitMutex::Autolock autoLock(mRepliesLock);CHECK(replyToken != NULL);// retrieveReply() 方法返回false,则还没有获取到接收端的消息,则轮询while (!replyToken->retrieveReply(response)) {{Mutex::Autolock autoLock(mLock);if (mThread == NULL) {return -ENOENT;}}// 消息发送端条件变量wait等待唤醒(获取锁)mRepliesCondition.wait(mRepliesLock);}return OK;
}==========
/frameworks/av/media/libstagefright/foundation/include/media/stagefright/foundation/AMessage.hbool retrieveReply(sp<AMessage> *reply) {// 需要接收端去设置mReplied 为trueif (mReplied) {*reply = mReply;
// 清除这个 responsemReply.clear();}return mReplied;}

用于同步的对象Condition,为Android中特有。它的函数有:

 

看下 onMessageReceived 处理:

void MediaCodec::onMessageReceived(const sp<AMessage> &msg) {switch (msg->what()) {case kWhatSetCallback:{sp<AReplyToken> replyID;// 1. 消息接收端响应 senderAwaitsResponseCHECK(msg->senderAwaitsResponse(&replyID));if (mState == UNINITIALIZED|| mState == INITIALIZING|| isExecuting()) {// callback can't be set after codec is executing,// or before it's initialized (as the callback// will be cleared when it goes to INITIALIZED)PostReplyWithError(replyID, INVALID_OPERATION);break;}sp<AMessage> callback;CHECK(msg->findMessage("callback", &callback));mCallback = callback;if (mCallback != NULL) {ALOGI("MediaCodec will operate in async mode");mFlags |= kFlagIsAsync;} else {mFlags &= ~kFlagIsAsync;}sp<AMessage> response = new AMessage;// 2. 应答消息发送端消息response->postReply(replyID);break;}

// 1. 消息接收端响应 senderAwaitsResponse

msg->senderAwaitsResponse(&replyID)

bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) {sp<RefBase> tmp;// 上述有 setObject,这里去获取 tmpbool found = findObject("replyID", &tmp);if (!found) {return false;}*replyToken = static_cast<AReplyToken *>(tmp.get());// 清除 replyTokentmp.clear();// 设置为空setObject("replyID", tmp);// TODO: delete Object instead of setting it to NULLreturn *replyToken != NULL;
}

// 2. 应答消息发送端消息

response->postReply(replyID)

status_t AMessage::postReply(const sp<AReplyToken> &replyToken) {if (replyToken == NULL) {ALOGW("failed to post reply to a NULL token");return -ENOENT;}sp<ALooper> looper = replyToken->getLooper();if (looper == NULL) {ALOGW("failed to post reply as target looper is gone.");return -ENOENT;}return looper->postReply(replyToken, this);
}
  • /frameworks/av/media/libstagefright/foundation/ALooper.cpp
status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {Mutex::Autolock autoLock(mRepliesLock);
// 设置replystatus_t err = replyToken->setReply(reply);if (err == OK) {// 若设置应答数据成功,则唤醒在该条件变量上正在等待【mRepliesLock】锁的其他线程mRepliesCondition.broadcast();}return err;
}--------------
/frameworks/av/media/libstagefright/foundation/AMessage.cppstatus_t AReplyToken::setReply(const sp<AMessage> &reply) {if (mReplied) {ALOGE("trying to post a duplicate reply");return -EBUSY;}CHECK(mReply == NULL);mReply = reply;// 这里设置了 mReplied  为truemReplied = true;return OK;
}

然后:status_t MediaCodec::setCallback 方法返回 OK

由此可知:

postAndAwaitResponse()和postReply()必须出成对出现的。其实这就是一个线程间异步通信的过程。

这篇关于【安卓R 源码】native层媒体模块通信AHandler机制源码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/667999

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

安卓链接正常显示,ios#符被转义%23导致链接访问404

原因分析: url中含有特殊字符 中文未编码 都有可能导致URL转换失败,所以需要对url编码处理  如下: guard let allowUrl = webUrl.addingPercentEncoding(withAllowedCharacters: .urlQueryAllowed) else {return} 后面发现当url中有#号时,会被误伤转义为%23,导致链接无法访问

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

native和static native区别

本文基于Hello JNI  如有疑惑,请看之前几篇文章。 native 与 static native java中 public native String helloJni();public native static String helloJniStatic();1212 JNI中 JNIEXPORT jstring JNICALL Java_com_test_g

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、