本文主要是介绍基于springboot的stomp与websocket实现实时消息推送与发送,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、什么是STOMP
STOMP 是一个简单的面向文本的消息传递协议,最初是为Ruby、Python和Perl等脚本语言连接到企业消息代理而创建的。它旨在解决常用消息传递模式的一个子集。STOMP可用于任何可靠的双向流式网络协议,如TCP和WebSocket。虽然STOMP是面向文本的协议,但消息的有效负载可以是文本或二进制。
二、好处
使用STOMP作为子协议,可以让Spring Framework和Spring Security提供比使用原始WebSocket更丰富的编程模型。这与HTTP相对于原始TCP的关系类似:正是HTTP使Spring MVC和其他Web框架能够提供丰富的功能。以下是一系列好处:
无需发明自定义消息传递协议和消息格式。
有现成的STOMP客户端可用,包括Spring Framework中的 Java 客户端。
可以使用诸如RabbitMQ,ActiveMQ等消息代理(可选)来管理订阅和广播消息。
应用程序逻辑可以组织在任意数量的 @Controller 实例中,消息根据STOMP目标标头路由到它们,而不必针对给定连接使用单个 WebSocketHandler 来处理原始WebSocket消息。
使用Spring Security根据STOMP目标和消息类型保护消息。
三、启用STOMP
spring-messaging 和 spring-websocket 模块提供了基于WebSocket的STOMP支持。
四、采用三种常见的使用方式
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.citydo</groupId>
    <artifactId>redis_mq_stomp_websocket</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis_mq_stomp_websocket</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- WebSocket/STOMP support; the embedded Tomcat is excluded here and Jetty is added below. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>

        <!-- Redis publish/subscribe -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- NOTE(review): explicit version on top of the Redis starter above; confirm this
             pin is intentional rather than relying on the parent's managed version. -->
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>2.1.5.RELEASE</version>
        </dependency>

        <!-- Kafka -->
        <!-- NOTE(review): version is pinned explicitly; confirm that 2.1.7.RELEASE is
             compatible with the Spring Boot 2.1.6.RELEASE parent. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.7.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
