TeamTalk DB_PROXY_SERVER详解

2024-04-11 03:32

本文主要是介绍TeamTalk DB_PROXY_SERVER详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/hailong0715/article/details/53418594

DB_PROXY_Server数据库代理是TeamTalk TTServer中负责与数据库交互的代理服务器,在DB server中负责承载TeamTalk所有业务层面和逻辑层面的数据入库和持久化等服务,是TT_Server中比较重要的一环,在设计中采用了很多实用的技术,比如池化技术,数据库代理,单例模式等,收益匪浅,下面对TealTalk的DB_Server作个详细分析,以封面是自己阅读代码后的学习总结,一方面可以给其他学习TT_Server的人提供参考。

可以通过查看DB_Server的配置文件dbproxyserver.conf文件,DB_SERVER主要分为以下几个部分:

1、TeamTalk_Matser 采用了MYSQL数据库

2、TeamTalk_Slave   MYSQL数据库   (由于水平有限,对数据库的学习还不够深入,尚不清楚为什么需要部署主从服务器)

-------------------------------------------------------------

3、unread 未读信息实例 redis 数据库

4、group_set 群组设置实例 redis数据库

5、token 实例 redis数据库

6、sync实例  redis数据库 同步功能

7、group_member redis 数据库 

每个数据库实例都会预先打开于数据库的两个链接,不需要在每次使用的时候再打开,使用结束后释放,节省了数据打开和释放需要的时间和资源,在当前可用连接数不够的情况下再新增一个数据库链接,动态调节DB_Server的负载,同时限定了每个实例的最大可用连接数,由于系统资源是有限的,当业务比较繁忙时不能无限制创建新的连接,避免耗尽系统资源,这种场景下,当没有可用连接的时候,新的业务请求必须等待,等待可用的连接,然后再执行相应的业务操作。

DB_Server采用了多线程,在DB_Server启动的时候预先分配了配置文件中指定的线程数,用来处理具体的数据库请求,当一个请求到达DB_Server时,DB_Server将该请求封装成DB相关的额任务类,然后随机加入到预先启动的线程的任务列表中,有线程回调函数不停执行具体的任务请求,者就是整个DB_Serve的设计思路。

下面来看代码的流程:

1、CacheManager的初始化

db_proxy_server的main函数中,依次从配置文件中读取各个实例的名称以及初始DB实例个数,最大连接数等信息。

首先获取CacheManager的对象,在获取对象的同时对该对象以及该对象管理的资源进行了初始化。

 

	CacheManager* pCacheManager = CacheManager::getInstance();

 

需要明确的是,cacheManager维护了一个map,map<string, CachePool*>m_cache_pool_map;这个map的key值就是每个配置文件中每个CacheInstances的名称,对应给每一个cache实例维护了一个连接池CachePool, 在CachePool中有维护了一个list<CacheConn*>m_free_list;,这个list保存了对应cache实例的连接,CacheConn是对每个DB连接的封装。这个类中维护了对应数据库连接的一些基本信息,redis数据的话保存的是数据库连接上下文redisContext,以及操作数据库的Set 和Get方法。

创建CacheManager的对象

 

 
  1. CacheManager* CacheManager::getInstance()

  2. {

  3. if (!s_cache_manager) {

  4. s_cache_manager = new CacheManager();

  5. if (s_cache_manager->Init()) {

  6. delete s_cache_manager;

  7. s_cache_manager = NULL;

  8. }

  9. }

  10.  
  11. return s_cache_manager;

  12. }

