本文主要是介绍SDC RPC到Kafka,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
SDC RPC 到卡夫卡(已弃用)
从一个或多个 SDC RPC 目标读取数据,并将其立即写入卡夫卡。在 SDC RPC 目标管道中使用 SDC RPC 到卡夫卡源。但是,到 Kafka 源的 SDC RPC 现已弃用,并将在将来的版本中被删除。我们建议使用 SDC RPC 源。
如果有多个 SDC RPC 源管道,其中包含要写入卡夫卡而无需额外处理的数据,请使用 SDC RPC 到卡夫卡源。
与 SDC RPC 源一样,从 SDC RPC 到卡夫卡源从另一个管道中的 SDC RPC 目标读取数据。但是,到卡夫卡源的 SDC RPC 经过优化,可将数据从多个管道直接写入卡夫卡。使用此源时,无法在写入 Kafka 之前执行其他处理。
以下是使用 SDC RPC 到卡夫卡源的推荐体系结构示例:
将 SDC RPC 配置为 Kafka 源时,可以定义源侦听数据的端口、SDC RPC ID、最大并发请求数和最大批量请求大小。您还可以配置 SSL/TLS 属性,包括默认传输协议和密码套件。
您还需要为 Kafka 配置连接信息,包括代理 URI、要写入的主题以及最大消息大小。您可以根据需要添加卡夫卡配置属性并启用卡夫卡安全性。
管道配置
在管道中使用 SDC RPC 到 Kafka 源时,请将源连接到废纸篓目标。
将 SDC RPC 写入卡夫卡源,将记录直接写入卡夫卡。源不会将记录传递到其输出端口,因此您无法执行其他处理或将数据写入其他目标系统。
但是,由于管道需要目标,因此应将原点连接到废纸篓目标以满足管道验证要求。
将 SDC RPC 连接到卡夫卡源的管道应如下所示:
并发请求
您可以指定 SDC RPC 对 Kafka 源一次处理的最大请求数。
当源管道中的 SDC RPC 目标想要将一批数据传递到源时,它会向 Kafka 源的 SDC RPC 发送请求。如果有一个源管道将数据传递到 SDC RPC 再到 Kafka 源,则可以将最大并发请求数设置为 1,因为目标一次处理一批数据。
通常,您将有多个管道将数据传递到此源。在这种情况下,应评估源管道的数量、管道的预期输出以及 Data Collector 计算机的资源,然后根据需要调整属性以提高管道性能。
例如,如果有 100 个源管道将数据传递到 SDC RPC 到 Kafka 源,但管道生成数据的速度很慢,则可以将最大值设置为 20,以防止这些管道在卷峰值期间使用过多的数据收集器资源。或者,如果数据收集器没有资源问题,并且您希望它尽快处理数据,则可以将最大值设置为 90 或 100。请注意,SDC RPC 目标还具有用于重试和回退周期的高级属性,可用于帮助调整性能。
批请求大小、卡夫卡消息大小和卡夫卡配置
将 SDC RPC 配置为彼此相关的卡夫卡最大批处理请求大小和卡夫卡消息大小属性,以及卡夫卡中配置的最大消息大小。
“最大批处理请求大小 (MB)”属性确定源从每个 SDC RPC 目标接受的数据批的最大大小。收到一批数据后,源会立即将数据写入 Kafka。
为了提升峰值性能,源将尽可能多的记录写入单个 Kafka 消息。“Kafka 最大消息大小 (KB)”属性确定它所创建的消息的最大大小。
例如,假设源使用默认的 100 MB 作为最大批处理请求大小,默认的 900 KB 作为最大消息大小,Kafka 对消息.max.bytes 使用默认值 1 MB。
当源请求一批数据时,它一次最多接收 100 MB 的数据。当源写入 Kafka 时,它会将记录分组为尽可能少的消息,包括每条消息中最多 900 KB 的记录。由于消息大小小于 Kafka 1 MB 的要求,因此源已成功将所有消息写入 Kafka。
如果记录大于 900 KB 的最大消息大小,则源将生成错误,并且不会将记录(或包含该记录的批处理)写入 Kafka。为批处理提供超大记录的 SDC RPC 目标根据阶段错误记录处理来处理批处理。
其他卡夫卡属性
您可以将自定义卡夫卡配置属性添加到 SDC RPC 到卡夫卡源。
添加 Kafka 配置属性时,请输入确切的属性名称和值。该阶段不验证属性名称或值。
默认情况下定义了多个属性,您可以根据需要编辑或删除这些属性。
- 密钥序列化程序.class
- 元数据.代理列表
- 分区程序.class
- 生产者类型
- 序列化程序.class
启用卡夫卡安全
您可以将 SDC RPC 配置为卡夫卡源,以便通过 SSL/TLS 和/或 Kerberos 安全地连接到卡夫卡。
启用 SSL/红绿灯系统
执行以下步骤,使 SDC RPC 到卡夫卡源使用 SSL/TLS 连接到卡夫卡。
- 要使用 SSL/TLS 进行连接,请首先确保按照卡夫卡文档中所述,将卡夫卡配置为使用 SSL/TLS。
- 在舞台的“常规”选项卡上,将“舞台库”属性设置为相应的“卡夫卡”版本。
- 在“卡夫卡”选项卡上,添加“卡夫卡”配置属性并将其设置为 SSL。
- 然后添加并配置以下 SSL 卡夫卡属性:
- 信任库位置
- 信任库
当 Kafka 代理需要客户端身份验证时 - 当 ssl.client.auth 代理属性设置为“必需”时 , 添加并配置以下属性:- 位置
- 密码
- .key密码
某些代理可能还需要添加以下属性:- ssl.enabled.协议
- 信任库类型
- 密钥库类型
有关这些属性的详细信息,请参阅 Kafka 文档。
例如,以下属性允许阶段使用 SSL/TLS 通过客户端身份验证连接到卡夫卡:

