sylar高性能服务器-日志(P30-P35)内容记录

2024-02-23 17:20

本文主要是介绍sylar高性能服务器-日志(P30-P35)内容记录,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • P30-P32:协程调度01-03
      • 一、Scheduler
        • 局部变量
        • FiberAndThread(任务结构体)
        • 成员变量
        • 调度协程
        • 构造函数
        • 析构函数
        • start
        • stop
        • run
        • stopping
      • 二、参考资料
    • P33-P35:协程调度04-06
      • 一、测试1
      • 二、测试2
    • 总结

P30-P32:协程调度01-03

​ 这里开始协程调度模块,封装了一个M:N协程调度器,创建N个协程在M个线程上运行,调度器的主要思想就是先查看任务队列中有没有任务需要执行,若没有任务就进入空闲状态,反之进行调度。

​ 前面3小节搭建了调度器的基础结构,下面我按照自己的理解以及他人的笔记内容对整个类进行解释。

一、Scheduler

局部变量

两个局部线程变量保存当前线程的协程调度器和主协程

// 协程调度器
static thread_local Scheduler* t_scheduler = nullptr;
// 线程主协程
static thread_local Fiber* t_fiber = nullptr;
FiberAndThread(任务结构体)

该结构体的作用是存储协程、回调函数以及线程的信息

// 协程/函数/线程组struct FiberAndThread {Fiber::ptr fiber; // 协程std::function<void()> cb; // 回调函数int thread; // 线程IDFiberAndThread(Fiber::ptr f, int thr): fiber(f), thread(thr) {}FiberAndThread(Fiber::ptr* f, int thr):thread(thr) {// 因为传入的是一个智能指针,我们使用后会造成引用数加一,可能会引发释放问题,这里swap相当于把传入的智能指针变成一个空指针// 这样原智能指针的计数就会保持不变fiber.swap(*f); }FiberAndThread(std::function<void()> f, int thr): cb(f), thread(thr) {}  FiberAndThread(std::function<void()>* f, int thr):thread(thr) {cb.swap(*f);}  // 将一个类用到STL中必须要有默认构造函数,否则无法进行初始化FiberAndThread(): thread(-1) {}// 重置void reset() {fiber = nullptr;cb = nullptr;thread = -1;}};
成员变量
private:MutexType m_mutex;  // 互斥量std::vector<Thread::ptr> m_threads; // 线程池std::list<FiberAndThread> m_fibers;   // 等待执行的协程队列Fiber::ptr m_rootFiber;  // 主协程std::string m_name; // 协程调度器名称protected:std::vector<int> m_threadIds; // 协程下的线程id数组size_t m_threadCount = 0;   // 线程数量std::atomic<size_t> m_activeThreadCount = {0}; // 工作线程数量std::atomic<size_t> m_idleThreadCount = {0}; // 空闲线程数量bool m_stopping = true; // 是否正在停止bool m_autoStop = false; // m_autoStopint m_rootThread = 0; // 主线程id(use_caller)
调度协程

检查任务队列中有无任务,将任务加入到任务队列中,若任务队列中本来就已经有任务了,就tickle进行通知

