(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器

本文主要是介绍(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 背景介绍

在先前的博客文章中,我们已经搭建了一个基于SRS的流媒体服务器。现在,我们希望通过Web接口来控制这个服务器的行为,特别是对于正在进行的 RTSP 转码任务的管理。这将使我们能够在不停止整个服务器的情况下,动态地启动或停止摄像头的转码过程。

Docker部署 SRS rtmp/flv流媒体服务器-CSDN博客文章浏览阅读360次,点赞7次,收藏5次。SRS(Simple Realtime Server)是一款开源的流媒体服务器,具有高性能、高可靠性、高灵活性的特点,能够支持直播、点播、转码等多种流媒体应用场景。SRS 不仅提供了流媒体服务器,还提供了适用于多种平台的客户端 SDK 和在线转码等辅助服务,是一款十分强大的流媒体解决方案。https://blog.csdn.net/m0_56659620/article/details/135400510?spm=1001.2014.3001.5501

2. 技术选择

在选择技术方案时,考虑到构建视频流转码服务的需求,我们将采用Python编程语言,并结合asyncio和aiohttp库。这一选择基于异步框架的优势,以下是对异步框架和同步框架在视频流转码场景中的优缺点的明确总结:

异步框架的优势:

  • 高并发处理: 异步框架通过非阻塞方式处理请求,能够高效处理大量并发请求,确保系统在高负载下保持稳定性。
  • 异步I/O: 支持异步I/O操作,允许在等待I/O操作完成的同时继续处理其他请求,提高整体效率。
  • 资源利用率高: 能够更有效地利用系统资源,同时处理多个请求,提高视频转码效率。
  • 事件驱动: 采用事件驱动模型,适应实时性要求高的视频流处理,能够立即响应新的转码请求。

同步框架的缺点:

  • 阻塞: 阻塞调用可能导致整个程序停滞,尤其在处理大文件或网络请求时可能引发性能问题,特别是在高并发场景下。
  • 低并发: 每个请求需要独立的线程或进程,可能导致系统资源耗尽,降低并发处理能力,对于需要同时处理多个视频流的情况可能不够高效。

考虑到处理大量并发请求、提高系统性能和响应性的需求,采用异步框架是更为合适的选择。异步框架的高并发处理能力、异步I/O支持、高资源利用率以及事件驱动的特性使其更适用于实时性要求较高的视频流转码服务。

3. 代码实现(必须在linux系统运行,4步骤为部署攻略)

3.1 导入必要的库

首先,我们导入所需的库,包括asyncio、aiohttp、aiohttp_cors和logging。

import asyncio
from aiohttp import web
import aiohttp_cors
import logging

3.2 设置日志

logging.basicConfig(level=logging.INFO)

3.3 配置并发控制和任务跟踪

设置最大同时运行的ffmpeg子进程数量,并使用Semaphore限制并发进程数量。同时,使用字典跟踪正在进行的转码任务。

MAX_CONCURRENT_PROCESSES = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)
transcoding_tasks = {}

3.4 定义启动和停止转码任务的方法

定义启动和停止 RTSP 转码任务的方法

# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):# 检查相同RTSP是否已有子进程在处理if camera_id in transcoding_tasks:return transcoding_tasks[camera_id]# 使用Semaphore限制并发进程数量async with semaphore:# 实际的转码操作,这里需要调用ffmpeg或其他工具ffmpeg_command = ['ffmpeg','-rtsp_transport', 'tcp','-i', ip,'-c:v', 'libx264','-c:a', 'aac','-f', 'flv',f'{rtmp_server}/live/livestream{camera_id}']# 创建异步子进程process = await asyncio.create_subprocess_exec(*ffmpeg_command)# 将任务添加到字典中transcoding_tasks[camera_id] = process# 等待子进程完成await process.communicate()# 从字典中移除已完成的任务transcoding_tasks.pop(camera_id, None)# 停止转码方法
async def stop_transcoding(camera_id):# 停止转码任务if camera_id in transcoding_tasks:process = transcoding_tasks[camera_id]process.terminate()  # 发送终止信号await process.wait()  # 等待进程结束# 从字典中移除已停止的任务transcoding_tasks.pop(camera_id, None)

3.5 定义Web接口路由

定义Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

# 开始转码任务
async def play_camera(request):data = await request.post()# 从表单数据中获取摄像头的ID和rtsp流camera_id = data.get('id')rtsp = data.get('ip')# 这里设置你的 RTMP 服务器地址rtmp_server = 'rtmp://192.168.14.93:1935'# 执行实际的转码操作task = await perform_transcoding(rtsp, camera_id, rtmp_server)# 返回包含转码后的RTMP URL的JSON响应rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})# 停止转码任务
async def stop_camera(request):data = await request.post()camera_id = data.get('id')# 停止指定摄像头的转码任务await stop_transcoding(camera_id)return web.json_response({'code':200,'message': '转码停止成功'})# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):# 获取所有正在运行的任务的列表tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]# 并发停止所有任务await asyncio.gather(*tasks)# 清空字典,表示所有任务都已停止transcoding_tasks.clear()return web.json_response({'code':200,'message': '转码停止成功'})

3.6 创建Web应用和配置CORS

创建Web应用,配置CORS(跨域资源共享)中间件,以确保接口可以被跨域访问。

app = web.Application()# CORS配置
cors = aiohttp_cors.setup(app, defaults={"*": aiohttp_cors.ResourceOptions(allow_credentials=True,expose_headers="*",allow_headers="*",)
})

3.7 添加Web接口路由

添加Web接口路由,包括启动摄像头转码、停止摄像头转码和停止所有摄像头转码的路由。

app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由

3.8 添加CORS中间件

添加CORS中间件,确保接口可以被跨域访问。

# 添加 CORS 中间件
for route in list(app.router.routes()):cors.add(route)

3.9 运行Web应用

运行Web应用,监听指定的主机和端口。

if __name__ == '__main__':web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

 3.10 完整代码

import asyncio
from aiohttp import web
import aiohttp_cors
import logging# 设置日志级别
logging.basicConfig(level=logging.INFO)# 最大同时运行的ffmpeg子进程数量
MAX_CONCURRENT_PROCESSES = 5# 使用Semaphore限制并发进程数量
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PROCESSES)# 字典用于跟踪正在进行的转码任务
transcoding_tasks = {}# 开始转码方法
async def perform_transcoding(ip, camera_id, rtmp_server):# 检查相同RTSP是否已有子进程在处理if camera_id in transcoding_tasks:return transcoding_tasks[camera_id]# 使用Semaphore限制并发进程数量async with semaphore:# 实际的转码操作,这里需要调用ffmpeg或其他工具ffmpeg_command = ['ffmpeg','-rtsp_transport', 'tcp','-i', ip,'-c:v', 'libx264','-c:a', 'aac','-f', 'flv',f'{rtmp_server}/live/livestream{camera_id}']# 创建异步子进程process = await asyncio.create_subprocess_exec(*ffmpeg_command)# 将任务添加到字典中transcoding_tasks[camera_id] = process# 等待子进程完成await process.communicate()# 从字典中移除已完成的任务transcoding_tasks.pop(camera_id, None)# 停止转码方法
async def stop_transcoding(camera_id):# 停止转码任务if camera_id in transcoding_tasks:process = transcoding_tasks[camera_id]process.terminate()  # 发送终止信号await process.wait()  # 等待进程结束# 从字典中移除已停止的任务transcoding_tasks.pop(camera_id, None)# 开始转码任务
async def play_camera(request):data = await request.post()# 从表单数据中获取摄像头的ID和rtsp流camera_id = data.get('id')rtsp = data.get('ip')# 这里设置你的 RTMP 服务器地址rtmp_server = 'rtmp://192.168.14.93:1935'# 执行实际的转码操作task = await perform_transcoding(rtsp, camera_id, rtmp_server)# 返回包含转码后的RTMP URL的JSON响应rtmp_url = f'http://192.168.14.93:8080/live/livestream{camera_id}.flv'return web.json_response({'message': '转码启动成功', 'flv_data': rtmp_url})# 停止转码任务
async def stop_camera(request):data = await request.post()camera_id = data.get('id')# 停止指定摄像头的转码任务await stop_transcoding(camera_id)return web.json_response({'code':200,'message': '转码停止成功'})# 如果页面进行刷新或者关闭停止全部转码任务
async def stop_all_camera(request):# 获取所有正在运行的任务的列表tasks = [stop_transcoding(camera_id) for camera_id in transcoding_tasks.keys()]# 并发停止所有任务await asyncio.gather(*tasks)# 清空字典,表示所有任务都已停止transcoding_tasks.clear()return web.json_response({'code':200,'message': '转码停止成功'})app = web.Application()# CORS配置
cors = aiohttp_cors.setup(app, defaults={"*": aiohttp_cors.ResourceOptions(allow_credentials=True,expose_headers="*",allow_headers="*",)
})app.router.add_route('POST', '/play_camera', play_camera)  # 开始转码任务路由
app.router.add_route('POST', '/stop_camera', stop_camera)  # 停止转码任务路由
app.router.add_route('POST', '/stop_all_camera', stop_all_camera)  # 停止全部转码任务路由# 添加 CORS 中间件
for route in list(app.router.routes()):cors.add(route)if __name__ == '__main__':web.run_app(app, host='0.0.0.0', port=7000,access_log=logging.getLogger())

4. 部署(Docker环境)

部署所需Dockerfile文件代码如下

FROM python:3.7-slimWORKDIR /appCOPY requirements.txt .RUN apt-get update \&& apt-get install -y ffmpeg \&& rm -rf /var/lib/apt/lists/* \&& pip install --no-cache-dir -r requirements.txtCOPY . .CMD ["python", "async_io_io.py"]

部署所需requirements.txt如下

aiohttp
aiohttp-cors
ffmpeg

根目录进行打包及启动

请求接口实现转码

5. 总结

通过以上的步骤,我们成功构建了一个流媒体服务器控制接口,可以通过Web接口实现对摄像头的 RTSP 转码任务的动态管理。这个接口可以集成到现有的流媒体服务器中,提供更多控制和管理的可能性。

这篇关于(aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/584414

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略 1. 特权模式限制2. 宿主机资源隔离3. 用户和组管理4. 权限提升控制5. SELinux配置 💖The Begin💖点点关注,收藏不迷路💖 Kubernetes的PodSecurityPolicy(PSP)是一个关键的安全特性,它在Pod创建之前实施安全策略,确保P

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、