skynet源码分析3:消息调度

2024-05-07 05:18

本文主要是介绍skynet源码分析3:消息调度,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

从四个方面来说:

  1、消息发送

  2、工作线程控制

  3、信箱调度

  4、消息分发

与调度相关的代码实现在/skynet-src/skynet_mq.c,/skynet-src/skynet_start.c,/skynet-src/skynet_server.c三个文件中,整体上是一个m:n的调度器。


消息发送


skynet的消息定义在/skynet-src/skynet_mq.h中:

 

复制代码
struct skynet_message {uint32_t source;int session;void * data;size_t sz;
};// type is encoding in skynet_message.sz high 8bit
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)
复制代码

 

source:消息源(sc)的句柄。

session:用来做上下文的标识

data:消息指针

sz:消息长度,消息的请求类型定义在高8位

消息发出后,会被保存在接收者sc的信箱(message_queue字段)中,发送消息也就是向信箱压入一条消息。来看看发送函数吧,在/skynet-src/skynet_server.c中:

复制代码
 1 int
 2 skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
 3     if ((sz & MESSAGE_TYPE_MASK) != sz) {
 4         skynet_error(context, "The message to %x is too large", destination);
 5         if (type & PTYPE_TAG_DONTCOPY) {
 6             skynet_free(data);
 7         }
 8         return -1;
 9     }
10     _filter_args(context, type, &session, (void **)&data, &sz);
11 
12     if (source == 0) {
13         source = context->handle;
14     }
15 
16     if (destination == 0) {
17         return session;
18     }
19     if (skynet_harbor_message_isremote(destination)) {
20         struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
21         rmsg->destination.handle = destination;
22         rmsg->message = data;
23         rmsg->sz = sz;
24         skynet_harbor_send(rmsg, source, session);
25     } else {
26         struct skynet_message smsg;
27         smsg.source = source;
28         smsg.session = session;
29         smsg.data = data;
30         smsg.sz = sz;
31 
32         if (skynet_context_push(destination, &smsg)) {
33             skynet_free(data);
34             return -1;
35         }
36     }
37     return session;
38 }
复制代码

3-9行对消息长度做了限制,MESSAGE_TYPE_MASK等于(SIZE_MAX >> 8),也就是最大只能为224,16MB。

_filter_args根据type做了两个处理:

1、(type & PTYPE_TAG_DONTCOPY) == 0

  会将data复制一份用作实际发送,这种情况下原来的data就要由调用者负责释放。

2、(type & PTYPE_TAG_ALLOCSESSION) > 0

  会从sc的session计数器分配一个session.

处理完后,type会合并到sz的高8位。

最后一步就是投递到接收者的信箱了,根据接收者句柄判断是否为远程节点,如果是就用harbo发送。(内置的集群方案,现在已经不推荐使用)。成功返回session,失败返回-1,并且释放data.

 


 

工作线程的控制

skynet运行后,会启动固定的线程来轮流调度sc(skynet_context),线程数由配置文件中的thread字段定义,默认是4个。那它是如何控制这些线程的呢?具体实现在/skynet-src/skynet_start.c中。

在208行,启动了工作线程:

复制代码
static int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0,1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, };struct worker_parm wp[thread];for (i=0;i<thread;i++) {wp[i].m = m;wp[i].id = i;if (i < sizeof(weight)/sizeof(weight[0])) {wp[i].weight= weight[i];} else {wp[i].weight = 0;}create_thread(&pid[i+3], thread_worker, &wp[i]);}
复制代码

直接来看线程函数thread_worker把,在152行:

复制代码
 1 static void *
 2 thread_worker(void *p) {
 3     struct worker_parm *wp = p;
 4     int id = wp->id;
 5     int weight = wp->weight;
 6     struct monitor *m = wp->m;
 7     struct skynet_monitor *sm = m->m[id];
 8     skynet_initthread(THREAD_WORKER);
 9     struct message_queue * q = NULL;
10     while (!m->quit) {
11         q = skynet_context_message_dispatch(sm, q, weight);
12         if (q == NULL) {
13             if (pthread_mutex_lock(&m->mutex) == 0) {
14                 ++ m->sleep;
15                 // "spurious wakeup" is harmless,
16                 // because skynet_context_message_dispatch() can be call at any time.
17                 if (!m->quit)
18                     pthread_cond_wait(&m->cond, &m->mutex);
19                 -- m->sleep;
20                 if (pthread_mutex_unlock(&m->mutex)) {
21                     fprintf(stderr, "unlock mutex error");
22                     exit(1);
23                 }
24             }
25         }
26     }
27     return NULL;
28 }
复制代码