// 调度协程模板函数
template<class FiberOrCb>void schedule(FiberOrCb fc, int thread = -1) { // -1表示任意线程bool need_tickle = false;{MutexType::Lock lock(m_mutex);need_tickle = scheduleNoLock(fc, thread);}if(need_tickle) {tickle();}
}// 批量处理调度协程
template<class InputIterator>void schedule(InputIterator begin, InputIterator end, int thread = -1) {bool need_tickle = false;{MutexType::Lock lock(m_mutex);while(begin != end) {need_tickle = scheduleNoLock(&*begin, thread) || need_tickle;begin ++;}}if(need_tickle) {tickle();}
}// 协程调度启动(无锁)
template<class FiberOrCb>bool scheduleNoLock(FiberOrCb fc, int thread) {bool need_tickle = m_fibers.empty();FiberAndThread ft(fc, thread);if(ft.fiber || ft.cb) {m_fibers.push_back(ft);}return need_tickle;
}
构造函数
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name): m_name(name) {// 确定线程数量大于0 SYLAR_ASSERT(threads > 0);// 是否将当前用于协程调度线程也纳入调度器if(use_caller) {sylar::Fiber::GetThis();    // 这里获得的主协程用于调度其余协程-- threads; // 线程数-1SYLAR_ASSERT(GetThis() == nullptr); // 防止出现多个调度器// 设置当前的协程调度器t_scheduler = this;// 将此fiber设置为 use_caller,协程则会与 Fiber::MainFunc() 绑定// 非静态成员函数需要传递this指针作为第一个参数,用 std::bind()进行绑定m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run, this))); // 这个新协程用于执行方法// 设置线程名称sylar::Thread::SetName(m_name);// 设置当前线程的主协程为m_rootFiber// 这里的m_rootFiber是该线程的主协程(执行run任务的协程),只有默认构造出来的fiber才是主协程t_fiber = m_rootFiber.get();// 获得当前线程idm_rootThread = sylar::GetTreadId();m_threadIds.push_back(m_rootThread);} else { // 不将当前线程纳入调度器m_rootThread = -1;}// 更新线程数量m_threadCount = threads;
}
析构函数
Scheduler::~Scheduler() {// 达到停止条件SYLAR_ASSERT(m_stopping);if(GetThis() == this) {t_scheduler = nullptr;}
}
start
void Scheduler::start() {SYLAR_LOG_INFO(g_logger) << "start()";MutexType::Lock lock(m_mutex);// 为false代表已经启动了,直接返回if(!m_stopping) {return;}// 将停止状态更新为falsem_stopping = false;// 线程池为空SYLAR_ASSERT(m_threads.empty());// 创建线程池m_threads.resize(m_threadCount);for(size_t i = 0; i < m_threadCount; ++ i) {// 遍历每一个线程执行run任务m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this), m_name + " " + std::to_string(i)));m_threadIds.push_back(m_threads[i]->getId());}lock.unlock();// 在这里切换线程时,swap的话会将线程的主协程与当前协程交换,// 当使用use_caller时,t_fiber = m_rootFiber,call是将当前协程与主协程交换// 为了确保在启动之后仍有任务加入任务队列中,// 所以在stop()中做该线程的启动,这样就不会漏掉任务队列中的任务if(m_rootFiber) {// m_rootFiber->swapIn();m_rootFiber->call();SYLAR_LOG_INFO(g_logger) << "call out" << m_rootFiber->getState();}SYLAR_LOG_INFO(g_logger) << "start() end";
}
stop
void Scheduler::stop() {SYLAR_LOG_INFO(g_logger) << "stop()";m_autoStop = true;// 使用use_caller,并且只有一个线程,并且主协程的状态为结束或者初始化if(m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber::TERM || m_rootFiber->getState() == Fiber::INIT)) {SYLAR_LOG_INFO(g_logger) << this << " stopped";// 停止状态为truem_stopping = true;// 若达到停止条件则直接returnif(stopping()) {return;}}// bool exit_on_this_fiber = false;// use_caller线程// 当前调度器和t_secheduler相同if(m_rootThread != -1) {SYLAR_ASSERT(GetThis() == this);} else {SYLAR_ASSERT(GetThis() != this);}m_stopping = true;// 每个线程都tickle一下for(size_t i = 0; i < m_threadCount; ++ i) {tickle();}// 使用use_caller多tickle一下if(m_rootFiber) {tickle();}if(stopping()) {return;}
}
run
void Scheduler::run() {SYLAR_LOG_INFO(g_logger) << "run()";// 设置当前调度器setThis();// 非user_caller线程,设置主协程为线程主协程if(sylar::GetTreadId() != m_rootThread) {t_fiber = Fiber::GetThis().get();}// 定义idle_fiber,当任务队列中的任务执行完之后,执行idle()Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));// 定义回调协程Fiber::ptr cb_fiber;// 定义一个任务结构体FiberAndThread ft;while(true) {// 重置也是一个初始化ft.reset();bool tickle_me = false;{// 从任务队列中拿fiber和cbMutexType::Lock lock(m_mutex);auto it = m_fibers.begin();while(it != m_fibers.end()) {// 如果当前任务指定的线程不是当前线程,则跳过,并且tickle一下if(it->thread != -1 && it->thread != sylar::GetTreadId()) {++ it;tickle_me = true;continue;}// 确保fiber或cb存在SYLAR_ASSERT(it->fiber || it->cb);// 如果该fiber正在执行则跳过if(it->fiber && it->fiber->getState() == Fiber::EXEC) {++ it;continue;}// 取出该任务ft = *it;// 从任务队列中清除m_fibers.erase(it);}}// 取到任务tickle一下if(tickle_me) {tickle();}// 执行拿到的线程if(ft.fiber && (ft.fiber->getState() != Fiber::TERM || ft.fiber->getState() != Fiber::EXCEPT)) {++ m_activeThreadCount;// 执行任务ft.fiber->swapIn();// 执行完成,活跃的线程数量减-1-- m_activeThreadCount;ft.fiber->swapIn();// 如果线程的状态被置为了READYif(ft.fiber->getState() == Fiber::READY) {// 将fiber重新加入到任务队列中schedule(ft.fiber);} else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) {ft.fiber->m_state = Fiber::HOLD;}// 执行完毕重置数据ftft.reset();// 如果任务是回调函数} else if(ft.cb) {// cb_fiber存在,重置该fiberif(cb_fiber) {cb_fiber->reset(ft.cb);} else {// cb_fiber不存在则初始化一个cb_fiber.reset(new Fiber(ft.cb));ft.cb = nullptr;}// 重置数据ftft.reset();++ m_activeThreadCount;// 执行cb任务cb_fiber->swapIn();-- m_activeThreadCount;// 若cb_fiber状态为READYif(cb_fiber->getState() == Fiber::READY) {// 重新放入任务队列中schedule(cb_fiber);// 释放智能指针cb_fiber.reset();// cb_fiber异常或结束,就重置状态,可以再次使用该cb_fiber} else if(cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) {// cb_fiber的执行任务置空cb_fiber->reset(nullptr);} else {// 设置状态为HOLD,此任务后面还会通过ft.fiber被拉起cb_fiber->m_state = Fiber::HOLD;// 释放该智能指针,调用下一个任务时要重新new一个新的cb_fibercb_fiber.reset();}// 没有任务执行} else {// 如果idle_fiber的状态为TERM则结束循环,真正的结束if(idle_fiber->getState() == Fiber::TERM) {SYLAR_LOG_INFO(g_logger) << "idle fiber term";break;}// 正在执行idle的线程数量+1++ m_idleThreadCount;// 执行idle()// 正在执行idle的线程数量-1idle_fiber->swapIn();-- m_idleThreadCount;// idle_fiber状态置为HOLDif(idle_fiber->getState() != Fiber::TERM&& idle_fiber->getState() != Fiber::EXCEPT) {idle_fiber->m_state = Fiber::HOLD;}}}
}
stopping
bool Scheduler::stopping() {MutexType::Lock lock(m_mutex);// 当自动停止 && 正在停止 && 任务队列为空 && 活跃的线程数量为0return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0;
}

