Janus架构以及基本开发

2024-01-23 07:08
文章标签 开发 基本 架构 janus

本文主要是介绍Janus架构以及基本开发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Janus架构以及基本开发

    • 1 前言
    • 2 模块结构
      • 2.1 媒体模块
      • 2.2 信令模块
      • 2.3 插件模块
        • 2.3.1 接口
        • 2.3.2 核心数据结构
    • 3 信令协议
      • 3.1 基本格式
      • 3.2 基本交互流程
    • 4 内部数据流
      • 4.1 接收
      • 4.1.1 信令
        • 4.1.1.1 注册Transport插件
        • 4.1.1.2 创建信令处理线程/线程池
        • 4.1.1.3 分发信令
      • 4.1.2 媒体
        • 4.1.2.1 创建媒体处理线程/线程池
        • 4.1.2.2 设置本地ICE数据回调
        • 4.1.2.3 传递RTP
      • 4.2 发送
        • 4.2.1 信令
        • 4.2.2 媒体
    • 5 开发

1 前言

以前在电信行业做视频会议一般都使用MCU(Multipoint Control Unit),也就是多方推流在MCU上进行合流,在拉流的时候只有一路合流,这样的好处是无论几方推流,拉流只有一路,下行带宽比较小。但是问题也比较多,只要推流方一多,MCU的压力就比较大,而且分布式的部署也比较难,成本又很高。

现在互联网行业比较流行的是SFU(Selective Forwarding Unit),简单说就是只负责转发流,不负责合流,压力很小。这样的模式可以依托CDN进行分布式的部署,不过拉流的方数限于客户端的带宽和处理能力。目前比较常见的支持WebRTC的SFU有Janus、Licode、Kurento、Jitsi等。

2 模块结构

本文介绍的Janus版本为0.6.0。

在这里插入图片描述
上图是Janus主要的模块结构,有一些通用工具模块这里没有列出。

2.1 媒体模块

Janus不是简单转发WebRTC的媒体流,还有一定的控制能力,因此需要支持WebRTC的媒体能力,其媒体功能包含以下基本模块:

  • ICE:打洞,负责与Peer的连通,Janus可以部署在NAT后面,使用了libnice;
  • DTLS:UDP版的TLS,就是加密的UDP,WebRTC用来传递SRTP的密钥,使用了OpenSSL/BoringSSL;
  • RTP/RTCP:提供RTP/RTCP封/解包的接口,需要发送一些WebRTC支持的RTCP包,例如FIR、PLI、RR等;
  • SRTP:加密的RTP,开启后WebRTC传输的RTP负载都是加密的;
  • SDP:提供SDP封/解包的接口,用于协商媒体的协议,可以用SDP对WebRTC的一些功能进行设定;
  • SCTP:WebRTC的数据通道使用的协议,就是加上了流控的UDP,可以传输任意数据。

2.2 信令模块

除了媒体协议,Janus还要提供信令交互的协议,传统的信令协议有SIP、XMPP等,Janus上的信令应用协议可以定制,底层主要的传输协议有

  • HTTP(s);
  • WebSocket(s);
  • MQTT;
  • NanoMsg;
  • RabbitMQ。

其中WebSocket使用了libwebsockets,HTTP使用了libmicrohttpd。

2.3 插件模块

2.3.1 接口

插件模块是Janus的应用层,可以基于媒体模块、信令模块等底层的接口开发不同的应用。每个插件都被编译成动态库,在Janus启动的时候动态加载。

Janus定义了插件的通用接口,每个插件都必须实现这些接口:

struct janus_plugin {//初始化插件int (* const init)(janus_callbacks *callback, const char *config_path);//销毁插件void (* const destroy)(void);//创建插件上的一个自定义会话void (* const create_session)(janus_plugin_session *handle, int *error);//销毁插件自定义会话void (* const destroy_session)(janus_plugin_session *handle, int *error);//处理信令struct janus_plugin_result * (* const handle_message)(janus_plugin_session *handle, char *transaction, json_t *message, json_t *jsep);//建立媒体,在ICE通道建立后被调用void (* const setup_media)(janus_plugin_session *handle);//处理rtp,可以在这里进行转发void (* const incoming_rtp)(janus_plugin_session *handle, int video, char *buf, int len);//处理rtcp,可以在这里进行转发void (* const incoming_rtcp)(janus_plugin_session *handle, int video, char *buf, int len);//处理数据通道的数据,可以在这里进行转发void (* const incoming_data)(janus_plugin_session *handle, char *buf, int len);//链路恶化通知,可以在这里进行转发void (* const slow_link)(janus_plugin_session *handle, int uplink, int video);//挂断void (* const hangup_media)(janus_plugin_session *handle);	
};

在初始化时插件时,Janus会传入插件一些回调,用于调用数据转发等相关功能:

