本文主要是介绍ZBus消息中间件和WebSocket的联合使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1、ZBusconfig.java, zbus的启动、生产、回调处理消息的方法。
package com.accenture.icc.zbus.config;import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.zbus.broker.Broker;
import org.zbus.broker.ZbusBroker;
import org.zbus.mq.Consumer;
import org.zbus.mq.Consumer.ConsumerHandler;
import org.zbus.mq.MqAdmin;
import org.zbus.mq.Producer;
import org.zbus.net.http.Message;import com.accenture.icc.pojo.AnalogInputData;
import com.accenture.icc.pojo.DataUnit;
import com.accenture.icc.pojo.TripleDataWrapper;@Configuration
@PropertySource("classpath:data.properties")
public class ZbusConfig { private static final Logger logger = LoggerFactory.getLogger(ZbusConfig.class);@Value("${zbus.mq.sub}")private String subMq;@Value("${zbus.mq.recv}")private String recvMq;@Value("${zbus.mq.alarm}")private String alarmMq;@Value("${zbus.mq.subCommonData}")private String subIndexDataMq;@Value("${zbus.mq.recvCommonData}")private String indexDataMq;@Value("${csv.power.maxsize}")private int powerMaxListSize;@Value("${csv.power.period}")private int powerPeriod;@Value("${csv.consumption.maxsize}")private int consumptionMaxListSize;@Value("${csv.consumption.period}")private int consumptionPeriod;@Autowiredprivate TripleDataWrapper powerWrapper;@Autowiredprivate TripleDataWrapper consumptionWrapper;@Autowiredprivate TripleDataWrapper transformerWrapper;@Autowiredprivate TripleDataWrapper powerFactorWrapper;@Autowiredprivate TripleDataWrapper unbalanceWrapper;@Autowiredprivate TripleDataWrapper stationParamWrapper;/*** 回调函数* @param messaging* @return*/@Beanpublic ConsumerHandler consumerHandler(SimpMessageSendingOperations messaging) { ConsumerHandler consumerHandler = new ConsumerHandler() {@Overridepublic void handle(Message msg, Consumer consumer) throws IOException {logger.info("RECEIVING MESSAGE: {}", msg.getBodyString());String[] fields = msg.getBodyString().split(",");if(fields.length < 5){return;}String tag = fields[0];int tableId = 0;int recordId = 0
这篇关于ZBus消息中间件和WebSocket的联合使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!