alpakka-kafka(1)-producer

2024-04-09 04:32
文章标签 kafka producer alpakka

本文主要是介绍alpakka-kafka(1)-producer,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

alpakka项目是一个基于akka-streams流处理编程工具的scala/java开源项目,通过提供connector连接各种数据源并在akka-streams里进行数据处理。alpakka-kafka就是alpakka项目里的kafka-connector。对于我们来说:可以用alpakka-kafka来对接kafka,使用kafka提供的功能。或者从另外一个角度讲:alpakka-kafka就是一个用akka-streams实现kafka功能的scala开发工具。

alpakka-kafka提供了kafka的核心功能:producer、consumer,分别负责把akka-streams里的数据写入kafka及从kafka中读出数据并输入到akka-streams里。用akka-streams集成kafka的应用场景通常出现在业务集成方面:在一项业务A中产生一些业务操作指令写入kafka,然后通过kafka把指令传送给另一项业务B,业务B从kafka中获取操作指令并进行相应的业务操作。如:有两个业务模块:收货管理和库存管理,一方面收货管理向kafka写入收货记录。另一头库存管理从kafka中读取收货记录并更新相关库存数量记录。注意,这两项业务是分别操作的。在alpakka中,实际的业务操作基本就是在akka-streams里的数据处理(transform),其实是典型的CQRS模式:读写两方互不关联,写时不管受众是谁,如何使用、读者不关心谁是写方。这里的写和读两方分别代表kafka里的producer和consumer。

本篇我们先介绍alpakka-kafka的producer功能及其使用方法。如前所述:alpakka是用akka-streams实现了kafka-producer功能。alpakka提供的producer也就是akka-streams的一种组件,可以与其它的akka-streams组件组合形成更大的akka-streams个体。构建一个producer需要先完成几个配件类构成:

1、producer-settings配置:alpakka-kafka在reference.conf里的akka.kafka.producer配置段落提供了足够支持基本运作的默认producer配置。用户可以通过typesafe config配置文件操作工具来灵活调整配置

2、de/serializer序列化工具:alpakka-kafka提供了String类型的序列化/反序列化函数,可以直接使用

4、bootstrap-server:一个以逗号分隔的kafka-cluster节点ip清单文本

下面是一个具体的例子:

  implicit val system = ActorSystem("kafka_sys")val bootstrapServers = "localhost:9092"val config = system.settings.config.getConfig("akka.kafka.producer")val producerSettings =ProducerSettings(config, new StringSerializer, new StringSerializer).withBootstrapServers(bootstrapServers)

这里使用ActorSystem只是为了读取.conf文件里的配置,还没有使用任何akka-streams组件。akka.kafka.producer配置段落在alpakka-kafka的reference.conf里提供了默认配置,不需要在application.conf里重新定义。

alpakka-kafka提供了一个最基本的producer,非akka-streams组件,sendProducer。下面我们示范一下sendProducer的使用和效果:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Consumer, SendProducer}
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import akka.kafka._
import org.apache.kafka.common.serialization._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}object SendProducerDemo extends App {implicit val system = ActorSystem("kafka_sys")implicit val executionContext = system.dispatcherval bootstrapServers = "localhost:9092"val config = system.settings.config.getConfig("akka.kafka.producer")val producerSettings =ProducerSettings(config, new StringSerializer, new StringSerializer).withBootstrapServers(bootstrapServers)val producer = SendProducer(producerSettings)val topic = "greatings"val lstfut: Seq[Future[RecordMetadata]] =(100 to 200).reverse.map(_.toString).map(value => new ProducerRecord[String, String](topic, s"hello-$value")).map(msg => producer.send(msg))val futlst = Future.sequence(lstfut)Await.result(futlst, 2.seconds)scala.io.StdIn.readLine()producer.close()system.terminate()
}

以上示范用sendProducer向kafka写入100条hello消息。使用的是集合遍历,没有使用akka-streams的Source。为了检验具体效果,我们可以使用kafka提供的一些手工指令,如下:

\w> ./kafka-topics --create --topic greatings --bootstrap-server localhost:9092
Created topic greatings.
\w> ./kafka-console-consumer --topic greatings  --bootstrap-server localhost:9092
hello-100
hello-101
hello-102
hello-103
hello-104
hello-105
hello-106
...

