Sylar C++高性能服务器学习记录09 【协程调度模块-知识储备篇】

本文主要是介绍Sylar C++高性能服务器学习记录09 【协程调度模块-知识储备篇】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

早在19年5月就在某站上看到sylar的视频了,一直认为这是一个非常不错的视频,由于本人一直是自学编程,基础不扎实,也没有任何人的督促,没能坚持下去,每每想起倍感惋惜。恰逢互联网寒冬,在家无事,遂提笔再续前缘。

为了能更好的看懂sylar,本套笔记会分两步走,每个系统都会分为两篇博客。
分别是【知识储备篇】和【代码分析篇】
(ps:纯粹做笔记的形式给自己记录下,欢迎大家评论,不足之处请多多赐教)
QQ交流群:957100923


协程调度模块-代码分析篇

一、Schedule调度器是什么【线程级别的调度器】

我的博客已经停更一周了,并不是 五一 假期出去游玩导致的。
原因很简单:就是因为 sylar 视频中协程调度相关的知识过于复杂,导致自己反复观看了数遍才勉强理解一二。
要搞懂协程调度模块一上来就看sylar的视频或代码未免门槛太高了。
我总结了一下这部分知识为什么难以下咽

  1. 知识点过多,将调度器、线程、协程糅合在一起讲解,原本基础就不扎实,这样一锅乱炖容易糊掉。
  2. 想法没有提前告知,不知道要做什么,在做什么,雾里看花。
  3. 弹幕的误导,sylar-yin 确实会有说错写错的地方,如果你不懂,那么请你百分百遵循他的写法。
  4. 不够坚持,不要放过每一个你不熟悉的知识点,请暂停视频,把不会的知识点了解了再往下(不要过分深入)。

本人属于不太聪明的类型,所以如果你和我一样,那么请稳扎稳打,别人学一天我们就学三天,别人学一个月,我们就学一年。


1.最普通的多线程使用

首先抛开协程,我们想要使用多线程来完成多个任务需要怎么做呢?
下面我举个例子(该案例中会开启三个子线程来分别执行 唱歌、跳舞、说唱):

