[源码解析] 机器学习参数服务器 Paracel (2)-----SSP实现

2024-03-20 12:40

本文主要是介绍[源码解析] 机器学习参数服务器 Paracel (2)-----SSP实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

[源码解析] 机器学习参数服务器 Paracel (2)-----SSP实现

0x00 摘要

Paracel是豆瓣开发的一个分布式计算框架,它基于参数服务器范式来解决机器学习的问题:逻辑回归、SVD、矩阵分解(BFGS,sgd,als,cg),LDA,Lasso…。

Paracel支持数据和模型的并行,为用户提供简单易用的通信接口,比mapreduce式的系统要更加灵活。Paracel同时支持异步的训练模式,使迭代问题收敛地更快。此外,Paracel程序的结构与串行程序十分相似,用户可以更加专注于算法本身,不需将精力过多放在分布式逻辑上。

因为 ps-lite 没有对 SSP 进行深入,而 Paracel 对 SSP的实现比较深入,所以我们本文就看看SSP如何实现

本系列其他文章是:

[ 源码解析] 机器学习参数服务器ps-lite (1) ----- PostOffice

[ 源码解析] 机器学习参数服务器ps-lite(2) ----- 通信模块Van

[ 源码解析] 机器学习参数服务器ps-lite 之(3) ----- 代理人Customer

[源码解析]机器学习参数服务器ps-lite(4) ----- 应用节点实现

[源码解析] 机器学习参数服务器 Paracel (1)-----总体架构

本文解析时候会删除部分非主体代码。

0x01 背景知识

不同的worker同时并行运算的时候,可能因为网络、机器配置等外界原因,导致不同的worker的进度是不一样的,如何控制worker的同步机制是一个比较重要的课题。

1.1 异步控制协议

许多机器学习问题可以转化为迭代任务。对于迭代控制,一般来说,有三个级别的异步控制协议:BSP(Bulk Synchronous Parallel),SSP(Stalness Synchronous Parallel)和ASP(Asynchronous Parallel),它们的同步限制依次放宽。为了追求更快的计算速度,算法可以选择更宽松的同步协议。

为了更好的说明以及行文完整,我们把ps-lite之中介绍过的段落再次拿出来。

这三个协议具体如下:

  • ASP:task之间完全不用相互等待,完全不顾worker之间的顺序,每个worker按照自己的节奏走,跑完一个迭代就update,先完成的task,继续下一轮的训练。

    • 优点:消除了等待慢task的时间,减少了GPU的空闲时间,因此与BSP相比提高了硬件效率。计算速度快,最大限度利用了集群的计算能力,所有的worker所在的机器都不用等待

    • 缺点:

      • 这个过程可能会导致梯度被计算过时的权重,从而降低统计效率。
      • 适用性差,在一些情况下并不能保证收敛性

      img

  • BSP:是一般分布式计算采用的同步协议,每一轮迭代中都需要等待所有的task计算完成。每个worker都必须在同一个迭代运行,只有一个迭代任务所有的worker都完成了,才会进行一次worker和server之间的同步和分片更新。

    • BSP的模式和单机串行因为仅仅是batch size的区别,所以在模型收敛性上是完全一样的。同时,因为每个worker在一个周期内是可以并行计算的,所以有了一定的并行能力。spark用的就是这种方式。

    • 优点:适用范围广;每一轮迭代收敛质量高

    • 缺点:每一轮迭代中,,BSP要求每个worker等待或暂停来自其他worker的梯度,这样就需要等待最慢的task,从而显著降低了硬件效率,导致整体任务计算时间长。整个worker group的性能由其中最慢的worker决定;这个worker一般称为straggler。

      bsp

  • SSP:允许一定程度的task进度不一致,但这个不一致有一个上限,称为staleness值,即最快的task最多领先最慢的task staleness轮迭代。

    • 就是把将ASP和BSP做一下折中。既然ASP是允许不同worker之间的迭代次数间隔任意大,而BSP则只允许为0,那我就取一个常数s。有了SSP,BSP就可以通过指定s=0而得到。而ASP同样可以通过制定s=∞来达到。

    • 优点:一定程度减少了task之间的等待时间,计算速度较快。

    • 缺点:每一轮迭代的收敛质量不如BSP,达到同样的收敛效果可能需要更多轮的迭代,适用性也不如BSP,部分算法不适用。

      ssp

