srs集群下行edge处理逻辑

2024-02-22 17:28

本文主要是介绍srs集群下行edge处理逻辑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{srs_error_t err = srs_success;consumer = new SrsLiveConsumer(this, conn);consumers.push_back(consumer);if (conn != NULL) {conn->srsConsumer = consumer;}// There should be one consumer, so reset the timeout.stream_die_at_ = 0;publisher_idle_at_ = 0;//通过配置文件中的参数,判断是否是边缘服务器//如果是边缘服务器,则调用 play_edge进行拉流播放//SrsPlayEdge* play_edge;// for edge, when play edge stream, check the stateif (_srs_config->get_vhost_is_edge(req->vhost)) {// notice edge to start for the first client.if ((err = play_edge->on_client_play()) != srs_success) {return srs_error_wrap(err, "play edge");}}return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{srs_error_t err = srs_success;//SrsEdgeIngester ingester 启动一个新的协程去源站拉流// start ingest when init state.if (state == SrsEdgeStateInit) {state = SrsEdgeStatePlay;err = ingester->start();} else if (state == SrsEdgeStateIngestStopping) {return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");}return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{srs_error_t err = srs_success;if ((err = source->on_publish()) != srs_success) {return srs_error_wrap(err, "notify source");}srs_freep(trd);trd = new SrsSTCoroutine("edge-igs", this);if ((err = trd->start()) != srs_success) {return srs_error_wrap(err, "coroutine");}return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "do cycle pull");}// Use protocol in config.string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);// If follow client protocol, change to protocol of client.bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);if (follow_client && !req->protocol.empty()) {edge_protocol = req->protocol;}// Create object by protocol.srs_freep(upstream);//根据边缘协议创建对应的拉流类if (edge_protocol == "flv" || edge_protocol == "flvs") {upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");} else {upstream = new SrsEdgeRtmpUpstream(redirect);}if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {return srs_error_wrap(err, "on source id changed");}//边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取//其中一个节点进行拉流//这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?//其实如果发现连接的源站没有流,会触发302 redirect重连逻辑if ((err = upstream->connect(req, lb)) != srs_success) {return srs_error_wrap(err, "connect upstream");}if ((err = edge->on_ingest_play()) != srs_success) {return srs_error_wrap(err, "notify edge play");}// set to larger timeout to read av data from origin.upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);//拉流处理函数err = ingest(redirect);if (srs_is_client_gracefully_close(err)) {srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());srs_error_reset(err);}break;}}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{//第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台//如果连接的源站没有流,触发302,再连接另一台if (redirect_depth == 0) {SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);// @see https://github.com/ossrs/srs/issues/79// when origin is error, for instance, server is shutdown,// then user remove the vhost then reload, the conf is empty.if (!conf) {return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());}// select the origin.std::string server = lb->select(conf->args);int port = SRS_DEFAULT_HTTP_PORT;if (schema_ == "https") {port = SRS_DEFAULT_HTTPS_PORT;}srs_parse_hostport(server, server, port);// Remember the current selected server.selected_ip = server;selected_port = port;} else {// If HTTP redirect, use the server in location.schema_ = req->schema;selected_ip = req->host;selected_port = req->port;}sdk_ = new SrsHttpClient();if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}if ((err = sdk_->get(path, "", &hr_)) != srs_success) {return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());}if (hr_->status_code() == 404) {return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());}//如果状态码为302,开启重连另一台逻辑string location;if (hr_->status_code() == 302) {//获取302返回的地址location = hr_->header()->get("Location");}srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());if (hr_->status_code() == 302) {//最多重试三次if (redirect_depth >= 3) {return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);}string app;string stream_name;if (true) {string tcUrl;srs_parse_rtmp_url(location, tcUrl, stream_name);int port;string schema, host, vhost, param;srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);r->schema = schema; r->host = host; r->port = port;r->app = app; r->stream = stream_name; r->param = param;}//重连return do_connect(r, lb, redirect_depth + 1);}
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{while (true) {if ((err = trd->pull()) != srs_success) {return srs_error_wrap(err, "thread quit");}pprint->elapse();// pithy printif (pprint->can_print()) {upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());}// read from client.SrsCommonMessage* msg = NULL;//upstream拉流if ((err = upstream->recv_message(&msg)) != srs_success) {return srs_error_wrap(err, "recv message");}srs_assert(msg);SrsAutoFree(SrsCommonMessage, msg);//处理拉到的流if ((err = process_publish_message(msg, redirect)) != srs_success) {return srs_error_wrap(err, "process message");}}
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{srs_error_t err = srs_success;// process audio packetif (msg->header.is_audio()) {if ((err = source->on_audio(msg)) != srs_success) {return srs_error_wrap(err, "source consume audio");}}// process video packetif (msg->header.is_video()) {if ((err = source->on_video(msg)) != srs_success) {return srs_error_wrap(err, "source consume video");}}}

这篇关于srs集群下行edge处理逻辑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/735978

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

无人叉车3d激光slam多房间建图定位异常处理方案-墙体画线地图切分方案

墙体画线地图切分方案 针对问题:墙体两侧特征混淆误匹配,导致建图和定位偏差,表现为过门跳变、外月台走歪等 ·解决思路:预期的根治方案IGICP需要较长时间完成上线,先使用切分地图的工程化方案,即墙体两侧切分为不同地图,在某一侧只使用该侧地图进行定位 方案思路 切分原理:切分地图基于关键帧位置,而非点云。 理论基础:光照是直线的,一帧点云必定只能照射到墙的一侧,无法同时照到两侧实践考虑:关

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

Thymeleaf:生成静态文件及异常处理java.lang.NoClassDefFoundError: ognl/PropertyAccessor

我们需要引入包: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>sp

jenkins 插件执行shell命令时,提示“Command not found”处理方法

首先提示找不到“Command not found,可能我们第一反应是查看目标机器是否已支持该命令,不过如果相信能找到这里来的朋友估计遇到的跟我一样,其实目标机器是没有问题的通过一些远程工具执行shell命令是可以执行。奇怪的就是通过jenkinsSSH插件无法执行,经一番折腾各种搜索发现是jenkins没有加载/etc/profile导致。 【解决办法】: 需要在jenkins调用shell脚

CRtmpServer转推流到Nginx Rtmp及SRS(SimpleRtmpServer)的经历

转自:http://blog.csdn.net/fengyily/article/details/42557841 本人一直用的是CRtmpServer服务,在CRtmpServer服务中根据自已的想法也加入了许多功能,如通过http接口来加载配置等,苦于不支持HLS,自已添加ts分片水平又有限,思来想去决定借助SimpleRtmpServer的HLS功能。说干就干,马上查找相关资源

一种改进的red5集群方案的应用、基于Red5服务器集群负载均衡调度算法研究

转自: 一种改进的red5集群方案的应用: http://wenku.baidu.com/link?url=jYQ1wNwHVBqJ-5XCYq0PRligp6Y5q6BYXyISUsF56My8DP8dc9CZ4pZvpPz1abxJn8fojMrL0IyfmMHStpvkotqC1RWlRMGnzVL1X4IPOa_  基于Red5服务器集群负载均衡调度算法研究 http://ww