离线电商数仓知识笔记沉淀-流程及用户行为采集平台

2024-03-15 04:59

本文主要是介绍离线电商数仓知识笔记沉淀-流程及用户行为采集平台,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

learn by :尚硅谷数仓6.0

前置储备:

采集项目和数仓项目的区别:二者具有独立性

1功能角度:

采集项目:以数据的采集、传输为主

数仓项目:以数据计算为主、同时可以储存数据

2以功能延伸到技术区别:

采集项目:flume、kafka、datax、maxwell

数仓项目:mysql、hdfs、spark、Flink、MR、hive

数据库和数据仓库区分:

名称区分:

数据库database:基础核心的数据

数据仓库 data Warehouse:warehouse还有大商店之意,所以存数据不是数据仓库的本质目的,是为了把数据加工处理后对外提供服务

数据来源区分:

数据库:企业中基础核心的业务数据

数据仓库:数据库中的数据

数据存储区分:

数据库:核心作用是查找业务数据,行式存储,索引(快速定位),不能存储海量数据

数据仓库:核心作用是统计分析数据,列式存储,存储海量数据(数据越多统计越准确)

数据价值区分:

数据库:保障全企业全业务正常运行

数据仓库:将数据的统计结果为企业的经营决策提供数据支撑,不是数据流转的终点,需要将统计结果将可视化平台呈现给客户

数据流传过程

(不完整,只记录涉及到的)

客户端——业务服务器(业务数据,行为数据)——数据存储(业务数据库,行为日志)——数据的统计分析(数据仓库)—— 数据可视化(可视化平台,图表)

数据仓库:核心功能统计分析(Hive)

Spark, MR, Flink

SQL:两条技术路线  Spark On Hive ; Hive On Spark,区别:谁解析SQL谁放前面

因为Hive基于Hadoop,所以国内开发用的多一点

统计分析的基本步骤:

数据源——对接并加工数据——统计数据(算一下)——分析数据

数据仓库也应遵循这个步骤

数据存储————数据可视化

如果将数据库直接作为数据仓库的数据源会出现的问题:

1.业务数据库为行式存储,而数据仓库是列式存储,数据不能直接对接——需要把行式数据转化为列式数据

2.业务数据库中存储的数据不是海量,但数仓要求海量,所以直接对接数据量不够

3.数据库不是为了数据仓库服务的,访问会对数据库造成性能影响

所以数据仓库应该设计一个自己的数据源,为了代替和补充数据库:数据存储应和数据库同步的(汇总)

数据仓库的开发用SQL语言进行处理,那么数据的处理步骤应该采用什么方法?

需要将数据转化为结构数据——表,且每一步都有表(应对多需求,类似缓存)

数据仓库的数据源数据需要从数据库中周期性同步,一般将这个同步过程称之为采集

若数据采集时,如果想要将数据同步到数据仓库的数据源,那么就必须知道表结构

        那么采集项目和数据仓库项目就会存在耦合性,但二者应有独立性

所以实际开发中,需要将采集项目和数仓项目解耦合

数据存储—— HDFS(解耦合)——数据仓库数据源

        原理:data&file——HDFS(file)—— hive(table)=> HDFS(file)(hive将磁盘文件管     理成表),如果不选择hive,解耦合用hdfs就未必合适

数据存储(MySQL)—— 数据采集(DataX,Maxwell(data—file),Flume(file—file))——HDFS(解耦合)——数据仓库数据源

数据采集部分

数仓部分

数仓尾巴(Hive)— MySQL (解耦合)— 数据可视化

集群资源规划