既然producer代表写入功能,那么在akka-streams里就是Sink或Flow组件的功能了。下面这个例子是producer Sink组件plainSink的示范:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl._
import akka.kafka._
import akka.stream.scaladsl._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization._import scala.concurrent._
import scala.concurrent.duration._object plain_sink extends App {implicit val system = ActorSystem("kafka_sys")val bootstrapServers = "localhost:9092"val config = system.settings.config.getConfig("akka.kafka.producer")val producerSettings =ProducerSettings(config, new StringSerializer, new StringSerializer).withBootstrapServers(bootstrapServers)implicit val executionContext = system.dispatcherval topic = "greatings"val done: Future[Done] =Source(1 to 100).map(_.toString).map(value => new ProducerRecord[String, String](topic, s"hello-$value")).runWith(Producer.plainSink(producerSettings))Await.ready(done,3.seconds)scala.io.StdIn.readLine()system.terminate()
}

这是一个典型的akka-streams应用实例,其中Producer.plainSink就是一个akka-streams Sink组件。

以上两个示范都涉及到构建一个ProducerRecord类型并将之写入kafka。ProducerRecord是一个基本的kafka消息类型:

   public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}

topic是String类型,key, value 是 Any 类型的。 alpakka-kafka在ProducerRecord之上又拓展了一个复杂点的消息类型ProducerMessage.Envelope类型:

sealed trait Envelope[K, V, +PassThrough] {def passThrough: PassThroughdef withPassThrough[PassThrough2](value: PassThrough2): Envelope[K, V, PassThrough2]}final case class Message[K, V, +PassThrough](record: ProducerRecord[K, V],passThrough: PassThrough) extends Envelope[K, V, PassThrough] {override def withPassThrough[PassThrough2](value: PassThrough2): Message[K, V, PassThrough2] =copy(passThrough = value)}

ProducerMessage.Envelope增加了个PassThrough参数,用来与消息一道传递额外的元数据。alpakka-kafka streams组件使用这个消息类型作为流元素,最终把它转换成一或多条ProducerRecord写入kafka。如下:

object EventMessages {
//一对一条ProducerRecorddef createMessage[KeyType,ValueType,PassThroughType](topic: String,key: KeyType,value: ValueType,passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {val single = ProducerMessage.single(new ProducerRecord[KeyType,ValueType](topic,key,value),passThrough)single}
//一对多条ProducerRecorddef createMultiMessage[KeyType,ValueType,PassThroughType] (topics: List[String],key: KeyType,value: ValueType,passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {import scala.collection.immutableval msgs = topics.map { topic =>new ProducerRecord(topic,key,value)}.toSeqval multi = ProducerMessage.multi(msgs,passThrough)multi}
//只传递通过型元数据def createPassThroughMessage[KeyType,ValueType,PassThroughType](topic: String,key: KeyType,value: ValueType,passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {ProducerMessage.passThrough(passThrough)}}

flexiFlow是一个alpakka-kafka Flow组件,流入ProducerMessage.Evelope,流出Results类型:

  def flexiFlow[K, V, PassThrough](settings: ProducerSettings[K, V]): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] = { ... }

Results类型定义如下:

  final case class Result[K, V, PassThrough] private (metadata: RecordMetadata,message: Message[K, V, PassThrough]) extends Results[K, V, PassThrough] {def offset: Long = metadata.offset()def passThrough: PassThrough = message.passThrough}

也就是说flexiFlow可以返回写入kafka后kafka返回的操作状态数据。我们再看看flexiFlow的使用案例:

import akka.kafka.ProducerMessage._
import akka.actor.ActorSystem
import akka.kafka.scaladsl._
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializerimport scala.concurrent._
import scala.concurrent.duration._object flexi_flow extends App {implicit val system = ActorSystem("kafka_sys")val bootstrapServers = "localhost:9092"val config = system.settings.config.getConfig("akka.kafka.producer")val producerSettings =ProducerSettings(config, new StringSerializer, new StringSerializer).withBootstrapServers(bootstrapServers)// needed for the future flatMap/onComplete in the endimplicit val executionContext = system.dispatcherval topic = "greatings"val done = Source(1 to 100).map { number =>val value = number.toStringEventMessages.createMessage(topic,"key",value,number)}.via(Producer.flexiFlow(producerSettings)).map {case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"case ProducerMessage.MultiResult(parts, passThrough) =>parts.map {case MultiResultPart(metadata, record) =>s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"}.mkString(", ")case ProducerMessage.PassThroughResult(passThrough) =>s"passed through"}.runWith(Sink.foreach(println(_)))Await.ready(done,3.seconds)scala.io.StdIn.readLine()system.terminate()
}object EventMessages {def createMessage[KeyType,ValueType,PassThroughType](topic: String,key: KeyType,value: ValueType,passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {val single = ProducerMessage.single(new ProducerRecord[KeyType,ValueType](topic,key,value),passThrough)single}def createMultiMessage[KeyType,ValueType,PassThroughType] (topics: List[String],key: KeyType,value: ValueType,passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {import scala.collection.immutableval msgs = topics.map { topic =>new ProducerRecord(topic,key,value)}.toSeqval multi = ProducerMessage.multi(msgs,passThrough)multi}def createPassThroughMessage[KeyType,ValueType,PassThroughType](topic: String,key: KeyType,value: ValueType,passThrough: PassThroughType): ProducerMessage.Envelope[KeyType,ValueType,PassThroughType] = {ProducerMessage.passThrough(passThrough)}}

producer除向kafka写入与业务相关的业务事件或业务指令外还会向kafka写入当前消息读取的具体位置offset,所以alpakka-kafka的produce可分成两种类型:上面示范的plainSink, flexiFlow只向kafka写业务数据。还有一类如commitableSink还包括了把消息读取位置offset写入commit的功能。如下:

val control =Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2)).map { msg =>ProducerMessage.single(new ProducerRecord(targetTopic, msg.record.key, msg.record.value),msg.committableOffset)}.toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply).run()control.drainAndShutdown()

如上所示,committableSource从kafka读取业务消息及读取位置committableOffsset,然后Producer.committableSink把业务消息和offset再写入kafka。

下篇讨论我们再具体介绍consumer。

 

这篇关于alpakka-kafka(1)-producer的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/887182

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3

Java消息队列:RabbitMQ与Kafka的集成与应用

Java消息队列:RabbitMQ与Kafka的集成与应用 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在现代的分布式系统中,消息队列是实现系统间通信、解耦和提高可扩展性的重要组件。RabbitMQ和Kafka是两个广泛使用的消息队列系统,它们各有特点和优势。本文将介绍如何在Java应用中集成RabbitMQ和Kafka,并展示它们的应用场景。 消息队

Kafka (快速)安装部署

文章目录 1、软件下载&配置环境1_JDK安装2_Zookeeper安装3_Kafka安装 2、单机安装1_配置主机名和IP映射2_单机Kafka配置 3、集群安装1_配置主机名和IP的映射关系2_时钟同步3_Zookeeper配置信息4_集群Kafka配置 4、kafka的其他脚本命令 1、软件下载&配置环境 下面的操作无论是单机部署还是分布式集群环境下都是通用的。 准

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Kafka 实战演练:创建、配置与测试 Kafka全面教程

文章目录 1.配置文件2.消费者1.注解方式2.KafkaConsumer 3.依赖1.注解依赖2.KafkaConsumer依赖 本文档只是为了留档方便以后工作运维,或者给同事分享文档内容比较简陋命令也不是特别全,不适合小白观看,如有不懂可以私信,上班期间都是在得 1.配置文件 Yml配置 spring:kafka:bootstrap-servers: cons

Kafka【十二】消费者拉取主题分区的分配策略

【1】消费者组、leader和follower 消费者想要拉取主题分区的数据,首先必须要加入到一个组中。 但是一个组中有多个消费者的话,那么每一个消费者该如何消费呢,是不是像图中一样的消费策略呢?如果是的话,那假设消费者组中只有2个消费者或有4个消费者,和分区的数量不匹配,怎么办? 所以这里,我们需要学习Kafka中基本的消费者组中的消费者和分区之间的分配规则: 同一个消费者组的消费者都订