控制这种生命周期与进程一致的工作线程,主要有两个细节:1、均匀不重复的分配任务。2、不空转、最小时延。前者处理线程同步就好。来看看skynet是如何处理后者的吧:

它用得是条件变量来处理空转的,用条件变量有两点好处:1、让出cpu时间片.2、由外部决定何时唤醒,这样可以在有任务时再唤醒,既能最大化的不空转,又能减小处理任务的时延。

具体实现是条件变量的标准应用了,和《unix高级编程》条件变量的例子几乎一样。这里还有一个sleep的计数,有什么用呢?用来判断要不要调用pthread_cond_signal的。

最后还有一个问题,等待的线程是在哪里被唤醒的呢?在socket线程和timer线程里唤醒的,前者有socket消息时会调用一次,后者每个刷新时间会唤醒一次。


信箱的调度

上一篇时,在sc里我们看到过一个message_queue类型的字段,这就是信箱。skynet中用了两种队列来存储消息并完成调度,下面称为12级队列,1级队列是一个单链表,每个节点是2级队列,2级队列(message_queue)是一个自动扩展的循环队列,用来存储消息。这两个队列实现在/skynet-src/skynet_mq.c中,实现的很简单,并没有用复杂的无锁结构,而是自旋锁保证线程安全的链表,循环队列。

信箱的调度就是12级队列的调度,整体结构描述如下:

while(1){

  1级队列出队;

  调度2级队列;

  1级队列入队;

}

这部分实现在/skynet-src/skynet_server的275行skynet_context_message_dispatch()中:

复制代码
 1 struct message_queue * 
 2 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
 3     if (q == NULL) {
 4         q = skynet_globalmq_pop();
 5         if (q==NULL)
 6             return NULL;
 7     }
 8 
 9     uint32_t handle = skynet_mq_handle(q);
10 
11     struct skynet_context * ctx = skynet_handle_grab(handle);
12     if (ctx == NULL) {
13         struct drop_t d = { handle };
14         skynet_mq_release(q, drop_message, &d);
15         return skynet_globalmq_pop();
16     }
17 
18     int i,n=1;
19     struct skynet_message msg;
20 
21     for (i=0;i<n;i++) {
22         if (skynet_mq_pop(q,&msg)) {
23             skynet_context_release(ctx);
24             return skynet_globalmq_pop();
25         } else if (i==0 && weight >= 0) {
26             n = skynet_mq_length(q);
27             n >>= weight;
28         }
29         int overload = skynet_mq_overload(q);
30         if (overload) {
31             skynet_error(ctx, "May overload, message queue length = %d", overload);
32         }
33 
34         skynet_monitor_trigger(sm, msg.source , handle);
35 
36         if (ctx->cb == NULL) {
37             skynet_free(msg.data);
38         } else {
39             dispatch_message(ctx, &msg);
40         }
41 
42         skynet_monitor_trigger(sm, 0,0);
43     }
44 
45     assert(q == ctx->queue);
46     struct message_queue *nq = skynet_globalmq_pop();
47     if (nq) {
48         // If global mq is not empty , push q back, and return next queue (nq)
49         // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
50         skynet_globalmq_push(q);
51         q = nq;
52     } 
53     skynet_context_release(ctx);
54 
55     return q;
56 }
复制代码

这个函数的作用是,调度传入的2级队列,并返回下一个可调度的2级队列。在上面的实现中,有四个细节之处:

1、22-24行,当2级队列为空时并没有将其压入1级队列,那它从此就消失了吗?不,这样做是为了减少空转1级队列,那这个2级队列是什么时候压回的呢?在message_queue中,有一个

