This article shows how to build a complete message service with WebSocket and Spring Boot.
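1. Add the dependency
// Gradle 7+ removed the 'compile' configuration; older builds can keep 'compile'.
implementation project(":faas-spring-boot-starter-data-websocket")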
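2. Define the WebSocketHandler, the entry point of the socket service (the handshake header carries a jwt-token, the same token issued at application login, which settles authentication directly; custom request parameters are then defined so that later pushes can be broadcast, sent privately, or fuzzy-matched)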
@Component
@WebSocketMapping("/server")
@AllArgsConstructor
public class ServerHandler implements WebSocketHandler {

    private final static Logger LOG = LoggerFactory.getLogger(ServerHandler.class);

    private final ConcurrentHashMap<String, WebSocketSender> senders;
    private final WebsocketResourceProperties properties;
    private final WebsocketMessageApiService messageApiService;
    private final UserApiService userApiService;

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        HandshakeInfo info = session.getHandshakeInfo();
        String token = Validator.getToken(info);
        Terminal terminal = getTerminal(info);
        try {
            Terminal terminalByToken = Validator.verify(token, properties.getClients());
            String userId = terminalByToken.getUserId();
            // Convert openId to userId
            UserInfoResp infoResp = userApiService.get(UserGetReq.newBuilder().setLoginUserId(userId).build());
            userId = infoResp.getUserId();
            terminal.setUserId(userId);
            messageApiService.addOnlineUsers(userId);
        } catch (FibException e) {
            LOG.error("Session rejected: code={}, message={}", e.getRespCode(), e.getRespMessage());
            // Return the close Mono so that it is subscribed and the socket is really closed
            return session.close();
        }
        String userId = terminal.getUserId();
        String id = WebSocketSender.getSenderId(userId, terminal.getAppType(), terminal.getAppSn(), terminal.getTableCode());
        session.receive()
                .doFinally(sig -> {
                    LOG.info("Session terminated: id={}, signal={}", id, sig.name());
                    // close() is lazy; without subscribe() nothing happens
                    session.close().subscribe();
                    senders.remove(id);
                    messageApiService.removeOnlineUsers(userId);
                })
                .subscribe(inMsg -> {
                    String text = inMsg.getPayloadAsText();
                    LOG.info("Message received: id={}, message={}", id, text);
                    process(text);
                });
        // Register the outbound sink so that other components can push to this session
        return session.send(Flux.create(sink -> {
            senders.put(id, new WebSocketSender(session, sink));
            LOG.info("Session created: id={}, count={}, senders={}", id, senders.size(), JSON.toJSONString(senders.keys()));
        }));
    }

    /**
     * Handle a message sent by the client.
     *
     * @param text message payload
     */
    private void process(String text) {
    }

    /**
     * Read the terminal information, from the headers first and then from the query string.
     *
     * @param info handshake information
     * @return the terminal (userId is filled in later)
     */
    private static Terminal getTerminal(HandshakeInfo info) {
        Terminal terminal = new Terminal();
        HttpHeaders headers = info.getHeaders();
        Map<String, String> queryParams = getQueryParams(info.getUri().getQuery());
        String appSn = headers.getFirst("appSn");
        String appType = headers.getFirst("appType");
        String tableCode = headers.getFirst("tableCode");
        if (StringUtils.isEmpty(appSn)) {
            appSn = queryParams.get("appSn");
        }
        if (StringUtils.isEmpty(tableCode)) {
            tableCode = queryParams.get("tableCode");
        }
        if (StringUtils.isEmpty(appType)) {
            appType = queryParams.get("appType");
        }
        terminal.setAppSn(appSn);
        terminal.setAppType(appType);
        terminal.setTableCode(tableCode);
        return terminal;
    }

    /**
     * Parse the query string into a map.
     *
     * @param queryStr query string, may be null
     * @return parameter map, never null
     */
    private static Map<String, String> getQueryParams(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}
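The article does not show `WebSocketSender`, so how `getSenderId` and `send` support broadcast, private, and fuzzy-matched pushes is not visible. A minimal sketch of one way it could work, assuming the id is a composite of `userId`, `appType`, `appSn`, and `tableCode`, and that an empty filter field acts as a wildcard. The class `SenderKeySketch`, the `|` separator, and all method names here are hypothetical, not the author's API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch; the real WebSocketSender is not shown in the article. */
class SenderKeySketch {

    // Assumption: none of the four parts contains the separator character.
    private static final String SEP = "|";

    /** Builds a composite id; a missing part is stored as an empty string. */
    static String senderId(String userId, String appType, String appSn, String tableCode) {
        return String.join(SEP, nz(userId), nz(appType), nz(appSn), nz(tableCode));
    }

    /** A null or empty filter field matches any value (wildcard). */
    static boolean matches(String id, String userId, String appType, String appSn, String tableCode) {
        String[] parts = id.split("\\|", -1); // -1 keeps trailing empty parts
        if (parts.length != 4) {
            return false;
        }
        String[] filter = {userId, appType, appSn, tableCode};
        for (int i = 0; i < filter.length; i++) {
            boolean wildcard = filter[i] == null || filter[i].isEmpty();
            if (!wildcard && !filter[i].equals(parts[i])) {
                return false;
            }
        }
        return true;
    }

    /** Selects every registered id that satisfies the filter. */
    static List<String> select(Collection<String> ids, String userId, String appType,
                               String appSn, String tableCode) {
        List<String> hits = new ArrayList<>();
        for (String id : ids) {
            if (matches(id, userId, appType, appSn, tableCode)) {
                hits.add(id);
            }
        }
        return hits;
    }

    private static String nz(String s) {
        return s == null ? "" : s;
    }
}
```

With this layout, a filter with all four fields set addresses one terminal (private push), a filter with only `userId` set reaches every terminal of that user, and an all-empty filter selects every session (broadcast).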
3. Validator helper class (parses the token from the header)
public class Validator {

    private static final Pattern authorizationPattern =
            Pattern.compile("^Bearer (?<token>[a-zA-Z0-9-._~+/]+=*)$", Pattern.CASE_INSENSITIVE);

    private static String getTokenFromHeader(HttpHeaders headers) {
        String authorization = headers.getFirst("Authorization");
        if (StringUtils.isEmpty(authorization)) {
            return null;
        }
        if (StringUtils.startsWithIgnoreCase(authorization, "bearer")) {
            Matcher matcher = authorizationPattern.matcher(authorization);
            if (!matcher.matches()) {
                throw FibException.ofServerError("Invalid token [Authorization]");
            } else {
                return matcher.group("token");
            }
        } else {
            return null;
        }
    }

    /**
     * Read the token from the Authorization header, falling back to the "token" query parameter.
     */
    public static String getToken(HandshakeInfo handshakeInfo) {
        String token = getTokenFromHeader(handshakeInfo.getHeaders());
        if (StringUtils.isEmpty(token)) {
            Map<String, String> queryParams = getQueryParams(handshakeInfo.getUri().getQuery());
            return queryParams.get("token");
        }
        return token;
    }

    /**
     * Check the token claims (sub, aud, iss, exp) and return a terminal carrying the subject as userId.
     * Note: SignedJWT.parse only decodes the token; the signature is not verified here.
     *
     * @param token   serialized JWT
     * @param clients allowed audiences; null or empty disables the audience check
     * @return terminal whose userId is the token subject
     */
    public static Terminal verify(String token, List<String> clients) {
        try {
            SignedJWT signedJWT = SignedJWT.parse(token);
            JWTClaimsSet claims = signedJWT.getJWTClaimsSet();
            String sub = claims.getStringClaim("sub");
            if (StringUtils.isEmpty(sub)) {
                throw FibException.ofServerError("Invalid token [Illegal sub]");
            }
            List<String> aud = claims.getStringListClaim("aud");
            if (aud == null || aud.isEmpty()) {
                throw FibException.ofServerError("Invalid token [Illegal aud]");
            }
            if (clients != null && !clients.isEmpty()) {
                String audStr = aud.get(0);
                if (!clients.contains(audStr)) {
                    throw FibException.ofServerError("Invalid token [Illegal aud]");
                }
            }
            String iss = claims.getStringClaim("iss");
            if (StringUtils.isEmpty(iss)) {
                throw FibException.ofServerError("Invalid token [Missing issuer]");
            }
            Date exp = claims.getDateClaim("exp");
            long now = System.currentTimeMillis();
            if (exp == null || now > exp.getTime()) {
                throw FibException.ofServerError("Invalid token [Expired]");
            }
            Terminal terminal = new Terminal();
            terminal.setUserId(sub);
            return terminal;
        } catch (ParseException e) {
            throw FibException.ofServerError("Invalid token [Unexpected token]");
        }
    }

    /**
     * Parse the query string into a map.
     *
     * @param queryStr query string, may be null
     * @return parameter map, never null
     */
    private static Map<String, String> getQueryParams(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}
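To make the token lookup order easier to try out in isolation, here is a JDK-only sketch of the same idea: the `Authorization` header first, then the `token` query parameter. It reuses the bearer pattern from the class above; the class name `TokenResolverSketch`, its method names, and the use of `IllegalArgumentException` in place of `FibException` are assumptions made for the sketch. It also assumes the query string is still percent-encoded (for example from `URI.getRawQuery()`), which is why it decodes the value:

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-alone version of the header/query token lookup. */
class TokenResolverSketch {

    private static final Pattern BEARER =
            Pattern.compile("^Bearer (?<token>[a-zA-Z0-9-._~+/]+=*)$", Pattern.CASE_INSENSITIVE);

    /** Returns the bearer token, or null when the header is absent or uses another scheme. */
    static String fromHeader(String authorization) {
        if (authorization == null || authorization.isEmpty()) {
            return null;
        }
        if (!authorization.regionMatches(true, 0, "bearer", 0, 6)) {
            return null; // e.g. "Basic ..."
        }
        Matcher matcher = BEARER.matcher(authorization);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Malformed bearer token");
        }
        return matcher.group("token");
    }

    /** Returns the decoded "token" parameter of a raw query string, or null if absent. */
    static String fromQuery(String rawQuery) {
        if (rawQuery == null || rawQuery.isEmpty()) {
            return null;
        }
        for (String pair : rawQuery.split("&")) {
            String[] kv = pair.split("=", 2);
            if (kv[0].equals("token")) {
                return kv.length == 2 ? URLDecoder.decode(kv[1], StandardCharsets.UTF_8) : "";
            }
        }
        return null;
    }

    /** Header wins; the query parameter is only a fallback. */
    static String resolve(String authorization, String rawQuery) {
        String token = fromHeader(authorization);
        return (token == null || token.isEmpty()) ? fromQuery(rawQuery) : token;
    }
}
```

The query fallback matters for browser clients, because the browser `WebSocket` constructor does not let scripts set an `Authorization` header on the handshake.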
4. WebSocketConfiguration
@Configuration
public class WebSocketConfiguration {

    @Bean
    public HandlerMapping webSocketMapping() {
        return new WebSocketMappingHandlerMapping();
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

    // Shared registry of live sessions, keyed by sender id
    @Bean
    public ConcurrentHashMap<String, WebSocketSender> senders() {
        return new ConcurrentHashMap<String, WebSocketSender>();
    }
}
5. WebSocketMappingHandlerMapping
/**
 * Register WebSocket handlers annotated by @WebSocketMapping
 */
public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping {

    private final Map<String, WebSocketHandler> handlers = new LinkedHashMap<>();

    @Override
    public void initApplicationContext() throws BeansException {
        Map<String, Object> beanMap = obtainApplicationContext().getBeansWithAnnotation(WebSocketMapping.class);
        beanMap.values().forEach(bean -> {
            if (!(bean instanceof WebSocketHandler)) {
                throw new RuntimeException(String.format("Controller [%s] doesn't implement WebSocketHandler interface.", bean.getClass().getName()));
            }
            WebSocketMapping annotation = AnnotationUtils.getAnnotation(bean.getClass(), WebSocketMapping.class);
            handlers.put(Objects.requireNonNull(annotation).value(), (WebSocketHandler) bean);
        });
        super.setOrder(Ordered.HIGHEST_PRECEDENCE);
        super.setUrlMap(handlers);
        super.initApplicationContext();
    }
}
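The `@WebSocketMapping` annotation itself is not listed in the article. The sketch below shows what it might look like and how the scan above turns annotated beans into a URL map, using plain reflection instead of Spring. The annotation definition (a single `value` path with runtime retention) is an assumption, and `SketchHandler`, `ServerEndpoint`, `PlainBean`, `BadBean`, and `MappingScanSketch` are hypothetical stand-ins:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Assumed shape of the annotation; RUNTIME retention is needed for reflection. */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface WebSocketMapping {
    String value();
}

/** Stand-in for Spring's WebSocketHandler so the sketch has no dependencies. */
interface SketchHandler {
}

@WebSocketMapping("/server")
class ServerEndpoint implements SketchHandler {
}

/** Not annotated: ignored by the scan. */
class PlainBean {
}

/** Annotated but not a handler: rejected by the scan. */
@WebSocketMapping("/bad")
class BadBean {
}

class MappingScanSketch {

    /** Maps each annotated handler to its path, preserving registration order. */
    static Map<String, SketchHandler> buildUrlMap(List<?> beans) {
        Map<String, SketchHandler> urlMap = new LinkedHashMap<>();
        for (Object bean : beans) {
            WebSocketMapping mapping = bean.getClass().getAnnotation(WebSocketMapping.class);
            if (mapping == null) {
                continue;
            }
            if (!(bean instanceof SketchHandler)) {
                throw new IllegalStateException(
                        bean.getClass().getName() + " doesn't implement the handler interface");
            }
            urlMap.put(mapping.value(), (SketchHandler) bean);
        }
        return urlMap;
    }
}
```

In the real class, Spring's `getBeansWithAnnotation` does the filtering step and `setUrlMap` receives the resulting map.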
6. ServerJob: re-pushes pending messages once a user who was offline logs in again
@Service
@AllArgsConstructor
@Lazy(false)
public class ServerJob extends AbstractDistributedSchedule {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final WebsocketMessageApiService messageApiService;
    private final ConcurrentHashMap<String, WebSocketSender> senders;
    private final UserApiService userApiService;

    // Runs once per minute
    @Override
    @Scheduled(cron = "0 0/1 * * * ?")
    public void execute() {
        try {
            logger.info(String.format("Async message push job, current online sessions [%s]", senders.size()));
            boolean lock = lock(0, 1000);
            if (!lock) {
                logger.info("get lock fail");
                return;
            }
            // Load the first page of messages that are still waiting for delivery
            fib.core.Pageable<cn.finopen.faas.api.websocket.dto.WebsocketMessageFindResp> pageable = messageApiService.find(
                    WebsocketMessageFindReq.newBuilder()
                            .setStatus(MessageStatus.WAITING.ordinal())
                            .setPage(1)
                            .setPageSize(100)
                            .build());
            if (pageable.getTotal() <= 0) {
                return;
            }
            Collection<WebsocketMessageFindResp> items = pageable.getItems();
            items.forEach(req -> ThreadPoolUtils.execute(() -> {
                String userId = req.getUserId();
                UserInfoResp infoResp = userApiService.get(UserGetReq.newBuilder().setLoginUserId(userId).build());
                String appSn = req.getAppSn();
                String appType = req.getAppType();
                String tableCode = req.getTableCode();
                boolean flag = WebSocketSender.send(senders, infoResp.getUserId(), appType, appSn, tableCode, req.getData());
                if (flag) {
                    // Delivered: mark the message as sent so it is not pushed again
                    messageApiService.update(WebsocketMessageUpdateReq.newBuilder()
                            .setId(req.getId())
                            .setStatus(MessageStatus.SENT.ordinal())
                            .build());
                }
            }));
        } catch (Exception e) {
            logger.error("Async message push job failed", e);
        }
    }
}
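The job above amounts to a polling loop over messages in the WAITING state. As a rough illustration of that loop without the database, scheduler, or distributed lock, here is an in-memory sketch. `RedeliverySketch`, its `Message` class, and the set of online users standing in for `WebSocketSender.send` are all hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of one pass of the redelivery job. */
class RedeliverySketch {

    enum Status { WAITING, SENT, FAILED }

    static final class Message {
        final long id;
        final String userId;
        final String data;
        Status status = Status.WAITING;

        Message(long id, String userId, String data) {
            this.id = id;
            this.userId = userId;
            this.data = data;
        }
    }

    /**
     * One polling pass: looks at up to pageSize WAITING messages, marks those whose
     * receiver is online as SENT, and returns the ids that were delivered.
     */
    static List<Long> runOnce(List<Message> store, Set<String> onlineUsers, int pageSize) {
        List<Long> delivered = new ArrayList<>();
        int seen = 0;
        for (Message message : store) {
            if (message.status != Status.WAITING) {
                continue;
            }
            if (seen++ >= pageSize) {
                break; // page is full; the rest waits for the next run
            }
            if (onlineUsers.contains(message.userId)) {
                message.status = Status.SENT; // stands in for send(...) returning true
                delivered.add(message.id);
            }
        }
        return delivered;
    }
}
```

One thing worth checking in the real job: `find` sorts by `createTime` descending and the job only reads page 1 with 100 rows, so if more than 100 messages are waiting, the oldest ones may not be retried until newer ones have been delivered.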
7. Message push API, integrated into the concrete business service (supports online and offline users)
@AllArgsConstructor
@Service
public class WebsocketMessageApiServiceImpl implements WebsocketMessageApiService {

    private final static String WEBSOCKET_ONLINE_USERS = "WEBSOCKET_ONLINE_USERS";

    private final ConcurrentHashMap<String, WebSocketSender> senders;
    private final WebsocketMessageRepository repository;
    private final RedisService redisService;
    private final UserApiService userApiService;

    @Override
    public WebsocketMessageCreateResp create(WebsocketMessageCreateReq req) {
        WebsocketMessageData data = req.getData();
        if (data.getType() == null) {
            throw FibException.ofBadRequest("Message type must not be empty");
        }
        String content = trim(data.getContent());
        if (StringUtils.isEmpty(content)) {
            throw FibException.ofBadRequest("Message content must not be empty");
        }
        List<String> userIdList = req.getUserId();
        if (userIdList == null || userIdList.isEmpty()) {
            throw FibException.ofBadRequest("User id must not be empty");
        }
        data.setTime(System.currentTimeMillis());
        String text = data.toString();
        String appSn = req.getAppSn();
        String appType = req.getAppType();
        String tableCode = req.getTableCode();
        String ignoreUserId = req.getIgnoreUserId();
        if (StringUtils.isNotEmpty(ignoreUserId)) {
            UserInfoResp infoResp = userApiService.get(UserGetReq.newBuilder().setLoginUserId(ignoreUserId).build());
            ignoreUserId = infoResp.getUserId();
        }
        List<WebsocketMessage> messages = new ArrayList<>();
        boolean checkOnline = req.getCheckOnline() != null && req.getCheckOnline();
        for (String userId : userIdList) {
            int status = MessageStatus.SENT.ordinal();
            UserInfoResp infoResp = userApiService.get(UserGetReq.newBuilder().setLoginUserId(userId).build());
            boolean flag = WebSocketSender.send(senders, infoResp.getUserId(), appType, appSn, tableCode, text, ignoreUserId);
            if (!flag) {
                // Not delivered: fail fast when the caller requires an online receiver,
                // otherwise keep the message waiting for ServerJob to retry
                if (checkOnline) {
                    status = MessageStatus.FAILED.ordinal();
                } else {
                    status = MessageStatus.WAITING.ordinal();
                }
            }
            messages.add(WebsocketMessage.newBuilder()
                    .setUserId(userId)
                    .setCheckOnline(checkOnline)
                    .setData(text)
                    .setAppSn(appSn)
                    .setAppType(appType)
                    .setTableCode(tableCode)
                    .setCreateTime(LocalDateTime.now())
                    .setStatus(status)
                    .build());
        }
        // Only messages shorter than 1024 characters are persisted
        if (!messages.isEmpty() && content.length() < 1024) {
            repository.saveAll(messages);
        }
        return WebsocketMessageCreateResp.newBuilder().build();
    }

    @Override
    public WebsocketMessageGetResp get(WebsocketMessageGetReq req) {
        Long id = req.getId();
        Optional<WebsocketMessage> messageOptional = repository.findOne(Example.of(WebsocketMessage.newBuilder().setId(id).build()));
        if (messageOptional.isEmpty()) {
            throw FibException.ofNotFound("Message does not exist");
        }
        WebsocketMessage entity = messageOptional.get();
        return WebsocketMessageGetResp.newBuilder()
                .setId(entity.getId())
                .setUserId(entity.getUserId())
                .setData(entity.getData())
                .setCheckOnline(entity.getCheckOnline())
                .setCreateTime(entity.getCreateTime())
                .build();
    }

    @Override
    public WebsocketMessageUpdateResp update(WebsocketMessageUpdateReq req) {
        Long id = req.getId();
        Optional<WebsocketMessage> messageOptional = repository.findOne(Example.of(WebsocketMessage.newBuilder().setId(id).build()));
        if (messageOptional.isEmpty()) {
            throw FibException.ofNotFound("Message does not exist");
        }
        WebsocketMessage entity = messageOptional.get();
        entity.setStatus(req.getStatus());
        entity.setUpdateTime(LocalDateTime.now());
        repository.save(entity);
        return WebsocketMessageUpdateResp.newBuilder().build();
    }

    @Override
    public Pageable<WebsocketMessageFindResp> find(WebsocketMessageFindReq req) {
        QWebsocketMessage qEntity = QWebsocketMessage.websocketMessage;
        BooleanExpression expression = qEntity.isNotNull();
        if (req.getStatus() != null) {
            expression = expression.and(qEntity.status.eq(req.getStatus()));
        }
        if (StringUtils.isNotEmpty(req.getUserId())) {
            expression = expression.and(qEntity.userId.eq(req.getUserId()));
        }
        // createTime holds either [from] or [from, to)
        List<String> createTimes = req.getCreateTime();
        if (createTimes != null && !createTimes.isEmpty()) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            if (createTimes.size() == 1) {
                expression = expression.and(qEntity.createTime.goe(LocalDateTime.parse(createTimes.get(0), formatter)));
            } else {
                expression = expression
                        .and(qEntity.createTime.goe(LocalDateTime.parse(createTimes.get(0), formatter)))
                        .and(qEntity.createTime.lt(LocalDateTime.parse(createTimes.get(1), formatter)));
            }
        }
        PageRequest pageRequest = PageRequest.of(req.getPage() - 1, req.getPageSize(), Sort.by(Sort.Order.desc("createTime")));
        Page<WebsocketMessage> data = repository.findAll(expression, pageRequest);
        return PageImpl.of(req.getPage(), req.getPageSize(), data.getTotalElements(), parse(data.getContent()));
    }

    @Override
    public Set<String> getOnlineUsers(WebsocketMessageGetOnlineUserReq req) {
        Set<String> data = redisService.get(WEBSOCKET_ONLINE_USERS, Set.class);
        if (data == null) {
            data = new HashSet<>();
        }
        return data;
    }

    // Note: this read-modify-write on the Redis value is not atomic across instances
    @Override
    public Boolean addOnlineUsers(String userId) {
        Set data = redisService.get(WEBSOCKET_ONLINE_USERS, Set.class);
        if (data == null) {
            data = new HashSet<>();
        }
        data.add(userId);
        redisService.set(WEBSOCKET_ONLINE_USERS, JSON.toJSONString(data), 24 * 3600);
        return true;
    }

    @Override
    public Boolean removeOnlineUsers(String userId) {
        Set data = redisService.get(WEBSOCKET_ONLINE_USERS, Set.class);
        if (data != null) {
            data.remove(userId);
            redisService.set(WEBSOCKET_ONLINE_USERS, JSON.toJSONString(data), 24 * 3600);
        }
        return true;
    }

    private List<WebsocketMessageFindResp> parse(List<WebsocketMessage> content) {
        List<WebsocketMessageFindResp> result = new ArrayList<>();
        for (WebsocketMessage item : content) {
            WebsocketMessageFindResp temp = WebsocketMessageFindResp.newBuilder().build();
            AvroUtils.copy(item, temp);
            result.add(temp);
        }
        return result;
    }

    /**
     * If the content is a JSON object or array, drop null properties to keep it short.
     *
     * @param src raw content
     * @return compacted content, or src unchanged when it is not valid JSON
     */
    private String trim(String src) {
        if (src == null || src.isEmpty()) {
            return src;
        }
        try {
            if (src.startsWith("{") && src.endsWith("}")) {
                return JSONObject.toJSONString(JSONObject.parseObject(src), SerializerFeature.WriteNullListAsEmpty);
            } else if (src.startsWith("[") && src.endsWith("]")) {
                return JSONObject.toJSONString(JSONArray.parseArray(src), SerializerFeature.WriteNullListAsEmpty);
            }
        } catch (Exception ignore) {
        }
        return src;
    }
}
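The status written by `create` depends on two inputs: whether the push reached a live session, and whether the caller asked for `checkOnline`. Pulled out as a pure function, the rule could be sketched as follows. The class `DeliveryStatusSketch` and its enum are hypothetical; in particular the order of the enum constants is an assumption, since the article's `MessageStatus` is not shown:

```java
/** Hypothetical extraction of the status rule used when a message is created. */
class DeliveryStatusSketch {

    // Assumption: the constant order here is illustrative only.
    enum MessageStatus { WAITING, SENT, FAILED }

    /**
     * delivered   = the push reached at least one live session
     * checkOnline = the caller wants the message dropped when the receiver is offline
     */
    static MessageStatus decide(boolean delivered, boolean checkOnline) {
        if (delivered) {
            return MessageStatus.SENT;
        }
        // Offline receiver: either give up now or leave it for the retry job
        return checkOnline ? MessageStatus.FAILED : MessageStatus.WAITING;
    }
}
```

Only the WAITING outcome is picked up again by the scheduled job in step 6; FAILED messages are stored for the record but are not retried.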
8. Online web test tool
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket online test tool</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" rel="stylesheet">
    <link href="websocket_tool.min.css" rel="stylesheet">
    <style></style>
</head>
<body>
<div class="well socketBody">
    <div class="socketTop">
        <div class="socketTopColLeft">
            <div class="btn-group socketSelect">
                <button type="button" class="btn btn-default dropdown-toggle socketSelectBtn" data-toggle="dropdown" aria-expanded="false">
                    <span class="showHeadWS">WS</span>
                    <span class="caret"> </span>
                </button>
                <ul class="dropdown-menu socketSelectshadow">
                    <li><a onclick="showWS('WS')">WS</a></li>
                    <li><a onclick="showWS('WSS')">WSS</a></li>
                </ul>
            </div>
        </div>
        <div class="socketTopColRight">
            <input type="text" list="typelist" class="form-control urlInput" placeholder="Enter the address to connect to, e.g. 127.0.0.1:8000/ws" oninput="inputChange()">
            <datalist id="typelist" class="inputDatalist">
                <option><!-- put the default ws address here --></option>
            </datalist>
        </div>
    </div>
    <div class="socketBG well" id="main"></div>
    <div class="socketBottom row">
        <div class="col-xs-8 socketTextareaBody">
            <input class="form-control socketTextarea" placeholder="Enter the message to send~">
            <input id="token" class="form-control " placeholder="Sec-WebSocket-Protocol~">
            <input id="sid" class="form-control " placeholder="sid~">
            <input id="rid" class="form-control " placeholder="rid~">
        </div>
        <div class="col-xs-2 socketBtnSendBody">
            <button type="button" class="btn btn-success socketBtnSend" onclick="sendBtn()">Send</button>
        </div>
        <div class="col-xs-2 socketBtnBody">
            <button type="button" class="btn btn-primary socketBtn" onclick="connectBtn()">Connect</button>
            <button type="button" class="btn btn-info socketBtn" onclick="emptyBtn()">Clear</button>
            <button type="button" class="btn btn-warning socketBtn" onclick="closeBtn()">Disconnect</button>
        </div>
    </div>
    <div class="alert alert-danger socketInfoTips" role="alert">...</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/jquery@1.12.4/dist/jquery.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
<script src="websocket_tool.min.js"></script>
</body>
</html>
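In addition, the client can use the following strategies to avoid dropped connections while the service is in use:
- Heartbeat mechanism
Keep the connection alive with heartbeat packets. The client sends a heartbeat to the server at a fixed interval and the server replies, which confirms that the connection is still active. If no heartbeat reply arrives from the other side within a given time, the connection is considered broken and is handled accordingly.
- Reconnection mechanism
When the WebSocket connection is detected as closed, try to re-establish it automatically. Use an increasing retry interval so that attempts are not made too often, for example by waiting a little longer before each successive retry.
- Adapting to the network environment
Adjust the strategy to the network conditions, for example by lengthening the heartbeat interval and reducing the data transfer frequency on a weak network.
- Error handling
Listen to and handle the WebSocket events (such as onopen, onmessage, onerror, onclose) so that connection problems are discovered and handled promptly.
- Managing multiple instances
If the application needs to maintain several WebSocket connections, manage each one independently so that they do not interfere with each other and cause disconnects.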
Example:
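let socket = null;
let heartbeatTimer = null;
const HEARTBEAT_INTERVAL = 30000; // heartbeat interval in milliseconds
const PING = 'ping'; // heartbeat request
const PONG = 'pong'; // heartbeat reply

function connect() {
  socket = uni.connectSocket({
    url: 'wss://example.com/socket',
    success: function (res) {
      // The connect call was accepted; the connection is open only once onOpen fires
      console.log('WebSocket connect request sent');
    },
    fail: function (err) {
      console.error('WebSocket connection failed', err);
      reconnect();
    }
  });
  socket.onOpen(() => {
    console.log('WebSocket connected');
    startHeartbeat();
  });
  socket.onMessage(res => {
    if (res.data === PONG) {
      // Heartbeat reply from the server
    } else {
      // Handle other messages
    }
  });
  socket.onClose(() => {
    clearInterval(heartbeatTimer);
    reconnect();
  });
}

function startHeartbeat() {
  clearInterval(heartbeatTimer); // never run two timers at once
  heartbeatTimer = setInterval(() => {
    // Send the plain string so that it matches the PONG comparison above
    socket.send({ data: PING });
  }, HEARTBEAT_INTERVAL);
}

function reconnect() {
  // Reconnect logic: wait 5 seconds before each attempt
  setTimeout(() => {
    connect();
  }, 5000);
}

// Open the initial connection
connect();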
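The example above waits a fixed 5 seconds before every reconnect, while the strategy list recommends an increasing interval. One common way to get that is exponential backoff with a cap. The sketch below is written in Java to match the server code; the class `ReconnectBackoffSketch` and its parameters are hypothetical, and the same arithmetic carries over to the JavaScript client:

```java
/** Hypothetical helper: delay doubles after every failed attempt, up to a cap. */
class ReconnectBackoffSketch {

    private final long baseMillis;
    private final long maxMillis;
    private int attempt = 0;

    ReconnectBackoffSketch(long baseMillis, long maxMillis) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    /** Delay before the next attempt: baseMillis * 2^attempt, capped at maxMillis. */
    long nextDelayMillis() {
        int shift = Math.min(attempt, 30);
        // Compare against the shifted cap first so the left shift below cannot overflow
        long delay = baseMillis > (maxMillis >> shift) ? maxMillis : baseMillis << shift;
        attempt++;
        return delay;
    }

    /** Call after a successful connection so the next outage starts from the base delay. */
    void reset() {
        attempt = 0;
    }
}
```

With a base of 1 second and a cap of 30 seconds, successive failures wait 1, 2, 4, 8, 16, then 30 seconds. Adding a small random jitter to each delay is a common refinement when many clients may reconnect at the same moment.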
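ping and pong are the heartbeat signals exchanged between the client and the server. Here they are ordinary application-level text messages, separate from the ping/pong control frames that the WebSocket protocol itself defines.
Ping:
The client periodically sends a heartbeat packet to the server, usually just the string "ping".
Its purpose is to probe the connection and let the server know that the client is still online.
Pong:
When the server receives "ping" from the client, it replies with "pong".
When the client receives "pong", it knows the connection is still alive.
This mechanism lets both sides check the connection state. If the client receives no "pong" from the server within a given period, it can treat the connection as broken and trigger the reconnection mechanism.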
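The timeout rule described above can be sketched as a small state holder: the server answers "ping" with "pong", and the client records when the last "pong" arrived and compares it with a timeout. `HeartbeatMonitorSketch`, its method names, and the choice of passing the clock in as a parameter are assumptions made to keep the sketch testable:

```java
/** Hypothetical sketch of application-level ping/pong bookkeeping. */
class HeartbeatMonitorSketch {

    static final String PING = "ping";
    static final String PONG = "pong";

    private final long timeoutMillis;
    private long lastPongAt;

    /** nowMillis is the connection-open time; it counts as the first sign of life. */
    HeartbeatMonitorSketch(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastPongAt = nowMillis;
    }

    /** Server side: the reply for an incoming text message, or null if it is not a heartbeat. */
    static String replyFor(String incoming) {
        return PING.equals(incoming) ? PONG : null;
    }

    /** Client side: records a heartbeat reply; returns true when the message was a pong. */
    boolean onMessage(String incoming, long nowMillis) {
        if (PONG.equals(incoming)) {
            lastPongAt = nowMillis;
            return true;
        }
        return false; // a business message, to be handled elsewhere
    }

    /** The connection counts as alive while the last pong is within the timeout. */
    boolean isAlive(long nowMillis) {
        return nowMillis - lastPongAt <= timeoutMillis;
    }
}
```

A timeout of about three heartbeat intervals (90 seconds for the 30-second interval used earlier) tolerates a couple of lost replies before a reconnect is triggered. On the server, `replyFor` is the kind of logic that could go into the empty `process` method of the handler in step 2.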