pyflink读取kafka数据写入mysql实例

2023-10-11 18:45

本文主要是介绍pyflink读取kafka数据写入mysql实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

依赖包下载

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/

版本

flink:1.16.0

kafka:2.13-3.2.0

实例

import logging
import sysfrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchemadef write_to_kafka(env):type_info = Types.ROW([Types.INT(), Types.STRING()])ds = env.from_collection([(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],type_info=type_info)serialization_schema = JsonRowSerializationSchema.Builder() \.with_type_info(type_info) \.build()kafka_producer = FlinkKafkaProducer(topic='test_json_topic',serialization_schema=serialization_schema,producer_config={'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'bootstrap.servers': '192.168.1.110:9092', 'group.id': 'test-consumer-group', 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"aaaaaaaaa\" password=\"bbbbbbb\";'})# note that the output type of ds must be RowTypeInfods.add_sink(kafka_producer)env.execute()def read_from_kafka(env):deserialization_schema = JsonRowDeserializationSchema.Builder() \.type_info(Types.ROW([Types.INT(), Types.STRING()])) \.build()kafka_consumer = FlinkKafkaConsumer(topics='test_json_topic',deserialization_schema=deserialization_schema,properties={'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'PLAIN', 'bootstrap.servers': '192.168.1.110:9092', 'group.id': 'test-consumer-group', 'sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"aaaaa\" password=\"bbbbbb\";'})kafka_consumer.set_start_from_earliest()env.add_source(kafka_consumer).print()env.execute()def wirte_data_todb(env, data):type_info = Types.ROW([Types.INT(), Types.STRING()])env.from_collection([(101, "Stream Processing with Apache Flink"),(102, "Streaming Systems"),(103, "Designing Data-Intensive Applications"),(104, "Kafka: The Definitive Guide")], type_info=type_info) \.add_sink(JdbcSink.sink("insert into flink (id, title) values (?, ?)",type_info,JdbcConnectionOptions.JdbcConnectionOptionsBuilder().with_url('jdbc:mysql://192.168.1.110:23006/test').with_driver_name('com.mysql.jdbc.Driver').with_user_name('sino').with_password('Caib@sgcc-56').build()))env.execute()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")env = StreamExecutionEnvironment.get_execution_environment()#env.add_jars("file:///opt/flink/flink-sql-connector-kafka-1.15.0.jar")#env.add_jars("file:///opt/flink/kafka-clients-2.8.1.jar")#env.add_jars("file:///opt/flink/flink-connector-jdbc-1.16.0.jar")#env.add_jars("file:///opt/flink/mysql-connector-java-8.0.29.jar")print("start reading data from kafka")read_from_kafka(env)#wirte_data_todb(env, "")

这篇关于pyflink读取kafka数据写入mysql实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/190090

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间