Flink实时电商数仓(二)

2023-12-20 07:28
文章标签 实时 flink 商数

本文主要是介绍Flink实时电商数仓(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

GitLab的用户创建和推送

  1. 在root用户-密码界面重新设置密码
  2. 添加Leader用户和自己使用的用户
  3. 使用root用户创建相应的群组
  4. 使用Leader用户创建对应的项目
  5. 设置分支配置为“初始推送后完全保护”
  6. 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
  7. 安装gitlab project 2020插件
  8. 点击share project on gitlab 即可将项目上传到gitlab中

Flink集群的搭建

  • 只需要运行Yarn模式
  • 配置Hadoop的环境变量
    在这里插入图片描述
  • 将Flink1.17解压安装到对应为止即可

Hbase的配置

  1. 依赖zookeeper和hadoop这两个框架
  2. 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
  3. 解压Hbase2.4.11的安装包
  4. 添加Hbase的环境变量
    在这里插入图片描述
  5. 修改配置文件
    • hbase-env.xml
      • export hbase_manages_zk=false 不使用自带的zookeeper
    • hbase-site.xml
      • hbase.cluster.distributed = true 使用集群模式
      • hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
      • hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
      • hbase.wal.provider = filesystem 预写日志
    • regionservers: 添加hbase小弟的主机名称

Redise的配置

  1. 进入redise目录,执行make指令进行编译
  2. make instanll安装
  3. 将myredis.conf文件复制到~/目录下
  4. 将bind 127.0.0.1 注释掉,并且关闭保护模式
  5. 设置daemon 后台启动模式为yes
  6. redis-server ./my_redis.conf后台启动

实时数仓ODS层

  • 保证数据模拟器产生的数据是有序的
    • 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
    • Kafka数据有序:Flink并发度和Kafka的分区数一致
      • 设置三个kafka节点的分区个数都为4,num.partitions=4
      • Flink的并发度=4
  • 历史维度数据
    • 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
    • 编写mysql_to_kafka_init.sh脚本
    • maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可

实时数仓dim层

  • dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
  • dim层的数据存储在Hbase中
  • 开发时需要切换到dev开发分支
  • 为Flink的开发创建一个基类,名为BaseApp
    • 抽象方法handle(): 每个主程序的业务逻辑
    • 具体方法start():里面实现Flink代码的通用逻辑
  • 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group

Flink-cdc获取维度信息

  1. 数据清洗
  2. 动态拆分维度表功能
    • 方式1:直接将维度表做成List< String > (维度表名称)保存
      • 如果将代码写死,后续想要修改,需要重新编译修改
    • 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
    • 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
    • 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
    • 方式5:cdc,变更数据抓取,类似与maxwell。
  3. 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {public static void main(String[] args) {//创建env//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(4);System.setProperty("HADOOP_USER_NAME", "atguigu");//设置检查点和状态后端// 1.4 状态后端及检查点相关配置// 1.4.1 设置状态后端env.setStateBackend(new HashMapStateBackend()); 1.4.2 开启 checkpoint//env.enableCheckpointing(5000); 1.4.3 设置 checkpoint 模式: 精准一次//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 1.4.4 checkpoint 存储//env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" + "test01"); 1.4.5 checkpoint 并发数//env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 1.4.6 checkpoint 之间的最小间隔//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); 1.4.7 checkpoint  的超时时间//env.getCheckpointConfig().setCheckpointTimeout(10000); 1.4.8 job 取消时 checkpoint 保留策略//env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);//读取数据//mysql sourceMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList("gmall2023_config").tableList("gmall2023_config.table_process_dim").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> ds = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"kafkasource").setParallelism(1);ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink实时电商数仓(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/515201

相关文章

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris

FlinkCDC 同步Mysql到Doris 参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/get-started/quickstart/mysql-to-doris/ 1.安装Flink 下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用tar -zxvf flink-xxx-

【IPV6从入门到起飞】4-RTMP推流,ffmpeg拉流,纯HTML网页HLS实时直播

【IPV6从入门到起飞】4-RTMP推流,ffmpeg拉流,纯HTML网页HLS实时直播 1 背景2 搭建rtmp服务器2.1 nginx方案搭建2.1.1 windows 配置2.1.2 linux 配置 2.2 Docker方案搭建2.2.1 docker 下载2.2.2 宝塔软件商店下载 3 rtmp推流3.1 EV录屏推流3.2 OBS Studio推流 4 ffmpeg拉流转格式

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

Ubuntu 标题栏实时显示网速CPU内存

1.用 wget 下载 indicator-sysmonitor,终端执行命令: $ wget -c https://launchpad.net/indicator-sysmonitor/trunk/4.0/+download/indicator-sysmonitor_0.4.3_all.deb2.安装依赖: sudo apt-get install python python-psu

第一款实时网络游戏的开发历程全解

“我的兴趣是创建世界,而不是生活在别人创建的世界里。我希望游戏世界能让人们能跳出现实世界的局限,去尝试新的身份……不是要脱胎换骨,而是让他们找到自己真正的归属”。所以他创造了第一个网络世界。      特鲁布肖所开发的MUD1(为区别这款游戏与MUD这一游戏类型,后文游戏名统一为MUD1)依然是一个纯文字的世界,没有任何图片,但是不同计算机前的玩家可以在游戏里共同冒险、交流。   与以往具有

CVPR 2024最新论文分享┆YOLO-World:一种实时开放词汇目标检测方法

论文分享简介 本推文主要介绍了CVPR 2024上的一篇论文《YOLO-World: Real-Time Open-Vocabulary Object Detection》,论文的第一作者为Tianheng Cheng和Lin Song,该论文提出了一种开放词汇目标检测的新方法,名为YOLO-World。论文通过引入视觉-语言建模和大规模预训练解决了传统YOLO检测器在固定词汇检测中的局限性。论

是谁还不会flink的checkpoint呀~

1、State Vs Checkpoint State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。 Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息 一句话概括: Checkpoint就是State的快照 目的:假设作业停止了,下次启动的

el-table 封装表格(完整代码-实时更新)

最新更新时间: 2024年9月6号 1. 添加行内编辑、表头搜索 <template><!-- 简单表格、多层表头、页码、没有合并列行 --><div class="maintenPublictable"element-loading-background="rgba(255,255,255,0.5)"><!--cell-style 改变某一列行的背景色 --><!-- tree-props