经过两个多月的攻关,终于搞定了live555多线程并稳定压测通过

2023-10-23 19:10

本文主要是介绍经过两个多月的攻关,终于搞定了live555多线程并稳定压测通过,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

live555已经发展了十几年了,不得不钦佩作者坚持不懈的奉献和国外的开源生态环境,live555可以说是大部分的安防从业者的入门之选,尤其是在嵌入式或者Linux系统上,其应用还是蛮广泛的,主要是其兼容性和稳定性;

live555多线程

但是随着live555十几年的不断迭代,很多开发者反复向作者Ross提到的多线程和IPv6的功能,作者也一直都没有去尝试,可能是这样会对live555的架构产生比较大的改动和影响,作者为了稳妥,选择了小改动、稳定、逐步迭代的方式, 虽然是性能稳定,但支持的路数有限,不能多线程工作始终是个坎; 网上找到几篇live555多线程的博客, 基本上大同小异,就是创建独立的UsageEnvironment和TaskSchedule, 由独立的线程分工协作; 本人也是这个思路,创建多个工作线程,每个工作线程内创建UsageEnvironment和TaskSchedule,然后各自开启EventLoop;

今天我们抛砖引玉,先大概聊一下主体思路,在后续的博客中将尽力完整地汇总这些思路和开发的过程:

目标

将live555修改为多线程, 每个通道对应一个工作线程,由工作线程对该通道进行独立处理;

大体修改点

修改支持多线程, 主要涉及到以下类的修改

  • GenericMediaServer
  • RTSPServer

GenericMediaServer.cpp
在GenericMediaServer的构造函数中, 创建工作线程个数,即最大支持的通道数;

GenericMediaServer
::GenericMediaServer(UsageEnvironment& env, int ourSocketV4, int ourSocketV6, Port ourPort,unsigned reclamationSeconds): Medium(env),fServerSocket4(ourSocketV4), fServerSocket6(ourSocketV6), fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),fClientSessions(HashTable::create(STRING_HASH_KEYS)) {ignoreSigPipeOnSocket(fServerSocket4); // so that clients on the same host that are killed don't also kill usignoreSigPipeOnSocket(fServerSocket6); // so that clients on the same host that are killed don't also kill us#ifdef LIVE_MULTI_THREAD_ENABLEInitMutex(&mutexClientConnection);memset(&multiThreadCore, 0x00, sizeof(MultiThread_CORE_T));multiThreadCore.threadNum = MAX_DEFAULT_MULTI_THREAD_NUM;multiThreadCore.threadTask = new LIVE_THREAD_TASK_T[multiThreadCore.threadNum];memset(&multiThreadCore.threadTask[0], 0x00, sizeof(LIVE_THREAD_TASK_T) * multiThreadCore.threadNum);for (int i=0; i<multiThreadCore.threadNum; i++){char szName[36] = {0};sprintf(szName, "worker thread %d", i);multiThreadCore.threadTask[i].id = i;multiThreadCore.threadTask[i].extPtr = this;multiThreadCore.threadTask[i].pSubScheduler = BasicTaskScheduler::createNew();multiThreadCore.threadTask[i].pSubEnv = BasicUsageEnvironment::createNew(*multiThreadCore.threadTask[i].pSubScheduler, i+1, szName);CreateOSThread( &multiThreadCore.threadTask[i].osThread, __OSThread_Proc, (void *)&multiThreadCore.threadTask[i] );}
#endif// Arrange to handle connections from others:env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket4, incomingConnectionHandler4, this);env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket6, incomingConnectionHandler6, this);
} 

接受客户端连接

按原有流程接受客户端连接;

分配客户端请求

在收到客户端发送的DESCRIBE命令后,
在通道列表中找出空闲的通道,将该客户端关联到该通道, 然后从主线程中移除该socket, 由工作线程接管该socket的操作;
后续有客户端如访问已经存在的通道,则主线程会将该请求直接分配给对应的工作线程处理;

注意: 主线程的工作到此结束,不要执行lookupServerMediaSession的操作;

