本文主要是介绍Storm例子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
例子说明:先随机产生一个数,然后乘以16,再然后添加一串字符串并转大写,再然后存入文件中
Main:
public class Main {public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("randomint", new RandomSpout(), 1);builder.setBolt("multiply", new MultiplyBolt(), 3).shuffleGrouping("randomint");builder.setBolt("add", new AddBolt(), 3).shuffleGrouping("multiply");builder.setBolt("end", new EndBolt(), 3).shuffleGrouping("add");Config config = new Config();config.setDebug(true);if(args !=null && args.length > 0){config.setNumWorkers(3);StormSubmitter.submitTopology("goto", config, builder.createTopology());} else { config.setMaxTaskParallelism(1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("go", config, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}}
}
randomint :
public class RandomSpout extends BaseRichSpout {private static final long serialVersionUID = 1L;private SpoutOutputCollector collator;@SuppressWarnings("rawtypes")public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.collator = collector;}public void nextTuple() {int num = getRandomInt();collator.emit(new Values(num));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("randomint"));}private int getRandomInt(){return new Random().nextInt(10000);}
}
multiply :
public class MultiplyBolt extends BaseBasicBolt {private static final long serialVersionUID = -2510535748201371391L;public void execute(Tuple input, BasicOutputCollector collector) {Integer num = input.getInteger(0);if(num==null){num = 0;}num = num * 16;collector.emit(new Values(num));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("multiply"));}}
add :
public class AddBolt extends BaseBasicBolt {private static final long serialVersionUID = 7852933092789941359L;public void execute(Tuple input, BasicOutputCollector collector) {Integer num = input.getInteger(0);String s = "is num : " + num;s = s.toUpperCase();collector.emit(new Values(s));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("add"));}}
end :
public class EndBolt extends BaseBasicBolt {private static final long serialVersionUID = -7167182237528493192L;public void execute(Tuple input, BasicOutputCollector collector) {String msg = input.getString(0);FileUtil.writer(msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("end"));}}
FileUtil :
public class FileUtil {public static void writer(String msg){FileWriter writer = null;try {writer = new FileWriter("/tool/out.txt", true);writer.append(msg+"\n");writer.flush();} catch (IOException e) {e.printStackTrace();} finally {try {if(writer!=null){writer.close();writer = null;}} catch (IOException e) {e.printStackTrace();}}}
}
这篇关于Storm例子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!