sylar高性能服务器-日志(P30-P35)内容记录

2024-02-23 17:20

本文主要是介绍sylar高性能服务器-日志(P30-P35)内容记录,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • P30-P32:协程调度01-03
      • 一、Scheduler
        • 局部变量
        • FiberAndThread(任务结构体)
        • 成员变量
        • 调度协程
        • 构造函数
        • 析构函数
        • start
        • stop
        • run
        • stopping
      • 二、参考资料
    • P33-P35:协程调度04-06
      • 一、测试1
      • 二、测试2
    • 总结

P30-P32:协程调度01-03

​ 这里开始协程调度模块,封装了一个M:N协程调度器,创建N个协程在M个线程上运行,调度器的主要思想就是先查看任务队列中有没有任务需要执行,若没有任务就进入空闲状态,反之进行调度。

​ 前面3小节搭建了调度器的基础结构,下面我按照自己的理解以及他人的笔记内容对整个类进行解释。

一、Scheduler

局部变量

两个局部线程变量保存当前线程的协程调度器和主协程

// 协程调度器
static thread_local Scheduler* t_scheduler = nullptr;
// 线程主协程
static thread_local Fiber* t_fiber = nullptr;
FiberAndThread(任务结构体)

该结构体的作用是存储协程、回调函数以及线程的信息

// 协程/函数/线程组struct FiberAndThread {Fiber::ptr fiber; // 协程std::function<void()> cb; // 回调函数int thread; // 线程IDFiberAndThread(Fiber::ptr f, int thr): fiber(f), thread(thr) {}FiberAndThread(Fiber::ptr* f, int thr):thread(thr) {// 因为传入的是一个智能指针,我们使用后会造成引用数加一,可能会引发释放问题,这里swap相当于把传入的智能指针变成一个空指针// 这样原智能指针的计数就会保持不变fiber.swap(*f); }FiberAndThread(std::function<void()> f, int thr): cb(f), thread(thr) {}  FiberAndThread(std::function<void()>* f, int thr):thread(thr) {cb.swap(*f);}  // 将一个类用到STL中必须要有默认构造函数,否则无法进行初始化FiberAndThread(): thread(-1) {}// 重置void reset() {fiber = nullptr;cb = nullptr;thread = -1;}};
成员变量
private:MutexType m_mutex;  // 互斥量std::vector<Thread::ptr> m_threads; // 线程池std::list<FiberAndThread> m_fibers;   // 等待执行的协程队列Fiber::ptr m_rootFiber;  // 主协程std::string m_name; // 协程调度器名称protected:std::vector<int> m_threadIds; // 协程下的线程id数组size_t m_threadCount = 0;   // 线程数量std::atomic<size_t> m_activeThreadCount = {0}; // 工作线程数量std::atomic<size_t> m_idleThreadCount = {0}; // 空闲线程数量bool m_stopping = true; // 是否正在停止bool m_autoStop = false; // m_autoStopint m_rootThread = 0; // 主线程id(use_caller)
调度协程

检查任务队列中有无任务,将任务加入到任务队列中,若任务队列中本来就已经有任务了,就tickle进行通知

