flume采集数据到kafka和hive

2024-06-17 13:58
文章标签 数据 采集 hive kafka flume

本文主要是介绍flume采集数据到kafka和hive,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

  1. 构建ftp服务
    在安装flume的机器上添加sftp服务
useradd flumetest
passwd flumetest

#ubuntu-查看所有用户
cat /etc/shadow

apt-get install vsftpd
#查看
service vsftpd status
#创建接受数据目录
mkdir /home/flumetest/alarm

在vsftpd服务配置文件中设置:

# Allowanonymous FTP? (Disabled by default)
anonymous_enable=NO#Uncomment this to enable any form of FTP write command.
write_enable=YES#chroot_local_user=YES
chroot_list_enable=YES
#(default follows)
chroot_list_file=/etc/vsftpd.chroot_list
  1. 配置kafka
    参考网址:http://kafka.apache.org/quickstart
#启动kafka,kafka节点都需要启动
nohup sh bin/kafka-server-start.sh config/server.properties > /dev/null2>&1 &#创建topic
bin/kafka-topics.sh --create --zookeeper 116.62.*.*:2181--replication-factor 2 --partitions 2 --topic alarm#查看topic List
bin/kafka-topics.sh --list --zookeeper 116.62.*.*:2181启动consumer,查看数据
bin/kafka-console-consumer.sh --bootstrap-server 116.62.*.*:9092--topic alarm --from-beginning#删除topic
bin/kafka-topics.sh --zookeeper 116.62.*.*:2181 --delete --topic alarm
#如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion,此时你若想真正删除它,可以如下操作:
#(1)登录zookeeper客户端:命令:./bin/zookeeper-client
#(2)找到topic所在的目录:ls /brokers/topics
#(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。

3 创建hive表

create table if not EXISTS alarm_no_partition(alm_timestring,alm_timeMs int,tag_NameID string,alm_Type int,priID int,alm_Ack_Timestring,alm_Ack_TimeMs int,alm_Group int,alm_Sub_Area int,tag_Data_Typestring,alm_Ack_Flg string,alm_Remove_Flg string,alm_Remove_Timestring,alm_Remove_TimeMs int,alarm_date string) clusteredby (priID) into 2 buckets stored as orc TBLPROPERTIES("transactional"="true");

4 配置flume
参考网址:http://flume.apache.org/FlumeUserGuide.html
配置flume-conf.properties文件:一个source,多个sink、

channel
agent1.sources= alarms1
agent1.channels= alarmc1 alarmc2 alarmc3
agent1.sinks= alarmk1 alarmk2 alarmk3#SpoolingDirectory
#setalarms1
agent1.sources.alarms1.type=spooldir
agent1.sources.alarms1.spoolDir=/home/flumetest/alarm/
agent1.sources.alarms1.channels=alarmc1alarmc2 alarmc3
agent1.sources.alarms1.fileHeader= false#setalarmk1
agent1.sinks.alarmk1.channel=alarmc1
agent1.sinks.alarmk1.type= org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.alarmk1.topic= alarm
agent1.sinks.alarmk1.kafka.bootstrap.servers= 116.62.*.*:9092;116.62.*.*:9092;116.62.*.*:9092
agent1.sinks.alarmk1.kafka.flumeBatchSize= 20
agent1.sinks.alarmk1.kafka.producer.acks= 1
agent1.sinks.alarmk1.kafka.producer.linger.ms= 1
agent1.sinks.alarmk1.kafka.producer.compression.type= snappy
#setalarmk2
agent1.sinks.alarmk2.channel=alarmc2
agent1.sinks.alarmk2.type=hive
agent1.sinks.alarmk2.hive.metastore= thrift://127.0.0.1:9083
agent1.sinks.alarmk2.hive.database= alarm
agent1.sinks.alarmk2.hive.table= alarm_no_partition
#agent1.sinks.alarmk2.hive.partition=%{alarm_date}
agent1.sinks.alarmk2.useLocalTimeStamp= false
#agent1.sinks.alarmk2.roundValue= 10
#agent1.sinks.alarmk2.roundUnit= minute
agent1.sinks.alarmk2.serializer= DELIMITED
agent1.sinks.alarmk2.serializer.delimiter=,
agent1.sinks.alarmk2.serializer.serdeSeparator='\t'
agent1.sinks.alarmk2.serializer.fieldnames=alm_time,alm_timems,tag_nameid,alm_type,priid,alm_ack_time,alm_ack_timems,alm_group,alm_sub_area,tag_data_type,alm_ack_flg,alm_remove_flg,alm_remove_time,alm_remove_timems,alarm_date
#setalarmk3
#setalarmk3
#agent1.sinks.alarmk3.channel=alarmc3
#agent1.sinks.alarmk3.type=hbase
#agent1.sinks.alarmk3.table=alarm_test
#agent1.sinks.alarmk3.columnFamily=
#agent1.sinks.alarmk3.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer#setalarmc1
agent1.channels.alarmc1.type= memory
agent1.channels.alarmc1.capacity= 1000 
agent1.channels.alarmc1.transactionCapacity= 100
#setalarmc2
agent1.channels.alarmc2.type= memory
agent1.channels.alarmc2.capacity= 1000 
agent1.channels.alarmc2.transactionCapacity= 100
#setalarmc3
agent1.channels.alarmc3.type= memory
agent1.channels.alarmc3.capacity= 1000 
agent1.channels.alarmc3.transactionCapacity= 100

