Apache Storm 简单实践

2024-08-22 14:58
文章标签 简单 实践 apache storm

本文主要是介绍Apache Storm 简单实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Storm 简单实践

前两篇文章介绍了Apache Storm的一些基础知识以及核心架构。

  • Apache Storm 集群安装配置\
  • Apache Strom 实时计算系统

本篇文章介绍一些Storm的简单实践场景。

创建一个Storm项目

实践场景为,基于Storm开发出一个实时统计句子中的单词个数的拓扑,实时数据我们通过随机发射句子,在实际应用场景中,实时数据可能来自MQ或者其他来源。

使用IDEA创建一个maven项目,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope>
</dependency>

上两篇文章说了,Storm的数据源自于Spout。所以我们需要创建一个Spout,由于是一个简单的场景,实时数据我们通过随机发射句子:

Spout的代码如下:

public static class RandomSentenceSpout extends BaseRichSpout{private SpoutOutputCollector collector;private Random random;public void open(Map conf, TopologyContext context,SpoutOutputCollector collector){this.collector = collector;this.random = new Random();}public void nextTuple(){Utils.sleep(100);String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs","i am at two with nature"};String sentence = sentences[random.nextInt(sentences.length)];collector.emit(new Values(sentence));}public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));}
}

上面代码很简单,在拓扑启动的时候启动的时候会调用open方法,我们在这里保存了collector,然后Storm会不断的调用nextTuple方法,所以我们在这里把句子发射出去。然后在declareOutputFields声明了发射出去的句子的索引。

上面我们已经把句子发射出去了,接下来的工作就是把句子中的单词切割出来,然后再发射出去。

切割单词的任务交给一个bolt来做

  public static class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;public void prepare(Map conf, TopologyContext context,OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words) {collector.emit(new Values(word));}}/*** 定义发射出去的tuple,每个field的名称*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

上面代码把句子切割后发射出去,最后我们还需要一个bolt来统计单词的数量。

 public static class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 7208077706057284643L;private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);private OutputCollector collector;private Map<String, Long> wordCounts = new HashMap<String, Long>();@SuppressWarnings("rawtypes")public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = wordCounts.get(word);if(count == null) {count = 0L;}count++;wordCounts.put(word, count);LOGGER.info("【单词计数】" + word + "出现的次数是" + count);collector.emit(new Values(word, count));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}

写完了SpoutBolt之后,接下来要创建一个Topology类,将SpoutBolt组合成为一个拓扑:

public class WordCountTopolpgy{public static void main(String[] args) {// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑TopologyBuilder builder = new TopologyBuilder();// 这里的第一个参数的意思,就是给这个spout设置一个名字// 第三个参数的意思,就是设置spout的executor有几个builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");builder.setBolt("WordCountBolt", new WordCountBolt(), 10).setNumTasks(20).fieldsGrouping("SplitSentenceBolt", new Fields("word"));Config config = new Config();// 说明是在命令行执行,打算提交到storm集群上去if(args != null && args.length > 0) {config.setNumWorkers(3);try {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 说明是在eclipse里面本地运行config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology("WordCountTopology", config, builder.createTopology());Utils.sleep(3000);cluster.shutdown();}}
}

上面已经开发完一个Topology了。接下来我们可以直接在本地运行,或者扔到Storm集群去运行。

如果在本地运行,则会模拟生成一个本地集群来运行,(注意本地运行需要去掉pom.xml Storm依赖中的<provided> 节点,真实集群运行需要加上这个节点)

如果要在集群中运行,需要执行命令:

mvn clean package 

得到一个jar包,将这个jar包上传到nimbus节点中,然后执行以下命令就可以运行了。

strom jar xxx.jar com.xxxx.WordCountTopology  WordCountTopology 

总结

本篇文章介绍了一个应用Storm的简单例子,演示了Storm的基本开发方式。可以通过这个例子,触类旁通,运行到真正的企业场景中去。

这篇关于Apache Storm 简单实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1096602

相关文章

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

防止Linux rm命令误操作的多场景防护方案与实践

《防止Linuxrm命令误操作的多场景防护方案与实践》在Linux系统中,rm命令是删除文件和目录的高效工具,但一旦误操作,如执行rm-rf/或rm-rf/*,极易导致系统数据灾难,本文针对不同场景... 目录引言理解 rm 命令及误操作风险rm 命令基础常见误操作案例防护方案使用 rm编程 别名及安全删除

C++统计函数执行时间的最佳实践

《C++统计函数执行时间的最佳实践》在软件开发过程中,性能分析是优化程序的重要环节,了解函数的执行时间分布对于识别性能瓶颈至关重要,本文将分享一个C++函数执行时间统计工具,希望对大家有所帮助... 目录前言工具特性核心设计1. 数据结构设计2. 单例模式管理器3. RAII自动计时使用方法基本用法高级用法

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

ShardingProxy读写分离之原理、配置与实践过程

《ShardingProxy读写分离之原理、配置与实践过程》ShardingProxy是ApacheShardingSphere的数据库中间件,通过三层架构实现读写分离,解决高并发场景下数据库性能瓶... 目录一、ShardingProxy技术定位与读写分离核心价值1.1 技术定位1.2 读写分离核心价值二

深入浅出Spring中的@Autowired自动注入的工作原理及实践应用

《深入浅出Spring中的@Autowired自动注入的工作原理及实践应用》在Spring框架的学习旅程中,@Autowired无疑是一个高频出现却又让初学者头疼的注解,它看似简单,却蕴含着Sprin... 目录深入浅出Spring中的@Autowired:自动注入的奥秘什么是依赖注入?@Autowired

MySQL分库分表的实践示例

《MySQL分库分表的实践示例》MySQL分库分表适用于数据量大或并发压力高的场景,核心技术包括水平/垂直分片和分库,需应对分布式事务、跨库查询等挑战,通过中间件和解决方案实现,最佳实践为合理策略、备... 目录一、分库分表的触发条件1.1 数据量阈值1.2 并发压力二、分库分表的核心技术模块2.1 水平分

Python 基于http.server模块实现简单http服务的代码举例

《Python基于http.server模块实现简单http服务的代码举例》Pythonhttp.server模块通过继承BaseHTTPRequestHandler处理HTTP请求,使用Threa... 目录测试环境代码实现相关介绍模块简介类及相关函数简介参考链接测试环境win11专业版python

SpringBoot通过main方法启动web项目实践

《SpringBoot通过main方法启动web项目实践》SpringBoot通过SpringApplication.run()启动Web项目,自动推断应用类型,加载初始化器与监听器,配置Spring... 目录1. 启动入口:SpringApplication.run()2. SpringApplicat

Java整合Protocol Buffers实现高效数据序列化实践

《Java整合ProtocolBuffers实现高效数据序列化实践》ProtocolBuffers是Google开发的一种语言中立、平台中立、可扩展的结构化数据序列化机制,类似于XML但更小、更快... 目录一、Protocol Buffers简介1.1 什么是Protocol Buffers1.2 Pro