boost::asio::Io_service strand

2024-06-15 01:58
文章标签 boost io service asio strand

本文主要是介绍boost::asio::Io_service strand,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

构造函数 

构造函数的主要动作就是调用CreateIoCompletionPort创建了一个初始iocp。 

Dispatch和post的区别 

Post一定是PostQueuedCompletionStatus并且在GetQueuedCompletionStatus 之后执行。 

Dispatch会首先检查当前thread是不是io_service.run/runonce/poll/poll_once线程,如果是,则直接运行。 

poll和run的区别 

两者代码几乎一样,都是首先检查是否有outstanding的消息,如果没有直接返回,否则调用do_one()。唯一的不同是在调用size_t do_one(bool block, boost::system::error_code& ec)时前者block = false,后者block = true。 

该参数的作用体现在: 

BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, 
&completion_key, &overlapped, block ? timeout : 0); 

因此可以看出,poll处理的是已经完成了的消息,也即GetQueuedCompletionStatus立刻能返回的。而run则会导致等待。 

poll的作用是依次处理当前已经完成了的消息,直到所有已经完成的消息处理完成为止。如果没有已经完成了得消息,函数将退出。poll不会等待。这个函数有点类似于PeekMessage。鉴于PeekMessage很少用到,poll的使用场景我也有点疑惑。poll的一个应用场景是如果希望handler的处理有优先级,也即,如果消息完成速度很快,同时可能完成多个消息,而消息的处理过程可能比较耗时,那么可以在完成之后的消息处理函数中不真正处理数据,而是把handler保存在队列中,然后按优先级统一处理。代码如下:

  1.   while (io_service.run_one())  {
  2.     // The custom invocation hook adds the handlers to the priority queue
  3.     // rather than executing them from within the poll_one() call.
  4.     while (io_service.poll_one())      ;    pri_queue.execute_all();  }
复制代码

循环执行poll_one让已经完成的消息的wrap_handler处理完毕,也即插入一个队列中,然后再统一处理之。这里的wrap_handler是一个class,在post的时候,用如下代码:

  1. io_service.post(pri_queue.wrap(0, low_priority_handler));
复制代码

或者

  1. acceptor.async_accept(server_socket, pri_queue.wrap(100, high_priority_handler));
  2.   template <typename Handler> wrapped_handler<Handler> handler_priority_queue::wrap(int priority, Handler handler)
  3.   {    return wrapped_handler<Handler>(*this, priority, handler);  }
复制代码

参见boost_asio/example/invocation/prioritised_handlers.cpp 

这个sample也同时表现了wrap的使用场景。 

也即把handler以及参数都wrap成一个object,然后把object插入一个队列,在pri_queue.execute_all中按优先级统一处理。 

run的作用是处理消息,如果有消息未完成将一直等待到所有消息完成并处理之后才退出。 

reset和stop 

文档中reset的解释是重置io_service以便下一次调用。 

当run,run_one,poll,poll_one是被stop掉导致退出,或者由于完成了所有任务(正常退出)导致退出时,在调用下一次run,run_one,poll,poll_one之前,必须调用此函数。reset不能在run,run_one,poll,poll_one正在运行时调用。如果是消息处理handler(用户代码)抛出异常,则可以在处理之后直接继续调用io.run,run_one,poll,poll_one。 例如:

  1. boost::asio::io_service io_service;
  2. ...
  3. for (;;)
  4. {
  5.   try
  6.   {
  7.     io_service.run();
  8.     break; // run() exited normally
  9.   }
  10.   catch (my_exception& e)
  11.   {
  12.     // Deal with exception as appropriate.
  13.   }
  14. }
复制代码

在抛出了异常的情况下,stopped_还没来得及被asio设置为1,所以无需调用reset。
reset函数的代码仅有一行:

  1. void reset()
  2. {
  3. ::InterlockedExchange(&stopped_, 0);
  4. }
复制代码

也即,当io.stop时,会设置stopped_=1。当完成所有任务时,也会设置。 