1)生产集群(参考腾讯云EMR官方推荐部署

        Master节点:管理节点,保证集群的调度正常运行;主要部署NameNode, ResourceManager, HMaster等进程;非HA模式下数量为1,HA模式数量为2。

注:HA模式-------------------

在数据仓库(Data Warehouse)搭建中,"HA" 模式通常指的是高可用性(High Availability)模式。高可用性是指系统能够在面对硬件故障、软件故障或其他可预见的事件时保持持续运行而不中断服务的能力。

在数据仓库的环境中,高可用性模式可以确保数据仓库系统的稳定性和可靠性,以确保业务的持续运行和数据的安全性。一般来说,实现高可用性模式需要采取多种技术手段,包括但不限于:

  1. 故障转移(Failover):在主节点(Primary Node)出现故障时,系统能够自动切换到备用节点(Secondary Node)以保持服务的连续性。

  2. 负载均衡(Load Balancing):将流量分发到多个节点上,以防止某个节点过载,从而提高整个系统的稳定性和性能。

  3. 数据复制(Data Replication):将数据复制到多个节点上,以确保即使某个节点发生故障,数据仍然可以从其他节点获取,确保数据的可用性和一致性。

  4. 监控和自动恢复(Monitoring and Automatic Recovery):实时监控系统的运行状况,当检测到异常时,自动触发相应的恢复机制,尽快恢复服务。

  5. 灾难恢复(Disaster Recovery):建立备份系统或数据中心,以应对灾难性事件,确保即使整个数据中心或系统发生严重故障,业务也能够在短时间内恢复运行。

        高可用性模式在数据仓库中尤为重要,因为数据仓库通常承载着企业的重要业务数据和决策支 持信息。通过采取高可用性措施,可以最大限度地减少系统停机时间,提高业务连续性和数据安全性。

---------------------

Core节点:为计算及存储节点,在HDFS中的数据全部存储于core节点中,因为为保证数据安全,扩容Core节点后不允许缩容;主要部署DataNode, NodeManager, RegionServer等进程。非HA>=2, HA>=3。

Common节点:为HA集群Master节点提供数据共享同步已经高可用容错服务;主要部署分布式协调器组件,如ZooKeeper,JournalNode等节点,非HA为0,HA>=3。

消耗内存的分开部署

数据传输数据比较紧密的放在一起(Kafka,ClickHouse)

客户端尽量放到一到两台服务器上,方便外部访问

有依赖关系的尽量放到同一台服务器(如:Ds-worker和Hive/Spark)

Master

Master

core

core

core

common

common

common

nn

nn

dn

dn

dn

JournalNode

JournalNode

JournalNode

rm

rm

nm

nm

nm

zk

zk

zk

hive

hive

hive

hive

hive

kafka

kafka

kafka

spark

spark

spark

spark

spark

datax

datax

datax

datax

datax

Ds-master

Ds-master

Ds-worker

Ds-worker

Ds-worker

maxwell

superset

mysql

flume

flume

flink

flink

clickhouse

redis

hbase 

2)测试集群服务器规划

服务名称

服务

服务器

hadoop102

服务器

hadoop103

服务器

hadoop104

HDFS

NameNode

DataNode

SecondaryNameNode

Yarn

NodeManager

Resourcemanager

Zookeeper

Zookeeper Server

Flume(采集日志

Flume

Kafka

Kafka

Flume

消费Kafka日志

Flume

Flume

消费Kafka业务

Flume

Hive

MySQL

MySQL

DataX

Spark

DolphinScheduler

ApiApplicationServer

AlertServer

MasterServer

WorkerServer

LoggerServer

Superset

Superset

Flink

ClickHouse

Redis

Hbase

服务数总计

20

11

12

用户行为日志

包括用户的各项行为信息以及行为所处的环境信息,收集手段通常为埋点。

主流埋点方式:代码埋点、可视化埋点、全埋点

用户行为日志内容:

        本项目收集和分析的用户行为信息主要有页面浏览记录、动作记录、曝光记录、启动记录和错误记录

页面浏览记录:记录的是访客对页面的浏览行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及页面信息等。

动作记录:记录的是用户的业务操作行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息 及动作目标对象信息等。

曝光记录:记录的是曝光行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息及曝光对象信息等。

启动记录:记录的是用户启动应用的行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、启动类型及开屏广告信息等。

错误记录:记录的是用户在使用应用过程中的报错行为,该行为的环境信息主要有用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息、以及可能与报错相关的页面信息、动作信息、曝光信息和动作信息。

日志格式:大致分两类:页面日志和启动日志

页面日志:以页面浏览为单位,即一个页面浏览记录,生成一条页面埋点日志。一条完整的页面日志包含,一个页面浏览记录,若干个用户在该页面所做的动作记录,若干个该页面的曝光记录,以及一个在该页面发生的报错记录。除上述行为信息,页面日志还包含了这些行为所处的各种环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{"common": {                     -- 环境信息"ar": "15",                 -- 省份ID"ba": "iPhone",             -- 手机品牌"ch": "Appstore",           -- 渠道"is_new": "1",              -- 是否首日使用,首次使用的当日,该字段值为1,过了24:00,该字段置为0。"md": "iPhone 8",           -- 手机型号"mid": "YXfhjAYH6As2z9Iq",  -- 设备id"os": "iOS 13.2.9",         -- 操作系统"sid": "3981c171-558a-437c-be10-da6d2553c517"     -- 会话id"uid": "485",               -- 会员id"vc": "v2.1.134"            -- app版本号},"actions": [{                   -- 动作(事件)"action_id": "favor_add",   -- 动作id"item": "3",                -- 目标id"item_type": "sku_id",      -- 目标类型"ts": 1585744376605         -- 动作时间戳}],"displays": [{                  -- 曝光"displayType": "query", -- 曝光类型"item": "3",            -- 曝光对象id"item_type": "sku_id",  -- 曝光对象类型"order": 1,             -- 出现顺序"pos_id": 2             -- 曝光位置"pos_seq": 1             -- 曝光序列号(同一坑位多个对象的编号)},{"displayType": "promotion","item": "6","item_type": "sku_id","order": 2,"pos_id": 1"pos_seq": 1},{"displayType": "promotion","item": "9","item_type": "sku_id","order": 3,"pos_id": 3"pos_seq": 1},{"displayType": "recommend","item": "6","item_type": "sku_id","order": 4,"pos_id": 2"pos_seq": 1},{"displayType": "query ","item": "6","item_type": "sku_id","order": 5,"pos_id": 1"pos_seq": 1}],"page": {                          -- 页面信息"during_time": 7648,           -- 持续时间毫秒"item": "3", 	               -- 目标id"item_type": "sku_id",         -- 目标类型"last_page_id": "login",       -- 上页ID"page_id": "good_detail",      -- 页面ID"from_pos_id":999,           -- 来源坑位ID
"from_pos_seq":999,           -- 来源坑位序列号
"refer_id":"2",			  -- 外部营销渠道ID"sourceType": "promotion"      -- 来源类型},                                 "err": {                           --错误"error_code": "1234",          --错误码"msg": "***********"           --错误信息},                                 "ts": 1585744374423                --跳入时间戳
}

启动日志:以启动为单位,及一次启动行为,生成一条启动日志。一条完整的启动日志包括一个启动记录,一个本次启动时的报错记录,以及启动时所处的环境信息,包括用户信息、时间信息、地理位置信息、设备信息、应用信息、渠道信息等。

{"common": {"ar": "370000","ba": "Honor","ch": "wandoujia","is_new": "1","md": "Honor 20s","mid": "eQF5boERMJFOujcp",
"os": "Android 11.0",
"sid":"a1068e7a-e25b-45dc-9b9a-5a55ae83fc81""uid": "76","vc": "v2.1.134"},"start": {   
"entry": "icon",         --icon手机图标  notice 通知   install 安装后启动"loading_time": 18803,  --启动加载时间"open_ad_id": 7,        --广告页ID"open_ad_ms": 3449,    -- 广告总共播放时间"open_ad_skip_ms": 1989   --  用户跳过广告时点},
"err":{                     --错误
"error_code": "1234",      --错误码"msg": "***********"       --错误信息
},"ts": 1585744304000
}

服务器和JDK准备

配置hadoop102、hadoop103、hadoop104三台主机(问题及Hadoop相关另行总结)

编写集群分发脚本xsync

1)xsync集群分发脚本

        需求:循环复制文件到所有节点的相同目录下

        需求分析:

        ①rsync命令原始拷贝

rsync  -av     /opt/module  		 root@hadoop103:/opt/

        ②期望脚本:xsync要同步的文件名称

        ③说明:在/home/atguigu/bin这个目录下存放的脚本,atguigu用户可以在系统任何地方直接执行。

