Apache mina 源码再读3 I/O Service 源码剖析

2024-02-18 06:58

本文主要是介绍Apache mina 源码再读3 I/O Service 源码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


Base interface for all  IoAcceptor  and  IoConnector  that provide I/O service and manage   IoSession.

public interface IoService { }


IoAcceptor 和IoConnector  的基础接口IoService 来提供I/O服务和管理IoSession .




public interface IoService {


/**
     * Adds an {@link IoServiceListener} that listens any events related with
     * this service.
     */
    void addListener(IoServiceListener listener);


    /**
     * Removed an existing {@link IoServiceListener} that listens any events
     * related with this service.
     */
    void removeListener(IoServiceListener listener);


 /**
     * Returns the handler which will handle all connections managed by this service.
     */
    IoHandler getHandler();


    /**
     * Sets the handler which will handle all connections managed by this service.
     */
    void setHandler(IoHandler handler);


    /**
     * Returns the map of all sessions which are currently managed by this
     * service.  The key of map is the {@link IoSession#getId() ID} of the
     * session.
     *
     * @return the sessions. An empty collection if there's no session.
     */
    Map<Long, IoSession> getManagedSessions();


    /**
     * Returns the number of all sessions which are currently managed by this
     * service.
     */
    int getManagedSessionCount();


    /**
     * Returns the default configuration of the new {@link IoSession}s
     * created by this service.
     */
    IoSessionConfig getSessionConfig();


    /**
     * Returns the {@link IoFilterChainBuilder} which will build the
     * {@link IoFilterChain} of all {@link IoSession}s which is created
     * by this service.
     * The default value is an empty {@link DefaultIoFilterChainBuilder}.
     */
    IoFilterChainBuilder getFilterChainBuilder();


    /**
     * Sets the {@link IoFilterChainBuilder} which will build the
     * {@link IoFilterChain} of all {@link IoSession}s which is created
     * by this service.
     * If you specify <tt>null</tt> this property will be set to
     * an empty {@link DefaultIoFilterChainBuilder}.
     */
    void setFilterChainBuilder(IoFilterChainBuilder builder);


    /**
     * A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
     * Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
     * but a {@link DefaultIoFilterChainBuilder}.  Modifying the returned builder
     * won't affect the existing {@link IoSession}s at all, because
     * {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s.
     *
     * @throws IllegalStateException if the current {@link IoFilterChainBuilder} is
     *                               not a {@link DefaultIoFilterChainBuilder}
     */
    DefaultIoFilterChainBuilder getFilterChain();


    /**
     * Returns a value of whether or not this service is active
     *
     * @return whether of not the service is active.
     */
    boolean isActive();


    /**
     * Returns the time when this service was activated.  It returns the last
     * time when this service was activated if the service is not active now.
     *
     * @return The time by using {@link System#currentTimeMillis()}
     */
    long getActivationTime();


    /**
     * Writes the specified {@code message} to all the {@link IoSession}s
     * managed by this service.  This method is a convenience shortcut for
     * {@link IoUtil#broadcast(Object, Collection)}.
     */
    Set<WriteFuture> broadcast(Object message);


    /**
     * Returns the {@link IoSessionDataStructureFactory} that provides
     * related data structures for a new session created by this service.
     */
    IoSessionDataStructureFactory getSessionDataStructureFactory();


    /**
     * Sets the {@link IoSessionDataStructureFactory} that provides
     * related data structures for a new session created by this service.
     */
    void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory);


    /**
     * Returns the number of bytes scheduled to be written
     *
     * @return The number of bytes scheduled to be written
     */
    int getScheduledWriteBytes();


    /**
     * Returns the number of messages scheduled to be written
     *
     * @return The number of messages scheduled to be written
     */
    int getScheduledWriteMessages();


    /**
     * Returns the IoServiceStatistics object for this service.
     * 
     * @return The statistics object for this service.
     */
    IoServiceStatistics getStatistics();


}


An instance of IoService contains an Executor which will handle the incoming  events

一个包含Executor 接口的IoService 抽象类,来处理incoming events.

