本文主要是介绍如何在Spark的Worker节点中给RocketMq发送消息,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1.背景
之前使用spark进行数据计算,需要将计算结果发送到rocketmq上去,有两种做法:第一种是将计算结果collect到Driver端,然后统一发送。第二种是直接在各个计算结果的partition(即foreachPartition函数)分片中发送。第一种存在的问题是,如果计算结果的数据量非常庞大,如上千万,就需要很大的内存来支持,同时增加了网络传输开销。如果是第二种就不存在这种问题,直接在worker节点发送完毕,不存在数据堆积和网络开销。
既然说是要发送数据到rocketMQ就要说到rocketmq客户端DefaultMQProducer类,该类是没有实现java的Serializable接口的,所以无法定义一个全局变量,让各个worker直接使用该变量来发送数据,所以需要用到另一种写法——静态类工具。
2.Java序列化基本规则
上面说到需要使用静态类工具来实现在各个partition分别发送mq消息,其理论基础就是Java序列化规则。我们知道Java在默认情况下,不会对被static和transient关键词修饰的属性进行序列化和反序列化。这个可以验证,静态属性反序列化有还是默认值,利用这个原理封装rocketmq工具。
public class JavaBean {private String name;private int version; }public class WrapperBean implements Serializable {private static JavaBean javaBean;//由于改对象没有实现Serializable接口,所以必须定义为静态属性,否则报错private static String staticName="默认静态变量值"; }###序列化 public class JdkSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";serializable(file);}private static void serializable(String file) {ObjectOutputStream oos = null;try{oos = new ObjectOutputStream(new FileOutputStream(file));Object object = getObject();System.out.println("序列化对象:"+object.toString());oos.writeObject(object);oos.flush();}catch (Exception e){e.printStackTrace();}finally {if(oos != null){try {oos.close();} catch (IOException e) {e.printStackTrace();}}}}private static Object getObject() {JavaBean javaBean = new JavaBean("Java设计原本", 44);WrapperBean wb = new WrapperBean(javaBean,"修改后的静态变量值");return wb;} } #####反序列化 public class JdkDeSerializableMain {public static void main(String[] args) {String file = "D:/demo/javabean.seri";deserializable(file);}private static void deserializable(String file) {ObjectInputStream ois = null;try{ois = new ObjectInputStream(new FileInputStream(file));Object o = ois.readObject();if(o != null){System.out.println("Class :"+o.getClass());WrapperBean jb = (WrapperBean)o;System.out.println("反序列化结果:"+jb.toString());}}catch (Exception e){e.printStackTrace();}finally {if(ois != null){try {ois.close();} catch (IOException e) {e.printStackTrace();}}}} }结果: 序列化对象:WrapperBean{javaBean=JavaBean{name='Java设计原本', version=44}},staticName=修改后的静态变量值 反序列化结果:WrapperBean{javaBean=null},staticName=默认静态变量值
3.RocketMq工具
该工具利用静态属性无法被序列化原理,在各个worker节点中调用getInstance()方法时,实际拿到的是该worker节点加载RocketMqUtils初始化静态代码块拿到的DefaultMQProducer实例,所以可以正常在foreachPartition()中调用发送rocketmq消息
public class RocketMqUtils implements Serializable {
private static Logger log = LoggerFactory.getLogger(RocketMqUtils.class);
private static DefaultMQProducer producer=null;
private static RocketMqUtils rocketMqUtils = null;
static {
ClassPathResource classPathResource = new ClassPathResource("/task-config.properties");
Properties properties = null;
try {
properties = PropertiesLoaderUtils.loadProperties(classPathResource);
String address = properties.getProperty("mq.namesrvAddr");
String produceGroup = properties.getProperty("mq.producerGroup");
log.info("初始化RocketMq,Address={},producerGroup={}",address,produceGroup);
producer = new DefaultMQProducer(produceGroup);
producer.setNamesrvAddr(address);
producer.start();
} catch (Exception e) {
log.error("初始化RocketMq失败",e);
}
}
public static synchronized RocketMqUtils getInstance(){
if(rocketMqUtils ==null){
rocketMqUtils = new RocketMqUtils();
}
return rocketMqUtils;
}
public static void main(String[] args) throws Exception {
RocketMqUtils rm = new RocketMqUtils();
Message msg = new Message();
msg.setTopic("test_jcc");
msg.setTags("jcc");
msg.setKeys("kkk");
msg.setBody("test msg".getBytes());
rm.sendMsg(msg);
rm.shutDownMq();
}
public void sendMsg(Message msg) throws Exception {
try {
SendResult sendResult = producer.send(msg);
log.info("sendMsg = " + sendResult.toString());
System.out.println(sendResult.toString());
} catch (Exception var3) {
log.error("MQ send ERROR", var3);
throw new Exception("操作MQ出错!");
}
}
public void shutDownMq(){
if (producer != null){
producer.shutdown();
}
}
}
转载于:https://my.oschina.net/u/1159254/blog/2999520
这篇关于如何在Spark的Worker节点中给RocketMq发送消息的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!