This article walks through deploying and operating a Flink application on a Kubernetes cluster.
Overview
In this article we will:
Compile and package the Flink job into a JAR file;
Build a Docker image containing the Flink runtime libraries and the JAR above;
Deploy the Flink JobManager as a Kubernetes Job;
Expose the JobManager's ports inside the cluster with a Kubernetes Service;
Deploy Flink TaskManagers as a Kubernetes Deployment;
Enable JobManager high availability, backed by ZooKeeper and HDFS;
Stop and resume the job using Flink's SavePoint mechanism.
Kubernetes Test Environment
Install VirtualBox; Minikube will launch the K8s cluster inside a virtual machine;
Download the Minikube binary, make it executable, and add it to the PATH environment variable;
Run minikube start. This downloads the VM image, installs the kubelet and kubeadm programs, and brings up a complete K8s cluster. If you have trouble reaching the network, configure a proxy and tell Minikube to use it (see the sketch after the Pod listing below);
Download and install kubectl. Minikube has already pointed this command at the K8s cluster inside the VM, so you can run kubectl get pods -A directly to list the running K8s Pods:
NAMESPACE NAME READY STATUS RESTARTS AGE
kube-system kube-apiserver-minikube 1/1 Running 0 16m
kube-system etcd-minikube 1/1 Running 0 15m
kube-system coredns-5c98db65d4-d4t2h 1/1 Running 0 17m
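For example, if both Minikube's downloads and the Docker daemon inside the VM must go through a proxy, something like the following should work; the proxy address 192.168.99.1:1087 is an assumption, substitute your own:

$ export HTTP_PROXY=http://192.168.99.1:1087
$ export HTTPS_PROXY=http://192.168.99.1:1087
$ minikube start --docker-env HTTP_PROXY=$HTTP_PROXY --docker-env HTTPS_PROXY=$HTTPS_PROXY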
A Flink Streaming Job Example
The example job reads text from a socket, splits each line into words, and counts the words in 5-second tumbling windows:
DataStream<Tuple2<String, Integer>> dataStream = env
    .socketTextStream("192.168.99.1", 9999)
    .flatMap(new Splitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1);

dataStream.print();
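For reference, a complete runnable version of the job might look like the sketch below; the class name WindowWordCount and the Splitter implementation are assumptions, since only the pipeline is shown above:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from a socket, split into (word, 1) pairs,
        // and sum the counts per word in 5-second tumbling windows.
        DataStream<Tuple2<String, Integer>> dataStream = env
            .socketTextStream("192.168.99.1", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1);

        dataStream.print();
        env.execute("Window WordCount");
    }

    // Splits each input line into (word, 1) tuples.
    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}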
Building the Docker Image
The image is built as follows:
Use OpenJDK 1.8 as the base image;
Download and install Flink into the /opt/flink directory;
Add the flink user and group;
Specify the entrypoint, although we will override it in the K8s configuration.
An abridged version of the Dockerfile:
FROM openjdk:8-jre
ENV FLINK_HOME=/opt/flink
WORKDIR $FLINK_HOME
# FLINK_TGZ_URL is defined earlier in the full Dockerfile (abridged here)
RUN useradd flink && \
    wget -O flink.tgz "$FLINK_TGZ_URL" && \
    tar -xf flink.tgz
ENTRYPOINT ["/docker-entrypoint.sh"]
The second Dockerfile builds on the official Flink image, copying the Hadoop uber JAR and the job JAR into Flink's lib directory so both end up on the class path:
FROM flink:1.8.1-scala_2.12
ARG hadoop_jar
ARG job_jar
COPY --chown=flink:flink $hadoop_jar $job_jar $FLINK_HOME/lib/
USER flink
Install a Docker client and point it at the Docker daemon inside the Minikube VM, so the image we build is immediately visible to the K8s cluster:
$ brew install docker
$ eval $(minikube docker-env)
$ cd /path/to/Dockerfile
$ cp /path/to/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar hadoop.jar
$ cp /path/to/flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar job.jar
$ docker build --build-arg hadoop_jar=hadoop.jar --build-arg job_jar=job.jar --tag flink-on-kubernetes:0.0.1 .
$ docker image ls
REPOSITORY TAG IMAGE ID CREATED SIZE
flink-on-kubernetes 0.0.1 505d2f11cc57 10 seconds ago 618MB
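Optionally, verify that both JARs landed in the image; the official Flink image's entrypoint passes unrecognized commands through to exec, so this should simply list the directory:

$ docker run --rm flink-on-kubernetes:0.0.1 ls /opt/flink/lib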
Deploying the JobManager
The JobManager is deployed as a Kubernetes Job. The configuration below goes into jobmanager.yml:
apiVersion: batch/v1
kind: Job
metadata:
  name: ${JOB}-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        instance: ${JOB}-jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: flink-on-kubernetes:0.0.1
        command: ["/opt/flink/bin/standalone-job.sh"]
        args: ["start-foreground",
               "-Djobmanager.rpc.address=${JOB}-jobmanager",
               "-Dparallelism.default=1",
               "-Dblob.server.port=6124",
               "-Dqueryable-state.server.ports=6125"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: ui
The ${JOB} variable can be substituted with the envsubst command, so the same configuration file can be reused for multiple jobs;
The container entrypoint is changed to standalone-job.sh, an official Flink script that starts the JobManager in foreground mode, scans the class path for a Main-Class to use as the job entry point (alternatively, the full class name can be given with the -j flag, as sketched below), and then submits the job to the cluster automatically;
The JobManager RPC address is set to the name of the Kubernetes Service we will create below; other components in the cluster will use this name to reach the JobManager;
The Flink Blob Server and Queryable State Server ports are random by default; we pin them to fixed ports so they can conveniently be exposed to the cluster.
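For example, if the JAR contained more than one entry class, the job class could be pinned explicitly; the class name here is hypothetical:

command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground", "-j", "com.example.WordCountJob", ...]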
Create the objects with kubectl and check their status:
$ export JOB=flink-on-kubernetes
$ envsubst <jobmanager.yml | kubectl create -f -
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-on-kubernetes-jobmanager-kc4kq 1/1 Running 0 2m26s
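Optionally, tail the JobManager log through the label selector to confirm the job was submitted and switched to RUNNING:

$ kubectl logs -l instance=$JOB-jobmanager --tail=20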
Next, expose the JobManager's ports to the cluster with a Service, defined in service.yml:
apiVersion: v1
kind: Service
metadata:
  name: ${JOB}-jobmanager
spec:
  selector:
    app: flink
    instance: ${JOB}-jobmanager
  type: NodePort
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: query
    port: 6125
  - name: ui
    port: 8081
The type: NodePort setting is necessary: it is what lets us reach the JobManager UI and RESTful API from outside the K8s cluster.
$ envsubst <service.yml | kubectl create -f -
$ kubectl get service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flink-on-kubernetes-jobmanager NodePort 10.109.78.143 <none> 6123:31476/TCP,6124:32268/TCP,6125:31602/TCP,8081:31254/TCP 15m
$ minikube service $JOB-jobmanager --url
http://192.168.99.108:31476
http://192.168.99.108:32268
http://192.168.99.108:31602
http://192.168.99.108:31254
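Assuming the URLs are printed in the same order as the Service's port list, the last one maps to the UI port 8081, so the RESTful API can be queried directly; /overview is a standard Flink REST endpoint:

$ curl http://192.168.99.108:31254/overview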
Deploying the TaskManager
TaskManagers are deployed as a Kubernetes Deployment, defined in taskmanager.yml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ${JOB}-taskmanager
spec:
  selector:
    matchLabels:
      app: flink
      instance: ${JOB}-taskmanager
  replicas: 1
  template:
    metadata:
      labels:
        app: flink
        instance: ${JOB}-taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink-on-kubernetes:0.0.1
        command: ["/opt/flink/bin/taskmanager.sh"]
        args: ["start-foreground", "-Djobmanager.rpc.address=${JOB}-jobmanager"]
Create the Deployment the same way:
$ envsubst <taskmanager.yml | kubectl create -f -
Now start a Netcat server on the host and type some text:
$ nc -lk 9999
hello world
hello flink
Watch the TaskManager log to see the word counts:
$ kubectl logs -f -l instance=$JOB-taskmanager
(hello,2)
(flink,1)
(world,1)
Enabling High Availability
A single JobManager is a single point of failure. To enable HA mode, add ZooKeeper and HDFS settings to the start arguments of both components.
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground", "-Djobmanager.rpc.address=${JOB}-jobmanager", "-Dparallelism.default=1", "-Dblob.server.port=6124", "-Dqueryable-state.server.ports=6125", "-Dhigh-availability=zookeeper", "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181", "-Dhigh-availability.zookeeper.path.root=/flink", "-Dhigh-availability.cluster-id=/${JOB}", "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery", "-Dhigh-availability.jobmanager.port=6123", ]
command: ["/opt/flink/bin/taskmanager.sh"]
args: ["start-foreground", "-Dhigh-availability=zookeeper", "-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181", "-Dhigh-availability.zookeeper.path.root=/flink", "-Dhigh-availability.cluster-id=/${JOB}", "-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery", ]
Prepare ZooKeeper and HDFS test environments; this configuration uses ports 2181 and 9000 on the host machine;
Basic Flink cluster metadata is stored under the /flink/${JOB} path in ZooKeeper;
Checkpoint data is stored under the /flink/recovery directory in HDFS. Before use, make sure Flink has permission to access the /flink directory in HDFS (see the commands after this list);
The jobmanager.rpc.address option is removed from the TaskManager start command because in HA mode TaskManagers look up the current JobManager's connection info in ZooKeeper. Note that the JobManager RPC port is random in HA mode, so we pin it with the high-availability.jobmanager.port setting to make it easy to expose through the K8s Service.
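One way to grant that access, assuming you administer the HDFS namenode and the Flink processes run as the flink user:

$ hdfs dfs -mkdir -p /flink
$ hdfs dfs -chown -R flink /flink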
Managing Flink Jobs
The flink command-line tool can manage a remote cluster; use the -m flag to specify the target:
$ bin/flink list -m 192.168.99.108:30206
------------------ Running/Restarting Jobs -------------------
24.08.2019 12:50:28 : 00000000000000000000000000000000 : Window WordCount (RUNNING)
--------------------------------------------------------------
In this mode the job ID is fixed at 00000000000000000000000000000000; we can use this ID to stop the job manually and take a SavePoint snapshot:
$ bin/flink cancel -m 192.168.99.108:30206 -s hdfs://192.168.99.1:9000/flink/savepoints/ 00000000000000000000000000000000
Cancelled job 00000000000000000000000000000000. Savepoint stored in hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c.
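You can confirm the snapshot landed on HDFS:

$ hdfs dfs -ls /flink/savepoints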
After cancellation, the JobManager process exits and the Kubernetes Job is marked as completed:
$ kubectl get job
NAME COMPLETIONS DURATION AGE
flink-on-kubernetes-jobmanager 1/1 4m40s 7m14s
Clean up the old objects before resubmitting:
$ kubectl delete job $JOB-jobmanager
$ kubectl delete deployment $JOB-taskmanager
To resume the job from the snapshot, add the --fromSavepoint argument to the JobManager start command (in a copy of the configuration, jobmanager-savepoint.yml):
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
       ...
       "--fromSavepoint", "${SAVEPOINT}"]
Then set the SAVEPOINT variable to the path printed by the cancel command and recreate the Job:
$ export SAVEPOINT=hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c
$ envsubst <jobmanager-savepoint.yml | kubectl create -f -
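Once the new JobManager is up, the job should reappear in the running list; the Service was never deleted, so the NodePort is unchanged:

$ bin/flink list -m 192.168.99.108:30206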
Scaling
Scaling also relies on SavePoints: flink modify works by taking a SavePoint, stopping the job, and restarting it with the new parallelism, so first configure a default SavePoint directory in the JobManager start command:
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
       ...
       "-Dstate.savepoints.dir=hdfs://192.168.99.1:9000/flink/savepoints/"]
Then use the kubectl scale command to adjust the number of TaskManagers:
$ kubectl scale --replicas=2 deployment/$JOB-taskmanager
deployment.extensions/flink-on-kubernetes-taskmanager scaled
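The second TaskManager Pod should appear shortly:

$ kubectl get pods -l instance=$JOB-taskmanager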
After the new TaskManager has registered, use flink modify to change the job's parallelism:
$ bin/flink modify 755877434b676ce9dae5cfb533ed7f33 -m 192.168.99.108:30206 -p 2
Modify job 755877434b676ce9dae5cfb533ed7f33.
Rescaled job 755877434b676ce9dae5cfb533ed7f33. Its new parallelism is 2.
Note that flink modify cannot currently rescale a Flink cluster running in HA mode, so the rescaling has to be done by hand, as sketched below.
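A manual sequence might look like the following, reusing the steps from earlier sections; the job ID and SavePoint path are placeholders to be taken from your own cluster's output:

$ bin/flink cancel -m 192.168.99.108:30206 -s hdfs://192.168.99.1:9000/flink/savepoints/ <job-id>
$ export SAVEPOINT=<savepoint path printed by the cancel command>
$ kubectl delete job $JOB-jobmanager
$ kubectl scale --replicas=2 deployment/$JOB-taskmanager
$ envsubst <jobmanager-savepoint.yml | kubectl create -f -   # after raising -Dparallelism.default in the YAML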
Flink Will Natively Support Kubernetes
The community is also working on a native, active Kubernetes integration, in which the JobManager talks to the Kubernetes API server directly and requests TaskManager pods on demand, much like the existing YARN and Mesos integrations; the last two talks in the references cover this direction.
References
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
https://jobs.zalando.com/tech/blog/running-apache-flink-on-kubernetes/
https://www.slideshare.net/tillrohrmann/redesigning-apache-flinks-distributed-architecture-flink-forward-2017
https://www.slideshare.net/tillrohrmann/future-of-apache-flink-deployments-containers-kubernetes-and-more-flink-forward-2019-sf