视频监听安防平台-libuv库通信协议封装-支持udp和tcpserver同时使用

本文主要介绍视频监听安防平台中基于libuv库的通信协议封装(支持udp和tcpserver同时使用),希望能为大家解决编程问题提供一定的参考价值,需要的开发者们跟着小编一起来学习吧!

             视频监听安防平台-libuv库通信协议封装-支持udp和tcpserver同时使用

最近在上网找资料的时候,看到libuv这个通信库挺强大的,而且一直都有人在维护,所以就使用这个通信库做服务端的接收和发送。在库封装好之后,发现接收和发送消息的时候总是卡住,后来才发现是使用libuv库的发送接口时出现的问题,于是果断改为直接用底层socket发送。下面附上libuv通信库封装的源码,希望对大家有帮助。

1、首先下载libuv库,我是自己裁剪部分代码出来封装的库。先编译一份libuv.a的静态库

2、对应用层进行封装,下面粘贴一下源码:

#ifndef __UV_SOCKET_TRANS_H__
#define __UV_SOCKET_TRANS_H__

/* System headers (the original list contained several duplicates). */
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <poll.h>
#include <fcntl.h>
#include <string.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/mman.h>
#include <map>

/* Project headers. */
#include "MyList.h"
#include "MyThread.h"
#include "TypeDef.h"
#include "tool.h"
#include "SysIf.h"
#include "uv.h"

/* Backlog passed to uv_listen() for the TCP server. */
#define UV_TCPSVR_LISTEN_MAXCONN 1024
/* Buffer size in bytes (10 MiB). Parenthesized so the macro expands safely
 * inside larger expressions such as `x / UV_SOCKET_BUFFER_SIZE`. */
#define UV_SOCKET_BUFFER_SIZE (1024 * 1024 * 10)

/* Historical function-pointer form of the receive callback, kept for reference;
 * the class below exposes it as the pure virtual RecvPacketCallBack() instead.
 * typedef void (*UvRecvPacketCallBack)(int sock, const char *pData, int len,
 *         const char *pSrcIP, int SrcPort, const char *pDestIP, int DestPort, bool bTcpRecv);
 */

/* NOTE(review): a using-directive in a header leaks into every includer.
 * It is kept only because existing includers may rely on it. */
using namespace std;

/* Write request paired with the buffer it sends, so the completion callback
 * can release both (see CUvSocketTransMgr::after_write). `req` must stay the
 * first member: after_write casts the uv_write_t* back to write_req_t*. */
typedef struct {
    uv_write_t req;
    uv_buf_t   buf;
} write_req_t;

/* Bookkeeping for one socket managed by CUvSocketTransMgr, keyed by its fd. */
typedef struct _TUvSocketItem
{
    int           socketfd;                  /* OS file descriptor, -1 when unset */
    SOCKETTYPE_E  socktype;                  /* SOCKETTYPE_UDP or SOCKETTYPE_TCPSERVER */
    char          srcip[IPSTR_MAX_LEN + 1];  /* local address the socket is bound to */
    int           srcport;                   /* local port */
    char          destip[IPSTR_MAX_LEN + 1]; /* peer address (accepted TCP clients); empty for the UDP socket */
    int           destport;                  /* peer port */
    void         *sockethandle;              /* libuv handle (uv_tcp_t* / uv_udp_t*) backing this socket */
    char         *pTcpRecvData;              /* TCP reassembly buffer, allocated with malloc/realloc */
    int           nTcpRecvLen;               /* number of bytes currently held in pTcpRecvData */

    _TUvSocketItem()
    {
        socketfd = -1;
        socktype = SOCKETTYPE_UDP;
        srcip[0] = '\0';
        srcport = 0;
        destip[0] = '\0';
        destport = 0;
        sockethandle = NULL;
        pTcpRecvData = NULL;
        nTcpRecvLen = 0;
    }
} TUvSocketItem;

/*
 * libuv-based transport that can serve one UDP socket and one TCP listening
 * socket at the same time. Received data is handed to the subclass through
 * RecvPacketCallBack(); outgoing data is written with SendPacket().
 */