public abstract class AbstractIoService implements IoService { }



public abstract class AbstractIoService implements IoService {

    /**
     * The unique number identifying the Service. It's incremented
     * for each new IoService created.
     */
    private static final AtomicInteger id = new AtomicInteger();


    /**
     * The thread name built from the IoService inherited
     * instance class name and the IoService Id
     **/
    private final String threadName;


    /**
     * The associated executor, responsible for handling execution of I/O events.
     */


    private final Executor executor;
    /**
     * A flag used to indicate that the local executor has been created
     * inside this instance, and not passed by a caller.
     * 
     * If the executor is locally created, then it will be an instance
     * of the ThreadPoolExecutor class.
     */
    private final boolean createdExecutor;


    /**
     * The IoHandler in charge of managing all the I/O Events. It is
     */
    private IoHandler handler;


    /**
     * The default {@link IoSessionConfig} which will be used to configure new sessions.
     */
    protected final IoSessionConfig sessionConfig;


  /**
     * Current filter chain builder.
     */
    private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();


    private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();


    /**
     * Maintains the {@link IoServiceListener}s of this service.
     */
    private final IoServiceListenerSupport listeners;


    /**
     * A lock object which must be acquired when related resources are
     * destroyed.
     */
    protected final Object disposalLock = new Object();


    private volatile boolean disposing;


    private volatile boolean disposed;


    /**
     * {@inheritDoc}
     */
    private IoServiceStatistics stats = new IoServiceStatistics(this);

}

A helper class which provides addition and removal of  IoServiceListener and firing

在AbstractIoService 中涉及到IoServiceListenerSupport 类,来关联IoServiceListener 监听器和fire事件。IoServiceStatistics 来统计与IoSession相关数据。


public interface IoServiceListener extends EventListener {/*** Invoked when a new service is activated by an {@link IoService}.** @param service the {@link IoService}*/void serviceActivated(IoService service) throws Exception;/*** Invoked when a service is idle.*/void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception;/*** Invoked when a service is deactivated by an {@link IoService}.** @param service the {@link IoService}*/void serviceDeactivated(IoService service) throws Exception;/*** Invoked when a new session is created by an {@link IoService}.** @param session the new session*/void sessionCreated(IoSession session) throws Exception;/*** Invoked when a new session is closed by an {@link IoService}.* * @param session*            the new session*/void sessionClosed(IoSession session) throws Exception;/*** Invoked when a session is being destroyed by an {@link IoService}.* * @param session*            the session to be destroyed*/void sessionDestroyed(IoSession session) throws Exception;
}

public class IoServiceListenerSupport {/** The {@link IoService} that this instance manages. */private final IoService service;/** A list of {@link IoServiceListener}s. */private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();/** Tracks managed sessions. */private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();/**  Read only version of {@link #managedSessions}. */private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);private final AtomicBoolean activated = new AtomicBoolean();/** Time this listenerSupport has been activated */private volatile long activationTime;/** A counter used to store the maximum sessions we managed since the listenerSupport has been activated */private volatile int largestManagedSessionCount = 0;/** A global counter to count the number of sessions managed since the start */private volatile long cumulativeManagedSessionCount = 0;}