in_global标记是否在1级队列中,当2级队列的出队(skynet_mq_pop)失败时,这个标记就会被置0,在2级队列入队时(skynet_mq_push)会判断这个标记,如果为0,那么就会将自己压入1级队列。(skynet_mq_mark_release也会判断)所以这个2级队列在下次入队时会压回。

2、25-27,修改了for循环的次数,也就是每次调度处理多少条消息。这个次数与传入的weight有关,我们回过头来看这个weight是从哪里来的,源头在工作线程创建时:

static int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0,1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, };struct worker_parm wp[thread];for (i=0;i<thread;i++) {wp[i].m = m;wp[i].id = i;if (i < sizeof(weight)/sizeof(weight[0])) {wp[i].weight= weight[i];} else {wp[i].weight = 0;}create_thread(&pid[i+3], thread_worker, &wp[i]);}

再来看看 n >>= weight,嗯,大致就是:把工作线程分为组,前四组每组8个,超过的归入第5组,AE组每次调度处理一条消息,B组每次处理(n/2)条,C组每次处理(n/4)条,D组每次处理(n/8)条。是为了均匀的使用多核。

3、29-32做了一个负载判断,负载的阀值是1024。不过也仅仅是输出一条log提醒一下而以.

4、34、42触发了一下monitor,这个监控是用来检测消息处理是否发生了死循环,不过也仅仅只是输出一条log提醒一下。这个检测是放在一个专门的监控线程里做的,判断死循环的时间是5秒。具体机制这里就不说了,其实现在/skynet-src/skynet_monitor.c中


消息分发

信箱调度时,从2级队列取出消息后就会调用dispatch_message函数做分发,在/skynet-src/skynet_server.c中:

复制代码
 1 static void
 2 dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
 3     assert(ctx->init);
 4     CHECKCALLING_BEGIN(ctx)
 5     pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
 6     int type = msg->sz >> MESSAGE_TYPE_SHIFT;
 7     size_t sz = msg->sz & MESSAGE_TYPE_MASK;
 8     if (ctx->logfile) {
 9         skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
10     }
11     if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
12         skynet_free(msg->data);
13     } 
14     CHECKCALLING_END(ctx)
15 }
复制代码

step1:将sc句柄保存在线程本地变量中.

step2:如果开启了录像功能,就将data的数据dump到日志文件

step3:调用sc的回调函数,根据返回值觉得是否释放data,0释放,1不释放.

这篇关于skynet源码分析3:消息调度的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/966428

相关文章

Go使用pprof进行CPU,内存和阻塞情况分析

《Go使用pprof进行CPU,内存和阻塞情况分析》Go语言提供了强大的pprof工具,用于分析CPU、内存、Goroutine阻塞等性能问题,帮助开发者优化程序,提高运行效率,下面我们就来深入了解下... 目录1. pprof 介绍2. 快速上手:启用 pprof3. CPU Profiling:分析 C

Spring Boot整合消息队列RabbitMQ的实现示例

《SpringBoot整合消息队列RabbitMQ的实现示例》本文主要介绍了SpringBoot整合消息队列RabbitMQ的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的... 目录RabbitMQ 简介与安装1. RabbitMQ 简介2. RabbitMQ 安装Spring

springboot rocketmq配置生产者和消息者的步骤

《springbootrocketmq配置生产者和消息者的步骤》本文介绍了如何在SpringBoot中集成RocketMQ,包括添加依赖、配置application.yml、创建生产者和消费者,并展... 目录1. 添加依赖2. 配置application.yml3. 创建生产者4. 创建消费者5. 使用在

MySQL表锁、页面锁和行锁的作用及其优缺点对比分析

《MySQL表锁、页面锁和行锁的作用及其优缺点对比分析》MySQL中的表锁、页面锁和行锁各有特点,适用于不同的场景,表锁锁定整个表,适用于批量操作和MyISAM存储引擎,页面锁锁定数据页,适用于旧版本... 目录1. 表锁(Table Lock)2. 页面锁(Page Lock)3. 行锁(Row Lock

springboot的调度服务与异步服务使用详解

《springboot的调度服务与异步服务使用详解》本文主要介绍了Java的ScheduledExecutorService接口和SpringBoot中如何使用调度线程池,包括核心参数、创建方式、自定... 目录1.调度服务1.1.JDK之ScheduledExecutorService1.2.spring

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每