本文主要是介绍akka.io的基本用法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
akka.io的api已经非常非常简单了, 实在很难挑剔. 如果用它来做单进程的游戏服务器, 基本上分成三个步骤就可以完成了.
1. akka.io的环境初始化, 包括了tcp extension的初始化.
2. 绑定一个端口, 并将这个端口上的事件交给某个actor处理, 如连接到来事件.
3. 有连接到来时将其指派给某个业务actor处理, 接下来这个业务actor就负责自己身上的所有事件了, 如消息到来事件.
-------------------------------------------------------
1. 环境初始化.
/** 服务器启动. */
public final static boolean init()
{try{AkMgr.sys = ActorSystem.create();TcpExt ext = Tcp.get(AkMgr.sys);ActorRef ref = ext.manager();//ActorRef srv = AkMgr.sys.actorOf(Props.create(Srv.class), Srv.class.getName());InetSocketAddress addr = new InetSocketAddress("0.0.0.0", 20001);ref.tell(TcpMessage.bind(srv, addr, 0x10000, AkMgr.setOpt(), false), srv);return true;} catch (Exception e){Log.error(Log.trace(e));return false;}
}
/** 服务器端口套接字选项. */
private static final List<Inet.SocketOption> setOpt()
{List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();options.add(TcpSO.reuseAddress(true));options.add(TcpSO.sendBufferSize(0x400 * 10));options.add(TcpSO.receiveBufferSize(0x400 * 10));return options;
}
tcp的选项似乎有点奇怪, 在bind的时候指定, 然后它们将被应用到所有的连接上去. 换句话说, 是统一指定的. 并且只提供了下面几个选项(版本是2.3.7)
没有看到linger. timeout什么的. 看来akka觉得我们不需要其它的. 事实上通过测试结果来看, 也确实如此. 连接所关联的actor在stop的时候,
立即就被销毁了. 也没有看到tcp缓冲区残留等待, time_wait状态, 换句话说, 连接上的关闭是暴力的. 如果想延迟关闭, 你可能得单独处理.
2. 监听端口事件处理actor.
上面的Srv就是负责处理监听端口上的事件类了, onReceive函数中重要的事件就是Tcp.Connected了, 它表示了一个连接到来.
public void onReceive(Object msg) throws Exception
{try{if (msg instanceof Tcp.Bound)this.boundEvn((Bound) msg);else if (msg instanceof Tcp.Connected)this.connEvn((Tcp.Connected) msg, getSender());else{Log.error("it`s an unexpected message: %s\n", msg);this.unhandled(msg);}} catch (Exception e){Log.error(Log.trace(e));}
}
3. 当连接到来的时候, 将业务actor Peer注册到连接上就可以了.
/** 连接到来事件. */
private void connEvn(Tcp.Connected msg, ActorRef sender)
{if (Log.isTrace())Log.trace("got a connection from peer: %s\n", msg.remoteAddress());ActorRef peer = AkMgr.actorOf(Props.create(Peer.class, sender, msg.remoteAddress())); /* 构造一个peer. */this.getSender().tell(TcpMessage.register(peer), this.getSelf()); /* 连接上的事件交予peer处理. */
}
下面是Peer类的消息入口, 可能的事件在Tcp.*中都有定义. 包括了报文送达, 连接断开, 和这里没有处理的send过载等事件.
/** 消息入口. */
public void onReceive(Object msg) throws Exception
{if (msg instanceof Tcp.Received)this.datEvn((Tcp.Received) msg);else if (msg instanceof Tcp.ErrorClosed)this.disEvn((Tcp.ErrorClosed) msg);elseLog.error("it`s an unexpected message: %s\n", msg.getClass().getName());
}
这里有一个值得怀疑的地方是, 每个Tcp.Received消息是无法控制的. 相当于只要tcp缓冲区中有数据, akka就会把它拿出来, 以Tcp.Received的形式扔到应用上
来.由应用自己去decode消息流. 因为消息流是无边界的, 那么应用自己需要额外开辟一段内存来去缓存不完整的消息.
因此在注册一个Peer到连接上的时候(TcpMessage.register(peer)), 如果能让应用指定一个缓冲区是不是更好呢? 这样应用层和akka可以共同操作这片区域,
从而减少来回的copy呢?
4. 关于性能.
akka.io的性能是非常好的. 在10000个连接, 15000条消息/s, 8Mbytes/s的压力下. 占用了一个i5 4核cpu的120%, 也就是一个cpu多一点点. gc也比较少, 内存的使用也很稳定.
这篇关于akka.io的基本用法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!