本文主要是介绍动手搓一个kubernetes管理平台(5)-WebSocket和TTY,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
所有的kubernetes管理平台,都会用到TTY的功能,既通过前端直接进入到容器内部,这是一个交互式的操作,或者说是一个流式操作,简单的http协议肯定不能满足这个需求,使用websocket就能很好的满足这个需求。
用通俗的话来描述websocket, 其实就三点:
- 可以直接在浏览器里使用
- 支持双向通信
- 封装简单
既然决定了使用websocket作为前端进入容器的方式,那么可以看看后端是如何进入容器的。
后端封装
golang中,一般使用github.com/gorilla/websocket 对websocket进行封装。
package wsconnectimport ("errors""github.com/gorilla/websocket""net/http""sync"
)// http升级websocket协议的配置
var wsUpgrader = websocket.Upgrader{// 允许所有CORS跨域请求CheckOrigin: func(r *http.Request) bool {return true},
}// websocket消息
type WsMessage struct {MessageType intData []byte
}// 封装websocket连接
type WsConnection struct {wsSocket *websocket.Conn // 底层websocketinChan chan *WsMessage // 读取队列outChan chan *WsMessage // 发送队列mutex sync.Mutex // 避免重复关闭管道isClosed boolcloseChan chan byte // 关闭通知
}// 读取协程
func (wsConn *WsConnection) wsReadLoop() {var (msgType intdata []bytemsg *WsMessageerr error)for {// 读一个messageif msgType, data, err = wsConn.wsSocket.ReadMessage(); err != nil {goto ERROR}msg = &WsMessage{msgType,data,}// 放入请求队列select {case wsConn.inChan <- msg:case <-wsConn.closeChan:goto CLOSED}}
ERROR:wsConn.WsClose()
CLOSED:
}// 发送协程
func (wsConn *WsConnection) wsWriteLoop() {var (msg *WsMessageerr error)for {select {// 取一个应答case msg = <-wsConn.outChan:// 写给websocketif err = wsConn.wsSocket.WriteMessage(msg.MessageType, msg.Data); err != nil {goto ERROR}case <-wsConn.closeChan:goto CLOSED}}
ERROR:wsConn.WsClose()
CLOSED:
}func InitWebsocket(resp http.ResponseWriter, req *http.Request) (wsConn *WsConnection, err error) {var (wsSocket *websocket.Conn)// 应答客户端告知升级连接为websocketif wsSocket, err = wsUpgrader.Upgrade(resp, req, nil); err != nil {return}wsConn = &WsConnection{wsSocket: wsSocket,inChan: make(chan *WsMessage, 1000),outChan: make(chan *WsMessage, 1000),closeChan: make(chan byte),isClosed: false,}// 读协程go wsConn.wsReadLoop()// 写协程go wsConn.wsWriteLoop()return wsConn, nil
}// 发送消息
func (wsConn *WsConnection) WsWrite(messageType int, data []byte) (err error) {select {case wsConn.outChan <- &WsMessage{messageType, data}:case <-wsConn.closeChan:err = errors.New("websocket closed")}return
}// 读取消息
func (wsConn *WsConnection) WsRead() (msg *WsMessage, err error) {select {case msg = <-wsConn.inChan:returncase <-wsConn.closeChan:err = errors.New("websocket closed")}return
}// 关闭连接
func (wsConn *WsConnection) WsClose() {wsConn.wsSocket.Close()wsConn.mutex.Lock()defer wsConn.mutex.Unlock()if !wsConn.isClosed {wsConn.isClosed = trueclose(wsConn.closeChan)}
}
上述代码就是一个简单的websocket的封装,来看看这段代码做了哪些事:
- 首先封装websocket 连接和消息的结构体,这个和tcp传输的方式类似,在连接的结构体中,声明2读写2个channel, 以及对应的websocket消息体,考虑到并发安全,所以加了个mutex的锁。
- 然后定义读写逻辑,由于上述消息体的Data的字节,所以需要在读写逻辑里定义一个结构体 ,来解析这个字节,这个需要前后端一起约定好,然后发起一个协程不停的调用
- 最后进行初始化, 注意,初始化的时候需要使用upgrade的方法,将http请求升级成websocket协议,然后启动收发2个协程,返回wsconn的结构体。
上述代码是对websocket的封装,这段代码仅仅是用来获取前端传来的数据,并不会对kubernetes进行任何操作,好在kubernetes的标准库"k8s.io/client-go/tools/remotecommand"提供了一个解决思路,remotecommand提供了连接到容器的方法:
if executor, err = remotecommand.NewSPDYExecutor(restConf, "POST", sshReq.URL()); err != nil {goto END}
然后使用Stream方法,将输入输出以流的方式,连接到容器
// 配置与容器之间的数据流处理回调handler = &streamHandler{wsConn: wsConn, resizeEvent: make(chan remotecommand.TerminalSize)}if err = executor.Stream(remotecommand.StreamOptions{Stdin: handler,Stdout: handler,Stderr: handler,TerminalSizeQueue: handler,Tty: true,}); err != nil {goto END}return
当然,在实际的执行命令之前,需要提前做一些预处理,比如验证权限,初始化客户端等等,简单的描述一下后端的逻辑:
前端封装
前端由于需要模拟一个terminal的窗口,这块可以用到大名鼎鼎的xterm,使用xterm可以在前端模拟出一个完整的terminal,包括颜色 ,字体 ,窗口大小等等,都是可以可配置的。
由于我的前端使用的是ts+vue3的框架进行编写的,所以仅需要单独写一个页面即可。
<template><div class="container"><Breadcrumb:items="[{path: '../workload/listpods',label: $t('menu.dashboard.workload'),},{ path: '', label: $t('menu.dashboard.workload.terminal.get') },]"/><!-- 基础信息 --><div:style="{width: '100%',}"><a-cardclass="general-card":title="$t('menu.dashboard.workload.terminal.get')"><a-row style="margin-bottom: 16px"><a-col :span="4"><a-space size="mini"><a-tag size="large">命名空间:</a-tag><p>{{ route.query.namespace }}</p></a-space></a-col><a-col :span="8" :offset="1"><div><a-space size="mini"><a-tag size="large">Pod:</a-tag><p>{{ route.query.podname }}</p></a-space></div></a-col><a-col :span="4" :offset="1"><div><a-space size="mini"><a-tag size="large">Container:</a-tag><p>{{ route.query.container }}</p></a-space></div></a-col><!-- 选择bash or shell --><a-col :span="4" :offset="1"><div><a-space size="mini"><h4>Bash:</h4><a-select :style="{ width: '100px' }" v-model="currentBash"><a-option>bash</a-option><a-option>sh</a-option></a-select></a-space></div></a-col></a-row><div ref="terminal"></div></a-card></div></div>
</template><script lang="ts" setup>
import { ref, onMounted, onBeforeUnmount, watch } from "vue";
import useLoading from "@/hooks/loading";
import { debounce } from "lodash";
import { Terminal } from "xterm";
import { FitAddon } from "xterm-addon-fit";
import { useRoute } from "vue-router";
import "xterm/css/xterm.css";
import { LabelDesc } from "@/api/common";
import { getToken } from "@/utils/auth";const { loading } = useLoading(true);
const route = useRoute();const descData = ref<LabelDesc[]>([{ label: "命名空间", value: route.query.namespace as string },{ label: "POD", value: route.query.podname as string },{ label: "Container", value: route.query.container as string },
]);
const shrc = ref<string>("b");
const currentBash = ref<string>("bash");// websocket客户端初始化相关
// 打开terminal
const OpenTerminal = () => {loading.value = false;
};
// 关闭terminal
const CloseTerminal = () => {console.log("onclose");
};
// 处理消息
const OnMessage = (event: any) => {term.value.write(event.data);
};
// 处理terminal错误
const OnError = () => {console.log("onerror");
};const terminalSocket = ref();
// 判断连接是否打开
const isWsOpen = () => {const readyState = terminalSocket.value && terminalSocket.value.readyState;return readyState === 1;
};
// 创建WS
const createWS = () => {const token = getToken() as string;if (currentBash.value === "bash") {shrc.value = "b"} else if (currentBash.value === "sh") {shrc.value = "s"}const wsUrl = `wss://${window.location.host}/kubemgr/api/v1/ws/${route.params.clusteruuid}/${route.query.namespace}/${route.query.podname}/${route.query.container}/${shrc.value}/ssh?clusterinfo=${token}`;terminalSocket.value = new WebSocket(wsUrl);terminalSocket.value.onopen = OpenTerminal; // WebSocket 连接已建立terminalSocket.value.onmessage = OnMessage; // 收到服务器消息terminalSocket.value.onclose = CloseTerminal; // WebSocket 连接已关闭terminalSocket.value.onerror = OnError; // WebSocket 连接出错
};
// 初始化WS
const initWS = () => {if (!terminalSocket.value) {createWS();}if (terminalSocket.value && terminalSocket.value.readyState >= 1) {terminalSocket.value.close();createWS();}
};// terminal初始化相关
const term = ref();
const terminal = ref();
const fitAddon = new FitAddon();// 尺寸同步 发送给后端,调整后端终端大小,和前端保持一致,不然前端只是范围变大了,命令还是会换行
const resizeRemoteTerminal = () => {if (isWsOpen()) {const msg = {type: "resize",rows: term.value.rows,cols: term.value.cols,};terminalSocket.value.send(JSON.stringify(msg));}
};// 终端输入绑定事件
const termData = () => {// 输入与粘贴的情况,onData不能重复绑定,不然会发送多次term.value.onData((data: any) => {if (isWsOpen()) {// 写给服务端, 由服务端发给containerconst msg = { type: "input", input: data };terminalSocket.value.send(JSON.stringify(msg));}});// 终端尺寸变化触发term.value.onResize(() => {resizeRemoteTerminal();});
};const initTerm = () => {term.value = new Terminal({lineHeight: 1.2,fontSize: 12,fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",theme: {background: "#181d28",},// 光标闪烁cursorBlink: true,cursorStyle: "underline",scrollback: 100,tabStopWidth: 4,});term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据term.value.loadAddon(fitAddon); // 自适应尺寸// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作setTimeout(() => {fitAddon.fit();}, 1000);termData(); // Terminal 事件挂载
};const resetTerm = () => {term.value.reset()terminal.value.innerHTML = '';term.value = new Terminal({lineHeight: 1.2,fontSize: 12,fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",theme: {background: "#181d28",},// 光标闪烁cursorBlink: true,cursorStyle: "underline",scrollback: 100,tabStopWidth: 4,});term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据term.value.loadAddon(fitAddon); // 自适应尺寸// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作setTimeout(() => {fitAddon.fit();}, 1000);termData(); // Terminal 事件挂载
};// 窗口大小适应相关
// 适应浏览器尺寸变化
const fitTerm = () => {fitAddon.fit();
};
const onResize = debounce(() => fitTerm(), 500);
const onTerminalResize = () => {window.addEventListener("resize", onResize);
};
const removeResizeListener = () => {window.removeEventListener("resize", onResize);
};onMounted(() => {loading.value = true;initWS();initTerm();onTerminalResize();
});onBeforeUnmount(() => {removeResizeListener();if (terminalSocket.value) {terminalSocket.value.close();}
});watch(() => currentBash.value, // 要监视的数据() => {// 回调函数loading.value = true;initWS();resetTerm();// initTerm();onTerminalResize();},{// immediate: true, // 立即执行回调deep: true, // 深层监视}
);
</script><script lang="ts">
export default {name: "GetTerminal",
};
</script><style lang="scss" scoped>
.terminal {width: 90%;// height: calc(100% - 62px);// height: 100%;margin-bottom: 16px;
}
</style>
最终的效果就类似这种
- Tips: 一般服务部署后,服务暴露会使用nginx或者ingress, 默认是不支持websocket的,所以需要在nginx/ingress上添加一下配置, 将转发的http请求升级到websocket
proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection upgrade;proxy_set_header Host $host;proxy_pass http://127.0.0.1:8082;
个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流
这篇关于动手搓一个kubernetes管理平台(5)-WebSocket和TTY的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!