本文主要是介绍Apache mina 源码再读3 I/O Service 源码剖析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Base interface for all IoAcceptors and IoConnectors that provide I/O service and manage IoSessions.
// Declaration only: the members of this interface appear in the full listing that follows.
public interface IoService { }
IoService 是 IoAcceptor 和 IoConnector 的基础接口,用于提供 I/O 服务并管理 IoSession。
/**
* Base interface for all {@link IoAcceptor}s and {@link IoConnector}s that
* provide I/O service and manage {@link IoSession}s.
*/
public interface IoService {
/**
* Adds an {@link IoServiceListener} that listens to any events related to
* this service.
*
* @param listener the listener to add
*/
void addListener(IoServiceListener listener);
/**
* Removes an existing {@link IoServiceListener} that listens to any events
* related to this service.
*
* @param listener the listener to remove
*/
void removeListener(IoServiceListener listener);
/**
* Returns the handler which will handle all connections managed by this service.
*
* @return the handler of this service
*/
IoHandler getHandler();
/**
* Sets the handler which will handle all connections managed by this service.
*
* @param handler the handler to use
*/
void setHandler(IoHandler handler);
/**
* Returns the map of all sessions which are currently managed by this
* service. The key of the map is the {@link IoSession#getId() ID} of the
* session.
*
* @return the managed sessions; an empty map if there is no session
*/
Map<Long, IoSession> getManagedSessions();
/**
* Returns the number of all sessions which are currently managed by this
* service.
*
* @return the number of managed sessions
*/
int getManagedSessionCount();
/**
* Returns the default configuration of the new {@link IoSession}s
* created by this service.
*
* @return the default session configuration
*/
IoSessionConfig getSessionConfig();
/**
* Returns the {@link IoFilterChainBuilder} which will build the
* {@link IoFilterChain} of all {@link IoSession}s which are created
* by this service.
* The default value is an empty {@link DefaultIoFilterChainBuilder}.
*
* @return the current filter chain builder
*/
IoFilterChainBuilder getFilterChainBuilder();
/**
* Sets the {@link IoFilterChainBuilder} which will build the
* {@link IoFilterChain} of all {@link IoSession}s which are created
* by this service.
* If you specify {@code null}, this property will be set to
* an empty {@link DefaultIoFilterChainBuilder}.
*
* @param builder the filter chain builder to use, or {@code null} for the default
*/
void setFilterChainBuilder(IoFilterChainBuilder builder);
/**
* A shortcut for {@code (DefaultIoFilterChainBuilder) getFilterChainBuilder()}
* (see {@link #getFilterChainBuilder()}).
* Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
* but a {@link DefaultIoFilterChainBuilder}. Modifying the returned builder
* won't affect the existing {@link IoSession}s at all, because
* {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s.
*
* @return the current filter chain builder, typed as {@link DefaultIoFilterChainBuilder}
* @throws IllegalStateException if the current {@link IoFilterChainBuilder} is
* not a {@link DefaultIoFilterChainBuilder}
*/
DefaultIoFilterChainBuilder getFilterChain();
/**
* Tells whether or not this service is active.
*
* @return {@code true} if the service is active
*/
boolean isActive();
/**
* Returns the time when this service was activated. It returns the last
* time when this service was activated if the service is not active now.
*
* @return the activation time, expressed as a {@link System#currentTimeMillis()} value
*/
long getActivationTime();
/**
* Writes the specified {@code message} to all the {@link IoSession}s
* managed by this service. This method is a convenience shortcut for
* {@link IoUtil#broadcast(Object, Collection)}.
*
* @param message the message to write to every managed session
* @return the resulting {@link WriteFuture}s (presumably one per written
* session; confirm against {@link IoUtil#broadcast(Object, Collection)})
*/
Set<WriteFuture> broadcast(Object message);
/**
* Returns the {@link IoSessionDataStructureFactory} that provides
* related data structures for a new session created by this service.
*
* @return the session data structure factory
*/
IoSessionDataStructureFactory getSessionDataStructureFactory();
/**
* Sets the {@link IoSessionDataStructureFactory} that provides
* related data structures for a new session created by this service.
*
* @param sessionDataStructureFactory the factory to use
*/
void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory);
/**
* Returns the number of bytes scheduled to be written.
*
* @return the number of bytes scheduled to be written
*/
int getScheduledWriteBytes();
/**
* Returns the number of messages scheduled to be written.
*
* @return the number of messages scheduled to be written
*/
int getScheduledWriteMessages();
/**
* Returns the {@link IoServiceStatistics} object for this service.
*
* @return the statistics object for this service
*/
IoServiceStatistics getStatistics();
}
An instance of IoService contains an Executor which will handle the incoming events.
AbstractIoService 是实现了 IoService 接口的抽象类,内部持有一个 Executor,用来处理 incoming events。
// Declaration only: the fields of this class appear in the full listing that follows.
public abstract class AbstractIoService implements IoService { }
/**
* Abstract base for {@link IoService} implementations. It holds the
* {@link Executor} that handles incoming I/O events, the handler, the session
* configuration, the filter chain builder, the listener support and the
* statistics of the service.
* <p>
* This excerpt lists the fields only; constructors and methods are not shown.
*/
public abstract class AbstractIoService implements IoService {
/**
* The unique number identifying the Service. It's incremented
* for each new IoService created.
*/
private static final AtomicInteger id = new AtomicInteger();
/**
* The thread name built from the IoService inherited
* instance class name and the IoService Id
**/
private final String threadName;
/**
* The associated executor, responsible for handling execution of I/O events.
*/
private final Executor executor;
/**
* A flag used to indicate that the local executor has been created
* inside this instance, and not passed by a caller.
*
* If the executor is locally created, then it will be an instance
* of the ThreadPoolExecutor class.
*/
private final boolean createdExecutor;
/**
* The IoHandler in charge of managing all the I/O Events.
*/
private IoHandler handler;
/**
* The default {@link IoSessionConfig} which will be used to configure new sessions.
*/
protected final IoSessionConfig sessionConfig;
/**
* Current filter chain builder. Starts out as an empty
* {@link DefaultIoFilterChainBuilder}.
*/
private IoFilterChainBuilder filterChainBuilder = new DefaultIoFilterChainBuilder();
/**
* Current session data structure factory. Starts out as a
* {@link DefaultIoSessionDataStructureFactory}.
*/
private IoSessionDataStructureFactory sessionDataStructureFactory = new DefaultIoSessionDataStructureFactory();
/**
* Maintains the {@link IoServiceListener}s of this service.
*/
private final IoServiceListenerSupport listeners;
/**
* A lock object which must be acquired when related resources are
* destroyed.
*/
protected final Object disposalLock = new Object();
// NOTE(review): presumably true while disposal is in progress; the code
// that writes this flag is not part of this excerpt.
private volatile boolean disposing;
// NOTE(review): presumably true once disposal has completed; the code
// that writes this flag is not part of this excerpt.
private volatile boolean disposed;
/**
* The statistics object of this service, created together with the service.
*/
private IoServiceStatistics stats = new IoServiceStatistics(this);
}
A helper class which provides addition and removal of IoServiceListeners and firing of events.
AbstractIoService 中用到了 IoServiceListenerSupport 类,用于添加、移除 IoServiceListener 监听器并触发(fire)事件;IoServiceStatistics 则用来统计与 IoSession 相关的数据。
/**
 * Listener for the life-cycle events of an {@link IoService} and of the
 * {@link IoSession}s it manages. Every callback is declared to throw
 * {@link Exception}.
 */
