EMQ共享订阅

2024-05-05 06:48
文章标签 共享 订阅 emq

本文主要是介绍EMQ共享订阅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 共享订阅

多个客户端订阅了同一个主题,发布者发布主题时,每个客户端都会同时收到这个主题的消息。在客户端集群部署的场景下会出现消息重复处理的问题。
EMQ支持共享订阅,多个客户端订阅了同一个主题,发布者发布主题时,只有其中一个客户端接收到消息。
共享订阅有两种方式:
(1)共享订阅:订阅前缀$queue/
多个客户端订阅了$queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。
(2)分组订阅:订阅前缀$share/<group>/
多组客户端订阅了$queue/group1/topic、$queue/group2/topic...,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。

2 Java客户端实现共享订阅

开发时发现,使用eclipse paho java客户端时,无法处理共享订阅。订阅$queue/topic能够订阅成功,并且跟踪代码能看到emq也把消息转发到了客户端,但是客户端丢弃掉了。
解决方法就是重写mqtt的回调函数,实现MqttCallback接口。

 

实现MqttCallback接口的代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

package com.emqtest.emqtest;

 

 

import java.util.HashMap;

import java.util.Map;

 

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;

import org.eclipse.paho.client.mqttv3.MqttCallback;

import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.eclipse.paho.client.mqttv3.MqttTopic;

 

public class SharedSubCallbackRouter implements MqttCallback {

    private Map<String, IMqttMessageListener> topicFilterListeners;

 

    public SharedSubCallbackRouter(Map<String, IMqttMessageListener> topicFilterListeners) {

        this.topicFilterListeners = topicFilterListeners;

    }

 

    public void addSubscriber(String topicFilter, IMqttMessageListener listener) {

        if (this.topicFilterListeners == null) {

             this.topicFilterListeners = new HashMap<>();

        }

        this.topicFilterListeners.put(topicFilter, listener);

    }

 

    @Override

    public void connectionLost(Throwable cause) {

 

    }

 

    @Override

    public void messageArrived(String topic, MqttMessage message) throws Exception {

        for (Map.Entry<String, IMqttMessageListener> listenerEntry : topicFilterListeners.entrySet()) {

            String topicFilter = listenerEntry.getKey();

            if (isMatched(topicFilter, topic)) {

                listenerEntry.getValue().messageArrived(topic, message);

            }

        }

    }

 

    @Override

    public void deliveryComplete(IMqttDeliveryToken token) {

 

    }

 

    /**

     * Paho topic matcher does not work with shared subscription topic filter of emqttd

     * https://github.com/eclipse/paho.mqtt.java/issues/367#issuecomment-300100385

     * <p>

     * http://emqtt.io/docs/v2/advanced.html#shared-subscription

     *

     * @param topicFilter the topicFilter for mqtt

     * @param topic       the topic

     * @return boolean for matched

     */

    private boolean isMatched(String topicFilter, String topic) {

        if (topicFilter.startsWith("$queue/")) {

            topicFilter = topicFilter.replaceFirst("\\$queue/""");

        else if (topicFilter.startsWith("$share/")) {

            topicFilter = topicFilter.replaceFirst("\\$share/""");

            topicFilter = topicFilter.substring(topicFilter.indexOf('/'));

        }

        return MqttTopic.isMatched(topicFilter, topic);

    }

}

 

创建emq连接代码如下:

1

2

3

4

5

6

7

mqttClient = new MqttClient("tcp://localhost:1883""MqttClient");

mqttClient.connect();

Map<String, IMqttMessageListener> listeners = new HashMap<>();

IMqttMessageListener emqListener = new EmqListener();

listeners.put("$queue/testmqtt", emqListener);

mqttClient.setCallback(new SharedSubCallbackRouter(listeners));

mqttClient.subscribe("$queue/testmqtt"new EmqListener());

 

还要再写一个实现IMqttMessageListener接口的Emq消息处理类:

1

2

3

4

5

6

7

8

9

10

11

12

@Component

public class EmqListener implements IMqttMessageListener {

 

  @Override