Processor 处理newSession 相关逻辑。


  /*** Loops over the new sessions blocking queue and returns the number of* sessions which are effectively created** @return The number of new sessions*/private int handleNewSessions() {int addedSessions = 0;//Acceptor 线程把新链接通过并发队列放入到IoProccessor线程中。IoProccessor线程优先新链接的socketfor (S session = newSessions.poll(); session != null; session = newSessions.poll()) {if (addNow(session)) {// A new session has been createdaddedSessions++;}}return addedSessions;}

Processor 把newSession队列中的NioSession移除掉。然后IoServiceListenerSupport 执行fireSessionCreated()事件。


  /*** Process a new session :* - initialize it* - create its chain* - fire the CREATED listeners if any** @param session The session to create* @return true if the session has been registered*/private boolean addNow(S session) {boolean registered = false;//处理一个新socket的流程,初始化IoSession,创建一个handler chain 和创建监听器try {init(session);registered = true;// Build the filter chain of this session.IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();chainBuilder.buildFilterChain(session.getFilterChain());// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here// in AbstractIoFilterChain.fireSessionOpened().// Propagate the SESSION_CREATED event up to the chainIoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();listeners.fireSessionCreated(session);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);try {destroy(session);} catch (Exception e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);} finally {registered = false;}}return registered;}

  /*** Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.* * @param session The session which has been created*/public void fireSessionCreated(IoSession session) {boolean firstSession = false;if (session.getService() instanceof IoConnector) {synchronized (managedSessions) {firstSession = managedSessions.isEmpty();}}// If already registered, ignore.if (managedSessions.putIfAbsent(session.getId(), session) != null) {return;}// If the first connector session, fire a virtual service activation event.if (firstSession) {fireServiceActivated();}// Fire session events.IoFilterChain filterChain = session.getFilterChain();filterChain.fireSessionCreated();filterChain.fireSessionOpened();int managedSessionCount = managedSessions.size();if (managedSessionCount > largestManagedSessionCount) {largestManagedSessionCount = managedSessionCount;}cumulativeManagedSessionCount++;// Fire listener events.for (IoServiceListener l : listeners) {try {l.sessionCreated(session);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);}}}

Processor 线程把NioSession 放到IoServiceListenerSupport 的managedSessions 队列中。在IoServiceListenerSupport 来追踪IoSession.


    public void fireSessionCreated(IoSession session) {boolean firstSession = false;if (session.getService() instanceof IoConnector) {synchronized (managedSessions) {firstSession = managedSessions.isEmpty();}}// If already registered, ignore.if (managedSessions.putIfAbsent(session.getId(), session) != null) {return;}// If the first connector session, fire a virtual service activation event.if (firstSession) {fireServiceActivated();}// Fire session events.IoFilterChain filterChain = session.getFilterChain();filterChain.fireSessionCreated();filterChain.fireSessionOpened();int managedSessionCount = managedSessions.size();if (managedSessionCount > largestManagedSessionCount) {largestManagedSessionCount = managedSessionCount;}cumulativeManagedSessionCount++;// Fire listener events.for (IoServiceListener l : listeners) {try {l.sessionCreated(session);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);}}}


在IoServiceListenerSupport 分别触发了fireSessionCreated(),fireSessionOpened()事件。



这篇关于Apache mina 源码再读3 I/O Service 源码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/720392

相关文章

使用TomCat,service输出台出现乱码的解决

《使用TomCat,service输出台出现乱码的解决》本文介绍了解决Tomcat服务输出台中文乱码问题的两种方法,第一种方法是修改`logging.properties`文件中的`prefix`和`... 目录使用TomCat,service输出台出现乱码问题1解决方案问题2解决方案总结使用TomCat,

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

解决systemctl reload nginx重启Nginx服务报错:Job for nginx.service invalid问题

《解决systemctlreloadnginx重启Nginx服务报错:Jobfornginx.serviceinvalid问题》文章描述了通过`systemctlstatusnginx.se... 目录systemctl reload nginx重启Nginx服务报错:Job for nginx.javas

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

Node.js 中 http 模块的深度剖析与实战应用小结

《Node.js中http模块的深度剖析与实战应用小结》本文详细介绍了Node.js中的http模块,从创建HTTP服务器、处理请求与响应,到获取请求参数,每个环节都通过代码示例进行解析,旨在帮... 目录Node.js 中 http 模块的深度剖析与实战应用一、引言二、创建 HTTP 服务器:基石搭建(一

SpringBoot使用Apache POI库读取Excel文件的操作详解

《SpringBoot使用ApachePOI库读取Excel文件的操作详解》在日常开发中,我们经常需要处理Excel文件中的数据,无论是从数据库导入数据、处理数据报表,还是批量生成数据,都可能会遇到... 目录项目背景依赖导入读取Excel模板的实现代码实现代码解析ExcelDemoInfoDTO 数据传输

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL