本文主要是介绍Rust实现基于Tokio的限制内存占用的channel,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Rust实现基于Tokio的限制内存占用的channel
简介
本文介绍如何基于tokio的channel实现一个限制内存占用的channel。
Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:bounded
和unbounded
,其中ubbounded
的channel可以无限的发送数据,而bounded
的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。
异步网络编程中常用一个channel
连接两个task,其中业务task
与业务交互:将要发送的数据发送到channel,而网络task
与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task
从channel中接收的速度小于业务task
向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM
。
实现
-
获取数据大小
要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个
get_size
方法,用于获取数据的大小。pub trait GetSize {/// get total sizefn get_size(&self) -> usize; }
要发送的内容必须实现
GetSize
的Trait,并实现get_size
方法。注意:get_size
方法获取到的大小需包括栈空间和堆空间,例如:struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}}
-
创建
SizedSender
和SizedReceiver
SizedSender
和SizedReceiver
都可以基于tokio的UnboundedSender
和UnboundedReceiver
实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>, } pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>, }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0. pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),) }
-
SizedSender
实现发送端发送时需要调用
get_size
方法获取数据的大小,然后调用Semaphore::available_permits
方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}}
-
SizedReceiver
的实现接收端接收时需要调用
get_size
方法获取数据的大小,然后将相应大小的permits
还给信号量即可。impl<T: GetSize> SizedReceiver<T> { pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,} }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r}) } }
-
其他
在上述实现的基础上,还可以实现更多方法,比如
try_send
、try_recv
等。
这篇关于Rust实现基于Tokio的限制内存占用的channel的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!