Android BlueDroid分析: OSI中的reactor的实现与使用分析

2024-03-04 12:18

本文主要是介绍Android BlueDroid分析: OSI中的reactor的实现与使用分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

actor相当于内核中的worker, 用于监控与执行任务. reactor可以认为是: re+actor.

监控使用的是epoll, 而执行任务使用的是对应的epoll_wait返回后的event type, 然后调用相关的read或者write函数来完成对应event type的处理.而epoll_wait对某个fd是监控和执行一次还是多次是使用eventfd来进行控制.
epoll可以看下面的recator_change_registration这个函数的实现.

而eventfd的控制可以search event_read与event_write来找到对应的控制点.

actor的类型

不断监控: 即epoll_wait返回后, 再继续监控. 体现在函数reactor_start,这个时候可以叫做reactor

一次性的: event发生后就被移除,不再监控. 体现在函数reactor_run_once, 这个时候可以叫做actor

epoll_wait的线程休眠问题

epoll_wait在reactor中是永远等待,在event来临之前不会timeout而返回(最后一个参数-1决定的):

do {ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);} while (ret == -1 && errno == EINTR);
因此调用这个epoll_wait后, thread会进行到休眠等待状态.

函数

核心函数的实现

reactor关键的有两个函数

  1. reactor_start/reactor_run_once,run_reactor : epoll_wait
  2. reactor_new : eventfd, epoll_create, epoll_ctl(EPOLL_CTL_ADD)

结构体

struct reactor_t {int epoll_fd;  //用于epoll waitint event_fd;  // 用于reactor的控制, 例如停止监控pthread_mutex_t list_lock;  // protects invalidation_list.list_t *invalidation_list;  // reactor objects that have been unregistered.pthread_t run_thread;       // the pthread on which reactor_run is executing.bool is_running;            // indicates whether |run_thread| is valid.bool object_removed;
};struct reactor_object_t {int fd;                              // the file descriptor to monitor for events.void *context;                       // a context that's passed back to the *_ready functions.reactor_t *reactor;                  // the reactor instance this object is registered with.pthread_mutex_t lock;                // protects the lifetime of this object and all variables.void (*read_ready)(void *context);   // function to call when the file descriptor becomes readable.void (*write_ready)(void *context);  // function to call when the file descriptor becomes writeable.
};

每一个reactor都是使用reactor_object_t来定义, 即reactor_object_t注册到reactor_t中

需要注意的是里面有一个List, 存放着所有没有注册的reactor, 这个是在unregister的时候将前面注册过的reactor放入到这个List中.

新创建与注册

reactor_t *reactor_new(void) {reactor_t *ret = (reactor_t *)osi_calloc(sizeof(reactor_t));if (!ret)return NULL;ret->epoll_fd = INVALID_FD;ret->event_fd = INVALID_FD;// epoll用来监控ret->epoll_fd = epoll_create(MAX_EVENTS);if (ret->epoll_fd == INVALID_FD) {LOG_ERROR("%s unable to create epoll instance: %s", __func__, strerror(errno));goto error;}// eventfd作为semophore,用来协调和控制是否继续进行监控ret->event_fd = eventfd(0, 0);if (ret->event_fd == INVALID_FD) {LOG_ERROR("%s unable to create eventfd: %s", __func__, strerror(errno));goto error;}// 这个List用来将unregistered的reactor存放pthread_mutex_init(&ret->list_lock, NULL);ret->invalidation_list = list_new(NULL);if (!ret->invalidation_list) {LOG_ERROR("%s unable to allocate object invalidation list.", __func__);goto error;}// 将需要监控的fd放入到epoll中struct epoll_event event;memset(&event, 0, sizeof(event));event.events = EPOLLIN;event.data.ptr = NULL;if (epoll_ctl(ret->epoll_fd, EPOLL_CTL_ADD, ret->event_fd, &event) == -1) {LOG_ERROR("%s unable to register eventfd with epoll set: %s", __func__, strerror(errno));goto error;}return ret;error:;reactor_free(ret);return NULL;
}

register

reactor_object_t *reactor_register(reactor_t *reactor,int fd, void *context,void (*read_ready)(void *context),void (*write_ready)(void *context)) {assert(reactor != NULL);assert(fd != INVALID_FD);reactor_object_t *object = (reactor_object_t *)osi_calloc(sizeof(reactor_object_t));if (!object) {LOG_ERROR("%s unable to allocate reactor object: %s", __func__, strerror(errno));return NULL;}object->reactor = reactor;object->fd = fd;object->context = context;object->read_ready = read_ready;object->write_ready = write_ready;pthread_mutex_init(&object->lock, NULL);// 和reactor_change类似, 也是给给回调.struct epoll_event event;memset(&event, 0, sizeof(event));if (read_ready)event.events |= (EPOLLIN | EPOLLRDHUP);if (write_ready)event.events |= EPOLLOUT;event.data.ptr = object;// 重点就在于这里, 和上面的reactor_new类似if (epoll_ctl(reactor->epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {LOG_ERROR("%s unable to register fd %d to epoll set: %s", __func__, fd, strerror(errno));pthread_mutex_destroy(&object->lock);osi_free(object);return NULL;}return object;
}