二、参考资料

  1. 源码
  2. 笔记

P33-P35:协程调度04-06

​ 这几节主要对协程调度的代码进行了调试,针对几个小bug视频花了不少时间去解决,整个问题解决步骤我这里就不记录了,下面通过一个具体的例子来演示协程调度器的作用。

一、测试1

#include "../sylar/sylar.h"sylar::Logger::ptr g_logger = SYLAR_LOG_ROOT();void test_fiber() {static int s_count = 5;SYLAR_LOG_INFO(g_logger) << "test in fiber s_count = " << s_count;sleep(1);if(-- s_count >= 0) {// 未指定线程ID,表示任意线程都能执行任务sylar::Scheduler::GetThis()->schedule(&test_fiber);}}int main(int argc, char** argv) {SYLAR_LOG_INFO(g_logger) << "main";sylar::Scheduler sc(3,false, "test");sleep(2);sc.start();sc.schedule(&test_fiber);sc.stop();SYLAR_LOG_INFO(g_logger) << "over";return 0;
}

​ 上面这段测试定义了3个线程,并且将use_caller设为false,表示不让用于调度的线程执行任务。此外在执行任务时,由于没有指定线程ID,说明任意线程都能执行任务。

结果

可以看到3个协程不停的切换执行了任务

image-20240223144838276