调用CacheManager的Init的方法,初始化CacheManager管理的资源,在这里就是读取配置文件,根据配置文件中的CacheInstance名称创建对应的实例,并创建相应的连接池对象,将其插入到Manager管理的Map中。

 
  1. int CacheManager::Init()

  2. {

  3. CConfigFileReader config_file("dbproxyserver.conf");

  4.  
  5. char* cache_instances = config_file.GetConfigName("CacheInstances");

  6. if (!cache_instances) {

  7. log("not configure CacheIntance");

  8. return 1;

  9. }

  10.  
  11. char host[64];

  12. char port[64];

  13. char db[64];

  14. char maxconncnt[64];

  15. CStrExplode instances_name(cache_instances, ',');

  16. for (uint32_t i = 0; i < instances_name.GetItemCnt(); i++) {

  17. char* pool_name = instances_name.GetItem(i);

  18. //printf("%s", pool_name);

  19. snprintf(host, 64, "%s_host", pool_name);

  20. snprintf(port, 64, "%s_port", pool_name);

  21. snprintf(db, 64, "%s_db", pool_name);

  22. snprintf(maxconncnt, 64, "%s_maxconncnt", pool_name);

  23.  
  24. char* cache_host = config_file.GetConfigName(host);

  25. char* str_cache_port = config_file.GetConfigName(port);

  26. char* str_cache_db = config_file.GetConfigName(db);

  27. char* str_max_conn_cnt = config_file.GetConfigName(maxconncnt);

  28. if (!cache_host || !str_cache_port || !str_cache_db || !str_max_conn_cnt) {

  29. log("not configure cache instance: %s", pool_name);

  30. return 2;

  31. }

  32.  
  33. CachePool* pCachePool = new CachePool(pool_name, cache_host, atoi(str_cache_port),

  34. atoi(str_cache_db), atoi(str_max_conn_cnt));

  35. if (pCachePool->Init()) {

  36. log("Init cache pool failed");

  37. return 3;

  38. }

  39.  
  40. m_cache_pool_map.insert(make_pair(pool_name, pCachePool));

  41. }

  42.  
  43. return 0;

  44. }

  45.  

根据在配置文件中定义的instanceNane_DB的值创建CacheConn对象,调用其init函数,并将其插入到CachePool管理的空闲链表中。

 

 

 
  1. int CachePool::Init()

  2. {

  3. for (int i = 0; i < m_cur_conn_cnt; i++) {

  4. CacheConn* pConn = new CacheConn(this);

  5. if (pConn->Init()) {

  6. delete pConn;

  7. return 1;

  8. }

  9.  
  10. m_free_list.push_back(pConn);

  11. }

  12.  
  13. log("cache pool: %s, list size: %lu", m_pool_name.c_str(), m_free_list.size());

  14. return 0;

  15. }

调用CacheConn的Init函数,创建数据库的连接。

 

 

 
  1. /*

  2. * redis初始化连接和重连操作,类似mysql_ping()

  3. */

  4. int CacheConn::Init()

  5. {

  6. if (m_pContext) {

  7. return 0;

  8. }

  9.  
  10. // 4s 尝试重连一次

  11. uint64_t cur_time = (uint64_t)time(NULL);

  12. if (cur_time < m_last_connect_time + 4) {

  13. return 1;

  14. }

  15.  
  16. m_last_connect_time = cur_time;

  17.  
  18. // 200ms超时

  19. struct timeval timeout = {0, 200000};

  20. m_pContext = redisConnectWithTimeout(m_pCachePool->GetServerIP(), m_pCachePool->GetServerPort(), timeout);

  21. if (!m_pContext || m_pContext->err) {

  22. if (m_pContext) {

  23. log("redisConnect failed: %s", m_pContext->errstr);

  24. redisFree(m_pContext);

  25. m_pContext = NULL;

  26. } else {

  27. log("redisConnect failed");

  28. }

  29.  
  30. return 1;

  31. }

  32.  
  33. redisReply* reply = (redisReply *)redisCommand(m_pContext, "SELECT %d", m_pCachePool->GetDBNum());

  34. if (reply && (reply->type == REDIS_REPLY_STATUS) && (strncmp(reply->str, "OK", 2) == 0)) {

  35. freeReplyObject(reply);

  36. return 0;

  37. } else {

  38. log("select cache db failed");

  39. return 2;

  40. }

  41. }

上述流程就完成了所有CacheInstance的初始化,并创建了数据的连接池,后续所有对数据库的操作都通过调用CashManager::GetCacheConn方法来获取数据库的连接,从cachePool管理的free_list中出队列,如果队列为空,则创建新的连接或者等待,使用完连接后加入free_list中,供下次使用。
 

 

2、CDBManager的初始化

CDBManager的初始化流程与CacheManager的初始化流程类似,唯一不同的是CDBManager保存的CDBConn是到MYSQL数据库的连接。其他过程类似,理解上面CacheManager的流程就很容易理解该部分流程,这里不再重复叙述了。

