EventLoop整合与TimerWheel联合调试(整合二)

2024-02-21 15:52

本文主要是介绍EventLoop整合与TimerWheel联合调试(整合二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

概要

tcp_cli.cc

tcp_srv.cc

server.hpp

测试结果


第二次整合

概要

本主要是将以下模块进行整合测试

Poller模块与Channel模块整合-CSDN博客

时间轮设计-CSDN博客

timerfd的认识与基本使用-CSDN博客

整合基于的理念

tcp_cli.cc

#include "../source/server.hpp"int main()
{Socket cli_sock;cli_sock.CreateClient(8500, "127.0.0.1");for(int i = 0; i < 5; i++){std::string str = "hello qingfengyuge!";cli_sock.Send(str.c_str(), str.size());char buf[1024] = {0};cli_sock.Recv(buf, 1023);DBG_LOG("%s", buf);sleep(1);}while (1) sleep(1);return 0;
}

tcp_srv.cc

#include "../source/server.hpp"void HandleClose(Channel *channel)
{DBG_LOG("close fd:%d", channel->Fd());channel->Remove(); // 移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if (ret <= 0){return HandleClose(channel); // 关闭释放}DBG_LOG("%s", buf);channel->EnableWrite(); // 启动可写事件
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();const char *data = "天气还不错!!";int ret = send(fd, data, strlen(data), 0);if (ret < 0){return HandleClose(channel); // 关闭释放}channel->DisableWrite(); // 关闭写监控
}
void HandleError(Channel *channel)
{return HandleClose(channel); // 关闭释放
}
void HandleEvent(EventLoop *loop, Channel *channel, uint64_t timerid)
{loop->TimerRefresh(timerid);
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{int fd = lst_channel->Fd();int newfd = accept(fd, NULL, NULL);if (newfd < 0){return;}uint64_t timerid = rand() % 10000;Channel *channel = new Channel(loop, newfd);channel->SetReadCallback(std::bind(HandleRead, channel));                  // 为通信套接字设置可读事件的回调函数channel->SetWriteCallback(std::bind(HandleWrite, channel));                // 可写事件的回调函数channel->SetCloseCallback(std::bind(HandleClose, channel));                // 关闭事件的回调函数channel->SetErrorCallback(std::bind(HandleError, channel));                // 错误事件的回调函数channel->SetEventCallback(std::bind(HandleEvent, loop, channel, timerid)); // 任意事件的回调函数channel->EnableRead();// 活跃连接的超时释放操作,10s后关闭连接// 主要定时销毁任务,必须在启动读事件之前,因为有可能启动了事件监控后,就立即有了事件,但是这时候还没有任务loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));
}int main()
{srand(time(NULL));EventLoop loop;Socket lst_sock;bool ret = lst_sock.CreateServer(8500);// 为监听套接字,创建一个Channel进行事件的管理,以及事件的处理Channel channel(&loop, lst_sock.Fd());// 回调中,获取新连接,为新连接创建Channel并且添加监控channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));channel.EnableRead(); // 启动可读事件监控while (1){loop.Start();}lst_sock.Close();return 0;
}

 

server.hpp

#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <string>
#include <cstring>
#include <ctime>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <functional>
#include <sys/epoll.h>
#include <unordered_map>
#include <thread>
#include <mutex>
#include <sys/eventfd.h>
#include <memory>
#include <sys/timerfd.h>#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG#define LOG(level, format, ...)                                                             \do                                                                                      \{                                                                                       \if (level < LOG_LEVEL)                                                              \break;                                                                          \time_t t = time(NULL);                                                              \struct tm *ltm = localtime(&t);                                                     \char tmp[32] = {0};                                                                 \strftime(tmp, 31, "%H:%M:%S", ltm);                                                 \fprintf(stdout, "[%s %s:%d] " format "\n", tmp, __FILE__, __LINE__, ##__VA_ARGS__); \} while (0)#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)// 缓冲区类
#define BUFFER_DEFAULT_SIZE 1024 // Buffer 默认起始大小
class Buffer
{
private:std::vector<char> _buffer; // 使用vector进行内存空间管理uint64_t _reader_idx;      // 读偏移uint64_t _writer_idx;      // 写偏移
public:Buffer() : _reader_idx(0), _writer_idx(0), _buffer(BUFFER_DEFAULT_SIZE) {}char *Begin() { return &*_buffer.begin(); }// 获取当前写入起始地址char *WirtePosition() { return Begin() + _writer_idx; }// 获取当前读取起始地址char *ReadPosition() { return Begin() + _reader_idx; }// 获取缓冲区末尾空闲空间大小--写偏移之后的空闲空间, 总体空间大小减去写偏移uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }// 获取缓冲区起始空闲空间大小--读偏移之前的空闲空间uint64_t HeadIdleSize() { return _reader_idx; }// 获取可读数据大小 = 写偏移 - 读偏移uint16_t ReadAbleSize() { return _writer_idx - _reader_idx; };// 将读偏移向后移动void MoveReadOffset(uint64_t len){if (len == 0)return;// 向后移动的大小, 必须小于可读数据大小assert(len <= ReadAbleSize());_reader_idx += len;}// 将写偏移向后移动void MoveWriteOffset(uint64_t len){// 向后移动的大小,必须小于当前后边的空闲空间大小assert(len <= TailIdleSize());_writer_idx += len;}// 确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)void EnsureWriteSpace(uint64_t len){// 如果末尾空闲空间大小足够,直接返回if (TailIdleSize() >= len){return;}// 末尾空闲空间不够,则判断加上起始位置的空闲空间大小是否足够,够了就将数据移动到起始位置if (len <= TailIdleSize() + HeadIdleSize()){// 将数据移动到起始位置uint64_t rsz = ReadAbleSize();                            // 把当前数据大小先保存起来std::copy(ReadPosition(), ReadPosition() + rsz, Begin()); // 把可读数据拷贝到起始位置_reader_idx = 0;                                          // 将读偏移归0_writer_idx = rsz;                                        // 将写位置置为可读数据大小, 因为当前的可读数据大小就是写偏移量}else{// 总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可_buffer.resize(_writer_idx + len);}}// 写入数据void Write(const void *data, uint64_t len){// 1.保证有足够空间, 2.拷贝数据进去EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WirtePosition());}void WirteAndPush(const void *data, uint64_t len){Write(data, len);MoveWriteOffset(len);}void WriteString(const std::string &data){return Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &data){return Write(data.ReadPosition(), data.ReadAbleSize());}void WirteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}// 读取数据void Read(void *buf, uint64_t len){// 要求获取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::copy(ReadPosition(), ReadPosition() + len, (char *)buf);}void ReadAndPop(void *buf, uint64_t len){Read(buf, len);MoveReadOffset(len);}std::string ReadAsString(uint64_t len){// 要求获取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len); // 这里不直接用str.c_str()的原因是,这个的返回值是const类型return str;}std::string ReadAsStringAndPop(uint64_t len){assert(len <= ReadAbleSize());std::string str = ReadAsString(len);MoveReadOffset(len);return str;}char *FindCRLF(){char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize());return res;}// 这种情况针对的是,通常获取一行数据std::string GetLine(){char *pos = FindCRLF();if (pos == NULL)return "";// +1 是为了把换行字符也取出来return ReadAsString(pos - ReadPosition() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}// 清空缓冲区void Clear(){// 只需要将偏移量归0即可_reader_idx = 0;_writer_idx = 0;}
};// 套接字类
#define MAX_LISTEN 1024
class Socket
{
private:int _sockfd;public:Socket() : _sockfd(-1) {}Socket(int fd) : _sockfd(fd) {}~Socket() { Close(); };int Fd() { return _sockfd; }// 创建套接字bool Create(){// int socket(int domain, int type, int protocol)_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (_sockfd < 0){ERR_LOG("CREATE SOCKET FAILED!");return false;}return true;}// 绑定地址信息bool Bind(const std::string &ip, uint64_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int bind(int sockfd, struct sockaddr* addr, socklen_t len)int ret = bind(_sockfd, (struct sockaddr *)&addr, len);if (ret < 0){ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}// 开始监听bool Listen(int backlog = MAX_LISTEN){// int listen(int backlog)int ret = listen(_sockfd, backlog);if (ret < 0){ERR_LOG("SOCKET LISTEN FAILED!");return false;}return true;}// 向服务器发起连接bool Connect(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int connect(int sockfd, struct sockaddr* addr, socklen_t len)int ret = connect(_sockfd, (struct sockaddr *)&addr, len);if (ret < 0){ERR_LOG("CONNECT SERVER FAILED!");return false;}return true;}// 获取新连接int Accept(){// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);if (newfd < 0){ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;}// 接收数据ssize_t Recv(void *buf, size_t len, int flag = 0) // 0 阻塞{// ssize_t recv(int sockfd, void *buf, size_t len, int flag)ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0){// EAGAIN 当前的接收缓冲区中没用数据了,在非阻塞的情况下才有这个错误// EINTR 表示当前socket的阻塞等待,被信号打断了if (errno == EAGAIN || errno == EINTR){return 0; // 表示这次没用接收到数据}ERR_LOG("SOCKET RECV FAILED!");return -1;}return ret; // 实际接收的数据长度}ssize_t NonBlockRecv(void *buf, size_t len){return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞}// 发送数据ssize_t Send(const void *buf, size_t len, int flag = 0){// ssize_t send(int sockfd, void *data, size_t len, int flag)ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0){ERR_LOG("SOCKET SEND FAILED!");return -1;}return ret; // 实际发送的数据长度}ssize_t NonBlockSend(void *buf, size_t len){return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞}// 关闭套接字void Close(){if (_sockfd != -1){close(_sockfd);_sockfd = -1;}}// 创建一个服务器连接bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) // 接收全部{// 1.创建套接字 2.绑定地址 3.开始监听 4.设置非阻塞 5.启动地址重用if (Create() == false)return false;if (block_flag) // 默认阻塞NonBlock();if (Bind(ip, port) == false)return false;if (Listen() == false)return false;ReuseAddress();return true;}// 创建一个客户端连接bool CreateClient(uint16_t port, const std::string &ip){// 1.创建套接字 2.指向连接服务器if (Create() == false)return false;if (Connect(ip, port) == false)return false;return true;}// 设置套接字选项 -- 开启地址端口重用void ReuseAddress(){// int setsockopt(int fd, int level, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(int)); // 地址val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(int)); // 端口号}// 设置套接字阻塞属性 -- 设置为非阻塞void NonBlock(){// int fcntl(int fd, int cmd, .../*arg*/)int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};class Poller; // 整合测试1:声明
class EventLoop;
// Channel类
class Channel
{
private:int _fd;EventLoop *_loop;uint32_t _events;  // 当前需要监控的事件uint32_t _revents; // 当前连接触发的事件using EventCallback = std::function<void()>;EventCallback _read_callback;  // 可读事件被触发的回调函数EventCallback _write_callback; // 可写事件被触发的回调函数EventCallback _error_callback; // 错误事件被触发的回调函数EventCallback _close_callback; // 连接断开事件被触发的回调函数EventCallback _event_callback; // 任意事件被触发的回调函数
public:Channel(EventLoop *loop, int fd) : _fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; } // 获取想要监控的事件void SetREvents(uint32_t events) { _revents = events; }void SetReadCallback(const EventCallback &cb) { _read_callback = cb; } // 设置实际就绪的事件void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }// 当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); }// 当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }// 启动读事件监控void EnableRead(){_events |= EPOLLIN;Update();}// 启动写事件监控void EnableWrite(){_events |= EPOLLOUT;Update();}// 关闭读事件监控void DisableRead(){_events &= ~EPOLLIN;Update();}// 关闭写事件监控void DisableWrite(){_events &= ~EPOLLOUT;Update();}// 关闭所有事件监控void DisableAll(){_events = 0;Update();}// 移除监控void Remove(); // 声明和实现要分离,因为实现的时候是不知道里面有什么函数成员的void Update(); // 这两个特殊,所以把实现放在Poller类的下面进行实现// 事件处理,一旦触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定void HandleEvent(){// 第二参数,对方关闭连接,第三参数,带外数据if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if (_event_callback) // 不管任何事件,都调用的回调函数_event_callback();if (_read_callback)_read_callback();}/*有可能会释放连接的操作事件,一次只处理一个*/if (_revents & EPOLLOUT){if (_event_callback)_event_callback(); // 放到事件处理完毕后调用,刷新活跃度if (_write_callback)_write_callback();}else if (_revents & EPOLLERR){if (_event_callback)_event_callback();if (_error_callback)_error_callback();}else if (_revents & EPOLLHUP){if (_event_callback)_event_callback();if (_close_callback)_close_callback();}}
};// Poller描述符监控类
#define MAX_EPOLLEVENTS 1024
class Poller
{
private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;private:// 对epoll的直接操作void Update(Channel *channel, int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev)int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd, op, fd, &ev);if (ret < 0){ERR_LOG("EPOLLCTL FAILED!");}return;}// 判断一个Channel 是否已经添加了事件监控bool HasChannel(Channel *channel){auto it = _channels.find(channel->Fd());if (it == _channels.end()){return false;}return true;}public:Poller(){_epfd = epoll_create(MAX_EPOLLEVENTS); // 这个值大于0就行了,无用处if (_epfd < 0){ERR_LOG("EPOLL CREATE FAILED!");abort(); // 退出程序}}// 添加或修改监控事件void UpdateEvent(Channel *channel){bool ret = HasChannel(channel);if (ret == false){// 不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));return Update(channel, EPOLL_CTL_ADD);}return Update(channel, EPOLL_CTL_MOD);}// 移除监控void RemoveEvent(Channel *channel){auto it = _channels.find(channel->Fd());if (it != _channels.end()){_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}// 开始监控, 返回活跃连接void Poll(std::vector<Channel *> *active){// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); // -1阻塞监控if (nfds < 0){if (errno == EINTR) // 信号打断{return;}ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));abort();}for (int i = 0; i < nfds; i++) // 添加活跃信息{auto it = _channels.find(_evs[i].data.fd); // 没找到就说明不在我们的管理之下,这是不正常的assert(it != _channels.end());it->second->SetREvents(_evs[i].events); // 设置实际就绪的事件active->push_back(it->second);}return;}
};// timerwheel时间轮定时器类
using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:uint64_t _id;         // 定时器任务对象uint32_t _timeout;    // 定时任务的超时时间bool _canceled;       // false-表示没有被取消,true-表示被取消TaskFunc _task_cb;    // 定时器要执行的定时任务ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) : _id(id), _timeout(delay), _task_cb(cb), _canceled(false) {}~TimerTask(){if (_canceled == false)_task_cb();_release();}void Cancel() { _canceled = true; }void SetRelease(const ReleaseFunc &cb) { _release = cb; }uint32_t DelayTime() { return _timeout; } // 返回时间
};class TimerWheel
{
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick;     // 当前的的秒针,走到哪里哪里就释放执行int _capacity; // 表盘最大数量 -- 其实就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel;// 用weak_ptr来构造出新的shared_ptr用来计数,不过后续要记得释放std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;int _timerfd; // 定时器描述符 -- 可读事件回调就是读取计数器,执行定时任务std::unique_ptr<Channel> _timer_channel;private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if (it != _timers.end()){_timers.erase(it);}}static int CreateTimerfd(){// int timerfd_create(int clockid, int flags);int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd < 0){ERR_LOG("TIMERFD CREATE FAILED!");abort();}// int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec);struct itimerspec itime;itime.it_value.tv_sec = 1;                 // 设置 秒钟itime.it_value.tv_nsec = 0;                // 设置 纳秒 第一次超时时间为1s后itime.it_interval.tv_sec = 1;              // 同上itime.it_interval.tv_nsec = 0;             // 第一次超时后,每隔超时的间隔时timerfd_settime(timerfd, 0, &itime, NULL); // 0代表阻塞式return timerfd;}void ReadTimefd(){uint64_t times;int ret = read(_timerfd, &times, 8);if (ret < 0){perror("READ TIMERFD FAILED!");abort();}return;}// 这个函数应该每秒钟被执行一次,相当于秒钟向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear(); // 清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉.从而执行函数}void OnTime(){ReadTimefd();RunTimerTask();}void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) // 添加定时任务{PtrTask pt(new TimerTask(id, delay, cb));                      // 实例化定时任务对象pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id)); // 第0个位置是隐藏的this指针。再把任务id绑定进去int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);_timers[id] = WeakTask(pt);}// 刷新/延迟定时任务void TimerRefreshInLoop(uint64_t id){// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来, 添加到轮子中auto it = _timers.find(id);if (it == _timers.end()){return; // 没找到定时任务, 没法刷新,没法延迟}PtrTask pt = it->second.lock(); // lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTime();    // 获取到了初始的延迟时间int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return; // 没找到定时任务, 没法刷新,没法延迟}PtrTask pt = it->second.lock(); // 当还没有过期才进行取消if (pt)pt->Cancel();}public:TimerWheel(EventLoop *loop) : _capacity(60), _tick(0), _wheel(_capacity), _loop(loop),_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)){_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead(); // 启动读事件监控}/*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*//*如果不想加锁,那就把对定期的所有操作,都放在一个线程中进行*/void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);// 刷新/延迟定时任务void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,对应的EventLoop线程内执行*/bool HasTimer(uint64_t id){auto it = _timers.find(id);if (it == _timers.end()){return false; // 没找到定时任务, 没法刷新,没法延迟}return true;}
};// EventLoop事件监控处理类
class EventLoop
{
private:using Functor = std::function<void()>;std::thread::id _thread_id;              // 线程IDint _event_fd;                           // eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel; // 智能指针Poller _poller;                          // 进行所有描述符的事件监控std::vector<Functor> _tasks;             // 任务池std::mutex _mutex;                       // 实现任务池操作的线程安全TimerWheel _timer_wheel;                 // 定时器模块
public://  执行任务池中的所有任务void RunAllTask(){std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex);_tasks.swap(functor);}for (auto &f : functor){f();}return;}static int CreateEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if (efd < 0){ERR_LOG("CREATE EVENTFD FAILED!!");abort(); // 让程序异常退出}return efd;}void ReadEventfd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if (ret < 0){// EINTR -- 被信号打断, EAGAIN -- 表示无数据可读if (errno == EINTR || EAGAIN){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));if (ret < 0){if (errno == EINTR){return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return;}public:EventLoop() : _thread_id(std::this_thread::get_id()),_event_fd(CreateEventFd()),_event_channel(new Channel(this, _event_fd)),_timer_wheel(this){// 给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));// 启动eventfd的读事件监控_event_channel->EnableRead();}// 三步走--事件监控-》就绪事件处理-》执行任务void Start(){// 1.事件监控std::vector<Channel *> actives;_poller.Poll(&actives);// 2.事件处理for (auto &channel : actives){channel->HandleEvent();}// 3.执行任务RunAllTask();}// 用于判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return (_thread_id == std::this_thread::get_id());}// 判断将要执行的任务是否处于当前线程中,如果是则执行,否则压入队列void RunInLoop(const Functor &cb){if (IsInLoop()){return cb();}return QueueInLoop(cb);}// 将操作压入任务池void QueueInLoop(const Functor &cb){{std::unique_lock<std::mutex> _lock(_mutex);_tasks.push_back(cb);}// 唤醒有可能因为没有事件就绪,而导致的epoll阻塞// 其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}// 添加/修改描述符的事件监控void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }// 移除描述符的监控void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};// 移除监控
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb){_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));}void TimerWheel::TimerRefresh(uint64_t id){_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));}void TimerWheel::TimerCancel(uint64_t id){_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));}

