springboot学习(七十二) webflux中使用WebSocket实现服务端和客户端

本文主要是介绍springboot学习(七十二) webflux中使用WebSocket实现服务端和客户端,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、服务端
    • 1、编写一个WebSocket Session封装类
    • 2、编写最重要的handler
    • 3、附JsonUtils
    • 4、Spring注册
  • 二、客户端
    • 1、客户端连接类


前言

springboot中不使用Servlet,而是使用WebFlux的情况下,可以使用其自带的websocket实现websocket的功能,网上大部分例子都只能实现一个最基本的DEMO,不能实现服务端在Handler外部推送消息到客户端。下面是我的解决办法。

一、服务端

1、编写一个WebSocket Session封装类

package cn.ac.iscas.dmo.gateway.admin.ws;import cn.ac.iscas.dmo.gateway.admin.utils.JsonUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.FluxSink;import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** @author zhuquanwen* @version 1.0* @date 2022/4/13 14:19* @since jdk11*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Slf4j
public class WebSocketWrap {public static final Map<String, WebSocketWrap> SENDER = new ConcurrentHashMap<>();private String id;private WebSocketSession session;private FluxSink<WebSocketMessage> sink;/*** 发送广播消息** @param obj 消息对象,会被转为JSON* @return void* @date 2022/4/13* @since jdk11*/public static void broadcastText(Object obj) {SENDER.values().forEach(wrap -> wrap.sendText(obj));}public void sendText(Object obj) {sink.next(session.textMessage(JsonUtils.toJson(obj)));}static {purge();}/*** 清理不可用的SESSION* @since jdk11* @date 2022/4/13* @return void*/@SuppressWarnings("AlibabaThreadPoolCreation")public static void purge() {Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {new ArrayList<>(SENDER.values()).forEach(wrap -> {if (!wrap.getSession().isOpen()) {log.warn(String.format("用户ID: [%s] 的session: [%s] 已经关闭,将被清理", wrap.getId(), wrap.getSession().getId()));SENDER.remove(wrap.getId());wrap.getSession().close();}});}, 30, 30, TimeUnit.SECONDS);}}

2、编写最重要的handler

要注意的有两点:

  • 认证问题
    这里我从客户端连接的URL中获取了ID参数,做了一个简单的验证,可以根据情况修改,设置可以从header中获取参数,比如可以校验TOKEN。
  • 输出输出封装
    网上大部分例子没有输入输出的封装,都是直接在handle中返回给客户端一个消息,这显然不能适用到项目中,因为大部分情况要做服务端向客户端推送的功能,需要在业务处理中做推送。按下面修改后使用第1步的WebSocketWrap就可以实现在任意位置推送数据了。