启用 Kerberos (SASL)
使用 Kerberos 身份验证时,数据收集器将使用 Kerberos 主体和密钥选项卡连接到 Kafka。执行以下步骤,使 SDC RPC 到卡夫卡源能够使用 Kerberos 连接到卡夫卡。
- 要使用 Kerberos,请首先确保按照 Kafka 文档中的说明为 Kerberos 配置了 Kafka。
- 确保为数据收集器启用了 Kerberos 身份验证,如 Kerberos 身份验证中所述。
- 根据您的安装和认证类型,添加 Kafka 客户机所需的 Java 认证和授权服务 (JAAS) 配置属性:
- 在没有 LDAP 认证的情况下安装 RPM、压缩包或 Cloudera 管理器 - 如果数据收集器不使用 LDAP 认证,请在数据收集器计算机上创建一个单独的 JAAS 配置文件。将以下登录部分添加到文件中:
KafkaClient
<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>"; };</code></span></span>
例如:<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM"; };</code></span></span>
然后修改SDC_JAVA_OPTS环境变量,以包括以下定义 JAAS 配置文件路径的选项:<span style="color:#333333"><span style="background-color:#eeeeee"><code>-Djava.security.auth.login.config=<JAAS config path></code></span></span>
使用安装类型所需的方法修改环境变量。
- 使用 LDAP 认证的 RPM 或压缩包安装 - 如果在 RPM 或压缩包安装中启用了 LDAP 认证,请将属性添加到数据收集器使用的 JAAS 配置文件 - 该文件。将以下登录部分添加到文件末尾:
$SDC_CONF/ldap-login.conf
KafkaClient
ldap-login.conf
<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>"; };</code></span></span>
例如:<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM"; };</code></span></span>
- 使用 LDAP 身份验证进行云汇管理器安装 - 如果在云印管理器安装中启用了 LDAP 身份验证,请在云端管理器中为流集服务启用 LDAP 配置文件替换(ldap.login.file.allow.替换)属性。
如果启用了“使用安全阀编辑 LDAP 信息”属性,并且“数据收集器高级配置代码段(安全阀)”字段中配置了 LDAP 认证,那么将 JAAS 配置属性添加到同一个 ldap-login.conf 安全阀中。
如果 LDAP 认证是通过 LDAP 属性而不是 ldap-login.conf 安全值配置的,请将 JAAS 配置属性添加到数据收集器高级配置代码段(安全阀)中,以用于生成的 ldap 登录名附加.conf 字段。
将以下登录部分添加到相应的字段中,如下所示:
KafkaClient
<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="<principal name>/_HOST@<realm>"; };</code></span></span>
例如:<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="sdc/_HOST@EXAMPLE.COM"; };</code></span></span>
Cloudera 管理器会生成相应的密钥表路径和主机名。
- 在没有 LDAP 认证的情况下安装 RPM、压缩包或 Cloudera 管理器 - 如果数据收集器不使用 LDAP 认证,请在数据收集器计算机上创建一个单独的 JAAS 配置文件。将以下登录部分添加到文件中:
- 在舞台的“常规”选项卡上,将“舞台库”属性设置为相应的“卡夫卡”版本。
- 在“卡夫卡”选项卡上,添加“安全协议”配置属性,并将其设置为SASL_PLAINTEXT。
- 然后,添加 sasl.kerberos.service.name 配置属性,并将其设置为 kafka。
例如,以下卡夫卡属性允许使用 Kerberos 连接到卡夫卡:
启用 SSL/TLS 和 Kerberos
您可以启用 SDC RPC 到卡夫卡源,以使用 SSL/TLS 和 Kerberos 连接到卡夫卡。
- 确保卡夫卡配置为使用 SSL/TLS 和 Kerberos (SASL),如以下卡夫卡文档中所述:
- Apache Kafka
- Apache Kafka
- 确保为数据收集器启用了 Kerberos 身份验证,如 Kerberos 身份验证中所述。
- 根据您的安装和认证类型,添加 Kafka 客户机所需的 Java 认证和授权服务 (JAAS) 配置属性:
- 在没有 LDAP 认证的情况下安装 RPM、压缩包或 Cloudera 管理器 - 如果数据收集器不使用 LDAP 认证,请在数据收集器计算机上创建一个单独的 JAAS 配置文件。将以下登录部分添加到文件中:
KafkaClient
<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>"; };</code></span></span>
例如:<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM"; };</code></span></span>
然后修改SDC_JAVA_OPTS环境变量,以包括以下定义 JAAS 配置文件路径的选项:<span style="color:#333333"><span style="background-color:#eeeeee"><code>-Djava.security.auth.login.config=<JAAS config path></code></span></span>
使用安装类型所需的方法修改环境变量。
- 使用 LDAP 认证的 RPM 或压缩包安装 - 如果在 RPM 或压缩包安装中启用了 LDAP 认证,请将属性添加到数据收集器使用的 JAAS 配置文件 - 该文件。将以下登录部分添加到文件末尾:
$SDC_CONF/ldap-login.conf
KafkaClient
ldap-login.conf
<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="<keytab path>"principal="<principal name>/<host name>@<realm>"; };</code></span></span>
例如:<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="/etc/security/keytabs/sdc.keytab"principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM"; };</code></span></span>
- 使用 LDAP 身份验证进行云汇管理器安装 - 如果在云印管理器安装中启用了 LDAP 身份验证,请在云端管理器中为流集服务启用 LDAP 配置文件替换(ldap.login.file.allow.替换)属性。
如果启用了“使用安全阀编辑 LDAP 信息”属性,并且“数据收集器高级配置代码段(安全阀)”字段中配置了 LDAP 认证,那么将 JAAS 配置属性添加到同一个 ldap-login.conf 安全阀中。
如果 LDAP 认证是通过 LDAP 属性而不是 ldap-login.conf 安全值配置的,请将 JAAS 配置属性添加到数据收集器高级配置代码段(安全阀)中,以用于生成的 ldap 登录名附加.conf 字段。
将以下登录部分添加到相应的字段中,如下所示:
KafkaClient
<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="<principal name>/_HOST@<realm>"; };</code></span></span>
例如:<span style="color:#333333"><span style="background-color:#eeeeee"><code>KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab="_KEYTAB_PATH"principal="sdc/_HOST@EXAMPLE.COM"; };</code></span></span>
Cloudera 管理器会生成相应的密钥表路径和主机名。
- 在没有 LDAP 认证的情况下安装 RPM、压缩包或 Cloudera 管理器 - 如果数据收集器不使用 LDAP 认证,请在数据收集器计算机上创建一个单独的 JAAS 配置文件。将以下登录部分添加到文件中:
- 在舞台的“常规”选项卡上,将“舞台库”属性设置为相应的“卡夫卡”版本。
- 在“卡夫卡”选项卡上,添加 security.protocol 属性并将其设置为SASL_SSL。
- 然后,添加 sasl.kerberos.service.name 配置属性,并将其设置为 kafka。
- 然后添加并配置以下 SSL 卡夫卡属性:
- 信任库位置
- 信任库
当 Kafka 代理需要客户端身份验证时 - 当 ssl.client.auth 代理属性设置为“必需”时 , 添加并配置以下属性:- 位置
- 密码
- .key密码
某些代理可能还需要添加以下属性:- ssl.enabled.协议
- 信任库类型
- 密钥库类型
有关这些属性的详细信息,请参阅 Kafka 文档。
将 SDC RPC 配置为卡夫卡源
将 SDC RPC 配置为卡夫卡源,以将数据从多个 SDC RPC 目标直接写入卡夫卡。
- 在“属性”面板的“常规”选项卡上,配置以下属性:
一般财产 描述: __________ 名字 艺名。 描述: __________ 可选说明。 舞台库 要使用的库版本。 记录错误 阶段的错误记录处理: - 丢弃 - 丢弃记录。
- 发送到错误 - 将记录发送到管道以进行错误处理。
- 停止管道 - 停止管道。对群集管道无效。
- 在“卡夫卡”选项卡上,配置以下属性:
卡夫卡 房产 描述: __________ 代理 URI 卡夫卡代理的连接字符串。使用以下格式:。 <host>:<port>
要确保连接,请输入以逗号分隔的其他代理 URI 列表。
主题 卡夫卡主题阅读。 最大邮件大小 (KB) 要写入卡夫卡的消息的最大大小。 警告:必须小于 Kafka 中配置的最大邮件大小。默认值为 900 KB。
卡夫卡配置 要使用的其他卡夫卡配置属性。使用简单或批量编辑模式,单击添加图标以添加属性。定义卡夫卡属性名称和值。
按照 Kafka 的预期使用属性名称和值。
有关启用与 Kafka 的安全连接的信息,请参阅启用 Kafka 安全性。
- 在“RPC”选项卡上,配置以下属性:
断续器 化学性质 描述: __________ 接收端口 要侦听数据的端口号。必须与提供数据的 SDC RPC 目标关联的端口号之一匹配。 注意:没有其他管道或进程可以绑定到侦听端口。侦听端口只能由单个管道使用。最大并发请求数 一次允许的最大并发请求数。 危险保护程序编号 用户定义的 ID 必须与 SDC RPC 目标中定义的 RPC ID 匹配。 最大批请求大小 (MB) 一次请求和处理的最大数据量。 默认值为 100 MB。
- 要使用 SSL/TLS,请单击 TLS 选项卡并配置以下属性:
断续器属性 描述: __________ 使用红绿灯系统 启用 TLS 的使用。
密钥库文件 密钥库文件的路径。输入文件的绝对路径或相对于数据收集器资源目录的路径:$SDC_RESOURCES。 有关环境变量的详细信息,请参阅数据收集器环境配置。
缺省情况下,不使用密钥库。
密钥库类型 要使用的密钥库的类型。使用以下类型之一: - Java 密钥库文件 (JKS)
- PKCS #12 (p12 文件)
缺省值为 Java 密钥库文件 (JKS)。
密钥库密码 密钥库文件的密码。密码是可选的,但建议使用。 提示:若要保护敏感信息(如密码),可以使用 运行时资源或 凭据存储。密钥库密钥算法 用于管理密钥库的算法。 默认值为 SunX509。
使用默认协议 确定要使用的传输层安全性 (TLS) 协议。默认协议为 TLSv1.2。若要使用其他协议,请清除此选项。 传输协议 要使用的 TLS 协议。要使用默认 TLSv1.2 以外的协议,请单击添加图标并输入协议名称。您可以使用简单或批量编辑模式来添加协议。 注意:较旧的协议不如 TLSv1.2 安全。使用默认密码套件 确定执行 SSL/TLS 握手时要使用的默认密码套件。若要使用其他密码套件,请清除此选项。 密码套房 要使用的密码套件。要使用不属于默认密码集的密码套件,请单击添加图标并输入密码套件的名称。您可以使用简单或批量编辑模式来添加密码套件。 输入要使用的其他密码套件的 Java 安全套接字扩展 (JSSE) 名称。
这篇关于SDC RPC到Kafka的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!