总的来说,单线程情况下,不管io.run是如何退出的,在下一次调用io.run之前调用一次reset没有什么坏处。例如:

  1. for(;;)
  2. {
  3. try
  4. {
  5. io.run();
  6. }
  7. catch(…)
  8. {
  9. }
  10. io.reset();
  11. }
复制代码

如果是多线程在运行io.run,则应该小心,因为reset必须是所有的run,run_one,poll,poll_one退出后才能调用。 

文档中的stop的解释是停止io_service的处理循环。 

此函数不是阻塞函数,也即,它仅仅只是给iocp发送一个退出消息而并不是等待其真正退出。因为poll和poll_one本来就不等待(GetQueuedCompletionStatus时timeout = 0),所以此函数对poll和poll_one无意义。对于run_one来说,如果该事件还未完成,则run_one会立刻返回。如果该事件已经完成,并且还在处理中,则stop并无特殊意义(会等待handler完成后自然退出)。对于run来说,stop的调用会导致run中的GetQueuedCompletionStatus立刻返回。并且由于设置了stopped = 1,此前完成的消息的handlers也不会被调用。考虑一下这种情况:在io.stop之前,有1k个消息已经完成但尚未处理,io.run正在依次从GetQueuedCompletionStatus中获得信息并且调用handlers,调用io.stop设置stopped=1将导致后许GetQueuedCompletionStatus返回的消息直接被丢弃,直到收到退出消息并退出io.run为止。

  1. void stop()
  2. {
  3. if (::InterlockedExchange(&stopped_, 1) == 0)
  4. {
  5. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
  6. {
  7. DWORD last_error = ::GetLastError();
  8. boost::system::system_error e(
  9. boost::system::error_code(last_error,
  10. boost::asio::error::get_system_category()),
  11. "pqcs");
  12. boost::throw_exception(e);
  13. }
  14. }
  15. }
复制代码

注意除了让当前代码退出之外还有一个副作用就是设置了stopped_=1。这个副作用导致在stop之后如果不调用reset,所有run,run_one,poll,poll_one都将直接退出。 

另一个需要注意的是,stop会导致所有未完成的消息以及完成了但尚未处理得消息都直接被丢弃,不会导致handlers倍调用。 

注意这两个函数都不会CloseHandle(iocp.handle_),那是析构函数干的事情。 

注意此处有个细节:一次PostQueuedCompletionStatus仅导致一次GetQueuedCompletionStatus返回,那么如果有多个thread此时都在io.run,并且block在GetQueuedCompletionStatus时,调用io.stop将PostQueuedCompletionStatus并且导致一个thread的GetQueuedCompletionStatus返回。那么其他的thread呢?进入io_service的do_one(由run函数调用)代码可以看到,当GetQueuedCompletionStatus返回并且发现是退出消息时,会再发送一次PostQueuedCompletionStatus。代码如下:

  1.   else
  2.   {
  3.     // Relinquish responsibility for dispatching timers. If the io_service
  4.     // is not being stopped then the thread will get an opportunity to
  5.     // reacquire timer responsibility on the next loop iteration.
  6.     if (dispatching_timers)
  7.     {
  8.       ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
  9.     }
  10.     // The stopped_ flag is always checked to ensure that any leftover
  11.     // interrupts from a previous run invocation are ignored.
  12.     if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
  13.     {
  14.       // Wake up next thread that is blocked on GetQueuedCompletionStatus.
  15.       if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
  16.       {
  17.         last_error = ::GetLastError();
  18.         ec = boost::system::error_code(last_error,
  19.             boost::asio::error::get_system_category());
  20.         return 0;
  21.       }
  22.       ec = boost::system::error_code();
  23.       return 0;
  24.     }
  25.   }
  26. }
复制代码

Wrap 

这个函数是一个语法糖。

  1. Void func(int a);
  2. io_service.wrap(func)(a);
复制代码

相当于
io_service.dispatch(bind(func,a)); 

可以保存io_service.wrap(func)到g,以便在稍后某些时候调用g(a); 

例如: 

socket_.async_read_some(boost::asio::buffer(buffer_),      strand_.wrap( 
        boost::bind(&connection::handle_read, shared_from_this(), 
          boost::asio::placeholders::error, 
          boost::asio::placeholders::bytes_transferred))); 

这是一个典型的wrap用法。注意async_read_some要求的参数是一个handler,在read_some结束后被调用。由于希望真正被调用的handle_read是串行化的,在这里再post一个消息给io_service。以上代码类似于:

  1. void A::func(error,bytes_transferred)
  2. {
  3. strand_.dispatch(boost::bind(handle_read,shared_from_this(),error,bytes_transferred);
  4. }
  5. socket_.async_read_some(boost::asio::buffer(buffer_), func);
复制代码

注意1点: 
io_service.dispatch(bind(func,a1,…an)),这里面都是传值,无法指定bind(func,ref(a1)…an)); 所以如果要用ref语义,则应该在传入wrap时显式指出。例如:

  1. void func(int& i){i+=1;}
  2. void main()
  3. {
  4. int i = 0;
  5. boost::asio::io_service io;
  6. io.wrap(func)(boost::ref(i));
  7. io.run();
  8. printf("i=%d/n");
  9. }
复制代码

当然在某些场合下,传递shared_ptr也是可以的(也许更好)。 

从handlers抛出的异常的影响 

当handlers抛出异常时,该异常会传递到本线程最外层的io.run,run_one,poll,poll_one,不会影响其他线程。捕获该异常是程序员自己的责任。 

例如:

  1. boost::asio::io_service io_service;
  2. Thread1,2,3,4()
  3. {
  4. for (;;)
  5. {
  6. try
  7. {
  8. io_service.run();
  9. break; // run() exited normally
  10. }
  11. catch (my_exception& e)
  12. {
  13. // Deal with exception as appropriate.
  14. }
  15. }
  16. }
  17. Void func(void)
  18. {
  19. throw 1;
  20. }
  21. Thread5()
  22. {
  23. io_service.post(func);
  24. }
复制代码

注意这种情况下无需调用io_service.reset()。 

这种情况下也不能调用reset,因为调用reset之前必须让所有其他线程正在调用的io_service.run退出。(reset调用时不能有任何run,run_one,poll,poll_one正在运行) 

Work 

有些应用程序希望在没有pending的消息时,io.run也不退出。比如io.run运行于一个后台线程,该线程在程序的异步请求发出之前就启动了。 

可以通过如下代码实现这种需求:

  1. main()
  2. {
  3. boost::asio::io_service io_service;
  4. boost::asio::io_service::work work(io_service);
  5. Create thread
  6. Getchar();
  7. }
  8. Thread()
  9. {
  10. Io_service.run();
  11. }
复制代码

这种情况下,如果work不被析构,该线程永远不会退出。在work不被析构得情况下就让其退出,可以调用io.stop。这将导致io.run立刻退出,所有未完成的消息都将丢弃。已完成的消息(但尚未进入handler的)也不会调用其handler函数(由于在stop中设置了stopped_= 1)。 

如果希望所有发出的异步消息都正常处理之后io.run正常退出,work对象必须析构,或者显式的删除。

  1. boost::asio::io_service io_service;
  2. auto_ptr<boost::asio::io_service::work> work(
  3. new boost::asio::io_service::work(io_service));
  4. ...
  5. work.reset(); // Allow run() to normal exit.
复制代码

work是一个很小的辅助类,只支持构造函数和析构函数。(还有一个get_io_service返回所关联的io_service) 

代码如下:

  1. inline io_service::work::work(boost::asio::io_service& io_service)
  2. : io_service_(io_service)
  3. {
  4. io_service_.impl_.work_started();
  5. }
  6. inline io_service::work::work(const work& other)
  7. : io_service_(other.io_service_)
  8. {
  9. io_service_.impl_.work_started();
  10. }
  11. inline io_service::work::~work()
  12. {
  13. io_service_.impl_.work_finished();
  14. }
  15. void work_started()
  16. {
  17. ::InterlockedIncrement(&outstanding_work_);
  18. }
  19. // Notify that some work has finished.
  20. void work_finished()
  21. {
  22. if (::InterlockedDecrement(&outstanding_work_) == 0)
  23. stop();
  24. }