修改reactor

bool reactor_change_registration(reactor_object_t *object, // 需要被修改的reactor可以从object获取void (*read_ready)(void *context), //传入函数指针,函数参数为void * contextvoid (*write_ready)(void *context)) {assert(object != NULL);struct epoll_event event;memset(&event, 0, sizeof(event));if (read_ready)  // 如果需要监控读的话,设置对应的FLAGevent.events |= (EPOLLIN | EPOLLRDHUP);if (write_ready)  // 如果需要监控写的话,设置对应的FLAGevent.events |= EPOLLOUT;event.data.ptr = object;// 更改epoll FLAGif (epoll_ctl(object->reactor->epoll_fd, EPOLL_CTL_MOD, object->fd, &event) == -1) {LOG_ERROR("%s unable to modify interest set for fd %d: %s", __func__, object->fd, strerror(errno));return false;}pthread_mutex_lock(&object->lock);object->read_ready = read_ready;  // 更改read event发生后的 callbackobject->write_ready = write_ready;pthread_mutex_unlock(&object->lock);return true;
}

run_reactor是实施监控的核心

// Runs the reactor loop for a maximum of |iterations|.
// 0 |iterations| means loop forever.
// |reactor| may not be NULL.
static reactor_status_t run_reactor(reactor_t *reactor, int iterations) {assert(reactor != NULL);reactor->run_thread = pthread_self();reactor->is_running = true;struct epoll_event events[MAX_EVENTS];for (int i = 0; iterations == 0 || i < iterations; ++i) {pthread_mutex_lock(&reactor->list_lock);list_clear(reactor->invalidation_list);pthread_mutex_unlock(&reactor->list_lock);int ret;do { // wait将block并等待Event发生ret = epoll_wait(reactor->epoll_fd, events, MAX_EVENTS, -1);} while (ret == -1 && errno == EINTR);if (ret == -1) {LOG_ERROR("%s error in epoll_wait: %s", __func__, strerror(errno));reactor->is_running = false;return REACTOR_STATUS_ERROR;}for (int j = 0; j < ret; ++j) {// The event file descriptor is the only one that registers with// a NULL data pointer. We use the NULL to identify it and break// out of the reactor loop.if (events[j].data.ptr == NULL) {eventfd_t value;eventfd_read(reactor->event_fd, &value); //监控的控制,即使用eventfd来完成epoll流程的更改,例如这里的退出监控reactor->is_running = false;return REACTOR_STATUS_STOP;}reactor_object_t *object = (reactor_object_t *)events[j].data.ptr;pthread_mutex_lock(&reactor->list_lock);if (list_contains(reactor->invalidation_list, object)) {pthread_mutex_unlock(&reactor->list_lock);continue;}// Downgrade the list lock to an object lock.pthread_mutex_lock(&object->lock);pthread_mutex_unlock(&reactor->list_lock);reactor->object_removed = false;if (events[j].events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && object->read_ready)//根据Event type来调用回调object->read_ready(object->context);//调用read回调if (!reactor->object_removed && events[j].events & EPOLLOUT && object->write_ready)object->write_ready(object->context); //调用write回调完成处理pthread_mutex_unlock(&object->lock);if (reactor->object_removed) {pthread_mutex_destroy(&object->lock);osi_free(object);}}}reactor->is_running = false;return REACTOR_STATUS_DONE;
}

使用分析与示例

创建是在thread创建的时候做的, 然后注册则是在需要监控某个fd的时候使用.

例如HCI H4使用的是串口, 那么在打开tty后有一个fd, 然后就需要对这个fd进行监控;

例如对其进行读写操作, 从而完成HCI的信息传输.

对于这个例子,调用代码的顺序如下:

在hci_hal_h4.c中有:

uart_stream = eager_reader_new(uart_fd, &allocator_malloc, HCI_HAL_SERIAL_BUFFER_SIZE, SIZE_MAX, "hci_single_channel");

其中uart_fd,就是打开串口的fd, 然后:

  ret->inbound_read_object = reactor_register(thread_get_reactor(ret->inbound_read_thread), // 从前面创建的thread中获取在thread中创建的reactor, 这样子因为reactor的epoll_wait会睡眠也就会在这个新创建的thread中睡眠了.fd_to_read,ret,inbound_data_waiting,NULL);

这里面说到的新创建的thread的创建位于eager_reader.c中:

  ret->inbound_read_thread = thread_new(thread_name);
这个thread_new会创建reactor:

osi/src/thread.c
调用流程如下:
thread_t *thread_new_sized(const char *name, size_t work_queue_capacity) {ret->reactor = reactor_new();
....
}

thread_t *thread_new(const char *name) {return thread_new_sized(name, DEFAULT_WORK_QUEUE_CAPACITY);
}


前面的fd变成了fd_to_read, 接下来就到了epoll_ctl中的fd:

(epoll_ctl(reactor->epoll_fd,EPOLL_CTL_ADD,fd, &event)== -1)

这个时候就被加入到监控列表中了, 但是还没有调用epoll_wait来进行监控,仅仅只是加入到epoll的监控fd中.

void eager_reader_register(eager_reader_t *reader, reactor_t *reactor, eager_reader_cb read_cb, void *context) {assert(reader != NULL);assert(reactor != NULL);assert(read_cb != NULL);// Make sure the reader isn't currently registered.eager_reader_unregister(reader);reader->outbound_read_ready = read_cb;reader->outbound_context = context;reader->outbound_registration = reactor_register(reactor, reader->bytes_available_fd, reader, internal_outbound_read_ready, NULL);
}

下面是hci_hal_h4.c中的处理:

  return eager_reader_read(uart_stream, buffer, max_size, block);


// SEE HEADER FOR THREAD SAFETY NOTE
size_t eager_reader_read(eager_reader_t *reader, uint8_t *buffer, size_t max_size, bool block) {assert(reader != NULL);assert(buffer != NULL);// If the caller wants nonblocking behavior, poll to see if we have// any bytes available before reading.if (!block && !has_byte(reader))//只有非block才会去has_type(里面是select尝试读取),见下面函数的分析return 0;// Find out how many bytes we have available in our various buffers.eventfd_t bytes_available;if (eventfd_read(reader->bytes_available_fd, &bytes_available) == -1) {LOG_ERROR("%s unable to read semaphore for output data.", __func__);return 0;}// 上面的eventfd_read会block, 要等到有数据才会才会返回了.if (max_size > bytes_available)max_size = bytes_available;size_t bytes_consumed = 0;while (bytes_consumed < max_size) {if (!reader->current_buffer)reader->current_buffer = fixed_queue_dequeue(reader->buffers);// 这个queue里面的数据是在inbound_data_waiting中read并enqueue的,在以后的eager_reader.c分析中会有说明.size_t bytes_to_copy = reader->current_buffer->length - reader->current_buffer->offset;if (bytes_to_copy > (max_size - bytes_consumed))bytes_to_copy = max_size - bytes_consumed;memcpy(&buffer[bytes_consumed], &reader->current_buffer->data[reader->current_buffer->offset], bytes_to_copy);bytes_consumed += bytes_to_copy;reader->current_buffer->offset += bytes_to_copy;if (reader->current_buffer->offset >= reader->current_buffer->length) {reader->allocator->free(reader->current_buffer);reader->current_buffer = NULL;}}// 将没有读完即unconsumed的字节数使用eventfd_write写回去,这样子下次来读有可以继续从前面读完的位置继续读取bytes_available -= bytes_consumed;if (eventfd_write(reader->bytes_available_fd, bytes_available) == -1) {LOG_ERROR("%s unable to write back bytes available for output data.", __func__);}return bytes_consumed;
}


has_byte实现

static bool has_byte(const eager_reader_t *reader) {assert(reader != NULL);fd_set read_fds;FD_ZERO(&read_fds);FD_SET(reader->bytes_available_fd, &read_fds);// Immediate timeoutstruct timeval timeout;timeout.tv_sec = 0;timeout.tv_usec = 0;
// 设置的timeout时间是0,所以仅仅是try read一把,不会block select(reader->bytes_available_fd + 1, &read_fds, NULL, NULL, &timeout);return FD_ISSET(reader->bytes_available_fd, &read_fds);
}

总结

以上就是reactor的分析. 总结起来便是:

  • 1. 创建一个reactor,这个被thread.c中封装在new thread中
  • 2. reactor_register完成reactor的注册
  • 3. 调用run_reactor的封装函数完成一次性或者不停的监控, 这个依然被thread.c中的run_thread进行了封装.
  • 4. 使用reactor_unregister来将fd从epoll中移除,放入到unregistered list中
  • 5. 线程stop/exit/kill的时候,使用reactor_stop来停止epoll监控.

这篇关于Android BlueDroid分析: OSI中的reactor的实现与使用分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/773140

相关文章

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

IDEA中新建/切换Git分支的实现步骤

《IDEA中新建/切换Git分支的实现步骤》本文主要介绍了IDEA中新建/切换Git分支的实现步骤,通过菜单创建新分支并选择是否切换,创建后在Git详情或右键Checkout中切换分支,感兴趣的可以了... 前提:项目已被Git托管1、点击上方栏Git->NewBrancjsh...2、输入新的分支的

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四