本文主要是介绍云服务器docker-compose部署kafka并编写ava使用kafka示例代码,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
首先,你需要在云服务器上安装Docker和Docker Compose。然后,创建一个新的目录来存放你的Docker Compose配置文件。
在这个目录下创建一个名为docker-compose.yml
的文件,并将以下内容复制到文件中:
version: '3.7'services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafka:2.12-2.6.0depends_on:- zookeeperports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: kafkaKAFKA_LISTENERS: PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_CREATE_TOPICS: "test_topic:1:1"
这个Docker Compose文件将会启动一个Zookeeper和一个Kafka容器。注意,我们在Kafka容器中设置了一个环境变量来指定Kafka的主机名。
保存并关闭文件后,在终端中使用以下命令启动Kafka和Zookeeper容器:
docker-compose up -d
等待一段时间直到容器启动完成。
接下来,你可以编写使用Kafka的Java示例代码。这里是一个简单的示例:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaExample {private static final String TOPIC = "test_topic";private static final String BOOTSTRAP_SERVERS = "localhost:9092";public static void main(String[] args) {// 生产者示例Properties producerProps = new Properties();producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<>(producerProps);producer.send(new ProducerRecord<>(TOPIC, "Hello, Kafka!"));producer.close();// 消费者示例Properties consumerProps = new Properties();consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);consumer.subscribe(Collections.singletonList(TOPIC));ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key = %s, value = %s\n", record.key(), record.value());}consumer.close();}
}
以上代码包括一个简单的Kafka生产者和一个消费者。它们使用的Kafka主题名称为test_topic
,Kafka的服务器地址为localhost:9092
。
你可以将以上代码保存到一个名为KafkaExample.java
的文件中,并使用以下命令进行编译和执行:
javac -cp kafka-clients-2.6.0.jar KafkaExample.java
java -cp kafka-clients-2.6.0.jar:. KafkaExample
请确保你已经下载并添加了kafka-clients-2.6.0.jar
到你的类路径中,以便在命令中使用。
通过以上步骤,你应该可以在云服务器上使用Docker Compose部署Kafka,并编写Java示例代码来使用Kafka。
这篇关于云服务器docker-compose部署kafka并编写ava使用kafka示例代码的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!