复制代码

可以看出构造一个work时,outstanding_work_+1,使得io.run在完成所有异步消息后判断outstanding_work_时不会为0,因此会继续调用GetQueuedCompletionStatus并阻塞在这个函数上。 

而析构函数中将其-1,并判断其是否为0,如果是,则post退出消息给GetQueuedCompletionStatus让其退出。 

因此work如果析构,则io.run会在处理完所有消息之后正常退出。work如果不析构,则io.run会一直运行不退出。如果用户直接调用io.stop,则会让io.run立刻退出。 

特别注意的是,work提供了一个拷贝构造函数,因此可以直接在任意地方使用。对于一个io_service来说,有多少个work实例关联,则outstanding_work_就+1了多少次,只有关联到同一个io_service的work全被析构之后,io.run才会在所有消息处理结束之后正常退出。 

strand 

strand是另一个辅助类,提供2个接口dispatch和post,语义和io_service的dispatch和post类似。区别在于,同一个strand所发出的dispatch和post绝对不会并行执行,dispatch和post所包含的handlers也不会并行。因此如果希望串行处理每一个tcp连接,则在accept之后应该在该连接的数据结构中构造一个strand,并且所有dispatch/post(recv/send)操作都由该strand发出。strand的作用巨大,考虑如下场景:有多个thread都在执行async_read_some,那么由于线程调度,很有可能后接收到的包先被处理,为了避免这种情况,就只能收完数据后放入一个队列中,然后由另一个线程去统一处理。

  1. void connection::start()
  2. {
  3. socket_.async_read_some(boost::asio::buffer(buffer_),
  4. strand_.wrap(
  5. boost::bind(&connection::handle_read, shared_from_this(),
  6. boost::asio::placeholders::error,
  7. boost::asio::placeholders::bytes_transferred)));
  8. }
复制代码

不使用strand的处理方式: 

前端tcp iocp收包,并且把同一个tcp连接的包放入一个list,如果list以前为空,则post一个消息给后端vnn iocp。后端vnn iocp收到post的消息后循环从list中获取数据,并且处理,直到list为空为止。处理结束后重新调用GetQueuedCompletionStatus进入等待。如果前端tcp iocp发现list过大,意味着处理速度小于接收速度,则不再调用iocpRecv,并且设置标志,当vnn iocp thread处理完了当前所有积压的数据包后,检查这个标志,重新调用一次iocpRecv。 

使用strand的处理方式: 

前端tcp iocp收包,收到包后直接通过strand.post(on_recved)发给后端vnn iocp。后端vnn iocp处理完之后再调用一次strand.async_read_some。 

这两种方式我没看出太大区别来。如果对数据包的处理的确需要阻塞操作,例如db query,那么使用后端iocp以及后端thread是值得考虑的。这种情况下,前端iocp由于仅用来异步收发数据,因此1个thread就够了。在确定使用2级iocp的情况下,前者似乎更为灵活,也没有增加什么开销。 

值得讨论的是,如果后端多个thread都处于db query状态,那么实际上此时依然没有thread可以提供数据处理服务,因此2级iocp意义其实就在于在这种情况下,前端tcp iocp依然可以accept,以及recv第一次数据,不会导致用户connect不上的情况。在后端thread空闲之后会处理这期间的recv到的数据并在此async_read_some。 

如果是单级iocp(假定handlers没有阻塞操作),多线程,那么strand的作用很明显。这种情况下,很明显应该让一个tcp连接的数据处理过程串行化。 

Strand的实现原理

