SDC RPC到Kafka

2023-11-09 13:20
文章标签 rpc kafka sdc

本文主要是介绍SDC RPC到Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SDC RPC 到卡夫卡(已弃用)

从一个或多个 SDC RPC 目标读取数据,并将其立即写入卡夫卡。在 SDC RPC 目标管道中使用 SDC RPC 到卡夫卡源。但是,到 Kafka 源的 SDC RPC 现已弃用,并将在将来的版本中被删除。我们建议使用 SDC RPC 源。

如果有多个 SDC RPC 源管道,其中包含要写入卡夫卡而无需额外处理的数据,请使用 SDC RPC 到卡夫卡源。

与 SDC RPC 源一样,从 SDC RPC 到卡夫卡源从另一个管道中的 SDC RPC 目标读取数据。但是,到卡夫卡源的 SDC RPC 经过优化,可将数据从多个管道直接写入卡夫卡。使用此源时,无法在写入 Kafka 之前执行其他处理。

以下是使用 SDC RPC 到卡夫卡源的推荐体系结构示例:

将 SDC RPC 配置为 Kafka 源时,可以定义源侦听数据的端口、SDC RPC ID、最大并发请求数和最大批量请求大小。您还可以配置 SSL/TLS 属性,包括默认传输协议和密码套件。

您还需要为 Kafka 配置连接信息,包括代理 URI、要写入的主题以及最大消息大小。您可以根据需要添加卡夫卡配置属性并启用卡夫卡安全性。

管道配置

在管道中使用 SDC RPC 到 Kafka 源时,请将源连接到废纸篓目标。

将 SDC RPC 写入卡夫卡源,将记录直接写入卡夫卡。源不会将记录传递到其输出端口,因此您无法执行其他处理或将数据写入其他目标系统。

但是,由于管道需要目标,因此应将原点连接到废纸篓目标以满足管道验证要求。

将 SDC RPC 连接到卡夫卡源的管道应如下所示:

并发请求

您可以指定 SDC RPC 对 Kafka 源一次处理的最大请求数。

当源管道中的 SDC RPC 目标想要将一批数据传递到源时,它会向 Kafka 源的 SDC RPC 发送请求。如果有一个源管道将数据传递到 SDC RPC 再到 Kafka 源,则可以将最大并发请求数设置为 1,因为目标一次处理一批数据。

通常,您将有多个管道将数据传递到此源。在这种情况下,应评估源管道的数量、管道的预期输出以及 Data Collector 计算机的资源,然后根据需要调整属性以提高管道性能。

例如,如果有 100 个源管道将数据传递到 SDC RPC 到 Kafka 源,但管道生成数据的速度很慢,则可以将最大值设置为 20,以防止这些管道在卷峰值期间使用过多的数据收集器资源。或者,如果数据收集器没有资源问题,并且您希望它尽快处理数据,则可以将最大值设置为 90 或 100。请注意,SDC RPC 目标还具有用于重试和回退周期的高级属性,可用于帮助调整性能。

批请求大小、卡夫卡消息大小和卡夫卡配置

将 SDC RPC 配置为彼此相关的卡夫卡最大批处理请求大小和卡夫卡消息大小属性,以及卡夫卡中配置的最大消息大小。

最大批处理请求大小 (MB)”属性确定源从每个 SDC RPC 目标接受的数据批的最大大小。收到一批数据后,源会立即将数据写入 Kafka。

为了提升峰值性能,源将尽可能多的记录写入单个 Kafka 消息。“Kafka 最大消息大小 (KB)”属性确定它所创建的消息的最大大小。

重要:源中指定的最大邮件大小必须小于 Kafka 中配置的最大邮件大小。默认情况下,Kafka 群集中的最大消息大小为 1 MB,由消息.max.bytes 属性定义。 如果源收到大于最大消息大小的消息,则无法处理批处理,并且提供批处理的目标根据为目标配置的错误记录处理来处理该批处理。

例如,假设源使用默认的 100 MB 作为最大批处理请求大小,默认的 900 KB 作为最大消息大小,Kafka 对消息.max.bytes 使用默认值 1 MB。

当源请求一批数据时,它一次最多接收 100 MB 的数据。当源写入 Kafka 时,它会将记录分组为尽可能少的消息,包括每条消息中最多 900 KB 的记录。由于消息大小小于 Kafka 1 MB 的要求,因此源已成功将所有消息写入 Kafka。

