Apache Storm 简单实践

2024-08-22 14:58
文章标签 简单 实践 apache storm

本文主要是介绍Apache Storm 简单实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Storm 简单实践

前两篇文章介绍了Apache Storm的一些基础知识以及核心架构。

  • Apache Storm 集群安装配置\
  • Apache Strom 实时计算系统

本篇文章介绍一些Storm的简单实践场景。

创建一个Storm项目

实践场景为,基于Storm开发出一个实时统计句子中的单词个数的拓扑,实时数据我们通过随机发射句子,在实际应用场景中,实时数据可能来自MQ或者其他来源。

使用IDEA创建一个maven项目,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope>
</dependency>

上两篇文章说了,Storm的数据源自于Spout。所以我们需要创建一个Spout,由于是一个简单的场景,实时数据我们通过随机发射句子:

Spout的代码如下:

public static class RandomSentenceSpout extends BaseRichSpout{private SpoutOutputCollector collector;private Random random;public void open(Map conf, TopologyContext context,SpoutOutputCollector collector){this.collector = collector;this.random = new Random();}public void nextTuple(){Utils.sleep(100);String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs","i am at two with nature"};String sentence = sentences[random.nextInt(sentences.length)];collector.emit(new Values(sentence));}public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));}
}

上面代码很简单,在拓扑启动的时候启动的时候会调用open方法,我们在这里保存了collector,然后Storm会不断的调用nextTuple方法,所以我们在这里把句子发射出去。然后在declareOutputFields声明了发射出去的句子的索引。

上面我们已经把句子发射出去了,接下来的工作就是把句子中的单词切割出来,然后再发射出去。

切割单词的任务交给一个bolt来做

  public static class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;public void prepare(Map conf, TopologyContext context,OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words) {collector.emit(new Values(word));}}/*** 定义发射出去的tuple,每个field的名称*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

上面代码把句子切割后发射出去,最后我们还需要一个bolt来统计单词的数量。

 public static class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 7208077706057284643L;private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);private OutputCollector collector;private Map<String, Long> wordCounts = new HashMap<String, Long>();@SuppressWarnings("rawtypes")public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = wordCounts.get(word);if(count == null) {count = 0L;}count++;wordCounts.put(word, count);LOGGER.info("【单词计数】" + word + "出现的次数是" + count);collector.emit(new Values(word, count));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}

写完了SpoutBolt之后,接下来要创建一个Topology类,将SpoutBolt组合成为一个拓扑:

public class WordCountTopolpgy{public static void main(String[] args) {// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑TopologyBuilder builder = new TopologyBuilder();// 这里的第一个参数的意思,就是给这个spout设置一个名字// 第三个参数的意思,就是设置spout的executor有几个builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");builder.setBolt("WordCountBolt", new WordCountBolt(), 10).setNumTasks(20).fieldsGrouping("SplitSentenceBolt", new Fields("word"));Config config = new Config();// 说明是在命令行执行,打算提交到storm集群上去if(args != null && args.length > 0) {config.setNumWorkers(3);try {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 说明是在eclipse里面本地运行config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology("WordCountTopology", config, builder.createTopology());Utils.sleep(3000);cluster.shutdown();}}
}

上面已经开发完一个Topology了。接下来我们可以直接在本地运行,或者扔到Storm集群去运行。

如果在本地运行,则会模拟生成一个本地集群来运行,(注意本地运行需要去掉pom.xml Storm依赖中的<provided> 节点,真实集群运行需要加上这个节点)

如果要在集群中运行,需要执行命令:

mvn clean package 

得到一个jar包,将这个jar包上传到nimbus节点中,然后执行以下命令就可以运行了。

strom jar xxx.jar com.xxxx.WordCountTopology  WordCountTopology 

总结

本篇文章介绍了一个应用Storm的简单例子,演示了Storm的基本开发方式。可以通过这个例子,触类旁通,运行到真正的企业场景中去。

这篇关于Apache Storm 简单实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1096602

相关文章

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Java内存泄漏问题的排查、优化与最佳实践

《Java内存泄漏问题的排查、优化与最佳实践》在Java开发中,内存泄漏是一个常见且令人头疼的问题,内存泄漏指的是程序在运行过程中,已经不再使用的对象没有被及时释放,从而导致内存占用不断增加,最终... 目录引言1. 什么是内存泄漏?常见的内存泄漏情况2. 如何排查 Java 中的内存泄漏?2.1 使用 J

利用Python编写一个简单的聊天机器人

《利用Python编写一个简单的聊天机器人》这篇文章主要为大家详细介绍了如何利用Python编写一个简单的聊天机器人,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 使用 python 编写一个简单的聊天机器人可以从最基础的逻辑开始,然后逐步加入更复杂的功能。这里我们将先实现一个简单的

使用IntelliJ IDEA创建简单的Java Web项目完整步骤

《使用IntelliJIDEA创建简单的JavaWeb项目完整步骤》:本文主要介绍如何使用IntelliJIDEA创建一个简单的JavaWeb项目,实现登录、注册和查看用户列表功能,使用Se... 目录前置准备项目功能实现步骤1. 创建项目2. 配置 Tomcat3. 项目文件结构4. 创建数据库和表5.

使用PyQt5编写一个简单的取色器

《使用PyQt5编写一个简单的取色器》:本文主要介绍PyQt5搭建的一个取色器,一共写了两款应用,一款使用快捷键捕获鼠标附近图像的RGB和16进制颜色编码,一款跟随鼠标刷新图像的RGB和16... 目录取色器1取色器2PyQt5搭建的一个取色器,一共写了两款应用,一款使用快捷键捕获鼠标附近图像的RGB和16

四种简单方法 轻松进入电脑主板 BIOS 或 UEFI 固件设置

《四种简单方法轻松进入电脑主板BIOS或UEFI固件设置》设置BIOS/UEFI是计算机维护和管理中的一项重要任务,它允许用户配置计算机的启动选项、硬件设置和其他关键参数,该怎么进入呢?下面... 随着计算机技术的发展,大多数主流 PC 和笔记本已经从传统 BIOS 转向了 UEFI 固件。很多时候,我们也

Linux中Curl参数详解实践应用

《Linux中Curl参数详解实践应用》在现代网络开发和运维工作中,curl命令是一个不可或缺的工具,它是一个利用URL语法在命令行下工作的文件传输工具,支持多种协议,如HTTP、HTTPS、FTP等... 目录引言一、基础请求参数1. -X 或 --request2. -d 或 --data3. -H 或

基于Qt开发一个简单的OFD阅读器

《基于Qt开发一个简单的OFD阅读器》这篇文章主要为大家详细介绍了如何使用Qt框架开发一个功能强大且性能优异的OFD阅读器,文中的示例代码讲解详细,有需要的小伙伴可以参考一下... 目录摘要引言一、OFD文件格式解析二、文档结构解析三、页面渲染四、用户交互五、性能优化六、示例代码七、未来发展方向八、结论摘要

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke