glib库异步队列和线程池代码分析

2024-01-17 06:48

本文主要是介绍glib库异步队列和线程池代码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文章主要讲了两部分内容:一是分析了异步队列的原理和实现,二是分析线程池的原理和实现。
在多线程程序的运行中,如果经常地创建和销毁执行过程相似而所用数据不同的线程,系统的效率,系统资源的利用率将会受到极大的影响。对于这一问题可用类似glib库中的线程池的解决办法。
  
我 们可以这样想像线程池的处理,当有新的数据要交给线程处理时,主程序/主线程 就从线程池中找到一个未被使用的线程处理这新来的数据,如果线程池中没有找到可用的空闲线程,就新创建一个线程来处理这个数据,并在处理完后不销毁它而是 把这个线程放到线程池中,以备后用。线程池的这个原理和内存管理中slab机制有异曲同工之妙!我想无论是线程池的这种处理方式还是slab机制,其本质 思想还是一致的。
近来做的项目,在框架中用到了多线程的异步队列,实现形式和glib中的异步队列极其相似,而glib线程池中的代码也用到了异步队列(名字用同步队列更合适),因此就先分析一下异步队列。
异 步队列的概念是这样的:所有的数据组织成队列,供多线程并发访问,而这些并发控制全部在异步队列里面实现,对外面只提供读写接口;当队列中的数据为空时, 如果是读线程访问异步队列,那么这一读线程就等待,直到有数据为止;写线程向队列放数据时,如果有线程在等待数据就唤醒等待线程。
异步队列主要代码剖析:
异步队列的数据结构如下:
struct _GAsyncQueue
{
GMutex *mutex; //互斥变量
GCond *cond; //等待条件
GQueue *queue; //数据队列
guint waiting_threads; //等待的读线程个数
gint32 ref_count;
};
g_async_queue_push_unlocked (GAsyncQueue* queue, gpointer data)
{
………//这些点代表一些省略的代码
//把数据放入队列
g_queue_push_head (queue->queue, data);
//现在队列已经有数据了,判断是否有读线程在等待数据,
//如果有就发送信号唤醒读线程
if (queue->waiting_threads > 0)
    g_cond_signal (queue->cond);
}
g_async_queue_push (GAsyncQueue* queue, gpointer data)
{
……..
g_mutex_lock (queue->mutex); //在访问临界区前先获得互斥变量
g_async_queue_push_unlocked (queue, data); //执行写数据操作
g_mutex_unlock (queue->mutex); //释放互斥变量,以使其它线程可以进入临界区
}
从以上的接口可看出,”…._ unlocked” 这样的接口就是异步队列这个对象已获得互斥变量的接口,glib中线程处理相关接口都有类似的命名规则,在接下来的代码分析中,如没有特别的需要就只看”…._ unlocked” 这样的接口。
// 读线程从异步队列中获取数据的接口
// try参数和时间参数在多线程同步/内核多进程的实现中是很常见的东西了,在这里就不再作特殊的解释了。
g_async_queue_pop_intern_unlocked (GAsyncQueue *queue,
                               gboolean     try,
                               GTimeVal    *end_time)
{
gpointer retval;
//判断是否有数据在队列中,如果没有就要执行if语句相应的睡眠等待,直到被写进程唤醒
if (!g_queue_peek_tail_link (queue->queue))
    {
      if (try)//如果try为真,则永远不睡眠
       return NULL;
     
    // 接下来是要让线程进行睡眠等待了,在等待之前先确保等待条件已创建
      if (!queue->cond)
       queue->cond = g_cond_new ();
      if (!end_time) // 等待无时间限制
        {
          queue->waiting_threads++; // 等待线程数加一
      // 这里为什么用循环?因为这是多线程的环境,有可能有多个读线程在等待
      // 当前线程被唤醒时,有可能数据队列中的数据又被别的线程读走了,所以
      // 当前线程就得继续睡眠等待
      // 注意:睡眠等待时会暂时放弃互斥锁,被唤醒时会重新获取互斥锁
       while (!g_queue_peek_tail_link (queue->queue))
            g_cond_wait (queue->cond, queue->mutex);
          queue->waiting_threads--; // 等待线程数减一
        }
      else
        {
          queue->waiting_threads++;
          while (!g_queue_peek_tail_link (queue->queue))
           if (!g_cond_timed_wait (queue->cond, queue->mutex, end_time))
              break;
          queue->waiting_threads--;
          if (!g_queue_peek_tail_link (queue->queue))
           return NULL;
        }
    }
retval = g_queue_pop_tail (queue->queue);
g_assert (retval);
return retval;
}
/* 返回数据队列的长度,也即数据队列中的数据个数.
* 如果是负值表明是等待数据的线程个数,正数表示数据队列的数据个数
* g_async_queue_length == 0 表示是有 'n' 个数据和' n' 个等待线程在数据队列
* 这种特殊情况可能是在对数据队列加锁或调度时发生
*/
g_async_queue_length_unlocked (GAsyncQueue* queue)
{
g_return_val_if_fail (queue, 0);
g_return_val_if_fail (g_atomic_int_get (&queue->ref_count) > 0, 0);
return queue->queue->length - queue->waiting_threads;
}
  
有 了前面的异步队列基础就可以分析线程池是怎么实现的了。在glib库中的线程池的实现和使用有两种方式:1. 单个线程池对象不共享方式;2. 多个线程池对象共享线程方式,也即把各个具体的线程池对象创建的把任务做完了的线程统一放在全局线程池中进行统一管理,各个具体的线程池对象要使用线程 时,可以先向全局线程池中取线程,如果全局线程池没有线程了具体的线程池对象就可自行创建线程。
       线程池的数据结构有两部分,一部分是在头文件中,另一部分在C文件中。这是C语言中常用的信息隐藏方法之一,把要暴露给用户的数据放在头文件中,而要隐藏的数据则放在C文件中。下面是线程池头文件中的数据结构:
typedef struct _GThreadPool     GThreadPool;
struct _GThreadPool
{
   // 具体处理数据的函数
   // 它的第一个参数为g_thread_pool_push进去的数据,也即要执行的任务
GFunc func;
gpointer user_data; // func的第二个参数
// 通过这个成员控制线程池对象创建的线程是否在全局线程池中共享,
// TRUE为不共享,FALSE为共享
gboolean exclusive;
};
C文件中线程池的数据结构:
typedef struct _GRealThreadPool GRealThreadPool;
struct _GRealThreadPool
{
GThreadPool pool; // 头文件已定义
GAsyncQueue* queue; // 异步数据队列
GCond* cond;
gint max_threads; // 线程池对象持有的线程数上限
gint num_threads;// 池程池对象当前持有的线程数
gboolean running;
gboolean immediate;
gboolean waiting;
GCompareDataFunc sort_func;
gpointer sort_user_data;
};
我们可以先来分析单个线程对象不共享的主要实现。在分析它的实现之前,可以先看看一个流程图
从上图可见当主线程有数据交给线程池处理时,只要调用异步队列相关的push接口,线程池中的任何一个线程都可以为这服务。根据以上的流程图看看单个线程对象不共享方式的主要实现代码,它的调用从创建线程池对象开始:
g_thread_pool_new---> g_thread_pool_start_thread---> g_thread_create(g_thread_pool_thread_proxy,pool,FALSE,&local_error)--->> g_thread_pool_wait_for_new_task(pool ---->  g_async_queue_pop_unlocked (pool->queue);
// max_threads为 -1 时表示线程池中的线程数无限制并且线程由动态生成
// max_threads为正整数时,线程池就会预先创建max_threads个线程
g_thread_pool_new (GFunc             func,
                 gpointer         user_data,
                 gint             max_threads,
                 gboolean         exclusive,
                 GError         **error)
{
GRealThreadPool *retval;
    ……………. //这些点代表一些省略的代码
retval = g_new (GRealThreadPool, 1);
retval->pool.func = func;
retval->pool.user_data = user_data;
retval->pool.exclusive = exclusive;
retval->queue = g_async_queue_new (); // 创建异步队列
retval->cond = NULL;
retval->max_threads = max_threads;
retval->num_threads = 0;
retval->running = TRUE;
    …………….
if (retval->pool.exclusive)
{
      g_async_queue_lock (retval->queue);
      while (retval->num_threads < retval->max_threads)
          {
             GError *local_error = NULL;
             g_thread_pool_start_thread (retval, &local_error);//起动新的线程
             …………….
       }
      g_async_queue_unlock (retval->queue);
}
return (GThreadPool*) retval;
}
g_thread_pool_start_thread (GRealThreadPool *pool,
                         GError          **error)
{
gboolean success = FALSE;
if (pool->num_threads >= pool->max_threads && pool->max_threads != -1)
    /* Enough threads are already running */
    return;
…………….
if (!success)
{
      GError *local_error = NULL;
      /* No thread was found, we have to start a new one */
      // 真正创建一个新的线程
      g_thread_create (g_thread_pool_thread_proxy, pool, FALSE, &local_error);
      ……………….
}
pool->num_threads++;
}
g_thread_pool_thread_proxy (gpointer data)
{
GRealThreadPool *pool;
pool = data;
……………..
g_async_queue_lock (pool->queue);
while (TRUE)
{
      gpointer task;
      // 线程等待任务,也即等待数据,线程在等待就是处在线程池中的空闲线程
      task = g_thread_pool_wait_for_new_task (pool);
      // 如果线程被唤醒收到并数据就用此线程执行任务,否则继续循环等待
      // 注意:当任务做完时,继续循环又会调用上面的g_thread_pool_wait_for_new_task
      // 而进入等待状态,
if (task)
       {
             if (pool->running || !pool->immediate)
              {
                /* A task was received and the thread pool is active, so
              * execute the function.
              */
                g_async_queue_unlock (pool->queue);
                pool->pool.func (task, pool->pool.user_data);
                g_async_queue_lock (pool->queue);
           }
       }
      else
       {
            ………………
      }
}
return NULL;
}
g_thread_pool_wait_for_new_task (GRealThreadPool *pool)
{
gpointer task = NULL;
if (pool->running || (!pool->immediate &&
                     g_async_queue_length_unlocked (pool->queue) > 0))
{
      /* This thread pool is still active. */
      if (pool->num_threads > pool->max_threads && pool->max_threads != -1)
       {
           …………..
       }
     else if (pool->pool.exclusive)
       {
           /* Exclusive threads stay attached to the pool. */
        // 调用异步队列的pop接口进入等待状态,到此一个线程的创建过程就完成了
           task = g_async_queue_pop_unlocked (pool->queue);
       }
      else
       {
           ………….
       }
}
else
{
     …………
}
return task;
}

现在可以结合流程图分析线程池中创建一个线程的一个情景:从函数g_thread_pool_new的while循环调用了 g_thread_pool_start_thread函数,在函数中直接调用g_thread_create创建线程,被创建的线程调用函数 g_thread_pool_wait_for_new_task循环等待任务的到来,函数 g_thread_pool_wait_for_new_task调用g_async_queue_pop_unlocked (pool->queue)真正进入等待。如此可知,最终新创建的线程是调用异步队列的pop接口进入等待状态的,这样一个线程的创建就大功告成 了。而函数g_thread_pool_new的while循环结束时就创建了max_threads个等待线程,也即这个新建的线程池对象有了 max_threads个线程以备使用。

       创建线程池、线程池中的线程是为了使用它,在线程池中取线程,叫线程干活的过程就很简单多了,这个调用过程:g_thread_pool_push--à g_thread_pool_queue_push_unlocked--à g_async_queue_push_unlocked。可见最终调用的是异步数据队列的push接口,把要处理的数据插入队列后它就会唤醒等待异步队列数据的等待线程。

g_thread_pool_push (GThreadPool *pool,
gpointer      data,
GError      **error)
{
……………
//
if (g_async_queue_length_unlocked (real->queue) >= 0)
/* No thread is waiting in the queue */
g_thread_pool_start_thread (real, error);
g_thread_pool_queue_push_unlocked (real, data);
g_async_queue_unlock (real->queue);
}
g_thread_pool_queue_push_unlocked (GRealThreadPool *pool,
gpointer         data)
{
………….
g_async_queue_push_unlocked (pool->queue, data);
}

    总结:单个线程池对象不共享方式在管理多线程时是以线程池对象中的异步队列为中心,新创建的线程或做完任务的线程并不释放,让它调用异步队列的pop接口进入等待状态,而在使用唤醒线程池中的线程就是调用异步队列的push接口。

    以上对于理解线程池的实现已经足够,多个线程池对象共享线程方式和具体线程池的销毁的技巧,在这里就不讨论了。

这篇关于glib库异步队列和线程池代码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/615168

相关文章

hdu1180(广搜+优先队列)

此题要求最少到达目标点T的最短时间,所以我选择了广度优先搜索,并且要用到优先队列。 另外此题注意点较多,比如说可以在某个点停留,我wa了好多两次,就是因为忽略了这一点,然后参考了大神的思想,然后经过反复修改才AC的 这是我的代码 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

poj 1258 Agri-Net(最小生成树模板代码)

感觉用这题来当模板更适合。 题意就是给你邻接矩阵求最小生成树啦。~ prim代码:效率很高。172k...0ms。 #include<stdio.h>#include<algorithm>using namespace std;const int MaxN = 101;const int INF = 0x3f3f3f3f;int g[MaxN][MaxN];int n

poj 3190 优先队列+贪心

题意: 有n头牛,分别给他们挤奶的时间。 然后每头牛挤奶的时候都要在一个stall里面,并且每个stall每次只能占用一头牛。 问最少需要多少个stall,并输出每头牛所在的stall。 e.g 样例: INPUT: 51 102 43 65 84 7 OUTPUT: 412324 HINT: Explanation of the s

poj 2431 poj 3253 优先队列的运用

poj 2431: 题意: 一条路起点为0, 终点为l。 卡车初始时在0点,并且有p升油,假设油箱无限大。 给n个加油站,每个加油站距离终点 l 距离为 x[i],可以加的油量为fuel[i]。 问最少加几次油可以到达终点,若不能到达,输出-1。 解析: 《挑战程序设计竞赛》: “在卡车开往终点的途中,只有在加油站才可以加油。但是,如果认为“在到达加油站i时,就获得了一

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

代码随想录冲冲冲 Day39 动态规划Part7

198. 打家劫舍 dp数组的意义是在第i位的时候偷的最大钱数是多少 如果nums的size为0 总价值当然就是0 如果nums的size为1 总价值是nums[0] 遍历顺序就是从小到大遍历 之后是递推公式 对于dp[i]的最大价值来说有两种可能 1.偷第i个 那么最大价值就是dp[i-2]+nums[i] 2.不偷第i个 那么价值就是dp[i-1] 之后取这两个的最大值就是d