在工作线程中, 接管客户端的socket后, 马上执行lookupServerMediaSession, 在该函数中,将后缀回调给上层调用程序, 由上层调用程序判断是否存在该通道,如不存在则返回失败,如存在则向前端取流,然后填充媒体信息返回成功, 库内部则创建相应的mediasession, 再回应客户端, 后续的则完成整个rtsp流程的交互;

注意: 创建MediaSession时,必须将工作线程中的UsageEnvironment传进去, 不能使用主线程中的envir();

int RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE(UsageEnvironment *pEnv, char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr, LIVE_THREAD_TASK_T **pThreadTask) 
{int handleCmdRet = 0;ServerMediaSession* session = NULL;char* sdpDescription = NULL;char* rtspURL = NULL;do {char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];// enough space for urlPreSuffix/urlSuffix'\0'urlTotalSuffix[0] = '\0';if (urlPreSuffix[0] != '\0') {strcat(urlTotalSuffix, urlPreSuffix);strcat(urlTotalSuffix, "/");}strcat(urlTotalSuffix, urlSuffix);if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break;// We should really check that the request contains an "Accept:" #####// for "application/sdp", because that's what we're sending back #####_TRACE(TRACE_LOG_DEBUG, "handleCmd_DESCRIBE  socket[%d]\n", this->fOurSocket);#ifdef LIVE_MULTI_THREAD_ENABLE//如果当前是主线程,则进入到查找通道流程if (pEnv->GetEnvirId() == 1000){fOurServer.LockClientConnection();  //LockUsageEnvironment  *pChEnv = fOurServer.GetEnvBySuffix(urlSuffix, this, pThreadTask);if (NULL == pChEnv){fOurServer.UnlockClientConnection();        //UnlockhandleCmdRet = -1;this->assignSink = False;this->pEnv = NULL;handleCmd_notFound();break;}else{_TRACE(TRACE_LOG_DEBUG, "将socket[%d] 关联到[%s]\n", this->fOurSocket, pChEnv->GetEnvirName());//将socket从主线程移到工作线程中UsageEnvironment  *pMainEnv = &envir();envir().taskScheduler().disableBackgroundHandling(fOurSocket);fOurServer.UnlockClientConnection();        //Unlockreturn 1000;}break;}#endif// Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix"://在工作线程中执行 lookupServerMediaSessionsession = fOurServer.lookupServerMediaSession(pEnv, 1, this, urlTotalSuffix);if (session == NULL) {//pChEnv->taskScheduler().disableBackgroundHandling(fOurSocket);_TRACE(TRACE_LOG_DEBUG, "socket[%d] 在[%s]中, 源未就绪[%s]\n", this->fOurSocket, pEnv->GetEnvirName(), urlTotalSuffix);this->assignSink = False;this->pEnv = NULL;handleCmdRet = -1;//envir().taskScheduler().disableBackgroundHandling(fOurSocket);//fOurServer.ResetEnvBySuffix(urlSuffix, this);handleCmd_notFound();break;}session->incrementReferenceCount();// Then, assemble a SDP description for this session:sdpDescription = session->generateSDPDescription(fOurIPVer);if (sdpDescription == NULL) {// This usually means that a file name that was specified for a// "ServerMediaSubsession" does not exist.setRTSPResponse("404 File Not Found, Or In Incorrect Format");break;}unsigned sdpDescriptionSize = strlen(sdpDescription);// Also, generate our RTSP URL, for the "Content-Base:" header// (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).rtspURL = fOurRTSPServer.rtspURL(session, fOurIPVer, fClientInputSocket);snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,"RTSP/1.0 200 OK\r\nCSeq: %s\r\n""%s""Content-Base: %s/\r\n""Content-Type: application/sdp\r\n""Content-Length: %d\r\n\r\n""%s",fCurrentCSeq,dateHeader(),rtspURL,sdpDescriptionSize,sdpDescription);} while (0);if (session != NULL) {// Decrement its reference count, now that we're done using it:session->decrementReferenceCount();if (session->referenceCount() == 0 && session->deleteWhenUnreferenced()) {fOurServer.removeServerMediaSession(pEnv, session, True);}session->SetStreamStatus(1);        //置标志,让后续访问该通道的客户端可以得到迅速响应}delete[] sdpDescription;delete[] rtspURL;return handleCmdRet;
}

