This article walks through an example of reading data from Kafka with PyFlink and writing it to MySQL. Hopefully it serves as a useful reference for developers tackling the same problem.
Dependency download
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/
Note that the connector jar must match your Flink version: the directory above lists 1.17.1, while the example below runs on Flink 1.16.0, so in that case browse to the 1.16.0 directory of the same artifact instead.
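Once downloaded, the jars have to be registered with the execution environment before any Kafka or JDBC operator is built. A minimal sketch, assuming the jars were saved under /opt/flink (the paths and file names here are illustrative, not prescribed by the article):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# paths/file names are illustrative; point these at the jars you actually downloaded
env.add_jars("file:///opt/flink/flink-sql-connector-kafka-1.16.0.jar",
             "file:///opt/flink/flink-connector-jdbc-1.16.0.jar",
             "file:///opt/flink/mysql-connector-java-8.0.29.jar")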
Versions
Flink: 1.16.0
Kafka: 2.13-3.2.0 (Scala 2.13, Kafka 3.2.0)
Example
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema


def write_to_kafka(env):
    # produce a small bounded collection of (id, word) rows to the topic as JSON
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection(
        [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_json_topic',
        serialization_schema=serialization_schema,
        producer_config={
            'security.protocol': 'SASL_PLAINTEXT',
            'sasl.mechanism': 'PLAIN',
            'bootstrap.servers': '192.168.1.110:9092',
            'group.id': 'test-consumer-group',
            # the PLAIN mechanism requires PlainLoginModule; the ScramLoginModule
            # used originally only works with the SCRAM mechanisms
            'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="aaaaaaaaa" password="bbbbbbb";'})

    # note that the output type of ds must be RowTypeInfo
    ds.add_sink(kafka_producer)
    env.execute()


def read_from_kafka(env):
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserialization_schema,
        properties={
            'security.protocol': 'SASL_PLAINTEXT',
            'sasl.mechanism': 'PLAIN',
            'bootstrap.servers': '192.168.1.110:9092',
            'group.id': 'test-consumer-group',
            'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="aaaaa" password="bbbbbb";'})
    # read the topic from the beginning rather than from the latest offset
    kafka_consumer.set_start_from_earliest()

    env.add_source(kafka_consumer).print()
    env.execute()


# renamed from the original typo 'wirte_data_todb'; the unused 'data' parameter is dropped
def write_data_to_db(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    env.from_collection(
        [(101, "Stream Processing with Apache Flink"),
         (102, "Streaming Systems"),
         (103, "Designing Data-Intensive Applications"),
         (104, "Kafka: The Definitive Guide")],
        type_info=type_info) \
        .add_sink(JdbcSink.sink(
            "insert into flink (id, title) values (?, ?)",
            type_info,
            JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .with_url('jdbc:mysql://192.168.1.110:23006/test')
                # Connector/J 8 prefers 'com.mysql.cj.jdbc.Driver'; the old name
                # still works as a deprecated alias
                .with_driver_name('com.mysql.jdbc.Driver')
                .with_user_name('sino')
                .with_password('Caib@sgcc-56')
                .build()))
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    # register the connector jars if they are not already on the cluster classpath
    # env.add_jars("file:///opt/flink/flink-sql-connector-kafka-1.15.0.jar")
    # env.add_jars("file:///opt/flink/kafka-clients-2.8.1.jar")
    # env.add_jars("file:///opt/flink/flink-connector-jdbc-1.16.0.jar")
    # env.add_jars("file:///opt/flink/mysql-connector-java-8.0.29.jar")

    print("start reading data from kafka")
    read_from_kafka(env)
    # write_data_to_db(env)
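As written, the example either prints what it reads from Kafka or writes a fixed in-memory collection to MySQL, but never chains the two. Below is a minimal sketch of the end-to-end pipeline the title describes, reusing the topic, schema, and connection settings from the example above. It assumes the target table already exists (for instance: CREATE TABLE flink (id INT, title VARCHAR(255))), and the JdbcExecutionOptions batch settings are illustrative, not required:

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import JdbcSink, JdbcConnectionOptions, JdbcExecutionOptions
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema


def kafka_to_mysql(env):
    # same (INT, STRING) row schema as in the examples above
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(type_info) \
        .build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_json_topic',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '192.168.1.110:9092',
                    'group.id': 'test-consumer-group'})
    kafka_consumer.set_start_from_earliest()

    # wire the Kafka source straight into the JDBC sink
    env.add_source(kafka_consumer).add_sink(JdbcSink.sink(
        "insert into flink (id, title) values (?, ?)",
        type_info,
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .with_url('jdbc:mysql://192.168.1.110:23006/test')
            .with_driver_name('com.mysql.jdbc.Driver')
            .with_user_name('sino')
            .with_password('Caib@sgcc-56')
            .build(),
        # optional batching: flush every second or every 200 rows, retry up to 5 times
        JdbcExecutionOptions.builder()
            .with_batch_interval_ms(1000)
            .with_batch_size(200)
            .with_max_retries(5)
            .build()))
    env.execute('kafka_to_mysql')


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    kafka_to_mysql(env)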
That concludes this example of reading Kafka data with PyFlink and writing it to MySQL. Hopefully it proves helpful in your own projects!