【Kafka】Kafka 1.0.1案例详解之Kafka Streams

2024-01-15 10:18
文章标签 详解 案例 1.0 kafka streams

本文主要是介绍【Kafka】Kafka 1.0.1案例详解之Kafka Streams,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这之前我们已经讲解了Kafka的安装部署和最核心的发布订阅功能,本次章节我们来介绍Kafka的新特性——Kafka Streams。

首先,要研究一样新东西,我们需要知道它是做什么的:

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management and real-time querying of application state.

大家仔细阅读上面一段话可以知道,Kafka Streams是一个用来处理Kafka消息的库,它包含了如下几个优势:

  1. 通过与现有的Java应用整合,我们可以设计出简单的、轻量级的客户端类库

  2. 只需要基于Kafka自身的消息系统,不需要额外的第三方系统,就可以很容易地实现水平扩展

  3. 通过可容错的状态管理,实现高效的窗口操作和聚合

  4. 支持 exactly-once语义

  5. 既支持基于时间窗口的操作,也支持每次单条数据的处理

  6. 既支持低阶的流处理接口,也支持高阶的流处理DSL(领域专用语言)

Kafka Streams处理剖析图

8dedbcba9f7a942252b660624732bf8c.jpeg

案例剖析

说了这么多理论知识,实际上用起来很简单,接下来我们通过一个简单的例子来熟悉这个新特性。

添加依赖

kafka-streams是一个单独的依赖包,并不存在于kafka-client中

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>1.0.1</version>
</dependency>

属性配置

添加属性配置,application id相当于group id,bootstrap servers配置kafka的brokers地址,并配置key与value的序列化、反序列化实现类。这两个类均实现了

org.apache.kafka.common.serialization.Serde接口

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

读取并处理输出

最后通过StreamsBuilder来创建KStream,进行数据处理转换后输出到一个新的topic或者其他外部存储器中。

builder.stream("streams-plaintext-input").to("streams-pipe-output");
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);

退出机制

最后添加退出时的处理逻辑

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}
});

我们可以在github中查看完整的程序代码:

https://github.com/lubinsu/new-kafka

156c2c0d92e9b8ce6790e672bab69c68.jpeg

生活

岂止于美

f5b4ea41f58297ff793aaf16419c528f.jpeg

作者:苏鹭彬

长按二维码关注

这篇关于【Kafka】Kafka 1.0.1案例详解之Kafka Streams的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/608551

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

Spring Cloud LoadBalancer 负载均衡详解

《SpringCloudLoadBalancer负载均衡详解》本文介绍了如何在SpringCloud中使用SpringCloudLoadBalancer实现客户端负载均衡,并详细讲解了轮询策略和... 目录1. 在 idea 上运行多个服务2. 问题引入3. 负载均衡4. Spring Cloud Load

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

在 Spring Boot 中使用 @Autowired和 @Bean注解的示例详解

《在SpringBoot中使用@Autowired和@Bean注解的示例详解》本文通过一个示例演示了如何在SpringBoot中使用@Autowired和@Bean注解进行依赖注入和Bean... 目录在 Spring Boot 中使用 @Autowired 和 @Bean 注解示例背景1. 定义 Stud

如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解

《如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别详解》:本文主要介绍如何通过海康威视设备网络SDK进行Java二次开发摄像头车牌识别的相关资料,描述了如何使用海康威视设备网络SD... 目录前言开发流程问题和解决方案dll库加载不到的问题老旧版本sdk不兼容的问题关键实现流程总结前言作为

SQL 中多表查询的常见连接方式详解

《SQL中多表查询的常见连接方式详解》本文介绍SQL中多表查询的常见连接方式,包括内连接(INNERJOIN)、左连接(LEFTJOIN)、右连接(RIGHTJOIN)、全外连接(FULLOUTER... 目录一、连接类型图表(ASCII 形式)二、前置代码(创建示例表)三、连接方式代码示例1. 内连接(I

Go路由注册方法详解

《Go路由注册方法详解》Go语言中,http.NewServeMux()和http.HandleFunc()是两种不同的路由注册方式,前者创建独立的ServeMux实例,适合模块化和分层路由,灵活性高... 目录Go路由注册方法1. 路由注册的方式2. 路由器的独立性3. 灵活性4. 启动服务器的方式5.

Java中八大包装类举例详解(通俗易懂)

《Java中八大包装类举例详解(通俗易懂)》:本文主要介绍Java中的包装类,包括它们的作用、特点、用途以及如何进行装箱和拆箱,包装类还提供了许多实用方法,如转换、获取基本类型值、比较和类型检测,... 目录一、包装类(Wrapper Class)1、简要介绍2、包装类特点3、包装类用途二、装箱和拆箱1、装

Go语言中三种容器类型的数据结构详解

《Go语言中三种容器类型的数据结构详解》在Go语言中,有三种主要的容器类型用于存储和操作集合数据:本文主要介绍三者的使用与区别,感兴趣的小伙伴可以跟随小编一起学习一下... 目录基本概念1. 数组(Array)2. 切片(Slice)3. 映射(Map)对比总结注意事项基本概念在 Go 语言中,有三种主要