本文主要是介绍Spring boot 项目作为客户端调用 服务端websocket,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- java客户端请求websocket
- Spring boot 导入包
- 客户端调用方法
- 测试执行方法
- connectWebSocket
- HandshakeMessage
- sendHandshake
- WebSocketConfig.queue.take
- 方法对应实体类
- 配置 yaml 资源
- WebSocketConfig 配置类
- 注入配置websocketUrl:
- LinkedBlockingQueue
- LinkedBlockingQueue的特点
- connectWebSocket 连接
- URI
- WebSocketClientHandler
- connectBlocking方法
- connectBlocking方法有两个参数:
- sendMessage
- close
- WebSocketClientHandler 配置类
- onOpen 方法
- onMessage 方法
- onClose 方法
- onError 方法
- java服务端websocket
- WebSocketConfig 配置类
- WebSocketHandler 监听类
java客户端请求websocket
Spring boot 导入包
pom.xml 导入
客户端调用方法
测试执行方法
-
connectWebSocket
- 连接websocket 客户端,并携带过期时间等参数
-
HandshakeMessage
- 发送消息对象
-
sendHandshake
- 发送消息
-
WebSocketConfig.queue.take
- 队列信息,等待数据返回,并消费,获取websocket 返回的消息数据
public String test() {try {WebSocketConfig.connectWebSocket();HandshakeMessage handshakeMessage = new HandshakeMessage();handshakeMessage.setMessage("test");handshakeMessage.setClientId(UUID.randomUUID().toString());handshakeMessage.setType("handshake");WebSocketConfig.sendHandshake(handshakeMessage);String take = WebSocketConfig.queue.take();System.out.println("test:" + take);WebSocketConfig.close();}catch (InterruptedException ex){System.out.println("连接异常"+ ex.getMessage());}return null;
}
方法对应实体类
public class HandshakeMessage {private String type;private String clientId;private String message;public String getType() {return type;}public void setType(String type) {this.type = type;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}}
配置 yaml 资源
websocket:url: ws://localhost:8080/websocket
WebSocketConfig 配置类
注入配置websocketUrl:
使用@Value注解将websocket.url注入到类的私有成员变量websocketUrl中。确保在类初始化时就设置好静态变量。
LinkedBlockingQueue
LinkedBlockingQueue是Java并发集合框架中的一种线程安全的队列实现,它继承自BlockingQueue接口。LinkedBlockingQueue使用链表结构来存储元素,并且提供了阻塞操作,可以在队列为空或满时自动阻塞生产者或消费者线程,直到队列变为非空或非满。
-
LinkedBlockingQueue的特点
- 线程安全性:LinkedBlockingQueue是线程安全的,可以在多线程环境中安全地使用。
- 阻塞操作:提供了put和take等阻塞方法,当队列满时调用put会阻塞,当队列为空时调用take会阻塞。
- 容量可配置:LinkedBlockingQueue可以被初始化为一个固定容量的队列,也可以是一个无界队列(默认情况下,如果未指定容量,则容量为Integer.MAX_VALUE)。
connectWebSocket 连接
-
URI
- URI类可以帮助你处理和解析Web地址,并确保这些地址格式正确。
-
WebSocketClientHandler
- 继承 WebSocketClient 类 ,实现一些 websocket 方法重写
-
connectBlocking方法
sendMessage
sendMessage方法用于向WebSocket服务器发送文本消息
send 方法
- 检查WebSocket连接状态:
- 如果WebSocket连接尚未打开(!this.isOpen()),则抛出WebsocketNotConnectedException异常。这是因为只有在连接建立后才能发送数据。
- 检查参数有效性:
- 如果传入的frames参数为null,则抛出IllegalArgumentException异常。这是为了确保传入的数据是有效的。
- 准备发送的数据帧:
- 创建一个新的ArrayList来存储即将发送的二进制帧。
- 遍历frames集合中的每一个Framedata对象。
- 对于每一个Framedata对象,调用draft.createBinaryFrame(f)方法将其转换为ByteBuffer,然后添加到outgoingFrames列表中。
- 在遍历过程中,通过日志记录每一步操作的信息(如果启用了日志的trace级别)。
- 发送数据帧:
- 最后,调用write方法,将准备好的outgoingFrames列表作为参数传递进去,完成实际的数据发送操作。
public void send(String text) {if (text == null) {throw new IllegalArgumentException("Cannot send 'null' data to a WebSocketImpl.");} else {this.send((Collection)this.draft.createFrames(text, this.role == Role.CLIENT));} } private void send(Collection<Framedata> frames) {if (!this.isOpen()) {throw new WebsocketNotConnectedException();} else if (frames == null) {throw new IllegalArgumentException();} else {ArrayList<ByteBuffer> outgoingFrames = new ArrayList();Iterator var3 = frames.iterator();while(var3.hasNext()) {Framedata f = (Framedata)var3.next();this.log.trace("send frame: {}", f);outgoingFrames.add(this.draft.createBinaryFrame(f));}this.write((List)outgoingFrames);} }
close
关闭连接
package com.dog.websocket;import com.alibaba.fastjson2.JSONObject;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;@Component
public class WebSocketConfig {private static String websocketUrl;public static LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);@Value("${websocket.url}")public void setWebsocketUrl(String websocketUrl){WebSocketConfig.websocketUrl = websocketUrl;}private static WebSocketClient client;public static void connectWebSocket(){try{URI uri = new URI(websocketUrl);client = new WebSocketClientHandler(uri);client.connectBlocking(4000, TimeUnit.MINUTES);}catch (URISyntaxException|InterruptedException ex){ex.printStackTrace();throw new RuntimeException("websocket 连接异常");}}/*** 直接发送信息* @param sendMessage*/public static void sendMessage(String sendMessage){client.send(sendMessage);}public static void sendHandshake(HandshakeMessage handshakeMessage){String sendMessage = JSONObject.toJSONString(handshakeMessage);System.out.println(sendMessage);client.send(sendMessage);}public void sendByteMessage(byte[] bytes){client.send(bytes);}/*** 连接关闭*/public static void close(){if (client != null && client.isOpen()) {client.close();}}
}
WebSocketClientHandler 配置类
继承WebSocketClient 并重写了几个关键的方法来处理WebSocket连接的不同生命周期事件
onOpen 方法
@Override
public void onOpen(ServerHandshake serverHandshake) {System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus());
}
当WebSocket连接成功建立时,这个方法会被调用。它打印出连接的状态码。
onMessage 方法
@Override
public void onMessage(String s) {System.out.println("message: "+ s);try {// 尝试在一定时间内将消息放入队列if (!queue.offer(s, 10, TimeUnit.SECONDS)) {System.err.println("无法在规定时间内将消息放入队列");}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("向队列中添加消息时被中断");}
}
当从WebSocket服务器接收到消息时,这个方法会被调用。它首先打印接收到的消息,然后尝试将消息放入WebSocketConfig.queue队列中。如果在向队列中添加消息时发生中断异常,则恢复中断状态并打印错误信息。
onClose 方法
@Override
public void onClose(int i, String s, boolean b) {System.out.println("WebSocket连接已关闭: " + s);
}
当WebSocket连接关闭时,这个方法会被调用。它打印出关闭连接的原因。
onError 方法
@Override
public void onError(Exception ex) {ex.printStackTrace();System.err.println("WebSocket发生错误: " + ex.getMessage());
}
当WebSocket连接发生错误时,这个方法会被调用。它打印出错误信息及其堆栈跟踪。
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.stereotype.Component;import java.net.URI;
import java.util.concurrent.TimeUnit;import static com.dog.websocket.WebSocketConfig.queue;public class WebSocketClientHandler extends WebSocketClient {public WebSocketClientHandler(URI serverUri) {super(serverUri);}@Overridepublic void onOpen(ServerHandshake serverHandshake) {System.out.println("连接websocket 状态:"+ serverHandshake.getHttpStatus());}@Overridepublic void onMessage(String s) {System.out.println("message: "+ s);try {// 尝试在一定时间内将消息放入队列if (!queue.offer(s, 10, TimeUnit.SECONDS)) {System.err.println("无法在规定时间内将消息放入队列");}} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("向队列中添加消息时被中断");}}@Overridepublic void onClose(int i, String s, boolean b) {System.out.println("WebSocket连接已关闭: " + s);}@Overridepublic void onError(Exception ex) {ex.printStackTrace();System.err.println("WebSocket发生错误: " + ex.getMessage());}
}
上面只是根据所需要自行调整
java服务端websocket
在上一篇博客已做详细简绍,不做补充
WebSocketConfig 配置类
package com.ruoyi.common.utils.socket;import com.ruoyi.common.utils.socket.handler.WebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {private final WebSocketHandler webSocketHandler;public WebSocketConfig(WebSocketHandler webSocketHandler) {this.webSocketHandler = webSocketHandler;}@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(webSocketHandler, "/websocket").setAllowedOrigins("*");}
}
WebSocketHandler 监听类
package com.ruoyi.common.utils.socket.handler;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.utils.socket.HandshakeMessage;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class WebSocketHandler extends TextWebSocketHandler {private static final Map<String, WebSocketSession> clientSessions = new ConcurrentHashMap<>();private static final ObjectMapper objectMapper = new ObjectMapper();@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {super.afterConnectionEstablished(session);String sessionId = session.getId();System.out.println("WebSocket connection established with session ID: " + sessionId);}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();HandshakeMessage handshakeMessage = objectMapper.readValue(payload, HandshakeMessage.class);if ("handshake".equals(handshakeMessage.getType())) {String clientId = handshakeMessage.getClientId();String sessionId = session.getId();// 存储clientId与sessionId的映射关系clientSessions.put(clientId, session);handshakeMessage.setMessage("success");// 可以选择回复客户端确认握手成功的消息session.sendMessage(new TextMessage(JSON.toJSONString(handshakeMessage)));}}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {super.afterConnectionClosed(session, status);String sessionId = session.getId();System.out.println("WebSocket connection closed with session ID: " + sessionId);// 移除会话clientSessions.values().removeIf(s -> s.getId().equals(sessionId));}public void sendMessageToClient(String clientId, String message) {WebSocketSession session = clientSessions.get(clientId);if (session != null && session.isOpen()) {try {session.sendMessage(new TextMessage(message));} catch (Exception e) {e.printStackTrace();}}}
}
这篇关于Spring boot 项目作为客户端调用 服务端websocket的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!