sylar高性能服务器-日志(P30-P35)内容记录

2024-02-23 17:20

本文主要是介绍sylar高性能服务器-日志(P30-P35)内容记录,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • P30-P32:协程调度01-03
      • 一、Scheduler
        • 局部变量
        • FiberAndThread(任务结构体)
        • 成员变量
        • 调度协程
        • 构造函数
        • 析构函数
        • start
        • stop
        • run
        • stopping
      • 二、参考资料
    • P33-P35:协程调度04-06
      • 一、测试1
      • 二、测试2
    • 总结

P30-P32:协程调度01-03

​ 这里开始协程调度模块,封装了一个M:N协程调度器,创建N个协程在M个线程上运行,调度器的主要思想就是先查看任务队列中有没有任务需要执行,若没有任务就进入空闲状态,反之进行调度。

​ 前面3小节搭建了调度器的基础结构,下面我按照自己的理解以及他人的笔记内容对整个类进行解释。

一、Scheduler

局部变量

两个局部线程变量保存当前线程的协程调度器和主协程

// 协程调度器
static thread_local Scheduler* t_scheduler = nullptr;
// 线程主协程
static thread_local Fiber* t_fiber = nullptr;
FiberAndThread(任务结构体)

该结构体的作用是存储协程、回调函数以及线程的信息

// 协程/函数/线程组struct FiberAndThread {Fiber::ptr fiber; // 协程std::function<void()> cb; // 回调函数int thread; // 线程IDFiberAndThread(Fiber::ptr f, int thr): fiber(f), thread(thr) {}FiberAndThread(Fiber::ptr* f, int thr):thread(thr) {// 因为传入的是一个智能指针,我们使用后会造成引用数加一,可能会引发释放问题,这里swap相当于把传入的智能指针变成一个空指针// 这样原智能指针的计数就会保持不变fiber.swap(*f); }FiberAndThread(std::function<void()> f, int thr): cb(f), thread(thr) {}  FiberAndThread(std::function<void()>* f, int thr):thread(thr) {cb.swap(*f);}  // 将一个类用到STL中必须要有默认构造函数,否则无法进行初始化FiberAndThread(): thread(-1) {}// 重置void reset() {fiber = nullptr;cb = nullptr;thread = -1;}};
成员变量
private:MutexType m_mutex;  // 互斥量std::vector<Thread::ptr> m_threads; // 线程池std::list<FiberAndThread> m_fibers;   // 等待执行的协程队列Fiber::ptr m_rootFiber;  // 主协程std::string m_name; // 协程调度器名称protected:std::vector<int> m_threadIds; // 协程下的线程id数组size_t m_threadCount = 0;   // 线程数量std::atomic<size_t> m_activeThreadCount = {0}; // 工作线程数量std::atomic<size_t> m_idleThreadCount = {0}; // 空闲线程数量bool m_stopping = true; // 是否正在停止bool m_autoStop = false; // m_autoStopint m_rootThread = 0; // 主线程id(use_caller)
调度协程

检查任务队列中有无任务,将任务加入到任务队列中,若任务队列中本来就已经有任务了,就tickle进行通知