历经2个多月,终于将多线程问题搞定. 在此记录一下, 欢迎探讨;

目前用户测试反馈的情况是:

  • 连续压测8天时间;
  • 接入摄像机30台;
  • 客户端反复启动/停止播放;
  • 内存、CPU、程序均非常稳定!

live555技术交流

邮件:289042893@qq.com

live555技术交流群:475947825

这篇关于经过两个多月的攻关,终于搞定了live555多线程并稳定压测通过的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/269886

相关文章

Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单

《Springboot的ThreadPoolTaskScheduler线程池轻松搞定15分钟不操作自动取消订单》:本文主要介绍Springboot的ThreadPoolTaskScheduler线... 目录ThreadPoolTaskScheduler线程池实现15分钟不操作自动取消订单概要1,创建订单后

锐捷和腾达哪个好? 两个品牌路由器对比分析

《锐捷和腾达哪个好?两个品牌路由器对比分析》在选择路由器时,Tenda和锐捷都是备受关注的品牌,各自有独特的产品特点和市场定位,选择哪个品牌的路由器更合适,实际上取决于你的具体需求和使用场景,我们从... 在选购路由器时,锐捷和腾达都是市场上备受关注的品牌,但它们的定位和特点却有所不同。锐捷更偏向企业级和专

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip

跨系统环境下LabVIEW程序稳定运行

在LabVIEW开发中,不同电脑的配置和操作系统(如Win11与Win7)可能对程序的稳定运行产生影响。为了确保程序在不同平台上都能正常且稳定运行,需要从兼容性、驱动、以及性能优化等多个方面入手。本文将详细介绍如何在不同系统环境下,使LabVIEW开发的程序保持稳定运行的有效策略。 LabVIEW版本兼容性 LabVIEW各版本对不同操作系统的支持存在差异。因此,在开发程序时,尽量使用

两个月冲刺软考——访问位与修改位的题型(淘汰哪一页);内聚的类型;关于码制的知识点;地址映射的相关内容

1.访问位与修改位的题型(淘汰哪一页) 访问位:为1时表示在内存期间被访问过,为0时表示未被访问;修改位:为1时表示该页面自从被装入内存后被修改过,为0时表示未修改过。 置换页面时,最先置换访问位和修改位为00的,其次是01(没被访问但被修改过)的,之后是10(被访问了但没被修改过),最后是11。 2.内聚的类型 功能内聚:完成一个单一功能,各个部分协同工作,缺一不可。 顺序内聚:

多线程解析报表

假如有这样一个需求,当我们需要解析一个Excel里多个sheet的数据时,可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。 Way1 join import java.time.LocalTime;public class Main {public static void main(String[] args) thro

Java 多线程概述

多线程技术概述   1.线程与进程 进程:内存中运行的应用程序,每个进程都拥有一个独立的内存空间。线程:是进程中的一个执行路径,共享一个内存空间,线程之间可以自由切换、并发执行,一个进程最少有一个线程,线程实际数是在进程基础之上的进一步划分,一个进程启动之后,进程之中的若干执行路径又可以划分成若干个线程 2.线程的调度 分时调度:所有线程轮流使用CPU的使用权,平均分配时间抢占式调度

Java 多线程的基本方式

Java 多线程的基本方式 基础实现两种方式: 通过实现Callable 接口方式(可得到返回值):

分享5款免费录屏的工具,搞定网课不怕错过!

虽然现在学生们不怎么上网课, 但是对于上班族或者是没有办法到学校参加课程的人来说,网课还是很重要的,今天,我就来跟大家分享一下我用过的几款录屏软件=,看看它们在录制网课时的表现如何。 福昕录屏大师 网址:https://www.foxitsoftware.cn/REC/ 这款软件给我的第一印象就是界面简洁,操作起来很直观。它支持全屏录制,也支持区域录制,这对于我这种需要同时录制PPT和老师讲