使用libuv,可以非常方便的创建tcp服务端,基本上除了初始化,其他所有的处理都是在回调函数中处理的。可以非常轻松的实现异步读写。其中需要注意的是,uv_read_start的第二个参数,uv_alloc_cb回调函数,在每次接收到数据触发uv_read_cb回调之前都会被调用一次,用来给接收缓存做初始化,如果是每次通过malloc申请的内存,那么就要自己手动free掉,试例中就是使用的这种方式,只不过做了处理,接收的时候没有free掉,而是通过uv_buf_init函数把它又赋值给了uv_write所需的发送缓存,最后在发送完成回调uv_write_cb中一并free了。通过这种方式,实现了简单的tcp echo服务。
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.** Permission is hereby granted, free of charge, to any person obtaining a copy* of this software and associated documentation files (the "Software"), to* deal in the Software without restriction, including without limitation the* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or* sell copies of the Software, and to permit persons to whom the Software is* furnished to do so, subject to the following conditions:** The above copyright notice and this permission notice shall be included in* all copies or substantial portions of the Software.** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS* IN THE SOFTWARE.*/#ifndef TASK_H_
#define TASK_H_#include "uv.h"#include <stdio.h>
#include <stddef.h>
#include <stdlib.h>#if defined(_MSC_VER) && _MSC_VER < 1600
#include "stdint-msvc2008.h"
#include <stdint.h>
#endif#if !defined(_WIN32)
#include <sys/time.h>
#include <sys/resource.h> /* setrlimit() */
#endif#ifdef __clang__
#pragma clang diagnostic ignored "-Wvariadic-macros"
#pragma clang diagnostic ignored "-Wc99-extensions"
#endif#define TEST_PORT 9123
#define TEST_PORT_2 9124#ifdef _WIN32
#define TEST_PIPENAME "\\\\?\\pipe\\uv-test"
#define TEST_PIPENAME_2 "\\\\?\\pipe\\uv-test2"
#define TEST_PIPENAME "/tmp/uv-test-sock"
#define TEST_PIPENAME_2 "/tmp/uv-test-sock2"
#endif#ifdef _WIN32
#include <io.h>
#ifndef S_IRUSR
#define S_IRUSR _S_IREAD
#ifndef S_IWUSR
#endif#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))#define container_of(ptr, type, member) \((type *) ((char *) (ptr) - offsetof(type, member)))typedef enum {TCP = 0,UDP,PIPE
} stream_type;/* Die with fatal error. */
#define FATAL(msg) \do { \fprintf(stderr, \"Fatal error in %s on line %d: %s\n", \__FILE__, \__LINE__, \msg); \fflush(stderr); \abort(); \} while (0)/* Have our own assert, so we are sure it does not get optimized away in* a release build.*/
#define ASSERT(expr) \do { \if (!(expr)) { \fprintf(stderr, \"Assertion failed in %s on line %d: %s\n", \__FILE__, \__LINE__, \#expr); \abort(); \} \} while (0)/* This macro cleans up the main loop. This is used to avoid valgrind* warnings about memory being "leaked" by the main event loop.*/
#define MAKE_VALGRIND_HAPPY() \do { \close_loop(uv_default_loop()); \uv_loop_delete(uv_default_loop()); \} while (0)/* Just sugar for wrapping the main() for a task or helper. */
#define TEST_IMPL(name) \int run_test_##name(void); \int run_test_##name(void)#define BENCHMARK_IMPL(name) \int run_benchmark_##name(void); \int run_benchmark_##name(void)#define HELPER_IMPL(name) \int run_helper_##name(void); \int run_helper_##name(void)/* Pause the calling thread for a number of milliseconds. */
void uv_sleep(int msec);/* Format big numbers nicely. WARNING: leaks memory. */
const char* fmt(double d);/* Reserved test exit codes. */
enum test_status {TEST_OK = 0,TEST_TODO,TEST_SKIP
};#define RETURN_OK() \do { \return TEST_OK; \} while (0)#define RETURN_TODO(explanation) \do { \fprintf(stderr, "%s\n", explanation); \fflush(stderr); \return TEST_TODO; \} while (0)#define RETURN_SKIP(explanation) \do { \fprintf(stderr, "%s\n", explanation); \fflush(stderr); \return TEST_SKIP; \} while (0)#if !defined(_WIN32)#define TEST_FILE_LIMIT(num) \do { \struct rlimit lim; \lim.rlim_cur = (num); \lim.rlim_max = lim.rlim_cur; \if (setrlimit(RLIMIT_NOFILE, &lim)) \RETURN_SKIP("File descriptor limit too low."); \} while (0)#else /* defined(_WIN32) */#define TEST_FILE_LIMIT(num) do {} while (0)#endif#if defined _WIN32 && ! defined __GNUC__#include <stdarg.h>/* Define inline for MSVC<2015 */
#if defined(_MSC_VER) && _MSC_VER < 1900
#define inline __inline
#endif#if defined(_MSC_VER) && _MSC_VER < 1900/* Emulate snprintf() on MSVC<2015, _snprintf() doesn't zero-terminate the buffer* on overflow...*/
inline int snprintf(char* buf, size_t len, const char* fmt, ...) {va_list ap;int n;va_start(ap, fmt);n = _vsprintf_p(buf, len, fmt, ap);va_end(ap);/* It's a sad fact of life that no one ever checks the return value of* snprintf(). Zero-terminating the buffer hopefully reduces the risk* of gaping security holes.*/if (n < 0)if (len > 0)buf[0] = '\0';return n;
#endif#endif#if defined(__clang__) || \defined(__GNUC__) || \defined(__INTEL_COMPILER) || \defined(__SUNPRO_C)
#define UNUSED __attribute__((unused))
#define UNUSED
#endif/* Fully close a loop */
static void close_walk_cb(uv_handle_t* handle, void* arg) {if (!uv_is_closing(handle))uv_close(handle, NULL);
}UNUSED static void close_loop(uv_loop_t* loop) {uv_walk(loop, close_walk_cb, NULL);uv_run(loop, UV_RUN_DEFAULT);
}UNUSED static int can_ipv6(void) {uv_interface_address_t* addr;int supported;int count;int i;if (uv_interface_addresses(&addr, &count))return 1; /* Assume IPv6 support on failure. */supported = 0;for (i = 0; supported == 0 && i < count; i += 1)supported = (AF_INET6 == addr[i].address.address6.sin6_family);uv_free_interface_addresses(addr, count);return supported;
}#endif /* TASK_H_ */
#include <stdio.h>
#include <uv.h>
#include <stdlib.h>
#include "task.h"typedef struct
{int nm;
} conn;typedef struct
{uv_write_t req;uv_buf_t buf;
} write_req_t;
static uv_loop_t* loop;static int server_closed;
static stream_type serverType;
static uv_tcp_t tcpServer;
static uv_udp_t udpServer;
static uv_pipe_t pipeServer;
static uv_handle_t* server;static void after_write(uv_write_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, const uv_buf_t* buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
static void on_connection(uv_stream_t*, int status);static void on_server_close(uv_handle_t* handle)
{ASSERT(handle == server);
}static void after_write(uv_write_t* req, int status)
{write_req_t* wr;/* Free the read/write buffer and the request */wr = (write_req_t*) req;printf("wr->buf.base=%p\n",wr->buf.base);free(wr->buf.base);free(wr);if (status == 0)return;fprintf(stderr,"uv_write error: %s - %s\n",uv_err_name(status),uv_strerror(status));
}static void after_shutdown(uv_shutdown_t* req, int status)
{uv_close((uv_handle_t*) req->handle, on_close);free(req);
}static void after_read(uv_stream_t* handle,ssize_t nread,const uv_buf_t* buf)
{int i;write_req_t *wr;uv_shutdown_t* sreq;if (nread < 0){/* Error or EOF */ASSERT(nread == UV_EOF);free(buf->base);sreq = malloc(sizeof* sreq);ASSERT(0 == uv_shutdown(sreq, handle, after_shutdown));return;}if (nread == 0){/* Everything OK, but nothing read. */free(buf->base);return;}/** Scan for the letter Q which signals that we should quit the server.* If we get QS it means close the stream.*/for (i = 0; i < nread; i++){printf("0x%02x ", buf->base[i]);}printf("\n");if (!server_closed){for (i = 0; i < nread; i++){if (buf->base[i] == 'Q'){if (i + 1 < nread && buf->base[i + 1] == 'S'){free(buf->base);uv_close((uv_handle_t*) handle, on_close);return;}else{uv_close(server, on_server_close);server_closed = 1;}}}}wr = (write_req_t*) malloc(sizeof *wr);ASSERT(wr != NULL);wr->buf = uv_buf_init(buf->base, nread);if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)){FATAL("uv_write failed");}
}static void on_close(uv_handle_t* peer)
}static void echo_alloc(uv_handle_t* handle,size_t suggested_size,uv_buf_t* buf)
{buf->base = malloc(suggested_size);buf->len = suggested_size;printf("suggested_size =%d %p\n", suggested_size,buf->base);
}static void on_connection(uv_stream_t* server, int status)
{uv_stream_t* stream;int r;if (status != 0){fprintf(stderr, "Connect error %s\n", uv_err_name(status));}ASSERT(status == 0);switch (serverType){case TCP:stream = malloc(sizeof (uv_tcp_t));ASSERT(stream != NULL);r = uv_tcp_init(loop, (uv_tcp_t*) stream);ASSERT(r == 0);break;case PIPE:stream = malloc(sizeof (uv_pipe_t));ASSERT(stream != NULL);r = uv_pipe_init(loop, (uv_pipe_t*) stream, 0);ASSERT(r == 0);break;default:ASSERT(0 && "Bad serverType");abort();}/* associate server with stream */stream->data = server;r = uv_accept(server, stream);ASSERT(r == 0);r = uv_read_start(stream, echo_alloc, after_read);ASSERT(r == 0);
}static int tcp4_echo_start(int port)
{struct sockaddr_in addr;int r;ASSERT(0 == uv_ip4_addr("", port, &addr));server = (uv_handle_t*) & tcpServer;serverType = TCP;r = uv_tcp_init(loop, &tcpServer);if (r){/* TODO: Error codes */fprintf(stderr, "Socket creation error\n");return 1;}r = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr, 0);if (r){/* TODO: Error codes */fprintf(stderr, "Bind error\n");return 1;}r = uv_listen((uv_stream_t*) & tcpServer, SOMAXCONN, on_connection);if (r){/* TODO: Error codes */fprintf(stderr, "Listen error %s\n", uv_err_name(r));return 1;}return 0;
}int main()
{loop = uv_default_loop();tcp4_echo_start(30001);uv_run(loop, UV_RUN_DEFAULT);return 0;