4.1、stomp+websocket+原生的websocket
package com.citydo.redis_mq_stomp_websocket.stompConfig;import com.citydo.redis_mq_stomp_websocket.handler.SocketHandler;
import com.citydo.redis_mq_stomp_websocket.interceptor.WebSocketInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(new SocketHandler(), "/websocket-realtime").addInterceptors(new WebSocketInterceptor()).setAllowedOrigins("*");}
}
package com.citydo.redis_mq_stomp_websocket.stompConfig;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;@Configuration
@EnableWebSocketMessageBroker
public class WebStompSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {stompEndpointRegistry.addEndpoint("/stomp-realtime")//解决跨域问题.setAllowedOrigins("*").withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {registry.enableSimpleBroker("/topic");}
}
package com.citydo.redis_mq_stomp_websocket.controller;import com.citydo.redis_mq_stomp_websocket.model.RequestMessage;
import com.citydo.redis_mq_stomp_websocket.model.ResponseMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import java.text.DateFormat;
import java.text.SimpleDateFormat;@Controller
public class WsController {private final SimpMessagingTemplate messagingTemplate;@Autowiredpublic WsController(SimpMessagingTemplate messagingTemplate) {this.messagingTemplate = messagingTemplate;}@MessageMapping("/welcome")@SendTo("/topic/say")public ResponseMessage say(RequestMessage message) {System.out.println(message.getName());return new ResponseMessage("welcome," + message.getName() + " !");}/*** 定时推送消息*/@Scheduled(fixedRate = 1000)public void callback() {// 发现消息DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");messagingTemplate.convertAndSend("/topic/callback", "定时推送消息时间: " + df.format(System.currentTimeMillis()));}
}
package com.citydo.redis_mq_stomp_websocket.handler;import com.citydo.redis_mq_stomp_websocket.util.JacksonUtil;
import com.citydo.redis_mq_stomp_websocket.model.WsParam;
import com.citydo.redis_mq_stomp_websocket.model.WsResponse;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;@Component
public class SocketHandler extends TextWebSocketHandler {private Logger logger = LoggerFactory.getLogger(this.getClass());List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();@Overridepublic void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {logger.info("handleTextMessage start");// 将消息进行转化,因为是消息是json数据,可能里面包含了发送给某个人的信息,所以需要用json相关的工具类处理之后再封装成TextMessage,// 我这儿并没有做处理,消息的封装格式一般有{from:xxxx,to:xxxxx,msg:xxxxx},来自哪里,发送给谁,什么消息等等String msg = message.getPayload();logger.info("msg = " + msg);WsParam<String> wsParam = JacksonUtil.json2Bean(msg, new TypeReference<WsParam<String>>(){});if ("list".equals(wsParam.getMethod())) {logger.info("call list method...");WsResponse<String> response = new WsResponse<>();response.setResult("hello list");sendMessageToUser(session, new TextMessage(JacksonUtil.bean2Json(response)));}logger.info("handleTextMessage end");// 给所有用户群发消息//sendMessagesToUsers(msg);// 给指定用户群发消息//sendMessageToUser(userId, msg);}@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {logger.info("Connected ... 
" + session.getId());sessions.add(session);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {if (session.isOpen()) {session.close();}sessions.remove(session);logger.info(String.format("Session %s closed because of %s", session.getId(), status.getReason()));}@Overridepublic void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {logger.error("error occured at sender " + session, throwable);}/*** 给所有的用户发送消息*/public void sendMessagesToUsers(TextMessage message) {for (WebSocketSession user : sessions) {try {// isOpen()在线就发送if (user.isOpen()) {user.sendMessage(message);}} catch (IOException e) {e.printStackTrace();}}}/*** 发送消息给指定的用户*/private void sendMessageToUser(WebSocketSession user, TextMessage message) {try {// 在线就发送if (user.isOpen()) {user.sendMessage(message);}} catch (IOException e) {logger.error("发送消息给指定的用户出错", e);}}
}
package com.citydo.redis_mq_stomp_websocket.interceptor;import javax.servlet.http.HttpServletRequest;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;import java.util.Map;public class WebSocketInterceptor implements HandshakeInterceptor {private Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse arg1,WebSocketHandler arg2, Map<String, Object> arg3) throws Exception {// 将ServerHttpRequest转换成request请求相关的类,用来获取request域中的用户信息if (request instanceof ServletServerHttpRequest) {ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;HttpServletRequest httpRequest = servletRequest.getServletRequest();}logger.info("beforeHandshake完成");return true;}@Overridepublic void afterHandshake(ServerHttpRequest arg0, ServerHttpResponse arg1, WebSocketHandler arg2, Exception arg3) {logger.info("afterHandshake完成");}
}
4.2、stomp+websocket+redis
package com.citydo.redis_mq_stomp_websocket.redisConfig;import com.citydo.redis_mq_stomp_websocket.publish.MessageReserve;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;@Configuration
/**
 * Redis publish/subscribe wiring: messages published on {@code chat} are
 * delivered to the {@code message} method of {@link MessageReserve}.
 */
public class RedisConfig {

    /**
     * Adapter that forwards received Redis messages to {@code reserve.message(...)}.
     *
     * @param reserve the delegate that receives the messages
     * @return the listener adapter
     */
    @Bean
    public MessageListenerAdapter adapter(MessageReserve reserve) {
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(reserve, "message");
        return listenerAdapter;
    }

    /**
     * Listener container subscribed to the {@code chat} topic pattern.
     *
     * @param redisConnectionFactory  connection factory used for the subscription
     * @param messageListenerAdapter  listener invoked for every received message
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
                                                   MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(redisConnectionFactory);
        PatternTopic chatTopic = new PatternTopic("chat");
        listenerContainer.addMessageListener(messageListenerAdapter, chatTopic);
        return listenerContainer;
    }
}
package com.citydo.redis_mq_stomp_websocket.redisConfig;import org.springframework.stereotype.Component;import javax.servlet.http.HttpSession;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Hashtable;
import java.util.concurrent.CopyOnWriteArraySet;@Component
/**
 * JSR-356 endpoint at {@code /redis-realtime}. Every connection registers
 * itself in a shared static set so {@link #sendAll(String)} can broadcast to
 * all connected clients.
 */
@ServerEndpoint("/redis-realtime")
public class WebSocket {

    /** All endpoint instances that currently have a connection. */
    private static final CopyOnWriteArraySet<WebSocket> CLIENTS = new CopyOnWriteArraySet<>();

    /** The session of this connection; set in {@link #onOpen(Session)}. */
    private Session session;

    /**
     * Stores the session and registers this endpoint instance.
     *
     * @param session the newly opened session
     */
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        CLIENTS.add(this);
    }

    /** Unregisters this endpoint instance when its connection closes. */
    // NOTE(review): the method name is misspelled ("onCloae"); it is kept as-is
    // so that the public interface of the class does not change.
    @OnClose
    public void onCloae() {
        CLIENTS.remove(this);
    }