// 调度协程模板函数
template<class FiberOrCb>void schedule(FiberOrCb fc, int thread = -1) { // -1表示任意线程bool need_tickle = false;{MutexType::Lock lock(m_mutex);need_tickle = scheduleNoLock(fc, thread);}if(need_tickle) {tickle();}
}// 批量处理调度协程
template<class InputIterator>void schedule(InputIterator begin, InputIterator end, int thread = -1) {bool need_tickle = false;{MutexType::Lock lock(m_mutex);while(begin != end) {need_tickle = scheduleNoLock(&*begin, thread) || need_tickle;begin ++;}}if(need_tickle) {tickle();}
}// 协程调度启动(无锁)
template<class FiberOrCb>bool scheduleNoLock(FiberOrCb fc, int thread) {bool need_tickle = m_fibers.empty();FiberAndThread ft(fc, thread);if(ft.fiber || ft.cb) {m_fibers.push_back(ft);}return need_tickle;
}
构造函数
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name): m_name(name) {// 确定线程数量大于0 SYLAR_ASSERT(threads > 0);// 是否将当前用于协程调度线程也纳入调度器if(use_caller) {sylar::Fiber::GetThis();    // 这里获得的主协程用于调度其余协程-- threads; // 线程数-1SYLAR_ASSERT(GetThis() == nullptr); // 防止出现多个调度器// 设置当前的协程调度器t_scheduler = this;// 将此fiber设置为 use_caller,协程则会与 Fiber::MainFunc() 绑定// 非静态成员函数需要传递this指针作为第一个参数,用 std::bind()进行绑定m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this))); // 这个新协程用于执行方法// 设置线程名称sylar::Thread::SetName(m_name);// 设置当前线程的主协程为m_rootFiber// 这里的m_rootFiber是该线程的主协程(执行run任务的协程),只有默认构造出来的fiber才是主协程t_fiber = m_rootFiber.get();// 获得当前线程idm_rootThread = sylar::GetTreadId();m_threadIds.push_back(m_rootThread);} else { // 不将当前线程纳入调度器m_rootThread = -1;}// 更新线程数量m_threadCount = threads;
}
析构函数
Scheduler::~Scheduler() {// 达到停止条件SYLAR_ASSERT(m_stopping);if(GetThis() == this) {t_scheduler = nullptr;}
}
start
void Scheduler::start() {SYLAR_LOG_INFO(g_logger) << "start()";MutexType::Lock lock(m_mutex);// 为false代表已经启动了,直接返回if(!m_stopping) {return;}// 将停止状态更新为falsem_stopping = false;// 线程池为空SYLAR_ASSERT(m_threads.empty());// 创建线程池m_threads.resize(m_threadCount);for(size_t i = 0; i < m_threadCount; ++ i) {// 遍历每一个线程执行run任务m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + " " + std::to_string(i)));m_threadIds.push_back(m_threads[i]->getId());}lock.unlock();// 在这里切换线程时,swap的话会将线程的主协程与当前协程交换,// 当使用use_caller时,t_fiber = m_rootFiber,call是将当前协程与主协程交换// 为了确保在启动之后仍有任务加入任务队列中,// 所以在stop()中做该线程的启动,这样就不会漏掉任务队列中的任务if(m_rootFiber) {// m_rootFiber->swapIn();m_rootFiber->call();SYLAR_LOG_INFO(g_logger) << "call out" << m_rootFiber->getState();}SYLAR_LOG_INFO(g_logger) << "start() end";
}
stop
void Scheduler::stop() {SYLAR_LOG_INFO(g_logger) << "stop()";m_autoStop = true;// 使用use_caller,并且只有一个线程,并且主协程的状态为结束或者初始化if(m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber::TERM || m_rootFiber->getState() == Fiber::INIT)) {SYLAR_LOG_INFO(g_logger) << this << " stopped";// 停止状态为truem_stopping = true;// 若达到停止条件则直接returnif(stopping()) {return;}}// bool exit_on_this_fiber = false;// use_caller线程// 当前调度器和t_secheduler相同if(m_rootThread != -1) {SYLAR_ASSERT(GetThis() == this);} else {SYLAR_ASSERT(GetThis() != this);}m_stopping = true;// 每个线程都tickle一下for(size_t i = 0; i < m_threadCount; ++ i) {tickle();}// 使用use_caller多tickle一下if(m_rootFiber) {tickle();}if(stopping()) {return;}
}
run
void Scheduler::run() {SYLAR_LOG_INFO(g_logger) << "run()";// 设置当前调度器setThis();// 非user_caller线程,设置主协程为线程主协程if(sylar::GetTreadId() != m_rootThread) {t_fiber = Fiber::GetThis().get();}// 定义idle_fiber,当任务队列中的任务执行完之后,执行idle()Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));// 定义回调协程Fiber::ptr cb_fiber;// 定义一个任务结构体FiberAndThread ft;while(true) {// 重置也是一个初始化ft.reset();bool tickle_me = false;{// 从任务队列中拿fiber和cbMutexType::Lock lock(m_mutex);auto it = m_fibers.begin();while(it != m_fibers.end()) {// 如果当前任务指定的线程不是当前线程,则跳过,并且tickle一下if(it->thread != -1 && it->thread != sylar::GetTreadId()) {++ it;tickle_me = true;continue;}// 确保fiber或cb存在SYLAR_ASSERT(it->fiber || it->cb);// 如果该fiber正在执行则跳过if(it->fiber && it->fiber->getState() == Fiber::EXEC) {++ it;continue;}// 取出该任务ft = *it;// 从任务队列中清除m_fibers.erase(it);}}// 取到任务tickle一下if(tickle_me) {tickle();}// 执行拿到的线程if(ft.fiber && (ft.fiber->getState() != Fiber::TERM || ft.fiber->getState() != Fiber::EXCEPT)) {++ m_activeThreadCount;// 执行任务ft.fiber->swapIn();// 执行完成,活跃的线程数量减-1-- m_activeThreadCount;ft.fiber->swapIn();// 如果线程的状态被置为了READYif(ft.fiber->getState() == Fiber::READY) {// 将fiber重新加入到任务队列中schedule(ft.fiber);} else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) {ft.fiber->m_state = Fiber::HOLD;}// 执行完毕重置数据ftft.reset();// 如果任务是回调函数} else if(ft.cb) {// cb_fiber存在,重置该fiberif(cb_fiber) {cb_fiber->reset(ft.cb);} else {// cb_fiber不存在则初始化一个cb_fiber.reset(new Fiber(ft.cb));ft.cb = nullptr;}// 重置数据ftft.reset();++ m_activeThreadCount;// 执行cb任务cb_fiber->swapIn();-- m_activeThreadCount;// 若cb_fiber状态为READYif(cb_fiber->getState() == Fiber::READY) {// 重新放入任务队列中schedule(cb_fiber);// 释放智能指针cb_fiber.reset();// cb_fiber异常或结束,就重置状态,可以再次使用该cb_fiber} else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) {// cb_fiber的执行任务置空cb_fiber->reset(nullptr);} else {// 设置状态为HOLD,此任务后面还会通过ft.fiber被拉起cb_fiber->m_state = Fiber::HOLD;// 释放该智能指针,调用下一个任务时要重新new一个新的cb_fibercb_fiber.reset();}// 没有任务执行} else {// 如果idle_fiber的状态为TERM则结束循环,真正的结束if(idle_fiber->getState() == Fiber::TERM) {SYLAR_LOG_INFO(g_logger) << "idle fiber term";break;}// 正在执行idle的线程数量+1++ m_idleThreadCount;// 执行idle()// 正在执行idle的线程数量-1idle_fiber->swapIn();-- m_idleThreadCount;// idle_fiber状态置为HOLDif(idle_fiber->getState() != Fiber::TERM&& idle_fiber->getState() != Fiber::EXCEPT) {idle_fiber->m_state = Fiber::HOLD;}}}
}
stopping
bool Scheduler::stopping() {MutexType::Lock lock(m_mutex);// 当自动停止 && 正在停止 && 任务队列为空 && 活跃的线程数量为0return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0;
}

