本文主要是介绍Apache Mina 源码再读2 IoSession创建过程源代码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在调用bind()函数后,AcceptorOperationFuture 被注册到AbstractPollingIoAcceptor 类中的registerQueue 队列。在AbstractPollingIoAcceptor中存在IoProcessor 类。
/**
* An internal interface to represent an 'I/O processor' that performs
* actual I/O operations for {@link IoSession}s. It abstracts existing
* reactor frameworks such as Java NIO once again to simplify transport
* implementations.
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*
* @param <S> the type of the {@link IoSession} this processor can handle
*/
public interface IoProcessor<S extends IoSession> {}
IoProcessor 是执行IoSession 的I/O操作的内部接口类。 IoProcessor再次 抽象了在Java Nio 内部的reactor frameworks 框架,并简化传输层的实现。
IoProcessor 实现类为SimpleIoProcessorPool 类。SImpleIoProcessorPool主要把IoSession分配到一个或者多个IoProcessor中。
An IoProcessor
pool that distributes IoSession
s into one or more IoProcessor
s. Most current transport implementations use this pool internally to perform better in a multi-core environment, and therefore, you won't need to use this pool directly unless you are running multiple IoService
s in the same JVM.
If you are running multiple IoService
s, you could want to share the pool among all services. To do so, you can create a new SimpleIoProcessorPool
instance by yourself and provide the pool as a constructor parameter when you create the services.
This pool uses Java reflection API to create multiple IoProcessor
instances. It tries to instantiate the processor in the following order:
- A public constructor with one
ExecutorService
parameter. - A public constructor with one
Executor
parameter. - A public default constructor
// Create a shared pool.SimpleIoProcessorPool<NioSession> pool = new SimpleIoProcessorPool<NioSession>(NioProcessor.class, 16);
// Create two services that share the same pool.SocketAcceptor acceptor = new NioSocketAcceptor(pool);SocketConnector connector = new NioSocketConnector(pool)
在Acceptor 线程中,processHandles 会创建一个新的IoSession.
/*** This method will process new sessions for the Worker class. All* keys that have had their status updates as per the Selector.selectedKeys()* method will be processed here. Only keys that are ready to accept* connections are handled here.* <p/>* Session objects are created by making new instances of SocketSessionImpl* and passing the session object to the SocketIoProcessor class.*/@SuppressWarnings("unchecked")private void processHandles(Iterator<H> handles) throws Exception {while (handles.hasNext()) {H handle = handles.next();handles.remove();// Associates a new created connection to a processor,// and get back a sessionS session = accept(processor, handle);//当有一个新socket链接时,返回一个IoSessionif (session == null) {continue;}initSession(session, null, null);// add the session to the SocketIoProcessor//当Acceptor发现新的socket,把socket和ioproces相关联。所有io事件由ioprocess处理session.getProcessor().add(session);}}}
protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {SelectionKey key = null;if (handle != null) {key = handle.keyFor(selector);}if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {return null;}// accept the connection from the clientSocketChannel ch = handle.accept();if (ch == null) {return null;}return new NioSocketSession(this, processor, ch);}
public abstract class NioSession extends AbstractIoSession {
/** The NioSession processor */
protected final IoProcessor<NioSession> processor;
/** The communication channel */
protected final Channel channel;
/** The SelectionKey used for this session */
protected SelectionKey key;
/** The FilterChain created for this session */
private final IoFilterChain filterChain;
/**
*
* Creates a new instance of NioSession, with its associated IoProcessor.
* <br>
* This method is only called by the inherited class.
*
* @param processor The associated IoProcessor
*/
protected NioSession(IoProcessor<NioSession> processor, IoService service, Channel channel) {
super(service);
this.channel = channel;
this.processor = processor;
filterChain = new DefaultIoFilterChain(this);
}
}
在抽象类NioSession中,每创建一个NioSession就会创建一个DefaultFilterChain对象。也就意味着每一个NioSession与自己独立的DefaultFilterChain相关联。
NioSession创建后,会通过initSession 方法来初始化NioSession.
在初始化NioSession 之前,先看看一个IoSessionDataStructureFactory 工厂类。Provides data structures to a newly created session. 这个工厂类提供了创建NioSession的数据结构。NioSession中的数据结构主要包括存储自定义对象的IoSessionAttributeMap 类以及IoSession写入到缓冲区的写出队列WriteRequestQueue。
IoSessionDataStructFactory 接口源代码如下:
/*** Provides data structures to a newly created session.* * @author <a href="http://mina.apache.org">Apache MINA Project</a>*/
public interface IoSessionDataStructureFactory {/*** Returns an {@link IoSessionAttributeMap} which is going to be associated* with the specified <tt>session</tt>. Please note that the returned* implementation must be thread-safe.*/IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;/*** Returns an {@link WriteRequest} which is going to be associated with* the specified <tt>session</tt>. Please note that the returned* implementation must be thread-safe and robust enough to deal* with various messages types (even what you didn't expect at all),* especially when you are going to implement a priority queue which* involves {@link Comparator}.*/WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}
在DefaultIoSessionDataStructureFactory 默认实现工厂类中实现了IoSessionAttributeMap 和WriteRequestQueue两个数据结构的实现。
IoSessionAttributeMap内部为一个并发安全的HashMap可以存储键值对。
private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
private final ConcurrentHashMap<Object, Object> attributes = new ConcurrentHashMap<Object, Object>(4);
}
WriteRequestQueue为一个并发安全的队列实现。
private static class DefaultWriteRequestQueue implements WriteRequestQueue {
/** A queue to store incoming write requests */
private final Queue<WriteRequest> q = new ConcurrentLinkedQueue<WriteRequest>();
}
回头看一下,在initSession()初始化方法中,主要创建IoSessionAttributeMap和WriteRequestQueue队列两个结构。
初始化NioSession完成后,就需要把NioSession分配到指定的Processor线程中。
在分配SimpleIoProcessorPool中提供分配NioSession 与Processor线程相关联的策略。
private IoProcessor<S> getProcessor(S session) { //获取在IoSession中PROCESSOR 为键的对象IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);//如果该IoSession尚未与IoProcessor相关联,则把IoProccessor按照如下原则相关联if (processor == null) {if (disposed || disposing) {throw new IllegalStateException("A disposed processor cannot be accessed.");}//把processor和session相关联。根据session id和proceor pool 的大小来决定。 把IoSession的ID与线程池中顺序相关联processor = pool[Math.abs((int) session.getId()) % pool.length];if (processor == null) {throw new IllegalStateException("A disposed processor cannot be accessed.");}//设置PROCESSOR 的对象为IoProccessorsession.setAttributeIfAbsent(PROCESSOR, processor);}return processor;}
然后把NioSession 方法指定Processor线程的newSessions队列中,并启动Processor线程。
同时在内部存在IoServiceStatistics 类来统计IoSession相关事件。
此时,IoSession创建过程就完成了。
小结一下,NioSession创建过程。
1、Acceptor 线程,在select()中,轮询accept事件。
2、当accept事件触发时,创建NioSession对象,每一个NioSession都有一个DefaultFilterChain相关联。
3、当创建NioSession后,需要通过initSession()方法来初始化化NioSession.这里主要涉及IoSessionDataStructFactory工厂类创建IoSessionAttrubteMap 来存储用户自定义数据,创建WriteRequestQueue队列。
4、当NioSession构建完成后,需要在SimpleIoProcessorPool来把NioSession 分配到指定的Processor线程中。
5、NioSession被添加到指定Processor线程中newSession队列中,然后启动start Processor线程。
这篇关于Apache Mina 源码再读2 IoSession创建过程源代码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!