    /**
     * Prints a message received from the client.
     *
     * @param message the text received
     */
    @OnMessage
    public void onMessage(String message) {
        System.out.println("接收到客户端发送消息为:" + message);
    }

    /**
     * Sends the text to every registered client whose session is open.
     * Failures are handled per client so that one broken connection does not
     * prevent delivery to the remaining clients.
     *
     * @param message the text to broadcast
     */
    public void sendAll(String message) {
        for (WebSocket client : CLIENTS) {
            Session target = client.session;
            if (target == null || !target.isOpen()) {
                continue;
            }
            try {
                target.getBasicRemote().sendText(message);
            } catch (IOException e) {
                System.out.println("发送消息出错:" + e.getMessage());
            }
        }
    }
}
package com.citydo.redis_mq_stomp_websocket.redisConfig;import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Declares the {@link ServerEndpointExporter} bean used for
 * {@code @ServerEndpoint}-annotated classes.
 *
 * <p>The bean name is set explicitly because other packages of this project also
 * declare a class with the simple name {@code WebSocketConfig}; identical default
 * bean names would clash during component scanning.
 */
@Component("redisWebSocketConfig")
public class WebSocketConfig {

    /**
     * @return a new endpoint exporter
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
package com.citydo.redis_mq_stomp_websocket.service;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;@Service
/**
 * Thin service that publishes chat messages to a Redis channel.
 */
public class RedisService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Publishes {@code message} on the given Redis channel.
     *
     * @param channel the channel to publish to
     * @param message the message text
     */
    public void chat(String channel, String message) {
        this.redisTemplate.convertAndSend(channel, message);
    }
}
package com.citydo.redis_mq_stomp_websocket.publish;import com.citydo.redis_mq_stomp_websocket.redisConfig.WebSocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
/**
 * Receiver that relays each message it is given to all WebSocket clients.
 */
public class MessageReserve {

    @Autowired
    private WebSocket webSocket;

    /**
     * Broadcasts the message through {@link WebSocket#sendAll(String)}.
     *
     * @param message the message text to relay
     */
    public void message(String message) {
        this.webSocket.sendAll(message);
    }
}
package com.citydo.redis_mq_stomp_websocket.controller;import com.citydo.redis_mq_stomp_websocket.service.RedisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
/**
 * MVC controller for the Redis demo: {@code /redis} publishes a message and
 * {@code /show} renders the {@code show} view.
 */
public class RedisController {

    @Autowired
    private RedisService redisService;

    /**
     * Publishes the message on the given channel.
     *
     * @param channel the channel to publish to
     * @param message the message text
     * @return a fixed confirmation string written to the response body
     */
    @RequestMapping("/redis")
    @ResponseBody
    public String redis(String channel, String message) {
        redisService.chat(channel, message);
        return "消息发送成功!";
    }

