Flink 维表关联

2023-10-25 00:52
文章标签 flink 关联 维表

本文主要是介绍Flink 维表关联,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、实时查询维表

实时查询维表是指用户在 Flink 算子中直接访问外部数据库,比如用 MySQL 来进行关联,这种方式是同步方式,数据保证是最新的。但是,当我们的流计算数据过大,会对外 部系统带来巨大的访问压力,一旦出现比如连接失败、线程池满等情况,由于我们是同步调用,所以一般会导致线程阻塞、Task 等待数据返回,影响整体任务的吞吐量。而且这种方案对外部系统的 QPS 要求较高,在大数据实时计算场景下,QPS 远远高于普通的后台系统,峰值高达十万到几十万,整体作业瓶颈转移到外部系统


public class DimSync  extends RichMapFunction<fplOverview, String> {private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);private Connection conn = null;public void open(Configuration parameters) throws Exception {super.open(parameters);conn = DriverManager.getConnection("jdbc:test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");//根据 dp_id 查询  上周的 fpl_amount,ywPreparedStatement pst = conn.prepareStatement("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = ? \n" +"group by dp_id");pst.setString(1,dp_id);ResultSet resultSet = pst.executeQuery();String fpl_amount = null;String yw = null ;while (resultSet.next()){fpl_amount = resultSet.getString(1);yw = resultSet.getString(2);}pst.close();jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw)return jsonObject.toString();}public void close() throws Exception {super.close();conn.close();}

2、LRU 缓存 (flink 异步Id)

利用 Flink 的 RichAsyncFunction 读取 mysql 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 mysql,然后插入到缓存中。


public class JDBCAsyncFunction  extends RichAsyncFunction<fplOverview, JsonObject> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncoding=UTF-8;useSSL=false").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "root").put("password", "");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(fplOverview fplOverview, ResultFuture<JsonObject> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();// 执行sqlconnection.query("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = '" + fplOverview.getDp_id() + " ' " +"group by dp_id ", res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}else{System.out.println("查询数据库出错");}List<JsonObject> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {stores.add(json);}connection.close();resultFuture.complete(stores);});});}}

3、预加载全量mysql数据

预加载全量mysql数据 使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据,这种方式适用于那些实时场景不是很高,维表数据较小的场景

public class WholeLoad extends RichMapFunction<fplOverview,String> {private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);// 定义map的结果,key为关联字段private static Map<String,String> cache ;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);cache = new HashMap<>();ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);executor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {load();} catch (Exception e) {e.printStackTrace();}}},0,5, TimeUnit.MINUTES); //从现在开始每隔5分钟查询数据}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");// 获取对应id的结果String rs = cache.get(dp_id);JSONObject rsObject = JSONObject.parseObject(rs);jsonObject.putAll(rsObject);return jsonObject.toString();}public   void  load() throws Exception {Class.forName("com.mysql.jdbc.Driver");Connection con = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");// 执行查询的SQLPreparedStatement statement = con.prepareStatement("select  dp_id,max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"group by dp_id");ResultSet rs = statement.executeQuery();while (rs.next()) {// 查询结果放入缓存String dp_id = rs.getString("dp_id");String fpl_amount = rs.getString("fpl_amount");String yw = rs.getString("yw");JSONObject jsonObject = JSONObject.parseObject("{}");jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw);cache.put(dp_id,jsonObject.toString());}System.out.println("数据输出测试:"+cache.toString());con.close();}
}

这篇关于Flink 维表关联的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/278920

相关文章

MYSQL关联关系查询方式

《MYSQL关联关系查询方式》文章详细介绍了MySQL中如何使用内连接和左外连接进行表的关联查询,并展示了如何选择列和使用别名,文章还提供了一些关于查询优化的建议,并鼓励读者参考和支持脚本之家... 目录mysql关联关系查询关联关系查询这个查询做了以下几件事MySQL自关联查询总结MYSQL关联关系查询

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

C++ STL关联容器Set与集合论入门

1. 简介 Set(集合)属于关联式容器,也是STL中最实用的容器,关联式容器依据特定的排序准则,自动为其元素排序。Set集合的底层使用一颗红黑树,其属于一种非线性的数据结构,每一次插入数据都会自动进行排序,注意,不是需要排序时再排序,而是每一次插入数据的时候其都会自动进行排序。因此,Set中的元素总是顺序的。 Set的性质有:数据自动进行排序且数据唯一,是一种集合元素,允许进行数学上的集合相

关联规则(一)Apriori算法

此篇文章转自 http://blog.sina.com.cn/s/blog_6a17628d0100v83b.html 个人觉得比课本上讲的更通俗易懂! 1.  挖掘关联规则 1.1   什么是关联规则 一言蔽之,关联规则是形如X→Y的蕴涵式,表示通过X可以推导“得到”Y,其中X和Y分别称为关联规则的先导(antecedent或left-hand-side, LHS)和后

【数据库实战】1_Oracle_命中关联人或黑名单或反洗钱客户

一、字段名称 1、CST_ID :客户编号 2、IDV_LGL_NM :客户姓名 3、关联方标志 RELPARTY_IND,0-否 未命中,1-是 命中 4、TBPC1010表,RSRV_FLD1_INF(备用字段)中的 第6位:黑名单标志,0无,1是。 第10位:反洗钱风险等级1-5。 反洗钱风险等级5级: 1级-低风险客户 2级-较低风险客户 3级-中风险客户 4级-较高风险客户 5级-高风

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

是谁还不会flink的checkpoint呀~

1、State Vs Checkpoint State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。 Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息 一句话概括: Checkpoint就是State的快照 目的:假设作业停止了,下次启动的

ManyToMany双向外键关联(基于注解)的映射案例(简单版)

学生和老师就是多对多的关系,一个学生有多个学生,一个老师也有多个学生,这里的多对多映射采用中间表连接的映射策略,建立中间表的映射策略,建立中间表分别引入俩边的主键作为外键。通过中间表映射俩个表之间的关系。 下面就以学生类和老师类为例介绍多对多的映射关系的实例 Students类 package mtm_bfk;import java.io.Serializable;import java.

ISO26262和Aspice之间的关联

ASPICE 介绍: ASPICE(Automotive Software Process Improvement and Capability dEtermination)是汽车软件过程改进及能力评定的模型,它侧重于汽车软件的开发过程。ASPICE 定义了一系列的过程和活动,包括需求管理、软件设计、软件实现、软件测试、软件集成、软件配置管理、软件质量保证等方面。其目的是通过评估和改进汽车软件的

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read