1.2 Straggler 问题

传统的方法是使用BSP来完成迭代,这意味着我们必须在每个迭代器的末尾进行同步。这导致了straggler问题:由于一些软硬件的原因,节点的计算能力往往不尽相同。对于迭代问题来说,每一轮结束时算得快的节点都需等待算得慢的节点算完,再进行下一轮迭代。这种等待在节点数增多时将变得尤为明显,从而拖慢整体的性能。

有两种方法可以解决这个问题:

  • 首先,我们必须编写一些复杂的代码,使负载不平衡,这样我们可以使一个快速的worker训练更多的数据。
  • 其次,我们可以做一些异步控制来放松同步条件。

Paracel使用第二种方法,放宽了同步条件,即放宽了“每个迭代步都等待”这个约束:

假设最快的worker与最慢的worker之间的同步不超过一个有界参数,这是每次迭代的收敛性和总收敛时间之间的折衷。当在一轮迭代结束时,算得快的节点可以继续下一轮迭代,但不能比最慢的节点领先参数s个迭代步。当领先超过s个迭代步,Paracel才会强制进行等待。

这样异步的控制方式既从整体上省去了等待时间,也能间接地帮助慢的节点赶上。从优化问题的角度来看,虽然单迭代步收敛得慢了,然而每个迭代步的时间开销变少了,总体上收敛也就变快了。

这种做法就是Staleness Synchronous Parallel (SSP),基本思想是允许各机器以不同步调对模型进行更新,但是加一个限制,使得最快的机器的进度和最慢机器的进度之差不要太大。这样做的好处是:既减轻慢的机器拖整个系统的后腿,又能保证模型的最终收敛。

0x02 实现

我们首先回忆一下前文总结的架构。

img

2.1 ssp_switch

ssp_switch 用来控制是否使用 ssp。

我们以 include/ps.hpp 的 paracel_read 为例。