public interface IoServiceListener extends EventListener {
    /**
     * Invoked when a new service is activated by an {@link IoService}.
     *
     * @param service the {@link IoService}
     * @throws Exception if the listener fails to process the event
     */
    void serviceActivated(IoService service) throws Exception;

    /**
     * Invoked when a service is idle.
     *
     * @param service the {@link IoService}
     * @param idleStatus the idle status reported for the service
     * @throws Exception if the listener fails to process the event
     */
    void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception;

    /**
     * Invoked when a service is deactivated by an {@link IoService}.
     *
     * @param service the {@link IoService}
     * @throws Exception if the listener fails to process the event
     */
    void serviceDeactivated(IoService service) throws Exception;

    /**
     * Invoked when a new session is created by an {@link IoService}.
     *
     * @param session the new session
     * @throws Exception if the listener fails to process the event
     */
    void sessionCreated(IoSession session) throws Exception;

    /**
     * Invoked when a session is closed by an {@link IoService}.
     *
     * @param session the session that has been closed
     * @throws Exception if the listener fails to process the event
     */
    void sessionClosed(IoSession session) throws Exception;

    /**
     * Invoked when a session is being destroyed by an {@link IoService}.
     *
     * @param session the session to be destroyed
     * @throws Exception if the listener fails to process the event
     */
    void sessionDestroyed(IoSession session) throws Exception;
}
/**
 * A helper class which provides addition and removal of
 * {@link IoServiceListener}s and firing of events, and which keeps track of
 * the sessions managed by one {@link IoService}.
 * <p>
 * This excerpt lists the fields only; constructors and methods are not shown.
 */
public class IoServiceListenerSupport {
    /** The {@link IoService} that this instance manages. */
    private final IoService service;

