本文主要是介绍基于Java NIO的即时聊天服务器模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。本Demo暂时实现了Session过期检测,心跳检测有空再搞,如果本例子在使用过程中有性能漏洞或者什么bug请及时通知我,谢谢
废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:
JsonParser
Json的解析类,随便封装了下,使用的最近比较火的fastjson

1 public class JsonParser {2 3 private static JSONObject mJson;4 5 public synchronized static String get(String json,String key) {6 mJson = JSON.parseObject(json);7 return mJson.getString(key);8 }9 }

Main
入口,不解释
1 public class Main { 2 3 public static void main(String... args) { 4 new SeekServer().start(); 5 } 6 }
Log

1 public class Log {2 3 public static void i(Object obj) {4 System.out.println(obj);5 }6 public static void e(Object e) {7 System.err.println(e);8 }9 }

SeekServer:
服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的

1 public class SeekServer extends Thread{ 2 private final int ACCPET_PORT = 55555; 3 private final int TIME_OUT = 1000; 4 private Selector mSelector = null; 5 private ServerSocketChannel mSocketChannel = null; 6 private ServerSocket mServerSocket = null; 7 private InetSocketAddress mAddress = null; 8 9 public SeekServer() { 10 long sign = System.currentTimeMillis(); 11 try { 12 mSocketChannel = ServerSocketChannel.open(); 13 if(mSocketChannel == null) { 14 System.out.println("can't open server socket channel"); 15 } 16 mServerSocket = mSocketChannel.socket(); 17 mAddress = new InetSocketAddress(ACCPET_PORT); 18 mServerSocket.bind(mAddress); 19 Log.i("server bind port is " + ACCPET_PORT); 20 mSelector = Selector.open(); 21 mSocketChannel.configureBlocking(false); 22 SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT); 23 key.attach(new Acceptor()); 24 25 //检测Session状态 26 Looper.getInstance().loop(); 27 28 //开始处理Session 29 SessionProcessor.start(); 30 31 Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!"); 32 } catch (ClosedChannelException e) { 33 Log.e(e.getMessage()); 34 } catch (IOException e) { 35 Log.e(e.getMessage()); 36 } 37 } 38 39 public void run() { 40 Log.i("server is listening..."); 41 while(!Thread.interrupted()) { 42 try { 43 if(mSelector.select(TIME_OUT) > 0) { 44 Set<SelectionKey> keys = mSelector.selectedKeys(); 45 Iterator<SelectionKey> iterator = keys.iterator(); 46 SelectionKey key = null; 47 while(iterator.hasNext()) { 48 key = iterator.next(); 49 Handler at = (Handler) key.attachment(); 50 if(at != null) { 51 at.exec(); 52 } 53 iterator.remove(); 54 } 55 } 56 } catch (IOException e) { 57 Log.e(e.getMessage()); 58 } 59 } 60 } 61 62 class Acceptor extends Handler{ 63 64 public void exec(){ 65 try { 66 SocketChannel sc = mSocketChannel.accept(); 67 new Session(sc, mSelector); 68 } catch (ClosedChannelException e) { 69 Log.e(e); 70 } catch (IOException e) { 71 Log.e(e); 72 } 73 } 74 } 75 }

Handler:
只有一个抽象方法exec,Session将会继承它
1 public abstract class Handler {2 3 public abstract void exec();4 }
Session:
封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。

1 public class Session extends Handler{ 2 3 private SocketChannel mChannel; 4 private SelectionKey mKey; 5 private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240); 6 private Charset charset = Charset.forName("UTF-8"); 7 private CharsetDecoder mDecoder = charset.newDecoder(); 8 private CharsetEncoder mEncoder = charset.newEncoder(); 9 private long lastPant;//最后活动时间 10 private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间 11 private String key; 12 13 private String sendData = ""; 14 private String receiveData = null; 15 16 public static final int READING = 0,SENDING = 1; 17 int mState = READING; 18 19 public Session(SocketChannel socket, Selector selector) throws IOException { 20 this.mChannel = socket; 21 mChannel = socket; 22 mChannel.configureBlocking(false); 23 mKey = mChannel.register(selector, 0); 24 mKey.attach(this); 25 mKey.interestOps(SelectionKey.OP_READ); 26 selector.wakeup(); 27 lastPant = Calendar.getInstance().getTimeInMillis(); 28 } 29 30 public String getReceiveData() { 31 return receiveData; 32 } 33 34 public void clear() { 35 receiveData = null; 36 } 37 38 public void setSendData(String sendData) { 39 mState = SENDING; 40 mKey.interestOps(SelectionKey.OP_WRITE); 41 this.sendData = sendData + "\n"; 42 } 43 44 public boolean isKeekAlive() { 45 return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis(); 46 } 47 48 public void setAlive() { 49 lastPant = Calendar.getInstance().getTimeInMillis(); 50 } 51 52 /** 53 * 注销当前Session 54 */ 55 public void distroy() { 56 try { 57 mChannel.close(); 58 mKey.cancel(); 59 } catch (IOException e) {} 60 } 61 62 @Override 63 public synchronized void exec() { 64 try { 65 if(mState == READING) { 66 read(); 67 }else if(mState == SENDING) { 68 write(); 69 } 70 } catch (IOException e) { 71 SessionManager.remove(key); 72 try { 73 mChannel.close(); 74 } catch (IOException e1) { 75 Log.e(e1); 76 } 77 mKey.cancel(); 78 } 79 } 80 81 public void read() throws IOException{ 82 mRreceiveBuffer.clear(); 83 int sign = mChannel.read(mRreceiveBuffer); 84 if(sign == -1) { //客户端连接关闭 85 mChannel.close(); 86 mKey.cancel(); 87 } 88 if(sign > 0) { 89 mRreceiveBuffer.flip(); 90 receiveData = mDecoder.decode(mRreceiveBuffer).toString(); 91 setAlive(); 92 setSign(); 93 SessionManager.addSession(key, this); 94 } 95 } 96 97 private void setSign() { 98 //设置当前Session的Key 99 key = JsonParser.get(receiveData,"imei"); 100 //检测消息类型是否为心跳包 101 // String type = jo.getString("type"); 102 // if(type.equals("HEART_BEAT")) { 103 // setAlive(); 104 // } 105 } 106 107 108 /** 109 * 写消息 110 */ 111 public void write() { 112 try { 113 mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData))); 114 sendData = null; 115 mState = READING; 116 mKey.interestOps(SelectionKey.OP_READ); 117 } catch (CharacterCodingException e) { 118 e.printStackTrace(); 119 } catch (IOException e) { 120 try { 121 mChannel.close(); 122 } catch (IOException e1) { 123 Log.e(e1); 124 } 125 } 126 } 127 }

SessionManager:
将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,
封装了一些操作Session的方法例如get,remove等

1 public class SessionManager { 2 3 private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); 4 5 public static void addSession(String key,Session session) { 6 sessions.put(key, session); 7 } 8 9 public static Session getSession(String key) {10 return sessions.get(key);11 }12 13 public static Set<String> getSessionKeys() {14 return sessions.keySet();15 }16 17 public static int getSessionCount() {18 return sessions.size();19 }20 21 public static void remove(String[] keys) {22 for(String key:keys) {23 if(sessions.containsKey(key)) {24 sessions.get(key).distroy();25 sessions.remove(key);26 }27 }28 }29 public static void remove(String key) {30 if(sessions.containsKey(key)) {31 sessions.get(key).distroy();32 sessions.remove(key);33 }34 }35 }

SessionProcessor
里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response)