3、资源对象初始化

完成DB相关资源的对象初始化,这些类都用了单例模式,因此初始化这些对象都只需要调用自身的GetInstance方法,

 

 
  1. puts("db init success");

  2. // 主线程初始化单例,不然在工作线程可能会出现多次初始化

  3. if (!CAudioModel::getInstance()) {

  4. return -1;

  5. }

  6.  
  7. if (!CGroupMessageModel::getInstance()) {

  8. return -1;

  9. }

  10.  
  11. if (!CGroupModel::getInstance()) {

  12. return -1;

  13. }

  14.  
  15. if (!CMessageModel::getInstance()) {

  16. return -1;

  17. }

  18.  
  19. if (!CSessionModel::getInstance()) {

  20. return -1;

  21. }

  22.  
  23. if(!CRelationModel::getInstance())

  24. {

  25. return -1;

  26. }

  27.  
  28. if (!CUserModel::getInstance()) {

  29. return -1;

  30. }

  31.  
  32. if (!CFileModel::getInstance()) {

  33. return -1;

  34. }

 

4、工作线程池初始化,任务执行回调初始化。

 

这个部分可谓是db_proxy_server中重要的一环,所有的DB任务都是通过从工作线程池中的线程通过获取对应命令ID的回调函数来执行所有的数据库操作任务,因此理解了这部分差不多就理解了db_proxy_server的一半,下面介绍这部分流程。

 

 
  1. int init_proxy_conn(uint32_t thread_num)

  2. {

  3. s_handler_map = CHandlerMap::getInstance();

  4. g_thread_pool.Init(thread_num);

  5.  
  6. netlib_add_loop(proxy_loop_callback, NULL);

  7.  
  8. signal(SIGTERM, sig_handler);

  9.  
  10. return netlib_register_timer(proxy_timer_callback, NULL, 1000);

  11. }

该部分的初始化时在main函数中调用全局函数init_proxy_conn函数完成的,该函数的一个入参thread_num指定了工作线程池的线程数量。

 