package cn.ac.iscas.dmo.gateway.admin.ws;import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;/*** @author zhuquanwen* @version 1.0* @date 2022/4/13 13:37* @since jdk11*/
@Component
@Slf4j
public class AdminWebSocketHandler implements WebSocketHandler {private static final String CONNECT = "connect:";@Overridepublic Mono<Void> handle(WebSocketSession session) {// 校验权限HandshakeInfo handshakeInfo = session.getHandshakeInfo();Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery());String id = queryMap.get("id");// 暂时只校验了是否携带了ID,以后可以改为校验TOKENif (StringUtils.isNotBlank(id)) {// 输入输出封装Mono<Void> input = session.receive().doOnNext(message -> this.messageHandle(session, message)).log().doOnError(throwable -> log.error("webSocket发生异常:" + throwable)).doOnComplete(() -> log.info("webSocket结束")).then();Mono<Void> output = session.send(Flux.create(sink -> WebSocketWrap.SENDER.put(id, new WebSocketWrap(id, session, sink))));return Mono.zip(input, output).then();} else {return session.close(new CloseStatus(1016, "连接未通过校验,即将关闭连接"));}}@SuppressWarnings(value = "unused")private void messageHandle(WebSocketSession session, WebSocketMessage message) {// 接收客户端请求的处理回调switch (message.getType()) {case TEXT:case BINARY:case PONG:case PING:break;default:}}private Map<String, String> getQueryMap(String queryStr) {Map<String, String> queryMap = new HashMap<>(4);if (!StringUtils.isEmpty(queryStr)) {String[] queryParam = queryStr.split("&");Arrays.stream(queryParam).forEach(s -> {String[] kv = s.split("=", 2);String value = kv.length == 2 ? kv[1] : "";queryMap.put(kv[0], value);});}return queryMap;}}

3、附JsonUtils

package cn.ac.iscas.dmo.gateway.admin.utils;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;import java.io.*;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;/*** JSON工具类* @author zhuquanwen* @version 1.0* @date 2022/4/7 10:03* @since jdk11*/
@SuppressWarnings(value = "unused")
public class JsonUtils {private static volatile ObjectMapper mapper;/*** 对象转json** @param object 对象* @return String JSON串*/public static String toJson(Object object) {try {return getMapper().writeValueAsString(object);} catch (JsonProcessingException e) {e.printStackTrace();throw new RuntimeException(e);
//			throw new DataSongException(Status.PARAM_ERROR, String.format("object to json error: [%s]",DataSongExceptionUtils.getExceptionInfo(e)));}
//        return null;}public static <T> T fromJson(String json, Class<T> classOfT) {try {return getMapper().readValue(json, classOfT);} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e);}}/*** @param json JSON字符串* @param typeReference 类型* @return T 转换后的对象*/@SuppressWarnings({"rawtypes", "unchecked"})public static <T> T fromJson(String json, TypeReference typeReference) {try {return (T) getMapper().readValue(json, typeReference);} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e);}
//        return null;}/*** 定义一个嵌套的泛型、子泛型*/@SuppressWarnings("rawtypes")static class ParametricTypes {/*** 泛型1*/private Class clazz;/*** 子泛型*/private List<ParametricTypes> subClazz;public Class getClazz() {return clazz;}public void setClazz(Class clazz) {this.clazz = clazz;}public List<ParametricTypes> getSubClazz() {return subClazz;}public void setSubClazz(List<ParametricTypes> subClazz) {this.subClazz = subClazz;}}@SuppressWarnings(value = {"AliDeprecation", "deprecation"})private static ObjectMapper getMapper() {synchronized (JsonUtils.class) {if (mapper == null) {synchronized (JsonUtils.class) {mapper = new ObjectMapper();//为null的不输出mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);//大小写问题mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);//设置等同于@JsonIgnoreProperties(ignoreUnknown = true)mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);//防止转为json是首字母大写的属性会出现两次mapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE);mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);//设置JSON时间格式SimpleDateFormat myDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");mapper.setDateFormat(myDateFormat);}}}return mapper;}/*** 单位缩进字符串。*/private static final String SPACE = "\t";/*** 返回格式化JSON字符串。** @param json 未格式化的JSON字符串。* @return 格式化的JSON字符串。*/public static String formatJson(String json) {StringBuilder result = new StringBuilder();int length = json.length();int number = 0;char key;//遍历输入字符串。for (int i = 0; i < length; i++) {//1、获取当前字符。key = json.charAt(i);//2、如果当前字符是前方括号、前花括号做如下处理:if ((key == '[') || (key == '{')) {//(1)如果前面还有字符,并且字符为“:”,打印:换行和缩进字符字符串。if ((i - 1 > 0) && (json.charAt(i - 1) == ':')) {result.append('\n');result.append(indent(number));}//(2)打印:当前字符。result.append(key);//(3)前方括号、前花括号,的后面必须换行。打印:换行。result.append('\n');//(4)每出现一次前方括号、前花括号;缩进次数增加一次。打印:新行缩进。number++;result.append(indent(number));//(5)进行下一次循环。continue;}//3、如果当前字符是后方括号、后花括号做如下处理:if ((key == ']') || (key == '}')) {//(1)后方括号、后花括号,的前面必须换行。打印:换行。result.append('\n');//(2)每出现一次后方括号、后花括号;缩进次数减少一次。打印:缩进。number--;result.append(indent(number));//(3)打印:当前字符。result.append(key);//(4)如果当前字符后面还有字符,并且字符不为“,”,打印:换行。if (((i + 1) < length) && (json.charAt(i + 1) != ',')) {result.append('\n');}//(5)继续下一次循环。continue;}//4、如果当前字符是逗号。逗号后面换行,并缩进,不改变缩进次数。if ((key == ',')) {result.append(key);result.append('\n');result.append(indent(number));continue;}//5、打印:当前字符。result.append(key);}return result.toString();}/*** 返回指定次数的缩进字符串。每一次缩进三个空格,即SPACE。** @param number 缩进次数。* @return 指定缩进次数的字符串。*/private static String indent(int number) {return SPACE.repeat(Math.max(0, number));}/*** 校验一个JSON串是否为JSON结构,必须满足Map或集合结构*/public static boolean validateJson(String json) {try {JsonUtils.fromJson(json, Map.class);return true;} catch (Exception ignored) {}try {JsonUtils.fromJson(json, List.class);return true;} catch (Exception ignored) {}return false;}/*** 向JSON中追加参数* 注意:只支持Map类型的JSON** @param json 原始JSON字符串。* @param data 要添加的数据,数组类型,数组里两个值,第一个值为key,第二个值为value* @return 追加后的JSON字符串。*/@SuppressWarnings({"rawtypes", "unchecked"})public static String appendJson(String json, Object[]... data) throws RuntimeException {Map map;try {map = JsonUtils.fromJson(json, Map.class);} catch (Exception e) {throw new RuntimeException("JSON格式错误,只支持Map格式的JSON", e);}if (data != null) {for (Object[] datum : data) {if (datum == null || datum.length != 2) {throw new RuntimeException("传入的追加格式错误");}map.put(datum[0], datum[1]);}}return toJson(map);}/*** 嵌套一层泛型序列化* add by zqw*/@SuppressWarnings("rawtypes")public static <T> T fromJson(String json, Class mainClass, Class subClass) {try {JavaType javaType = getMapper().getTypeFactory().constructParametricType(mainClass, subClass);return getMapper().readValue(json, javaType);} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e);}}/*** 嵌套泛型序列化* add by zqw*/public static <T> T fromJson(String json, ParametricTypes parametricTypes) {try {
//            getMapper().getTypeFactory().constructParametricType()JavaType javaType = getJavaType(parametricTypes);return getMapper().readValue(json, javaType);} catch (IOException e) {e.printStackTrace();throw new RuntimeException(e);}}@SuppressWarnings("rawtypes")private static JavaType getJavaType(ParametricTypes parametricTypes) {JavaType javaType;Class clazz = parametricTypes.getClazz();List<ParametricTypes> subClazz = parametricTypes.getSubClazz();if (subClazz == null || subClazz.size() == 0) {Class[] classes = new Class[0];javaType = getMapper().getTypeFactory().constructParametricType(clazz, classes);} else {JavaType[] javaTypes = new JavaType[subClazz.size()];for (int i = 0; i < subClazz.size(); i++) {JavaType jt = getJavaType(subClazz.get(i));javaTypes[i] = jt;}javaType = getMapper().getTypeFactory().constructParametricType(clazz, javaTypes);}return javaType;}/*** 对象直接序列化为字节数组*/public static byte[] toBytes(Object object) {try {return getMapper().writeValueAsBytes(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}/*** 对象直接序列化到输出流*/public static void toOutputStream(OutputStream os, Object object) {try {getMapper().writeValue(os, object);} catch (IOException e) {throw new RuntimeException(e);}}/*** 对象直接序列化到文件*/public static void toFile(File file, Object object) {try {getMapper().writeValue(file, object);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从输入流读取JSON并转化*/public static <T> T fromJson(InputStream is, Class<T> classOfT) {try {return getMapper().readValue(is, classOfT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从输入流读取JSON并转化*/public static <T> T fromJson(InputStream is, TypeReference<T> typeReference) {try {return getMapper().readValue(is, typeReference);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从输入流读取JSON并转化*/public static <T> T fromJson(InputStream is, ParametricTypes parametricTypes) {try {return getMapper().readValue(is, getJavaType(parametricTypes));} catch (IOException e) {throw new RuntimeException(e);}}/*** 从输入流读取JSON并转化*/public static <T> T fromJson(Reader reader, Class<T> classOfT) {try {return getMapper().readValue(reader, classOfT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从输入流读取JSON并转化*/public static <T> T fromJson(Reader reader, TypeReference<T> typeReference) {try {return getMapper().readValue(reader, typeReference);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从输入流读取JSON并转化*/public static <T> T fromJson(Reader reader, ParametricTypes parametricTypes) {try {return getMapper().readValue(reader, getJavaType(parametricTypes));} catch (IOException e) {throw new RuntimeException(e);}}/*** 从文件读取JSON并转化*/public static <T> T fromJson(File file, Class<T> classOfT) {try {return getMapper().readValue(file, classOfT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从文件读取JSON并转化*/public static <T> T fromJson(File file, TypeReference<T> typeReference) {try {return getMapper().readValue(file, typeReference);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从文件读取JSON并转化*/public static <T> T fromJson(File file, ParametricTypes parametricTypes) {try {return getMapper().readValue(file, getJavaType(parametricTypes));} catch (IOException e) {throw new RuntimeException(e);}}/*** 从字节数组读取JSON并转化*/public static <T> T fromJson(byte[] bytes, Class<T> classOfT) {try {return getMapper().readValue(bytes, classOfT);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从字节数组读取JSON并转化*/public static <T> T fromJson(byte[] bytes, TypeReference<T> typeReference) {try {return getMapper().readValue(bytes, typeReference);} catch (IOException e) {throw new RuntimeException(e);}}/*** 从字节数组读取JSON并转化*/public static <T> T fromJson(byte[] bytes, ParametricTypes parametricTypes) {try {return getMapper().readValue(bytes, getJavaType(parametricTypes));} catch (IOException e) {throw new RuntimeException(e);}}}

4、Spring注册

@Configuration
@SuppressWarnings(value = "unused")
public class WebSocketConfiguration {@Beanpublic HandlerMapping webSocketMapping(final AdminWebSocketHandler handler) {final Map<String, WebSocketHandler> map = new HashMap<>();map.put("/local/ws", handler);final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);mapping.setUrlMap(map);return mapping;}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}

二、客户端

客户端当然可以使用JS,这里我还是使用的WebFlux实现。

1、客户端连接类

与服务端类似,直接调用connectAdminWs()函数就行了

package cn.ac.iscas.dmo.gateway.core.ws;import cn.ac.iscas.dmo.gateway.admin.client.model.SelectorChanged;
import cn.ac.iscas.dmo.gateway.admin.utils.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;import java.net.URI;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author zhuquanwen* @version 1.0* @date 2022/4/13 15:11* @since jdk11*/
@Slf4j
@RequiredArgsConstructor
@SuppressWarnings(value = "unused")
public class AdminWebSocketClient {private final GatewayConfig gatewayConfig;private WsWrap wsWrap;public void connectAdminWs() {try {log.info("发送WebSocket连接");WebSocketClient client = new ReactorNettyWebSocketClient();String prefix = gatewayConfig.getAdminProps().getUrl();prefix = prefix.replace("http://", "ws://").replace("https://", "wss://");client.execute(URI.create(prefix + "/local/ws?id=" + UUID.randomUUID()), session -> {Mono<Void> input = session.receive().doOnNext(webSocketMessage -> messageHandle(session, webSocketMessage)).doOnError(throwable -> log.error("发生异常:" + throwable)).doOnComplete(() -> log.info("WebSocketClient结束")).then();Mono<Void> output = session.send(Flux.create(sink -> wsWrap = new WsWrap(session, sink)));return Mono.zip(input, output).then().doFinally(signalType -> {log.error("WebSocket连接断开,5秒后发起重连");try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}// 重新连接connectAdminWs();});}).onTerminateDetach().doOnError(throwable -> log.error("发生异常:" + throwable)).subscribe(aVoid -> {});} catch (Throwable e) {log.error("webSocket连接出错,5秒后发起重连", e);try {wsWrap.getSession().close();} catch (Exception ignore) {}try {TimeUnit.SECONDS.sleep(5);} catch (Exception ignore) {}//重连connectAdminWs();}}@Data@AllArgsConstructorprivate static class WsWrap {private WebSocketSession session;private FluxSink<WebSocketMessage> sink;public void sendText(Object obj) {sink.next(session.textMessage(JsonUtils.toJson(obj)));}}private void messageHandle(WebSocketSession session, WebSocketMessage message) {switch (message.getType()) {case TEXT: {String text = message.getPayloadAsText();// todo 业务处理} catch (Exception e) {log.warn("无法处理的消息", e);}break;}case BINARY:case PING:case PONG:break;default:}}}

这篇关于springboot学习(七十二) webflux中使用WebSocket实现服务端和客户端的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077571

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程