如果记录大于 900 KB 的最大消息大小,则源将生成错误,并且不会将记录(或包含该记录的批处理)写入 Kafka。为批处理提供超大记录的 SDC RPC 目标根据阶段错误记录处理来处理批处理。

其他卡夫卡属性

您可以将自定义卡夫卡配置属性添加到 SDC RPC 到卡夫卡源。

添加 Kafka 配置属性时,请输入确切的属性名称和值。该阶段不验证属性名称或值。

默认情况下定义了多个属性,您可以根据需要编辑或删除这些属性。

注意:由于舞台使用多个配置属性,因此它会忽略以下属性的用户定义值:
  • 密钥序列化程序.class
  • 元数据.代理列表
  • 分区程序.class
  • 生产者类型
  • 序列化程序.class

启用卡夫卡安全

您可以将 SDC RPC 配置为卡夫卡源,以便通过 SSL/TLS 和/或 Kerberos 安全地连接到卡夫卡。

启用 SSL/红绿灯系统

执行以下步骤,使 SDC RPC 到卡夫卡源使用 SSL/TLS 连接到卡夫卡。

  1. 要使用 SSL/TLS 进行连接,请首先确保按照卡夫卡文档中所述,将卡夫卡配置为使用 SSL/TLS。
  2. 在舞台的“常规”选项卡上,将“舞台库”属性设置为相应的“卡夫卡”版本。
  3. 在“卡夫卡”选项卡上,添加“夫卡”配置属性并将其设置为 SSL
  4. 然后添加并配置以下 SSL 卡夫卡属性:
    • 信任库位置
    • 信任库
    当 Kafka 代理需要客户端身份验证时 - 当 ssl.client.auth 代理属性设置为“必需”时 , 添加并配置以下属性:
    • 位置
    • 密码
    • .key密码
    某些代理可能还需要添加以下属性:
    • ssl.enabled.协议
    • 信任库类型
    • 密钥库类型

    有关这些属性的详细信息,请参阅 Kafka 文档。

例如,以下属性允许阶段使用 SSL/TLS 通过客户端身份验证连接到卡夫卡:

启用 Kerberos (SASL)

使用 Kerberos 身份验证时,数据收集器将使用 Kerberos 主体和密钥选项卡连接到 Kafka。执行以下步骤,使 SDC RPC 到卡夫卡源能够使用 Kerberos 连接到卡夫卡。

  1. 要使用 Kerberos,请首先确保按照 Kafka 文档中的说明为 Kerberos 配置了 Kafka。
  2. 确保为数据收集器启用了 Kerberos 身份验证,如 Kerberos 身份验证中所述。
  3. 根据您的安装和认证类型,添加 Kafka 客户机所需的 Java 认证和授权服务 (JAAS) 配置属性:
    • 在没有 LDAP 认证的情况下安装 RPM、压缩包或 Cloudera 管理器 - 如果数据收集器不使用 LDAP 认证,请在数据收集器计算机上创建一个单独的 JAAS 配置文件。将以下登录部分添加到文件中:KafkaClient
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>";
      };</code></span></span>
      例如:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };</code></span></span>
      然后修改SDC_JAVA_OPTS环境变量,以包括以下定义 JAAS 配置文件路径的选项:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>-Djava.security.auth.login.config=<JAAS config path></code></span></span>

      使用安装类型所需的方法修改环境变量。

    • 使用 LDAP 认证的 RPM 或压缩包安装 - 如果在 RPM 或压缩包安装中启用了 LDAP 认证,请将属性添加到数据收集器使用的 JAAS 配置文件 - 该文件。将以下登录部分添加到文件末尾:$SDC_CONF/ldap-login.confKafkaClientldap-login.conf
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>";
      };</code></span></span>
      例如:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };</code></span></span>
    • 使用 LDAP 身份验证进行云汇管理器安装 - 如果在云印管理器安装中启用了 LDAP 身份验证,请在云端管理器中为流集服务启用 LDAP 配置文件替换(ldap.login.file.allow.替换)属性。

      如果启用了“使用安全阀编辑 LDAP 信息”属性,并且“数据收集器高级配置代码段(安全阀)”字段中配置了 LDAP 认证,那么将 JAAS 配置属性添加到同一个 ldap-login.conf 安全阀中。

      如果 LDAP 认证是通过 LDAP 属性而不是 ldap-login.conf 安全值配置的,请将 JAAS 配置属性添加到数据收集器高级配置代码段(安全阀)中,以用于生成的 ldap 登录名附加.conf 字段。

      将以下登录部分添加到相应的字段中,如下所示:KafkaClient

      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="<principal name>/_HOST@<realm>";
      };</code></span></span>
      例如:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="sdc/_HOST@EXAMPLE.COM";
      };</code></span></span>

      Cloudera 管理器会生成相应的密钥表路径和主机名。

  4. 在舞台的“常规”选项卡上,将“舞台库”属性设置为相应的“卡夫卡”版本。
  5. 在“卡夫卡”选项卡上,添加“安全协议”配置属性,并将其设置为SASL_PLAINTEXT
  6. 然后,添加 sasl.kerberos.service.name 配置属性,并将其设置为 kafka