    /** A list of {@link IoServiceListener}s. */
    private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();

    /** Tracks managed sessions. */
    private final ConcurrentMap<Long, IoSession> managedSessions = new ConcurrentHashMap<Long, IoSession>();

    /** Read only version of {@link #managedSessions}. */
    private final Map<Long, IoSession> readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);

    // NOTE(review): presumably true while the service is active; the code
    // that flips this flag is not part of this excerpt.
    private final AtomicBoolean activated = new AtomicBoolean();

    /** Time this listenerSupport has been activated */
    private volatile long activationTime;

    // NOTE(review): the two counters below are volatile, which gives
    // visibility only. fireSessionCreated() updates them with a
    // compare-then-assign and with ++, neither of which is atomic, so
    // concurrent callers could lose updates. Confirm whether that matters.

    /** A counter used to store the maximum sessions we managed since the listenerSupport has been activated */
    private volatile int largestManagedSessionCount = 0;

    /** A global counter to count the number of sessions managed since the start */
    private volatile long cumulativeManagedSessionCount = 0;
}
/*** Loops over the new sessions blocking queue and returns the number of* sessions which are effectively created** @return The number of new sessions*/private int handleNewSessions() {int addedSessions = 0;//Acceptor 线程把新链接通过并发队列放入到IoProccessor线程中。IoProccessor线程优先新链接的socketfor (S session = newSessions.poll(); session != null; session = newSessions.poll()) {if (addNow(session)) {// A new session has been createdaddedSessions++;}}return addedSessions;}
Processor 通过 poll() 把 NioSession 从 newSessions 队列中取出(同时将其移出队列),随后在 addNow() 中由 IoServiceListenerSupport 执行 fireSessionCreated()。
/*** Process a new session :* - initialize it* - create its chain* - fire the CREATED listeners if any** @param session The session to create* @return true if the session has been registered*/private boolean addNow(S session) {boolean registered = false;//处理一个新socket的流程,初始化IoSession,创建一个handler chain 和创建监听器try {init(session);registered = true;// Build the filter chain of this session.IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();chainBuilder.buildFilterChain(session.getFilterChain());// DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here// in AbstractIoFilterChain.fireSessionOpened().// Propagate the SESSION_CREATED event up to the chainIoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();listeners.fireSessionCreated(session);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);try {destroy(session);} catch (Exception e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);} finally {registered = false;}}return registered;}
/*** Calls {@link IoServiceListener#sessionCreated(IoSession)} for all registered listeners.* * @param session The session which has been created*/public void fireSessionCreated(IoSession session) {boolean firstSession = false;if (session.getService() instanceof IoConnector) {synchronized (managedSessions) {firstSession = managedSessions.isEmpty();}}// If already registered, ignore.if (managedSessions.putIfAbsent(session.getId(), session) != null) {return;}// If the first connector session, fire a virtual service activation event.if (firstSession) {fireServiceActivated();}// Fire session events.IoFilterChain filterChain = session.getFilterChain();filterChain.fireSessionCreated();filterChain.fireSessionOpened();int managedSessionCount = managedSessions.size();if (managedSessionCount > largestManagedSessionCount) {largestManagedSessionCount = managedSessionCount;}cumulativeManagedSessionCount++;// Fire listener events.for (IoServiceListener l : listeners) {try {l.sessionCreated(session);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);}}}
Processor 线程通过 fireSessionCreated() 把 NioSession 放入 IoServiceListenerSupport 的 managedSessions 映射(ConcurrentMap,以会话 ID 为键)中,由 IoServiceListenerSupport 来追踪 IoSession。
public void fireSessionCreated(IoSession session) {boolean firstSession = false;if (session.getService() instanceof IoConnector) {synchronized (managedSessions) {firstSession = managedSessions.isEmpty();}}// If already registered, ignore.if (managedSessions.putIfAbsent(session.getId(), session) != null) {return;}// If the first connector session, fire a virtual service activation event.if (firstSession) {fireServiceActivated();}// Fire session events.IoFilterChain filterChain = session.getFilterChain();filterChain.fireSessionCreated();filterChain.fireSessionOpened();int managedSessionCount = managedSessions.size();if (managedSessionCount > largestManagedSessionCount) {largestManagedSessionCount = managedSessionCount;}cumulativeManagedSessionCount++;// Fire listener events.for (IoServiceListener l : listeners) {try {l.sessionCreated(session);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);}}}
IoServiceListenerSupport 的 fireSessionCreated() 会依次调用过滤器链的 fireSessionCreated() 和 fireSessionOpened(),随后再通知各个 IoServiceListener 的 sessionCreated()。
这篇关于Apache mina 源码再读3 I/O Service 源码剖析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!