(1)首先获取CHandlerMap的对象,在获取对象的同时完成了所有回调函数的注册。

 

 
  1. CHandlerMap* CHandlerMap::getInstance()

  2. {

  3. if (!s_handler_instance) {

  4. s_handler_instance = new CHandlerMap();

  5. s_handler_instance->Init();

  6. }

  7.  
  8. return s_handler_instance;

  9. }


 

 
  1. void CHandlerMap::Init()

  2. {

  3. // Login validate

  4. m_handler_map.insert(make_pair(uint32_t(CID_OTHER_VALIDATE_REQ), DB_PROXY::doLogin));

  5. m_handler_map.insert(make_pair(uint32_t(CID_LOGIN_REQ_PUSH_SHIELD), DB_PROXY::doPushShield));

  6. m_handler_map.insert(make_pair(uint32_t(CID_LOGIN_REQ_QUERY_PUSH_SHIELD), DB_PROXY::doQueryPushShield));

  7.  
  8. // recent session

  9. m_handler_map.insert(make_pair(uint32_t(CID_BUDDY_LIST_RECENT_CONTACT_SESSION_REQUEST), DB_PROXY::getRecentSession));

  10. m_handler_map.insert(make_pair(uint32_t(CID_BUDDY_LIST_REMOVE_SESSION_REQ), DB_PROXY::deleteRecentSession));

  11.  
  12. // users

  13. m_handler_map.insert(make_pair(uint32_t(CID_BUDDY_LIST_USER_INFO_REQUEST), DB_PROXY::getUserInfo));

  14. m_handler_map.insert(make_pair(uint32_t(CID_BUDDY_LIST_ALL_USER_REQUEST), DB_PROXY::getChangedUser));

  15. m_handler_map.insert(make_pair(uint32_t(CID_BUDDY_LIST_DEPARTMENT_REQUEST), DB_PROXY::getChgedDepart));

  16. m_handler_map.insert(make_pair(uint32_t(CID_BUDDY_LIST_CHANGE_SIGN_INFO_REQUEST), DB_PROXY::changeUserSignInfo));

  17.  
  18.  
  19. // message content

  20. m_handler_map.insert(make_pair(uint32_t(CID_MSG_DATA), DB_PROXY::sendMessage));

  21. m_handler_map.insert(make_pair(uint32_t(CID_MSG_LIST_REQUEST), DB_PROXY::getMessage));

  22. m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));

  23. m_handler_map.insert(make_pair(uint32_t(CID_MSG_READ_ACK), DB_PROXY::clearUnreadMsgCounter));

  24. m_handler_map.insert(make_pair(uint32_t(CID_MSG_GET_BY_MSG_ID_REQ), DB_PROXY::getMessageById));

  25. m_handler_map.insert(make_pair(uint32_t(CID_MSG_GET_LATEST_MSG_ID_REQ), DB_PROXY::getLatestMsgId));

  26.  
  27. // device token

  28. m_handler_map.insert(make_pair(uint32_t(CID_LOGIN_REQ_DEVICETOKEN), DB_PROXY::setDevicesToken));

  29. m_handler_map.insert(make_pair(uint32_t(CID_OTHER_GET_DEVICE_TOKEN_REQ), DB_PROXY::getDevicesToken));

  30.  
  31. //push 鎺ㄩ€佽缃?

  32. m_handler_map.insert(make_pair(uint32_t(CID_GROUP_SHIELD_GROUP_REQUEST), DB_PROXY::setGroupPush));

  33. m_handler_map.insert(make_pair(uint32_t(CID_OTHER_GET_SHIELD_REQ), DB_PROXY::getGroupPush));

  34.  
  35.  
  36. // group

  37. m_handler_map.insert(make_pair(uint32_t(CID_GROUP_NORMAL_LIST_REQUEST), DB_PROXY::getNormalGroupList));

  38. m_handler_map.insert(make_pair(uint32_t(CID_GROUP_INFO_REQUEST), DB_PROXY::getGroupInfo));

  39. m_handler_map.insert(make_pair(uint32_t(CID_GROUP_CREATE_REQUEST), DB_PROXY::createGroup));

  40. m_handler_map.insert(make_pair(uint32_t(CID_GROUP_CHANGE_MEMBER_REQUEST), DB_PROXY::modifyMember));

  41.  
  42.  
  43. // file

  44. m_handler_map.insert(make_pair(uint32_t(CID_FILE_HAS_OFFLINE_REQ), DB_PROXY::hasOfflineFile));

  45. m_handler_map.insert(make_pair(uint32_t(CID_FILE_ADD_OFFLINE_REQ), DB_PROXY::addOfflineFile));

  46. m_handler_map.insert(make_pair(uint32_t(CID_FILE_DEL_OFFLINE_REQ), DB_PROXY::delOfflineFile));

  47.  
  48. }


在ChandleMap注册数据库请求的CommandID对应的回调函数,在处理接收的请求时,根据CmdID查询CHandleMap,获取回调函数处理请求。

 

(2)初始化工作线程池

g_thread_pool.Init(thread_num);这行代码根据传入的参数创建了指定数量的线程,每个工作线程中都维护了一个任务队列,在有数据请求来的时候,系统随机将任务添加到线程的任务队列中,线程一次执行自己任务队列中的任务。

 

 
  1. int CThreadPool::Init(uint32_t worker_size)

  2. {

  3. m_worker_size = worker_size;

  4. m_worker_list = new CWorkerThread [m_worker_size];

  5. if (!m_worker_list) {

  6. return 1;

  7. }

  8.  
  9. for (uint32_t i = 0; i < m_worker_size; i++) {

  10. m_worker_list[i].SetThreadIdx(i);

  11. m_worker_list[i].Start();

  12. }

  13.  
  14. return 0;

  15. }

 
  1. void CWorkerThread::Start()

  2. {

  3. (void)pthread_create(&m_thread_id, NULL, StartRoutine, this);

  4. }

线程回调

 

 

 
  1. void* CWorkerThread::StartRoutine(void* arg)

  2. {

  3. CWorkerThread* pThread = (CWorkerThread*)arg;

  4.  
  5. pThread->Execute();

  6.  
  7. return NULL;

  8. }

处理任务

 

 

 
  1. void CWorkerThread::Execute()

  2. {

  3. while (true) {

  4. m_thread_notify.Lock();

  5.  
  6. // put wait in while cause there can be spurious wake up (due to signal/ENITR)

  7. while (m_task_list.empty()) {

  8. m_thread_notify.Wait();

  9. }

  10.  
  11. CTask* pTask = m_task_list.front();

  12. m_task_list.pop_front();

  13. m_thread_notify.Unlock();

  14.  
  15. pTask->run();

  16.  
  17. delete pTask;

  18.  
  19. m_task_cnt++;

  20. //log("%d have the execute %d task\n", m_thread_idx, m_task_cnt);

  21. }

  22. }

(3)环回检查,超时检测处理异常中断

 

 

 
  1. netlib_add_loop(proxy_loop_callback, NULL);

  2.  
  3. signal(SIGTERM, sig_handler);

  4.  
  5. return netlib_register_timer(proxy_timer_callback, NULL, 1000);

DB操作中的相应并不是来一个请求发送一个响应,CProxyConn中维护了一个static list<ResponsePdu_t*>s_response_pdu_list,将需要发送的相应PDU插入到该list中,在db_proxy_server的时间分发器中,每次循环都会调用CheckLoop函数,根据注册的proxy_loop_callback回调发送响应信息。

 

 

	signal(SIGTERM, sig_handler);

注册程序异常终止时的信号处理函数,处理函数异常退出时的一些清理工作,发送未发送的数据,停止接受请求等
 

 
  1. static void sig_handler(int sig_no)

  2. {

  3. if (sig_no == SIGTERM) {

  4. log("receive SIGTERM, prepare for exit");

  5. CImPdu cPdu;

  6. IM::Server::IMStopReceivePacket msg;

  7. msg.set_result(0);

  8. cPdu.SetPBMsg(&msg);

  9. cPdu.SetServiceId(IM::BaseDefine::SID_OTHER);

  10. cPdu.SetCommandId(IM::BaseDefine::CID_OTHER_STOP_RECV_PACKET);

  11. for (ConnMap_t::iterator it = g_proxy_conn_map.begin(); it != g_proxy_conn_map.end(); it++) {

  12. CProxyConn* pConn = (CProxyConn*)it->second;

  13. pConn->SendPdu(&cPdu);

  14. }

  15. // Add By ZhangYuanhao

  16. // Before stop we need to stop the sync thread,otherwise maybe will not sync the internal data any more

  17. CSyncCenter::getInstance()->stopSync();

  18.  
  19. // callback after 4 second to exit process;

  20. netlib_register_timer(exit_callback, NULL, 4000);

  21. }

  22. }

netlib_register_timer(proxy_timer_callback, NULL, 1000)注册心跳,1000tick到发送一个心跳包,防止客户端掉线时占用系统资源,及时清理对应客户端的连接符等系统资源。

 

 

5、同步聊天记录等信息

 
  1. CSyncCenter::getInstance()->init();

  2. CSyncCenter::getInstance()->startSync();

 
  1. void CSyncCenter::init()

  2. {

  3. // Load total update time

  4. CacheManager* pCacheManager = CacheManager::getInstance();

  5. // increase message count

  6. CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");

  7. if (pCacheConn)

  8. {

  9. string strTotalUpdate = pCacheConn->get("total_user_updated");

  10.  
  11. string strLastUpdateGroup = pCacheConn->get("last_update_group");

  12. pCacheManager->RelCacheConn(pCacheConn);

  13. if(strTotalUpdate != "")

  14. {

  15. m_nLastUpdate = string2int(strTotalUpdate);

  16. }

  17. else

  18. {

  19. updateTotalUpdate(time(NULL));

  20. }

  21. if(strLastUpdateGroup.empty())

  22. {

  23. m_nLastUpdateGroup = string2int(strLastUpdateGroup);

  24. }

  25. else

  26. {

  27. updateLastUpdateGroup(time(NULL));

  28. }

  29. }

  30. else

  31. {

  32. log("no cache connection to get total_user_updated");

  33. }

  34. }

创建线程并开始同步

 
  1. void CSyncCenter::startSync()

  2. {

  3. #ifdef _WIN32

  4. (void)CreateThread(NULL, 0, doSyncGroupChat, NULL, 0, &m_nGroupChatThreadId);

  5. #else

  6. (void)pthread_create(&m_nGroupChatThreadId, NULL, doSyncGroupChat, NULL);

  7. #endif

  8. }


