本文主要是介绍tokio tcp通信,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
引入crate
tokio = { version = "1.35.1", features = ["full"] }
服务端
use std::time::Duration;
use tokio::{io::{AsyncBufReadExt, AsyncWriteExt},net::{tcp::{OwnedReadHalf, OwnedWriteHalf},TcpListener, TcpStream,},sync::mpsc,
};#[tokio::main]
async fn main() {println!("Begin Start Server...");let server = TcpListener::bind("127.0.0.1:10888").await.unwrap();while let Ok((client_stream, client_addr)) = server.accept().await{println!("accept client: {}", client_addr);tokio::spawn(async move{process_client(client_stream).await;});}
}async fn process_client(client_stream: TcpStream){let (client_reader, client_writer) = client_stream.into_split();let (msg_tx, msg_rx) = mpsc::channel::<String>(100);let mut read_task = tokio::spawn(async move {read_from_client(client_reader, msg_tx).await;});let mut write_task = tokio::spawn(async move{write_to_client(client_writer, msg_rx).await;});if tokio::try_join!(&mut read_task, &mut write_task).is_err() {read_task.abort();write_task.abort();};
}async fn read_from_client(reader: OwnedReadHalf, mst_tx: mpsc::Sender<String>){let mut buf_reader = tokio::io::BufReader::new(reader);let mut buf = String::new();loop{match buf_reader.read_line(&mut buf).await{Err(_e) =>{eprintln!("read from client error");break;}Ok(0) =>{println!("client closed");break;}Ok(n) => {buf.pop(); //去除末尾的回车符let mut content = buf.drain(..).as_str().to_string();println!("read {} bytes from client. content: {}", n, content);tokio::time::sleep(Duration::from_secs(1)).await;content.push('\n');if mst_tx.send(content).await.is_err(){eprintln!("receiver closed");break;}}}}
}async fn write_to_client(writer: OwnedWriteHalf, mut msg_rx: mpsc::Receiver<String>){let mut buf_writer = tokio::io::BufWriter::new(writer);while let Some(mut str) = msg_rx.recv().await{//str.push('\n');if let Err(e) = buf_writer.write_all(str.as_bytes()).await {eprintln!("write to client failed: {}", e);break;}buf_writer.flush().await;print!("write to client: {}", str);}
}
客户端
use std::sync;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{Local, NaiveDateTime};use tokio::{io::{Interest, AsyncBufReadExt, AsyncWriteExt},net::{tcp::{OwnedReadHalf, OwnedWriteHalf},TcpStream,},sync::mpsc,
};#[tokio::main]
async fn main() {let stream = TcpStream::connect("127.0.0.1:10888").await.unwrap();let (reader, writer) = stream.into_split();let mut buf = String::new();//[0u8, 12];let mut buf_reader = tokio::io::BufReader::new(reader);let mut buf_writer = tokio::io::BufWriter::new(writer);loop{let now=Local::now();let formatted=now.format("%Y-%m-%d %H:%M:%S");let content = format!("hello world {}\n", formatted);//buf_writer.write_all(b"hello world\n").await;buf_writer.write_all(content.as_bytes()).await;buf_writer.flush().await;println!("send:{}", content);match buf_reader.read_line(&mut buf).await {Err(_e) =>{eprintln!("read from server error");break;}Ok(n) =>{buf.pop();let content = buf.as_str().to_string();println!("received: {} {}", n, content);}};}}
客户端、服务端都使用TcpStream的into_split方法获取网络通信读和写实例,进而获取buffer读写对象,通过channel实现线程执行同步。子线程使用tokio::spawn函数启动。
这篇关于tokio tcp通信的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!