[atguigu@hadoop102 ~]$ echo $PATH
/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/home/atguigu/.local/bin:/home/atguigu/bin

脚本实现:

①在用的家目录/home/atguigu下创建bin文件夹

[atguigu@hadoop102 ~]$ mkdir bin

②在/home/atguigu/bin目录下创建xsync文件,以便全局调用

[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 ~]$ vim xsync

        在该文件中编写如下代码

#!/bin/bash#1. 判断参数个数
if [ $# -lt 1 ]
thenecho Not Enough Arguement!exit;
fi#2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104
doecho ====================  $host  ====================#3. 遍历所有目录,挨个发送for file in $@do#4 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
done

③修改脚本xsync具有执行权限

[atguigu@hadoop102 bin]$ chmod 777 xsync

④测试脚本

atguigu@hadoop102 bin]$ xsync xsync

SSH无密登录配置

说明:这里面只配置了hadoop102、hadoop103到其他主机的无密登录;因为hadoop102配置的是NameNode,hadoop103配置的是ResourceManager,都要求对其他节点无密访问。

1)hadoop102上生成公钥和私钥:

[atguigu@hadoop102 .ssh]$ ssh-keygen -t rsa

然后敲(三个回车),就会生成两个文件id_rsa(私钥)、id_rsa.pub(公钥)。

2)将hadoop102公钥拷贝到要免密登录的目标机器上

[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop102
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop103
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop104

3)hadoop103上生成公钥和私钥:

[atguigu@hadoop103 .ssh]$ ssh-keygen -t rsa

4)拷贝操作亦同hadoop102

JDK准备

1)卸载三台节点上的现有JDK

[atguigu@hadoop102 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps[atguigu@hadoop103 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps[atguigu@hadoop104 opt]# sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

操作批注:

(1)rpm -qa:表示查询所有已经安装的软件包

(2)grep -i:表示过滤时不区分大小写

(3)xargs -n1:表示一次获取上次执行结果的一个值

(4)rpm -e --nodeps:表示卸载软件

2)用XShell工具将JDK导入到hadoop102的/opt/software文件夹下面

3)在Linux系统下的opt目录查看是否导入成功(ls)

4)解压JDK到/opt/module目录下(tar)

[atguigu@hadoop102 software]# tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/[atguigu@hadoop102 module]$ mv jdk1.8.0_212/ jdk-1.8.0

5)配置JDK环境变量

(1)新建/etc/profile.d/my_env.sh文件(在module下sudo vim)

        添加如下内容,然后保存(:wq)退出。

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk-1.8.0
export PATH=$PATH:$JAVA_HOME/bin

(2)让环境变量生效

[atguigu@hadoop102 software]$ source /etc/profile.d/my_env.sh

6)测试安装是否成功(java -version)

7)分发JDK(执行刚才的xsync脚本)

[atguigu@hadoop102 module]$ xsync /opt/module/jdk-1.8.0

8)分发环境变量配置文件

[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh

9)在hadoop103,hadoop104上分别执行source

环境变量配置说明

        Linux的环境变量可在多个文件中配置,如/etc/profile,/etc/profile.d/*.sh,~/.bashrc,~/.bash_profile等,下面说明上述几个文件之间的关系和区别。

        bash的运行模式可以分为 login shell 和 non-login shell

(例如,我们通过终端,输入用户名、密码,登录系统之后,得到就是一个login shell。而当我们执行以下命令ssh hadoop103 command,在hadoop103执行command的就是一个non-login shell。)

这两种shell的主要区别在于,它们启动时会加载不同的配置文件,login shell启动时会加载/etc/profile,~/.bash_profile~/.bashrcnon-login shell启动时会加载~/.bashrc

数据模拟

1)将application.ymlgmall-remake-mock-2023-02-17.jarpath.jsonlogback.xml上传到hadoop102的/opt/module/applog目录下(需要 mkdir创建)

2)配置文件

①application.yml文件:可以根据需求生成对应日期的用户行为日志

vim出文件后修改内容(照搬尚硅谷,太繁琐。。)

# 外部配置打开
logging.config: ./logback.xml#http模式下,发送的地址
mock:log:type: "file"      #"file" "http" "kafka" "none"http:url: "http://localhost:8090/applog"kafka:server: "hadoop102:9092,hadoop102:9092,hadoop102:9092"topic: "topic_log"spring:datasource:type: com.alibaba.druid.pool.DruidDataSourcedruid:url: jdbc:mysql://hadoop102:3306/gmall?characterEncoding=utf-8&allowPublicKeyRetrieval=true&useSSL=false&serverTimezone=GMT%2B8username: rootpassword: "000000"driver-class-name:  com.mysql.cj.jdbc.Drivermax-active: 20test-on-borrow: truemybatis-plus.global-config.db-config.field-strategy: not_null
mybatis-plus:mapper-locations: classpath:mapper/*.xmlmybatis:mapper-locations: classpath:mapper/*.xml#业务日期, 并非Linux系统时间的日期,而是生成模拟数据的日期
mock.date: "2022-06-08"# 日志是否写入数据库一份  写入z_log表中
mock.log.db.enable: 1# 清空
mock.clear.busi: 1# 清空用户
mock.clear.user: 0# 批量生成新用户
mock.new.user: 0#session次数
mock.user-session.count: 200#设备最大值
mock.max.mid: 1000000# 是否针对实时生成数据,若启用(置为1)则数据的 yyyy-MM-dd 与 mock.date 一致而 HH:mm:ss 与系统时间一致;若禁用则数据的 yyyy-MM-dd 与 mock.date 一致而 HH:mm:ss 随机分布,此处禁用
mock.if-realtime: 0
#访问时间分布权重
mock.start-time-weight: "10:5:0:0:0:0:5:5:5:10:10:15:20:10:10:10:10:10:20:25:30:35:30:20"#支付类型占比 支付宝 :微信 :银联
mock.payment_type_weight: "40:50:10"#页面平均访问时间
mock.page.during-time-ms: 20000#错误概率 百分比
mock.error.rate: 3#每条日志发送延迟 ms
mock.log.sleep: 100#课程详情来源  用户查询,商品推广,智能推荐, 促销活动
mock.detail.source-type-rate: "40:25:15:20"mock.if-cart-rate: 100mock.if-favor-rate: 70mock.if-order-rate: 100mock.if-refund-rate: 50#搜索关键词
mock.search.keyword: "java,python,多线程,前端,数据库,大数据,hadoop,flink"#用户数据变化概率
mock.user.update-rate: 20# 男女浏览品牌比重(11 品牌)
mock.tm-weight.male: "3:2:5:5:5:1:1:1:1:1:1"
mock.tm-weight.female: "1:5:1:1:2:2:2:5:5:5:5"# 外连类型比重(5 种)
mock.refer-weight: "10:2:3:4:5"# 线程池相关配置
mock.pool.core: 20
mock.pool.max-core: 100

②path.json, 用来配置访问路径,根据需求可以灵活配置用户点击路径

[{"path":["start_app","home", "search", "good_list","good_detail","good_detail" ,"good_detail","cart","order","payment","mine","order_list","end"],"rate":100 },{"path":["start_app","home", "good_list","good_detail","good_detail" ,"good_detail","cart","end"],"rate":30 },{"path":["start_app","home", "activity1111","good_detail"  ,"cart","good_detail","cart","order","payment","end"],"rate":30 },{"path":[ "activity1111","good_detail" ,"activity1111" ,"good_detail","order","payment","end"],"rate":200 },{"path":[ "start_app","home" ,"activity1111" ,"good_detail","order","payment","end"],"rate":200 },{"path":[ "start_app","home" , "good_detail","order","payment","end"],"rate":30 },{"path":[  "good_detail","order","payment","end"],"rate":650 },{"path":[  "good_detail"  ],"rate":30 },{"path":[  "start_app","home","mine","good_detail"  ],"rate":30 },{"path":[  "start_app","home", "good_detail","good_detail","good_detail","cart","order","payment","end"  ],"rate":200 },{"path":[  "start_app","home", "search","good_list","good_detail","cart","order","payment","end"  ],"rate":200 }
]

③logback配置文件,可配置日志生成路径

<?xml version="1.0" encoding="UTF-8"?>
<configuration><property name="LOG_HOME" value="/opt/module/applog/log" /><appender name="console" class="ch.qos.logback.core.ConsoleAppender"><target>System.out</target><encoder><pattern>%msg%n</pattern></encoder></appender><appender name="console_em" class="ch.qos.logback.core.ConsoleAppender"><target>System.err</target><encoder><pattern>%msg%n</pattern></encoder></appender><appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_HOME}/app.log</file><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern></rollingPolicy><encoder><pattern>%msg%n</pattern></encoder></appender><!-- 将某一个包下日志单独打印日志 --><logger name="com.atguigu.mock.util.LogUtil"level="INFO" additivity="false"><appender-ref ref="rollingFile" />
<!--           <appender-ref ref="console" />--></logger><logger name="com.atguigu.gmallre.mock.task.UserMockTask" level="INFO" additivity="false" ><appender-ref ref="console_em" /></logger><!--    <logger name="com.alibaba.druid.pool" level="error" additivity="false" >-->
<!--        <appender-ref ref="console" />-->
<!--    </logger>--><!--    <logger  name="com.atguigu.edu2021.mock.mapper" level="debug">-->
<!--         <appender-ref ref="console" />-->
<!--    </logger>--><!--      <logger  name="com.atguigu.edu2021.mock.service.impl.UserInfoServiceImpl" level="debug"><appender-ref ref="console" /></logger>--><root level="error"  ><appender-ref ref="console_em" /><!-- <appender-ref ref="async-rollingFile" />  --></root>
</configuration>

3)生成日志

        进入到/opt/module/applog路径,执行以下命令

[atguigu@hadoop102 applog]$ java -jar gmall-remake-mock-2023-02-17.jar test 100 2022-06-08

其中:

 增加test参数为测试模式,只生成用户行为数据不生成业务数据。

 100 为产生的用户session数一个session默认产生1条启动日志和5条页面方法日志。

 第三个参数为日志数据的日期,测试模式下不会加载配置文件,要指定数据日期只能通过命令行传参实现。

 三个参数的顺序必须与示例保持一致

 第二个参数和第三个参数可以省略,如果test后面不填写参数,默认为1000

在/opt/module/applog/log目录下查看生成日志

[atguigu@hadoop102 log]$ ll

集群日志生成脚本

(1)在/home/atguigu/bin目录下vim脚本lg.sh

(2)在脚本中编写如下内容

#!/bin/bash
echo "========== hadoop102 =========="
ssh hadoop102 "cd /opt/module/applog/; nohup java -jar gmall-remake-mock-2023-02-17.jar $1 $2 $3 >/dev/null 2>&1 &"
done

注:

/opt/module/applog/为jar包及配置文件所在路径

②/dev/null代表Linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

标准输入0:从键盘获得输入 /proc/self/fd/0

标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

(3)修改脚本执行权限(chmod 777)

(4)将jar包及配置文件上传至hadoop103/opt/module/applog/路径

(5)启动脚本

atguigu@hadoop102 module]$ lg.sh test 100

(6)分别在hadoop102、hadoop103的/opt/module/applog/log目录上查看生成的数据

[atguigu@hadoop102 log]$ ls
app.log

用户行为数据采集模块

用户行为日志数据通道:

环境准备

集群命令批量执行脚本xcall,vim后修改然后chmod777, 启动是$ xcall jps

#! /bin/bashfor i in hadoop102 hadoop103 hadoop104
doecho --------- $i ----------ssh $i "$*"
done

注:

jps 命令是 Java Virtual Machine Process Status Tool 的缩写,用于显示 Java 虚拟机(JVM)中正在运行的 Java 进程的信息。它通常用于识别在当前系统中正在运行的 Java 进程,以及它们的进程 ID(PID)和主类名。

jps 命令可以在命令行中直接运行,不需要任何参数。它会列出当前系统中正在运行的所有 Java 进程的信息,包括它们的 PID 和主类名。具体来说,jps 命令的输出包括以下信息:

  1. 进程 ID(PID):Java 进程的唯一标识符。
  2. 主类名(Main Class Name):启动 Java 进程时指定的主类名,即包含 main() 方法的类。

jps 命令可以用于以下情况:

  • 识别在系统中运行的 Java 进程,以便进行监控、调试或管理。
  • 确定正在运行的 Java 进程的主类名,以便进一步分析或调试。

Hadoop安装

Zookeeper安装

Kafka安装

Flume安装

日志采集Flume

        Apache Flume 是一个分布式、可靠且高可用的系统,用于收集、聚合和传输大量日志数据或事件数据到中心化数据存储中,如 Apache Hadoop 的 HDFS、Apache HBase、以及 Apache Kafka 等。它的主要作用是简化大规模数据的采集和传输过程,实现数据流的可靠传输,并提供了一套灵活的配置和可扩展的架构,使其能够适应不同类型和规模的数据采集需求。

        按照规划,需要采集的用户行为日志存放在hadoop102,故需要在该节点配置日志采集Flume。日志采集Flume需要采集日志文件内容,并对日志格式(json)进行校验,然后将校验通过的日志发送到Kafka。↓

(注:

Flume 如何将采集的日志传输给 Kafka 的基本流程:

  1. 定义数据流配置: 首先,需要定义 Flume 的数据流配置,包括数据源、数据通道和数据目的地。数据源可以是各种数据来源,如日志文件、网络端口、消息队列等。数据通道用于在 Flume 内部缓存和传输数据。数据目的地则指定数据传输的最终目的地,如 Kafka、HDFS、HBase 等。

  2. 启动 Flume Agent: 根据定义的数据流配置,启动 Flume Agent。Flume Agent 是 Flume 的运行实例,负责接收、处理和传输数据。当 Agent 启动后,它会按照配置从数据源获取数据,并将数据传输到指定的目的地。

  3. (在Flume中使用KafkaChannel可省去这一步)配置 Kafka Sink: 在数据流配置中,需要配置一个 Kafka Sink,用于将数据传输到 Kafka。Kafka Sink 是 Flume 提供的一个插件,用于与 Kafka 交互并将数据写入 Kafka 的 Topic 中。在配置 Kafka Sink 时,需要指定 Kafka 的连接信息、Topic 名称等参数。

  4. 数据传输到 Kafka: 当 Flume Agent 运行时,它会从数据源获取数据,并通过配置的数据通道将数据传输到 Kafka Sink。Kafka Sink 将接收到的数据转换为 Kafka 的消息格式,并将消息写入指定的 Kafka Topic 中。

  5. 数据消费: 一旦数据写入 Kafka Topic,消费者可以使用 Kafka 提供的 API 或工具来消费这些数据。消费者可以订阅相应的 Topic,并实时获取到 Flume 传输的日志数据,进行后续的处理和分析。

通过以上流程,Flume 可以将采集的日志数据可靠地传输到 Kafka 中,实现数据的收集、传输和分发,为后续的数据处理和分析提供了可靠的数据来源。

此处可以选择Flume中TaildirSource和KafkaChannel组件,并配置日志校验拦截器。选择原因:

TaildirSource:

TaildirSource 是 Flume 提供的一个源(Source)组件,用于从文件系统中实时读取日志文件的内容,并将其发送到 Flume 的通道(Channel)中。它可以监视指定目录下的日志文件,不断地读取新增的日志内容,并将其转发给 Flume 的后续组件进行处理。TaildirSource 通常用于实时采集应用程序生成的日志数据。

TailDirSource相比ExecSource、SpoolingDirectorySource的优势。

TailDirSource:断点续传、多目录Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

SpoolingDirectorySource监控目录,支持断点续传。

KafkaChannel

KafkaChannel 是 Flume 提供的一个通道(Channel)组件,用于与 Apache Kafka 集成,实现将数据从 Flume 传输到 Kafka 中。KafkaChannel 使用 Kafka 作为底层的数据存储和传输介质,将 Flume 接收到的数据写入 Kafka 的 Topic 中,以便后续的数据消费和处理。KafkaChannel 可以保证数据的可靠性和高吞吐量,适用于大规模的数据传输场景。

采用Kafka Channel,省去了Sink,提高了效率。

日志采集Flume关键配置如下:

日志采集Flume配置实操

1)创建Flume配置文件

        在hadoop102节点的Flume的job目录下创建file_to_kafka.conf。(mkdir,vim)

2)配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false#组装 
a1.sources.r1.channels = c1

日志采集Flume测试

1)启动Zookeeper、Kafka集群

2)启动hadoop102的日志采集Flume

3)启动一个Kafka的Console-Consumer

Kafka 的 console-consumer 是 Kafka 提供的一个命令行工具,用于从 Kafka Topic 中消费消息并在控制台上显示。它是 Kafka 提供的一个实用工具,用于在开发和调试过程中查看和验证 Kafka 中的消息数据。

以下是 console-consumer 命令的基本用法和一些常用选项:

kafka-console-consumer.sh --bootstrap-server <broker-list> --topic <topic-name> [options] 

主要参数说明:

  • --bootstrap-server <broker-list>: 指定 Kafka 集群的地址列表,用于连接到 Kafka 集群。例如:localhost:9092

  • --topic <topic-name>: 指定要消费消息的 Kafka Topic 名称。

一些常用选项包括:

  • --from-beginning: 从最早的消息开始消费,即使在之前已经有消费者消费过该 Topic 中的消息,也会重新消费所有消息。

  • --max-messages <num-messages>: 指定最大消费消息的数量。

  • --property <key=value>: 设置 Kafka Consumer 的配置属性,如 auto.offset.reset, group.id 等。

  • 此处操作代码如下:

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log

4)生成数据

5)观察Kafka消费者是否能消费到数据

日志采集Flume启停脚本

在hadoop102节点的/home/atguigu/bin目录下创建脚本f1.sh

#!/bin/bashcase $1 in
"start"){echo " --------启动 hadoop102 采集flume-------"ssh hadoop102 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
};; 
"stop"){echo " --------停止 hadoop102 采集flume-------"ssh hadoop102 "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
};;
esac

之后chmod777,启动 f1.sh start 停止 f1.sh stop

这篇关于离线电商数仓知识笔记沉淀-流程及用户行为采集平台的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/810872

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

流媒体平台/视频监控/安防视频汇聚EasyCVR播放暂停后视频画面黑屏是什么原因?

视频智能分析/视频监控/安防监控综合管理系统EasyCVR视频汇聚融合平台,是TSINGSEE青犀视频垂直深耕音视频流媒体技术、AI智能技术领域的杰出成果。该平台以其强大的视频处理、汇聚与融合能力,在构建全栈视频监控系统中展现出了独特的优势。视频监控管理系统EasyCVR平台内置了强大的视频解码、转码、压缩等技术,能够处理多种视频流格式,并以多种格式(RTMP、RTSP、HTTP-FLV、WebS

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

sqlite3 相关知识

WAL 模式 VS 回滚模式 特性WAL 模式回滚模式(Rollback Journal)定义使用写前日志来记录变更。使用回滚日志来记录事务的所有修改。特点更高的并发性和性能;支持多读者和单写者。支持安全的事务回滚,但并发性较低。性能写入性能更好,尤其是读多写少的场景。写操作会造成较大的性能开销,尤其是在事务开始时。写入流程数据首先写入 WAL 文件,然后才从 WAL 刷新到主数据库。数据在开始

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何解决线上平台抽佣高 线下门店客流少的痛点!

目前,许多传统零售店铺正遭遇客源下降的难题。尽管广告推广能带来一定的客流,但其费用昂贵。鉴于此,众多零售商纷纷选择加入像美团、饿了么和抖音这样的大型在线平台,但这些平台的高佣金率导致了利润的大幅缩水。在这样的市场环境下,商家之间的合作网络逐渐成为一种有效的解决方案,通过资源和客户基础的共享,实现共同的利益增长。 以最近在上海兴起的一个跨行业合作平台为例,该平台融合了环保消费积分系统,在短

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学