Confluent Platform 的快速上手

2024-08-25 07:08
文章标签 快速 platform confluent

本文主要是介绍Confluent Platform 的快速上手,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

什么是 Confluent Platform?

先说下什么是 Confluent ? Confluent由ApacheKafka®的原始创建者创立的,以Kafka为技术核心的公司。

Confluent提供了业界唯一的企业级事件流平台,从而为应用程序和数据基础架构带来了新的范例。Confluent Platform(平台)基于此理念开发出来, 可以很方便的建立实时的数据流和流处理应用。让用户更加关注于业务价值。

confluentPlatform组件

快速开始

官网提供了三种使用方式,每个人都可以根据自己实际需求选择最合适的。我因个人练习,所以使用了 Confluent Platform Quick Start (Docker)。

Confluent Platform Quick Start

  • Confluent Platform Quick Start (Local)
  • Confluent Platform Quick Start (Docker)

Confluent Platform Quick Start using Community Components

  • Quick Start using Community Components (Local)
  • Quick Start using Community Components (Docker)

Confluent Cloud Quick Start

  • Confluent Cloud Quick Start

Step1 使用的Docker-Compose 快速的启动所需服务

Docker-compose 对于搭建基础环境,简直不要太爽。Confluent Platform的基础环境Docker-Compose文件如下:

version: "2"
services:zookeeper:image: confluentinc/cp-zookeeper:5.4.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000broker:image: confluentinc/cp-server:5.4.1hostname: brokercontainer_name: brokerdepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporterKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1CONFLUENT_METRICS_ENABLE: "true"CONFLUENT_SUPPORT_CUSTOMER_ID: "anonymous"schema-registry:image: confluentinc/cp-schema-registry:5.4.1hostname: schema-registrycontainer_name: schema-registrydepends_on:- zookeeper- brokerports:- "8081:8081"environment:SCHEMA_REGISTRY_HOST_NAME: schema-registrySCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"connect:image: cnfldemos/cp-server-connect-datagen:0.2.0-5.4.0hostname: connectcontainer_name: connectdepends_on:- zookeeper- broker- schema-registryports:- "8083:8083"environment:CONNECT_BOOTSTRAP_SERVERS: "broker:29092"CONNECT_REST_ADVERTISED_HOST_NAME: connectCONNECT_REST_PORT: 8083CONNECT_GROUP_ID: compose-connect-groupCONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configsCONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsetsCONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1CONNECT_STATUS_STORAGE_TOPIC: docker-connect-statusCONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverterCONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverterCONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"# CLASSPATH required due to CC-2422CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.4.1.jarCONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERRORcontrol-center:image: confluentinc/cp-enterprise-control-center:5.4.1hostname: control-centercontainer_name: control-centerdepends_on:- zookeeper- broker- schema-registry- connect- ksql-serverports:- "9021:9021"environment:CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083"CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"CONTROL_CENTER_REPLICATION_FACTOR: 1CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1CONFLUENT_METRICS_TOPIC_REPLICATION: 1PORT: 9021ksql-server:image: confluentinc/cp-ksql-server:5.4.1hostname: ksql-servercontainer_name: ksql-serverdepends_on:- broker- connectports:- "8088:8088"environment:KSQL_CONFIG_DIR: "/etc/ksql"KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"KSQL_BOOTSTRAP_SERVERS: "broker:29092"KSQL_HOST_NAME: ksql-serverKSQL_LISTENERS: "http://0.0.0.0:8088"KSQL_CACHE_MAX_BYTES_BUFFERING: 0KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"KSQL_KSQL_CONNECT_URL: "http://connect:8083"ksql-cli:image: confluentinc/cp-ksql-cli:5.4.1container_name: ksql-clidepends_on:- broker- connect- ksql-serverentrypoint: /bin/shtty: trueksql-datagen:# Downrev ksql-examples to 5.1.2 due to DEVX-798 (work around issues in 5.2.0)image: confluentinc/ksql-examples:5.4.1hostname: ksql-datagencontainer_name: ksql-datagendepends_on:- ksql-server- broker- schema-registry- connectcommand: "bash -c 'echo Waiting for Kafka to be ready... && \cub kafka-ready -b broker:29092 1 40 && \echo Waiting for Confluent Schema Registry to be ready... && \cub sr-ready schema-registry 8081 40 && \echo Waiting a few seconds for topic creation to finish... && \sleep 11 && \tail -f /dev/null'"environment:KSQL_CONFIG_DIR: "/etc/ksql"KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"STREAMS_BOOTSTRAP_SERVERS: broker:29092STREAMS_SCHEMA_REGISTRY_HOST: schema-registrySTREAMS_SCHEMA_REGISTRY_PORT: 8081rest-proxy:image: confluentinc/cp-kafka-rest:5.4.1depends_on:- zookeeper- broker- schema-registryports:- 8082:8082hostname: rest-proxycontainer_name: rest-proxyenvironment:KAFKA_REST_HOST_NAME: rest-proxyKAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