由于主线程执行需要执行其他任务,为了避免主线程耗时,同步聊天记录,群主信息功能在新创建的线程中执行,节省主线程启动时间;所有同步操作均在线程回调函数中完成doSyncGroupChat.
 

 

6、事件分发,主线程助理主流程,接受客户连接,处理客户请求

所有TT_SERVER的网络处理统一在事件分发起中执行,包括从socket中接收连接,读写数据,处理异常

 
  1. void CEventDispatch::StartDispatch(uint32_t wait_timeout)

  2. {

  3. struct epoll_event events[1024];

  4. int nfds = 0;

  5.  
  6. if(running)

  7. return;

  8. running = true;

  9.  
  10. while (running)

  11. {

  12. nfds = epoll_wait(m_epfd, events, 1024, wait_timeout);

  13. for (int i = 0; i < nfds; i++)

  14. {

  15. int ev_fd = events[i].data.fd;

  16. CBaseSocket* pSocket = FindBaseSocket(ev_fd);

  17. if (!pSocket)

  18. continue;

  19.  
  20. //Commit by zhfu @2015-02-28

  21. #ifdef EPOLLRDHUP

  22. if (events[i].events & EPOLLRDHUP)

  23. {

  24. //log("On Peer Close, socket=%d, ev_fd);

  25. pSocket->OnClose();

  26. }

  27. #endif

  28. // Commit End

  29.  
  30. if (events[i].events & EPOLLIN)

  31. {

  32. //log("OnRead, socket=%d\n", ev_fd);

  33. pSocket->OnRead();

  34. }

  35.  
  36. if (events[i].events & EPOLLOUT)

  37. {

  38. //log("OnWrite, socket=%d\n", ev_fd);

  39. pSocket->OnWrite();

  40. }

  41.  
  42. if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))

  43. {

  44. //log("OnClose, socket=%d\n", ev_fd);

  45. pSocket->OnClose();

  46. }

  47.  
  48. pSocket->ReleaseRef();

  49. }

  50.  
  51. _CheckTimer();

  52. _CheckLoop();

  53. }

  54. }

在事件分发中需要详解的是从socket中读取客户请求数据并解析PDU数据包,处理客户请求,回复效应,其他socket连接等部分在之前的文章中已经叙述过了,这里就不再重复了,下面主要陈述db_proxy_server中处理客户请求的流程。

在db_proxy_server中接收到用户的请求数据后都会在如下的函数中解析请求数据。

 

 
  1. void CProxyConn::HandlePduBuf(uchar_t* pdu_buf, uint32_t pdu_len)

  2. {

  3. CImPdu* pPdu = NULL;

  4. pPdu = CImPdu::ReadPdu(pdu_buf, pdu_len);

  5. if (pPdu->GetCommandId() == IM::BaseDefine::CID_OTHER_HEARTBEAT) {

  6. return;

  7. }

  8.  
  9. pdu_handler_t handler = s_handler_map->GetHandler(pPdu->GetCommandId());

  10.  
  11. if (handler) {

  12. CTask* pTask = new CProxyTask(m_uuid, handler, pPdu);

  13. g_thread_pool.AddTask(pTask);

  14. } else {

  15. log("no handler for packet type: %d", pPdu->GetCommandId());

  16. }

  17. }

解析接收到的PDU数据包,根据CommandId判断是否是心跳包,如果是心跳包直接丢失,不是心跳包则在CHandlerMap中获取对应CommandId的回调,创建一个新的CProxyTask任务类,将任务类随机的加入到工作线程的任务队列中。

 

 

 

 
  1. void CThreadPool::AddTask(CTask* pTask)

  2. {

  3. /*

  4. * select a random thread to push task

  5. * we can also select a thread that has less task to do

  6. * but that will scan the whole thread list and use thread lock to get each task size

  7. */

  8. uint32_t thread_idx = random() % m_worker_size;

  9. m_worker_list[thread_idx].PushTask(pTask);

  10. }

