如何在Spark的Worker节点中给RocketMq发送消息

2024-06-02 15:38

本文主要是介绍如何在Spark的Worker节点中给RocketMq发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 

1.背景

    之前使用spark进行数据计算,需要将计算结果发送到rocketmq上去,有两种做法:第一种是将计算结果collect到Driver端,然后统一发送。第二种是直接在各个计算结果的partition(即foreachPartition函数)分片中发送。第一种存在的问题是,如果计算结果的数据量非常庞大,如上千万,就需要很大的内存来支持,同时增加了网络传输开销。如果是第二种就不存在这种问题,直接在worker节点发送完毕,不存在数据堆积和网络开销。

    既然说是要发送数据到rocketMQ就要说到rocketmq客户端DefaultMQProducer类,该类是没有实现java的Serializable接口的,所以无法定义一个全局变量,让各个worker直接使用该变量来发送数据,所以需要用到另一种写法——静态类工具。

2.Java序列化基本规则

    上面说到需要使用静态类工具来实现在各个partition分别发送mq消息,其理论基础就是Java序列化规则。我们知道Java在默认情况下,不会对被static和transient关键词修饰的属性进行序列化和反序列化。这个可以验证,静态属性反序列化有还是默认值,利用这个原理封装rocketmq工具。

public class JavaBean {private String name;private int version;
}public class WrapperBean implements Serializable {private static JavaBean javaBean;//由于改对象没有实现Serializable接口,所以必须定义为静态属性,否则报错private static String staticName="默认静态变量值";
}###序列化
public class JdkSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";serializable(file);}private static void serializable(String file) {ObjectOutputStream oos = null;try{oos = new ObjectOutputStream(new FileOutputStream(file));Object object = getObject();System.out.println("序列化对象:"+object.toString());oos.writeObject(object);oos.flush();}catch (Exception e){e.printStackTrace();}finally {if(oos !=  null){try {oos.close();} catch (IOException e) {e.printStackTrace();}}}}private static Object getObject() {JavaBean javaBean = new JavaBean("Java设计原本", 44);WrapperBean wb = new WrapperBean(javaBean,"修改后的静态变量值");return wb;}
}
#####反序列化
public class JdkDeSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";deserializable(file);}private static void deserializable(String file) {ObjectInputStream ois = null;try{ois = new ObjectInputStream(new FileInputStream(file));Object o = ois.readObject();if(o != null){System.out.println("Class :"+o.getClass());WrapperBean jb = (WrapperBean)o;System.out.println("反序列化结果:"+jb.toString());}}catch (Exception e){e.printStackTrace();}finally {if(ois != null){try {ois.close();} catch (IOException e) {e.printStackTrace();}}}}
}结果:
序列化对象:WrapperBean{javaBean=JavaBean{name='Java设计原本', version=44}},staticName=修改后的静态变量值
反序列化结果:WrapperBean{javaBean=null},staticName=默认静态变量值

3.RocketMq工具

    该工具利用静态属性无法被序列化原理,在各个worker节点中调用getInstance()方法时,实际拿到的是该worker节点加载RocketMqUtils初始化静态代码块拿到的DefaultMQProducer实例,所以可以正常在foreachPartition()中调用发送rocketmq消息

 

public class RocketMqUtils implements Serializable {
 
    private static Logger log = LoggerFactory.getLogger(RocketMqUtils.class);
 
    private static DefaultMQProducer producer=null;
    private static  RocketMqUtils rocketMqUtils = null;
    static {
        ClassPathResource classPathResource = new ClassPathResource("/task-config.properties");
        Properties properties = null;
        try {
            properties = PropertiesLoaderUtils.loadProperties(classPathResource);
            String address = properties.getProperty("mq.namesrvAddr");
            String produceGroup = properties.getProperty("mq.producerGroup");
            log.info("初始化RocketMq,Address={},producerGroup={}",address,produceGroup);
            producer = new DefaultMQProducer(produceGroup);
            producer.setNamesrvAddr(address);
            producer.start();
        } catch (Exception e) {
            log.error("初始化RocketMq失败",e);
        }
    }
 
    public static synchronized RocketMqUtils getInstance(){
        if(rocketMqUtils ==null){
            rocketMqUtils = new RocketMqUtils();
        }
        return rocketMqUtils;
    }
 
    public static void main(String[] args) throws Exception {
        RocketMqUtils rm = new RocketMqUtils();
        Message msg = new Message();
        msg.setTopic("test_jcc");
        msg.setTags("jcc");
        msg.setKeys("kkk");
        msg.setBody("test msg".getBytes());
        rm.sendMsg(msg);
        rm.shutDownMq();
    }
 
    public  void sendMsg(Message msg) throws Exception {
        try {
            SendResult sendResult = producer.send(msg);
            log.info("sendMsg = " + sendResult.toString());
            System.out.println(sendResult.toString());
        } catch (Exception var3) {
            log.error("MQ send ERROR", var3);
            throw new Exception("操作MQ出错!");
        }
    }
 
    public void shutDownMq(){
        if (producer != null){
            producer.shutdown();
        }
    }
}

 

转载于:https://my.oschina.net/u/1159254/blog/2999520

这篇关于如何在Spark的Worker节点中给RocketMq发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024387

相关文章

解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)

《解读Redis秒杀优化方案(阻塞队列+基于Stream流的消息队列)》该文章介绍了使用Redis的阻塞队列和Stream流的消息队列来优化秒杀系统的方案,通过将秒杀流程拆分为两条流水线,使用Redi... 目录Redis秒杀优化方案(阻塞队列+Stream流的消息队列)什么是消息队列?消费者组的工作方式每

使用C/C++调用libcurl调试消息的方式

《使用C/C++调用libcurl调试消息的方式》在使用C/C++调用libcurl进行HTTP请求时,有时我们需要查看请求的/应答消息的内容(包括请求头和请求体)以方便调试,libcurl提供了多种... 目录1. libcurl 调试工具简介2. 输出请求消息使用 CURLOPT_VERBOSE使用 C

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Python手搓邮件发送客户端

《Python手搓邮件发送客户端》这篇文章主要为大家详细介绍了如何使用Python手搓邮件发送客户端,支持发送邮件,附件,定时发送以及个性化邮件正文,感兴趣的可以了解下... 目录1. 简介2.主要功能2.1.邮件发送功能2.2.个性签名功能2.3.定时发送功能2. 4.附件管理2.5.配置加载功能2.6.

解决Cron定时任务中Pytest脚本无法发送邮件的问题

《解决Cron定时任务中Pytest脚本无法发送邮件的问题》文章探讨解决在Cron定时任务中运行Pytest脚本时邮件发送失败的问题,先优化环境变量,再检查Pytest邮件配置,接着配置文件确保SMT... 目录引言1. 环境变量优化:确保Cron任务可以正确执行解决方案:1.1. 创建一个脚本1.2. 修

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Django中使用SMTP实现邮件发送功能

《Django中使用SMTP实现邮件发送功能》在Django中使用SMTP发送邮件是一个常见的需求,通常用于发送用户注册确认邮件、密码重置邮件等,下面我们来看看如何在Django中配置S... 目录1. 配置 Django 项目以使用 SMTP2. 创建 Django 应用3. 添加应用到项目设置4. 创建

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分