本文主要是介绍Apache Storm 简单实践,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Apache Storm 简单实践
前两篇文章介绍了Apache Storm
的一些基础知识以及核心架构。
- Apache Storm 集群安装配置\
- Apache Strom 实时计算系统
本篇文章介绍一些Storm
的简单实践场景。
创建一个Storm项目
实践场景为,基于Storm
开发出一个实时统计句子中的单词个数的拓扑,实时数据我们通过随机发射句子,在实际应用场景中,实时数据可能来自MQ或者其他来源。
使用IDEA创建一个maven
项目,在pom.xml
文件中添加以下依赖:
<dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>1.1.0</version><scope>provided</scope>
</dependency>
上两篇文章说了,Storm
的数据源自于Spout
。所以我们需要创建一个Spout
,由于是一个简单的场景,实时数据我们通过随机发射句子:
Spout
的代码如下:
public static class RandomSentenceSpout extends BaseRichSpout{private SpoutOutputCollector collector;private Random random;public void open(Map conf, TopologyContext context,SpoutOutputCollector collector){this.collector = collector;this.random = new Random();}public void nextTuple(){Utils.sleep(100);String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away","four score and seven years ago", "snow white and the seven dwarfs","i am at two with nature"};String sentence = sentences[random.nextInt(sentences.length)];collector.emit(new Values(sentence));}public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("sentence"));}
}
上面代码很简单,在拓扑启动的时候启动的时候会调用open
方法,我们在这里保存了collector
,然后Storm
会不断的调用nextTuple
方法,所以我们在这里把句子发射出去。然后在declareOutputFields
声明了发射出去的句子的索引。
上面我们已经把句子发射出去了,接下来的工作就是把句子中的单词切割出来,然后再发射出去。
切割单词的任务交给一个bolt
来做
public static class SplitSentenceBolt extends BaseRichBolt{private OutputCollector collector;public void prepare(Map conf, TopologyContext context,OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getStringByField("sentence");String[] words = sentence.split(" ");for(String word : words) {collector.emit(new Values(word));}}/*** 定义发射出去的tuple,每个field的名称*/public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}
上面代码把句子切割后发射出去,最后我们还需要一个bolt
来统计单词的数量。
public static class WordCountBolt extends BaseRichBolt {private static final long serialVersionUID = 7208077706057284643L;private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);private OutputCollector collector;private Map<String, Long> wordCounts = new HashMap<String, Long>();@SuppressWarnings("rawtypes")public void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple tuple) {String word = tuple.getStringByField("word");Long count = wordCounts.get(word);if(count == null) {count = 0L;}count++;wordCounts.put(word, count);LOGGER.info("【单词计数】" + word + "出现的次数是" + count);collector.emit(new Values(word, count));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}
}
写完了Spout
和Bolt
之后,接下来要创建一个Topology
类,将Spout
和Bolt
组合成为一个拓扑:
public class WordCountTopolpgy{public static void main(String[] args) {// 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑TopologyBuilder builder = new TopologyBuilder();// 这里的第一个参数的意思,就是给这个spout设置一个名字// 第三个参数的意思,就是设置spout的executor有几个builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 5).setNumTasks(10).shuffleGrouping("RandomSentence");builder.setBolt("WordCountBolt", new WordCountBolt(), 10).setNumTasks(20).fieldsGrouping("SplitSentenceBolt", new Fields("word"));Config config = new Config();// 说明是在命令行执行,打算提交到storm集群上去if(args != null && args.length > 0) {config.setNumWorkers(3);try {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} catch (Exception e) {e.printStackTrace();}} else {// 说明是在eclipse里面本地运行config.setMaxTaskParallelism(20);LocalCluster cluster = new LocalCluster();cluster.submitTopology("WordCountTopology", config, builder.createTopology());Utils.sleep(3000);cluster.shutdown();}}
}
上面已经开发完一个Topology
了。接下来我们可以直接在本地运行,或者扔到Storm
集群去运行。
如果在本地运行,则会模拟生成一个本地集群来运行,(注意本地运行需要去掉pom.xml
Storm
依赖中的<provided>
节点,真实集群运行需要加上这个节点)
如果要在集群中运行,需要执行命令:
mvn clean package
得到一个jar
包,将这个jar
包上传到nimbus
节点中,然后执行以下命令就可以运行了。
strom jar xxx.jar com.xxxx.WordCountTopology WordCountTopology
总结
本篇文章介绍了一个应用Storm
的简单例子,演示了Storm
的基本开发方式。可以通过这个例子,触类旁通,运行到真正的企业场景中去。
这篇关于Apache Storm 简单实践的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!