kafka 实现worldcount

2024-08-22 22:32
文章标签 实现 kafka worldcount

本文主要是介绍kafka 实现worldcount,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章地址:http://www.haha174.top/article/details/259309
官网地址:http://kafka.apache.org/10/documentation/streams/quickstart
Kafka Streams是一个用来构建流处理程序的库,特别是其输入是一个Kafka topic,输出是另一个Kafka topic的程序(或者是调用外部服务,或者是更新数据库,或者其它)。它使得你以一种分布式以及容错的方式来做这件事情。

例子如下:

 public static void main(String[] args) throws Exception {Properties props = new Properties();props.put("application.id", "streams-wordcount");props.put("bootstrap.servers", "localhost:9092");props.put("cache.max.bytes.buffering", 0);props.put("default.key.serde", Serdes.String().getClass().getName());props.put("default.value.serde", Serdes.String().getClass().getName());props.put("auto.offset.reset", "earliest");StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {public Iterable<String> apply(String value) {return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));}}).groupBy(new KeyValueMapper<String, String, String>() {public String apply(String key, String value) {return value;}}).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable var8) {System.exit(1);}System.exit(0);}

使用它只需要起一个streams-plaintext-input topic 的 生产者
启动一个 streams-wordcount-output的消费者
那到底有什么好处呢
简化点1: 不依赖框架的流处理
Kafka Streams使得构建流处理服务更简单的第一点就是:它不依赖于集群和框架,它只是一个库(而且是挺小的一个库)。你只需要Kafka和你自己的代码。Kafka会协调你的程序代码,使得它们可以处理故障,在不同程序实例间分发负载,在新的程序实例加入时重新对负载进行平衡。

我下面会讲一下为什么我认为这是很重要的,以及我们之前的一点经历,来帮助理解这个模型的重要性。

治愈MapReduce的宿醉
我前边讲到我们构造Apache Samza的经历,以及人们实际想要的(简单的流服务)和我们构建的东西(实时的MapReduce)之间的距离。我认为这种概念的错位是普遍的,毕竟流处理做的很多事情是从批处理世界中接管一些能力,用于低延迟的领域。同样的MapReduce遗产影响了其它主流的流处理平台(Storm, Spark等),就像它们对Samza的影响一样。

在LinkedIn在很多生产数据的处理服务是属于低延迟领域的:email, 用户生成的内容,新消息反馈等。其它的很多公司也应该有类似的异步服务,比如零售业需要给商品排序、重新定价,然后卖出,对于金融公司,实时数据更是核心。大部分这些业务,都是异步的,对于渲染页面或者更新移动app的屏幕就不会有这样的问题(这些是同步的)。

那么为什么在Storm, Samza, Spark Streaming这样的流处理框架之上构建这样的核心应用这么繁琐呢?

一个批处理框架,像是MapReduce或者Spark需要解决一些困难的问题:

它必须在一个机器池之上管理很多短期任务,并且在集群中有效地调度资源分配
为了做到这点,它必须动态地把你的代码、配置、依赖的库以及其它所有需要的东西,打包并且物理地布署到将要执行它的机器上。
它必须管理进程,并且实现共享集群的不同任务之间的隔离。
不幸的是,为了解决这些问题,框架就得变得很有侵入性。为了做到容错和扩展,框架得控制你的程序如何布署、配置、监控和打包。

那么,Kafka Streams有什么不同呢?

Kafka Streams对它想要解决的问题要更关注得多。它做了以下的事情:

当你的程序的新的实例加入,或已经有实例退出时,它会重新平衡要处理的负载
维护表的本地状态
从错误中恢复
它使用了Kafka为普通的consumer所提供的同样的组管理协议(group manager protocol)来实现。Kafka Streams可以有一些本地的状态,存储在磁盘上,但是它只是一个缓存。如果这个缓存丢失了,或者这个程序实例被转移到了别的地方,这个本地状态是可以被重建的。你可以把Kafka Streams这个库用在你的程序里,然后启动任意数量的你想要程序实例,Kafka将会把它们进行分区,并且在这些实例间进行负载的平衡。

这对于实现像滚动重启(rolling restart)或者无宕机时间的扩展(no-downtime expansion)这样简单的事情是很重要的。在现代的软件工程中,我们把这样的功能看做是应该的,但是很多流处理框架却做不到这点。

Dockers, Mesos, 以及Kurbernetes, 我的天哪!
从流处理框架中分离出打包和布署的原因是,打包和布署这个领域本身就正在进行自身的复兴。Kafka Streams可以使用经典的老实巴交维工具,像是Puppet, Chef, Salt来布署,把可以从命令行来启动。如果你年轻,时髦,你也可以把你的程序做成Dock镜像;或者你不是这样的人,那么你可以用WAR文件。

