Setting up a kfk-zk cluster on k8s (2)
This post follows on from the ZooKeeper deployment. The overall structure is the same: StorageClass, PVs, a Headless Service, and a StatefulSet; what differs is Kafka's own configuration. Kubernetes has no official Kafka deployment tutorial, so I found a post on deploying Kafka on k8s and borrowed its Kafka template.
The StorageClass was created last time, so we use local-class directly.
root@hw1:~/zk# kubectl get storageclass
NAME          PROVISIONER                    AGE
local-class   kubernetes.io/no-provisioner   18h
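For readers who skipped the previous post, a minimal sketch of what the local-class StorageClass looks like; the name and provisioner match the output above, while volumeBindingMode is an assumption (Immediate also works for pre-created hostPath PVs):

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: local-class
provisioner: kubernetes.io/no-provisioner
# assumed: delay binding until a pod is scheduled
volumeBindingMode: WaitForFirstConsumer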
Last time we created only three PVs, which is not enough, so here we create three more for Kafka: kubectl create -f pv-kfk.yaml.
kind: PersistentVolume
apiVersion: v1
metadata:
  name: datadir-kfk-1
  labels:
    type: local
spec:
  storageClassName: local-class
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/mnt/data11"
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: datadir-kfk-2
  labels:
    type: local
spec:
  storageClassName: local-class
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/mnt/data22"
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: datadir-kfk-3
  labels:
    type: local
spec:
  storageClassName: local-class
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/mnt/data33"
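Note that the manifests do not create the backing directories: a hostPath PV points at a path that must already exist on the node. A sketch, assuming Kafka pods may be scheduled on any worker node (adjust the list of hosts to your cluster):

# run on each worker node that may host a Kafka pod
mkdir -p /mnt/data11 /mnt/data22 /mnt/data33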
The Kafka template is borrowed from the Kafka part of Kafka on Kubernetes: Deploy a highly available Kafka cluster on Kubernetes, with two changes: spec.volumeClaimTemplates.spec.storageClassName is set to our local-class, and KAFKA_ZOOKEEPER_CONNECT, the parameter Kafka uses to reach ZooKeeper, is set to zk-cs:2181.
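Before creating the Kafka resources, it is worth confirming that the zk-cs client Service from the previous post actually resolves in cluster DNS. A quick check, reusing the busybox pod visible in the pod list below (any pod with nslookup would do):

kubectl get svc zk-cs
kubectl exec busybox -- nslookup zk-cs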
kubectl create -f kfk.yaml
apiVersion: v1
kind: Service
metadata:
  name: kafka
spec:
  ports:
  - name: broker
    port: 9092
    protocol: TCP
    targetPort: kafka
  selector:
    app: kafka
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None
  ports:
  - name: broker
    port: 9092
    protocol: TCP
    targetPort: 9092
  selector:
    app: kafka
  sessionAffinity: None
  type: ClusterIP
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  labels:
    app: kafka
  name: kafka
spec:
  podManagementPolicy: OrderedReady
  replicas: 3
  revisionHistoryLimit: 1
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - command:
        - sh
        - -exc
        - |
          unset KAFKA_PORT && \
          export KAFKA_BROKER_ID=${HOSTNAME##*-} && \
          export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092 && \
          exec /etc/confluent/docker/run
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1G -Xms1G
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zk-cs:2181
        - name: KAFKA_LOG_DIRS
          value: /opt/kafka/data/logs
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_JMX_PORT
          value: "5555"
        image: confluentinc/cp-kafka:4.1.2-2
        imagePullPolicy: IfNotPresent
        livenessProbe:
          exec:
            command:
            - sh
            - -ec
            - /usr/bin/jps | /bin/grep -q SupportedKafka
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          timeoutSeconds: 5
        name: kafka-broker
        ports:
        - containerPort: 9092
          name: kafka
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          initialDelaySeconds: 30
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /opt/kafka/data
          name: datadir
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      terminationGracePeriodSeconds: 60
  updateStrategy:
    type: OnDelete
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes:
      - ReadWriteOnce
      storageClassName: "local-class"
      resources:
        requests:
          storage: 1Gi
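The only non-obvious part of the startup command is KAFKA_BROKER_ID=${HOSTNAME##*-}: the shell expansion strips everything up to the last '-', so each pod derives a stable broker id from its StatefulSet ordinal, while KAFKA_ADVERTISED_LISTENERS is built from the pod IP injected through the Downward API. A quick illustration of the expansion:

# inside pod kafka-2, HOSTNAME is "kafka-2"
HOSTNAME=kafka-2
echo ${HOSTNAME##*-}   # prints 2, used as broker.id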
Check the deployment:
root@hw1:~/zk# kubectl get pods
NAME      READY   STATUS    RESTARTS   AGE
busybox   1/1     Running   20         20h
kafka-0   1/1     Running   9          18h
kafka-1   1/1     Running   0          18h
kafka-2   1/1     Running   0          18h
zk-0      1/1     Running   0          18h
zk-1      1/1     Running   0          18h
zk-2      1/1     Running   0          18h
root@hw1:~/zk# kubectl get pvc
NAME              STATUS   VOLUME          CAPACITY   ACCESS MODES   STORAGECLASS   AGE
datadir-kafka-0   Bound    datadir-kfk-1   5Gi        RWO            local-class    18h
datadir-kafka-1   Bound    datadir-kfk-2   5Gi        RWO            local-class    18h
datadir-kafka-2   Bound    datadir-kfk-3   5Gi        RWO            local-class    18h
datadir-zk-0      Bound    datadir1        5Gi        RWO            local-class    18h
datadir-zk-1      Bound    datadir2        5Gi        RWO            local-class    18h
datadir-zk-2      Bound    datadir3        5Gi        RWO            local-class    18h
root@hw1:~/zk# kubectl get pv
NAME            CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                     STORAGECLASS   REASON   AGE
datadir-kfk-1   5Gi        RWO            Retain           Bound    default/datadir-kafka-0   local-class             18h
datadir-kfk-2   5Gi        RWO            Retain           Bound    default/datadir-kafka-1   local-class             18h
datadir-kfk-3   5Gi        RWO            Retain           Bound    default/datadir-kafka-2   local-class             18h
datadir1        5Gi        RWO            Retain           Bound    default/datadir-zk-0      local-class             18h
datadir2        5Gi        RWO            Retain           Bound    default/datadir-zk-1      local-class             18h
datadir3        5Gi        RWO            Retain           Bound    default/datadir-zk-2      local-class             18h
root@hw1:~/zk#
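All pods are Running and every PVC is bound to one of the PVs. If a broker keeps restarting instead (note kafka-0's restart count above), its log usually shows whether it failed to reach ZooKeeper; a quick check:

kubectl logs kafka-0 | grep -i zookeeper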
Next, test whether Kafka has connected to ZooKeeper and can produce and consume normally. Again borrowing that author's kafka-test-client pod: kubectl create -f pod-test.yaml.
apiVersion: v1
kind: Pod
metadata:
  name: kafka-test-client
spec:
  containers:
  - command:
    - sh
    - -c
    - exec tail -f /dev/null
    image: confluentinc/cp-kafka:4.1.2-2
    imagePullPolicy: IfNotPresent
    name: kafka
    resources: {}
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
List the existing topics, then create a test topic.
root@hw1:~/zk# kubectl exec kafka-test-client -- /usr/bin/kafka-topics --zookeeper zk-cs:2181 --list
__confluent.support.metrics
root@hw1:~/zk# kubectl exec kafka-test-client -- /usr/bin/kafka-topics --zookeeper zk-cs:2181 --topic test --create --partitions 1 --replication-factor 1
Created topic "test".
Start a producer on topic test and write hello world messages.
root@hw1:~# kubectl exec -it kafka-test-client -- /usr/bin/kafka-console-producer --broker-list kafka:9092 --topic test
>hello
>world
>
Start a consumer and read the messages on topic test.
root@hw1:~# kubectl exec kafka-test-client -- /usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 --topic test --from-beginning
hello
world
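The test topic only exercises a single broker. Since we run three brokers (and set KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR to 3), a topic with replication factor 3 is a quick way to verify that all of them participate; a sketch with an arbitrary topic name:

kubectl exec kafka-test-client -- /usr/bin/kafka-topics --zookeeper zk-cs:2181 --topic test-replicated --create --partitions 3 --replication-factor 3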
With that, both ZooKeeper and Kafka are fully deployed.
References:
- StatefulSets
- Storage Classes
- Kafka on Kubernetes: Deploy a highly available Kafka cluster on Kubernetes
- My GitHub Pages, where the notes and k8s templates are also available
Next, we will use the zk/kfk cluster to deploy Fabric with Kafka consensus.