Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)

2024-04-25 12:52

本文主要是介绍Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。

前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。

需求:服务端把接收到的数据,原样返回给客户端

服务端代码如下:

直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程

服务代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2_2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println("read " + Thread.currentThread().getName()+ " " + read);if (read > 0) {client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端,客户端发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:21598
-------------------------------------------
in.....
in.....
read handler.....
in.....
read handler.....
read Thread-0 5
read Thread-1 0
read handler.....
read Thread-2 0
read Thread-0 0
write handler...

客户端日志:

client connected to server
1234
client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234

可以看到,客户端发送数据,没有接收到服务端返回的数据;

服务端接收到数据后,在写数据的时候,buffer中没有数据可写;

再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;


一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);

tip:read事件来的时候,如果不读取数据,read事件会一直有的

解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ

新的服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");// 同一个Client,读之前先取消OP_READ,防止多线程冲突吹key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println(Thread.currentThread().getName()+ " " + read);if (read > 0) {// 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据key.interestOps(  SelectionKey.OP_READ);client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端1,客户端1发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8

客户端1日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1

可以看到,客户单和服务端都可以正常接收和发送数据。

再添加一个客户端2,发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8

客户端2的日志:

client connected to server
client2
client receive data from consolejava.io.BufferedInputStream@65231a33 : client2client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2

可以看到,客户端2和服务端都可以正常接收和发送数据。

客户端1,再次发送数据

客户端日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1clent1_2
client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8
in.....
read handler.....
Thread-4 9
Thread-4 0
write handler...
write Thread-5 9

从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。

整个处理流程是服务预期的。

这篇关于Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/934738

相关文章

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

Java进程异常故障定位及排查过程

《Java进程异常故障定位及排查过程》:本文主要介绍Java进程异常故障定位及排查过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、故障发现与初步判断1. 监控系统告警2. 日志初步分析二、核心排查工具与步骤1. 进程状态检查2. CPU 飙升问题3. 内存

Linux中压缩、网络传输与系统监控工具的使用完整指南

《Linux中压缩、网络传输与系统监控工具的使用完整指南》在Linux系统管理中,压缩与传输工具是数据备份和远程协作的桥梁,而系统监控工具则是保障服务器稳定运行的眼睛,下面小编就来和大家详细介绍一下它... 目录引言一、压缩与解压:数据存储与传输的优化核心1. zip/unzip:通用压缩格式的便捷操作2.

java中新生代和老生代的关系说明

《java中新生代和老生代的关系说明》:本文主要介绍java中新生代和老生代的关系说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、内存区域划分新生代老年代二、对象生命周期与晋升流程三、新生代与老年代的协作机制1. 跨代引用处理2. 动态年龄判定3. 空间分

Java设计模式---迭代器模式(Iterator)解读

《Java设计模式---迭代器模式(Iterator)解读》:本文主要介绍Java设计模式---迭代器模式(Iterator),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录1、迭代器(Iterator)1.1、结构1.2、常用方法1.3、本质1、解耦集合与遍历逻辑2、统一

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空