如果启用了 sup,则:

  • 如果时钟为0或者total_iters,说明是ssp启动 或者 时间间隔(迭代次数)到了,这时候需要重新获取对应数值,更新cache。
  • 如果命中缓存,则直接返回。
  • 如果Miss,则如果当前时钟已经大于某个数值 (stale_cache + limit_s < clock) ,则 while 循环等待。
    • 即,算得快的节点可以继续下一轮迭代,但不能比最慢的节点领先参数s个迭代步。当领先超过s个迭代步,Paracel会强制进行等待。所以使用 pull_int(paracel::str_type("server_clock") 来增加 server的时钟。回忆一下前面讲的 SSP 核心思想(允许一定程度的task进度不一致,但这个不一致有一个上限,称为staleness值,即最快的task最多领先最慢的task staleness轮迭代)。
    • server_clock 是专门用来SSP时钟协调的。“server_clock” 就是服务器时钟,worker 就是获取这个数值来看是否落后或者领先。
    • stale_cache 初始为0,每次强制等待的循环之中,会设置为 “server_clock” 传回的 数值。

其中缓存定义:

  paracel::dict_type<paracel::str_type, boost::any> cached_para;

具体代码如下:

  template <class V>bool paracel_read(const paracel::str_type & key,V & val,int replica_id = -1) {if(ssp_switch) {if(clock == 0 || clock == total_iters) { // check total_iters for last // 说明是ssp启动或者时间间隔(迭代次数)到了,这时候需要重新获取对应数值,更新cache。cached_para[key] = boost::any_cast<V>(ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull<V>(key));val = boost::any_cast<V>(cached_para[key]);} else if(stale_cache + limit_s > clock) {// cache hit 如果命中缓存,则直接返回val = boost::any_cast<V>(cached_para[key]);} else {// cache miss// 如果Miss,如果当前时钟已经大于某个数值 ,则 while 循环等待// pull from server until leading slowest less than s clockswhile(stale_cache + limit_s < clock) {// 时间同步stale_cache = ps_obj->kvm[clock_server].pull_int(paracel::str_type("server_clock"));}// 获取key对应权重的最新数值cached_para[key] = boost::any_cast<V>(ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull<V>(key));val = boost::any_cast<V>(cached_para[key]);}return true;}return ps_obj->kvm[ps_obj->p_ring->get_server(key)].pull(key, val); }

kvclt 之中有pull_int方法,就是与Clock server交互,进行时间同步:

  int pull_int(const paracel::str_type & key) {if(p_ssp_sock == nullptr) {p_ssp_sock.reset(create_req_sock(ports_lst[4]));}auto scrip = paste(paracel::str_type("pull_int"), key);int val = -1;bool r = req_send_recv(*p_ssp_sock, scrip, val);if(!r) ERROR_ABORT("key: pull_int does not exist");return val;}

2.2 thrd_exec_ssp

在 include/server.hpp之中,thrd_exec_ssp 是专门处理ssp的线程。

其用到的ssp_tbl 在 include/kv_def.hpp 之中。

namespace paracel {paracel::kvs<paracel::str_type, int> ssp_tbl; // 这里是ssp专用KV存储paracel::kvs<paracel::str_type, paracel::str_type> tbl_store;
}

以 pull_int 这个命令为例,就是从服务器拉取 “ssp专用KV存储” 对应的数据。

thrd_exec_ssp 具体代码如下:

// thread entry for ssp 
void thrd_exec_ssp(zmq::socket_t & sock) {paracel::packer<> pk;paracel::ssp_tbl.set("server_clock", 0);while(1) {zmq::message_t s;sock.recv(&s);auto scrip = paracel::str_type(static_cast<const char *>(s.data()), s.size());auto msg = paracel::str_split_by_word(scrip, paracel::seperator);auto indicator = pk.unpack(msg[0]);//std::cout << indicator << std::endl;if(indicator == "push_int") { // 推送数据auto key = pk.unpack(msg[1]);paracel::packer<int> pk_i;auto val = pk_i.unpack(msg[2]);paracel::ssp_tbl.set(key, val);bool result = true;rep_pack_send(sock, result);}if(indicator == "incr_int") { // 更改数据auto key = pk.unpack(msg[1]);if(paracel::startswith(key, "client_clock_")) {if(paracel::ssp_tbl.get(key)) {paracel::ssp_tbl.incr(key, 1);} else {paracel::ssp_tbl.set(key, 1);}if(paracel::ssp_tbl.get(key) >= paracel::ssp_tbl.get("worker_sz")) {paracel::ssp_tbl.incr("server_clock", 1);paracel::ssp_tbl.set(key, 0); }}paracel::packer<int> pk_i;int delta = pk_i.unpack(msg[2]);paracel::ssp_tbl.incr(key, delta);bool result = true;rep_pack_send(sock, result);}if(indicator == "pull_int") { // 拉取数据auto key = pk.unpack(msg[1]);int result = 0;auto exist = paracel::ssp_tbl.get(key, result); // 获取对应的keyif(!exist) {paracel::str_type tmp = "nokey";rep_send(sock, tmp);}rep_pack_send(sock, result);}} // while
}

逻辑如下(注意,因为篇幅所限,这里省略了上图部分变量,加入了新的变量与逻辑):

+------------------+                                worker         +          server
| paralg           |                                               |
|                  |                                               |
|                  |                                               |
|  parasrv *ps_obj |                                               |
|            +     |                                               |  +------------------+
|            |     |                                               |  | start_server     |
+------------------+                                               |  |                  ||                                                     |  |                  ||                                                     |  |                  |v                                                     |  |                  |
+------------+-----+         +------------------+     +---------+  |  |                  |
| parasrv          |         |kvclt             |     | kvclt   |  |  |                  |
|                  |         |                  |     |         |  |  |    thrd_exec     |
|                  |         |     host         |     |         |  |  |                  |
|         servers  |         |                  |     |         |  |  |    ssp_tbl       |
|                  |         |     ports_lst    |     |         |  |  |                  |
|         kvm +----------->  |                  |.....|         |  |  |    tbl_store     |
|                  |         |     context      |     |         |  |  |                  |
|         p_ring   |         |                  |     |         |  |  |    thrd_exec_ssp |
|            +     |         |     conn_prefix  |     |         |  |  |                  |
|            |     |         |                  |     |         |  |  |       ^          |
+------------------+         |     p_ssp_sock   |     |         |  |  |       |          ||               |           +      |     |         |  |  |       |          ||               |           |      |     |         |  |  |       |          ||               |           |      |     |         |  |  |       |          |v               |           |      |     |         |  |  |       |          |
+------------+------+        +------------------+     +---------+  |  |       |          |
| ring              |                    |                         |  +------------------+
|                   |                    |                         |          |
|                   |                    |                         |          |
|  srv_hashring     |                    |                         |          |
|                   |                    |                         |          |
|  srv_hashring_dct |                    +------------------------------------+
|                   |                                              |
+-------------------+                                              +

手机如下:

img

2.3 转换

用户只需添加几行代码即可将BSP进程转换为异步进程。比如一个非常简单的示例。

主要就是使用iter_commit() 在每次迭代结束之后,把本地更新结果提交到参数服务器。

class logistic_regression: public paracel::paralg {public:logistic_regression(paracel::Comm comm,std::string hosts_dct_str,std::string _output,int _rounds,int _limit_s,bool _ssp_switch) :paracel::paralg(hosts_dct_str,comm,_output,_rounds,_limit_s,_ssp_switch) {}void training() {theta = paracel::random_double_list(data_dim);paracel_write("theta", theta); // init pushfor(int iter = 0; iter < rounds; ++iter) {for(int i = 0; i < data_dim; ++i) {delta[i] = 0.;}random_shuffle(idx.begin(), idx.end());// pull thetatheta = paracel_read<vector<double> >("theta");for(auto sample_id : idx) {for(int i = 0; i < data_dim; ++i) {delta[i] += coff1 *samples[sample_id][i] - coff2 * theta[i];}} // traverse// update theta with deltaparacel_bupdate("theta",delta,"update.so","lg_theta_update");// commit to server at the end of each iterationiter_commit(); // 这里是添加的,在每次迭代结束之后,把本地更新结果提交到参数服务器}// last pulltheta = paracel_read<vector<double> >("theta");}void solve() {// init training dataauto parser = [](const std::vector<std::string>) {/* ... */};auto lines = paracel_load(input);parser(lines);paracel_sync();// set total iterations of your training processset_total_iters(rounds);// trainingtraining();}}; // class logistic regression

2.4 逻辑串联

前面每个部分我们其实都讲解得不透彻,需要在此串联起来。

我们假设有5个worker,limit_s 是 3,即最快的节点不能比最慢的节点领先参数 3 个迭代步。当领先超过 3 个迭代步,Paracel会强制进行等待。

2.4.1 初始化

在 paralg 构建函数中,会对各种数据进行初始化,这里重要的是服务器端 key “worker_sz” 对应的数值被设置为 worker_comm.get_size() ,就是worker 数值 5。

“worker_sz” 的意义是:目前应该有多少个服务器一起训练。

  paralg(paracel::str_type hosts_dct_str, paracel::Comm comm,paracel::str_type _output = "",int _rounds = 1,int _limit_s = 0,bool _ssp_switch = false) : worker_comm(comm),output(_output),nworker(comm.get_size()),rounds(_rounds),limit_s(_limit_s),ssp_switch(_ssp_switch) {ps_obj = new parasrv(hosts_dct_str);init_output(_output);clock = 0;stale_cache = 0;clock_server = 0;total_iters = rounds;if(worker_comm.get_rank() == 0) {paracel::str_type key = "worker_sz";(ps_obj->kvm[clock_server]).push_int(key, worker_comm.get_size());  // 设置为 5}paracel_sync();}
2.4.2 worker 端 iter_commit

在 iter_commit 之中,逻辑如下。

  • iter_commit 是每次迭代增加 本地 clock;
  • 如果 (clock == total_iters),说明本 worker 已经达到了总体迭代数值,就减少服务器 “worker_sz” 数值。即:本服务器已经跑完了训练,所以下面一起训练的服务器数目需要减少 1。
  // put where you want to control iter with sspvoid iter_commit() {paracel::str_type clock_key;if(limit_s == 0) {clock_key = "client_clock_0";} else {clock_key = "client_clock_" + std::to_string(clock % limit_s);}ps_obj->kvm[clock_server].incr_int(paracel::str_type(clock_key), 1); // value 1 is not importantclock += 1;if(clock == total_iters) { // 如果已经达到了总体迭代数值,就减少服务器 "worker_sz" 数值ps_obj->kvm[clock_server].incr_int(paracel::str_type("worker_sz"), -1);}}

kvclt 之中有如下代码,其实就是给服务器转发请求,所以我们可以略过:

  bool incr_int(const paracel::str_type & key,int delta) {if(p_ssp_sock == nullptr) {p_ssp_sock.reset(create_req_sock(ports_lst[4]));}auto scrip = paste(paracel::str_type("incr_int"),key,delta);bool stat;auto r = req_send_recv(*p_ssp_sock, scrip, stat);return r && stat;}int pull_int(const paracel::str_type & key) {if(p_ssp_sock == nullptr) {p_ssp_sock.reset(create_req_sock(ports_lst[4]));}auto scrip = paste(paracel::str_type("pull_int"), key);int val = -1;bool r = req_send_recv(*p_ssp_sock, scrip, val);assert(val != -1);assert(r);if(!r) ERROR_ABORT("key: pull_int does not exist");return val;}
2.4.3 服务端 incr_int

服务器收到了kvclt 转发的请求,处理举例如下:

在 thread_exec_ssp 中,incr_int 部分代码如下:

  • 如果 key 是 “client_clock_”,则
    • 把对应的key增加对应的数值,或者添加这个数值;
    • 如果 key 的数值大于"worker_sz"的数值,说明所有worker 都完成了一轮迭代,所以需要:
      • 把"server_clock"数值增加 1。“server_clock” 就是服务器时钟,worker 就是获取这个数值来看是否落后或者领先;
      • 把对应的 “client_clock_” 重置为 0,则说明需要考虑下次迭代了。
  • 对于其他key,则增加参数的数值;
    if(indicator == "incr_int") {auto key = pk.unpack(msg[1]);if(paracel::startswith(key, "client_clock_")) {if(paracel::ssp_tbl.get(key)) {paracel::ssp_tbl.incr(key, 1); // 把对应的key增加对应的数值} else {paracel::ssp_tbl.set(key, 1 // 添加这个数值}if(paracel::ssp_tbl.get(key) >= paracel::ssp_tbl.get("worker_sz")) //所有worker 都完成了一轮迭代paracel::ssp_tbl.incr("server_clock", 1); //服务器迭代增加1paracel::ssp_tbl.set(key, 0); //重置为 0,说明需要考虑下次迭代了,因为本次迭代中,所有client都完成了,下次迭代又要重新计算}}paracel::packer<int> pk_i;int delta = pk_i.unpack(msg[2]);paracel::ssp_tbl.incr(key, delta);bool result = true;rep_pack_send(sock, result);}
2.4.4 串联

把所有逻辑串联起来,名词解释如下:

  • client_clock_X ,表示本轮 虚拟迭代 之中的 实际迭代中,分别有几个 worker 运行完成, 0 <= X < limit_s 。
  • worker_sz 表示 目前应该有多少个服务器一起训练。
  • server_clock 就是服务器时钟,代表总体已经训练完成了几个迭代(实际迭代),worker 就是获取这个数值来看是否落后或者领先。

具体如下:

  • limit_s 是 3,即最快的节点不能比最慢的节点领先参数 3 个迭代步。当领先超过 3 个迭代步,Paracel会强制进行等待。因此,有两种迭代:

    • 大的迭代是虚拟迭代,包含 3 个小迭代步骤(limit s 数目)
    • 小的迭代就是实际迭代步,使用 client_clock_X 表示, clock_key_0 表示本轮 虚拟迭代 之中的 第一次实际迭代 中,分别有几个 worker 完成运行。
  • 在 worker 的 paralg 构建函数中,会对各种数据进行初始化,这里重要的是服务器端 key “worker_sz” 对应的数值被设置为 worker_comm.get_size() ,就是worker 数值 5。

    “worker_sz” 的意义是:目前应该有多少个服务器一起训练。

  • 在 worker 的 paracel_read 之中,一直用本地的 clock 与远端 “server_clock” 做比较,如果小于 limit_s 则强制本worker等待

  • 在worker 的 iter_commit 之中:

    • 增加 本地 clock 的数值
      • clock 从 0 开始递增,就是本地实际迭代的次数
      • 如果 (clock == total_iters),说明本 worker 本地训练已经达到了总体迭代数值,就减少服务器 “worker_sz” 数值。即:本服务器已经跑完了训练,所以下面一起训练的服务器数目需要减少 1;
    • 假如 limit_s 为3,则 clock_key 为 client_clock_0, client_clock_1, client_clock_2,根据本地 clock 的数值,给服务器 (clock % limit_s) 增加 1;clock_key_0 表示本轮 虚拟迭代 之中的 第一次实际迭代 中,分别有几个 worker 完成运行;
  • 递交 iter_commit 之后,在 server 之中:

    • 如果 key 是 “client_clock_”,则
      • 把对应的key增加对应的数值;
      • 如果 key 的数值大于"worker_sz"的数值,说明所有worker 都完成了一轮迭代,所以需要:
        • 把"server_clock"数值增加 1。“server_clock” 就是服务器时钟,worker 就是获取这个数值来看是否落后或者领先;
        • 把对应的 “client_clock_” 重置为 0,则说明需要考虑下次迭代了。
    • 对于其他key,则增加参数的数值;

我们可以看看逻辑图:

                                    worker 1  +  Server 1||
+-----------------------------------------+   | +-------------------------------------+
| paracel_read() {                        |   | |                                     |
|                                         |   | |auto key = pk.unpack(msg[1]);        |
|  while(stale_cache + limit_s < clock) { |   | |if(startswith(key, "client_clock_")){|
|    stale_cache = get("server_clock")    |   | |	if(ssp_tbl.get(key)) {              |
|  }                                      |   | |		incr(key, 1);                      |
| }                                       |   | |	} else {                            |
+-----------------------------------------+   | |		set(key, 1);                       || |	}                                   |
+---------------------------------------------+ |	if(get(key) >= get("worker_sz")) {  |worker 2     | |	 incr("server_clock", 1);           || | set(key, 0);                        |
+-----------------------------------------+   | |	}                                   |
| iter_commit() {                         |   | |}                                    |
|                                         |   | |ssp_tbl.incr(key, delta);            |
|   if(limit_s == 0) {                    |   | |                                     |
|     clock_key = "client_clock_0"        |   | +-------------------------------------+
|   } else {                              |   |
|     clock_key = "client_clock_" +       |   |
|                 (clock % limit_s)       |   |
|   }                                     |   |
|                                         |   |
|   incr_int(clock_key, 1);               |   |
|                                         |   |
|   clock += 1;                           |   |
|                                         |   |
|   if(clock == total_iters) {            |   |
|     incr_int("worker_sz"), +1);         |   |
|   }                                     |   |
|  }                                      |   |
| }                                       |   |
+-----------------------------------------+   +

手机如下:

img

我们也可以用图表展示下逻辑过程,其中:

  • client_clock_1 缩写为 c_c_1,表示本轮 虚拟迭代 之中的 实际迭代 中,分别有几个 worker 完成运行
  • worker_sz 缩写为 w_sz,表示 目前应该有多少个服务器一起训练。
  • server_clock 缩写为 s_c。“server_clock” 就是服务器时钟,代表总体已经训练完成了几个迭代(实际迭代),worker 就是获取这个数值来看是否落后或者领先。
  • 这几个变量都是服务器端的变量。

首先开始启动训练,表格中从上到下顺序执行。

第一个worker开始训练,实际训练两步,增加c_c_0,c_c_1

c_c_0c_c_1c_c_2w_szs_c说明
worker1115第一个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker2
worker3
worker4
worker5

第二个worker开始训练,实际训练两步,增加c_c_0,c_c_1

c_c_0c_c_1c_c_2w_szs_c说明
worker1115第一个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker2225第二个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker3
worker4
worker5

第三个worker开始训练,实际训练两步,增加c_c_0,c_c_1

c_c_0c_c_1c_c_2w_szs_c说明
worker1115第一个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker2225第二个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker3335第三个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker4
worker5

第四个worker开始训练,实际训练两步,增加c_c_0,c_c_1

c_c_0c_c_1c_c_2w_szs_c说明
worker1115第一个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker2225第二个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker3335第三个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker4445第四个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker5

第五个worker开始训练,实际训练一步,增加c_c_0,因为已经完成了一轮实际迭代,所以server_clock增加 1。

此时,worker 5 落后了一个迭代(server_clock = 1)。

c_c_0c_c_1c_c_2w_szs_c说明
worker1115第一个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker2225第二个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker3335第三个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker4445第四个worker开始训练,实际训练两步,增加c_c_0,c_c_1
worker55 --> 051第五个worker开始训练,实际训练一步,增加c_c_0,因为所有5个worker都已经完成了一轮实际迭代,所以server_clock增加 1,然后对应的 “client_clock_0” 重置为 0,则说明需要考虑下次迭代了。

下面看看特殊情况

首先,4个worker都运行完3步,但是worker 5没有运行,状况如下:

c_c_0c_c_1c_c_2w_szs_c说明
worker11115本轮第一个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker22225本轮第二个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker33335本轮第三个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker44445本轮第四个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker5

假设worker 5 的 iter_commit 之中,如果worker 5 发现自己 (clock == total_iters),说明本 worker 5 已经达到了总体迭代数值,就减少服务器 “worker_sz” 数值。即:本服务器已经跑完了训练,所以下面一起训练的服务器数目需要减少 1;

因为 worker 5 一下子完成 3步训练,所以 s_c 变成 3,即总体迭代次数为 3。

因为 本次虚拟迭代中,5 个worker都完成了训练,所以 c_c_1 ~ c_c_2 都先变成 5, 然后重置为 0。

c_c_0c_c_1c_c_2w_szs_c说明
worker11115本轮第一个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker22225本轮第二个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker33335本轮第三个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker44445本轮第四个worker实际训练三步,增加c_c_0,c_c_1,c_c_2
worker55 --> 05 --> 05 --> 043本轮第五个worker训练完成,worker 5 又发现自己 (clock == total_iters),则"worker_sz" 数值减少1,以后只要看 4 个服务器即可,。

至此,SSP相关我们分析完毕,下文解析数据/模型加载。

0xEE 个人信息

★★★★★★关于生活和技术的思考★★★★★★

微信公众账号:罗西的思考

如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

在这里插入图片描述

0xFF 参考

https://blog.csdn.net/weixin_47364682/article/details/119392575

这篇关于[源码解析] 机器学习参数服务器 Paracel (2)-----SSP实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/829482

相关文章

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

MySQL中时区参数time_zone解读

《MySQL中时区参数time_zone解读》MySQL时区参数time_zone用于控制系统函数和字段的DEFAULTCURRENT_TIMESTAMP属性,修改时区可能会影响timestamp类型... 目录前言1.时区参数影响2.如何设置3.字段类型选择总结前言mysql 时区参数 time_zon

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Python实现高效地读写大型文件

《Python实现高效地读写大型文件》Python如何读写的是大型文件,有没有什么方法来提高效率呢,这篇文章就来和大家聊聊如何在Python中高效地读写大型文件,需要的可以了解下... 目录一、逐行读取大型文件二、分块读取大型文件三、使用 mmap 模块进行内存映射文件操作(适用于大文件)四、使用 pand

python实现pdf转word和excel的示例代码

《python实现pdf转word和excel的示例代码》本文主要介绍了python实现pdf转word和excel的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一、引言二、python编程1,PDF转Word2,PDF转Excel三、前端页面效果展示总结一

Python xmltodict实现简化XML数据处理

《Pythonxmltodict实现简化XML数据处理》Python社区为提供了xmltodict库,它专为简化XML与Python数据结构的转换而设计,本文主要来为大家介绍一下如何使用xmltod... 目录一、引言二、XMLtodict介绍设计理念适用场景三、功能参数与属性1、parse函数2、unpa

C#实现获得某个枚举的所有名称

《C#实现获得某个枚举的所有名称》这篇文章主要为大家详细介绍了C#如何实现获得某个枚举的所有名称,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以参考一下... C#中获得某个枚举的所有名称using System;using System.Collections.Generic;usi

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英

C# 读写ini文件操作实现

《C#读写ini文件操作实现》本文主要介绍了C#读写ini文件操作实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录一、INI文件结构二、读取INI文件中的数据在C#应用程序中,常将INI文件作为配置文件,用于存储应用程序的