例如,以下卡夫卡属性允许使用 Kerberos 连接到卡夫卡:

启用 SSL/TLS 和 Kerberos

您可以启用 SDC RPC 到卡夫卡源,以使用 SSL/TLS 和 Kerberos 连接到卡夫卡。

要使用 SSL/TLS 和 Kerberos,请组合所需的步骤以启用每个步骤,并按如下方式设置 security.protocol 属性:
  1. 确保卡夫卡配置为使用 SSL/TLS 和 Kerberos (SASL),如以下卡夫卡文档中所述:
    • Apache Kafka
    • Apache Kafka
  2. 确保为数据收集器启用了 Kerberos 身份验证,如 Kerberos 身份验证中所述。
  3. 根据您的安装和认证类型,添加 Kafka 客户机所需的 Java 认证和授权服务 (JAAS) 配置属性:
    • 在没有 LDAP 认证的情况下安装 RPM、压缩包或 Cloudera 管理器 - 如果数据收集器不使用 LDAP 认证,请在数据收集器计算机上创建一个单独的 JAAS 配置文件。将以下登录部分添加到文件中:KafkaClient
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>";
      };</code></span></span>
      例如:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };</code></span></span>
      然后修改SDC_JAVA_OPTS环境变量,以包括以下定义 JAAS 配置文件路径的选项:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>-Djava.security.auth.login.config=<JAAS config path></code></span></span>

      使用安装类型所需的方法修改环境变量。

    • 使用 LDAP 认证的 RPM 或压缩包安装 - 如果在 RPM 或压缩包安装中启用了 LDAP 认证,请将属性添加到数据收集器使用的 JAAS 配置文件 - 该文件。将以下登录部分添加到文件末尾:$SDC_CONF/ldap-login.confKafkaClientldap-login.conf
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>";
      };</code></span></span>
      例如:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };</code></span></span>
    • 使用 LDAP 身份验证进行云汇管理器安装 - 如果在云印管理器安装中启用了 LDAP 身份验证,请在云端管理器中为流集服务启用 LDAP 配置文件替换(ldap.login.file.allow.替换)属性。

      如果启用了“使用安全阀编辑 LDAP 信息”属性,并且“数据收集器高级配置代码段(安全阀)”字段中配置了 LDAP 认证,那么将 JAAS 配置属性添加到同一个 ldap-login.conf 安全阀中。

      如果 LDAP 认证是通过 LDAP 属性而不是 ldap-login.conf 安全值配置的,请将 JAAS 配置属性添加到数据收集器高级配置代码段(安全阀)中,以用于生成的 ldap 登录名附加.conf 字段。

      将以下登录部分添加到相应的字段中,如下所示:KafkaClient

      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="<principal name>/_HOST@<realm>";
      };</code></span></span>
      例如:
      <span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="sdc/_HOST@EXAMPLE.COM";
      };</code></span></span>

      Cloudera 管理器会生成相应的密钥表路径和主机名。

  4. 在舞台的“常规”选项卡上,将“舞台库”属性设置为相应的“卡夫卡”版本。
  5. 在“卡夫卡”选项卡上,添加 security.protocol 属性并将其设置为SASL_SSL
  6. 然后,添加 sasl.kerberos.service.name 配置属性,并将其设置为 kafka
  7. 然后添加并配置以下 SSL 卡夫卡属性:
    • 信任库位置
    • 信任库
    当 Kafka 代理需要客户端身份验证时 - 当 ssl.client.auth 代理属性设置为“必需”时 , 添加并配置以下属性:
    • 位置
    • 密码
    • .key密码
    某些代理可能还需要添加以下属性:
    • ssl.enabled.协议
    • 信任库类型
    • 密钥库类型

    有关这些属性的详细信息,请参阅 Kafka 文档。

