动手搓一个kubernetes管理平台(5)-WebSocket和TTY

2024-01-23 15:28

本文主要是介绍动手搓一个kubernetes管理平台(5)-WebSocket和TTY,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

所有的kubernetes管理平台,都会用到TTY的功能,既通过前端直接进入到容器内部,这是一个交互式的操作,或者说是一个流式操作,简单的http协议肯定不能满足这个需求,使用websocket就能很好的满足这个需求。

用通俗的话来描述websocket, 其实就三点:

  1. 可以直接在浏览器里使用
  2. 支持双向通信
  3. 封装简单

既然决定了使用websocket作为前端进入容器的方式,那么可以看看后端是如何进入容器的。

后端封装

golang中,一般使用github.com/gorilla/websocket 对websocket进行封装。

package wsconnectimport ("errors""github.com/gorilla/websocket""net/http""sync"
)// http升级websocket协议的配置
var wsUpgrader = websocket.Upgrader{// 允许所有CORS跨域请求CheckOrigin: func(r *http.Request) bool {return true},
}// websocket消息
type WsMessage struct {MessageType intData        []byte
}// 封装websocket连接
type WsConnection struct {wsSocket *websocket.Conn // 底层websocketinChan   chan *WsMessage // 读取队列outChan  chan *WsMessage // 发送队列mutex     sync.Mutex // 避免重复关闭管道isClosed  boolcloseChan chan byte // 关闭通知
}// 读取协程
func (wsConn *WsConnection) wsReadLoop() {var (msgType intdata    []bytemsg     *WsMessageerr     error)for {// 读一个messageif msgType, data, err = wsConn.wsSocket.ReadMessage(); err != nil {goto ERROR}msg = &WsMessage{msgType,data,}// 放入请求队列select {case wsConn.inChan <- msg:case <-wsConn.closeChan:goto CLOSED}}
ERROR:wsConn.WsClose()
CLOSED:
}// 发送协程
func (wsConn *WsConnection) wsWriteLoop() {var (msg *WsMessageerr error)for {select {// 取一个应答case msg = <-wsConn.outChan:// 写给websocketif err = wsConn.wsSocket.WriteMessage(msg.MessageType, msg.Data); err != nil {goto ERROR}case <-wsConn.closeChan:goto CLOSED}}
ERROR:wsConn.WsClose()
CLOSED:
}func InitWebsocket(resp http.ResponseWriter, req *http.Request) (wsConn *WsConnection, err error) {var (wsSocket *websocket.Conn)// 应答客户端告知升级连接为websocketif wsSocket, err = wsUpgrader.Upgrade(resp, req, nil); err != nil {return}wsConn = &WsConnection{wsSocket:  wsSocket,inChan:    make(chan *WsMessage, 1000),outChan:   make(chan *WsMessage, 1000),closeChan: make(chan byte),isClosed:  false,}// 读协程go wsConn.wsReadLoop()// 写协程go wsConn.wsWriteLoop()return wsConn, nil
}// 发送消息
func (wsConn *WsConnection) WsWrite(messageType int, data []byte) (err error) {select {case wsConn.outChan <- &WsMessage{messageType, data}:case <-wsConn.closeChan:err = errors.New("websocket closed")}return
}// 读取消息
func (wsConn *WsConnection) WsRead() (msg *WsMessage, err error) {select {case msg = <-wsConn.inChan:returncase <-wsConn.closeChan:err = errors.New("websocket closed")}return
}// 关闭连接
func (wsConn *WsConnection) WsClose() {wsConn.wsSocket.Close()wsConn.mutex.Lock()defer wsConn.mutex.Unlock()if !wsConn.isClosed {wsConn.isClosed = trueclose(wsConn.closeChan)}
}

上述代码就是一个简单的websocket的封装,来看看这段代码做了哪些事:

  • 首先封装websocket 连接和消息的结构体,这个和tcp传输的方式类似,在连接的结构体中,声明2读写2个channel, 以及对应的websocket消息体,考虑到并发安全,所以加了个mutex的锁。
  • 然后定义读写逻辑,由于上述消息体的Data的字节,所以需要在读写逻辑里定义一个结构体 ,来解析这个字节,这个需要前后端一起约定好,然后发起一个协程不停的调用
  • 最后进行初始化, 注意,初始化的时候需要使用upgrade的方法,将http请求升级成websocket协议,然后启动收发2个协程,返回wsconn的结构体。

上述代码是对websocket的封装,这段代码仅仅是用来获取前端传来的数据,并不会对kubernetes进行任何操作,好在kubernetes的标准库"k8s.io/client-go/tools/remotecommand"提供了一个解决思路,remotecommand提供了连接到容器的方法:

if executor, err = remotecommand.NewSPDYExecutor(restConf, "POST", sshReq.URL()); err != nil {goto END}

然后使用Stream方法,将输入输出以流的方式,连接到容器

// 配置与容器之间的数据流处理回调handler = &streamHandler{wsConn: wsConn, resizeEvent: make(chan remotecommand.TerminalSize)}if err = executor.Stream(remotecommand.StreamOptions{Stdin:             handler,Stdout:            handler,Stderr:            handler,TerminalSizeQueue: handler,Tty:               true,}); err != nil {goto END}return

当然,在实际的执行命令之前,需要提前做一些预处理,比如验证权限,初始化客户端等等,简单的描述一下后端的逻辑:

ws请求
截取token以及其他参数
基于参数和token生成客户端
使用客户端创建客户端到容器的连接
将wss请求转换成流
持续输出和返回

前端封装

前端由于需要模拟一个terminal的窗口,这块可以用到大名鼎鼎的xterm,使用xterm可以在前端模拟出一个完整的terminal,包括颜色 ,字体 ,窗口大小等等,都是可以可配置的。

由于我的前端使用的是ts+vue3的框架进行编写的,所以仅需要单独写一个页面即可。

<template><div class="container"><Breadcrumb:items="[{path: '../workload/listpods',label: $t('menu.dashboard.workload'),},{ path: '', label: $t('menu.dashboard.workload.terminal.get') },]"/><!-- 基础信息 --><div:style="{width: '100%',}"><a-cardclass="general-card":title="$t('menu.dashboard.workload.terminal.get')"><a-row style="margin-bottom: 16px"><a-col :span="4"><a-space size="mini"><a-tag size="large">命名空间:</a-tag><p>{{ route.query.namespace }}</p></a-space></a-col><a-col :span="8" :offset="1"><div><a-space size="mini"><a-tag size="large">Pod:</a-tag><p>{{ route.query.podname }}</p></a-space></div></a-col><a-col :span="4" :offset="1"><div><a-space size="mini"><a-tag size="large">Container:</a-tag><p>{{ route.query.container }}</p></a-space></div></a-col><!-- 选择bash or shell --><a-col :span="4" :offset="1"><div><a-space size="mini"><h4>Bash:</h4><a-select :style="{ width: '100px' }" v-model="currentBash"><a-option>bash</a-option><a-option>sh</a-option></a-select></a-space></div></a-col></a-row><div ref="terminal"></div></a-card></div></div>
</template><script lang="ts" setup>
import { ref, onMounted, onBeforeUnmount, watch } from "vue";
import useLoading from "@/hooks/loading";
import { debounce } from "lodash";
import { Terminal } from "xterm";
import { FitAddon } from "xterm-addon-fit";
import { useRoute } from "vue-router";
import "xterm/css/xterm.css";
import { LabelDesc } from "@/api/common";
import { getToken } from "@/utils/auth";const { loading } = useLoading(true);
const route = useRoute();const descData = ref<LabelDesc[]>([{ label: "命名空间", value: route.query.namespace as string },{ label: "POD", value: route.query.podname as string },{ label: "Container", value: route.query.container as string },
]);
const shrc = ref<string>("b");
const currentBash = ref<string>("bash");// websocket客户端初始化相关
// 打开terminal
const OpenTerminal = () => {loading.value = false;
};
// 关闭terminal
const CloseTerminal = () => {console.log("onclose");
};
// 处理消息
const OnMessage = (event: any) => {term.value.write(event.data);
};
// 处理terminal错误
const OnError = () => {console.log("onerror");
};const terminalSocket = ref();
// 判断连接是否打开
const isWsOpen = () => {const readyState = terminalSocket.value && terminalSocket.value.readyState;return readyState === 1;
};
// 创建WS
const createWS = () => {const token = getToken() as string;if (currentBash.value === "bash") {shrc.value = "b"} else if (currentBash.value === "sh") {shrc.value = "s"}const wsUrl = `wss://${window.location.host}/kubemgr/api/v1/ws/${route.params.clusteruuid}/${route.query.namespace}/${route.query.podname}/${route.query.container}/${shrc.value}/ssh?clusterinfo=${token}`;terminalSocket.value = new WebSocket(wsUrl);terminalSocket.value.onopen = OpenTerminal; // WebSocket 连接已建立terminalSocket.value.onmessage = OnMessage; // 收到服务器消息terminalSocket.value.onclose = CloseTerminal; // WebSocket 连接已关闭terminalSocket.value.onerror = OnError; // WebSocket 连接出错
};
// 初始化WS
const initWS = () => {if (!terminalSocket.value) {createWS();}if (terminalSocket.value && terminalSocket.value.readyState >= 1) {terminalSocket.value.close();createWS();}
};// terminal初始化相关
const term = ref();
const terminal = ref();
const fitAddon = new FitAddon();// 尺寸同步 发送给后端,调整后端终端大小,和前端保持一致,不然前端只是范围变大了,命令还是会换行
const resizeRemoteTerminal = () => {if (isWsOpen()) {const msg = {type: "resize",rows: term.value.rows,cols: term.value.cols,};terminalSocket.value.send(JSON.stringify(msg));}
};// 终端输入绑定事件
const termData = () => {// 输入与粘贴的情况,onData不能重复绑定,不然会发送多次term.value.onData((data: any) => {if (isWsOpen()) {// 写给服务端, 由服务端发给containerconst msg = { type: "input", input: data };terminalSocket.value.send(JSON.stringify(msg));}});// 终端尺寸变化触发term.value.onResize(() => {resizeRemoteTerminal();});
};const initTerm = () => {term.value = new Terminal({lineHeight: 1.2,fontSize: 12,fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",theme: {background: "#181d28",},// 光标闪烁cursorBlink: true,cursorStyle: "underline",scrollback: 100,tabStopWidth: 4,});term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据term.value.loadAddon(fitAddon); // 自适应尺寸// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作setTimeout(() => {fitAddon.fit();}, 1000);termData(); // Terminal 事件挂载
};const resetTerm = () => {term.value.reset()terminal.value.innerHTML = '';term.value = new Terminal({lineHeight: 1.2,fontSize: 12,fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",theme: {background: "#181d28",},// 光标闪烁cursorBlink: true,cursorStyle: "underline",scrollback: 100,tabStopWidth: 4,});term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据term.value.loadAddon(fitAddon); // 自适应尺寸// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作setTimeout(() => {fitAddon.fit();}, 1000);termData(); // Terminal 事件挂载
};// 窗口大小适应相关
// 适应浏览器尺寸变化
const fitTerm = () => {fitAddon.fit();
};
const onResize = debounce(() => fitTerm(), 500);
const onTerminalResize = () => {window.addEventListener("resize", onResize);
};
const removeResizeListener = () => {window.removeEventListener("resize", onResize);
};onMounted(() => {loading.value = true;initWS();initTerm();onTerminalResize();
});onBeforeUnmount(() => {removeResizeListener();if (terminalSocket.value) {terminalSocket.value.close();}
});watch(() => currentBash.value, // 要监视的数据() => {// 回调函数loading.value = true;initWS();resetTerm();// initTerm();onTerminalResize();},{// immediate: true, // 立即执行回调deep: true, // 深层监视}
);
</script><script lang="ts">
export default {name: "GetTerminal",
};
</script><style lang="scss" scoped>
.terminal {width: 90%;// height: calc(100% - 62px);// height: 100%;margin-bottom: 16px;
}
</style>

最终的效果就类似这种

63BIQb

  • Tips: 一般服务部署后,服务暴露会使用nginx或者ingress, 默认是不支持websocket的,所以需要在nginx/ingress上添加一下配置, 将转发的http请求升级到websocket
	proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection upgrade;proxy_set_header Host $host;proxy_pass http://127.0.0.1:8082;
个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流

在这里插入图片描述

这篇关于动手搓一个kubernetes管理平台(5)-WebSocket和TTY的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636748

相关文章

SpringBoot使用minio进行文件管理的流程步骤

《SpringBoot使用minio进行文件管理的流程步骤》MinIO是一个高性能的对象存储系统,兼容AmazonS3API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文... 目录一、拉取minio镜像二、创建配置文件和上传文件的目录三、启动容器四、浏览器登录 minio五、

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

SpringBoot实现websocket服务端及客户端的详细过程

《SpringBoot实现websocket服务端及客户端的详细过程》文章介绍了WebSocket通信过程、服务端和客户端的实现,以及可能遇到的问题及解决方案,感兴趣的朋友一起看看吧... 目录一、WebSocket通信过程二、服务端实现1.pom文件添加依赖2.启用Springboot对WebSocket

流媒体平台/视频监控/安防视频汇聚EasyCVR播放暂停后视频画面黑屏是什么原因?

视频智能分析/视频监控/安防监控综合管理系统EasyCVR视频汇聚融合平台,是TSINGSEE青犀视频垂直深耕音视频流媒体技术、AI智能技术领域的杰出成果。该平台以其强大的视频处理、汇聚与融合能力,在构建全栈视频监控系统中展现出了独特的优势。视频监控管理系统EasyCVR平台内置了强大的视频解码、转码、压缩等技术,能够处理多种视频流格式,并以多种格式(RTMP、RTSP、HTTP-FLV、WebS

综合安防管理平台LntonAIServer视频监控汇聚抖动检测算法优势

LntonAIServer视频质量诊断功能中的抖动检测是一个专门针对视频稳定性进行分析的功能。抖动通常是指视频帧之间的不必要运动,这种运动可能是由于摄像机的移动、传输中的错误或编解码问题导致的。抖动检测对于确保视频内容的平滑性和观看体验至关重要。 优势 1. 提高图像质量 - 清晰度提升:减少抖动,提高图像的清晰度和细节表现力,使得监控画面更加真实可信。 - 细节增强:在低光条件下,抖

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

如何解决线上平台抽佣高 线下门店客流少的痛点!

目前,许多传统零售店铺正遭遇客源下降的难题。尽管广告推广能带来一定的客流,但其费用昂贵。鉴于此,众多零售商纷纷选择加入像美团、饿了么和抖音这样的大型在线平台,但这些平台的高佣金率导致了利润的大幅缩水。在这样的市场环境下,商家之间的合作网络逐渐成为一种有效的解决方案,通过资源和客户基础的共享,实现共同的利益增长。 以最近在上海兴起的一个跨行业合作平台为例,该平台融合了环保消费积分系统,在短

Android平台播放RTSP流的几种方案探究(VLC VS ExoPlayer VS SmartPlayer)

技术背景 好多开发者需要遴选Android平台RTSP直播播放器的时候,不知道如何选的好,本文针对常用的方案,做个大概的说明: 1. 使用VLC for Android VLC Media Player(VLC多媒体播放器),最初命名为VideoLAN客户端,是VideoLAN品牌产品,是VideoLAN计划的多媒体播放器。它支持众多音频与视频解码器及文件格式,并支持DVD影音光盘,VCD影

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略

Kubernetes PodSecurityPolicy:PSP能实现的5种主要安全策略 1. 特权模式限制2. 宿主机资源隔离3. 用户和组管理4. 权限提升控制5. SELinux配置 💖The Begin💖点点关注,收藏不迷路💖 Kubernetes的PodSecurityPolicy(PSP)是一个关键的安全特性,它在Pod创建之前实施安全策略,确保P

软考系统规划与管理师考试证书含金量高吗?

2024年软考系统规划与管理师考试报名时间节点: 报名时间:2024年上半年软考将于3月中旬陆续开始报名 考试时间:上半年5月25日到28日,下半年11月9日到12日 分数线:所有科目成绩均须达到45分以上(包括45分)方可通过考试 成绩查询:可在“中国计算机技术职业资格网”上查询软考成绩 出成绩时间:预计在11月左右 证书领取时间:一般在考试成绩公布后3~4个月,各地领取时间有所不同