运行 docker-compose up -d 启动服务就好

可以去Github上下载最新的配置文件. github 地址为 https://github.com/confluentinc/examples, 下载 cp-all-in-one 目录下的 docker-compose.yml 文件

启动好之后,通过 docker-compose ps 可以看到正常启动的服务

     Name                    Command                  State                         Ports                   
------------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up             0.0.0.0:9092->9092/tcp                    
connect           /etc/confluent/docker/run        Up             0.0.0.0:8083->8083/tcp, 9092/tcp          
control-center    /etc/confluent/docker/run        Up             0.0.0.0:9021->9021/tcp                    
ksql-cli          /bin/sh                          Up                                                       
ksql-datagen      bash -c echo Waiting for K ...   Up                                                       
ksql-server       /etc/confluent/docker/run        Up (healthy)   0.0.0.0:8088->8088/tcp                    
rest-proxy        /etc/confluent/docker/run        Up             0.0.0.0:8082->8082/tcp                    
schema-registry   /etc/confluent/docker/run        Up             0.0.0.0:8081->8081/tcp                    
zookeeper         /etc/confluent/docker/run        Up             0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

Step2 创建练习需要使用的 Topics

服务启动成功之后,进入 Confluent 控制中心。Confluent 控制 中心提供了数据流处理应用。

  1. 浏览器中输入 http://localhost:9021 就可以打开。

c3-landing-page

  1. 从集群中选择 Topics ,并且点击 Add a topic 就可以添加。

c3-create-topic

  1. 创建一个名为 pageviews 的Topic,并且选中 Create with defaults

c3-create-topic-name

  1. 重复2、3 步骤,创建一个名为 users Kafka 主题。

Step3 安装一个Kafka 连接器并且生成一些简单的数据

这一步中,我们选用 kafka-connect-datagen 连接器来演示,如何简单入门怎么使用Kafka 连接器。kafka-connect-datagen 连接器是 CP 自带的,并且会为 pageviewsusers 两个主题产生一些简单数据。

  1. 启动一个 Kafka Connect Datagen 连接器的运行实例,以 AVRO 格式将Kafka数据发送到 pageviews 主题中。

  2. Cluster 集群主界面,点击导航栏中的 Connect

  3. 找到 DatagenConnector 连接器,并且点击 Connect 按钮

connect-page-new-source

  1. 命名新建的连接器为 datagen-pageviews。新建的连接器属性定义如下:
    • Key converter class 属性, 写入 org.apache.kafka.connect.storage.StringConverter.
    • kafka.topic 属性, 写入 pageviews.
    • max.interval 属性, 写入 100.
    • iterations 属性, 写入 1000000000.
    • quickstart 属性, 写入 pageviews.

connect-configure-pageviews

  1. 完成后,点击继续按钮。属性配置大概如下;

connect-review-pageviews

使用同样的方式创建第二个连接器,名为datagen-users。将 users 主题下的数据导入,不同的在于将前面的 max.interval 属性设置为 1000 而不是 100

Step4 使用KSQL 来创建和写入 Stream 和 Table

KSQL 面向Apache Kafka的一种数据流SQL引擎,非常轻量,上手简单。

创建 Stream 和 Tables

