本文主要是介绍示例:ActiveMQ+Windows服务创建消息转发器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、目的:通过应用ActiveMQ实现,消息的转发
应用场景:在阿里云部署ActiveMQ服务器,在银行柜台部署注册消息客户端,在后台发送消息,实现互联网消息的互通
二、安装ActiveMQ
1、下载地址:http://activemq.apache.org/download.html
2、修改用户名和密码:用户名和密码在conf中的jetty-realm.properties中配置 默认admin@admin
3、启动服务../bin/activemq.bat
3、浏览器进入到127.0.0.1:8161 查看消息信息
三、发送和注册消息
1、发送端服务
/// <summary>/// 发送消息服务类/// </summary>class ActiveMQServer{private IConnectionFactory factory;string _userName;string _passWord;bool _flag;public void Init(string user, string pw, string brokerUri = "tcp://192.168.1.11:61616"){_userName = user;_passWord = pw;try{//初始化工厂factory = new ConnectionFactory(brokerUri);ActiveMQDomain.Instance.LogInfo("初始化成功");_flag = true;}catch (Exception ex){ActiveMQDomain.Instance.LogInfo("初始化失败");ActiveMQDomain.Instance.LogError(ex);}}public void SendMessage(string ms){if (!_flag) return;//建立工厂连接using (IConnection connection = factory.CreateConnection()){//通过工厂连接创建Session会话using (ISession session = connection.CreateSession()){//通过会话创建生产者,方法里new出来MQ的QueueIMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"));//创建一个发送消息的对象ITextMessage message = prod.CreateTextMessage();//给这个消息对象赋实际的消息message.Text = ms; //设置消息对象的属性,是Queue的过滤条件也是P2P的唯一指定属性message.Properties.SetString("filter", "demo");prod.Send(message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);message.Text += "发送成功" + Environment.NewLine;}}}}
2、注册接收端服务
/// <summary>/// 注册消息客户端/// </summary>class ActiveMQClient{public event Action<IMessage> BeginMessage;public void Init(string brokerUri= "tcp://localhost:61616"){//创建连接工厂IConnectionFactory factory = new ConnectionFactory(brokerUri);//通过工厂构建连接IConnection connection = factory.CreateConnection();//这个是连接的客户端名称标识connection.ClientId = "firstQueueListener";//启动连接,监听的话要主动启动连接connection.Start();//通过连接创建一个会话ISession session = connection.CreateSession();//通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo'");//注册监听事件consumer.Listener += l=>{ITextMessage msg = (ITextMessage)l;//异步调用下,否则无法回归主线程Task.Run(() => this.BeginMessage?.Invoke(msg));};//connection.Stop();//connection.Close(); }}
注册客户端的接口:(观察者模式注册消息)
/// <summary> 注册消息对象 </summary>public interface IRegisterMessage{void Notice(string message);}
ActiveMQ应用服务(提供外部应用)
/// <summary>/// ActiveMQ注册接收总服务/// </summary>public class ActiveMQService{#region - 客户端 -ActiveMQClient _activeMQClient = new ActiveMQClient();List<IRegisterMessage> _registerMessages = new List<IRegisterMessage>();/// <summary> 启动客户端 </summary>public void StartClient(){_activeMQClient.Init(ActiveMQDomain.Instance.GetBrokerUri());_activeMQClient.BeginMessage += l =>{foreach (var item in _registerMessages){ITextMessage textMessage = l as ITextMessage;item.Notice(textMessage.Text);}};}/// <summary> 注册订阅消息 </summary>public void Register<T>() where T : IRegisterMessage{T t = Activator.CreateInstance<T>();_registerMessages.Add(t);}#endregion#region - 服务端 -ActiveMQServer _activeMQServer = new ActiveMQServer();/// <summary> 启动服务端 </summary>public void StartServer(){string user = ActiveMQDomain.Instance.GetUser();string pw = ActiveMQDomain.Instance.GetUser();string url = ActiveMQDomain.Instance.GetBrokerUri();_activeMQServer.Init(user, pw, url);}/// <summary> 发送消息 </summary>public void SendMessage(string message){_activeMQServer.SendMessage(message);}#endregion}
四、示例
启动发送端服务并发送消息
public partial class MainWindow : Window{public MainWindow(){InitializeComponent();ServerManager.Instance.StartServer();}private void Button_Click(object sender, RoutedEventArgs e){ServerManager.Instance.SendMessage(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " - " + this.txt_message.Text);}}
启动客户端并注册消息接收器
Windows服务宿主启动
public partial class Service1 : ServiceBase{public Service1(){InitializeComponent();}protected override void OnStart(string[] args){ServerManager.Instance.InitLogger();ServerManager.Instance.LogInfo("Window服务准备启动!");ServerManager.Instance.LogInfo("准备开始启动客户端");try{ServerManager.Instance.StartClient();ServerManager.Instance.LogInfo("启动客户端完成");ServerManager.Instance.LogInfo("Window服务启动完成!");}catch (Exception ex){ServerManager.Instance.LogInfo("启动客户端错误");ServerManager.Instance.LogError(ex);}}protected override void OnStop(){ServerManager.Instance.LogInfo("Window服务停止!");}}
注册多个消息接收器并启动客户端
ActiveMQService _activeMQService = new ActiveMQService();public void StartClient(){// Message:注册消息_activeMQService.Register<LedMessageNotice>();_activeMQService.Register<VoiceMessageNotice>();// Message:启动客户端_activeMQService.StartClient();}
LedMessageNotice和VoiceMessageNotice分别实现IRegisterMessage接口
public class LedMessageNotice : IRegisterMessage{public void Notice(string message){Console.WriteLine(message);ModuleManager.Instance.LogInfo(message);}}
public class VoiceMessageNotice : IRegisterMessage{public void Notice(string message){Console.WriteLine(message);ModuleManager.Instance.LogInfo(message);SpeechService.Instance.Speek(message);}}
五、部署
1、将ActiveMQ部署在阿里云服务器上,并启动服务
2、将windows服务客户端部分注册在银行大厅局域网主机内,当接收到消息转发给各个消息输出设备
3、消息发送部分部署在可连接到阿里云的设备上进行发送消息
注意:
1)部署在阿里云上的地址同样用tcp:\\****:61616
2)设置多个客户端(如有多个客户端连接是需要设置不同的ID否则报错):
//这个是连接的客户端名称标识
connection.ClientId = "firstQueueListener1";
3)发送到不同的客户端可以通过过滤条件来设置
客户端设置分组为“demo1”
//通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo1'");
服务端发送到分组为“demo1”
//设置消息对象的属性,是Queue的过滤条件也是P2P的唯一指定属性
message.Properties.SetString("filter", "demo1");
(此时发送的数据只发送给demo1的客户端)
示例代码:https://github.com/HeBianGu/MQ-ActiveMQMessageDispatcher.git
这篇关于示例:ActiveMQ+Windows服务创建消息转发器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!