测试结果

符合预期

后记:贴必须得贴上去,代码多拷贝就要多贴几份,不管不管

这篇关于EventLoop整合与TimerWheel联合调试(整合二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/732304

相关文章

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

SpringBoot整合kaptcha验证码过程(复制粘贴即可用)

《SpringBoot整合kaptcha验证码过程(复制粘贴即可用)》本文介绍了如何在SpringBoot项目中整合Kaptcha验证码实现,通过配置和编写相应的Controller、工具类以及前端页... 目录SpringBoot整合kaptcha验证码程序目录参考有两种方式在springboot中使用k

Spring Boot 中整合 MyBatis-Plus详细步骤(最新推荐)

《SpringBoot中整合MyBatis-Plus详细步骤(最新推荐)》本文详细介绍了如何在SpringBoot项目中整合MyBatis-Plus,包括整合步骤、基本CRUD操作、分页查询、批... 目录一、整合步骤1. 创建 Spring Boot 项目2. 配置项目依赖3. 配置数据源4. 创建实体类

python与QT联合的详细步骤记录

《python与QT联合的详细步骤记录》:本文主要介绍python与QT联合的详细步骤,文章还展示了如何在Python中调用QT的.ui文件来实现GUI界面,并介绍了多窗口的应用,文中通过代码介绍... 目录一、文章简介二、安装pyqt5三、GUI页面设计四、python的使用python文件创建pytho

SpringBoot整合InfluxDB的详细过程

《SpringBoot整合InfluxDB的详细过程》InfluxDB是一个开源的时间序列数据库,由Go语言编写,适用于存储和查询按时间顺序产生的数据,它具有高效的数据存储和查询机制,支持高并发写入和... 目录一、简单介绍InfluxDB是什么?1、主要特点2、应用场景二、使用步骤1、集成原生的Influ

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp

ASIO网络调试助手之一:简介

多年前,写过几篇《Boost.Asio C++网络编程》的学习文章,一直没机会实践。最近项目中用到了Asio,于是抽空写了个网络调试助手。 开发环境: Win10 Qt5.12.6 + Asio(standalone) + spdlog 支持协议: UDP + TCP Client + TCP Server 独立的Asio(http://www.think-async.com)只包含了头文件,不依

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能