    /**
     * @return the name of the view to render
     */
    @RequestMapping("/show")
    public String show() {
        final String viewName = "show";
        return viewName;
    }
}
package com.citydo.redis_mq_stomp_websocket.service;import com.citydo.redis_mq_stomp_websocket.model.Message;
import com.citydo.redis_mq_stomp_websocket.model.Response;
import com.citydo.redis_mq_stomp_websocket.util.ChcpConstants;
import com.citydo.redis_mq_stomp_websocket.util.JsonHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestBody;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;@Service
public class WebsocketService {private static final Logger logger = LoggerFactory.getLogger(WebsocketService.class);@Autowiredprivate SimpMessagingTemplate messagingTemplate;public Response send(@RequestBody Message reqVO) {logger.info("get req msg: {}", reqVO.getMessage());String message = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())+ " 收到一条消息: " + reqVO.getMessage();Response response = new Response(message);messagingTemplate.convertAndSend(ChcpConstants.WEBSOCKET_DESTINATION.getTopic(), response);logger.info("send ws msg: {}", JsonHelper.toJson(response).toString());return response;}}
4.3、stomp+websocket+kafka
package com.citydo.redis_mq_stomp_websocket.kafkaConfig;import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {@Overridepublic void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {// endPoint 注册协议节点,并映射指定的URl// 注册一个Stomp 协议的endpoint,并指定 SockJS协议。stompEndpointRegistry.addEndpoint("/kafka-realtime").withSockJS();}@Overridepublic void configureMessageBroker(MessageBrokerRegistry registry) {// 配置消息代理(message broker)// 广播式应配置一个/topic 消息代理registry.enableSimpleBroker("/topic");}}
package com.citydo.redis_mq_stomp_websocket.controller;import com.citydo.redis_mq_stomp_websocket.model.Message;
import com.citydo.redis_mq_stomp_websocket.util.ChcpConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private KafkaTemplate<String, Object> kafkaTemplate;@AutowiredKafkaController(KafkaTemplate kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@PostMapping("/message/send")public String sendMessage(@RequestBody Message message) {kafkaTemplate.send(ChcpConstants.KAFKA_LISTENER_TOPIC.getTopic(), message.getMessage());return message.getMessage();}}
package com.citydo.redis_mq_stomp_websocket.service;import com.citydo.redis_mq_stomp_websocket.model.Message;
import com.citydo.redis_mq_stomp_websocket.model.Response;
import com.citydo.redis_mq_stomp_websocket.util.ChcpConstants;
import com.citydo.redis_mq_stomp_websocket.util.JsonHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestBody;import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;@Service
public class KafkaWebsocketService {private static final Logger logger = LoggerFactory.getLogger(KafkaWebsocketService.class);@Autowiredprivate SimpMessagingTemplate messagingTemplate;public Response send(@RequestBody Message reqVO) {logger.info("get req msg: {}", reqVO.getMessage());String message = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").format(LocalDateTime.now())+ " 收到一条消息: " + reqVO.getMessage();Response response = new Response(message);messagingTemplate.convertAndSend(ChcpConstants.WEBSOCKET_DESTINATION.getTopic(), response);logger.info("send ws msg: {}", JsonHelper.toJson(response).toString());return response;}}
package com.citydo.redis_mq_stomp_websocket.util;/*** kafka 和 websocket 监听发送主题定义常量类*/
public enum ChcpConstants {

    /** Topic the Kafka consumer listens on. */
    KAFKA_LISTENER_TOPIC(Constants.KAFKA_LISTENER_TOPIC, "kafka 消费者监听主题"),

    /** Destination WebSocket messages are sent to. */
    WEBSOCKET_DESTINATION("/topic/getResponse", "websocket 发送目标");

    private String topic;
    private String desc;

    /**
     * @param topic the topic or destination name
     * @param desc  a human-readable description
     */
    ChcpConstants(String topic, String desc) {
        // Assign directly rather than going through the public setter.
        this.topic = topic;
        this.desc = desc;
    }

    /** @return the topic or destination name */
    public String getTopic() {
        return topic;
    }

    /**
     * Replaces the topic of this constant. Enum constants are shared
     * singletons, so the change is visible to every user of the constant;
     * the setter is kept only for backward compatibility.
     *
     * @param topic the new topic or destination name
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the human-readable description */
    public String getDesc() {
        return desc;
    }

    /**
     * Replaces the description of this constant; kept only for backward
     * compatibility (see {@link #setTopic(String)}).
     *
     * @param desc the new description
     */
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * Compile-time constants, usable where a constant expression is required
     * (for example in annotation attributes).
     */
    public static class Constants {

        public static final String KAFKA_LISTENER_TOPIC = "topic-string";

        private Constants() {
            // Constant holder; not meant to be instantiated.
        }
    }
}
package com.citydo.redis_mq_stomp_websocket.listener;import com.citydo.redis_mq_stomp_websocket.model.Message;
import com.citydo.redis_mq_stomp_websocket.model.Response;
import com.citydo.redis_mq_stomp_websocket.service.KafkaWebsocketService;
import com.citydo.redis_mq_stomp_websocket.util.ChcpConstants;
import com.citydo.redis_mq_stomp_websocket.util.JsonHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
/**
 * Kafka consumer that relays every received record to WebSocket subscribers
 * through {@link KafkaWebsocketService}.
 */
public class ConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private KafkaWebsocketService websocketService;

    /**
     * Receives a record from the listener topic, wraps its text in a
     * {@code Message} and forwards it to the WebSocket service.
     *
     * @param message the record value received from Kafka
     */
    @KafkaListener(topics = ChcpConstants.Constants.KAFKA_LISTENER_TOPIC)
    public void consumer(String message) {
        logger.info("consumer topic string get : {}", message);

        Message outbound = new Message();
        outbound.setMessage(message);
        String outboundJson = JsonHelper.toJson(outbound).toString();
        logger.info("send websocket request : {}", outboundJson);

        Response reply = websocketService.send(outbound);
        String replyJson = JsonHelper.toJson(reply).toString();
        logger.info("send websocket response : {}", replyJson);
    }
}
参考:https://www.docs4dev.com/docs/zh/spring-framework/4.3.21.RELEASE/reference/websocket.html#stomp
这篇关于基于springboot的stomp与websocket实现实时消息推送与发送的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!