【Kafka】Kafka 1.0.1案例详解之Kafka Streams

2024-01-15 10:18
文章标签 详解 案例 1.0 kafka streams

本文主要是介绍【Kafka】Kafka 1.0.1案例详解之Kafka Streams,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这之前我们已经讲解了Kafka的安装部署和最核心的发布订阅功能,本次章节我们来介绍Kafka的新特性——Kafka Streams。

首先,要研究一样新东西,我们需要知道它是做什么的:

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

大家仔细阅读上面一段话可以知道,Kafka Streams是一个用来处理Kafka消息的库,它包含了如下几个优势:

  1. 通过与现有的Java应用整合,我们可以设计出简单的、轻量级的客户端类库

  2. 只需要基于Kafka自身的消息系统,不需要额外的第三方系统,就可以很容易地实现水平扩展

  3. 通过可容错的状态管理,实现高效的窗口操作和聚合

  4. 支持 exactly-once语义

  5. 既支持基于时间窗口的操作,也支持每次单条数据的处理

  6. 既支持低阶的流处理接口,也支持高阶的流处理DSL(领域专用语言)

Kafka Streams处理剖析图

8dedbcba9f7a942252b660624732bf8c.jpeg

案例剖析

说了这么多理论知识,实际上用起来很简单,接下来我们通过一个简单的例子来熟悉这个新特性。

添加依赖

kafka-streams是一个单独的依赖包,并不存在于kafka-client中

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>1.0.1</version>
</dependency>

属性配置

添加属性配置,application id相当于group id,bootstrap servers配置kafka的brokers地址,并配置key与value的序列化、反序列化实现类。这两个类均实现了

org.apache.kafka.common.serialization.Serde接口

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

读取并处理输出

最后通过StreamsBuilder来创建KStream,进行数据处理转换后输出到一个新的topic或者其他外部存储器中。

builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

退出机制

最后添加退出时的处理逻辑

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}
});

我们可以在github中查看完整的程序代码:

https://github.com/lubinsu/new-kafka

156c2c0d92e9b8ce6790e672bab69c68.jpeg

生活

岂止于美

f5b4ea41f58297ff793aaf16419c528f.jpeg

作者:苏鹭彬

长按二维码关注

这篇关于【Kafka】Kafka 1.0.1案例详解之Kafka Streams的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/608551

相关文章

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

SpringBoot中SM2公钥加密、私钥解密的实现示例详解

《SpringBoot中SM2公钥加密、私钥解密的实现示例详解》本文介绍了如何在SpringBoot项目中实现SM2公钥加密和私钥解密的功能,通过使用Hutool库和BouncyCastle依赖,简化... 目录一、前言1、加密信息(示例)2、加密结果(示例)二、实现代码1、yml文件配置2、创建SM2工具

MyBatis-Plus 中 nested() 与 and() 方法详解(最佳实践场景)

《MyBatis-Plus中nested()与and()方法详解(最佳实践场景)》在MyBatis-Plus的条件构造器中,nested()和and()都是用于构建复杂查询条件的关键方法,但... 目录MyBATis-Plus 中nested()与and()方法详解一、核心区别对比二、方法详解1.and()

Spring IoC 容器的使用详解(最新整理)

《SpringIoC容器的使用详解(最新整理)》文章介绍了Spring框架中的应用分层思想与IoC容器原理,通过分层解耦业务逻辑、数据访问等模块,IoC容器利用@Component注解管理Bean... 目录1. 应用分层2. IoC 的介绍3. IoC 容器的使用3.1. bean 的存储3.2. 方法注

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

Java内存分配与JVM参数详解(推荐)

《Java内存分配与JVM参数详解(推荐)》本文详解JVM内存结构与参数调整,涵盖堆分代、元空间、GC选择及优化策略,帮助开发者提升性能、避免内存泄漏,本文给大家介绍Java内存分配与JVM参数详解,... 目录引言JVM内存结构JVM参数概述堆内存分配年轻代与老年代调整堆内存大小调整年轻代与老年代比例元空