Apache Mina 源码再读2 IoSession创建过程源代码剖析

2024-02-18 06:58

本文主要是介绍Apache Mina 源码再读2 IoSession创建过程源代码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!



在调用bind()函数后,AcceptorOperationFuture 被注册到AbstractPollingIoAcceptor 类中的registerQueue 队列。在AbstractPollingIoAcceptor中存在IoProcessor 类。


/**
 * An internal interface to represent an 'I/O processor' that performs
 * actual I/O operations for {@link IoSession}s.  It abstracts existing
 * reactor frameworks such as Java NIO once again to simplify transport
 * implementations.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 * 
 * @param <S> the type of the {@link IoSession} this processor can handle
 */
public interface IoProcessor<S extends IoSession> {}


IoProcessor 是执行IoSession 的I/O操作的内部接口类。  IoProcessor再次 抽象了在Java Nio 内部的reactor  frameworks 框架,并简化传输层的实现。


IoProcessor 实现类为SimpleIoProcessorPool 类。SImpleIoProcessorPool主要把IoSession分配到一个或者多个IoProcessor中。


An IoProcessor pool that distributes IoSessions into one or more IoProcessors. Most current transport implementations use this pool internally to perform better in a multi-core environment, and therefore, you won't need to use this pool directly unless you are running multiple IoServices in the same JVM.

If you are running multiple IoServices, you could want to share the pool among all services. To do so, you can create a new SimpleIoProcessorPool instance by yourself and provide the pool as a constructor parameter when you create the services.

This pool uses Java reflection API to create multiple IoProcessor instances. It tries to instantiate the processor in the following order:

  1. A public constructor with one ExecutorService parameter.
  2. A public constructor with one Executor parameter.
  3. A public default constructor
The following is an example for the NIO socket transport:
// Create a shared pool.SimpleIoProcessorPool<NioSession> pool = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16);

 // Create two services that share the same pool.SocketAcceptor acceptor = new NioSocketAcceptor(pool);SocketConnector connector = new NioSocketConnector(pool)



在Acceptor 线程中,processHandles 会创建一个新的IoSession.


  /*** This method will process new sessions for the Worker class.  All* keys that have had their status updates as per the Selector.selectedKeys()* method will be processed here.  Only keys that are ready to accept* connections are handled here.* <p/>* Session objects are created by making new instances of SocketSessionImpl* and passing the session object to the SocketIoProcessor class.*/@SuppressWarnings("unchecked")private void processHandles(Iterator<H> handles) throws Exception {while (handles.hasNext()) {H handle = handles.next();handles.remove();// Associates a new created connection to a processor,// and get back a sessionS session = accept(processor, handle);//当有一个新socket链接时,返回一个IoSessionif (session == null) {continue;}initSession(session, null, null);// add the session to the SocketIoProcessor//当Acceptor发现新的socket,把socket和ioproces相关联。所有io事件由ioprocess处理session.getProcessor().add(session);}}}

  protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {SelectionKey key = null;if (handle != null) {key = handle.keyFor(selector);}if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {return null;}// accept the connection from the clientSocketChannel ch = handle.accept();if (ch == null) {return null;}return new NioSocketSession(this, processor, ch);}


当accept事件触发时,会创建一个新NioSocketSession .

public abstract class NioSession extends AbstractIoSession {
    /** The NioSession processor */
    protected final IoProcessor<NioSession> processor;


    /** The communication channel */
    protected final Channel channel;


    /** The SelectionKey used for this session */
    protected SelectionKey key;


    /** The FilterChain created for this session */
    private final IoFilterChain filterChain;


    /**
     * 
     * Creates a new instance of NioSession, with its associated IoProcessor.
     * <br>
     * This method is only called by the inherited class.
     *
     * @param processor The associated IoProcessor
     */
    protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
        super(service);
        this.channel = channel;
        this.processor = processor;
        filterChain = new DefaultIoFilterChain(this);
    }

}

在抽象类NioSession中,每创建一个NioSession就会创建一个DefaultFilterChain对象。也就意味着每一个NioSession与自己独立的DefaultFilterChain相关联。


NioSession创建后,会通过initSession 方法来初始化NioSession. 


在初始化NioSession 之前,先看看一个IoSessionDataStructureFactory 工厂类。Provides data structures to a newly created session. 这个工厂类提供了创建NioSession的数据结构。NioSession中的数据结构主要包括存储自定义对象的IoSessionAttributeMap 类以及IoSession写入到缓冲区的写出队列WriteRequestQueue。


IoSessionDataStructFactory 接口源代码如下:

/*** Provides data structures to a newly created session.* * @author <a href="http://mina.apache.org">Apache MINA Project</a>*/
public interface IoSessionDataStructureFactory {/*** Returns an {@link IoSessionAttributeMap} which is going to be associated* with the specified <tt>session</tt>.  Please note that the returned* implementation must be thread-safe.*/IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;/*** Returns an {@link WriteRequest} which is going to be associated with* the specified <tt>session</tt>.  Please note that the returned* implementation must be thread-safe and robust enough to deal* with various messages types (even what you didn't expect at all),* especially when you are going to implement a priority queue which* involves {@link Comparator}.*/WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}


在DefaultIoSessionDataStructureFactory 默认实现工厂类中实现了IoSessionAttributeMap 和WriteRequestQueue两个数据结构的实现。



IoSessionAttributeMap内部为一个并发安全的HashMap可以存储键值对。

 private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
        private final ConcurrentHashMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);

}



WriteRequestQueue为一个并发安全的队列实现。

    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
        /** A queue to store incoming write requests */
        private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();

}


回头看一下,在initSession()初始化方法中,主要创建IoSessionAttributeMap和WriteRequestQueue队列两个结构。



初始化NioSession完成后,就需要把NioSession分配到指定的Processor线程中。


在分配SimpleIoProcessorPool中提供分配NioSession 与Processor线程相关联的策略。


    private IoProcessor<S> getProcessor(S session) {   //获取在IoSession中PROCESSOR 为键的对象IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);//如果该IoSession尚未与IoProcessor相关联,则把IoProccessor按照如下原则相关联if (processor == null) {if (disposed || disposing) {throw new IllegalStateException("A disposed processor cannot be accessed.");}//把processor和session相关联。根据session id和proceor pool 的大小来决定。 把IoSession的ID与线程池中顺序相关联processor = pool[Math.abs((int) session.getId()) % pool.length];if (processor == null) {throw new IllegalStateException("A disposed processor cannot be accessed.");}//设置PROCESSOR 的对象为IoProccessorsession.setAttributeIfAbsent(PROCESSOR, processor);}return processor;}

然后把NioSession 方法指定Processor线程的newSessions队列中,并启动Processor线程。


同时在内部存在IoServiceStatistics 类来统计IoSession相关事件。



此时,IoSession创建过程就完成了。


小结一下,NioSession创建过程。


1、Acceptor 线程,在select()中,轮询accept事件。

2、当accept事件触发时,创建NioSession对象,每一个NioSession都有一个DefaultFilterChain相关联。

3、当创建NioSession后,需要通过initSession()方法来初始化化NioSession.这里主要涉及IoSessionDataStructFactory工厂类创建IoSessionAttrubteMap 来存储用户自定义数据,创建WriteRequestQueue队列。 

4、当NioSession构建完成后,需要在SimpleIoProcessorPool来把NioSession 分配到指定的Processor线程中。

5、NioSession被添加到指定Processor线程中newSession队列中,然后启动start  Processor线程。






这篇关于Apache Mina 源码再读2 IoSession创建过程源代码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/720391

相关文章

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

PyInstaller打包selenium-wire过程中常见问题和解决指南

《PyInstaller打包selenium-wire过程中常见问题和解决指南》常用的打包工具PyInstaller能将Python项目打包成单个可执行文件,但也会因为兼容性问题和路径管理而出现各种运... 目录前言1. 背景2. 可能遇到的问题概述3. PyInstaller 打包步骤及参数配置4. 依赖

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

将Mybatis升级为Mybatis-Plus的详细过程

《将Mybatis升级为Mybatis-Plus的详细过程》本文详细介绍了在若依管理系统(v3.8.8)中将MyBatis升级为MyBatis-Plus的过程,旨在提升开发效率,通过本文,开发者可实现... 目录说明流程增加依赖修改配置文件注释掉MyBATisConfig里面的Bean代码生成使用IDEA生

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

JSON Web Token在登陆中的使用过程

《JSONWebToken在登陆中的使用过程》:本文主要介绍JSONWebToken在登陆中的使用过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录JWT 介绍微服务架构中的 JWT 使用结合微服务网关的 JWT 验证1. 用户登录,生成 JWT2. 自定义过滤

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

java中使用POI生成Excel并导出过程

《java中使用POI生成Excel并导出过程》:本文主要介绍java中使用POI生成Excel并导出过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求说明及实现方式需求完成通用代码版本1版本2结果展示type参数为atype参数为b总结注:本文章中代码均为

idea中创建新类时自动添加注释的实现

《idea中创建新类时自动添加注释的实现》在每次使用idea创建一个新类时,过了一段时间发现看不懂这个类是用来干嘛的,为了解决这个问题,我们可以设置在创建一个新类时自动添加注释,帮助我们理解这个类的用... 目录前言:详细操作:步骤一:点击上方的 文件(File),点击&nbmyHIgsp;设置(Setti