Socket编程---TCP篇

2024-09-04 12:20
文章标签 编程 tcp socket

本文主要是介绍Socket编程---TCP篇,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

一.   TCP协议

二.   服务端模块代码实现

三.   服务端调用模块代码实现

四.   客户端模块代码实现

五.   初始版本结果展示

六.   多进程版服务端

七.   多线程版服务端

八.   线程池版服务端


前文已经讲了UDP的知识(点此查看)。今天来讲讲TCP。

一.   TCP协议

何为TCP协议的含义,之前粗略提及了一下TCP与UDP的区别:

TCP:

• 传输层协议

• 有连接

• 可靠传输

• 面向字节流

UDP:

• 传输层协议

• 无连接

• 不可靠传输

• 面向数据报

那何为可靠,何为不可靠呢?

 TCP协议是有连接的。如果两台主机想要建立通信,就必须先建立连接,通过三次握手(后续博客会讲到)建立连接,只有当连接成功后,才能进行通信。

TCP可靠性体现在:如果数据在传输过程中出现了丢包等等情况,会有相应的解决方法。

TCP可靠性实现方法:

  1. 确认和重传: TCP 使用确认和重传机制来确保数据的完整性和可靠性。接收方会发送确认(ACK)给发送方,告知已成功接收到数据,如果发送方未收到确认,会重新发送数据。
  2. 序号和顺序控制: TCP 会为每个数据段分配一个序号,并且在接收端按序重组数据,以确保数据包按正确的顺序交付。
  3. 流量控制: TCP 使用滑动窗口协议进行流量控制,确保发送方和接收方之间的数据传输速率合理,避免了数据包的过载和丢失。
  4. 拥塞控制: TCP 还实现了拥塞控制机制,通过动态调整发送速率来避免网络拥塞,以提高整体网络性能和稳定性。

但是,并不是说,TCP就是百利而无一害的。前面说了,TCP还有一个特性---面向字节流,这就导致了,目标主机读取到的内容可能并不是完整的源主机发送的内容。此处来填补这个知识。

由于TCP是面向字节流的,所以可能会产生粘包问题

顾名思义,粘包问题指的是多条数据黏在了一起,被当做一条数据,造成读取错误。

其本质就是tcp在传输层对应用数据边界不敏感(不关注应用层数据边界)。

因此需要程序猿在应用层进行数据边界管理。如何管理呢?

  • 特殊字符间隔: 使用此方法则必须对数据中的特殊字符进行转义,否则会造成二义
  • 数据定长:规定固定长度的数据,实际数据少的则需要进行补位
  • 在应用层头部定义数据长度(例如http协议,udp协议,先取头部,再根据头部中的数据长度取出数据) 

二.   服务端模块代码实现

同样我们将服务端封装成一个类。

const static int defaultsockfd=-1;class TcpServer
{
public:TcpServer(int port):_port(port),_listensock(defaultsockfd),_isrunning(false){}~TcpServer(){if(_listensock>defaultsockfd){::close(_listensock);}}
private:uint16_t _port;int _listensock;bool _isrunning;
};

前两个成员不必多说,即端口号套接字。我们还加了一个bool类型的变量,表示服务器是否在运行,那我们每次启动服务器的时候就应该判断_isrunning==true吗?只有为真才能启动服务器!

与UDP一样,服务器需要一个初始化函数。

服务端初始成员函数:

enum
{SOCKET_ERROR = 1,BIND_ERROR,LISTEN_ERROR,USAGE_ERROR
};const static int gbacklog=10;void InitServer()
{//1.创建流式套接字_listensock=::socket(AF_INET,SOCK_STREAM,0);if(_listensock<0){LOG(FATAL,"socket error\n");exit(SOCKET_ERROR);}LOG(DEBUG,"socket create success, sockfd is: %d\n",_listensock);//2.bindstruct sockaddr_in local;memset(&local,0,sizeof(local));local.sin_family=AF_INET;local.sin_port=htons(_port);local.sin_addr.s_addr=INADDR_ANY;int n=::bind(_listensock,(struct sockaddr*)&local,sizeof(local));if(n<0){LOG(FATAL,"bind error\n");exit(BIND_ERROR);}LOG(DEBUG,"bind success,sockfd is: %d\n",_listensock);//3.tcp是面向连接的,所以通信之前,必须先建立连接,服务器是被连接的//tcpserver启动,未来首先要一直等待客户的连接到来n=::listen(_listensock,gbacklog);if(n<0){LOG(FATAL,"listen error\n");exit(LISTEN_ERROR);}LOG(DEBUG,"listen success,sockfd is: %d\n",_listensock);}

LOG函数是我们实现的日志功能函数。

#pragma once//日志
#include<iostream>
#include<fstream>
#include<cstdio>
#include<string>
#include<ctime>
#include<unistd.h>
#include<sys/types.h>
#include<stdarg.h>
#include<pthread.h>
#include"LockGuard.hpp"using namespace std;bool gIsSave=false;
const string logname="log.txt";void SaveFile(const string& filename,const string& message)
{ofstream out(filename,ios::app);if(!out.is_open()){return;}out<<message;out.close();
}//1.日志是有等级的
enum Level
{DEBUG=0,INFO,WARNING,ERROR,FATAL
};string LevelToString(int level)
{switch(level){case DEBUG: return "Debug";break;case INFO: return "Info";break;case WARNING: return "Warning";break;case ERROR: return "Error";break;case FATAL: return "Fatal";break;default: return "Unknown";break;}
}string GetTimeString()
{time_t curr_time=time(nullptr);struct tm* format_time=localtime(&curr_time);if(format_time==nullptr) return "None";char time_buffer[64];snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",format_time->tm_year+1900,format_time->tm_mon+1,format_time->tm_mday,format_time->tm_hour,format_time->tm_min,format_time->tm_sec);return time_buffer;
}pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;//2.日志是由格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容...
void LogMessage(string filename,int line,bool issave,int level,const char* format,...)
{string levelstr=LevelToString(level);string timestr=GetTimeString();pid_t selfid=getpid();//可变参数部分处理char buffer[1024];va_list arg;va_start(arg,format);vsnprintf(buffer,sizeof(buffer),format,arg);va_end(arg);LockGuard lockguard(&lock);string message;message="["+timestr+"]"+"["+levelstr+"]"+"[pid: "+to_string(selfid)+"]"+"["+filename+"]"+"["+to_string(line)+"]"+buffer+"\n";if(!issave){cout<<message;}else{SaveFile(logname,message);}
}void Test(int num,...)
{va_list arg;va_start(arg,num);while(true){int data=va_arg(arg,int);cout<<"data: "<<data<<endl;num--;}va_end(arg);//arg==NULL
}//C99新特性 __VA_ARGS__
#define LOG(level,format,...) do {LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);} while(0)
#define EnableFile() do {gIsSave=true;} while(0)
#define EnableScreen() do {gIsSave=false;} while(0)

可以看见前面两部跟UDP是大差不差的,都是创建套接字,然后绑定结构体信息。而我们的TCP还多了一步,就是需要先通过listen函数建立连接,因为TCP是面向连接的,所以通信之前必须先建立连接。

先来介绍一下listen接口。

int listen(int sockfd, int backlog);

参数介绍: 

  • sockfd:即开始创建的套接字。
  • backlog:定义sockfd的挂起连接队列可能增长到的最大长度,通俗来讲就是最多可以有多少字节。如果超过了这个长度,就会发生错误。

返回值介绍:

 成功的话,0会被返回;失败1会被返回,错误码也会被设置。

 服务端运行成员函数:

我们先来看看初始版本 ,下面会讲解其他版本。

void Service(int sockfd,InetAddr client)
{LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";while(true){char inbuffer[1024];//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bugif(n>0){inbuffer[n]=0;cout<<clientaddr<<inbuffer<<endl;string echo_string="[server echo]# ";echo_string+=inbuffer;write(sockfd,echo_string.c_str(),echo_string.size());}else if(n==0){//client退出&&关闭连接了LOG(INFO,"%s quit\n",clientaddr.c_str());break;}else{LOG(ERROR,"read error\n",clientaddr.c_str());break;}}cout<<"server开始退出"<<endl;shutdown(sockfd,SHUT_RD);cout<<"shut _ rd"<<endl;//::close(sockfd);//不关闭会发生文件描述符泄露
}void Loop()//服务端运行函数
{_isrunning=true;//4.不能直接接收数据,应该先获取连接while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);//注意此处accept返回值也是一个文件描述符,要区分于_listensockif(sockfd<0){LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可continue;}//Version 0Service(sockfd,InetAddr(peer));}_isrunning=false;
}

InetAddr是我们封装的类。

#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>using namespace std;class InetAddr
{
private:void GetAddress(string* ip,uint16_t* port){*port=ntohs(_addr.sin_port);//网络字节序转为主机字节序*ip=inet_ntoa(_addr.sin_addr);//将网络字节序IP转为点分式十进制IP}
public:InetAddr(const struct sockaddr_in &addr):_addr(addr){GetAddress(&_ip,&_port);}string Ip(){return _ip;}uint16_t Port(){return _port;}~InetAddr(){}
private:struct sockaddr_in _addr;string _ip;uint16_t _port;
};

先来讲讲服务端运行函数---Loop,此处我们不能像UDP一样直接read接收数据,而应该获取连接。再次说明了TCP是有连接的。

先来介绍接收函数accept

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

参数介绍: 

  • sockfd:即开始创建的套接字。
  • addr:指向sockaddr结构体。这个结构体的信息用接收到的客户端的信息填充(端口号、协议簇等等)
  • addrlen:前面addr指向的结构的长度取地址。

返回值介绍:

成功的话,会返回一个文件描述符;失败则会返回-1,错误码会被设置。

此处会有读者疑惑,为什么又返回了一个文件描述符,那之前TcpServer类里面的成员_listensock又跟这个什么关系呢?下面来举个例子:

假如,今天天气不错,读者大大出去逛着街,饭点了,刚好经过一家餐馆,门口有一位服务员小A,他看见你,就来招呼你进去吃饭,你也同意了。然后服务员小A领着你进门,朝里面喊了一句:客人一位,然后后厨出来了一个服务员小B,就领着你到餐桌,让你点菜。小A又继续出去询问路过的人吃不吃饭。

在这个故事中,服务员小A就是我们的TcpServer类中的成员_listensock,服务员小B就是我们的accept接口返回的文件描述符。

所以我们的Loop函数中,如果accept函数接受失败了,那么也无所谓,只需不用做处理,下一次继续接收就行。

三.   服务端调用模块代码实现

只需创建出TcpServer对象,然后依次调用初始化函数InitServer和Loop函数即可。

#include"TcpServer.hpp"
#include<memory>using namespace std;void Usage(string proc)
{cout<<"Usage:\n\t"<<proc<<" local_port\n"<<endl;
}// ./tcpserver port
int main(int argc,char *argv[])
{if(argc!=2){Usage(argv[0]);return 1;}EnableScreen();uint16_t port=stoi(argv[1]);unique_ptr<TcpServer> tsvr=make_unique<TcpServer>(port);tsvr->InitServer();tsvr->Loop();return 0;
}

TcpServer.hpp文件即我们的服务端代码。 

四.   客户端模块代码实现

#include<iostream>
#include<string>
#include<unistd.h>
#include<cstring>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>#include"Log.hpp"using namespace std;void Usage(string proc)
{cout<<"Usage:\n\t"<<proc<<" serverip serverport\n"<<endl;
}// ./tcpclient serverip serverport
int main(int argc,char *argv[])
{if(argc!=3){Usage(argv[0]);exit(1);}string serverip=argv[1];uint16_t serverport=stoi(argv[2]);int sockfd=::socket(AF_INET,SOCK_STREAM,0);if(sockfd<0){cerr<<"socket error\n"<<endl;exit(2);}//与udpclient一样,不需显式bind//构建目标主机的socket信息struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family=AF_INET;server.sin_port=htons(serverport);server.sin_addr.s_addr=inet_addr(serverip.c_str());int n=connect(sockfd,(struct sockaddr*)&server,sizeof(server));if(n<0){cerr<<"connect error"<<endl;exit(3);}while(true){cout<<"Please Enter# ";string outstring;getline(cin,outstring);ssize_t s=send(sockfd,outstring.c_str(),outstring.size(),0);//也可以用writeif(s>0){char inbuffer[1024];ssize_t m=recv(sockfd,inbuffer,sizeof(inbuffer)-1,0);//也可以用readif(m>0){inbuffer[m]=0;cout<<inbuffer<<endl;}else{break;}}else{break;}}shutdown(sockfd,SHUT_WR);//close(sockfd);return 0;
}

前面两步依旧与UDP一样,都是创建出套接字,然后绑定结构体信息。由于TCP是有连接的,所以第三步需要调用connect接口进行连接。

下面先来讲讲connect接口:

int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen);

参数介绍: 

  • sockfd:即开始创建的套接字。
  • addr:指向sockaddr结构体。这个结构体的信息用接收到的客户端的信息填充(端口号、协议簇等等)
  • addrlen:前面addr指向的结构的长度取地址。

返回值介绍:

成功的话,会返回0;失败则会返回-1,错误码会被设置。

后续就是与UDP类似的了,先发出信息,然后接收服务端返回回来的信息。

五.   初始版本结果展示

首先,可以实现要求。但是既然我们说他是初始版本,那肯定说明还有更优的版本。我们来看看这个版本有什么缺陷。

当上升到多台客户端的时候,发现只有一个客户端能与服务器通信。

这就是我们这个版本的缺陷:一次只能处理一个请求。

为什么呢?因为是单进程的,所以服务器只能容许 一个客户端与其通信。

那为什么前面UDP单进程的时候,还是能实现多个客户端通信呢?

因为前面UDP只有一个sockfd,此处涉及多个sockfd。

所以自然而然的想到了第二个版本----采用多进程来实现服务端

六.   多进程版服务端

由于前面单进程版服务端的问题,所以来介绍多进程版的服务端。 

void Service(int sockfd,InetAddr client)
{LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";while(true){char inbuffer[1024];//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bugif(n>0){inbuffer[n]=0;cout<<clientaddr<<inbuffer<<endl;string echo_string="[server echo]# ";echo_string+=inbuffer;write(sockfd,echo_string.c_str(),echo_string.size());}else if(n==0){//client退出&&关闭连接了LOG(INFO,"%s quit\n",clientaddr.c_str());break;}else{LOG(ERROR,"read error\n",clientaddr.c_str());break;}}cout<<"server开始退出"<<endl;shutdown(sockfd,SHUT_RD);cout<<"shut _ rd"<<endl;//::close(sockfd);//不关闭会发生文件描述符泄露
}void Loop()
{_isrunning=true;//4.不能直接接收数据,应该先获取连接while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);//注意此处accept返回值也是一个文件描述符,要区分于_listensockif(sockfd<0){LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可continue;}pid_t id=fork();if(id==0){//child:关心sockfd,不关心listensock::close(_listensock);//不需要listensock,建议关掉,不影响父进程if(fork()>0) exit(0);Service(sockfd,InetAddr(peer));//孙子进程--孤儿进程--系统领养exit(0);}//father:关心listensock,不关心sockfd::close(sockfd);//不需要sockfd,建议关掉waitpid(id,nullptr,0);}_isrunning=false;
}

 Service函数还是跟前面一样,重点是Loop函数

我们发现多进程用了两次fork函数创建子进程。我们来着重研究一下。

pid_t id=fork();
if(id==0)
{//child:关心sockfd,不关心listensock::close(_listensock);//不需要listensock,建议关掉,不影响父进程if(fork()>0) exit(0);Service(sockfd,InetAddr(peer));//孙子进程--孤儿进程--系统领养exit(0);}
//father:关心listensock,不关心sockfd
::close(sockfd);//不需要sockfd,建议关掉
waitpid(id,nullptr,0);

来看看最后的效果吧:

首先单客户端仍然能进行通信,那多个呢?

可以看到仍然能正常通信。

既然多进程可以实现,猜想一下多线程是不是也能实现呢?结果是显然的。

七.   多线程版服务端

struct ThreadData
{
public:ThreadData(int fd,InetAddr addr,TcpServer* s):sockfd(fd),clientaddr(addr),self(s){}
public:int sockfd;InetAddr clientaddr;TcpServer* self;
};void Service(int sockfd,InetAddr client)
{LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";while(true){char inbuffer[1024];//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bugif(n>0){inbuffer[n]=0;cout<<clientaddr<<inbuffer<<endl;string echo_string="[server echo]# ";echo_string+=inbuffer;write(sockfd,echo_string.c_str(),echo_string.size());}else if(n==0){//client退出&&关闭连接了LOG(INFO,"%s quit\n",clientaddr.c_str());break;}else{LOG(ERROR,"read error\n",clientaddr.c_str());break;}}cout<<"server开始退出"<<endl;shutdown(sockfd,SHUT_RD);cout<<"shut _ rd"<<endl;//::close(sockfd);//不关闭会发生文件描述符泄露
}static void* HandlerSock(void* args)
{pthread_detach(pthread_self());ThreadData* td=static_cast<ThreadData*>(args);td->self->Service(td->sockfd,td->clientaddr);delete td;return nullptr;
}void Loop()
{_isrunning=true;//4.不能直接接收数据,应该先获取连接while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);//注意此处accept返回值也是一个文件描述符,要区分于_listensockif(sockfd<0){LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可continue;}pthread_t t;ThreadData* td=new ThreadData(sockfd,InetAddr(peer),this);pthread_create(&t,nullptr,HandlerSock,td);//将线程分离,使主线程不用等待}_isrunning=false;
}

此处我们新封装了一个结构体ThreadData,方便处理。

其实大致思路还是不变,只是利用了线程分离,让创造出来的多线程去处理业务,让主线程不需等待。

来看看效果:

也是能实现预期效果的。

我们还可以利用线程池来实现业务处理。

八.   线程池版服务端

using task_t=function<void()>;void Service(int sockfd,InetAddr client)
{LOG(DEBUG,"get a new link,info %s:%d,fd: %d\n",client.Ip().c_str(),client.Port(),sockfd);string clientaddr="["+client.Ip()+":"+to_string(client.Port())+"]# ";while(true){char inbuffer[1024];//1. tcp面向字节流,不能保证inbuffer,是一个完整的命令字符串,例如"ls -a -l",可能只读到ls -a//所以对于tcp,我们必须保证在应用层面我们收到的是一个完整的请求ssize_t n=read(sockfd,inbuffer,sizeof(inbuffer)-1);//bugif(n>0){inbuffer[n]=0;cout<<clientaddr<<inbuffer<<endl;string echo_string="[server echo]# ";echo_string+=inbuffer;write(sockfd,echo_string.c_str(),echo_string.size());}else if(n==0){//client退出&&关闭连接了LOG(INFO,"%s quit\n",clientaddr.c_str());break;}else{LOG(ERROR,"read error\n",clientaddr.c_str());break;}}cout<<"server开始退出"<<endl;shutdown(sockfd,SHUT_RD);cout<<"shut _ rd"<<endl;//::close(sockfd);//不关闭会发生文件描述符泄露
}void Loop()
{_isrunning=true;//4.不能直接接收数据,应该先获取连接while(true){struct sockaddr_in peer;socklen_t len=sizeof(peer);int sockfd=::accept(_listensock,(struct sockaddr*)&peer,&len);//注意此处accept返回值也是一个文件描述符,要区分于_listensockif(sockfd<0){LOG(WARNING,"accept error\n");//accept失败了,并不影响,再次accept即可continue;}task_t t=bind(&TcpServer::Service,this,sockfd,InetAddr(peer));ThreadPool<task_t>::GetInstance()->Enqueue(t);}_isrunning=false;
}

线程池代码为:

#pragma once//单例模式的线程池
#include<iostream>
#include<vector>
#include<queue>
#include<pthread.h>
#include"Thread.hpp"
#include"Log.hpp"
#include"LockGuard.hpp"using namespace std;
using namespace ThreadModule;const static int gdefaultthreadnum=3;template<typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond,&_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeAll(){pthread_cond_broadcast(&_cond);}//私有的ThreadPool(int threadnum=gdefaultthreadnum):_threadnum(threadnum),_waitnum(0),_isrunning(false){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);LOG(INFO,"ThreadPool Construct()");}void Start(){for(auto& thread:_threads){thread.Start();}}void HandlerTask(string name)//类的成员方法,也可以成为另一个类的回调方法,方便我们继续类级别的互相调用{LOG(INFO,"%s is running\n",name.c_str());while(true){//1.保证队列安全LockQueue();//2.队列中不一定有数据while(_task_queue.empty() && _isrunning){_waitnum++;ThreadSleep();_waitnum--;}//2.1 如果线程池已经退出了 && 任务队列是空的if(_task_queue.empty() && !_isrunning){UnLockQueue();break;}//2.2 如果线程池不退出 && 任务队列不是空的//2.3 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后再退出//3.一定有任务,处理任务T t=_task_queue.front();_task_queue.pop();UnLockQueue();LOG(DEBUG,"%s get a task",name.c_str());//4.处理任务,这个任务属于线程独占的任务,所以不能放在加锁和解锁之间t();//LOG(DEBUG,"%s handler a task,result is: %s",name.c_str(),t.ResultToString());}}void InitThreadPool(){//指向构建出所有的线程,并不自动for(int num=0;num<_threadnum;num++){string name="thread-"+to_string(num+1);_threads.emplace_back(bind(&ThreadPool::HandlerTask,this,placeholders::_1),name);LOG(INFO,"init thread %s done\n",name.c_str());}_isrunning=true;}//复制拷贝禁用ThreadPool<T> &operator=(const ThreadPool<T>&)=delete;ThreadPool(const ThreadPool<T> &)=delete;
public:static ThreadPool<T> *GetInstance(){//如果是多线程获取线程池对象,下面的代码就有问题,所以要加锁//双判断的方式,可以有效减少获取单例的加锁成本,而且保证线程安全if(_instance==nullptr)//只有第一次会创建对象,后续都是获取,这样就不用每次都申请锁{//保证第二次之后,所有线程,不用再加锁,直接返回_instance单例对象LockGuard lockguard(&_lock);if (_instance == nullptr){_instance = new ThreadPool<T>();_instance->InitThreadPool();_instance->Start();LOG(DEBUG, "创建线程池单例\n");}else{LOG(DEBUG, "获取线程池单例\n");}}return _instance;}bool Enqueue(const T& t){bool ret=false;LockQueue();if(_isrunning){   _task_queue.push(t);if(_waitnum>0){ThreadWakeup();}LOG(DEBUG,"enqueue task success\n");ret=true;}UnLockQueue();return ret;}void Stop(){LockQueue();_isrunning=false;ThreadWakeAll();UnLockQueue();}void Wait(){for(auto& thread:_threads){thread.Join();LOG(INFO,"%s is quit",thread.name().c_str());}}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:int _threadnum;vector<Thread> _threads;queue<T> _task_queue;pthread_mutex_t _mutex;pthread_cond_t _cond;int _waitnum;bool _isrunning;//添加单例模式--懒汉static ThreadPool<T> *_instance;static pthread_mutex_t _lock;//保护单例的锁
};template<typename T>
ThreadPool<T> *ThreadPool<T>::_instance=nullptr;template<typename T>
pthread_mutex_t ThreadPool<T>::_lock=PTHREAD_MUTEX_INITIALIZER;

其中Thread.hpp文件是我们封装的原生线程库:

//封装原生线程库#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include<iostream>
#include<string>
#include<functional>
#include<unistd.h>
#include<pthread.h>using namespace std;namespace ThreadModule
{using func_t=function<void(string&)>;class Thread{public:void Excute(){_func(_threadname);}public:Thread(func_t func,const string& name="none-name"):_func(func),_threadname(name),_stop(true){}static void* threadroutine(void* args)//类成员函数,形参是有this指针的!{Thread *self=static_cast<Thread*>(args);self->Excute();return nullptr;}bool Start(){int n=pthread_create(&_tid,nullptr,threadroutine,this);if(!n){_stop=false;return true;}else {return false;}}void Detach(){if(!_stop){pthread_detach(_tid);}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}string name(){return _threadname;}void Stop(){_stop=true;}~Thread(){}private:pthread_t _tid;string _threadname;func_t _func;bool _stop;};
}#endif

LockGuard.hpp文件是仿照C++RAII(点此查看)思想封装的锁:

#ifndef __lock_GUARD_HPP__
#define __lock_GUARD_HPP__#include<iostream>
#include<pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);//构造加锁}~LockGuard(){pthread_mutex_unlock(_mutex);}
private:pthread_mutex_t* _mutex;
};#endif

回归正题,线程池版服务端,就是通过线程池转发任务。

来看看效果:

也符合预期。


总结:

好了,到这里今天的知识就讲完了,大家有错误一点要在评论指出,我怕我一人搁这瞎bb,没人告诉我错误就寄了。

祝大家越来越好,不用关注我(疯狂暗示)

这篇关于Socket编程---TCP篇的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1136011

相关文章

QT实现TCP客户端自动连接

《QT实现TCP客户端自动连接》这篇文章主要为大家详细介绍了QT中一个TCP客户端自动连接的测试模型,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录版本 1:没有取消按钮 测试效果测试代码版本 2:有取消按钮测试效果测试代码版本 1:没有取消按钮 测试效果缺陷:无法手动停

C#反射编程之GetConstructor()方法解读

《C#反射编程之GetConstructor()方法解读》C#中Type类的GetConstructor()方法用于获取指定类型的构造函数,该方法有多个重载版本,可以根据不同的参数获取不同特性的构造函... 目录C# GetConstructor()方法有4个重载以GetConstructor(Type[]

Linux 网络编程 --- 应用层

一、自定义协议和序列化反序列化 代码: 序列化反序列化实现网络版本计算器 二、HTTP协议 1、谈两个简单的预备知识 https://www.baidu.com/ --- 域名 --- 域名解析 --- IP地址 http的端口号为80端口,https的端口号为443 url为统一资源定位符。CSDNhttps://mp.csdn.net/mp_blog/creation/editor

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Go Playground 在线编程环境

For all examples in this and the next chapter, we will use Go Playground. Go Playground represents a web service that can run programs written in Go. It can be opened in a web browser using the follow

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)

Java并发编程之——BlockingQueue(队列)

一、什么是BlockingQueue BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种: 1. 当队列满了的时候进行入队列操作2. 当队列空了的时候进行出队列操作123 因此,当一个线程试图对一个已经满了的队列进行入队列操作时,它将会被阻塞,除非有另一个线程做了出队列操作;同样,当一个线程试图对一个空

【Go】go连接clickhouse使用TCP协议

离开你是傻是对是错 是看破是软弱 这结果是爱是恨或者是什么 如果是种解脱 怎么会还有眷恋在我心窝 那么爱你为什么                      🎵 黄品源/莫文蔚《那么爱你为什么》 package mainimport ("context""fmt""log""time""github.com/ClickHouse/clickhouse-go/v2")func main(