Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)

2024-04-25 12:52

本文主要是介绍Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。

前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。

需求:服务端把接收到的数据,原样返回给客户端

服务端代码如下:

直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程

服务代码如下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2_2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println("read " + Thread.currentThread().getName()+ " " + read);if (read > 0) {client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端,客户端发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:21598
-------------------------------------------
in.....
in.....
read handler.....
in.....
read handler.....
read Thread-0 5
read Thread-1 0
read handler.....
read Thread-2 0
read Thread-0 0
write handler...

客户端日志:

client connected to server
1234
client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234

可以看到,客户端发送数据,没有接收到服务端返回的数据;

服务端接收到数据后,在写数据的时候,buffer中没有数据可写;

再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;


一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);

tip:read事件来的时候,如果不读取数据,read事件会一直有的

解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ

新的服务端代码:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;public class SocketMultiplexingSingleThreadv2 {private ServerSocketChannel server = null;private Selector selector = null;   //linux 多路复用器(select poll epoll) nginx  event{}int port = 9090;public void initServer() {try {server = ServerSocketChannel.open();server.configureBlocking(false);server.bind(new InetSocketAddress(port));selector = Selector.open();  //  select  poll  *epollserver.register(selector, SelectionKey.OP_ACCEPT);} catch (IOException e) {e.printStackTrace();}}public void start() {initServer();System.out.println("服务器启动了。。。。。");try {while (true) {
//                Set<SelectionKey> keys = selector.keys();
//                System.out.println(keys.size()+"   size");while (selector.select(50) > 0) {Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iter = selectionKeys.iterator();while (iter.hasNext()) {SelectionKey key = iter.next();iter.remove();if (key.isAcceptable()) {acceptHandler(key);} else if (key.isReadable()) {
//                            key.cancel();  //现在多路复用器里把key  cancel了System.out.println("in.....");// 同一个Client,读之前先取消OP_READ,防止多线程冲突吹key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发} else if(key.isWritable()){  //我之前没讲过写的事件!!!!!//写事件<--  send-queue  只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法//你真的要明白:什么时候写?不是依赖send-queue是不是有空间//1,你准备好要写什么了,这是第一步//2,第二步你才关心send-queue是否有空间//3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册//4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
//                            key.cancel();key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);writeHandler(key);}}}}} catch (IOException e) {e.printStackTrace();}}private void writeHandler(SelectionKey key) {new Thread(()->{System.out.println("write handler...");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.flip();while (buffer.hasRemaining()) {try {int write = client.write(buffer);System.out.println("write " + Thread.currentThread().getName()+ " " + write);} catch (IOException e) {e.printStackTrace();}}try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}buffer.clear();
//            key.cancel();//            try {client.shutdownOutput();
//client.close();
//
//            } catch (IOException e) {
//                e.printStackTrace();
//            }}).start();}public void acceptHandler(SelectionKey key) {try {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel client = ssc.accept();client.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(8192);client.register(selector, SelectionKey.OP_READ, buffer);System.out.println("-------------------------------------------");System.out.println("新客户端:" + client.getRemoteAddress());System.out.println("-------------------------------------------");} catch (IOException e) {e.printStackTrace();}}public void readHandler(SelectionKey key) {new Thread(()->{System.out.println("read handler.....");SocketChannel client = (SocketChannel) key.channel();ByteBuffer buffer = (ByteBuffer) key.attachment();buffer.clear();int read = 0;try {while (true) {read = client.read(buffer);System.out.println(Thread.currentThread().getName()+ " " + read);if (read > 0) {// 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据key.interestOps(  SelectionKey.OP_READ);client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);} else if (read == 0) {break;} else {client.close();break;}}} catch (IOException e) {try {System.out.println("client " + client.getRemoteAddress() + " disconnected");client.close();} catch (IOException ex) {throw new RuntimeException(ex);}e.printStackTrace();}}).start();}public static void main(String[] args) {SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();service.start();}
}

测试:

先启动一个服务端,再启动一个客户端1,客户端1发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8

客户端1日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1

可以看到,客户单和服务端都可以正常接收和发送数据。

再添加一个客户端2,发送数据

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8

客户端2的日志:

client connected to server
client2
client receive data from consolejava.io.BufferedInputStream@65231a33 : client2client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2

可以看到,客户端2和服务端都可以正常接收和发送数据。

客户端1,再次发送数据

客户端日志:

client connected to server
client1
client receive data from consolejava.io.BufferedInputStream@65231a33 : client1client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1clent1_2
client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2

服务端日志:

服务器启动了。。。。。
-------------------------------------------
新客户端:/127.0.0.1:24029
-------------------------------------------
in.....
read handler.....
Thread-0 8
Thread-0 0
write handler...
write Thread-1 8
-------------------------------------------
新客户端:/127.0.0.1:24105
-------------------------------------------
in.....
read handler.....
Thread-2 8
Thread-2 0
write handler...
write Thread-3 8
in.....
read handler.....
Thread-4 9
Thread-4 0
write handler...
write Thread-5 9

从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。

整个处理流程是服务预期的。

这篇关于Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/934738

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

Spring Security 基于表达式的权限控制

前言 spring security 3.0已经可以使用spring el表达式来控制授权,允许在表达式中使用复杂的布尔逻辑来控制访问的权限。 常见的表达式 Spring Security可用表达式对象的基类是SecurityExpressionRoot。 表达式描述hasRole([role])用户拥有制定的角色时返回true (Spring security默认会带有ROLE_前缀),去

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Spring Security--Architecture Overview

1 核心组件 这一节主要介绍一些在Spring Security中常见且核心的Java类,它们之间的依赖,构建起了整个框架。想要理解整个架构,最起码得对这些类眼熟。 1.1 SecurityContextHolder SecurityContextHolder用于存储安全上下文(security context)的信息。当前操作的用户是谁,该用户是否已经被认证,他拥有哪些角色权限…这些都被保

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定