struct janus_callbacks {//推送一个信令给客户端,可以携带sdpint (* const push_event)(janus_plugin_session *handle, janus_plugin *plugin, const char *transaction, json_t *message, json_t *jsep);//发送rtp给客户端void (* const relay_rtp)(janus_plugin_session *handle, int video, char *buf, int len);//发送rtcp给客户端void (* const relay_rtcp)(janus_plugin_session *handle, int video, char *buf, int len);//发送数据给客户端void (* const relay_data)(janus_plugin_session *handle, char *buf, int len);//关闭WebRTC PeerConnectionvoid (* const close_pc)(janus_plugin_session *handle);//关闭一个会话void (* const end_session)(janus_plugin_session *handle);
};
2.3.2 核心数据结构
  • janus_session:标识Janus上的一个会话;
typedef struct janus_session {guint64 session_id;      //会话IDGHashTable *ice_handles; //janus_ice_handle表……
}
  • janus_ice_handle:标识一个ICE会话,封装了ICE的传输信息,用于一个通道实际的数据收发,一个janus_session上可以有若干janus_ice_handle;
struct janus_ice_handle {void *session;            //从属的Janus会话guint64 handle_id;        //ICE会话句柄void *app;                //绑定的插件void *app_handle;         //插件自定义会话句柄janus_ice_stream *stream; //ICE流NiceAgent *agent;         //nice代理	……
}
  • janus_plugin_session:标识一个底层插件会话,用于绑定ICE会话和插件自定义会话;
struct janus_plugin_session {void *gateway_handle;   //ICE会话句柄void *plugin_handle;    //插件自定义会话句柄
};
  • 插件自定义会话:标识插件内的一个会话。
struct custom_plugin_session {janus_plugin_session *handle; //底层插件会话句柄
};

在这里插入图片描述

3 信令协议

3.1 基本格式

Janus使用JSON传输信令,典型的消息如下:

创建Janus会话命令:
{ "janus": "create", "transaction": "8u8NAbBdgVN1" 
}
响应:
{"janus": "success","transaction": "8u8NAbBdgVN1","data": {"id": 6282808472350684}
}

"janus"后面的字符串代表消息的类型,主要有:

  • create:创建一个Janus会话命令;
  • attach:attach一个插件到Janus会话命令;
  • success:一个命令的成功结果;
  • error:一个命令的失败结果;
  • ack:一个命令的ack,因为不能直接返回结果,先回ack,后续的结果通过event返回;
  • event:推送给客户端的异步事件,主要由插件发出,这些事件需要插件来自己定义;
  • message:客户端发给插件的消息,message和event就构成了应用协议;
  • trickle:客户端发送的candidate,会传递给ICE句柄;
  • keepalive:客户端发送的心跳;
  • webrtcup:ICE成功建立通道的通知;
  • media:音频、视频媒体的接收通知;
  • slowlink:链路恶化通知;
  • hangup:挂断通知;
  • detached:插件从Janus会话detach的通知,释放了一个插件句柄。

"transaction"字段为事务ID,事务的概念可以参考SIP协议,可以理解为一问一答这样一个交互的过程,用事务ID来标识。

3.2 基本交互流程

所有插件都遵循的基本数据流程:

  1. 客户端发送create创建一个Janus会话;
  2. Janus回复success返回Janus会话句柄;
  3. 客户端发送attach命令在Janus会话上attach指定插件;
  4. Janus回复success返回插件的句柄;
  5. 客户端给指定的插件发送message进行信令控制
  6. Janus上的插件发送event通知事件给客户端
  7. 客户端收集candidate并通过trickle消息发送给插件绑定的ICE通道;
  8. Janus发送webrtcup通知ICE通道建立;
  9. 客户端发送媒体数据;
  10. Janus发送media消息通知媒体数据的第一次到达;
  11. Janus进行媒体数据转发。

只有粗体部分是插件相关,其他是公用的部分。

4 内部数据流

4.1 接收

4.1.1 信令

4.1.1.1 注册Transport插件

在janus.c中

定义transport回调,所有的transport插件都是用同一个回调:
static janus_transport_callbacks janus_handler_transport = {.incoming_request = janus_transport_incoming_request,.transport_gone = janus_transport_gone,.is_api_secret_needed = janus_transport_is_api_secret_needed,.is_api_secret_valid = janus_transport_is_api_secret_valid,.is_auth_token_needed = janus_transport_is_auth_token_needed,.is_auth_token_valid = janus_transport_is_auth_token_valid,.events_is_enabled = janus_events_is_enabled,.notify_event = janus_transport_notify_event,
};

在main函数中,加载所有transport插件,并注册回调:

char transportpath[1024];
while((transportent = readdir(dir))) {void *transport = dlopen(transportpath, RTLD_NOW | RTLD_GLOBAL);janus_transport->init(&janus_handler_transport, configs_folder)……
}

这样无论底层是什么协议(WebSocket、HTTP),信令消息都会通过回调janus_transport_incoming_request得到。

4.1.1.2 创建信令处理线程/线程池

在main函数中:

//创建信令请求队列
requests = g_async_queue_new_full((GDestroyNotify) janus_request_destroy);
//创建信令处理/分发线程
GThread *requests_thread = g_thread_try_new("sessions requests", &janus_transport_requests, NULL, &error);
//创建异步信令(给插件的“message”消息)处理线程池
tasks = g_thread_pool_new(janus_transport_task, NULL, -1, FALSE, &error);
4.1.1.3 分发信令

在janus.c中:

void janus_transport_incoming_request(janus_transport *plugin, janus_transport_session *transport, void *request_id, gboolean admin, json_t *message, json_error_t *error) {//信令入队janus_request *request = janus_request_new(plugin, transport, request_id, admin, message);g_async_queue_push(requests, request);
}
static void *janus_transport_requests(void *data) {janus_request *request = NULL;while(!g_atomic_int_get(&stop)) {//从信令请求队列弹出一个信令request = g_async_queue_pop(requests); json_t *message = json_object_get(request->message, "janus");const gchar *message_text = json_string_value(message);//如果是给插件的“message”消息if(message_text && !strcasecmp(message_text, "message")) {//分发到tasks线程池处理GError *tperror = NULL;			g_thread_pool_push(tasks, request, &tperror); } else {//如果不是给插件的“message”消息,则自己处理janus_process_incoming_request(request); }}JANUS_LOG(LOG_INFO, "Leaving Janus requests handler thread\n");return NULL;
}

4.1.2 媒体

4.1.2.1 创建媒体处理线程/线程池

当前版本Janus提供两种线程模式处理媒体,由janus.jcfg的【general/event_loops】配置项决定:

  • 0:对每个媒体会话创建一个线程;
  • n:预先创建包含n个线程的线程池。
    在janus.c的main函数中:
	item = janus_config_get(config, config_general, janus_config_type_item, "event_loops");if(item && item->value)janus_ice_set_static_event_loops(atoi(item->value)); //创建包含event_loops个线程的线程池

在ice.c中:

void janus_ice_set_static_event_loops(int loops) {int i = 0;for(i=0; i<loops; i++) { //创建指定线程janus_ice_static_event_loop *loop = g_malloc0(sizeof(janus_ice_static_event_loop));loop->id = static_event_loops;loop->mainctx = g_main_context_new();loop->mainloop = g_main_loop_new(loop->mainctx, FALSE);GError *error = NULL;char tname[16];g_snprintf(tname, sizeof(tname), "hloop %d", loop->id);loop->thread = g_thread_try_new(tname, &janus_ice_static_event_loop_thread, loop, &error);event_loops = g_slist_append(event_loops, loop);static_event_loops++;}current_loop = event_loops;
}static void *janus_ice_static_event_loop_thread(void *data) {janus_ice_static_event_loop *loop = data;JANUS_LOG(LOG_DBG, "[loop#%d] Looping...\n", loop->id);g_main_loop_run(loop->mainloop); //在线程中运行loop。g_main_loop_unref(loop->mainloop);g_main_context_unref(loop->mainctx);return NULL;
}

在Janus收到“attach”命令后,调用以下函数处理:

gint janus_ice_handle_attach_plugin(void *core_session, janus_ice_handle *handle, janus_plugin *plugin) {if(static_event_loops == 0) {//如果event_loops配置项为0,创建一个新的loop。handle->mainctx = g_main_context_new();handle->mainloop = g_main_loop_new(handle->mainctx, FALSE); //绑定loop到handle} else {//如果使用线程池,则获取线程池中一个线程的loop。janus_refcount_increase(&handle->ref);janus_mutex_lock(&event_loops_mutex);janus_ice_static_event_loop *loop = (janus_ice_static_event_loop *)current_loop->data;handle->mainctx = loop->mainctx;handle->mainloop = loop->mainloop; //绑定loop到handlecurrent_loop = current_loop->next;if(current_loop == NULL)current_loop = event_loops;janus_mutex_unlock(&event_loops_mutex);}handle->rtp_source = janus_ice_outgoing_traffic_create(handle, (GDestroyNotify)g_free);g_source_set_priority(handle->rtp_source, G_PRIORITY_DEFAULT);g_source_attach(handle->rtp_source, handle->mainctx);if(static_event_loops == 0) {//如果event_loops配置项为0,创建一个新的线程。GError *terror = NULL;char tname[16];g_snprintf(tname, sizeof(tname), "hloop %"SCNu64, handle->handle_id);janus_refcount_increase(&handle->ref);handle->thread = g_thread_try_new(tname, &janus_ice_handle_thread, handle, &terror);}
}static void *janus_ice_handle_thread(void *data) {janus_ice_handle *handle = data;g_main_loop_run(handle->mainloop); //在新创建的线程中运行loop。return NULL;
}
4.1.2.2 设置本地ICE数据回调

在收到客户端的信令中携带的sdp(收到offer或者answer)后,Janus会调用以下方法启动ICE:

int janus_ice_setup_local(janus_ice_handle *handle, int offer, int audio, int video, int data, int trickle) {//设置libnice回调参数janus_ice_stream *stream = g_malloc0(sizeof(janus_ice_stream));stream->handle = handle;janus_ice_component *component = g_malloc0(sizeof(janus_ice_component));component->stream = stream;//设置libnice回调,传入component作为返回参数。nice_agent_attach_recv(handle->agent, handle->stream_id, 1, g_main_loop_get_context(handle->mainloop),janus_ice_cb_nice_recv, component);
}
4.1.2.3 传递RTP
static void janus_ice_cb_nice_recv(NiceAgent *agent, guint stream_id, guint component_id, guint len, gchar *buf, gpointer ice) {//得到传入的componentjanus_ice_component *component = (janus_ice_component *)ice;//获取插件指针janus_ice_stream *stream = component->stream;janus_ice_handle *handle = stream->handle;janus_plugin *plugin = (janus_plugin *)handle->app;if(plugin && plugin->incoming_rtp && handle->app_handle &&!g_atomic_int_get(&handle->app_handle->stopped) &&!g_atomic_int_get(&handle->destroyed))//传递rtp给插件plugin->incoming_rtp(handle->app_handle, video, buf, buflen); 
}

4.2 发送

4.2.1 信令

在2.3.1节中说到在插件初始化的时候传入一个janus_callbacks类型的回调,其回调方法push_event可以向客户端发送一个event:

int (*const push_event)(janus_plugin_session *handle, janus_plugin *plugin, const char *transaction, json_t *message, json_t *jsep);

参数:

  • handle:底层插件会话句柄,已经与插件自定义会话句柄绑定,任何时候都能够获取;
  • plugin:插件指针,每个插件维护的一个静态全局指针,任何时候都能够获取;
  • transaction:事务ID,如果是对之前客户端某个命令返回的事件,则应该携带该命令的transaction,否则可以自行指定;
  • message:事件的消息内容;
  • jsep:事件携带的sdp。
4.2.2 媒体

janus_callbacks回调的relay_rtp方法可以向客户端发送rtp:

void (*const relay_rtp)(janus_plugin_session *handle, int video, char *buf, int len);

参数:

  • handle:底层插件会话句柄,已经与插件自定义会话句柄绑定,任何时候都能够获取;
  • video:视频或者音频;
  • buf/len:RTP数据。

5 开发

开发方式不外乎以下两种:

  • 修改默认插件,找到功能比较接近的插件,然后在上面开发可以快速验证;
  • 添加新的插件,需要修改编译配置文件。

主要的开发工作是在插件上设计信令协议,以及RTP包的处理。

这篇关于Janus架构以及基本开发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/635598

相关文章

mybatis的整体架构

mybatis的整体架构分为三层: 1.基础支持层 该层包括:数据源模块、事务管理模块、缓存模块、Binding模块、反射模块、类型转换模块、日志模块、资源加载模块、解析器模块 2.核心处理层 该层包括:配置解析、参数映射、SQL解析、SQL执行、结果集映射、插件 3.接口层 该层包括:SqlSession 基础支持层 该层保护mybatis的基础模块,它们为核心处理层提供了良好的支撑。

百度/小米/滴滴/京东,中台架构比较

小米中台建设实践 01 小米的三大中台建设:业务+数据+技术 业务中台--从业务说起 在中台建设中,需要规范化的服务接口、一致整合化的数据、容器化的技术组件以及弹性的基础设施。并结合业务情况,判定是否真的需要中台。 小米参考了业界优秀的案例包括移动中台、数据中台、业务中台、技术中台等,再结合其业务发展历程及业务现状,整理了中台架构的核心方法论,一是企业如何共享服务,二是如何为业务提供便利。

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

基本知识点

1、c++的输入加上ios::sync_with_stdio(false);  等价于 c的输入,读取速度会加快(但是在字符串的题里面和容易出现问题) 2、lower_bound()和upper_bound() iterator lower_bound( const key_type &key ): 返回一个迭代器,指向键值>= key的第一个元素。 iterator upper_bou

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

Linux_kernel驱动开发11

一、改回nfs方式挂载根文件系统         在产品将要上线之前,需要制作不同类型格式的根文件系统         在产品研发阶段,我们还是需要使用nfs的方式挂载根文件系统         优点:可以直接在上位机中修改文件系统内容,延长EMMC的寿命         【1】重启上位机nfs服务         sudo service nfs-kernel-server resta