// 调度协程模板函数
template<class FiberOrCb>void schedule(FiberOrCb fc, int thread = -1) { // -1表示任意线程bool need_tickle = false;{MutexType::Lock lock(m_mutex);need_tickle = scheduleNoLock(fc, thread);}if(need_tickle) {tickle();}
}// 批量处理调度协程
template<class InputIterator>void schedule(InputIterator begin, InputIterator end, int thread = -1) {bool need_tickle = false;{MutexType::Lock lock(m_mutex);while(begin != end) {need_tickle = scheduleNoLock(&*begin, thread) || need_tickle;begin ++;}}if(need_tickle) {tickle();}
}// 协程调度启动(无锁)
template<class FiberOrCb>bool scheduleNoLock(FiberOrCb fc, int thread) {bool need_tickle = m_fibers.empty();FiberAndThread ft(fc, thread);if(ft.fiber || ft.cb) {m_fibers.push_back(ft);}return need_tickle;
}
构造函数
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name): m_name(name) {// 确定线程数量大于0 SYLAR_ASSERT(threads > 0);// 是否将当前用于协程调度线程也纳入调度器if(use_caller) {sylar::Fiber::GetThis();    // 这里获得的主协程用于调度其余协程-- threads; // 线程数-1SYLAR_ASSERT(GetThis() == nullptr); // 防止出现多个调度器// 设置当前的协程调度器t_scheduler = this;// 将此fiber设置为 use_caller,协程则会与 Fiber::MainFunc() 绑定// 非静态成员函数需要传递this指针作为第一个参数,用 std::bind()进行绑定m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this))); // 这个新协程用于执行方法// 设置线程名称sylar::Thread::SetName(m_name);// 设置当前线程的主协程为m_rootFiber// 这里的m_rootFiber是该线程的主协程(执行run任务的协程),只有默认构造出来的fiber才是主协程t_fiber = m_rootFiber.get();// 获得当前线程idm_rootThread = sylar::GetTreadId();m_threadIds.push_back(m_rootThread);} else { // 不将当前线程纳入调度器m_rootThread = -1;}// 更新线程数量m_threadCount = threads;
}
析构函数
Scheduler::~Scheduler() {// 达到停止条件SYLAR_ASSERT(m_stopping);if(GetThis() == this) {t_scheduler = nullptr;}
}
start
void Scheduler::start() {SYLAR_LOG_INFO(g_logger) << "start()";MutexType::Lock lock(m_mutex);// 为false代表已经启动了,直接返回if(!m_stopping) {return;}// 将停止状态更新为falsem_stopping = false;// 线程池为空SYLAR_ASSERT(m_threads.empty());// 创建线程池m_threads.resize(m_threadCount);for(size_t i = 0; i < m_threadCount; ++ i) {// 遍历每一个线程执行run任务m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + " " + std::to_string(i)));m_threadIds.push_back(m_threads[i]->getId());}lock.unlock();// 在这里切换线程时,swap的话会将线程的主协程与当前协程交换,// 当使用use_caller时,t_fiber = m_rootFiber,call是将当前协程与主协程交换// 为了确保在启动之后仍有任务加入任务队列中,// 所以在stop()中做该线程的启动,这样就不会漏掉任务队列中的任务if(m_rootFiber) {// m_rootFiber->swapIn();m_rootFiber->call();SYLAR_LOG_INFO(g_logger) << "call out" << m_rootFiber->getState();}SYLAR_LOG_INFO(g_logger) << "start() end";
}
stop
void Scheduler::stop() {SYLAR_LOG_INFO(g_logger) << "stop()";m_autoStop = true;// 使用use_caller,并且只有一个线程,并且主协程的状态为结束或者初始化if(m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber::TERM || m_rootFiber->getState() == Fiber::INIT)) {SYLAR_LOG_INFO(g_logger) << this << " stopped";// 停止状态为truem_stopping = true;// 若达到停止条件则直接returnif(stopping()) {return;}}// bool exit_on_this_fiber = false;// use_caller线程// 当前调度器和t_secheduler相同if(m_rootThread != -1) {SYLAR_ASSERT(GetThis() == this);} else {SYLAR_ASSERT(GetThis() != this);}m_stopping = true;// 每个线程都tickle一下for(size_t i = 0; i < m_threadCount; ++ i) {tickle();}// 使用use_caller多tickle一下if(m_rootFiber) {tickle();}if(stopping()) {return;}
}
run
void Scheduler::run() {SYLAR_LOG_INFO(g_logger) << "run()";// 设置当前调度器setThis();// 非user_caller线程,设置主协程为线程主协程if(sylar::GetTreadId() != m_rootThread) {t_fiber = Fiber::GetThis().get();}// 定义idle_fiber,当任务队列中的任务执行完之后,执行idle()Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));// 定义回调协程Fiber::ptr cb_fiber;// 定义一个任务结构体FiberAndThread ft;while(true) {// 重置也是一个初始化ft.reset();bool tickle_me = false;{// 从任务队列中拿fiber和cbMutexType::Lock lock(m_mutex);auto it = m_fibers.begin();while(it != m_fibers.end()) {// 如果当前任务指定的线程不是当前线程,则跳过,并且tickle一下if(it->thread != -1 && it->thread != sylar::GetTreadId()) {++ it;tickle_me = true;continue;}// 确保fiber或cb存在SYLAR_ASSERT(it->fiber || it->cb);// 如果该fiber正在执行则跳过if(it->fiber && it->fiber->getState() == Fiber::EXEC) {++ it;continue;}// 取出该任务ft = *it;// 从任务队列中清除m_fibers.erase(it);}}// 取到任务tickle一下if(tickle_me) {tickle();}// 执行拿到的线程if(ft.fiber && (ft.fiber->getState() != Fiber::TERM || ft.fiber->getState() != Fiber::EXCEPT)) {++ m_activeThreadCount;// 执行任务ft.fiber->swapIn();// 执行完成,活跃的线程数量减-1-- m_activeThreadCount;ft.fiber->swapIn();// 如果线程的状态被置为了READYif(ft.fiber->getState() == Fiber::READY) {// 将fiber重新加入到任务队列中schedule(ft.fiber);} else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) {ft.fiber->m_state = Fiber::HOLD;}// 执行完毕重置数据ftft.reset();// 如果任务是回调函数} else if(ft.cb) {// cb_fiber存在,重置该fiberif(cb_fiber) {cb_fiber->reset(ft.cb);} else {// cb_fiber不存在则初始化一个cb_fiber.reset(new Fiber(ft.cb));ft.cb = nullptr;}// 重置数据ftft.reset();++ m_activeThreadCount;// 执行cb任务cb_fiber->swapIn();-- m_activeThreadCount;// 若cb_fiber状态为READYif(cb_fiber->getState() == Fiber::READY) {// 重新放入任务队列中schedule(cb_fiber);// 释放智能指针cb_fiber.reset();// cb_fiber异常或结束,就重置状态,可以再次使用该cb_fiber} else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) {// cb_fiber的执行任务置空cb_fiber->reset(nullptr);} else {// 设置状态为HOLD,此任务后面还会通过ft.fiber被拉起cb_fiber->m_state = Fiber::HOLD;// 释放该智能指针,调用下一个任务时要重新new一个新的cb_fibercb_fiber.reset();}// 没有任务执行} else {// 如果idle_fiber的状态为TERM则结束循环,真正的结束if(idle_fiber->getState() == Fiber::TERM) {SYLAR_LOG_INFO(g_logger) << "idle fiber term";break;}// 正在执行idle的线程数量+1++ m_idleThreadCount;// 执行idle()// 正在执行idle的线程数量-1idle_fiber->swapIn();-- m_idleThreadCount;// idle_fiber状态置为HOLDif(idle_fiber->getState() != Fiber::TERM&& idle_fiber->getState() != Fiber::EXCEPT) {idle_fiber->m_state = Fiber::HOLD;}}}
}
stopping
bool Scheduler::stopping() {MutexType::Lock lock(m_mutex);// 当自动停止 && 正在停止 && 任务队列为空 && 活跃的线程数量为0return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0;
}

