基于Java NIO的即时聊天服务器模型

2024-06-03 23:18

本文主要是介绍基于Java NIO的即时聊天服务器模型,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


前不久自己动手写了一个Android的聊天工具,跟服务器的交互还是基于HTTP方式的,在一般通讯上还算凑活,但是在即时聊天的时候就有点恶心了,客户端开启Service每隔3秒去询问服务器是否有自己的新消息(当然3秒有点太快了),在心疼性能和流量的前提下,只能自己动手写个服务器,传统的Socket是阻塞的,这样的话服务器对每个Socket都需要建立一个线程来操作,资源开销很大,而且线程多了直接会影响服务端的性能(曾经测试开了3000多个线程就不让创建了,所以并发数目也是有限制的),听说从JDK1.5就多了个New IO,灰常不错的样子,找了找相关的资料,网上竟然全都是最最最简单的一个demo,然后去CSDN发帖,基本上都是建议直接使用MINA框架的,这样一来根本达不到学习NIO的目的,而且现在的技术也太快餐了,只知道使用前辈留下的东西,知其然不知其所以然。

 

折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。本Demo暂时实现了Session过期检测,心跳检测有空再搞,如果本例子在使用过程中有性能漏洞或者什么bug请及时通知我,谢谢


废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:

JsonParser

Json的解析类,随便封装了下,使用的最近比较火的fastjson

复制代码
1 public class JsonParser {2     3     private static JSONObject mJson;4     5     public synchronized static String get(String json,String key) {6         mJson = JSON.parseObject(json);7         return mJson.getString(key);8     }9 }
复制代码

Main

入口,不解释

1 public class Main {
2 
3     public static void main(String... args) {
4         new SeekServer().start();
5     }
6 }

Log

复制代码
1 public class Log {2 3     public static void i(Object obj) {4         System.out.println(obj);5     }6     public static void e(Object e) {7         System.err.println(e);8     }9 }
复制代码

SeekServer:

服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0 目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的

复制代码
 1 public class SeekServer extends Thread{
 2     private final int ACCPET_PORT = 55555;
 3     private final int TIME_OUT = 1000;
 4     private Selector mSelector = null;
 5     private ServerSocketChannel mSocketChannel = null;
 6     private ServerSocket mServerSocket = null;
 7     private InetSocketAddress mAddress = null;
 8     
 9     public SeekServer() {
10         long sign = System.currentTimeMillis();
11         try {
12             mSocketChannel = ServerSocketChannel.open();
13             if(mSocketChannel == null) {
14                 System.out.println("can't open server socket channel");
15             }
16             mServerSocket = mSocketChannel.socket();
17             mAddress = new InetSocketAddress(ACCPET_PORT);
18             mServerSocket.bind(mAddress);
19             Log.i("server bind port is " + ACCPET_PORT);
20             mSelector = Selector.open();
21             mSocketChannel.configureBlocking(false);
22             SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
23             key.attach(new Acceptor());
24             
25             //检测Session状态
26             Looper.getInstance().loop();
27             
28             //开始处理Session
29             SessionProcessor.start();
30             
31             Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");
32         } catch (ClosedChannelException e) {
33             Log.e(e.getMessage());
34         } catch (IOException e) {
35             Log.e(e.getMessage());
36         } 
37     }
38     
39     public void run() {
40         Log.i("server is listening...");
41         while(!Thread.interrupted()) {
42             try {
43                 if(mSelector.select(TIME_OUT) > 0) {
44                     Set<SelectionKey> keys = mSelector.selectedKeys();
45                     Iterator<SelectionKey> iterator = keys.iterator();
46                     SelectionKey key = null;
47                     while(iterator.hasNext()) {
48                         key = iterator.next();
49                         Handler at = (Handler) key.attachment();
50                         if(at != null) {
51                             at.exec();
52                         }
53                         iterator.remove();
54                     }
55                 }
56             } catch (IOException e) {
57                 Log.e(e.getMessage());
58             }
59         }
60     }
61 
62     class Acceptor extends Handler{
63 
64         public void exec(){
65             try {
66                 SocketChannel sc = mSocketChannel.accept();
67                 new Session(sc, mSelector);
68             } catch (ClosedChannelException e) {
69                 Log.e(e);
70             } catch (IOException e) {
71                 Log.e(e);
72             }
73         }
74     }
75 }
复制代码

Handler:

只有一个抽象方法exec,Session将会继承它

1 public abstract class Handler {2     3     public abstract void exec();4 }

Session:

封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。

复制代码
  1 public class Session extends Handler{
  2 
  3     private SocketChannel mChannel;
  4     private SelectionKey  mKey;
  5     private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);  
  6     private Charset charset = Charset.forName("UTF-8");
  7     private CharsetDecoder mDecoder = charset.newDecoder();
  8     private CharsetEncoder mEncoder = charset.newEncoder();
  9     private long lastPant;//最后活动时间
 10     private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间
 11     private String key;
 12     
 13     private String sendData = "";
 14     private String receiveData = null;
 15     
 16     public static final int READING = 0,SENDING = 1;
 17     int mState = READING;
 18     
 19     public Session(SocketChannel socket, Selector selector) throws IOException {
 20         this.mChannel = socket;
 21         mChannel = socket;
 22         mChannel.configureBlocking(false);
 23         mKey = mChannel.register(selector, 0);
 24         mKey.attach(this);
 25         mKey.interestOps(SelectionKey.OP_READ);
 26         selector.wakeup();
 27         lastPant = Calendar.getInstance().getTimeInMillis();
 28     }
 29     
 30     public String getReceiveData() {
 31         return receiveData;
 32     }
 33     
 34     public void clear() {
 35         receiveData = null;
 36     }
 37 
 38     public void setSendData(String sendData) {
 39         mState = SENDING;
 40         mKey.interestOps(SelectionKey.OP_WRITE);
 41         this.sendData = sendData + "\n";
 42     }
 43 
 44     public boolean isKeekAlive() {
 45         return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
 46     }
 47     
 48     public void setAlive() {
 49         lastPant = Calendar.getInstance().getTimeInMillis();
 50     }
 51     
 52     /**
 53      * 注销当前Session
 54 */
 55     public void distroy() {
 56         try {
 57             mChannel.close();
 58             mKey.cancel();
 59         } catch (IOException e) {}
 60     }
 61     
 62     @Override
 63     public synchronized void exec() {
 64         try {
 65             if(mState == READING) {
 66                 read();
 67             }else if(mState == SENDING) {
 68                 write();
 69             }
 70         } catch (IOException e) {
 71             SessionManager.remove(key);
 72             try {
 73                 mChannel.close();
 74             } catch (IOException e1) {
 75                 Log.e(e1);
 76             }
 77             mKey.cancel();
 78         }
 79     }
 80     
 81     public void read() throws IOException{
 82         mRreceiveBuffer.clear();
 83         int sign = mChannel.read(mRreceiveBuffer);
 84         if(sign == -1) { //客户端连接关闭
 85             mChannel.close();
 86             mKey.cancel();
 87         }
 88         if(sign > 0) {
 89             mRreceiveBuffer.flip();
 90             receiveData = mDecoder.decode(mRreceiveBuffer).toString();
 91             setAlive();
 92             setSign();
 93             SessionManager.addSession(key, this);
 94         }
 95     }
 96     
 97     private void setSign() {
 98         //设置当前Session的Key
 99         key = JsonParser.get(receiveData,"imei");
100         //检测消息类型是否为心跳包
101 //        String type = jo.getString("type");
102 //        if(type.equals("HEART_BEAT")) {
103 //            setAlive();
104 //        }
105     }
106     
107     
108     /**
109      * 写消息
110 */
111     public void write() {
112         try {
113             mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
114             sendData = null;
115             mState = READING;
116             mKey.interestOps(SelectionKey.OP_READ);
117         } catch (CharacterCodingException e) {
118             e.printStackTrace();
119         } catch (IOException e) {
120             try {
121                 mChannel.close();
122             } catch (IOException e1) {
123                 Log.e(e1);
124             }
125         }
126     }
127 }
复制代码

SessionManager:

将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,

封装了一些操作Session的方法例如get,remove等

复制代码
 1 public class SessionManager { 2  3     private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); 4      5     public static void addSession(String key,Session session) { 6         sessions.put(key, session); 7     } 8      9     public static Session getSession(String key) {10         return sessions.get(key);11     }12     13     public static Set<String> getSessionKeys() {14         return sessions.keySet();15     }16     17     public static int getSessionCount() {18         return sessions.size();19     }20     21     public static void remove(String[] keys) {22         for(String key:keys) {23             if(sessions.containsKey(key)) {24                 sessions.get(key).distroy();25                 sessions.remove(key);26             }27         }28     }29     public static void remove(String key) {30         if(sessions.containsKey(key)) {31             sessions.get(key).distroy();32             sessions.remove(key);33         }34     }35 }
复制代码

SessionProcessor

里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response)

复制代码
 1 public class SessionProcessor implements Runnable{
 2     
 3     private static Runnable processor = new SessionProcessor();
 4     private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
 5     public static void start() {
 6         new Thread(processor).start();
 7     }
 8     
 9     @Override
10     public void run() {
11         while(true) {
12             Session tmp = null;
13             for(String key:SessionManager.getSessionKeys()) {
14                 tmp = SessionManager.getSession(key);
15                 //处理Session未处理的请求
16                 if(tmp.getReceiveData() != null) {
17                     pool.execute(new Process(tmp));
18                 }
19             }
20             try {
21                 Thread.sleep(10);
22             } catch (InterruptedException e) {
23                 Log.e(e);
24             }
25         }
26     }
27     
28     class Process implements Runnable {
29 
30         private SocketRequest request;
31         private SocketResponse response;
32         
33         public Process(Session session) {
34             //将Session封装成Request和Response
35             request = new SocketRequest(session);
36             response = new SocketResponse(session);
37         }
38         
39         @Override
40         public void run() {
41             new RequestTransform().transfer(request, response);
42         }
43     }
44 
45 }
复制代码

RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)

复制代码
 1 public class RequestTransform { 2  3     public void transfer(SocketRequest request,SocketResponse response) { 4         String action = request.getValue("action"); 5         String handlerName = request.getValue("handler"); 6         //根据Session的请求类型,让不同的类方法去处理 7         try { 8             Class<?> c= Class.forName("com.seek.server.handler." + handlerName); 9             Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};10             Method method=c.getMethod(action,arg);11             method.invoke(c.newInstance(), new Object[]{request,response});12         } catch (Exception e) {13             e.printStackTrace();14         }15     }16 }
复制代码

SocketRequest和SocketResponse

复制代码
 1 public class SocketRequest {
 2 
 3     private Session mSession;
 4     private String  mReceive;
 5     
 6     public SocketRequest(Session session) {
 7         mSession = session;
 8         mReceive = session.getReceiveData();
 9         mSession.clear();
10     }
11     
12     public String getValue(String key) {
13         return JsonParser.get(mReceive, key);
14     }
15     
16     public String getQueryString() {
17         return mReceive;
18     }
19 }
复制代码
复制代码
 1 public class SocketResponse { 2  3     private Session mSession; 4     public SocketResponse(Session session) { 5         mSession = session; 6     } 7      8     public void write(String msg) { 9         mSession.setSendData(msg);10     }11 }
复制代码

最后则是两个处理请求的Handler

复制代码
1 public class UserHandler {
2 
3     public void login(SocketRequest request,SocketResponse response) {
4         System.out.println(request.getQueryString());
5         //TODO: 处理用户登录
6         response.write("你肯定收到消息了");
7     }
8 }
复制代码
复制代码
1 public class MessageHandler {2     public void send(SocketRequest request,SocketResponse response) {3         System.out.println(request.getQueryString());4         //消息发送5         String key = request.getValue("imei");6         Session session = SessionManager.getSession(key);7         new SocketResponse(session).write(request.getValue("sms"));8     }9 }
复制代码

还有个监测是否超时的类Looper,定期去删除Session

复制代码
 1 public class Looper extends Thread{
 2     private static Looper looper = new Looper();
 3     private static boolean isStart = false;
 4     private final int INTERVAL = 1000 * 60 * 5;
 5     private Looper(){}
 6     public static Looper getInstance() {
 7         return looper;
 8     }
 9     
10     public void loop() {
11         if(!isStart) {
12             isStart = true;
13             this.start();
14         }
15     }
16     
17     public void run() {
18         Task task = new Task();
19         while(true) {
20             //Session过期检测
21             task.checkState();
22             //心跳包检测
23 //task.sendAck();
24             try {
25                 Thread.sleep(INTERVAL);
26             } catch (InterruptedException e) {
27                 Log.e(e);
28             }
29         }
30     }
31 }
复制代码
复制代码
 1 public class Task { 2     public void checkState() { 3         Set<String> keys = SessionManager.getSessionKeys(); 4         if(keys.size() == 0) { 5             return; 6         } 7         List<String> removes = new ArrayList<String>(); 8         Iterator<String> iterator = keys.iterator(); 9         String key = null;10         while(iterator.hasNext()) {11             key = iterator.next();12             if(!SessionManager.getSession(key).isKeekAlive()) {13                 removes.add(key);14             }15         }16         if(removes.size() > 0) {17             Log.i("sessions is time out,remove " + removes.size() + "session");18         }19         SessionManager.remove(removes.toArray(new String[removes.size()]));20     }21     22     public void sendAck() {23         Set<String> keys = SessionManager.getSessionKeys();24         if(keys.size() == 0) {25             return;26         }27         Iterator<String> iterator = keys.iterator();28         while(iterator.hasNext()) {29             iterator.next();30             //TODO 发送心跳包31         }32     }33 }
复制代码

注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,

因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试- -!

客户端请求时的数据内容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},这些约定就自己来定了


原文出自:  http://www.2cto.com/kf/201406/307005.html   

这篇关于基于Java NIO的即时聊天服务器模型的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1028395

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法

《ElasticSearch+Kibana通过Docker部署到Linux服务器中操作方法》本文介绍了Elasticsearch的基本概念,包括文档和字段、索引和映射,还详细描述了如何通过Docker... 目录1、ElasticSearch概念2、ElasticSearch、Kibana和IK分词器部署

部署Vue项目到服务器后404错误的原因及解决方案

《部署Vue项目到服务器后404错误的原因及解决方案》文章介绍了Vue项目部署步骤以及404错误的解决方案,部署步骤包括构建项目、上传文件、配置Web服务器、重启Nginx和访问域名,404错误通常是... 目录一、vue项目部署步骤二、404错误原因及解决方案错误场景原因分析解决方案一、Vue项目部署步骤

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2