本文主要是介绍SpringBoot实现websocket服务端及客户端的详细过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《SpringBoot实现websocket服务端及客户端的详细过程》文章介绍了WebSocket通信过程、服务端和客户端的实现,以及可能遇到的问题及解决方案,感兴趣的朋友一起看看吧...
一、WebSocket通信过程
客户端构建一个websocket实例,并且为它绑定一个需要连接到的服务器地址,当客户端连接服务端的候,会向服务端发送一个http get报文,告诉服务端需要将通信协议切换到websocket,服务端收到http请求后将通信协议切换到websocket,同时发给客户端一个响应报文,返回的状态码为101,表示同意客户端协议转请求,并转换为websocket协议。以上过程都是利用http通信完成的,称之为websocket协议握手(websocket Protocol handshake),经过握手之后,客户端和服务端就建立了websocket连接,以后的通信走的都是websocket协议了。
二、服务端实现
1.pom文件添加依赖
<!--webSocket--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2.启用Springboot对WebSocket的支持
package com.lby.websocket.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @author Liby * @date 2022-04-25 16:18 * @description: * @version: */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
3.核心配置:WebSocketServer
因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller
@ ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
新建一个ConcurrentHashMap webSocketMap 用于接收当前userId的WebSocket,方便传递之间对userId进行推送消息。
下面是具体业务代码:
package org.example.server; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Component; import Javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /* 启动项目后可通过 http://websocket.jsonin.com/ 进行测试 输入网址ws://localhost:8080/China编程websocket/1211,检测是否能进行连接 */ @ServerEndpoint(value = "/websocket/{userId}") @Component public class WebSocket { private final static Logger logger = LogManager.getLogger(WebSocket.class); /** * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的 */ private static int onlineCount = 0; /** * concurrent包的线程安全Map,用来存放每个客户端对应的MyWebSocket对象 */ private static ConcurrentHashMap<String, WebSocket> webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; private String userId; /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { this.session = session; this.userId = userId; //加入map webSocketMap.put(userId, this); addOnlineCount(); //在线数加1 logger.info("用户{}连接成功,当前在线人数为{}", userId, getOnlineCounTkRlot()); try { sendMessage(String.valueOf(this.session.getQueryString())); } catch (IOException e) { logger.error("IO异常"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { //从map中删除 webSocketMap.remove(userId); subOnlineCount(); //在线数减1 logger.info("用户{}关闭连接!当前在线人数为{}", userId, getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { logger.info("来自客户端用户:{} 消息:{}",userId, message); //群发消息 for (String item : webSocketMap.keySet()) { try { webSocketMap.get(item).sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } /** * 发生错误时调用 * */ @OnError public void onError(Session session, Throwable error) { logger.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); error.printStackTrace(); } /** * 向客户端发送消息 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } /** * 通过userId向客户端发送消息 */ public void sendMessageByUserId(String userId, String message) throws IOException { logger.info("服务端发送消息到{},消息:{}",userId,message); if(StrUtil.isNotBlank(userId)&&webSocketMap.containsKey(userId)){ webSocketMap.get(userId).sendMessage("hello"); }else{ logger.error("用户{}不在线",userId); } } /** * 群发自定义消息 */ public void sendInfo(String message) throws IOException { for (String item : webSopythoncketMap.keySet()) { try { webSocketMap.get(item).sendMessage(message); } catch (IOException e) { e.printStackTrace(); } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } }
三、客户端实现
1.pom文件添加依赖
<dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.5.3</version> </dependency>
2.客户端实现代码
package org.example; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @author kele * @date 2024/2/19 **/ @Slf4j public class MyWebSocketClient extends WebSocketClient { public MyWebSocketClient(URI serverUri) { super(serverUri); } @SneakyThrows @Override public void onOpen(ServerHandshake data) { try { log.info("WebSocket连接已打开。"); }catch (Exception e){ log.error("onOpen error :{}",e.getMessage()); } } @SneakyThrows @Override public void onMessage(String message) { try { if (message != null && !message.isEmpty()) { log.info("收到消息: {}",message); } }catch (Exception e){ log.error("onMessage error : {}",message); } } @Override public void onClose(int code, String reason, boolean remote) { log.info("WebSocket连接已关闭。"); } @Override public void onError(Exception ex) { log.info("WebSocket连接发生错误:{}", exhttp://www.chinasem.cn.getMessage()); } /** * 连接定时检查 */ public void startReconnectTask(long delay, TimeUnit unit) { System.out.println("WebSocket 心跳检查"); log.info("WebSocket 心跳检查"); // 以下为定时器,建议使用自定义线程池,或交给框架处理(spring) ScheduledExecutorService executorService = Executors.newSingleThreadScheduledEhttp://www.chinasem.cnxecutor(); executorService.scheduleWithFixedDelay(() -> { // 检查逻辑:判断当前连接是否连通 if (!this.isOpen()) { System.out.println("WebSocket 开始重连......"); log.info("WebSocket 开始重连......"); // 重置连接 this.reconnect(); // 以下为错误示范 //this.close(); //this.connect(); } }, 0, delay, unit); } }
3.开启客户端连接服务,并开启定时检查websocket连接
package org.example; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.net.URI; import java.util.concurrent.TimeUnit; @Slf4j @Component public class Init implements Runnable { public static MyWebSocketClient myWebSocketClient; @PostConstruct public void run () { try { //启动连接 log.info("连接websocket服务端"); log.info("项目启动"); // 服务地址 URI uri = new URI("ws://127.0.0.1:8077/websocket/123"); log.info("服务地址 -{}", uri); // 创建客户端 myWebSocketClient = new MyWebSocketClient(uri); // 建立连接 myWebSocketClient.connect(); // 开启 定时检查 myWebSocketClient.startReconnectTask(5, TimeUnit.SECONDS); }catch (Exception e){ e.printStackTrace(); } } }
四、可能遇到的问题及解决方案
1.websocket客户端重连机制
注意引入Java-websocket的版本要高于1.5.0才有reconnect()方法
2.在使用websocket连接传输Base64编码时出现错误
原因:Websocket中maxMessageSize默认是8K,接收的图片大于8K导致接收失败,解决方法:适当加大maxMessageSize。
参考资料:
注意不能设置太大,避免内存溢出。参考博客1,博客2,博客3
@OnMessage(maxMessageSize = 10240000) public void onMessage(byte[] message) throws IOException { log.info("收到图片了"); String str = Base64.encodeBase64String(message); // 图片base64 log.info(str.substring(0, 100)); }
到此这篇关于SpringBoot实现websocket服务端及客户端的文章就介绍到这了,更多相关SpringBoot websocket服务端及客户端内容请搜索China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程China编程(www.chinasem.cn)!
这篇关于SpringBoot实现websocket服务端及客户端的详细过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!