storm-0.8.2集群模式安装部署

2024-06-04 09:18

本文主要是介绍storm-0.8.2集群模式安装部署,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

标签(空格分隔): storm


hadoop,spark,kafka交流群:224209501

集群:
一台主机用来运行nimbus,另外两台运行superior。

安装及测试:

1,安装ZeroMQ
2,安装jzmq
3,安装Python
4,安装storm
5,配置storm
6,启动storm
7,测试storm

1,安装依赖

sudo yum install uuid*
sudo yum install libtool
sudo yum install libuuid 
sudo yum install libuuid-devel
sudo yum install libtool*

2,安装zeromq

 tar -zxvf zeromq-2.1.7.tar.gz -C /opt/storm/./configuremakesudo make install

3,安装jzmq

unzip jzmq-master.zip -d /opt/storm/
./autogen.sh 
./configure
make
make install

4,配置storm

1,说明一下:storm.local.dir表示storm要用到的本地目录。nimbus.host表示哪一台机器是master,即nimbus。storm.zookeeper.port表示zookeeper的端口号,这里一定要与zookeeper配置端口号一致,否则会出现通信错误。当然你也可以配置superevisor.slot.ports,supervisor.slots.ports表示supervisor节点的槽数,就是最多能跑几个worker进程(每个spout或者bolt默认只启动一个worker,但是可以通过conf修改成多个)
2,java.library.path这是storm所依赖的本地依赖(ZeroMQ和JZMQ)的加载地址,默认的是:/usr/local/lib:/opt/local/lib:/usr/lib,大多数情况下是对的,所以你应该不用更改这个配置。
注意事项:
1,配置时一定注意在每一项的开始时是要加空格(最好加两个空格),冒号之后也要加空格,否则storm不认识这个配置文件。
2,在目录/usr/tmp下面增加storm文件夹。
在storm.yaml文件中配置如下内容:

 storm.zookeeper.servers:- "miaodonghua.host"
#     - "server2"
# nimbus.host: "miaodonghua.host"

5,启动nimbus

bin/storm nimbus

nimbun启动成功.png-9.3kB

6,启动storm的web UI

经测试,StormUI必须和Storm Nimbus部署在同一台机器上,否则UI将无法正常工作,因为UI进程会检查本机是否存在Nimbus链接。

nohup bin/storm ui & 

storm ui.png-11kB
浏览器中输入下面链接及端口号

http://miaodonghua.host:8080///浏览器

主节点ui.png-60.9kB

7,启动从节点

在两台supervisor机器中启动

nohup bin/storm supervisor &

supervisor启动成功.png-9.6kB
stom ui supervisor.png-49.3kB

8,运行

bin/storm jar /opt/storm/storm-0.8.2/lifeCycle.jar  cn.storm.topology.TopoMain
bin/storm list

stormlist.png-21.5kB

http://miaodonghua.host:8080

执行成功.png-45.6kB
lifecycle.png-55.7kB

9,storm-start-master

下载storm-start:
storm-starter-0.7.0.zip
进入主目录修改m2-pom.xml(将twitter4j-core和twitter4j-stream替换为下面的部分)

<dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-core</artifactId>
<version>[2.2,)</version>
</dependency>
<dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-stream</artifactId><version>[2.2,)</version>
</dependency>

编译

mvn -f m2-pom.xml package

编译成功.png-23.3kB
复制 storm-starter目录下的m2_pom.xml 为pom.xml,放在与m2_pom.xml同一目录下
打jar包

mvn jar:jar

打成jar包.png-7.3kB
如果需要对工程代码进行修改,可以导入eclipse

mvn eclipse:eclipse

生成eclipse成功.png-10.1kB

10,源码内容如下

1,编写RandomWordSpout

package cn.storm.spout;import java.util.Map;
import java.util.Random;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/*** 随机从String数组当中读取一个单词发送给下一个bolt* @author Administrator**/
public class RandomWordSpout extends BaseRichSpout {private static final long serialVersionUID = -4287209449750623371L;private static final Log log = LogFactory.getLog(RandomWordSpout.class);private SpoutOutputCollector collector;private String[] words = new String[]{"storm", "hadoop", "hive", "flume"};private Random random = new Random();public RandomWordSpout() {log.warn("&&&&&&&&&&&&&&&&& RandomWordSpout constructor method invoked");}@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {log.warn("################# RandomWordSpout open() method invoked");this.collector = collector;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {log.warn("################# RandomWordSpout declareOutputFields() method invoked");declarer.declare(new Fields("str"));}@Overridepublic void nextTuple() {log.warn("################# RandomWordSpout nextTuple() method invoked");Utils.sleep(500);String str = words[random.nextInt(words.length)];collector.emit(new Values(str));}@Overridepublic void activate() {log.warn("################# RandomWordSpout activate() method invoked");}@Overridepublic void deactivate() {log.warn("################# RandomWordSpout deactivate() method invoked");}
}

2,编写TransferBolt

package cn.storm.bolt;import java.util.Map;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;/*** 将数据做简单的 传递的Bolt* @author Administrator**/
public class TransferBolt extends BaseBasicBolt {private static final long serialVersionUID = 4223708336037089125L;private static final Log log = LogFactory.getLog(TransferBolt.class);public TransferBolt() {log.warn("&&&&&&&&&&&&&&&&& TransferBolt constructor method invoked");}@Overridepublic void prepare(Map stormConf, TopologyContext context) {log.warn("################# TransferBolt prepare() method invoked");}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {log.warn("################# TransferBolt declareOutputFields() method invoked");declarer.declare(new Fields("word"));}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {log.warn("################# TransferBolt execute() method invoked");String word = input.getStringByField("str");collector.emit(new Values(word));}}

3,编写WriterBolt

package cn.storm.bolt;import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/*** 将接收到的单词写入到一个文件当中* @author Administrator**/
public class WriterBolt extends BaseBasicBolt {private static final long serialVersionUID = -6586283337287975719L;private static final Log log = LogFactory.getLog(WriterBolt.class);private FileWriter writer = null;public WriterBolt() {log.warn("&&&&&&&&&&&&&&&&& WriterBolt constructor method invoked");}@Overridepublic void prepare(Map stormConf, TopologyContext context) {log.warn("################# WriterBolt prepare() method invoked");try {writer = new FileWriter("/home/" + this);} catch (IOException e) {log.error(e);throw new RuntimeException(e);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {log.warn("################# WriterBolt declareOutputFields() method invoked");}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {log.warn("################# WriterBolt execute() method invoked");String s = input.getString(0);try {writer.write(s);writer.write("\n");writer.flush();} catch (IOException e) {log.error(e);throw new RuntimeException(e);}}}

4,编写TopoMain

package cn.storm.topology;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import cn.itcast.storm.bolt.TransferBolt;
import cn.itcast.storm.bolt.WriterBolt;
import cn.itcast.storm.spout.RandomWordSpout;public class TopoMain {private static final Log log = LogFactory.getLog(TopoMain.class);/*** @param args*/public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("random", new RandomWordSpout(), 2);builder.setBolt("transfer", new TransferBolt(), 4).shuffleGrouping("random");builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("transfer", new Fields("word"));Config conf = new Config();conf.setNumWorkers(2);conf.setDebug(true);log.warn("$$$$$$$$$$$ submitting topology...");StormSubmitter.submitTopology("life-cycle", conf, builder.createTopology());log.warn("$$$$$$$4$$$ topology submitted !");}}

这篇关于storm-0.8.2集群模式安装部署的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1029667

相关文章

闲置电脑也能活出第二春?鲁大师AiNAS让你动动手指就能轻松部署

对于大多数人而言,在这个“数据爆炸”的时代或多或少都遇到过存储告急的情况,这使得“存储焦虑”不再是个别现象,而将会是随着软件的不断臃肿而越来越普遍的情况。从不少手机厂商都开始将存储上限提升至1TB可以见得,我们似乎正处在互联网信息飞速增长的阶段,对于存储的需求也将会不断扩大。对于苹果用户而言,这一问题愈发严峻,毕竟512GB和1TB版本的iPhone可不是人人都消费得起的,因此成熟的外置存储方案开

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

CentOS7安装配置mysql5.7 tar免安装版

一、CentOS7.4系统自带mariadb # 查看系统自带的Mariadb[root@localhost~]# rpm -qa|grep mariadbmariadb-libs-5.5.44-2.el7.centos.x86_64# 卸载系统自带的Mariadb[root@localhost ~]# rpm -e --nodeps mariadb-libs-5.5.44-2.el7

Centos7安装Mongodb4

1、下载源码包 curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-4.2.1.tgz 2、解压 放到 /usr/local/ 目录下 tar -zxvf mongodb-linux-x86_64-rhel70-4.2.1.tgzmv mongodb-linux-x86_64-rhel70-4.2.1/

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

Centos7安装JDK1.8保姆版

工欲善其事,必先利其器。这句话同样适用于学习Java编程。在开始Java的学习旅程之前,我们必须首先配置好适合的开发环境。 通过事先准备好这些工具和配置,我们可以避免在学习过程中遇到因环境问题导致的代码异常或错误。一个稳定、高效的开发环境能够让我们更加专注于代码的学习和编写,提升学习效率,减少不必要的困扰和挫折感。因此,在学习Java之初,投入一些时间和精力来配置好开发环境是非常值得的。这将为我

阿里开源语音识别SenseVoiceWindows环境部署

SenseVoice介绍 SenseVoice 专注于高精度多语言语音识别、情感辨识和音频事件检测多语言识别: 采用超过 40 万小时数据训练,支持超过 50 种语言,识别效果上优于 Whisper 模型。富文本识别:具备优秀的情感识别,能够在测试数据上达到和超过目前最佳情感识别模型的效果。支持声音事件检测能力,支持音乐、掌声、笑声、哭声、咳嗽、喷嚏等多种常见人机交互事件进行检测。高效推

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)