着这里,我们为Kafka中的 pageviews 主题来创建一个 Stream,为 users 主题来创建一个表(table)。

  1. 在Cluster界面中,点击 KSQL 导航栏,选择 KSQL Application 进入
  2. KSQL EDITOR 界面来操作,点击工具 栏中 **Streams ** 中的 Add Stream

ksql-interface-create-stream2

  1. 选中出现的 pageviews 主题.

c3-ksql-create-stream-pageview

  1. 选中你自定义的 Stream 操作
    • Encoding 属性中选中 AVRO
    • 确保Stream中字段的类型选中如下
      • viewtime 的类型为 BIGINT
      • userid 的类型为 VARCHAR
      • pageid 的类型为 VARCHAR

c3-ksql-create-stream-pageview-2

  1. 点击 Save Stream 按钮就好

以下步骤为如何为 Kafka 中的 users 主题来创建 Table。

  1. 选中工具栏中的 Table

    • Encoding 属性中选中 AVRO
    • Key 属性中,选中 userid.
    • 确保Stream中字段的类型选中如下
      • registertime 的类型为 BIGINT
      • userid 的类型为 VARCHAR
      • regionid 的类型为 VARCHAR
      • gender 的类型为 VARCHAR

c3-ksql-create-table-users

  1. 完成后 Save Table

编写查询语句

在KSQL 的编辑界面 ,在 Add query properties 中 添加一个自定义查询属性,记得将 auto.offset.reset 设置为 earliest。还有很多参数可以设置,详情见

KSQL 的语法同标注的SQL很像。比如下面

SELECT pageid FROM pageviews EMIT CHANGES LIMIT 3;

输出的结构类似于如下:

c3-ksql-query-results-pageid

如果我们想将前面创建的 pageviews Stream 中的数据和 users Table中的数据,(根据userid)右连接一下,生成新的流数据,过滤出其中 gender = 'FEMALE' 的数据,并且将新生成的流数据写入到 Kafka 中的 PAGEVIEWS_FEMALE 主题中。如下的KSQL可以实现

CREATE STREAM pageviews_female ASSELECT users.userid AS userid, pageid, regionid, gender FROM pageviews LEFT JOIN users ON pageviews.userid = users.userid WHERE gender = 'FEMALE';

运行成功后,可见如下的输出结果

c3-ksql-persist-query-pv-female-results

在前面创建的好的 ``PAGEVIEWS_FEMALE主题下, 使用LIKE语句创建一个满足指定的 regionid 条件的持久查询,并将该查询的结果写入名为pageviews_enriched_r8_r9`的Kafka主题中。

CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='AVRO') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';

运行成功后,可见如下的输出结果

c3-ksql-persist-query-pv-female89-results

创建一个持久查询,当计数大于1时,将在30秒的 tumbling window 中对每个区域和性别组合的浏览量进行计数。由于该过程是分组和计数结果,因此结果是表(Table)而不是流(Stream)。该查询的结果将写入名为PAGEVIEWS_REGIONS的Kafka主题。

CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;

运行成功后,可见如下的输出结果

c3-ksql-persist-query-table-results

点击,Running queries 可以看到所有正在运行的查询。

c3-ksql-persistent-query1

Editor 右侧的,点开 All available streams and tables 可以看到所有的 Table 和 Stream。选择任意一个,可以看到对应的Schema。

c3-ksql-stream-table-view-1

Step5 监控消费者滞后

导航到 Consumers 视图,点击消费者组ID来查看所有的详细视图。比如看具体的 _confluent-ksql-default_query_CSAS_PAGEVIEWS_FEMALE_3 消费者组。

ksql-interface-monitor

在此页面上,您可以查看流查询的消费者滞后值和消费值。

Step 6: 停止Docker 容器

使用完Docker后,您可以停止和删除Docker容器和映像。

  1. 查看所有Docker容器ID的列表。

    docker container ls -aq
    
  2. 运行以下命令以停止Confluent的Docker容器:

    docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
    
  3. 运行以下命令可以停止容器并修剪Docker系统。运行这些命令将删除容器,网络,卷和映像。释放磁盘空间:

    docker container stop $(docker container ls -a -q -f "label=io.confluent.docker") && docker system prune -a -f --volumes
    

参考的翻译原文链接:https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#step-5-monitor-consumer-lag

这篇关于Confluent Platform 的快速上手的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1104880

相关文章

shell脚本快速检查192.168.1网段ip是否在用的方法

《shell脚本快速检查192.168.1网段ip是否在用的方法》该Shell脚本通过并发ping命令检查192.168.1网段中哪些IP地址正在使用,脚本定义了网络段、超时时间和并行扫描数量,并使用... 目录脚本:检查 192.168.1 网段 IP 是否在用脚本说明使用方法示例输出优化建议总结检查 1

Rust中的Option枚举快速入门教程

《Rust中的Option枚举快速入门教程》Rust中的Option枚举用于表示可能不存在的值,提供了多种方法来处理这些值,避免了空指针异常,文章介绍了Option的定义、常见方法、使用场景以及注意事... 目录引言Option介绍Option的常见方法Option使用场景场景一:函数返回可能不存在的值场景

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

hdu 4565 推倒公式+矩阵快速幂

题意 求下式的值: Sn=⌈ (a+b√)n⌉%m S_n = \lceil\ (a + \sqrt{b}) ^ n \rceil\% m 其中: 0<a,m<215 0< a, m < 2^{15} 0<b,n<231 0 < b, n < 2^{31} (a−1)2<b<a2 (a-1)^2< b < a^2 解析 令: An=(a+b√)n A_n = (a +

v0.dev快速开发

探索v0.dev:次世代开发者之利器 今之技艺日新月异,开发者之工具亦随之进步不辍。v0.dev者,新兴之开发者利器也,迅速引起众多开发者之瞩目。本文将引汝探究v0.dev之基本功能与优势,助汝速速上手,提升开发之效率。 何谓v0.dev? v0.dev者,现代化之开发者工具也,旨在简化并加速软件开发之过程。其集多种功能于一体,助开发者高效编写、测试及部署代码。无论汝为前端开发者、后端开发者

利用Django框架快速构建Web应用:从零到上线

随着互联网的发展,Web应用的需求日益增长,而Django作为一个高级的Python Web框架,以其强大的功能和灵活的架构,成为了众多开发者的选择。本文将指导你如何从零开始使用Django框架构建一个简单的Web应用,并将其部署到线上,让世界看到你的作品。 Django简介 Django是由Adrian Holovaty和Simon Willison于2005年开发的一个开源框架,旨在简

CentOs7上Mysql快速迁移脚本

因公司业务需要,对原来在/usr/local/mysql/data目录下的数据迁移到/data/local/mysql/mysqlData。 原因是系统盘太小,只有20G,几下就快满了。 参考过几篇文章,基于大神们的思路,我封装成了.sh脚本。 步骤如下: 1) 先修改好/etc/my.cnf,        ##[mysqld]       ##datadir=/data/loc

SAM2POINT:以zero-shot且快速的方式将任何 3D 视频分割为视频

摘要 我们介绍 SAM2POINT,这是一种采用 Segment Anything Model 2 (SAM 2) 进行零样本和快速 3D 分割的初步探索。 SAM2POINT 将任何 3D 数据解释为一系列多向视频,并利用 SAM 2 进行 3D 空间分割,无需进一步训练或 2D-3D 投影。 我们的框架支持各种提示类型,包括 3D 点、框和掩模,并且可以泛化到不同的场景,例如 3D 对象、室

UE5 半透明阴影 快速解决方案

Step 1: 打开该选项 Step 2: 将半透明材质给到模型后,设置光照的Shadow Resolution Scale,越大,阴影的效果越好

快速排序(java代码实现)

简介: 1.采用“分治”的思想,对于一组数据,选择一个基准元素,这里选择中间元素mid 2.通过第一轮扫描,比mid小的元素都在mid左边,比mid大的元素都在mid右边 3.然后使用递归排序这两部分,直到序列中所有数据均有序为止。 public class csdnTest {public static void main(String[] args){int[] arr = {3,