springboot 优雅使用函数式编程处理 websocket @OnMessage 消息

本文主要是介绍springboot 优雅使用函数式编程处理 websocket @OnMessage 消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

现在大多业务功能使用 socket.io实现长连接,但是部分第三方设备对接 只支持基础的websocket。
spring中使用基础的websocket, @OnMessage 收到消息,对消息的处理,if else 将会繁琐,难以维护。

本文仅介绍了如何使用enum枚举、java.util.function jdk8 函数式接口,实现消息的处理。

websocket 定义JSON 数据交换格式

本文使用的 示例格式:

//连接成功
{"cmd":"connect","sn":"A7888","data":{...}}
//设置人员
{"cmd":"setUser","data":{"userId":"1"}}
//控制设备 --多层级 的格式,第二层里面解析 仍可按照同样的方式来处理
{"cmd":"to_client","data":{"type":"openDoor","value":"ON"}}

springboot 集成websocket

pom.xml 依赖
        <!-- spring websocket--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><!-- spring websocket启动异常、排除spring-boot-starter-tomcat--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency>       
定义WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}
定义 @ServerEndpoint
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnOpen;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;/*** 定义websocket端点*/
@Slf4j
@ServerEndpoint(value = "/socket/device")
@Component
public class DeviceServerEndpoint {/*** 记录当前在线连接数*/private static AtomicInteger onlineCount = new AtomicInteger(0);/*** 连接的对象*/public static final Map<String, Session> clientMap = new ConcurrentHashMap<>();/*** 收到客户端消息** @param message 客户端发送过来的消息* @throws*/@OnMessagepublic void onMessage(String message, Session session) {try {DeviceMsg deviceMsg = JSON.parseObject(message, DeviceMsg.class);if (deviceMsg != null && deviceMsg.getCmd() != null) {// jdk8 函数式处理消息deviceMsg.getCmd().consumer.accept(session, message);} else {log.info("无法自动处理,客户端消息:{}", message);}} catch (Exception e) {log.error("消息处理失败", session.getId(), message);e.printStackTrace();}}/*** 连接建立成功*/@OnOpenpublic void onOpen(Session session) {onlineCount.incrementAndGet(); // 在线数加1log.info("有新连接加入:{},当前在线数为:{}", session.getId(), onlineCount.get());}/*** 连接关闭*/@OnClosepublic void onClose(Session session) {onlineCount.decrementAndGet(); // 在线数减1log.info("有一连接关闭:{},当前在线数为:{}", session.getId(), onlineCount.get());}@OnErrorpublic void onError(Session session, Throwable error) {onlineCount.decrementAndGet(); // 在线数减1error.printStackTrace();}/*** 测试 控制开锁*/public static void openDoor() {//所有客户端发送消息clientMap.forEach((id, session) -> {session.getBasicRemote().sendText("ON");}}
deviceMsg实体类
import lombok.Data;
import java.io.Serializable;@Data
public class DeviceMsg implements Serializable {/*** 指令*/Cmd cmd;/*** 数据块*/JSONObject data;
}
Cmd 核心消息处理 枚举类

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import javax.websocket.Session;
import java.util.function.BiConsumer;@Getter
@Slf4j
@AllArgsConstructor
public enum Cmd {connect("设备连接成功", (Session session, String msg) -> {//设备端连接成功,发送设置端消息,将Session记录起来DeviceServerEndpoint.clientMap.put(session.getId(), session);}),ping("设备心跳", (Session session, String msg) -> {session.getBasicRemote().sendText("pong");}),setUser("配置用户", (Session session, String msg) -> {//拿到msg 转换对象或者其他操作session.getBasicRemote().sendText("ok");}),to_client("客户端消息", (Session session, String msg) -> {		try {String string = JSON.parseObject(msg).getJSONObject("data").getString("type");Client clientCmd = Client.valueOf(string);clientCmd.consumer.accept(session, msg);} catch (Exception e) {log.info("to_client客户端消息,无法自动处理:{}", msg);}});/*** 描述*/String desc;/*** 处理*/BiConsumer<Session, String> consumer;
}
Client 消息处理枚举类

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import javax.websocket.Session;
import java.util.function.BiConsumer;@Getter
@Slf4j
@AllArgsConstructor
public enum Client{openDoor("客户端控制", (Session session, String msg) -> {//测试 连接成功 直接 开门DeviceServerEndpoint.openDoor();}),otherCmd("其他指令", (Session session, String msg) -> {session.getBasicRemote().sendText("ok");});/*** 描述*/String desc;/*** 处理*/BiConsumer<Session, String> consumer;
}

JDK8常用函数式编程接口介绍:

  • Function<T, R>:接受一个类型为 T 的参数,返回类型为 R 的结果。常用方法包括 apply(T t)。
  • Predicate:接受一个类型为 T 的参数,返回一个布尔值。常用方法包括 test(T t)。
  • Consumer:接受一个类型为 T 的参数,没有返回值。常用方法包括 accept(T t)。
  • Supplier:不接受任何参数,返回一个类型为 T 的结果。常用方法包括 get()。
  • UnaryOperator:继承自 Function<T, R>,接受一个类型为 T 的参数,返回类型也为 T 的结果。常用方法包括 apply(T t)。
  • BinaryOperator:继承自 BiFunction<T, U, R>,接受两个类型为 T 的参数,返回类型也为 T 的结果。常用方法包括 apply(T t1, T t2)。
  • BiFunction<T, U, R>:接受两个参数,一个类型为 T,一个类型为 U,返回类型为 R 的结果。常用方法包括 apply(T t, U u)。
  • BiPredicate<T, U>:接受两个参数,一个类型为 T,一个类型为 U,返回一个布尔值。常用方法包括 test(T t, U u)。
  • BiConsumer<T, U> :用于接受两个参数,一个类型为 T,一个类型为 U,并且没有返回值。

函数式编程接口的引入,使得在 Java 中能够更方便地实现函数式编程的特性,如Lambda表达式和方法引用。它们可以用于各种场景,例如集合的处理、条件判断、函数的组合等。通过使用这些接口,可以编写更简洁、可读性更高的代码。

附:
WebSocket介绍

这篇关于springboot 优雅使用函数式编程处理 websocket @OnMessage 消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/649514

相关文章

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

SpringCloud动态配置注解@RefreshScope与@Component的深度解析

《SpringCloud动态配置注解@RefreshScope与@Component的深度解析》在现代微服务架构中,动态配置管理是一个关键需求,本文将为大家介绍SpringCloud中相关的注解@Re... 目录引言1. @RefreshScope 的作用与原理1.1 什么是 @RefreshScope1.

Java并发编程必备之Synchronized关键字深入解析

《Java并发编程必备之Synchronized关键字深入解析》本文我们深入探索了Java中的Synchronized关键字,包括其互斥性和可重入性的特性,文章详细介绍了Synchronized的三种... 目录一、前言二、Synchronized关键字2.1 Synchronized的特性1. 互斥2.

Spring Boot 配置文件之类型、加载顺序与最佳实践记录

《SpringBoot配置文件之类型、加载顺序与最佳实践记录》SpringBoot的配置文件是灵活且强大的工具,通过合理的配置管理,可以让应用开发和部署更加高效,无论是简单的属性配置,还是复杂... 目录Spring Boot 配置文件详解一、Spring Boot 配置文件类型1.1 applicatio

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

JSON Web Token在登陆中的使用过程

《JSONWebToken在登陆中的使用过程》:本文主要介绍JSONWebToken在登陆中的使用过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录JWT 介绍微服务架构中的 JWT 使用结合微服务网关的 JWT 验证1. 用户登录,生成 JWT2. 自定义过滤

Java中StopWatch的使用示例详解

《Java中StopWatch的使用示例详解》stopWatch是org.springframework.util包下的一个工具类,使用它可直观的输出代码执行耗时,以及执行时间百分比,这篇文章主要介绍... 目录stopWatch 是org.springframework.util 包下的一个工具类,使用它

Java进行文件格式校验的方案详解

《Java进行文件格式校验的方案详解》这篇文章主要为大家详细介绍了Java中进行文件格式校验的相关方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、背景异常现象原因排查用户的无心之过二、解决方案Magandroidic Number判断主流检测库对比Tika的使用区分zip

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义