启动flume

nohup bin/flume-ng agent -cconf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,LOGFILE-Dflume.log.dir=logs >> /dev/null 2>&1

5 加载数据
向目录中添加文件加载完成文件后缀添加.COMPLETED

这篇关于flume采集数据到kafka和hive的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1069638

相关文章

【服务器运维】MySQL数据存储至数据盘

查看磁盘及分区 [root@MySQL tmp]# fdisk -lDisk /dev/sda: 21.5 GB, 21474836480 bytes255 heads, 63 sectors/track, 2610 cylindersUnits = cylinders of 16065 * 512 = 8225280 bytesSector size (logical/physical)

SQL Server中,查询数据库中有多少个表,以及数据库其余类型数据统计查询

sqlserver查询数据库中有多少个表 sql server 数表:select count(1) from sysobjects where xtype='U'数视图:select count(1) from sysobjects where xtype='V'数存储过程select count(1) from sysobjects where xtype='P' SE

数据时代的数字企业

1.写在前面 讨论数据治理在数字企业中的影响和必要性,并介绍数据治理的核心内容和实践方法。作者强调了数据质量、数据安全、数据隐私和数据合规等方面是数据治理的核心内容,并介绍了具体的实践措施和案例分析。企业需要重视这些方面以实现数字化转型和业务增长。 数字化转型行业小伙伴可以加入我的星球,初衷成为各位数字化转型参考库,星球内容每周更新 个人工作经验资料全部放在这里,包含数据治理、数据要

如何在Java中处理JSON数据?

如何在Java中处理JSON数据? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨在Java中如何处理JSON数据。JSON(JavaScript Object Notation)作为一种轻量级的数据交换格式,在现代应用程序中被广泛使用。Java通过多种库和API提供了处理JSON的能力,我们将深入了解其用法和最佳

WordPress网创自动采集并发布插件

网创教程:WordPress插件网创自动采集并发布 阅读更新:随机添加文章的阅读数量,购买数量,喜欢数量。 使用插件注意事项 如果遇到404错误,请先检查并调整网站的伪静态设置,这是最常见的问题。需要定制化服务,请随时联系我。 本次更新内容 我们进行了多项更新和优化,主要包括: 界面设置:用户现在可以更便捷地设置文章分类和发布金额。代码优化:改进了采集和发布代码,提高了插件的稳定

两个基因相关性CPTAC蛋白组数据

目录 蛋白数据下载 ①蛋白数据下载 1,TCGA-选择泛癌数据  2,TCGA-TCPA 3,CPTAC(非TCGA) ②蛋白相关性分析 1,数据整理 2,蛋白相关性分析 PCAS在线分析 蛋白数据下载 CPTAC蛋白组学数据库介绍及数据下载分析 – 王进的个人网站 (jingege.wang) ①蛋白数据下载 可以下载泛癌蛋白数据:UCSC Xena (xena

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

BD错误集锦9——查询hive表格时出错:Wrong FS: hdfs://s233/user/../warehouse expected: hdfs://mycluster

集群环境描述:HDFS集群处于HA模式下,同时启动了YARN\JN\KAFKA\ZK。 现象: FAILED: SemanticException Unable to determine if hdfs://s233/user/hive/warehouse/mydb.db/ext_calllogs_in_hbase is encrypted: java.lang.IllegalArgument

BD错误集锦1——[Hive]ERROR StatusLogger No log4j2 configuration file found. Using default configuration:

错误描述:在使用IDEA进行jdbc方式连接到hive数据仓库时,出现以下错误:                ERROR StatusLogger No log4j2 configuration file found. 问题原因:缺少log4j2.xml文件   <?xml version="1.0" encoding="UTF-8"?><Configuration><Appender

小红书商家电话采集软件使用指南

使用小红书商家电话采集软件可以提高商家电话的采集效率,以下是使用指南及附带代码。 步骤一:安装Python和相关库 首先,确保你的电脑已经安装了Python运行环境(建议安装Python3版本)。安装完成后,同样需要安装一些相关的库,如requests、beautifulsoup4等。在命令行窗口中输入以下命令进行安装: pip install requestspip install bea