将 SDC RPC 配置为卡夫卡源

将 SDC RPC 配置为卡夫卡源,以将数据从多个 SDC RPC 目标直接写入卡夫卡。

  1. 在“属性”面板的“常规”选项卡上,配置以下属性:
    一般财产描述: __________
    名字艺名。
    描述: __________可选说明。
    舞台库要使用的库版本。
    记录错误阶段的错误记录处理:
    • 丢弃 - 丢弃记录。
    • 发送到错误 - 将记录发送到管道以进行错误处理。
    • 停止管道 - 停止管道。对群集管道无效。
  2. 在“卡夫卡”选项卡上,配置以下属性:
    卡夫卡 房产描述: __________
    代理 URI卡夫卡代理的连接字符串。使用以下格式:。<host>:<port>

    要确保连接,请输入以逗号分隔的其他代理 URI 列表。

    主题卡夫卡主题阅读。
    最大邮件大小 (KB)要写入卡夫卡的消息的最大大小。
    警告:必须小于 Kafka 中配置的最大邮件大小。

    默认值为 900 KB。

    卡夫卡配置

    要使用的其他卡夫卡配置属性。使用简单或批量编辑模式,单击添加图标以添加属性。定义卡夫卡属性名称和值。

    按照 Kafka 的预期使用属性名称和值。

    有关启用与 Kafka 的安全连接的信息,请参阅启用 Kafka 安全性。

  3. “RPC”选项卡上,配置以下属性:
    断续器 化学性质描述: __________
    接收端口要侦听数据的端口号。必须与提供数据的 SDC RPC 目标关联的端口号之一匹配。
    注意:没有其他管道或进程可以绑定到侦听端口。侦听端口只能由单个管道使用。
    最大并发请求数一次允许的最大并发请求数。
    危险保护程序编号用户定义的 ID 必须与 SDC RPC 目标中定义的 RPC ID 匹配。
    最大批请求大小 (MB)一次请求和处理的最大数据量。

    默认值为 100 MB。

  4. 要使用 SSL/TLS,请单击 TLS 选项卡并配置以下属性:
    断续器属性描述: __________
    使用红绿灯系统

    启用 TLS 的使用。

    密钥库文件密钥库文件的路径。输入文件的绝对路径或相对于数据收集器资源目录的路径:$SDC_RESOURCES

    有关环境变量的详细信息,请参阅数据收集器环境配置。

    缺省情况下,不使用密钥库。

    密钥库类型要使用的密钥库的类型。使用以下类型之一:
    • Java 密钥库文件 (JKS)
    • PKCS #12 (p12 文件)

    缺省值为 Java 密钥库文件 (JKS)。

    密钥库密码密钥库文件的密码。密码是可选的,但建议使用。
    提示:若要保护敏感信息(如密码),可以使用 运行时资源或 凭据存储。
    密钥库密钥算法用于管理密钥库的算法。

    默认值为 SunX509。

    使用默认协议确定要使用的传输层安全性 (TLS) 协议。默认协议为 TLSv1.2。若要使用其他协议,请清除此选项。
    传输协议要使用的 TLS 协议。要使用默认 TLSv1.2 以外的协议,请单击添加图标并输入协议名称。您可以使用简单或批量编辑模式来添加协议。
    注意:较旧的协议不如 TLSv1.2 安全。
    使用默认密码套件确定执行 SSL/TLS 握手时要使用的默认密码套件。若要使用其他密码套件,请清除此选项。
    密码套房要使用的密码套件。要使用不属于默认密码集的密码套件,请单击添加图标并输入密码套件的名称。您可以使用简单或批量编辑模式来添加密码套件。

    输入要使用的其他密码套件的 Java 安全套接字扩展 (JSSE) 名称。

这篇关于SDC RPC到Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/376350

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

关于rpc长连接与短连接的思考记录

《关于rpc长连接与短连接的思考记录》文章总结了RPC项目中长连接和短连接的处理方式,包括RPC和HTTP的长连接与短连接的区别、TCP的保活机制、客户端与服务器的连接模式及其利弊分析,文章强调了在实... 目录rpc项目中的长连接与短连接的思考什么是rpc项目中的长连接和短连接与tcp和http的长连接短

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep