Flink流批一体计算(22):Flink SQL之单流kafka写入mysql

2023-12-02 16:30

本文主要是介绍Flink流批一体计算(22):Flink SQL之单流kafka写入mysql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 准备工作

什么是Kafka源表

Kafka是分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域。

docker部署zookeeper
docker pull wurstmeister/zookeeperdocker run -d --restart=always \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2  \
--name zookeeper \
-p 2181:2181 \
-v /etc/localtime:/etc/localtime wurstmeister/zookeeper
Docker部署kafka
docker pull wurstmeister/kafka:2.12-2.5.0docker run -d --restart=always \
--log-driver json-file \
--log-opt max-size=100m \
--log-opt max-file=2 \
--name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.0.6/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.6:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-v /etc/localtime:/etc/localtime wurstmeister/kafka:2.12-2.5.0

说明

-e KAFKA_BROKER_ID=0  在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=192.168.244.132:2181/kafka 配置zookeeper管理kafka的路径172.16.0.13:2181/kafka

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.244.132:9092  把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

生成数据

$ docker exec -it kafka bash

进入 /opt/kafka_2.12-2.5.0/bin/目录下

$ cd /opt/kafka_2.12-2.5.0/bin/

运行kafka生产者发送消息

$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test

> {"ts": "2020-10-09 12:23:34","oid": 7,"price_amt":20}

> {"ts": "2020-10-19 10:13:25","oid": 8,"price_amt":20}

> {"ts": "2020-12-19 14:33:35","oid": 9,"price_amt":20}

> {"ts": "2020-12-29 21:56:01","oid": 10,"price_amt":20}

> {"ts": "2020-12-29 21:56:01","oid": 11,"price_amt":30}

> {"ts": "2020-12-40 23:08:15","oid": 12,"price_amt":100}

2. 构建kafka数据源表

DROP TABLE IF EXISTS case_kafka_mysql;create table case_kafka_mysql (id BIGINT,ts VARCHAR,price_amt BIGINT,proctime AS PROCTIME ()
)with ('connector' = 'kafka','topic' = 'flink_test','properties.bootstrap.servers' = '127.0.0.1:9092','properties.group.id' = 'flink_gp_test1','scan.startup.mode' = 'latest-offset','format' = 'json');

在实际使用中请根据实际情况配置字段名和WITH参数。

相关With连接选项

Option

Required

Comment

Description

connector

Y

源表类型。

kafka

topic

Y

topic名称

topic

topic-pattern

N

匹配读取topic名称的正则表达式。

topictopic-pattern两个选项只能指定其中一个

properties.bootstrap.servers

Y

Kafka Broker地址

kafka连接地址

properties.group.id

Y

Kafka消费组ID

groupid

format

Y

Flink Kafka Connector在反序列化来自Kafka的消息体value时使用的格式。

csv, json, avro, debezium-json, canal-json

scan.startup.mode

N

Kafka读取数据的启动位点

取值如下:

earliest-offset:从Kafka最早分区开始读取。

latest-offset:从Kafka最新位点开始读取。

group-offsets(默认值):根据Group读取。如果指定的group为首次使用,则必须将properties.auto.offset.reset设置为 earliestlatest来指定首次启动的位置。

timestamp:从Kafka指定时间点读取。

需要在WITH参数中指定scan.startup.timestamp-millis参数。

specific-offsets:从Kafka指定分区指定偏移量读取。需要在WITH参数中指定scan.startup.specific-offsets参数。

scan.startup.specific-offsets

N

specific-offsets启动模式下,指定每个分区的启动偏移量。

设置消费模式为:specific-offsets时,需写出消费offset,例:'partition:0,offset:42;partition:1,offset:300'

scan.startup.timestamp-millis

N

timestamp启动模式下,指定启动位点时间戳。

设置消费模式为:'timestamp'时,指定时间, 单位为毫秒。

sink.partitioner

N

注意:

1. 如果您还需要直接配置Connector使用的Kafka Consumer,可以在Kafka Consumer配置参数前添加properties前缀,并将该Kafka Consumer配置信息追加至WITH参数。例如Kafka集群需要SASLSimple Authentication and Security Layer)认证。

CREATE TABLE kafkaTable (...
) WITH (...'properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN','properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";'
);

说明 Per-Job集群支持Kafka认证。

2.Kafka源表只在checkpoint成功后将当前消费位点提交至Kafka集群。如果您的checkpoint间隔设置较长,您在Kafka集群侧观察到的消费位点会有延迟。在进行checkpoint时,Kafka源表会将当前读取进度存储在状态中,并不依赖于提交到集群上的位点进行故障恢复,提交位点仅仅是为了在Kafka侧能够监控到读取进度,位点提交失败不会对数据正确性产生任何影响。

网络连接排查

如果Flink作业在启动时出现Timed out waiting for a node assignment错误,一般是FlinkKafka之间的网络连通问题导致的。Kafka客户端与服务端建立连接的过程如下:

客户端使用您指定的bootstrap.servers地址连接Kafka服务端,Kafka服务端根据配置向客户端返回集群中各台broker的元信息,包括各台broker的连接地址。

客户端使用第一步broker返回的连接地址连接各台broker进行读取或写入。如果Kafka服务端没有正确配置,客户端在第一步收到的连接地址有误,即使bootstrap.servers配置的地址可以连接上,也无法正常读取或写入数据。该问题经常在FlinkKafka之间存在代理、端口转发、专线等网络转发机制时发生。

3. Kafka连接器

Table APISQL编写的Flink程序中,可以在创建表的时候用WITH子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。

Table Source负责从外部系统中读取数据并转换成表,Table Sink则负责将结果表写入外部系统。

Flink 1.13以及以后版本的API调用中,已经不去区分Table SourceTable Sink,我们只要建立到外部系统的连接并创建表就可以,Flink 自动会从程序的处理逻辑中解析出它们的用途。

Flink Table APISQL支持了各种不同的连接器。当然,最简单的其实就是连接到控制台打印输出:

CREATE TABLE ResultTable (user STRING,cnt BIGINT
WITH ('connector' = 'print'
);

这里只需要在WITH中定义connectorprint就可以了。而对于其它的外部系统,则需要增加一些配置项。

Kafka SQL 连接器可以从 Kafka 的主题(topic)读取数据转换成表,也可以将表数据写入 Kafka 的主题。换句话说,创建表的时候指定连接器为 Kafka,则这个表既可以作为输入表,也可以作为输出表。

1. 引入依赖

需要下载对应的 jar 包放到 lib 目录下,注意一定是pat jar

Flink 为各种连接器提供了一系列的"表格式"table formats),比如 CSVJSON AvroParquet 等等。

这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具。

对于 Kafka 而言,CSVJSONAvro 等主要格式都是支持的, 根据 Kafka 连接器中配置的格式,我们可能需要引入对应的依赖支持。比如flink-csv等。

由于SQL客户端中已经内置了 CSVJSON 的支持,因此使用时无需专门引入;而对于 没有内置支持的格式(比如 Avro),则仍然要下载相应的 jar 包。

为此,需要配置flink集群环境

配置conf/flink.yaml各项

lib增加jar文件:flink-sql-connector-kafka_2.11-1.14.5.jarkafka-clients-2.5.0.jar

standalone方式启动集群

bin/stop-cluster.sh

bin/start-cluster.sh

2. 创建连接到 Kafka 的表

创建一个连接到 Kafka 表,需要在 CREATE TABLE DDL 中在 WITH 子句里指定连接 器为 Kafka,并定义必要的配置参数

CREATE TABLE KafkaTable (
`user` STRING,`url` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'events','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'csv'
)

这里定义了 Kafka 连接器对应的主题(topic),Kafka 服务器,消费者组 ID,消费者起始模式以及表格式。

需要特别说明的是,在KafkaTable的字段中有一个ts,它的声明中用到了 METADATA FROM,这是表示一个元数据列metadata column),它是由 Kafka 连接器的元数据“timestamp”生成的。

这里的timestamp其实就是Kafka中数据自带的时间戳,我们把它直接作为元数据提取出来,转换成一个新的字段ts.

3. Upsert Kafka

正常情况下,Kafka作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对应在表中就是仅追加(append-only)模式。

如果我们想要将有更新操作(比如分组聚合)的结 果表写入 Kafka,就会因为Kafka无法识别撤回(retract)或更新插入(upsert)消息而导致异常。

为了解决这个问题,Flink专门增加了一个Upsert Kafka连接器。这个连接器支持以更新插入(UPSERT)的方式向Kafkatopic中读写数据。

具体来说,Upsert Kafka连接器处理的是更新日志(changlog)流。如果作为 TableSource,连接器会将读取到的topic中的数据(key, value),解释为对当前key的数据值的更新(UPDATE), 也就是查找动态表中 key 对应的一行数据,将 value 更新为最新的值;因为是 Upsert 操作,所 以如果没有 key 对应的行,那么也会执行插入(INSERT)操作。另外,如果遇到 value 为空 null),连接器就把这条数据理解为对相应 key 那一行的删除(DELETE)操作。

如果作为 TableSinkUpsert Kafka连接器会将有更新操作的结果表,转换成更新日志(changelog)流。如果遇到插入(INSERT)或者更新后(UPDATE_AFTER)的数据,对应的是一个添加(add)消息,那么就直接正常写入 Kafka 主题;如果是删除(DELETE)或者 更新前的数据,对应是一个撤回(retract)消息,那么就把 value 为空(null)的数据写入 Kafka 由于 Flink 是根据键(key)的值对数据进行分区的,这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。

下面是一个创建和使用 Upsert Kafka 表的例子:

CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (user_region) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka','topic' = 'pageviews_per_region','properties.bootstrap.servers' = '...','key.format' = 'avro','value.format' = 'avro'
);CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH ('connector' = 'kafka','topic' = 'pageviews','properties.bootstrap.servers' = '...','format' = 'json'
);-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;

这里我们从Kafkapageviews中读取数据,统计每个区域的PV(全部浏览量)和UV(对用户去重),这是一个分组聚合的更新查询,得到的结果表会不停地更新数据。

为了将结果表写入Kafkapageviews_per_region 主题,我们定义了一个 Upsert Kafka 表,它的字段中需要用PRIMARY KEY来指定主键,并且在WITH子句中分别指定keyvalue的序列化格式。

4.构建JDBC sink表

mysql构建数据表

JDBC 连接器允许使用 JDBC 驱动程序从任何关系数据库读取数据并将数据写入数据。介绍如何设置 JDBC 连接器以针对关系数据库运行 SQL 查询。

如果在 DDL 上定义了主键,则 JDBC sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。

引入依赖

flink-connector-jdbc-1.11-1.14.5.jar

连接到指定的数据库还需要驱动程序依赖项。 以下是当前支持的驱动程序:

Driver

Group Id

Artifact Id

MySQL

mysql

mysql-connector-java

PostgreSQL

org.postgresql

postgresql

Derby

org.apache.derby

derby

JDBC 连接器和驱动程序目前不是 Flink 二进制发行版的一部分。

CREATE TABLE sync_test_1 (`ts` varchar(64) NOT NULL,`total_gmv` bigint(11) DEFAULT NULL,PRIMARY KEY (`ts`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;
构建MySQL结果表
DROP TABLE IF EXISTS sync_test_1;CREATE TABLE sync_test_1 (
ts string,
total_gmv bigint,
PRIMARY KEY (ts) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
'table-name' = 'sync_test_1',
'username' = 'root',
'password' = 'Admin'
);

说明

  • MySQL数据库支持自增主键,因此在DDL中不声明该自增字段。例如ID是自增字段,Flink DDL不声明该自增字段,则数据库在一行数据写入过程中会自动填补相关自增字段。
  • DDL声明的字段必须至少存在一个非主键的字段,否则产生报错。
  • NOT ENFORCED表示Flink自身对主键不做强制校验,需要您自行保证主键的正确性和完整性。Flink并不充分支持强制校验,Flink将假设列的可为空性与主键中的列是对齐的,从而认为主键是正确的
With参数

选项

是否

必须

默认

数据

类型

描述

connector

Y

String

结果表类型,固定值为jdbc。

url

Y

String

数据库url。

table-name

Y

String

JDBC表名

driver

N

String

用于连接到此URL的JDBC驱动程序的类名,如果未设置,它将自动从URL派生。

username

N

String

JDBC用户名。如果指定了用户名和密码,则必须同时指定它们。

password

N

String

JDBC用户密码

connection.max-retry-timeout

N

60s

Duration

重试之间的最大超时。超时应以秒为单位,且不应小于1秒

scan.partition.column

N

String

用于分区的列名。

scan.partition.num

N

Integer

分区数量

scan.partition.lower-bound

N

Integer

第一个分区的最小值。

scan.partition.upper-bound

N

Integer

最后一个分区的最大值。

scan.fetch-size

N

0

Integer

每次读取时应从数据库中提取的行数。如果指定的值为零,则忽略提示。

scan.auto-commit

N

true

Boolean

在JDBC驱动程序上设置自动提交标志,该标志确定是否在事务中自动提交每个语句。一些JDBC驱动程序,特别是Postgres,可能需要将其设置为false,以便流式传输结果。

lookup.cache.max-rows

N

Integer

查找缓存的最大行数,超过此值时,最旧的行将过期。默认情况下禁用查找缓存

lookup.cache.ttl

N

Duration

查找缓存中每行的最长生存时间,在此时间内,最旧的行将过期。默认情况下禁用查找缓存。

lookup.max-retries

N

3

Integer

查找数据库失败时的最大重试次数

sink.buffer-flush.max-rows

N

100

Integer

刷新前,缓存记录的最大值。可以设置为零以禁用它。

sink.buffer-flush.interval

N

1s

Duration

flush数据的时间间隔。数据在Flink中缓存的时间超过该参数指定的时间后,异步线程将flush数据到数据库中。默认值为1秒。可以设置为0来禁用它,即不再缓存记录,直接flush数据。

sink.max-retries

N

3

Integer

将记录写入数据库失败时的最大重试次数。

sink.parallelism

N

Integer

定义JDBC sink操作的并行性。默认情况下,并行度由框架与上游运算相同的并行度来确定。

技术说明
1. 主键处理

Flink在将数据写入外部数据库时使用DDL中定义的主键。如果定义了主键,则连接器以upsert模式运行,否则,连接器以追加(append)模式运行。

在upsert模式下,Flink会根据主键插入新行或更新现有行,Flink可以通过这种方式保证幂等性。

为保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。

在append模式下,Flink会将所有记录解释为INSERT消息,如果底层数据库发生主键或唯一约束违规,INSERT操作可能会失败。

2.分区扫描

为了加速并行 Source 任务实例中的数据读取,Flink 提供了 JDBC 表的分区扫描功能。

如果指定了以下所有扫描分区选项,则必须全部指定。

它们描述了从多个任务并行读取时如何对表进行分区。

scan.partition.column 必须是相关表中的数字、日期或时间戳列。

请注意,scan.partition.lower-bound和scan.partition.upper-bound用于决定分区步长和过滤表中的行。

如果是批处理作业,也可以在提交flink作业之前先获取最大值和最小值。

scan.partition.column:用于对输入进行分区的列名。

scan.partition.num:分区数。

scan.partition.lower-bound:第一个分区的最小值。

scan.partition.upper-bound:最后一个分区的最大值。

3.查找缓存(Lookup Cache )

在流式计算中,维表(dim_dic)是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,或在不复杂的模型中对事实表做轻量化的维度退化。

比如我们的source stream是来自日志的订单数据,但日志中我们只是记录了订单商品的id,却没有其他的附加信息(如sku、促销活动、优惠券信息等),但我们把订单数据存入实时数仓进行数据分析的时候,却需要同步获取sku名称、优惠券等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。

维表一般存储在外部存储中,如mysql、hbase(使用phoenix操作)、redis等等。

JDBC 连接器可以在时间连接中用作查找源(又名维度表)。目前,仅支持同步查找模式。

默认情况下,查找缓存未启用。您可以通过设置如下2个属性来启用它。

lookup.cache.max-rows

lookup.cache.ttl

查找缓存用于提高临时连接 JDBC 连接器的性能。

默认情况下,查找缓存未启用,因此所有请求都发送到外部数据库。

启用查找缓存后,每个进程(即 TaskManager)都会持有一个缓存。

Flink 会先查找缓存,只有在缓存缺失时才会向外部数据库发送请求,并根据返回的行更新缓存。

当缓存达到最大缓存行lookup.cache.max-rows 或行超过最大存活时间lookup.cache.ttl 时,缓存中最旧的行将过期。

缓存的行可能不是最新的,用户可以将 lookup.cache.ttl 调整为较小的值以获得更快的数据更新,但这可能会增加发送到数据库的请求数量。

所以这是吞吐量和正确性之间的平衡。

4.幂等写入(Idempotent Writes)

如果在DDL中定义了主键,则JDBC接收器将使用upsert语义而不是普通的INSERT语句。

Upsert语义是指如果底层数据库中存在唯一约束违规,则原子地添加新行或更新现有行,这提供了幂等性。

如果出现故障,Flink 作业将从上一个成功的检查点恢复并重新处理,这可能导致恢复期间重新处理消息。

强烈建议使用upsert模式,因为如果需要重新处理记录,它有助于避免违反约束或重复数据。

除了故障恢复之外,随着时间的推移,源主题自然也可能包含多个具有相同主键的记录,这使得 upserts 是可取的。

5. 计算

INSERT INTO sync_test_1
SELECT ts,SUM(price_amt) AS total_gmv
FROM case_kafka_mysql
GROUP BY ts;

查看执行结果,验证

SELECT ts, total_gmv FROM sync_test_1;

继续生成数据:

$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic flink_test

> {"ts": "2020-10-09 12:23:34","id": 7,"price_amt":20}

> {"ts": "2020-10-19 10:13:25","id": 8,"price_amt":20}

> {"ts": "2020-12-19 14:33:35","id": 9,"price_amt":20}

> {"ts": "2020-12-29 21:56:01","id": 10,"price_amt":20}

> {"ts": "2020-12-29 21:56:01","id": 11,"price_amt":30}

继续查验结果:

SELECT ts, total_gmv FROM sync_test_1;
Top-N

Top-N查询按列排序的N个最小或最大值。最小和最大值集都被视为Top-N查询。当需要在某个条件下仅显示批处理/流式处理表中的N个最底层或N个最顶层记录时,Top-N查询非常有用。该结果集可用于进一步分析。

Flink使用OVER窗口子句和过滤条件的组合来表示Top-N查询。借助OVER window PARTITION BY子句的强大功能,Flink还支持每组Top-N。例如,每个类别中实时销售额最高的前五种产品。批处理表和流表上的SQL支持Top-N查询。

Flink SQL将根据顺序键对输入数据流进行排序,因此如果前N条记录已更改,则更改后的记录将作为收回/更新记录发送到下游。建议使用支持更新的存储作为Top-N查询的接收器。此外,如果前N条记录需要存储在外部存储器中,则结果表应具有与top-N查询相同的唯一键。

Top-N查询的唯一键是分区列和rownum列的组合。

SELECT *
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY ts ORDER BY price_amt DESC) AS row_numFROM case_kafka_mysql)
WHERE row_num <= 5

如上所述,rownum字段将作为唯一键的一个字段写入结果表,这可能会导致大量记录写入结果表。例如,当排名9的记录(例如product-1001)被更新并且其排名被升级为1时,排名1~9的所有记录将作为更新消息输出到结果表。如果结果表接收到太多数据,它将成为SQL作业的瓶颈。

优化方法是在Top-N查询的外部SELECT子句中省略rownum字段。这是合理的,因为前N条记录的数量通常不多,因此消费者可以自己快速排序记录。在上面的示例中,如果没有rownum字段,只需要将更改的记录(ID)发送到下游,这可以减少结果表的IO。

优化后:

SELECT ts, amount
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY ts ORDER BY price_amt DESC) AS row_numFROM case_kafka_mysql)
WHERE row_num <= 5

这篇关于Flink流批一体计算(22):Flink SQL之单流kafka写入mysql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/446027

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

poj 1113 凸包+简单几何计算

题意: 给N个平面上的点,现在要在离点外L米处建城墙,使得城墙把所有点都包含进去且城墙的长度最短。 解析: 韬哥出的某次训练赛上A出的第一道计算几何,算是大水题吧。 用convexhull算法把凸包求出来,然后加加减减就A了。 计算见下图: 好久没玩画图了啊好开心。 代码: #include <iostream>#include <cstdio>#inclu

uva 1342 欧拉定理(计算几何模板)

题意: 给几个点,把这几个点用直线连起来,求这些直线把平面分成了几个。 解析: 欧拉定理: 顶点数 + 面数 - 边数= 2。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <cmath>#inc

uva 11178 计算集合模板题

题意: 求三角形行三个角三等分点射线交出的内三角形坐标。 代码: #include <iostream>#include <cstdio>#include <cstdlib>#include <algorithm>#include <cstring>#include <cmath>#include <stack>#include <vector>#include <