1 public class SessionProcessor implements Runnable{ 2 3 private static Runnable processor = new SessionProcessor(); 4 private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy()); 5 public static void start() { 6 new Thread(processor).start(); 7 } 8 9 @Override 10 public void run() { 11 while(true) { 12 Session tmp = null; 13 for(String key:SessionManager.getSessionKeys()) { 14 tmp = SessionManager.getSession(key); 15 //处理Session未处理的请求 16 if(tmp.getReceiveData() != null) { 17 pool.execute(new Process(tmp)); 18 } 19 } 20 try { 21 Thread.sleep(10); 22 } catch (InterruptedException e) { 23 Log.e(e); 24 } 25 } 26 } 27 28 class Process implements Runnable { 29 30 private SocketRequest request; 31 private SocketResponse response; 32 33 public Process(Session session) { 34 //将Session封装成Request和Response 35 request = new SocketRequest(session); 36 response = new SocketResponse(session); 37 } 38 39 @Override 40 public void run() { 41 new RequestTransform().transfer(request, response); 42 } 43 } 44 45 }

RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)

1 public class RequestTransform { 2 3 public void transfer(SocketRequest request,SocketResponse response) { 4 String action = request.getValue("action"); 5 String handlerName = request.getValue("handler"); 6 //根据Session的请求类型,让不同的类方法去处理 7 try { 8 Class<?> c= Class.forName("com.seek.server.handler." + handlerName); 9 Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};10 Method method=c.getMethod(action,arg);11 method.invoke(c.newInstance(), new Object[]{request,response});12 } catch (Exception e) {13 e.printStackTrace();14 }15 }16 }

SocketRequest和SocketResponse

1 public class SocketRequest { 2 3 private Session mSession; 4 private String mReceive; 5 6 public SocketRequest(Session session) { 7 mSession = session; 8 mReceive = session.getReceiveData(); 9 mSession.clear(); 10 } 11 12 public String getValue(String key) { 13 return JsonParser.get(mReceive, key); 14 } 15 16 public String getQueryString() { 17 return mReceive; 18 } 19 }


1 public class SocketResponse { 2 3 private Session mSession; 4 public SocketResponse(Session session) { 5 mSession = session; 6 } 7 8 public void write(String msg) { 9 mSession.setSendData(msg);10 }11 }

最后则是两个处理请求的Handler

1 public class UserHandler { 2 3 public void login(SocketRequest request,SocketResponse response) { 4 System.out.println(request.getQueryString()); 5 //TODO: 处理用户登录 6 response.write("你肯定收到消息了"); 7 } 8 }


1 public class MessageHandler {2 public void send(SocketRequest request,SocketResponse response) {3 System.out.println(request.getQueryString());4 //消息发送5 String key = request.getValue("imei");6 Session session = SessionManager.getSession(key);7 new SocketResponse(session).write(request.getValue("sms"));8 }9 }

还有个监测是否超时的类Looper,定期去删除Session

1 public class Looper extends Thread{ 2 private static Looper looper = new Looper(); 3 private static boolean isStart = false; 4 private final int INTERVAL = 1000 * 60 * 5; 5 private Looper(){} 6 public static Looper getInstance() { 7 return looper; 8 } 9 10 public void loop() { 11 if(!isStart) { 12 isStart = true; 13 this.start(); 14 } 15 } 16 17 public void run() { 18 Task task = new Task(); 19 while(true) { 20 //Session过期检测 21 task.checkState(); 22 //心跳包检测 23 //task.sendAck(); 24 try { 25 Thread.sleep(INTERVAL); 26 } catch (InterruptedException e) { 27 Log.e(e); 28 } 29 } 30 } 31 }


1 public class Task { 2 public void checkState() { 3 Set<String> keys = SessionManager.getSessionKeys(); 4 if(keys.size() == 0) { 5 return; 6 } 7 List<String> removes = new ArrayList<String>(); 8 Iterator<String> iterator = keys.iterator(); 9 String key = null;10 while(iterator.hasNext()) {11 key = iterator.next();12 if(!SessionManager.getSession(key).isKeekAlive()) {13 removes.add(key);14 }15 }16 if(removes.size() > 0) {17 Log.i("sessions is time out,remove " + removes.size() + "session");18 }19 SessionManager.remove(removes.toArray(new String[removes.size()]));20 }21 22 public void sendAck() {23 Set<String> keys = SessionManager.getSessionKeys();24 if(keys.size() == 0) {25 return;26 }27 Iterator<String> iterator = keys.iterator();28 while(iterator.hasNext()) {29 iterator.next();30 //TODO 发送心跳包31 }32 }33 }

注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,
因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -!
客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了
原文出自: http://www.2cto.com/kf/201406/307005.html
这篇关于基于Java NIO的即时聊天服务器模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!