Apache Storm 简单实践

2024-08-22 14:58
文章标签 简单 实践 apache storm

本文主要是介绍Apache Storm 简单实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Apache Storm 简单实践

前两篇文章介绍了Apache Storm的一些基础知识以及核心架构。

  • Apache Storm 集群安装配置\
  • Apache Strom 实时计算系统

本篇文章介绍一些Storm的简单实践场景。

创建一个Storm项目

实践场景为,基于Storm开发出一个实时统计句子中的单词个数的拓扑,实时数据我们通过随机发射句子,在实际应用场景中,实时数据可能来自MQ或者其他来源。

使用IDEA创建一个maven项目,在pom.xml文件中添加以下依赖:

<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope>
</dependency>

上两篇文章说了,Storm的数据源自于Spout。所以我们需要创建一个Spout,由于是一个简单的场景,实时数据我们通过随机发射句子:

Spout的代码如下:

public static class RandomSentenceSpout extends BaseRichSpout{private SpoutOutputCollector collector;private Random random;public void open(Map conf, TopologyContext context,SpoutOutputCollector collector){this.collector = collector;this.random = new Random();}public void nextTuple(){Utils.sleep(100);String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs","i am at two with nature"};String sentence = sentences[random.nextInt(sentences.length)];collector.emit(new Values(sentence));}public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));}
}

上面代码很简单,在拓扑启动的时候启动的时候会调用open方法,我们在这里保存了collector,然后Storm会不断的调用nextTuple方法,所以我们在这里把句子发射出去。然后在declareOutputFields声明了发射出去的句子的索引。

上面我们已经把句子发射出去了,接下来的工作就是把句子中的单词切割出来,然后再发射出去。

切割单词的任务交给一个bolt来做

  public static class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;public void prepare(Map conf, TopologyContext context,OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words) {collector.emit(new Values(word));}}/*** 定义发射出去的tuple,每个field的名称*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

上面代码把句子切割后发射出去,最后我们还需要一个bolt来统计单词的数量。

 public static class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 7208077706057284643L;private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);private OutputCollector collector;private Map<String, Long> wordCounts = new HashMap<String, Long>();@SuppressWarnings("rawtypes")public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = wordCounts.get(word);if(count == null) {count = 0L;}count++;wordCounts.put(word, count);LOGGER.info("【单词计数】" + word + "出现的次数是" + count);collector.emit(new Values(word, count));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}

写完了SpoutBolt之后,接下来要创建一个Topology类,将SpoutBolt组合成为一个拓扑:

public class WordCountTopolpgy{public static void main(String[] args) {// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑TopologyBuilder builder = new TopologyBuilder();// 这里的第一个参数的意思,就是给这个spout设置一个名字// 第三个参数的意思,就是设置spout的executor有几个builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");builder.setBolt("WordCountBolt", new WordCountBolt(), 10).setNumTasks(20).fieldsGrouping("SplitSentenceBolt", new Fields("word"));Config config = new Config();// 说明是在命令行执行,打算提交到storm集群上去if(args != null && args.length > 0) {config.setNumWorkers(3);try {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 说明是在eclipse里面本地运行config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology("WordCountTopology", config, builder.createTopology());Utils.sleep(3000);cluster.shutdown();}}
}

上面已经开发完一个Topology了。接下来我们可以直接在本地运行,或者扔到Storm集群去运行。

如果在本地运行,则会模拟生成一个本地集群来运行,(注意本地运行需要去掉pom.xml Storm依赖中的<provided> 节点,真实集群运行需要加上这个节点)

如果要在集群中运行,需要执行命令:

mvn clean package 

得到一个jar包,将这个jar包上传到nimbus节点中,然后执行以下命令就可以运行了。

strom jar xxx.jar com.xxxx.WordCountTopology  WordCountTopology 

总结

本篇文章介绍了一个应用Storm的简单例子,演示了Storm的基本开发方式。可以通过这个例子,触类旁通,运行到真正的企业场景中去。

这篇关于Apache Storm 简单实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1096602

相关文章

MySQL 迁移至 Doris 最佳实践方案(最新整理)

《MySQL迁移至Doris最佳实践方案(最新整理)》本文将深入剖析三种经过实践验证的MySQL迁移至Doris的最佳方案,涵盖全量迁移、增量同步、混合迁移以及基于CDC(ChangeData... 目录一、China编程JDBC Catalog 联邦查询方案(适合跨库实时查询)1. 方案概述2. 环境要求3.

Linux进程CPU绑定优化与实践过程

《Linux进程CPU绑定优化与实践过程》Linux支持进程绑定至特定CPU核心,通过sched_setaffinity系统调用和taskset工具实现,优化缓存效率与上下文切换,提升多核计算性能,适... 目录1. 多核处理器及并行计算概念1.1 多核处理器架构概述1.2 并行计算的含义及重要性1.3 并

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

Spring WebFlux 与 WebClient 使用指南及最佳实践

《SpringWebFlux与WebClient使用指南及最佳实践》WebClient是SpringWebFlux模块提供的非阻塞、响应式HTTP客户端,基于ProjectReactor实现,... 目录Spring WebFlux 与 WebClient 使用指南1. WebClient 概述2. 核心依

MyBatis-Plus 中 nested() 与 and() 方法详解(最佳实践场景)

《MyBatis-Plus中nested()与and()方法详解(最佳实践场景)》在MyBatis-Plus的条件构造器中,nested()和and()都是用于构建复杂查询条件的关键方法,但... 目录MyBATis-Plus 中nested()与and()方法详解一、核心区别对比二、方法详解1.and()

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

MySQL 中 ROW_NUMBER() 函数最佳实践

《MySQL中ROW_NUMBER()函数最佳实践》MySQL中ROW_NUMBER()函数,作为窗口函数为每行分配唯一连续序号,区别于RANK()和DENSE_RANK(),特别适合分页、去重... 目录mysql 中 ROW_NUMBER() 函数详解一、基础语法二、核心特点三、典型应用场景1. 数据分

深度解析Spring AOP @Aspect 原理、实战与最佳实践教程

《深度解析SpringAOP@Aspect原理、实战与最佳实践教程》文章系统讲解了SpringAOP核心概念、实现方式及原理,涵盖横切关注点分离、代理机制(JDK/CGLIB)、切入点类型、性能... 目录1. @ASPect 核心概念1.1 AOP 编程范式1.2 @Aspect 关键特性2. 完整代码实