二、测试2

下面测试修改了两处代码,首先在执行任务时指定第一个执行任务的线程去执行所有的任务,其次将use_caller设置为true,表示用于调度的线程也能参与执行任务,那么就可以少开一个线程,提高效率。

sylar::Scheduler::GetThis()->schedule(&test_fiber, sylar::GetTreadId());sylar::Scheduler sc(3,true, "test");

结果

可以看到所有的任务都是由下标为1的线程去执行,并且线程池中的线程一共就只有3个。

image-20240223145144672

总结

​ 这6节视频全部看完后并且调试通代码才大概在功能层面了解sylar做的这个协程调度器,一开始听的时候确实很迷茫,一直在不停改代码但不知道为什么要去改。不过最后也还没有完全搞懂协程调度器的细节,现在这个阶段能理解代码的运行逻辑就行了,后面二刷的时候再去深究,总之后面的内容越来越复杂了,对于我这种之前没有服务器基础的理解起来相当困难。

这篇关于sylar高性能服务器-日志(P30-P35)内容记录的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/739351

相关文章

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

Window Server创建2台服务器的故障转移群集的图文教程

《WindowServer创建2台服务器的故障转移群集的图文教程》本文主要介绍了在WindowsServer系统上创建一个包含两台成员服务器的故障转移群集,文中通过图文示例介绍的非常详细,对大家的... 目录一、 准备条件二、在ServerB安装故障转移群集三、在ServerC安装故障转移群集,操作与Ser

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Go语言使用Buffer实现高性能处理字节和字符

《Go语言使用Buffer实现高性能处理字节和字符》在Go中,bytes.Buffer是一个非常高效的类型,用于处理字节数据的读写操作,本文将详细介绍一下如何使用Buffer实现高性能处理字节和... 目录1. bytes.Buffer 的基本用法1.1. 创建和初始化 Buffer1.2. 使用 Writ

C++中实现调试日志输出

《C++中实现调试日志输出》在C++编程中,调试日志对于定位问题和优化代码至关重要,本文将介绍几种常用的调试日志输出方法,并教你如何在日志中添加时间戳,希望对大家有所帮助... 目录1. 使用 #ifdef _DEBUG 宏2. 加入时间戳:精确到毫秒3.Windows 和 MFC 中的调试日志方法MFC

SpringBoot如何使用TraceId日志链路追踪

《SpringBoot如何使用TraceId日志链路追踪》文章介绍了如何使用TraceId进行日志链路追踪,通过在日志中添加TraceId关键字,可以将同一次业务调用链上的日志串起来,本文通过实例代码... 目录项目场景:实现步骤1、pom.XML 依赖2、整合logback,打印日志,logback-sp

Python项目打包部署到服务器的实现

《Python项目打包部署到服务器的实现》本文主要介绍了PyCharm和Ubuntu服务器部署Python项目,包括打包、上传、安装和设置自启动服务的步骤,具有一定的参考价值,感兴趣的可以了解一下... 目录一、准备工作二、项目打包三、部署到服务器四、设置服务自启动一、准备工作开发环境:本文以PyChar

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群