线程在处理任务的时候会调用相应任务类的Run方法,处理任务

 

 

 
  1. void CProxyTask::run()

  2. {

  3.  
  4. if (!m_pPdu) {

  5. // tell CProxyConn to close connection with m_conn_uuid

  6. CProxyConn::AddResponsePdu(m_conn_uuid, NULL);

  7. } else {

  8. if (m_pdu_handler) {

  9. m_pdu_handler(m_pPdu, m_conn_uuid);

  10. }

  11. }

  12. }



 

 

 

至此db_proxy_server的整个初始化流程和处理流程都介绍完毕。在整个db server中核心思想就是池化技术,(DB连接池,工作线程池);整个核心流程是创建线程池,根据命令ID注册对应的处理回调,在线程回调函数中处理任务队列,依次取出任务,根据注册的回调处理任务请求,回响应。

 

鉴于本人理解有限,在行文的过程中可能存在一些理解或者描述错误的地方,请各位看官指正,TT_SERVER详解系列,是本人学习teamtalk源码的一些理解和心得,看源码主要看的是设计方法,处理思维,在不断学习中进步。

这篇关于TeamTalk DB_PROXY_SERVER详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/892965

相关文章

MySQL中删除重复数据SQL的三种写法

《MySQL中删除重复数据SQL的三种写法》:本文主要介绍MySQL中删除重复数据SQL的三种写法,文中通过代码示例讲解的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下... 目录方法一:使用 left join + 子查询删除重复数据(推荐)方法二:创建临时表(需分多步执行,逻辑清晰,但会

Oracle的to_date()函数详解

《Oracle的to_date()函数详解》Oracle的to_date()函数用于日期格式转换,需要注意Oracle中不区分大小写的MM和mm格式代码,应使用mi代替分钟,此外,Oracle还支持毫... 目录oracle的to_date()函数一.在使用Oracle的to_date函数来做日期转换二.日

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

Mysql 中的多表连接和连接类型详解

《Mysql中的多表连接和连接类型详解》这篇文章详细介绍了MySQL中的多表连接及其各种类型,包括内连接、左连接、右连接、全外连接、自连接和交叉连接,通过这些连接方式,可以将分散在不同表中的相关数据... 目录什么是多表连接?1. 内连接(INNER JOIN)2. 左连接(LEFT JOIN 或 LEFT

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux内核之内核裁剪详解

《Linux内核之内核裁剪详解》Linux内核裁剪是通过移除不必要的功能和模块,调整配置参数来优化内核,以满足特定需求,裁剪的方法包括使用配置选项、模块化设计和优化配置参数,图形裁剪工具如makeme... 目录简介一、 裁剪的原因二、裁剪的方法三、图形裁剪工具四、操作说明五、make menuconfig

详解Java中的敏感信息处理

《详解Java中的敏感信息处理》平时开发中常常会遇到像用户的手机号、姓名、身份证等敏感信息需要处理,这篇文章主要为大家整理了一些常用的方法,希望对大家有所帮助... 目录前后端传输AES 对称加密RSA 非对称加密混合加密数据库加密MD5 + Salt/SHA + SaltAES 加密平时开发中遇到像用户的

mysql重置root密码的完整步骤(适用于5.7和8.0)

《mysql重置root密码的完整步骤(适用于5.7和8.0)》:本文主要介绍mysql重置root密码的完整步骤,文中描述了如何停止MySQL服务、以管理员身份打开命令行、替换配置文件路径、修改... 目录第一步:先停止mysql服务,一定要停止!方式一:通过命令行关闭mysql服务方式二:通过服务项关闭

Springboot使用RabbitMQ实现关闭超时订单(示例详解)

《Springboot使用RabbitMQ实现关闭超时订单(示例详解)》介绍了如何在SpringBoot项目中使用RabbitMQ实现订单的延时处理和超时关闭,通过配置RabbitMQ的交换机、队列和... 目录1.maven中引入rabbitmq的依赖:2.application.yml中进行rabbit

C语言线程池的常见实现方式详解

《C语言线程池的常见实现方式详解》本文介绍了如何使用C语言实现一个基本的线程池,线程池的实现包括工作线程、任务队列、任务调度、线程池的初始化、任务添加、销毁等步骤,感兴趣的朋友跟随小编一起看看吧... 目录1. 线程池的基本结构2. 线程池的实现步骤3. 线程池的核心数据结构4. 线程池的详细实现4.1 初