二、参考资料

  1. 源码
  2. 笔记

P33-P35:协程调度04-06

​ 这几节主要对协程调度的代码进行了调试,针对几个小bug视频花了不少时间去解决,整个问题解决步骤我这里就不记录了,下面通过一个具体的例子来演示协程调度器的作用。

一、测试1

#include "../sylar/sylar.h"sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();void test_fiber() {static int s_count = 5;SYLAR_LOG_INFO(g_logger) << "test in fiber s_count = " << s_count;sleep(1);if(-- s_count >= 0) {// 未指定线程ID,表示任意线程都能执行任务sylar::Scheduler::GetThis()->schedule(&test_fiber);}}int main(int argc, char** argv) {SYLAR_LOG_INFO(g_logger) << "main";sylar::Scheduler sc(3,false, "test");sleep(2);sc.start();sc.schedule(&test_fiber);sc.stop();SYLAR_LOG_INFO(g_logger) << "over";return 0;
}

​ 上面这段测试定义了3个线程,并且将use_caller设为false,表示不让用于调度的线程执行任务。此外在执行任务时,由于没有指定线程ID,说明任意线程都能执行任务。

结果

可以看到3个协程不停的切换执行了任务

image-20240223144838276

二、测试2

下面测试修改了两处代码,首先在执行任务时指定第一个执行任务的线程去执行所有的任务,其次将use_caller设置为true,表示用于调度的线程也能参与执行任务,那么就可以少开一个线程,提高效率。

sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetTreadId());sylar::Scheduler sc(3,true, "test");

结果

可以看到所有的任务都是由下标为1的线程去执行,并且线程池中的线程一共就只有3个。