二、参考资料

  1. 源码
  2. 笔记

P33-P35:协程调度04-06

​ 这几节主要对协程调度的代码进行了调试,针对几个小bug视频花了不少时间去解决,整个问题解决步骤我这里就不记录了,下面通过一个具体的例子来演示协程调度器的作用。

一、测试1

#include "../sylar/sylar.h"sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();void test_fiber() {static int s_count = 5;SYLAR_LOG_INFO(g_logger) << "test in fiber s_count = " << s_count;sleep(1);if(-- s_count >= 0) {// 未指定线程ID,表示任意线程都能执行任务sylar::Scheduler::GetThis()->schedule(&test_fiber);}}int main(int argc, char** argv) {SYLAR_LOG_INFO(g_logger) << "main";sylar::Scheduler sc(3,false, "test");sleep(2);sc.start();sc.schedule(&test_fiber);sc.stop();SYLAR_LOG_INFO(g_logger) << "over";return 0;
}

​ 上面这段测试定义了3个线程,并且将use_caller设为false,表示不让用于调度的线程执行任务。此外在执行任务时,由于没有指定线程ID,说明任意线程都能执行任务。

结果

可以看到3个协程不停的切换执行了任务

image-20240223144838276

二、测试2

下面测试修改了两处代码,首先在执行任务时指定第一个执行任务的线程去执行所有的任务,其次将use_caller设置为true,表示用于调度的线程也能参与执行任务,那么就可以少开一个线程,提高效率。

sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetTreadId());sylar::Scheduler sc(3,true, "test");

结果

可以看到所有的任务都是由下标为1的线程去执行,并且线程池中的线程一共就只有3个。

image-20240223145144672

总结

​ 这6节视频全部看完后并且调试通代码才大概在功能层面了解sylar做的这个协程调度器,一开始听的时候确实很迷茫,一直在不停改代码但不知道为什么要去改。不过最后也还没有完全搞懂协程调度器的细节,现在这个阶段能理解代码的运行逻辑就行了,后面二刷的时候再去深究,总之后面的内容越来越复杂了,对于我这种之前没有服务器基础的理解起来相当困难。

这篇关于sylar高性能服务器-日志(P30-P35)内容记录的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/739351

相关文章

SpringBoot项目注入 traceId 追踪整个请求的日志链路(过程详解)

《SpringBoot项目注入traceId追踪整个请求的日志链路(过程详解)》本文介绍了如何在单体SpringBoot项目中通过手动实现过滤器或拦截器来注入traceId,以追踪整个请求的日志链... SpringBoot项目注入 traceId 来追踪整个请求的日志链路,有了 traceId, 我们在排

MySQL 中的服务器配置和状态详解(MySQL Server Configuration and Status)

《MySQL中的服务器配置和状态详解(MySQLServerConfigurationandStatus)》MySQL服务器配置和状态设置包括服务器选项、系统变量和状态变量三个方面,可以通过... 目录mysql 之服务器配置和状态1 MySQL 架构和性能优化1.1 服务器配置和状态1.1.1 服务器选项

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤

Linux流媒体服务器部署流程

《Linux流媒体服务器部署流程》文章详细介绍了流媒体服务器的部署步骤,包括更新系统、安装依赖组件、编译安装Nginx和RTMP模块、配置Nginx和FFmpeg,以及测试流媒体服务器的搭建... 目录流媒体服务器部署部署安装1.更新系统2.安装依赖组件3.解压4.编译安装(添加RTMP和openssl模块

如何解决Pycharm编辑内容时有光标的问题

《如何解决Pycharm编辑内容时有光标的问题》文章介绍了如何在PyCharm中配置VimEmulator插件,包括检查插件是否已安装、下载插件以及安装IdeaVim插件的步骤... 目录Pycharm编辑内容时有光标1.如果Vim Emulator前面有对勾2.www.chinasem.cn如果tools工

关于Spring @Bean 相同加载顺序不同结果不同的问题记录

《关于Spring@Bean相同加载顺序不同结果不同的问题记录》本文主要探讨了在Spring5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的... 目录问题说明测试输出1测试输出2@Bean注解的BeanDefiChina编程nition加入时机总结问题说明

Spring Boot整合log4j2日志配置的详细教程

《SpringBoot整合log4j2日志配置的详细教程》:本文主要介绍SpringBoot项目中整合Log4j2日志框架的步骤和配置,包括常用日志框架的比较、配置参数介绍、Log4j2配置详解... 目录前言一、常用日志框架二、配置参数介绍1. 日志级别2. 输出形式3. 日志格式3.1 PatternL

JavaWeb-WebSocket浏览器服务器双向通信方式

《JavaWeb-WebSocket浏览器服务器双向通信方式》文章介绍了WebSocket协议的工作原理和应用场景,包括与HTTP的对比,接着,详细介绍了如何在Java中使用WebSocket,包括配... 目录一、概述二、入门2.1 POM依赖2.2 编写配置类2.3 编写WebSocket服务2.4 浏

查询SQL Server数据库服务器IP地址的多种有效方法

《查询SQLServer数据库服务器IP地址的多种有效方法》作为数据库管理员或开发人员,了解如何查询SQLServer数据库服务器的IP地址是一项重要技能,本文将介绍几种简单而有效的方法,帮助你轻松... 目录使用T-SQL查询方法1:使用系统函数方法2:使用系统视图使用SQL Server Configu