Linux-C++开发项目:基于Reactor模式的高性能并发服务器(Ubuntu实现)———学习笔记
目录
1、项目介绍
2、项目部署
3、项目开发过程
3.1网络库模块开发
3.1.1日志宏
3.1.2Buffer模块
编辑3.1.3Socket模块
3.1.4Channel模块
3.1.5Poller模块
3.1.6Timerwheel模块
3.1.7EvenLoop模块
整合测试1
未完待续......
学习笔记,仅作交流。如有谬误,敬请指正。
1、项目介绍
本项目实现一个基于从属Reactor模式的高性能并发服务器,并且该服务器可以单独作为一个网络库组件,组件使用者可以利用该网络库组件方便地实现各种各样的服务器。
服务器使用到epoll多路转接模型,并且工作在ET模式下。
2、项目部署
操作系统:Ubuntu(只要支持C++11的正则库)
3、项目开发过程
3.1网络库模块开发
3.1.1日志宏
日志信息分级:FATAL(致命错误)、ERROR(一般错误)、WARN(警告)、INFO(一般信息)、DEBUG(调试信息)
server.hpp
#include #include #include #define NORMAL 0 // 正常 #define DEBUG 1 // 调试 #define ERROR 2 // 错误 #define LOG_LEVEL DEBUG// 控制输出 #define LOG(level,format,...) do{\ if(level " format "\n",(void *)pthread_self(),tmp,__FILE__,__LINE__,##__VA_ARGS__);\ }while(0) #define NORMAL_LOG(format,...) LOG(NORMAL,format,##__VA_ARGS__) #define DEBUG_LOG(format,...) LOG(DEBUG,format,##__VA_ARGS__) #define ERROR_LOG(format,...) LOG(ERROR,format,##__VA_ARGS__)
该日志宏使用fprintf,可以将日志输出到文件上。该日志的输出格式为:
[线程地址]–[时:分:秒]–[file:发生日志输出的文件名|line:发生日志输出的行号]=> 输出内容
这段代码的作用在于控制日志的输出,即不符合等级的日志输出统统不输出。
if(level3.1.2Buffer模块
TCP通信的数据都会被放在套接字的缓冲区当中,但是套接字的缓冲区是有大小限制的,尽管开发者可以控制这些缓冲区的大小,但是这样做很没必要。
可以直接在应用层再提供一层缓冲区,这里把它叫做Buffer。。Buffer的作用就是一个处于应用层的缓冲区,它的容量可变,为组件使用者提供一个方便、灵活的缓冲区。
Buffer.hpp
#ifndef BUFFER_HPP #define BUFFER_HPP #include #include #include #include #define BUFFER_DEFAULT_SIZE 1024 class Buffer { public: Buffer(); char *Begin();//获取缓冲区的起始地址 char *WritePosition();//获取有效数据的结束位置,也就是新数据写入的位置 char *ReadPosition();//有效位置的起始位置,也就是读取数据的起始位置 uint64_t TailFreeSize();/ 获取_writer之后的空闲空间大小 uint64_t HeadFreeSize();// 获取_reader之前的空间空间大小 uint64_t ReadAbleSize();// 获取可读数据大小 void OffsetReader(uint64_t len);// _reader向后移动,说明有数据被读走 void OffsetWriter(uint64_t len);// _writer向后移动,说明有新数据写入 void EnsureWriteSpace(uint64_t len);//确保空间大小足够容纳新数据 void Write(const void *data, uint64_t len);// 向Buffer写入数据 void WriteAndPush(const void *data, uint64_t len);// 向Buffer写入并且造成_wirter偏移 void WriteString(const std::string &data);// 向Buffer写入string对象 void WriteStringAndPush(const std::string &data);// 写入string对象并造成_writer偏移 void WriteBuffer(Buffer &data);// 写入Buffer对象 void WriteBufferAndPush(Buffer &data);// 写入Buffer对象并造成_writer偏移 void Read(void *buf, uint64_t len);// 只能读取有效的数据 void ReadAndPop(void *buf, uint64_t len);// 读取数据并且移动_reader,即从Buffer当中删除数据 std::string ReadAsString(uint64_t len);// 读取len个数据,在该函数内部封装成string对象返回出去 std::string ReadAsStringAndPop(uint64_t len); char *FindEndOfLine();// 寻找一行的结束标志'\n' std::string GetLine();// 获取一行数据 std::string GetLineAndPop(); void Clear(); private: uint67_t _reader;//有效数据的起始位置 uint64_t _writer;//有效数据的结束位置 std::vector _buffer;//使用vector进行空间管理 }; #endif // BUFFER_HBuffer.cpp
#include "Buffer.hpp" #include #include #include Buffer::Buffer() : _reader(0), _writer(0), _buffer(BUFFER_DEFAULT_SIZE) {}//这个初始化列表的顺序与头文件类中的声明顺序有关,不然会有警告 char *Buffer::Begin() { return &(*(_buffer.begin())); } char *Buffer::WritePosition() { return Begin() + _writer; } char *Buffer::ReadPosition() { return Begin() + _reader; } uint64_t Buffer::TailFreeSize() { return _buffer.size() - _writer; } uint64_t Buffer::HeadFreeSize() { return _reader; } uint64_t Buffer::ReadAbleSize() { return _writer - _reader; } void Buffer::OffsetReader(uint64_t len) { if (len == 0) return; // 最大范围是和_writer处于同一位置,说明Buffer为空;如果超过_writer,就是未定义的行为 if (len > ReadAbleSize()) abort(); _reader += len; } void Buffer::OffsetWriter(uint64_t len) { if (len == 0) return; // 最多移动到当前_buffer的最大容量处,一旦超出就可能造成越界访问 if (len > TailFreeSize()) abort(); _writer += len; } void Buffer::EnsureWriteSpace(uint64_t len) { if (TailFreeSize() >= len) return;//_writer尾部有足够的空间容纳新数据 // _reader之前、_writer之后的空间足够容纳新数据 if (TailFreeSize() + HeadFreeSize() >= len) { uint64_t oldsize = ReadAbleSize();// 保存当前有效数据大小 std::copy(ReadPosition(), ReadPosition() + oldsize, Begin());// 将数据往前挪动 _reader = 0; _writer = oldsize; } else { _buffer.resize(_writer + len); } } void Buffer::Write(const void *data, uint64_t len) { if (len == 0) return; EnsureWriteSpace(len); const char *d = (const char *)data; std::copy(d, d + len, WritePosition());// 将[d,d+len]这段区间的数据拷贝到_writer指向的位置之后 } void Buffer::WriteAndPush(const void *data, uint64_t len) { Write(data, len); OffsetWriter(len); } void Buffer::WriteString(const std::string &data) { Write(data.c_str(), data.size()); } void Buffer::WriteStringAndPush(const std::string &data) { WriteString(data); OffsetWriter(data.size()); } void Buffer::WriteBuffer(Buffer &data) { Write(data.ReadPosition(), data.ReadAbleSize()); } void Buffer::WriteBufferAndPush(Buffer &data) { WriteBuffer(data); OffsetWriter(data.ReadAbleSize()); } void Buffer::Read(void *buf, uint64_t len) { if (len > ReadAbleSize()) abort(); std::copy(ReadPosition(), ReadPosition() + len, (char *)buf); } void Buffer::ReadAndPop(void *buf, uint64_t len) { Read(buf, len); OffsetReader(len); } std::string Buffer::ReadAsString(uint64_t len) { if (len > ReadAbleSize()) abort(); std::string str; str.resize(len); Read(&str[0], len); return str; } std::string Buffer::ReadAsStringAndPop(uint64_t len) { if (len > ReadAbleSize()) abort(); std::string str = ReadAsString(len); OffsetReader(len); return str; //return std::move(str); //std::move是将对象的状态或者所有权从一个对象转移到另一个对象,只是转移,没有内存的搬迁或者内存拷贝。 //在处理大型对象时深拷贝可能非常低效,此时可以用移动语义,即将资源的所有权从一个对象转移给另一个对象,无需进行数据的实际复制 } char *Buffer::FindEndOfLine()// 寻找一行的结束标志'\n' { char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize()); return res; } std::string Buffer::GetLine()// 获取一行数据 { char *pos = FindEndOfLine(); if (pos == nullptr) return ""; return ReadAsString(pos - ReadPosition() + 1);/ +1是为了将'\n'一并返回 } std::string Buffer::GetLineAndPop() { std::string str = GetLine(); OffsetReader(str.size()); return str; } void Buffer::Clear()// 清空 { _reader = 0; _writer = 0; }Buffer扩容机制
3.1.3Socket模块
Socket模块是将套接字的过程封装起来(有阻塞和非阻塞两种模式进行读取和发送数据;增加端口复用功能)。
Socket.hpp
#ifndef SOCKET_HPP #define SOCKET_HPP #include #include #include #include #include #include #include #include #include #include #include #define MAX_LISTEN 64 class Socket { public: Socket(int sockfd = -1); ~Socket(); int GetFd();// 获取套接字文件描述符 bool Create();// 创建套接字 bool Bind(const std::string &ip, uint16_t port);//绑定 bool Listen(int backlog = MAX_LISTEN);//监听 bool Connect(const std::string &ip, uint16_t port);//连接 int Accept();//接收 ssize_t Recv(void *buf, size_t len, int flag = 0);// 默认为阻塞式的读取数据 ssize_t NoBlockRecv(void *buf, size_t len);// 非阻塞式读取 ssize_t Send(const void *buf, size_t len, int flag = 0);// 默认为阻塞式的发送数据 ssize_t NoBlockSend(const void *buf, size_t len);//非阻塞发送 void Close();//关闭 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0");// 直接创建一个服务器套接字 bool CreateClient(const std::string &ip, uint16_t port); void ReuseAddr();// 开启端口地址重用 void NonBlock();//设置非阻塞 private: int _sockfd; }; #endif // SOCKET_HSocket.cpp
#include "Socket.hpp" #include #include "server.hpp" Socket::Socket(int sockfd) : _sockfd(sockfd) {} Socket::~Socket() { Close(); } int Socket::GetFd() { return _sockfd; } bool Socket::Create() { _sockfd = socket(AF_INET, SOCK_STREAM, 0);//只支持TCP协议 if (_sockfd RemoveEvent(this); } void Channel::HandleEvent() //通过判断触发事件判断调用哪个回调函数 { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } if (_event_callback) _event_callback(); }3.1.5Poller模块
Poller模块的作用就是进行事件的监控和通知事件触发
Poller.hpp
#ifndef POLLER_HPP #define POLLER_HPP #include #include #include #include #include "Channel.hpp" #include #define MAX_EPOLLEVENTS 1024 class Channel; class Poller { public: Poller(); void updateEvent(Channel *channel);// 更新事件的监控 void removeEvent(Channel *channel);// 移除事件对某个Channel的事件监控 void poll(std::vector *actives);// 开始监控,并且返回事件触发的Channel private: bool HanChannel(Channel *channel);// 判断Channel对象是否被Poller模块所管理 void Update(Channel *channel, int op);// 更新epoll的监控事件 int _epfd;//epoll文件描述符 struct epoll_event _evs[MAX_EPOLLEVENTS];// 存储触发事件的数组 std::unordered_map _channels;/ Poller模块会负责通知事件,通知的对象就是Channel对象 }; #endif // POLLER_HPP3.1.6Poller.cpp
相关函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);函数参数:
- epfd : epoll实例的fd
- op : 操作标志,下文会描述
- fd : 监控对象的fd
- event : 事件的内容,下文描述
op可以有3个值,分别为:
- EPOLL_CTL_ADD : 添加监听的事件
- EPOLL_CTL_DEL : 删除监听的事件
- EPOLL_CTL_MOD : 修改监听的事件
events表示监控的事件的集合,是一个状态值,通过状态位来表示,可以设置如下事件:
- EPOLLIN : 文件可读
- EPOLLOUT : 文件可写
- EPOLLPRI : 文件有紧急数据可读
- EPOLLHUP : 文件被挂断。这个事件是一直监控的,即使没有明确指定
#include "Poller.hpp" #include "server.hpp" #define MAX_EPOLLEVENTS 1024 Poller::Poller() : _channels() { _epfd = epoll_create(20); if (_epfd GetFd()); if (it == _channels.end()) { return false; } return true; } void Poller::Update(Channel *channel, int op) { if (channel == nullptr) { ERROR_LOG("Null channel passed to Update"); return; } int fd = channel->GetFd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->GetEvents(); int n = epoll_ctl(_epfd, op, fd, &ev); if (n GetFd(), channel)); Update(channel, EPOLL_CTL_ADD); return; } Update(channel, EPOLL_CTL_MOD); } //假设某个文件描述符的连接断开了,就需要取消该文件描述符的事件监控,就需要通过RemoveEvent()来完成。 void Poller::removeEvent(Channel *channel) { if (channel == nullptr) { ERROR_LOG("Null channel passed to removeEvent"); return; } auto it = _channels.find(channel->GetFd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } void Poller::poll(std::vector *actives) { int ret = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (ret second->SetRevents(_evs[i].events); actives->push_back(it->second); } }
3.1.6Timerwheel模块
该模块由TimerTask类和TimerWheel类组成
主要任务:定时器可以用来定时处理某些任务,在服务器的典型用处就是定时处理一些非活跃的连接,以释放服务器资源。
TimerTask.hpp
主要对定时任务的处理(如:取消定时)
#ifndef TIMERTASK_HPP #define TIMERTASK_HPP #include #include using TaskFunc = std::function; using ReleaseFunc = std::function; class TimerTask { public: TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb, int turns); ~TimerTask(); void Cancel(); // 取消定时任务 void SetRelease(const TaskFunc &cb); uint32_t DelayTime();// 返回定时时间 void ReduceTurns();// 减少圈数 int GetTurns()const;// 获得圈数 void SetTurns(int turns); private: uint64_t _id; // 定时任务id,方便定位、查询、管理 uint32_t _timeout; // 定时任务的超时时间,即多久之后执行任务 bool _canceled; // 是否取消定时任务 TaskFunc _task_cb; // 定时器任务 ReleaseFunc _release; // 删除TimerWheel当中保存的TimerTask信息,防止内存泄漏 int _turns; // 圈数 }; #endif // TIMERTASK_HPP
TimerTask.cpp
#include "TimerTask.hpp" TimerTask::TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb, int turns) : _id(id), _timeout(timeout), _canceled(false),_task_cb(cb), _turns(turns) { }// 默认不取消定时任务 TimerTask::~TimerTask() { if (_canceled == false) _task_cb(); // 对象析构时执行定时任务 _release(); // 释放TimerWheel中所管理的TimerTask资源 } void TimerTask::Cancel() { _canceled = true; } void TimerTask::SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t TimerTask::DelayTime() { return _timeout; } //每当秒针移动到下一个刻度时,检查该刻度上的所有任务。如果任务的圈数 _turns 大于0,则递减 _turns。当 _turns 变为0时,任务才有资格被执行。 void TimerTask::ReduceTurns() { --_turns; } int TimerTask::GetTurns() const{ return _turns; } //为了处理延时大于60秒的定时任务,每个任务有一个 _turns 成员变量。这个变量记录了任务需要经过的完整时间轮圈数。 void TimerTask::SetTurns(int turns) { _turns = turns; }
TimerWheel.hpp
shared_ptr指针:对于开辟在堆区的内存,我可以使用多个指针指向它,就相当于我先在堆区开辟一块内存使用一个指针指向这片内存区域,然后给这个指针取很多个别名;
weak_ptr指针:由shared_ptr不正当使用引发的错误来而引出,weak_ptr只能访问所指向的内存区域,当weak_ptr指针生命结束之时,其所指向的内存依旧完好无损
#ifndef TIMERWHEEL_HPP #define TIMERWHEEL_HPP #include #include #include #include #include #include #include "Channel.hpp" #include "EventLoop.hpp" #include "TimerTask.hpp" #include #include #include class EventLoop; class Channel; // 前向声明 using TaskFunc = std::function; class TimerWheel { private: //weak_ptr只能访问所指向的内存区域,当weak_ptr指针生命结束之时,其所指向的内存依旧完好无损, //由shared_ptr不正当使用引发的错误来引出weak_ptr using WeakTask = std::weak_ptr; // 指向TimerTask的弱指针 using PtrTask = std::shared_ptr; // 指向TimerTask的引用计数型指针 static int CreateTimerfd(); int ReadTimerfd(); void RunTimerTask(); void OnTime(); // 超时时间到,读事件触发,读事件触发后的回调函数 void RemoveTimer(uint64_t id); void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb); void TimerRefreshInLoop(uint64_t id); void TimerCancelInLoop(uint64_t id); public: TimerWheel(EventLoop *loop); ~TimerWheel(); void addTimer(uint64_t id, uint32_t delay, const TaskFunc &callback); //void addTimer(int timeout, std::function callback); // void removeTimer(uint64_t id); void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); bool HasTimer(uint64_t id); private: int _tick; // 秒针,心博,每秒钟变化一次 int _capacity; // 表盘的最大数量,模拟钟表 std::vector _wheel; // 时间轮,存放TimerTask的智能指针 std::unordered_map _timers; // 管理TimerTask对象 EventLoop *_loop; int _timerfd; std::unique_ptr _timer_channel; }; #endif // TIMERWHEEL_HPP
TimerWheel.cpp
int timerfd_create(int clockid, int flags);//创建timerfd描述符 //参数: //1、clockid可以填CLOCK_REALTIME,CLOCK_MONOTONIC //CLOCK_REALTIME:系统实时时间,随系统实时时间改变而改变,即从UTC1970-1-1 0:0:0开始计时, //中间时刻如果系统时间被用户改成其他,则对应的时间相应改变 //CLOCK_MONOTONIC:从系统启动这一刻起开始计时,不受系统时间被用户改变的影响 //2、flags可以填0,O_CLOEXEC,O_NONBLOCK
#include "TimerWheel.hpp" #include "TimerTask.hpp" #include "server.hpp" #include "Channel.hpp" #include #include #include #include #include TimerWheel::TimerWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))// 每一个文件描述符都会配备一个Channel对象 { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead();// 启动读事件监控 } TimerWheel::~TimerWheel() { close(_timerfd); } int TimerWheel::CreateTimerfd()// 创建定时器 { int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd DelayTime(); int turns = delay / _capacity; pt->SetTurns(turns);// 设置圈数 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt);// 重新添加新的定时任务对象 } void TimerWheel::TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; } PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); } void TimerWheel::addTimer(uint64_t id, uint32_t delay, const TaskFunc &callback) { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, callback)); } void TimerWheel::TimerRefresh(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id)); } void TimerWheel::TimerCancel(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id)); } bool TimerWheel::HasTimer(uint64_t id)//检查定时任务是否完成 { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; }
_tick;// 秒针,钟变化一次
_capacity;// 表盘的最大数量,模拟钟表
_wheel;// 时间轮,存储了指向TimerTask对象的shared_ptr
_timers;// 管理TimerTask对象,存储的Val值是一个weak_ptr,weak_ptr可以升级成为shared_ptr,这样一来,weak_ptr就具有了探测指向的对象是否存在的功能了。
本项目模拟的是一个钟表,每个定时任务放在每个钟表刻度上,秒针指向了哪个刻度,哪个定时任务就执行(或圈数减1)。
为了处理延时大于60秒的定时任务,每个任务有一个 圈数_turns 成员变量。这个变量记录了任务需要经过的完整时间轮圈数。每当秒针移动到下一个刻度时,检查该刻度上的所有任务。如果任务的圈数 _turns 大于0,则递减 _turns。当 _turns 变为0时,任务才有资格被执行。
3.1.7EvenLoop模块
EventLoop的功能是进行事件循环、事件监听、事件处理和定时任务。
使用者在不同的线程操作了同一个EventLoop对象,这就很容易导致线程安全问题。
使用了一种解决方案,那就是在EventLoop当中放一个任务队列。思路是这样的:在执行任何一个有可能导致线程安全问题的函数时都判断一下执行该函数的线程是否是EventLoop对象构造时的线程,如果是,那么直接执行;如果不是,就将函数封装成一个一个任务对象压入任务队列,待EventLoop处理完所有的触发事件后再统一处理任务队列的所有任务。
EvenLoop.hpp
#ifndef EVENTLOOP_HPP #define EVENTLOOP_HPP #include #include #include #include #include #include #include "Channel.hpp" #include "Poller.hpp" #include "TimerWheel.hpp" class Poller; class Channel; // 前向声明 class TimerWheel; // 前向声明 class EventLoop { public: using Functor = std::function; EventLoop(); ~EventLoop(); void Start(); bool IsInLoop();// 判断当前EventLoop对象是否处于构造线程中 void AssertInLoop(); void RunInLoop(const Functor &cb);// 所有任务的执行都必须经过这个接口 void QueueInLoop(const Functor &cb); void UpdateEvent(Channel *channel); void RemoveEvent(Channel *channel); void TimerAdd(uint64_t id, uint32_t delay, const Functor &cb); void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); bool HasTimer(uint64_t id); private: void RunAllTask();// 执行任务队列当中的所有任务 static int CreateEventFd(); void ReadEventfd();// 从_event_fd当中读取数据 void WeakUpEventFd();// 向_evenfd_fd写入数据,即触发_event_fd的可读事件 std::thread::id _thread_id; 线程id int _event_fd;// eventfd的返回值,必须要有这个,如果任务队列当中有任务,但是没有IO事件触发,任务队列的任务就一直不会执行 std::unique_ptr _event_channel; bool Running; std::unique_ptr _poller; std::unique_ptr _timer_wheel; // 使用智能指针 std::vector _tasks;// 任务队列 std::mutex _mutex;// 保证任务队列的互斥访问 }; #endif // EVENTLOOP_HPP
EvenLoop.cpp
#include "EventLoop.hpp" #include "server.hpp" // 假设有一个日志模块,用于记录错误信息 #include "Channel.hpp" #include #include #include #include EventLoop::EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)), _poller(new Poller()), _timer_wheel(new TimerWheel(this)) { // 使用 std::unique_ptr 的构造函数 // _event_fd也需要被监听 _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this)); _event_channel->EnableRead(); } EventLoop::~EventLoop() {} void EventLoop::Start() { while (true) { std::vector actives; _poller->poll(&actives); // 调用 poll 方法 for (auto &channel : actives) { channel->HandleEvent(); // 挨个处理事件触发之后的任务 } RunAllTask(); // 最后执行任务队列的所有任务 } } bool EventLoop::IsInLoop()// 判断当前EventLoop对象是否处于构造线程中 { return (_thread_id == std::this_thread::get_id()); } void EventLoop::AssertInLoop() { if (_thread_id != std::this_thread::get_id()) abort(); } void EventLoop::RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); // 处于构造线程的任务直接执行 } QueueInLoop(cb); // 否则压入任务队列 } void EventLoop::QueueInLoop(const Functor &cb) { { std::unique_lock _lock(_mutex); _tasks.push_back(cb); } WeakUpEventFd(); // 任务队列有任务,向_event_fd写入数据,触发读事件,读事件触发后才会执行RunAllTask()继而执行任务队列的任务 } void EventLoop::UpdateEvent(Channel *channel) { _poller->updateEvent(channel); // 调用 updateEvent 方法 } void EventLoop::RemoveEvent(Channel *channel) { _poller->removeEvent(channel); // 调用 removeEvent 方法 } void EventLoop::TimerAdd(uint64_t id, uint32_t delay, const Functor &cb) { _timer_wheel->addTimer(id, delay, cb); // 确保调用方法签名匹配 } void EventLoop::TimerRefresh(uint64_t id) { _timer_wheel->TimerRefresh(id); } void EventLoop::TimerCancel(uint64_t id) { _timer_wheel->TimerCancel(id); } bool EventLoop::HasTimer(uint64_t id) { return _timer_wheel->HasTimer(id); } void EventLoop::RunAllTask() { std::vector functor; { std::unique_lock _lock(_mutex); _tasks.swap(functor); // 交换之后,_tasks就为空了,其他线程就没有任务执行了 } for (auto &f : functor) { f(); // 执行任务 } } int EventLoop::CreateEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd
整合测试1
test1.cpp
#include #include #include #include "server.hpp" // Assuming these are the header files for Socket, Channel, Buffer, and EventLoop #include "Channel.hpp" #include "EventLoop.hpp" #include "TimerTask.hpp" #include "TimerWheel.hpp" #include "Socket.hpp" #include "Buffer.hpp" EventLoop loop; void WriteHandle(Socket *sock, Channel *ch, Buffer *buf) { // Send the data from the buffer to the socket sock->Send(buf->ReadPosition(), buf->ReadAbleSize()); // Disable write events and enable read events for the channel ch->DisableWrite(); ch->EnableRead(); } void ReadHandle(Socket *sock, Channel *ch) { char buffer[1024] = {0}; // Buffer to store received data ssize_t n = sock->Recv(buffer, sizeof(buffer) - 1); // Receive data from the socket if (n > 0) { buffer[n] = 0; // Null-terminate the received data DEBUG_LOG("Received message from connection %d: %s", sock->GetFd(), buffer); // Create a new buffer and write the received data into it Buffer *buf = new Buffer; buf->WriteAndPush(buffer, strlen(buffer)); // Disable read events and set the write callback for the channel ch->DisableRead(); ch->SetWriteCallback(std::bind(WriteHandle, sock, ch, buf)); ch->EnableWrite(); } else { // Handle the case where no data was received or an error occurred ERROR_LOG("Failed to receive data from connection %d", sock->GetFd()); } } void AcceptHandle(Socket *sock) { int connfd = sock->Accept(); // Accept a new connection if (connfd >= 0) { DEBUG_LOG("Accepted new connection: %d", connfd); // Create new socket and channel for the accepted connection Socket *connsock = new Socket(connfd); Channel *connch = new Channel(&loop, connsock->GetFd()); // Set the read callback for the new channel and enable read events connch->SetReadCallback(std::bind(ReadHandle, connsock, connch)); connch->EnableRead(); } else { // Handle the case where accepting the connection failed ERROR_LOG("Failed to accept new connection"); } } int main() { // Create and start the server socket Socket lissock; bool ret = lissock.CreateServer(9090); if (!ret) { ERROR_LOG("Failed to create server"); return -1; } // Create a channel for the listening socket and set the accept callback Channel lisch(&loop, lissock.GetFd()); lisch.SetReadCallback(std::bind(AcceptHandle, &lissock)); lisch.EnableRead(); // Start the event loop loop.Start(); return 0; }
Makefile
# Compiler CXX = g++ # Compiler flags CXXFLAGS = -std=c++11 -Wall -Iinclude # Linker flags LDFLAGS = -pthread # Directories INCLUDE_DIR = include SRC_DIR = src OBJ_DIR = obj # Source files SRCS = $(wildcard $(SRC_DIR)/*.cpp) # Object files OBJS = $(patsubst $(SRC_DIR)/%.cpp,$(OBJ_DIR)/%.o,$(SRCS)) # Executable name TARGET = test1 # Default rule all: $(TARGET) # Link the object files to create the executable $(TARGET): $(OBJS) $(CXX) $(CXXFLAGS) -o $@ $^ $(LDFLAGS) # Compile source files to object files $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp | $(OBJ_DIR) $(CXX) $(CXXFLAGS) -c -o $@ $
在项目的路径下运行以下命令
make ./test1
运行结果
未完待续........
(由于篇幅过长,今天就先分享到这,续集见下一篇)