#include <chrono>
#include <iostream>
#include <pthread.h>
#include <vector>
#include <list>
#include <unistd.h>
#include <functional>
#include "sylar/thread.h"
#include "sylar/mutex.h"//测试用的全局累加计数字段
int num = 0;
//循环累加的次数
uint64_t loop_times = 100000000;typedef sylar::Mutex MutexType;
MutexType g_mutx;//唱歌的方法,会专门分配一个线程来处理
void sing(){MutexType::Lock lock(g_mutx);for(size_t i = 0; i < loop_times; ++i){++num;}std::cout << sylar::Thread::GetName() << " sing~ " << num << std::endl;
}//跳舞的方法,会专门分配一个线程来处理
void dance(){MutexType::Lock lock(g_mutx);for(size_t i = 0; i < loop_times; ++i){++num;}std::cout <<sylar::Thread::GetName() << " dance~ " << num << std::endl;   
}//说唱的方法,会专门分配一个线程来处理
void rap(){MutexType::Lock lock(g_mutx);for(size_t i = 0; i < loop_times; ++i){++num;}std::cout <<sylar::Thread::GetName() << " rap~ " << num << std::endl;   
}int main(){std::cout << "====Main start====" << std::endl;{//记录当前时间auto start = std::chrono::high_resolution_clock::now();//创建线程用于执行唱歌的方法sylar::Thread::ptr thr1(new sylar::Thread(&sing,"THREAD_1"));//创建线程用于执行跳舞的方法sylar::Thread::ptr thr2(new sylar::Thread(&dance,"THREAD_2"));//创建线程用于执行说唱的方法sylar::Thread::ptr thr3(new sylar::Thread(&rap,"THREAD_3"));thr1->join();thr2->join();thr3->join();//记录结束时间auto end = std::chrono::high_resolution_clock::now();//计算花费的时间std::chrono::duration<double,std::milli> elapsed = end - start;std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;}std::cout << "====Main end====" << std::endl;return 0;
}

以下是输出(可以看到三个线程按顺序执行了):

===Main start===
THREAD_1 sing~ 100000000
THREAD_2 dance~ 200000000
THREAD_3 rap~ 300000000
Elapsed: 294
===Main end====

2.复用多线程

上述的代码中,我们有三个任务(唱歌、跳舞、说唱),同时我们开辟了三个线程来执行。
但是很多情况下我们的线程数量是需要控制在适当范围内的。

最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目CPU核数为4核,一个任务线程cpu耗时为20ms,线程等待(网络IO、磁盘IO)耗时80ms,
那最佳线程数目:( 80 + 20 )/20 * 4 = 20。也就是设置20个线程数最佳。线程的等待时间越大,线程数就要设置越大
1、CPU密集型:操作内存处理的业务,一般线程数设置为:CPU核数 + 1 或者 CPU核数*2。核数为4的话,一般设置 5 或 8
2、IO密集型:文件操作,网络操作,数据库操作,一般线程设置为:cpu核数 / (1-0.9),核数为4的话,一般设置 40

我的机器是1核的,我这里其实适合使用两个线程,但是我在上述代码中使用了三个线程,那么多线程反而变成了低效的。
所以我们可以复用其中一个线程,在该线程执行完成任务后再执行其他的任务。
代码如下(改动很少,改动部分用 !!!标注了):

int main(){std::cout << "====Main start====" << std::endl;{//记录当前时间auto start = std::chrono::high_resolution_clock::now();//创建线程用于执行唱歌的方法sylar::Thread::ptr thr1(new sylar::Thread(&sing,"THREAD_1"));//创建线程用于执行跳舞的方法sylar::Thread::ptr thr2(new sylar::Thread(&dance,"THREAD_2"));thr1->join();thr2->join();//!!!复用线程用于执行说唱的方法!!!thr1.reset(new sylar::Thread(&rap,"THREAD_1"));thr1->join();//记录结束时间auto end = std::chrono::high_resolution_clock::now();//计算花费的时间std::chrono::duration<double,std::milli> elapsed = end - start;std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;}std::cout << "====Main end====" << std::endl;return 0;
}

以下是输出:

===Main start====
THREAD_1 sing~ 100000000
THREAD_2 dance~ 200000000
THREAD_1 rap~ 300000000
Elapsed: 265
==Main end====

问题:
问题来了,以上的两份代码都存在以下问题:
1.就是需要程序员自己来控制线程的创建。
2.需要程序员手动来进行线程的调度。
3.需要程序员手动的去复用一个线程对象。
4.需要程序员手动的join对应的每一个线程。
这样是很繁琐的事情,而且一不小心就会出错,那么我们有什么好的办法吗?


3.调度器的出现

想要解决以上代码带来的问题,我们就要提出需求:
1.将创建线程的事情交由【调度器】执行,我们只需创建调度器时指定线程数量即可。
2.将线程调度和复用线程的事情交由【调度器】执行,我们只需提供所需执行的 任务 即可。
3.等待各个线程完成的jion()方法也由【调度器】执行,我们只需等待调度器结束即可。

综上所述,我们给出以下代码(记得先看main方法中的调度器使用,再看实现):

#include <chrono>
#include <iostream>
#include <pthread.h>
#include <vector>
#include <list>
#include <unistd.h>
#include <functional>
#include "sylar/thread.h"
#include "sylar/mutex.h"//测试用的全局累加计数字段
int num = 0;
//循环累加的次数
uint64_t loop_times = 100000000;//唱歌的方法,会专门分配一个线程来处理
void sing(){for(size_t i = 0; i < loop_times; ++i){++num;}std::cout << sylar::Thread::GetName() << " sing~ " << num << std::endl;
}//跳舞的方法,会专门分配一个线程来处理
void dance(){for(size_t i = 0; i < loop_times; ++i){++num;}std::cout <<sylar::Thread::GetName() << " dance~ " << num << std::endl;   
}//说唱的方法,会专门分配一个线程来处理
void rap(){for(size_t i = 0; i < loop_times; ++i){++num;}std::cout <<sylar::Thread::GetName() << " rap~ " << num << std::endl;   
}//调度器
class Schedule {
public://定义锁,使用typedef方便锁类型替换typedef sylar::Mutex MutexType;//调度器构造函数,指定线程池的大小Schedule(size_t size = 1):m_poll_size(size){std::cout << "Specify a thread poll size of "<< size << std::endl;}//调度器析构函数~Schedule(){}//调度方法void run(){//多个线程操作任务队列,涉及到线程安全,所以要加锁MutexType::Lock lock(m_mutex);//定义一个任务对象来存放后续从任务队列中取出的任务std::function<void()> task;//开始对任务队列进行迭代auto it = m_tasks_queue.begin();while(it != m_tasks_queue.end()){//将任务从任务队列中取出task = *it;//将取出的任务从任务队列中擦除m_tasks_queue.erase(it);//每次按顺序取出一个,取出后跳出循环break;}//判断是否有取到任务if(task != nullptr){//有取到任务则执行任务task();}}//开启调度器方法void start(){std::cout << "Schedule start..." << std::endl;std::cout << "Init a thread poll" << std::endl;//根据调度器初始化时指定的线程池大小,来初始化线程池容器的大小 m_thread_poll.resize(m_poll_size);std::cout << "Schedule run..." << std::endl;//构建线程池中的每一个线程对象,每个线程都将绑定调度器的run方法,且给每个线程对象指定了名称用于区分for(size_t i = 0; i < m_poll_size; ++i){//这里的reset方法是为了复用线程对象节约内存资源m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));}}//停止调度器方法void stop(){//判断任务队列中的任务是否全部被消化while(!m_tasks_queue.empty()){for(size_t i = 0; i < m_poll_size; ++i){m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));}}//如果任务队列中的任务全部被消化了,那么就在这等待每个子线程的结束for(size_t i = 0; i < m_poll_size; ++i){m_thread_poll[i]->join();}std::cout << "Schedule stop..." << std::endl; }//接受调度任务方法void schedule(std::function<void()> task){std::cout << "Push task into queue " << &task << std::endl;//将任务对象依次存放到调度器的任务队列中m_tasks_queue.push_back(task);}private://由于是多线程环境下所以提供锁MutexType m_mutex;//线程池的大小size_t m_poll_size = 0;//线程池std::vector<sylar::Thread::ptr> m_thread_poll;//任务队列std::list<std::function<void()> > m_tasks_queue;      
};int main(){std::cout << "====Schedule====" << std::endl;std::cout << "====Main start====" << std::endl;{//记录当前时间auto start = std::chrono::high_resolution_clock::now();//构建调度器,指定线程池中线程数量Schedule sc(2);//依次将要执行的任务塞入调度器sc.schedule(std::bind(&sing));sc.schedule(std::bind(&dance));sc.schedule(std::bind(&rap));//执行调度器sc.start();//停止调度器sc.stop();//记录结束时间auto end = std::chrono::high_resolution_clock::now();//计算花费的时间std::chrono::duration<double,std::milli> elapsed = end - start;std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;}std::cout << "====Main end====" << std::endl;return 0;
}

以下是输出:

===Schedule===
===Main start====
Push task into queue  0x7ffd602ffb00
Push task into queue  0x7ffd602ffb40
Push task into queue  0x7ffd602ffb80
Schedule start...
Init a thread poll
Schedule run...
THREAD_1 sing~ 100000000
THREAD_2 dance~ 200000000
THREAD_1 rap~ 300000000
Schedule stop...
Elapsed: 234
===Main end====

可以看到以上的代码使用了调度器,代码的编写会很舒服,以下是使用调度器代码的部分:

//构建调度器,指定线程池中线程数量
Schedule sc(2);
//依次将要执行的任务塞入调度器
sc.schedule(std::bind(&sing));
sc.schedule(std::bind(&dance));
sc.schedule(std::bind(&rap));
//执行调度器
sc.start();
//停止调度器
sc.stop();

这样一来,无论是有多少任务需要执行,程序员都不需要关系如何创建线程和如何进行调度了,
只需要把所要执行的任务全部一股脑的交给调度器就行了。
线程也不需要自己来维护了,只需要告诉调度器需要多少个线程就行。
只要调度器完善的够好,程序员就只需要专心处理业务问题就好了。

看到这里相信你对调度器的功能和必要性有了一定的了解,那么我们来继续优化这个调度器,让它更健硕。


二、考虑多线程下的多个调度器问题【线程级别的调度器】

  1. 由于上边代码调度器Schedule类可以在其他任意线程中构建与执行,所以需要考虑多线程下调度器的区分问题。
  2. 由于这里为了由浅入深,所以没有使用协程,那么调度器就必须占用一个线程来执行。

接下来的场景会变成:一个调度器线程和多个其他的线程和一系列的函数任务。
所以需要使用线程局部变量来控制一个线程只存在一个调度器
当然这个判断需要将调度器的this存储起来做比较

以下是控制一个线程只存在一个调度器的核心逻辑

static thread_local Schedule* t_schedule = nullptr;Schedule(size_t size = 1):m_poll_size(size){//断言当前线程是否不存在调度器,如果已存在那么报错SYLAR_ASSERT2(GetThis()==nullptr,"one thread one schedule");//设置当前线程上的调度器为当前调度器setThis();
}~Schedule(){//如果当前调度器就是对应线程上的调度器那么将线程上的调度器清空if(GetThis()==this){t_schedule = nullptr;}
}static Schedule* GetThis(){return t_schedule;
}void setThis(){t_schedule = this;
}

例子(完整代码):

#include <chrono>
#include <iostream>
#include <pthread.h>
#include <vector>
#include <list>
#include <unistd.h>
#include <functional>
#include "sylar/log.h"
#include "sylar/thread.h"
#include "sylar/mutex.h"
#include "sylar/macro.h"int num = 0;
uint64_t loop_times = 100000000;void sing(){for(size_t i = 0; i < loop_times; ++i){++num;}std::cout << sylar::Thread::GetName() << " sing~ " << num << std::endl;
}void dance(){for(size_t i = 0; i < loop_times; ++i){++num;}std::cout <<sylar::Thread::GetName() << " dance~ " << num << std::endl;   
}void rap(){for(size_t i = 0; i < loop_times; ++i){++num;}std::cout <<sylar::Thread::GetName() << " rap~ " << num << std::endl;   
}class Schedule;static thread_local Schedule* t_schedule = nullptr;class Schedule {
public:typedef sylar::Mutex MutexType;Schedule(size_t size = 1):m_poll_size(size){SYLAR_ASSERT2(GetThis()==nullptr,"one thread one schedule");setThis();m_callerThread = sylar::Thread::GetThis();}~Schedule(){if(GetThis()==this){t_schedule = nullptr;}}static Schedule* GetThis(){return t_schedule;}void setThis(){t_schedule = this;}void run(){MutexType::Lock lock(m_mutex);std::function<void()> task;auto it = m_tasks_queue.begin();while(it != m_tasks_queue.end()){task = *it;m_tasks_queue.erase(it);break;}if(task != nullptr){task();}}void start(){std::cout << "Schedule start..." << std::endl;std::cout << "Init a thread poll" << std::endl;m_thread_poll.resize(m_poll_size);std::cout << "Schedule run..." << std::endl;for(size_t i = 0; i < m_poll_size; ++i){m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));}}void stop(){while(!m_tasks_queue.empty()){for(size_t i = 0; i < m_poll_size; ++i){m_thread_poll[i].reset(new sylar::Thread(std::bind(&Schedule::run,this),"THREAD_"+std::to_string(i)));}}for(size_t i = 0; i < m_poll_size; ++i){m_thread_poll[i]->join();}std::cout << "Schedule stop..." << std::endl; }void schedule(std::function<void()> task){std::cout << "Push task into queue " << &task << std::endl;m_tasks_queue.push_back(task);}private:MutexType m_mutex;size_t m_poll_size = 0;std::vector<sylar::Thread::ptr> m_thread_poll;std::list<std::function<void()> > m_tasks_queue;      sylar::Thread* m_callerThread = 0;
};//第二个调度线程
void subThread(){std::cout << "Sub Schedule Thread" << std::endl;Schedule sc(1);sc.schedule(std::bind(&rap));sc.start();sc.stop();
}int main(){std::cout << "====Schedule====" << std::endl;std::cout << "====Main start====" << std::endl;{auto start = std::chrono::high_resolution_clock::now();Schedule sc(2);sc.schedule(std::bind(&subThread));sc.schedule(std::bind(&dance));// sc.schedule(std::bind(&rap));sc.start();sc.stop();auto end = std::chrono::high_resolution_clock::now();std::chrono::duration<double,std::milli> elapsed = end - start;std::cout << "Elapsed: " << (int)elapsed.count() << std::endl;}std::cout << "====Main end====" << std::endl;return 0;
}

可以看到,调度器可以在任何一个线程中使用,互不影响。


三、加入协程的概念【协程级别的调度器】

一旦加入协程的概念,那么调度器的复杂度就会上升一个层次。
首先要理清楚有哪些东西参与到这个调度器中。
其次要统一称呼,视频中就是很多称呼不明确导致看不懂。
需要理清楚以下几个点:

  1. 加入协程的概念后,调度器就不需要占用一个线程来专门做调度,而是退居到某个线程下的 协程 中去。
  2. 一旦退居到某个协程中,那么该调度器所在的协程我们叫做 【调度协程】
  3. 【调度协程】 所在的线程我称它为 【调度线程】
  4. 在一个线程中会有一个 【主协程】 ,要记住 【调度协程】 可以是 【主协程】 也可能不是(调度线程是主线程时必然不是)。
  5. 在多线程中会有一个 【主线程】,要记住 【调度线程】 可以是 【主线程】 也可以不是。
  6. 调度任务会变成两种类型:【函数任务】、【协程任务】,这里可以把他们包装到一起,统称 【任务】
  7. 【函数任务】最终可以包装成【协程任务】,这样统一调用方式。
  8. 我们封装的协程对象会有很多状态,所以在调度器中需要有比较复杂的判断(这个需要看代码了)。

四、总结

1.要看懂协程调度必须拆分着看,辩证的看。
2.由于协程调度的IO协程篇还没看完,所以存在几个遗留方法,这样无法看到全貌,可以理解大概流程后继续。
3.作者代码中确实有不足的地方,需要先内心赞同他的一切做法,当自己完全看懂后再做修改(比如Fiber中有Schedule的耦合代码)。
4.本人还没完全看完协程调度相关的视频,在看完IO协程调度后可能会回过头来修改这部分的博客内容。

这篇关于Sylar C++高性能服务器学习记录09 【协程调度模块-知识储备篇】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/970332

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

【C++ Primer Plus习题】13.4

大家好,这里是国中之林! ❥前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。有兴趣的可以点点进去看看← 问题: 解答: main.cpp #include <iostream>#include "port.h"int main() {Port p1;Port p2("Abc", "Bcc", 30);std::cout <<

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

C++包装器

包装器 在 C++ 中,“包装器”通常指的是一种设计模式或编程技巧,用于封装其他代码或对象,使其更易于使用、管理或扩展。包装器的概念在编程中非常普遍,可以用于函数、类、库等多个方面。下面是几个常见的 “包装器” 类型: 1. 函数包装器 函数包装器用于封装一个或多个函数,使其接口更统一或更便于调用。例如,std::function 是一个通用的函数包装器,它可以存储任意可调用对象(函数、函数