kafka 实现worldcount

2024-08-22 22:32
文章标签 kafka worldcount 实现

本文主要是介绍kafka 实现worldcount,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章地址:http://www.haha174.top/article/details/259309
官网地址:http://kafka.apache.org/10/documentation/streams/quickstart
Kafka Streams是一个用来构建流处理程序的库,特别是其输入是一个Kafka topic,输出是另一个Kafka topic的程序(或者是调用外部服务,或者是更新数据库,或者其它)。它使得你以一种分布式以及容错的方式来做这件事情。

例子如下:

 public static void main(String[] args) throws Exception {Properties props = new Properties();props.put("application.id", "streams-wordcount");props.put("bootstrap.servers", "localhost:9092");props.put("cache.max.bytes.buffering", 0);props.put("default.key.serde", Serdes.String().getClass().getName());props.put("default.value.serde", Serdes.String().getClass().getName());props.put("auto.offset.reset", "earliest");StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");KTable<String, Long> counts = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {public Iterable<String> apply(String value) {return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));}}).groupBy(new KeyValueMapper<String, String, String>() {public String apply(String key, String value) {return value;}}).count();counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {public void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable var8) {System.exit(1);}System.exit(0);}

使用它只需要起一个streams-plaintext-input topic 的 生产者
启动一个 streams-wordcount-output的消费者
那到底有什么好处呢
简化点1: 不依赖框架的流处理
Kafka Streams使得构建流处理服务更简单的第一点就是:它不依赖于集群和框架,它只是一个库(而且是挺小的一个库)。你只需要Kafka和你自己的代码。Kafka会协调你的程序代码,使得它们可以处理故障,在不同程序实例间分发负载,在新的程序实例加入时重新对负载进行平衡。

我下面会讲一下为什么我认为这是很重要的,以及我们之前的一点经历,来帮助理解这个模型的重要性。

治愈MapReduce的宿醉
我前边讲到我们构造Apache Samza的经历,以及人们实际想要的(简单的流服务)和我们构建的东西(实时的MapReduce)之间的距离。我认为这种概念的错位是普遍的,毕竟流处理做的很多事情是从批处理世界中接管一些能力,用于低延迟的领域。同样的MapReduce遗产影响了其它主流的流处理平台(Storm, Spark等),就像它们对Samza的影响一样。

在LinkedIn在很多生产数据的处理服务是属于低延迟领域的:email, 用户生成的内容,新消息反馈等。其它的很多公司也应该有类似的异步服务,比如零售业需要给商品排序、重新定价,然后卖出,对于金融公司,实时数据更是核心。大部分这些业务,都是异步的,对于渲染页面或者更新移动app的屏幕就不会有这样的问题(这些是同步的)。

那么为什么在Storm, Samza, Spark Streaming这样的流处理框架之上构建这样的核心应用这么繁琐呢?

一个批处理框架,像是MapReduce或者Spark需要解决一些困难的问题:

它必须在一个机器池之上管理很多短期任务,并且在集群中有效地调度资源分配
为了做到这点,它必须动态地把你的代码、配置、依赖的库以及其它所有需要的东西,打包并且物理地布署到将要执行它的机器上。
它必须管理进程,并且实现共享集群的不同任务之间的隔离。
不幸的是,为了解决这些问题,框架就得变得很有侵入性。为了做到容错和扩展,框架得控制你的程序如何布署、配置、监控和打包。

那么,Kafka Streams有什么不同呢?

Kafka Streams对它想要解决的问题要更关注得多。它做了以下的事情:

当你的程序的新的实例加入,或已经有实例退出时,它会重新平衡要处理的负载
维护表的本地状态
从错误中恢复
它使用了Kafka为普通的consumer所提供的同样的组管理协议(group manager protocol)来实现。Kafka Streams可以有一些本地的状态,存储在磁盘上,但是它只是一个缓存。如果这个缓存丢失了,或者这个程序实例被转移到了别的地方,这个本地状态是可以被重建的。你可以把Kafka Streams这个库用在你的程序里,然后启动任意数量的你想要程序实例,Kafka将会把它们进行分区,并且在这些实例间进行负载的平衡。

这对于实现像滚动重启(rolling restart)或者无宕机时间的扩展(no-downtime expansion)这样简单的事情是很重要的。在现代的软件工程中,我们把这样的功能看做是应该的,但是很多流处理框架却做不到这点。

Dockers, Mesos, 以及Kurbernetes, 我的天哪!
从流处理框架中分离出打包和布署的原因是,打包和布署这个领域本身就正在进行自身的复兴。Kafka Streams可以使用经典的老实巴交维工具,像是Puppet, Chef, Salt来布署,把可以从命令行来启动。如果你年轻,时髦,你也可以把你的程序做成Dock镜像;或者你不是这样的人,那么你可以用WAR文件。

但是,对于寻找更加有灵活的管理方式的人,有很多框架的目标就是让程序更加灵活。这里列了一部分:

Apache Mesos with a framework like Marathon
Kubernetes
YARN with something like Slider
Swarm from Docker
Various hosted container services such as ECS from Amazon
Cloud Foundry
这个生态系统就和流处理生态一样专注。

的确,Mesos和Kubernets想要解决的问题是把进程分布到很多机器上,这也是当你布署一个Storm任务到Storm集群时,Storm尝试解决的问题。关键在于,这个问题最终被发现是挺难的,而这些通用的框架,至少是其中优秀的那些,会比其它的做得好得多-它们具有执行像在保持并行度的情况下重启、对主机的粘性(sticky host affinity)、真正的基于cgroup的隔离、用docker打包、花哨的UI等等功能。

你可以在这些框架里的任何一种里使用Kafka Streams,就像你会对其它程序做的一样,这是用来实现动态和有弹性的进程管理的一种简单的方式。比如,如果你有Mesos和Marathon,你可以使用Marathon UI直接启动你的Kafka Streams程序,然后动态地扩展它,而不会有服务中断, Meos会管理好进程,Kafka会管理和负载匀衡以及维护你的任务进程的状态。

使用一种这些的框架的开销是和使用Storm这样的框架的集群管理部分是一样的,但是优点是所有这些框架都是可选的(当然,Kafka Streams没有了它们也可以很好的工作)。

简化点2:Streams Meet Tables
Kafka Strems用于简化处理程序的另一个关键方式是把“表”和”流“这两个概念紧密地结合在一起。我们在之前的”turning the database inside out”中简化这个想法。那句话抓住了作为结果的系统是如何重铸程序和它的数据之彰的关系以及它是怎么应于数据变化,这样的要点。为了理想这些,我会回顾一下,解释我对于”table”和”stream”的定义,以及把二者结合在一起如何能够简化常见的异步程序。

传统的数据库都是关于在表格中存储状态的。当需要对事件流进行反应时,传统数据库做得并不好。什么是事件呢?事件只是一些已经发生了的事-可以是一个点击、一次出售、源自某个传感器的一个动态,或者抽象成任何这个世界上发生的事情。

像Storm一样的流处理程序,是从这个等式的另一端出发的。它们被设计用于处理事件流,但是基于流来产生状态却是后面才加进来的。

我认为异步程序的基本问题是把代表当前世界状态的tables与代表正在发生事件的event streams结合在一起。框架需要处理好如何表示它们,以及如何在它们之间进行转化。

为什么说这些概念是相关的呢?我们举一个零售商的简单例子。对于零售商而言,核心的事件流是卖出商品、订购新商品以及接收订购的商品。“库存表”是一个基于当前的存货量,通过售出和接收流进行加减的“表”。对于零售商而言两个关键的流处理动作是当库存开始降低时订购商品,以及根据供需关系调整商品价格。

这篇关于kafka 实现worldcount的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097577

相关文章

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略 1. 特权模式限制2. 宿主机资源隔离3. 用户和组管理4. 权限提升控制5. SELinux配置 💖The Begin💖点点关注,收藏不迷路💖 Kubernetes的PodSecurityPolicy(PSP)是一个关键的安全特性,它在Pod创建之前实施安全策略,确保P

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

C++——stack、queue的实现及deque的介绍

目录 1.stack与queue的实现 1.1stack的实现  1.2 queue的实现 2.重温vector、list、stack、queue的介绍 2.1 STL标准库中stack和queue的底层结构  3.deque的简单介绍 3.1为什么选择deque作为stack和queue的底层默认容器  3.2 STL中对stack与queue的模拟实现 ①stack模拟实现