class CUvSocketTransMgr
{
public:
    CUvSocketTransMgr();
    /* Virtual because the class is a polymorphic base (it declares a pure
     * virtual method); deleting a subclass through a base pointer would be
     * undefined behaviour otherwise. */
    virtual ~CUvSocketTransMgr();

public:
    /* Binds a UDP socket or a TCP listening socket on _srcip:_srcport and starts
     * the event-loop thread on the first successful call.
     * _destip/_destport are accepted for interface compatibility but not used
     * by the implementation. Returns true on success. */
    bool BindSocket(const char _srcip[], int _srcport, SOCKETTYPE_E type, char* _destip = NULL, int _destport = 0);

    /* Closes the UDP socket, or the TCP listener together with all accepted
     * clients, depending on `type`. The address arguments are not used by the
     * implementation. Always returns 0. */
    int MoveSocket(const char _srcip[], int _srcport, SOCKETTYPE_E type, const char* _destip, int _destport);

    /* Invoked for every received UDP datagram (bTcpRecv == false) and every
     * reassembled TCP message (bTcpRecv == true). Called on the event-loop
     * thread with mutex_data_ held. */
    virtual int RecvPacketCallBack(int sock, const char *pData, int len, const char *pSrcIP, int SrcPort, const char* pDestIP, int DestPort, bool bTcpRecv) = 0;

    /* Sends Datalen bytes. sock <= 0 selects the bound UDP socket; otherwise the
     * socket registered under `sock` is used. Returns the number of bytes sent
     * or -1 on failure. */
    int SendPacket(int sock, const char *pData, int Datalen, const char *pSrcIP, int SrcPort, const char* pDestIP, int DestPort, bool bTcpSend);

private:
    bool tcp4_echo_start(const char *ip, int port);
    bool udp4_echo_start(const char *ip, int port);
    bool run(int status = UV_RUN_DEFAULT);

    /* libuv callbacks. */
    static void echo_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
    static void on_closeserver(uv_handle_t* handle);
    static void on_close(uv_handle_t* handle);
    static void after_write(uv_write_t* req, int status);
    static void after_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* rcvbuf);
    static void on_connection(uv_stream_t* tcpserver, int status);
    static void on_send(uv_udp_send_t* req, int status);
    static void on_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* rcvbuf, const struct sockaddr* addr, unsigned flags);

private:
    /* Reassembles the TCP byte stream into complete SIP messages
     * (header terminated by CRLFCRLF plus Content-Length body). */
    void sip_tcp_sticky_packet(TUvSocketItem * pSocketItem, const char *pData, int len, const char *pSrcIP, int SrcPort, const char* pDestIP, int DestPort);

    TUvSocketItem *OpenAUvSocketItem(int socketfd, SOCKETTYPE_E type, char *srcip, int srcport, char *destip, int destport, void *handle);
    TUvSocketItem *FindAUvSocketItem(int socketfd);
    void RemoveAUvSocketItem(int socketfd);
    void RemoveAllUvSocketItem(void);

    std::map<int, TUvSocketItem*> m_UvSocketItemMap;   /* socketfd -> TUvSocketItem */
    typedef std::map<int, TUvSocketItem*>::iterator UVSOCKETMAP_IT;
    typedef std::map<int, TUvSocketItem*>::value_type UVSOCKETMAP_VT;

public:
    uv_loop_t*   loop;                           /* event loop, obtained in BindSocket() */
    uv_tcp_t     tcpServer;                      /* TCP listening handle */
    uv_udp_t     udpServer;                      /* UDP handle */
    uv_handle_t* server;                         /* not referenced by the implementation */
    char         m_serverip[IPSTR_MAX_LEN + 1];  /* address of the most recent BindSocket() call */
    int          m_serverport;                   /* port of the most recent BindSocket() call */
    uv_mutex_t   mutex_data_;                    /* held while RecvPacketCallBack() runs */

private:
    uv_mutex_t   mutex_sockets_;                 /* guards m_UvSocketItemMap */
    uv_thread_t  start_threadhandle_;            /* event-loop thread handle */
    static void  StartThread(void* arg);         /* thread entry: runs the loop until it is stopped */
    bool         m_runing;                       /* true once the event-loop thread has been created */
};

#endif

源文件:

#include "UVSocketTransMgr.h"CUvSocketTransMgr::CUvSocketTransMgr()
{loop = NULL;int iret = uv_mutex_init(&mutex_data_);if (iret) {printf("%s: uv_mutex_init failed iret:%s\n", __FUNCTION__, uv_err_name(iret));}iret = uv_mutex_init(&mutex_sockets_);if (iret) {printf("%s: uv_mutex_init failed iret:%s\n", __FUNCTION__, uv_err_name(iret));}m_serverip[0] = '\0';m_serverport = 0;m_UvSocketItemMap.clear();m_runing = false;
}CUvSocketTransMgr::~CUvSocketTransMgr()
{RemoveAllUvSocketItem();uv_stop(loop);//pthread_exit(NULL);uv_thread_join(&start_threadhandle_);uv_loop_delete(loop);uv_mutex_destroy(&mutex_data_);uv_mutex_destroy(&mutex_sockets_);
}void CUvSocketTransMgr::echo_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) 
{buf->base = (char *)malloc(suggested_size);buf->len = suggested_size;
}void CUvSocketTransMgr::on_closeserver(uv_handle_t* handle) 
{printf("%s:%d >>>>>>>on_closeserver:%p\n", __FUNCTION__, __LINE__, handle);
}void CUvSocketTransMgr::on_close(uv_handle_t* handle) 
{	printf("%s:%d >>>>>>>on_close:%p\n", __FUNCTION__, __LINE__, handle);free(handle);
}void CUvSocketTransMgr::after_write(uv_write_t* req, int status) 
{write_req_t* wr;/* Free the read/write buffer and the request */wr = (write_req_t*) req;free(wr->buf.base);free(wr);if (status == 0)return;printf("%s: uv_write error: %s - %s\n", __FUNCTION__, uv_err_name(status), uv_strerror(status));
}void CUvSocketTransMgr::after_read(uv_stream_t* client, ssize_t nread, const uv_buf_t* rcvbuf) 
{uv_stream_t *tcpserver = (uv_stream_t *)client->data;if (tcpserver == NULL){printf("%s:%d tcpserver is null\n", __FUNCTION__, __LINE__);return;}CUvSocketTransMgr *socketmgr = (CUvSocketTransMgr *)tcpserver->data;if (socketmgr == NULL){printf("%s:%d socketmgr is null\n", __FUNCTION__, __LINE__);return;}if (nread > 0){TUvSocketItem *pSocketItem = socketmgr->FindAUvSocketItem(client->io_watcher.fd);if (pSocketItem == NULL){printf("%s:%d find a uvsocketitem socket:%d is null\n", __FUNCTION__, __LINE__, client->io_watcher.fd);return;}if (pSocketItem != NULL){// sticky sip tcp packetsocketmgr->sip_tcp_sticky_packet(pSocketItem, rcvbuf->base, nread, pSocketItem->srcip, pSocketItem->srcport, pSocketItem->destip, pSocketItem->destport);}}if (nread < 0) {        if (nread != UV_EOF) {printf("%s:Read error %s\n", __FUNCTION__, uv_err_name(nread));   }socketmgr->RemoveAUvSocketItem(client->io_watcher.fd);uv_close((uv_handle_t*) client, on_close);    }free(rcvbuf->base);return;
}void CUvSocketTransMgr::on_connection(uv_stream_t* tcpserver, int status) 
{if (status != 0) {printf("%s: Connect error %s\n", __FUNCTION__, uv_err_name(status));}CUvSocketTransMgr *socketmgr = (CUvSocketTransMgr *)tcpserver->data;if (socketmgr == NULL){printf("%s:%d socketmgr is null\n", __FUNCTION__, __LINE__);return;}uv_stream_t *client = (uv_stream_t *)malloc(sizeof(uv_tcp_t));if (client == NULL){	printf("%s:%d client malloc faild\n", __FUNCTION__, __LINE__);return;}int iret = uv_tcp_init(socketmgr->loop, (uv_tcp_t*)client);if (iret){printf("%s: uv_tcp_init failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return;}/* associate server with stream */client->data = tcpserver;iret = uv_accept(tcpserver, client);if (iret == 0){struct sockaddr_in serveraddr;	int serverlen = sizeof(struct sockaddr);    char svrip[IPSTR_MAX_LEN+1] = {0};uv_tcp_getsockname((uv_tcp_t*)client, (struct sockaddr*)&serveraddr, &serverlen);  inet_ntop(AF_INET, (struct in_addr *)&serveraddr.sin_addr.s_addr, svrip, IPSTR_MAX_LEN);int svrport = ntohs(serveraddr.sin_port);struct sockaddr_in clientaddr;	int clientlen = sizeof(struct sockaddr);	uv_tcp_getpeername((uv_tcp_t*)client, (struct sockaddr*)&clientaddr, &clientlen); char clientip[IPSTR_MAX_LEN+1] = {0};inet_ntop(AF_INET, (struct in_addr *)&clientaddr.sin_addr.s_addr, clientip, IPSTR_MAX_LEN);  //使用线程安全函数int clientport = ntohs(clientaddr.sin_port);	// add socketitemsocketmgr->OpenAUvSocketItem(client->io_watcher.fd, SOCKETTYPE_TCPSERVER, svrip, svrport, clientip, clientport, client);iret= uv_read_start(client, echo_alloc, after_read);if (iret){printf("%s: uv_read_start failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return;}}else{socketmgr->RemoveAUvSocketItem(client->io_watcher.fd);//close socketuv_close((uv_handle_t*)client, on_close);}
}bool CUvSocketTransMgr::tcp4_echo_start(const char *ip, int port) 
{struct sockaddr_in addr;int iret;iret = uv_ip4_addr(ip, port, &addr);if (iret){printf("%s: uv_ip4_addr failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}//server = (uv_handle_t*)&tcpServer;iret = uv_tcp_init(loop, &tcpServer);if (iret) {printf("%s: uv_tcp_init failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}iret = uv_tcp_bind(&tcpServer, (const struct sockaddr*) &addr, 0);if (iret) {printf("%s: uv_tcp_bind failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}tcpServer.data = this;iret = uv_listen((uv_stream_t*)&tcpServer, UV_TCPSVR_LISTEN_MAXCONN, on_connection);if (iret) {printf("%s: uv_listen failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}return true;
}void CUvSocketTransMgr::on_send(uv_udp_send_t* req, int status) 
{if (status) {printf("%s:Send error %s\n", __FUNCTION__, uv_strerror(status));return;}//if (status == 0)free(req);
}void CUvSocketTransMgr::on_recv(uv_udp_t* handle, ssize_t nread, const uv_buf_t* rcvbuf, const struct sockaddr* addr, unsigned flags)
{if (nread > 0){CUvSocketTransMgr *socketmgr = (CUvSocketTransMgr *)handle->data;if (socketmgr == NULL){return;}struct sockaddr_in svraddr;	int clientlen = sizeof(struct sockaddr);char svrip[IPSTR_MAX_LEN + 1] = {0};uv_udp_getsockname(handle, (struct sockaddr*)&svraddr, &clientlen);inet_ntop(AF_INET, (struct in_addr *)&svraddr.sin_addr.s_addr, svrip, IPSTR_MAX_LEN);  //使用线程安全函数		int svrport = ntohs(svraddr.sin_port);struct sockaddr_in *clientaddr = (struct sockaddr_in *)addr;	char dstip[IPSTR_MAX_LEN+1] = {0};inet_ntop(AF_INET, (struct in_addr *)&clientaddr->sin_addr.s_addr, dstip, IPSTR_MAX_LEN); int dstport = ntohs(clientaddr->sin_port);//static int recvtimes = 0;//printf("1.[%d]on_recvxxxxxxxxxxxxsocketfd:%d, handle:%p, nread:%d src<%s:%d>, dst<%s:%d>\n", ++recvtimes, handle->io_watcher.fd, handle, nread, svrip, svrport, dstip, dstport);		//回调uv_mutex_lock(&socketmgr->mutex_data_);socketmgr->RecvPacketCallBack(handle->io_watcher.fd, rcvbuf->base, nread, svrip, svrport, dstip, dstport, false);uv_mutex_unlock(&socketmgr->mutex_data_);//free rcvbuffree(rcvbuf->base);}
}bool CUvSocketTransMgr::udp4_echo_start(const char *ip, int port) 
{int iret;struct sockaddr_in addr;iret = uv_ip4_addr(ip, port, &addr);if (iret){printf("%s: uv_ip4_addr failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}//server = (uv_handle_t*)&udpServer;iret = uv_udp_init(loop, &udpServer);if (iret) {printf("%s: uv_udp_init failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}//uv_udp_nodelay//UV_UDP_REUSEADDRiret = uv_udp_bind(&udpServer, (const struct sockaddr *)&addr, UV_UDP_REUSEADDR);if (iret) {printf("%s: uv_udp_bind failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}udpServer.data = this;iret = uv_udp_recv_start(&udpServer, echo_alloc, on_recv);if (iret) {printf("%s: uv_udp_recv_start failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}OpenAUvSocketItem(udpServer.io_watcher.fd, SOCKETTYPE_UDP, m_serverip, m_serverport, NULL, 0, &udpServer);return true;
}//bool CUvSocketTransMgr::Start(SOCKETTYPE_E socktype, const char* ip, int port)
bool CUvSocketTransMgr::BindSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type, char* _destip,int _destport)
{if (strlen(_srcip) <= 0 || _srcport <= 0){return false;}if (loop == NULL){loop = uv_default_loop();}if (loop == NULL){printf("%s: uv_default_loop failed\n", __FUNCTION__);return false;}strncpy2(m_serverip, _srcip, IPSTR_MAX_LEN);m_serverport = _srcport;if (type == SOCKETTYPE_UDP){udp4_echo_start(_srcip, _srcport);}else if (type == SOCKETTYPE_TCPSERVER){tcp4_echo_start(_srcip, _srcport);}else{return false;}if (!m_runing){int iret = uv_thread_create(&start_threadhandle_, StartThread, this);//use thread to wait for start succeed.if (iret) {printf("%s: uv_thread_create failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}m_runing = true;}return true;
}int CUvSocketTransMgr::MoveSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type,const char* _destip,int _destport)
{//close udp socketif (type == SOCKETTYPE_UDP){uv_close((uv_handle_t*)&udpServer, on_closeserver);return 0;}else if (type == SOCKETTYPE_TCPSERVER){if (m_UvSocketItemMap.size() > 0){uv_mutex_lock(&mutex_sockets_);//close all tcp clientTUvSocketItem *pSocketItem = NULL;UVSOCKETMAP_IT it;for (it=m_UvSocketItemMap.begin(); it != m_UvSocketItemMap.end(); ++it){pSocketItem = (TUvSocketItem *)it->second;if (pSocketItem != NULL){if (pSocketItem->socktype == SOCKETTYPE_TCPSERVER){uv_close((uv_handle_t*)pSocketItem->sockethandle, on_close);  if (pSocketItem->pTcpRecvData != NULL){SAFE_DELETE(pSocketItem->pTcpRecvData);}}}}uv_mutex_unlock(&mutex_sockets_);}printf(">>>>>tcpServer.io_watcher.fd:%d\n", tcpServer.io_watcher.fd);uv_close((uv_handle_t*)&tcpServer, on_closeserver);  //close(tcpServer.io_watcher.fd);}return 0;	
}bool CUvSocketTransMgr::run(int status)
{printf("server runing.\n");int iret = uv_run(loop, (uv_run_mode)status);if (iret) {printf("%s: uv_run failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return false;}return true;
}void CUvSocketTransMgr::StartThread(void* arg)
{CUvSocketTransMgr* socketmgr = (CUvSocketTransMgr*)arg;socketmgr->run();
}void CUvSocketTransMgr::sip_tcp_sticky_packet(TUvSocketItem * pSocketItem, const char *pData, int len, const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort)
{if(len > 0){char *end_sip = NULL;char *cl_header = NULL;int cl_size = 0;if(pSocketItem->pTcpRecvData != NULL){/* concat old data with new data */pSocketItem->pTcpRecvData = (char *)realloc(pSocketItem->pTcpRecvData, pSocketItem->nTcpRecvLen+len+1);if(NULL == pSocketItem->pTcpRecvData){printf("realloc pTcpRecvData failed!\n");pSocketItem->nTcpRecvLen = 0;return;}strncpy(pSocketItem->pTcpRecvData+pSocketItem->nTcpRecvLen, pData, len);pSocketItem->pTcpRecvData[pSocketItem->nTcpRecvLen+len] = '\0';pSocketItem->nTcpRecvLen = pSocketItem->nTcpRecvLen + len;}if(NULL == pSocketItem->pTcpRecvData){pSocketItem->pTcpRecvData = (char *)malloc(len+1);if(pSocketItem->pTcpRecvData != NULL){memset(pSocketItem->pTcpRecvData, 0, len+1); }strncpy(pSocketItem->pTcpRecvData, pData, len);pSocketItem->pTcpRecvData[len] = '\0';	pSocketItem->nTcpRecvLen = len;}end_sip = strstr(pSocketItem->pTcpRecvData, "\r\n\r\n");while(end_sip != NULL){cl_header = mystrcasestr(pSocketItem->pTcpRecvData, "\ncontent-length ");if (cl_header == NULL || cl_header > end_sip)cl_header = mystrcasestr(pSocketItem->pTcpRecvData,"\ncontent-length:");if (cl_header == NULL || cl_header > end_sip)cl_header =mystrcasestr(pSocketItem->pTcpRecvData,"\r\nl ");if (cl_header == NULL || cl_header > end_sip)cl_header =mystrcasestr(pSocketItem->pTcpRecvData,"\r\nl:");if (cl_header != NULL && cl_header < end_sip)cl_header = strchr(cl_header, ':');if (cl_header == NULL || cl_header >= end_sip) {/* remove data up to crlfcrlf and restart */memmove(pSocketItem->pTcpRecvData, end_sip+4,pSocketItem->nTcpRecvLen -(end_sip + 4 -pSocketItem->pTcpRecvData) + 1);pSocketItem->nTcpRecvLen = pSocketItem->nTcpRecvLen - (end_sip +4 -pSocketItem->pTcpRecvData);pSocketItem->pTcpRecvData = (char *)realloc(pSocketItem->pTcpRecvData, pSocketItem->nTcpRecvLen+1);if(pSocketItem->pTcpRecvData == NULL){pSocketItem->nTcpRecvLen = 0;break;}end_sip =strstr(pSocketItem->pTcpRecvData,"\r\n\r\n");continue;	/* and restart from new CRLFCRLF */}/* header 
content-length was found before CRLFCRLF -> all headers are available */cl_header++;	/* after ':' char */cl_size = atoi(cl_header);if (cl_size == 0|| (cl_size >0 && (end_sip + 4 + cl_size <=pSocketItem->pTcpRecvData+pSocketItem->nTcpRecvLen))){uv_mutex_lock(&mutex_data_);RecvPacketCallBack(pSocketItem->socketfd,pSocketItem->pTcpRecvData, (end_sip + 4 + cl_size -pSocketItem->pTcpRecvData),\pSrcIP,SrcPort,pDestIP,DestPort, true);	uv_mutex_unlock(&mutex_data_);if (pSocketItem->nTcpRecvLen -(end_sip + 4 + cl_size -pSocketItem->pTcpRecvData) == 0) {end_sip = NULL;pSocketItem->nTcpRecvLen = 0;free(pSocketItem->pTcpRecvData);pSocketItem->pTcpRecvData = NULL;continue;}memmove(pSocketItem->pTcpRecvData,end_sip + 4 + cl_size, pSocketItem->nTcpRecvLen -(end_sip + 4 + cl_size -pSocketItem->pTcpRecvData) + 1);pSocketItem->nTcpRecvLen = pSocketItem->nTcpRecvLen - (end_sip +4 +cl_size -pSocketItem->pTcpRecvData);pSocketItem->pTcpRecvData = (char *)realloc(pSocketItem->pTcpRecvData,pSocketItem->nTcpRecvLen + 1);if (pSocketItem->pTcpRecvData == NULL){printf("realloc  pSocketItem->pTcpRecvDatapSocketItem->pTcpRecvDatapSocketItem->pTcpRecvData!!\n");pSocketItem->nTcpRecvLen = 0;break;}end_sip =strstr(pSocketItem->pTcpRecvData,"\r\n\r\n");continue;	/* and restart from new CRLFCRLF */}/* uncomplete SIP message */end_sip = NULL;		}if (pSocketItem->nTcpRecvLen== 0) {/* all data consumed are reallocation error ? */return;}}else if(len< 0){printf("Could not read socket (%d)- close it\n",pSocketItem->socketfd);//DelSocketItem(pSocketItem);//close(pSocketItem->sock);}else if (len == 0){printf("End of stream (read 0 byte from %s:%i)\n", pSocketItem->srcip, pSocketItem->srcport);//DelSocketItem(pSocketItem);//close(pSocketItem->sock);}else{printf("Dummy SIP message received (size=%d)\n", len);}}int CUvSocketTransMgr::SendPacket(int sock,const char *pData, int Datalen,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpSend)
{if (sock <= 0){struct sockaddr_in to;    bzero(&to, sizeof(struct sockaddr_in) );to.sin_family = AF_INET;to.sin_addr.s_addr = inet_addr(pDestIP);to.sin_port = htons(DestPort);int len = sendto(udpServer.io_watcher.fd, (char*)pData, Datalen, 0, (struct sockaddr *)&to, sizeof(struct sockaddr_in) );if (len != Datalen){printf("sendto <%s:%d> error: len = %d, datalen = %d! \n", pDestIP, DestPort, len, Datalen);}return len;}int iret = -1;TUvSocketItem *pSocketItem = FindAUvSocketItem(sock);if (pSocketItem != NULL){if (pSocketItem->socktype == SOCKETTYPE_UDP)	{
#if 1	struct sockaddr_in to;bzero(&to, sizeof(struct sockaddr_in) );to.sin_family = AF_INET;to.sin_addr.s_addr = inet_addr(pDestIP);to.sin_port = htons(DestPort);int len = sendto(sock, (char*)pData, Datalen, 0, (struct sockaddr *)&to, sizeof(struct sockaddr_in) );if (len != Datalen){printf("sendto <%s:%d> error: len = %d, datalen = %d! \n", pDestIP, DestPort, len, Datalen);}return len;	
#else	//send udpstruct sockaddr_in addr;iret = uv_ip4_addr(pDestIP, DestPort, &addr);if (iret){printf("%s: uv_ip4_addr failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return -1;}uv_udp_send_t* req;req = (uv_udp_send_t *)malloc(sizeof(*req));if (req == NULL){return -1;}uv_buf_t sndbuf = uv_buf_init((char *)pData, Datalen);	iret = uv_udp_send(req, &udpServer, &sndbuf, 1, (struct sockaddr*)&addr, on_send);if (iret){printf("%s: uv_udp_send failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return -1;}
#endif	}else if (pSocketItem->socktype == SOCKETTYPE_TCPSERVER){
#if 1		int len = send(sock, pData, Datalen, 0);return len;
#else			write_req_t *wr;wr = (write_req_t*) malloc(sizeof *wr);if (wr == NULL){printf("%s:%d write_req_t malloc failed!\n", __FUNCTION__, __LINE__);return -1;}wr->buf.base = (char *)malloc(Datalen + 1);wr->buf.len = Datalen;memcpy(wr->buf.base, pData, Datalen);wr->buf.base[Datalen] = '\0';	//wr->buf = uv_buf_init((char *)pData, Datalen);iret = uv_write(&wr->req, (uv_stream_t *)pSocketItem->sockethandle, &wr->buf, 1, after_write);if (iret) {printf("%s: uv_write failed iret:%s\n", __FUNCTION__, uv_err_name(iret));return -1;}
#endif			}}return iret;
}TUvSocketItem *CUvSocketTransMgr::OpenAUvSocketItem(int socketfd, SOCKETTYPE_E type, char *srcip, int srcport, char *destip, int destport, void *handle)
{uv_mutex_lock(&mutex_sockets_);TUvSocketItem *pSocketItem = NULL;UVSOCKETMAP_IT it;it = m_UvSocketItemMap.find(socketfd);if (it != m_UvSocketItemMap.end()){pSocketItem = (TUvSocketItem *)it->second;pSocketItem->socketfd = socketfd;pSocketItem->socktype = type;if (srcip != NULL)strncpy2(pSocketItem->srcip, srcip, IPSTR_MAX_LEN); pSocketItem->srcport = srcport;if (destip != NULL)strncpy2(pSocketItem->destip, destip, IPSTR_MAX_LEN);  pSocketItem->destport = destport;pSocketItem->sockethandle = handle;printf("%s: find a uvsocket socket:%d, socktype:%d, srcip:%s, srcport:%d, destip:%s, destport:%d, sockethandle:%p", __FUNCTION__, \socketfd, type, srcip, srcport, destip, destport, handle);	uv_mutex_unlock(&mutex_sockets_);return pSocketItem;}pSocketItem = new TUvSocketItem();pSocketItem->socketfd = socketfd;pSocketItem->socktype = type;if (srcip != NULL)strncpy2(pSocketItem->srcip, srcip, IPSTR_MAX_LEN); pSocketItem->srcport = srcport;if (destip != NULL)strncpy2(pSocketItem->destip, destip, IPSTR_MAX_LEN);  pSocketItem->destport = destport;pSocketItem->sockethandle = handle;m_UvSocketItemMap.insert(UVSOCKETMAP_VT(socketfd, pSocketItem));printf("%s: open a uvsocket socket:%d, socktype:%d, srcip:%s, srcport:%d, destip:%s, destport:%d, sockethandle:%p", __FUNCTION__, \socketfd, type, srcip, srcport, destip, destport, handle);uv_mutex_unlock(&mutex_sockets_);return pSocketItem;
}TUvSocketItem *CUvSocketTransMgr::FindAUvSocketItem(int socketfd)
{uv_mutex_lock(&mutex_sockets_);UVSOCKETMAP_IT it;it = m_UvSocketItemMap.find(socketfd);if (it != m_UvSocketItemMap.end()){uv_mutex_unlock(&mutex_sockets_);return (TUvSocketItem *)it->second;}uv_mutex_unlock(&mutex_sockets_);return NULL;
}void CUvSocketTransMgr::RemoveAUvSocketItem(int socketfd)
{uv_mutex_lock(&mutex_sockets_);UVSOCKETMAP_IT it;it = m_UvSocketItemMap.find(socketfd);if (it == m_UvSocketItemMap.end()){uv_mutex_unlock(&mutex_sockets_);return;}TUvSocketItem *pSocketItem = it->second;if (pSocketItem->pTcpRecvData != NULL){SAFE_DELETE(pSocketItem->pTcpRecvData);}SAFE_DELETE(it->second);m_UvSocketItemMap.erase(it);uv_mutex_unlock(&mutex_sockets_);return;
}void CUvSocketTransMgr::RemoveAllUvSocketItem(void)
{printf(">>>>>remove all socket item:%d\n", (int)m_UvSocketItemMap.size());uv_mutex_lock(&mutex_sockets_);if (m_UvSocketItemMap.size() <= 0){uv_mutex_unlock(&mutex_sockets_);return;}UVSOCKETMAP_IT it;for (it=m_UvSocketItemMap.begin(); it != m_UvSocketItemMap.end();){SAFE_DELETE(it->second);m_UvSocketItemMap.erase(it++);}uv_mutex_unlock(&mutex_sockets_);return;
}

使用方法:

启动接口,支持udp和tcpserver:

bool BindSocket(const char _srcip[], int _srcport, SOCKETTYPE_E type, char* _destip = NULL, int _destport = 0);

关闭socket:
int MoveSocket(const char _srcip[],int _srcport,SOCKETTYPE_E type,const char* _destip,int _destport);
将数据回调出上层接口:
virtual int RecvPacketCallBack(int sock,const char *pData, int len,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpRecv) = 0;

发送数据:
int SendPacket(int sock,const char *pData, int Datalen,const char *pSrcIP, int SrcPort,const char* pDestIP,int DestPort, bool bTcpSend);




这篇关于视频监听安防平台-libuv库通信协议封装-支持udp和tcpserver同时使用的文章就介绍到这儿,希望我们推荐的文章对程序员们有所帮助!



http://www.chinasem.cn/article/631601

相关文章

流媒体平台/视频监控/安防视频汇聚EasyCVR播放暂停后视频画面黑屏是什么原因?

视频智能分析/视频监控/安防监控综合管理系统EasyCVR视频汇聚融合平台,是TSINGSEE青犀视频垂直深耕音视频流媒体技术、AI智能技术领域的杰出成果。该平台以其强大的视频处理、汇聚与融合能力,在构建全栈视频监控系统中展现出了独特的优势。视频监控管理系统EasyCVR平台内置了强大的视频解码、转码、压缩等技术,能够处理多种视频流格式,并以多种格式(RTMP、RTSP、HTTP-FLV、WebS

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何解决线上平台抽佣高 线下门店客流少的痛点!

目前,许多传统零售店铺正遭遇客源下降的难题。尽管广告推广能带来一定的客流,但其费用昂贵。鉴于此,众多零售商纷纷选择加入像美团、饿了么和抖音这样的大型在线平台,但这些平台的高佣金率导致了利润的大幅缩水。在这样的市场环境下,商家之间的合作网络逐渐成为一种有效的解决方案,通过资源和客户基础的共享,实现共同的利益增长。 以最近在上海兴起的一个跨行业合作平台为例,该平台融合了环保消费积分系统,在短

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma