本文主要是介绍libuv使用的若干个坑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
最开始考虑使用muduo,但是仔细看了一下,觉得很多地方其实没有书里写的那么好,网上讨论见到有一个国人做了uvcpp,号称在游戏服务器上运行稳定,我看一下代码还有很多值得借鉴的地方,但是还是有很多不是我喜欢的实现方式,于是自己重新封装了一遍,过程比较艰辛,因为网上资料并不太多,里面很多坑……
我封装的uv-rpc单次往返时延在0.2ms左右,单链接每秒可以调用256字节的命令2万多次。
https://github.com/robinfoxnan/uv-rpc
分享一下封装过程中遇到的问题和一些心得,取之于网,还之于网,希望对大家有用。
1)线程安全与调用方式
不正确的使用可能造成程序崩溃,或者函数不起作用!!!我在github官方询问,人家根本没有搭理我,老外直接关闭了issue,shit!
遇到这样的问题是因为: 除了uv_async_send和uv_queue_work是客户代码可以安全调用的,其他的函数应该在loop回调中调用;loop中几乎几处使用锁,所以并非线程安全的,可以认为不同的线程间无法直接调用对方管理的资源。
官方示例代码都是先调用函数,比如connect ,再启动loop循环,这样是没有问题的;最后执行uv_run()
int main() {loop = uv_default_loop();uv_tcp_t server;uv_tcp_init(loop, &server);uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);if (r) {fprintf(stderr, "Listen error %s\n", uv_strerror(r));return 1;}return uv_run(loop, UV_RUN_DEFAULT);
}
但是当先启动了loop循环,之后在其他线程里发送数据uv_write,或者调用定时器,或者调用close之类的操作就不一定会成功,还可以能会崩溃。这个问题搞的我也很崩溃,后来搜索到了必须要在loop回调中调用非安全的函数。
解决方法是在uv_async_send中回调函数中执行要需要执行的代码段,具体可以使用lamda表达式。具体封装方式见:https://github.com/robinfoxnan/uv-rpc/blob/main/src/EventLoop.cpp
此类操作主要包括:uv_read_start,uv_write,uv_close,等等,比如我们要在某个loop循环中启动一个定时器应该如下使用:
void test()
{EventLoop * loop = new EventLoop();loop->runInLoopEn([=](){UvTimer* uvTimer = new UvTimer(loop, 5, 5, [](UvTimer* t){printf("test\n");t->stop();t->close([](UvTimer* t){printf("delete\n");delete t;});});uvTimer->start();});loop->run();
}
而绝对不可以在随便一个线程中就uv_start_timer,可能根本就不会有回调发生!!!!
2)loop线程模型
默认的libuv提供了一个uv_default_loop()可以返回一个全局的loop结构体,官方很多示例都是使用此类操作;
启动loop循环时默认的运行方式uv_run(loop, UV_RUN_DEFAULT)会阻塞线程,所以需要在一个线程中使用;loop本身是一个循环,所以异步操作的时间都需要在loop上等待回调,并且在该loop线程中完成回调。如果回调的动作很大,耗时很长,则会阻塞其他的操作;
所以我的方式是server使用一个loop用来listen,再通过配置变量启动多个IO线程,每个线程上分别执行一个loop循环,新接入的socket分配到不同的线程循环上处理各自的读写事件,这样就可以达到多核并发的效果;否则所有的SOCKET上都在一个线程上,肯定是快不了的。
即便是这样,我在IO线程中编解码,但是耗时长的CPU密集型操作还是不能在这里做,则需要将数据解析为TASK使用任务线程池处理。见下一节。
其实还有一个问题,那就是loop循环中,如果没有激活的句柄需要操作,循环就会退出,那么那么怎么保证loop中有句柄可以处理,而不会直接退出呢?其实EventLoop中封装一个默认的异步事件,那么loop就不会退出。EventLoop类做了具体实现。
关于如何启动多个IO线程,具体代码参考:
https://github.com/robinfoxnan/uv-rpc/blob/main/src/ConnectionManager.cpp
启动多个IO线程:
for (int i = 0; i < n; i++){EventLoopPtr ptr = std::make_shared<EventLoop>();loopVector.push_back(ptr);ssize_t index = loopVector.size();threadVector.push_back(std::thread([ptr, index](){printf("io thread %jd start\n", index);ptr->run();printf("io thread %jd exit\n", index);}));}
3)线程池
在uv中集成了工作线程池模型,直接调用uv_queue_work就可以安排函数执行;uv会检查是否初始化过线程池,如果没有初始化过,会根据参数建立若干个工作线程,(默认是4个),不过可以通过设置环境变量来更改工作线程个数;
需要说的是,我们的函数如果耗时很长,则在工作线程上执行,执行后,会返回调用的loop线程,执行after_work函数,也就是接收到数据的那个loop线程,在这里发送返回的数据相对比较合适。
https://github.com/robinfoxnan/uv-rpc/blob/main/src/WorkerPool.cpp
工作线程的执行流程网上很多分析,此处不再赘述。
4) 内存池
关于使用智能指针还是内存池,我觉得各有优点,首先我觉得作为网络应用来说,使用的内存块相对比较固定,使用内存池可以避免频繁的分配和释放内存。很多牛逼的C程序都使用了内存池技术。比如glusterFS。
而且libuv提供了分配内存的回调函数,可以完美的支持自己的内存管理,所以我也做了一个。
对于读操作,我使用固定大小的内存块,因为在编码时候约定不超过某个值;对于写操作,很多人使用string或者std::vector<char>,我最终没有使用vector来管理,因为测试发现vector还是很慢的,所以我自己做一个CharVector用来序列化json和protobuf数据,
https://blog.csdn.net/robinfoxnan/article/details/118760091?spm=1001.2014.3001.5501
同时,也可以使用libuv的快速队列来管理数据:https://github.com/robinfoxnan/uv-rpc/blob/main/include/BufferQue.h,但是暂时没有使用。
下面是官方的tcp-echo-server文件演示的简单服务器。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>#define DEFAULT_PORT 7000
#define DEFAULT_BACKLOG 128uv_loop_t *loop;
struct sockaddr_in addr;typedef struct {uv_write_t req;uv_buf_t buf;
} write_req_t;void free_write_req(uv_write_t *req) {write_req_t *wr = (write_req_t*) req;free(wr->buf.base);free(wr);
}// 我理解,给这个接口是为了大家使用自己的内存池
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {buf->base = (char*) malloc(suggested_size);buf->len = suggested_size;
}void on_close(uv_handle_t* handle) {free(handle);
}void echo_write(uv_write_t *req, int status) {if (status) {fprintf(stderr, "Write error %s\n", uv_strerror(status));}free_write_req(req);
}void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {if (nread > 0) {write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));req->buf = uv_buf_init(buf->base, nread);uv_write((uv_write_t*) req, client, &req->buf, 1, echo_write);return;}if (nread < 0) {if (nread != UV_EOF)fprintf(stderr, "Read error %s\n", uv_err_name(nread));uv_close((uv_handle_t*) client, on_close);}// 这里有时buf->base是NULLfree(buf->base);
}void on_new_connection(uv_stream_t *server, int status) {if (status < 0) {fprintf(stderr, "New connection error %s\n", uv_strerror(status));// error!return;}uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));uv_tcp_init(loop, client);if (uv_accept(server, (uv_stream_t*) client) == 0) {uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);}else {uv_close((uv_handle_t*) client, on_close);}
}int main() {loop = uv_default_loop();uv_tcp_t server;uv_tcp_init(loop, &server);uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);if (r) {fprintf(stderr, "Listen error %s\n", uv_strerror(r));return 1;}return uv_run(loop, UV_RUN_DEFAULT);
}
这里其实有几个问题:
- 频繁的申请和释放内存,效率低不说,而且会造成内存碎片;
- 注意:当客户端直接关闭退出程序,服务端会收到一个echo_read,这时有可能buf是nullptr,也可能nRead==0; 此时执行free buf其实如果使用内存池,则需要检测是否需要释放到池,否则下次从池里拿到一个空值!(所以glusterfs内存块都加了标记,检测是否是自己的)
5)读写操作
- libuv的行为和socket的常规不太一样,比如read操作读到返回值0应该是对方关闭了;但是libuv其实经常会在回调函数中遇到0字节的读回调,官方示例代码也是不处理此情况;
- 读返回值为负数,则是错误,应该执行close操作;
- 写或者写回调的返回值为负数,则是错误,应该执行close操作;
- close回调结束时,就是安全的释放TcpConnection实例以及涉及的资源了,需要注意要清理相关的lamda表达式回调函数!!!
6) lamda表达式与智能指针
在封装过程中,为了简单,我使用了很多的lamda表达式作为回调函数,但是涉及到shared_ptr需要千万小心!!!
lamda表示使用[=]捕获,或者使用[&],或者捕获了智能指针shared_ptr,则会在闭包中保存一个引用,也就是增加了智能指针的引用计数,
一不小心,就会造成内存泄露,解决发方法是:
- 将保存lamda表达式发回调函数变量设置为nullptr
- 具体参考
- https://blog.csdn.net/robinfoxnan/article/details/119148102?spm=1001.2014.3001.5501
备注:
vs2017编译libuv
libuv:
mkdir build
cd build
cmake .. -G "Visual Studio 15 2017" -A x64
这篇关于libuv使用的若干个坑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!