  public void messageArrived(String topic, MqttMessage message) throws Exception {

    try {

      System.out.println("topic: " + topic);

    catch (Exception e) {

      e.printStackTrace();

    }

  }

}

 

 

 

 

参考链接:

1 emq的github上关于这个问题的讨论:
https://github.com/emqx/emqx/issues/921#event-1023359646
2 网上有人给的一个解决方法示例代码:
https://github.com/yogin16/paho-shared-sub-example
3 eclipse paho的github链接:
https://github.com/eclipse/paho.mqtt.java

这篇关于EMQ共享订阅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/961042

相关文章

laravel 多个项目共享SESSION

只讨论一个域下的项目。 eg: a.xxx.com 和 b.xxx.com 来共享session 如果多个laravel项目共享SESSION要满足以下条件: SESSION可以存放在一个地方,eg:共用一个reids用户表为连接同一个数据库的用户表需要在同一域下 操作步骤:以将session 存放到redis中为例: 1. 安装redis库composer require predis/

【计算机网络篇】数据链路层(13)共享式以太网与交换式以太网的对比

文章目录 🍔共享式以太网与交换式以太网的对比🔎主机发送单播帧的情况🔎主机发送广播帧的情况🔎多对主机同时通信 🛸使用集线器和交换机扩展共享式以太网的区别 🍔共享式以太网与交换式以太网的对比 下图是使用集线器将4台主机互联而成的小型共享式以太网 下图是使用交换机将4台主机互联而成的小型交换式以太网 我们假设交换机的转发表已经学习到了所有主机与自己各接口的对应

redis 订阅/发布

本系列已经过半了,这一篇我们来看看redis好玩的发布订阅模式,其实在很多的MQ产品中都存在这样的一个模式,我们常听到的一个例子 就是邮件订阅的场景,什么意思呢,也就是说100个人订阅了你的博客,如果博主发表了文章,那么100个人就会同时收到通知邮件,除了这个 场景还能找到其他场景么,当然有啦,你想想,如果你要在内存里面做一个读写分离的程序,为了维持数据的完整性,你是不是需要保证在写

【database2】redis:优化/备份/订阅

文章目录 1.redis安装:加载.conf2.操作:set/get,push/pop,add/rem3.Jedis:java程序连接redis,拿到jedis4.案例_好友列表:json = om.4.1 前端:index.html4.2 web:FriendServlet .java4.3 service:FriendService.java4.4 dao:FriendDao.java4

Linux下多进程访问同一个共享库处理流程

两个测试程序实现调用同一个SO库:                 ​​​​​​​         #include <stdio.h> #include "a/a.h" #include <unistd.h> int main() {     int a = 4,b = 5;     sum(a, b);     int ret = get();     printf(

Rust FFI 编程 - Rust导出共享库06

本节主要介绍 Rust 导出共享库时,如何通过指针在 Rust 和 C 之间传递结构体。上一节的示例是结构体的内存在 C 端分配,本节介绍内存在 Rust 这边分配,由 C 填充和使用。 设计 本节的示例: Rust 中导出共享库,包含三个函数:student_new,Rust 端分配内存并用默认值初始化,由 C 端填充和更新;student_alice,Rust 端分配内存并初始化,由 C 端

如何用Spring使用Redis作为消息订阅?

目录 一、Spring 框架介绍二、Redis 框架介绍三、什么是消息订阅四、如何用Spring使用Redis作为消息订阅 一、Spring 框架介绍 Spring 框架是一个开源的 Java 平台,它提供了全面的基础设施支持,以便您可以更容易地开发 Java 应用程序。Spring 处理了基础设施,这样您就可以专注于您的应用程序。Spring 框架最初是由 Rod Jo

Linux线程互斥量--进程共享属性

多线程中,在互斥量和 读写锁的 属性中,都有一个叫 进程共享属性 。 对于互斥量,查询和设置这个属性的方法为: pthread_mutexattr_getpshared pthread_mutexattr_setpshared 我一开始不理解什么是 进程共享属性。 看了man中的说明如下 The pthread_mutexattr_getpshared() function shall

java中共享变量分析和volatile

文章目录 1 共享变量1.1 简单理解1.2 CountDownLatch示例以及说明1.3 JAVA内存模型 2 volatile2.1 volatile简介2.2 缓存(工作内存)2.3 使用2.3.1 术语定义2.3.2 基本操作2.3.3 volatile保证可见性不保证原子性2.3.4 禁止指令重排序优化 2.4 原理2.4.1 Volatile实现原理2.4.2 Volatile

使用java +paho mqtt编写模拟发布温度及订阅的过程

启动mqtt 服务 创建项目,在项目中添加模块  添加文件夹 添加maven依赖 <dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.0</version></dependency></