Strand内部实现机制稍微有点复杂。每次发出strand请求(例如async_read(strand_.wrap(funobj1))),strand再次包裹了一次成为funobj2。在async_read完成时,系统调用funobj2,检查是否正在执行该strand所发出的完成函数(检查该strand的一个标志位),如果没有,则直接调用funobj2。如果有,则检查是否就是当前thread在执行,如果是,则直接调用funobj2(这种情况可能发生在嵌套调用的时候,但并不产生同步问题,就像同一个thread可以多次进入同一个critical_session一样)。如果不是,则把该funobj2插入到strand内部维护的一个队列中。

在调用funobj2时,首先调用funobj1,调用完成之后,检查队列中是否还有等待执行的funobj2,如果有,则执行队列中的第一个

这篇关于boost::asio::Io_service strand的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1062116

相关文章

ASIO网络调试助手之一:简介

多年前,写过几篇《Boost.Asio C++网络编程》的学习文章,一直没机会实践。最近项目中用到了Asio,于是抽空写了个网络调试助手。 开发环境: Win10 Qt5.12.6 + Asio(standalone) + spdlog 支持协议: UDP + TCP Client + TCP Server 独立的Asio(http://www.think-async.com)只包含了头文件,不依

Java IO 操作——个人理解

之前一直Java的IO操作一知半解。今天看到一个便文章觉得很有道理( 原文章),记录一下。 首先,理解Java的IO操作到底操作的什么内容,过程又是怎么样子。          数据来源的操作: 来源有文件,网络数据。使用File类和Sockets等。这里操作的是数据本身,1,0结构。    File file = new File("path");   字

springboot体会BIO(阻塞式IO)

使用springboot体会阻塞式IO 大致的思路为: 创建一个socket服务端,监听socket通道,并打印出socket通道中的内容。 创建两个socket客户端,向socket服务端写入消息。 1.创建服务端 public class RedisServer {public static void main(String[] args) throws IOException {

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

android java.io.IOException: open failed: ENOENT (No such file or directory)-api23+权限受权

问题描述 在安卓上,清单明明已经受权了读写文件权限,但偏偏就是创建不了目录和文件 调用mkdirs()总是返回false. <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/><uses-permission android:name="android.permission.READ_E

UserWarning: mkl-service package failed to import

安装完成anaconda,并设置了两个环境变量  之后再控制台运行python环境,输入import numpy as np,提示错误 D:\InstallFolder\Anaconda3\lib\site-packages\numpy\__init__.py:143: UserWarning: mkl-service package failed to import, therefore

JavaEE-文件操作与IO

目录 1,两种路径 二,两种文件 三,文件的操作/File类: 1)文件系统操作 File类 2)文件内容操作(读文件,写文件) (1)打开文件 (2)关闭文件 (3)读文件/InputStream (4)写文件/OutputStream (5)读文件/reader (6)写文件/writer (7)Scanner 四,练习: 1,两种路径 1)绝对路径

Python---文件IO流及对象序列化

文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据总结 前言 前文模块中提到加密模块,本文将终点介绍加密模块和文件流。 一、文件流和IO流概述         在Python中,IO流是用于输入和输出数据的通道。它可以用于读取输入数据或将数据写入输出目标。IO流可以是标准输入/输出流(stdin和stdout),也可以是文件流,网络流等。

标准IO与系统IO

概念区别 标准IO:(libc提供) fopen fread fwrite 系统IO:(linux系统提供) open read write 操作效率 因为内存与磁盘的执行效率不同 系统IO: 把数据从内存直接写到磁盘上 标准IO: 数据写到缓存,再刷写到磁盘上

Boost程序库入门学习

优秀的程序员要能够知其所以然,而不是重复的造轮子,近期目标是学习优秀的第三方库,同时尝试使用C++11/14新特性,然后吸取精华用到项目中去,加油~ 参考书籍: 罗剑锋写的《Boost程序库完全开发指南》和《Boost程序库探秘》,前者是一个大体的介绍,后者是针对一些诸如模板元编程等高级特性做了深入的探讨。 一、Boost库概述 Boost是一个功能强大、构造精巧、跨平台、开源并且完全