但是,对于寻找更加有灵活的管理方式的人,有很多框架的目标就是让程序更加灵活。这里列了一部分:

Apache Mesos with a framework like Marathon
Kubernetes
YARN with something like Slider
Swarm from Docker
Various hosted container services such as ECS from Amazon
Cloud Foundry
这个生态系统就和流处理生态一样专注。

的确,Mesos和Kubernets想要解决的问题是把进程分布到很多机器上,这也是当你布署一个Storm任务到Storm集群时,Storm尝试解决的问题。关键在于,这个问题最终被发现是挺难的,而这些通用的框架,至少是其中优秀的那些,会比其它的做得好得多-它们具有执行像在保持并行度的情况下重启、对主机的粘性(sticky host affinity)、真正的基于cgroup的隔离、用docker打包、花哨的UI等等功能。

你可以在这些框架里的任何一种里使用Kafka Streams,就像你会对其它程序做的一样,这是用来实现动态和有弹性的进程管理的一种简单的方式。比如,如果你有Mesos和Marathon,你可以使用Marathon UI直接启动你的Kafka Streams程序,然后动态地扩展它,而不会有服务中断, Meos会管理好进程,Kafka会管理和负载匀衡以及维护你的任务进程的状态。

使用一种这些的框架的开销是和使用Storm这样的框架的集群管理部分是一样的,但是优点是所有这些框架都是可选的(当然,Kafka Streams没有了它们也可以很好的工作)。

简化点2:Streams Meet Tables
Kafka Strems用于简化处理程序的另一个关键方式是把“表”和”流“这两个概念紧密地结合在一起。我们在之前的”turning the database inside out”中简化这个想法。那句话抓住了作为结果的系统是如何重铸程序和它的数据之彰的关系以及它是怎么应于数据变化,这样的要点。为了理想这些,我会回顾一下,解释我对于”table”和”stream”的定义,以及把二者结合在一起如何能够简化常见的异步程序。

传统的数据库都是关于在表格中存储状态的。当需要对事件流进行反应时,传统数据库做得并不好。什么是事件呢?事件只是一些已经发生了的事-可以是一个点击、一次出售、源自某个传感器的一个动态,或者抽象成任何这个世界上发生的事情。

像Storm一样的流处理程序,是从这个等式的另一端出发的。它们被设计用于处理事件流,但是基于流来产生状态却是后面才加进来的。

我认为异步程序的基本问题是把代表当前世界状态的tables与代表正在发生事件的event streams结合在一起。框架需要处理好如何表示它们,以及如何在它们之间进行转化。

为什么说这些概念是相关的呢?我们举一个零售商的简单例子。对于零售商而言,核心的事件流是卖出商品、订购新商品以及接收订购的商品。“库存表”是一个基于当前的存货量,通过售出和接收流进行加减的“表”。对于零售商而言两个关键的流处理动作是当库存开始降低时订购商品,以及根据供需关系调整商品价格。

这篇关于kafka 实现worldcount的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097577

相关文章

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

关于集合与数组转换实现方法

《关于集合与数组转换实现方法》:本文主要介绍关于集合与数组转换实现方法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、Arrays.asList()1.1、方法作用1.2、内部实现1.3、修改元素的影响1.4、注意事项2、list.toArray()2.1、方

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

C++20管道运算符的实现示例

《C++20管道运算符的实现示例》本文简要介绍C++20管道运算符的使用与实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录标准库的管道运算符使用自己实现类似的管道运算符我们不打算介绍太多,因为它实际属于c++20最为重要的

Java easyExcel实现导入多sheet的Excel

《JavaeasyExcel实现导入多sheet的Excel》这篇文章主要为大家详细介绍了如何使用JavaeasyExcel实现导入多sheet的Excel,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录1.官网2.Excel样式3.代码1.官网easyExcel官网2.Excel样式3.代码

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

浏览器插件cursor实现自动注册、续杯的详细过程

《浏览器插件cursor实现自动注册、续杯的详细过程》Cursor简易注册助手脚本通过自动化邮箱填写和验证码获取流程,大大简化了Cursor的注册过程,它不仅提高了注册效率,还通过友好的用户界面和详细... 目录前言功能概述使用方法安装脚本使用流程邮箱输入页面验证码页面实战演示技术实现核心功能实现1. 随机

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

Golang如何用gorm实现分页的功能

《Golang如何用gorm实现分页的功能》:本文主要介绍Golang如何用gorm实现分页的功能方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录背景go库下载初始化数据【1】建表【2】插入数据【3】查看数据4、代码示例【1】gorm结构体定义【2】分页结构体