示例:ActiveMQ+Windows服务创建消息转发器

2024-05-26 16:48

本文主要是介绍示例:ActiveMQ+Windows服务创建消息转发器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、目的:通过应用ActiveMQ实现,消息的转发

       应用场景:在阿里云部署ActiveMQ服务器,在银行柜台部署注册消息客户端,在后台发送消息,实现互联网消息的互通

 

二、安装ActiveMQ

 

1、下载地址:http://activemq.apache.org/download.html 

2、修改用户名和密码:用户名和密码在conf中的jetty-realm.properties中配置 默认admin@admin

3、启动服务../bin/activemq.bat

3、浏览器进入到127.0.0.1:8161 查看消息信息

 

三、发送和注册消息

1、发送端服务

 /// <summary>/// 发送消息服务类/// </summary>class ActiveMQServer{private IConnectionFactory factory;string _userName;string _passWord;bool _flag;public void Init(string user, string pw, string brokerUri = "tcp://192.168.1.11:61616"){_userName = user;_passWord = pw;try{//初始化工厂factory = new ConnectionFactory(brokerUri);ActiveMQDomain.Instance.LogInfo("初始化成功");_flag = true;}catch (Exception ex){ActiveMQDomain.Instance.LogInfo("初始化失败");ActiveMQDomain.Instance.LogError(ex);}}public void SendMessage(string ms){if (!_flag) return;//建立工厂连接using (IConnection connection = factory.CreateConnection()){//通过工厂连接创建Session会话using (ISession session = connection.CreateSession()){//通过会话创建生产者,方法里new出来MQ的QueueIMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"));//创建一个发送消息的对象ITextMessage message = prod.CreateTextMessage();//给这个消息对象赋实际的消息message.Text = ms; //设置消息对象的属性,是Queue的过滤条件也是P2P的唯一指定属性message.Properties.SetString("filter", "demo");prod.Send(message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);message.Text += "发送成功" + Environment.NewLine;}}}}

 

2、注册接收端服务

/// <summary>/// 注册消息客户端/// </summary>class ActiveMQClient{public event Action<IMessage> BeginMessage;public void Init(string brokerUri= "tcp://localhost:61616"){//创建连接工厂IConnectionFactory factory = new ConnectionFactory(brokerUri);//通过工厂构建连接IConnection connection = factory.CreateConnection();//这个是连接的客户端名称标识connection.ClientId = "firstQueueListener";//启动连接,监听的话要主动启动连接connection.Start();//通过连接创建一个会话ISession session = connection.CreateSession();//通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo'");//注册监听事件consumer.Listener += l=>{ITextMessage msg = (ITextMessage)l;//异步调用下,否则无法回归主线程Task.Run(() => this.BeginMessage?.Invoke(msg));};//connection.Stop();//connection.Close();  }}

注册客户端的接口:(观察者模式注册消息)

  /// <summary> 注册消息对象 </summary>public interface IRegisterMessage{void Notice(string message);}

ActiveMQ应用服务(提供外部应用)

  /// <summary>/// ActiveMQ注册接收总服务/// </summary>public class ActiveMQService{#region - 客户端 -ActiveMQClient _activeMQClient = new ActiveMQClient();List<IRegisterMessage> _registerMessages = new List<IRegisterMessage>();/// <summary> 启动客户端 </summary>public void StartClient(){_activeMQClient.Init(ActiveMQDomain.Instance.GetBrokerUri());_activeMQClient.BeginMessage += l =>{foreach (var item in _registerMessages){ITextMessage textMessage = l as ITextMessage;item.Notice(textMessage.Text);}};}/// <summary> 注册订阅消息 </summary>public void Register<T>() where T : IRegisterMessage{T t = Activator.CreateInstance<T>();_registerMessages.Add(t);}#endregion#region - 服务端 -ActiveMQServer _activeMQServer = new ActiveMQServer();/// <summary> 启动服务端 </summary>public void StartServer(){string user = ActiveMQDomain.Instance.GetUser();string pw = ActiveMQDomain.Instance.GetUser();string url = ActiveMQDomain.Instance.GetBrokerUri();_activeMQServer.Init(user, pw, url);}/// <summary> 发送消息 </summary>public void SendMessage(string message){_activeMQServer.SendMessage(message);}#endregion}

四、示例

启动发送端服务并发送消息

 public partial class MainWindow : Window{public MainWindow(){InitializeComponent();ServerManager.Instance.StartServer();}private void Button_Click(object sender, RoutedEventArgs e){ServerManager.Instance.SendMessage(DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + " - " + this.txt_message.Text);}}

启动客户端并注册消息接收器

Windows服务宿主启动

public partial class Service1 : ServiceBase{public Service1(){InitializeComponent();}protected override void OnStart(string[] args){ServerManager.Instance.InitLogger();ServerManager.Instance.LogInfo("Window服务准备启动!");ServerManager.Instance.LogInfo("准备开始启动客户端");try{ServerManager.Instance.StartClient();ServerManager.Instance.LogInfo("启动客户端完成");ServerManager.Instance.LogInfo("Window服务启动完成!");}catch (Exception ex){ServerManager.Instance.LogInfo("启动客户端错误");ServerManager.Instance.LogError(ex);}}protected override void OnStop(){ServerManager.Instance.LogInfo("Window服务停止!");}}

注册多个消息接收器并启动客户端

    ActiveMQService _activeMQService = new ActiveMQService();public void StartClient(){//  Message:注册消息_activeMQService.Register<LedMessageNotice>();_activeMQService.Register<VoiceMessageNotice>();//  Message:启动客户端_activeMQService.StartClient();}

LedMessageNotice和VoiceMessageNotice分别实现IRegisterMessage接口

 public class LedMessageNotice : IRegisterMessage{public void Notice(string message){Console.WriteLine(message);ModuleManager.Instance.LogInfo(message);}}
  public class VoiceMessageNotice : IRegisterMessage{public void Notice(string message){Console.WriteLine(message);ModuleManager.Instance.LogInfo(message);SpeechService.Instance.Speek(message);}}

五、部署

1、将ActiveMQ部署在阿里云服务器上,并启动服务

2、将windows服务客户端部分注册在银行大厅局域网主机内,当接收到消息转发给各个消息输出设备

3、消息发送部分部署在可连接到阿里云的设备上进行发送消息

注意:

1)部署在阿里云上的地址同样用tcp:\\****:61616

2)设置多个客户端(如有多个客户端连接是需要设置不同的ID否则报错):

            //这个是连接的客户端名称标识
            connection.ClientId = "firstQueueListener1";

3)发送到不同的客户端可以通过过滤条件来设置

         客户端设置分组为“demo1”

             //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
            IMessageConsumer consumer = session.CreateConsumer(new                      Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("firstQueue"), "filter='demo1'");

         服务端发送到分组为“demo1”

                    //设置消息对象的属性,是Queue的过滤条件也是P2P的唯一指定属性
                    message.Properties.SetString("filter", "demo1");

        (此时发送的数据只发送给demo1的客户端)

 

 

示例代码:https://github.com/HeBianGu/MQ-ActiveMQMessageDispatcher.git

这篇关于示例:ActiveMQ+Windows服务创建消息转发器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1005029

相关文章

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

QT Creator配置Kit的实现示例

《QTCreator配置Kit的实现示例》本文主要介绍了使用Qt5.12.12与VS2022时,因MSVC编译器版本不匹配及WindowsSDK缺失导致配置错误的问题解决,感兴趣的可以了解一下... 目录0、背景:qt5.12.12+vs2022一、症状:二、原因:(可以跳过,直奔后面的解决方法)三、解决方

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

Python中Json和其他类型相互转换的实现示例

《Python中Json和其他类型相互转换的实现示例》本文介绍了在Python中使用json模块实现json数据与dict、object之间的高效转换,包括loads(),load(),dumps()... 项目中经常会用到json格式转为object对象、dict字典格式等。在此做个记录,方便后续用到该方

MySQL分库分表的实践示例

《MySQL分库分表的实践示例》MySQL分库分表适用于数据量大或并发压力高的场景,核心技术包括水平/垂直分片和分库,需应对分布式事务、跨库查询等挑战,通过中间件和解决方案实现,最佳实践为合理策略、备... 目录一、分库分表的触发条件1.1 数据量阈值1.2 并发压力二、分库分表的核心技术模块2.1 水平分

SpringBoot请求参数传递与接收示例详解

《SpringBoot请求参数传递与接收示例详解》本文给大家介绍SpringBoot请求参数传递与接收示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋... 目录I. 基础参数传递i.查询参数(Query Parameters)ii.路径参数(Path Va

RabbitMQ 延时队列插件安装与使用示例详解(基于 Delayed Message Plugin)

《RabbitMQ延时队列插件安装与使用示例详解(基于DelayedMessagePlugin)》本文详解RabbitMQ通过安装rabbitmq_delayed_message_exchan... 目录 一、什么是 RabbitMQ 延时队列? 二、安装前准备✅ RabbitMQ 环境要求 三、安装延时队

sysmain服务可以禁用吗? 电脑sysmain服务关闭后的影响与操作指南

《sysmain服务可以禁用吗?电脑sysmain服务关闭后的影响与操作指南》在Windows系统中,SysMain服务(原名Superfetch)作为一个旨在提升系统性能的关键组件,一直备受用户关... 在使用 Windows 系统时,有时候真有点像在「开盲盒」。全新安装系统后的「默认设置」,往往并不尽编