image-20240223145144672

总结

​ 这6节视频全部看完后并且调试通代码才大概在功能层面了解sylar做的这个协程调度器,一开始听的时候确实很迷茫,一直在不停改代码但不知道为什么要去改。不过最后也还没有完全搞懂协程调度器的细节,现在这个阶段能理解代码的运行逻辑就行了,后面二刷的时候再去深究,总之后面的内容越来越复杂了,对于我这种之前没有服务器基础的理解起来相当困难。

这篇关于sylar高性能服务器-日志(P30-P35)内容记录的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/739351

相关文章

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

Node.js学习记录(二)

目录 一、express 1、初识express 2、安装express 3、创建并启动web服务器 4、监听 GET&POST 请求、响应内容给客户端 5、获取URL中携带的查询参数 6、获取URL中动态参数 7、静态资源托管 二、工具nodemon 三、express路由 1、express中路由 2、路由的匹配 3、路由模块化 4、路由模块添加前缀 四、中间件

Linux服务器Java启动脚本

Linux服务器Java启动脚本 1、初版2、优化版本3、常用脚本仓库 本文章介绍了如何在Linux服务器上执行Java并启动jar包, 通常我们会使用nohup直接启动,但是还是需要手动停止然后再次启动, 那如何更优雅的在服务器上启动jar包呢,让我们一起探讨一下吧。 1、初版 第一个版本是常用的做法,直接使用nohup后台启动jar包, 并将日志输出到当前文件夹n

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

我在移动打工的日志

客户:给我搞一下录音 我:不会。不在服务范围。 客户:是不想吧 我:笑嘻嘻(气笑) 客户:小姑娘明明会,却欺负老人 我:笑嘻嘻 客户:那我交话费 我:手机号 客户:给我搞录音 我:不会。不懂。没搞过。 客户:那我交话费 我:手机号。这是电信的啊!!我这是中国移动!! 客户:我不管,我要充话费,充话费是你们的 我:可是这是移动!!中国移动!! 客户:我这是手机号 我:那又如何,这是移动!你是电信!!

两个月冲刺软考——访问位与修改位的题型(淘汰哪一页);内聚的类型;关于码制的知识点;地址映射的相关内容

1.访问位与修改位的题型(淘汰哪一页) 访问位:为1时表示在内存期间被访问过,为0时表示未被访问;修改位:为1时表示该页面自从被装入内存后被修改过,为0时表示未修改过。 置换页面时,最先置换访问位和修改位为00的,其次是01(没被访问但被修改过)的,之后是10(被访问了但没被修改过),最后是11。 2.内聚的类型 功能内聚:完成一个单一功能,各个部分协同工作,缺一不可。 顺序内聚:

记录每次更新到仓库 —— Git 学习笔记 10

记录每次更新到仓库 文章目录 文件的状态三个区域检查当前文件状态跟踪新文件取消跟踪(un-tracking)文件重新跟踪(re-tracking)文件暂存已修改文件忽略某些文件查看已暂存和未暂存的修改提交更新跳过暂存区删除文件移动文件参考资料 咱们接着很多天以前的 取得Git仓库 这篇文章继续说。 文件的状态 不管是通过哪种方法,现在我们已经有了一个仓库,并从这个仓

速盾:直播 cdn 服务器带宽?

在当今数字化时代,直播已经成为了一种非常流行的娱乐和商业活动形式。为了确保直播的流畅性和高质量,直播平台通常会使用 CDN(Content Delivery Network,内容分发网络)服务器来分发直播流。而 CDN 服务器的带宽则是影响直播质量的一个重要因素。下面我们就来探讨一下速盾视角下的直播 CDN 服务器带宽问题。 一、直播对带宽的需求 高清视频流 直播通常需要传输高清视频

学习记录:js算法(二十八):删除排序链表中的重复元素、删除排序链表中的重复元素II

文章目录 删除排序链表中的重复元素我的思路解法一:循环解法二:递归 网上思路 删除排序链表中的重复元素 II我的思路网上思路 总结 删除排序链表中的重复元素 给定一个已排序的链表的头 head , 删除所有重复的元素,使每个元素只出现一次 。返回 已排序的链表 。 图一 图二 示例 1:(图一)输入:head = [1,1,2]输出:[1,2]示例 2:(图