0%

一个Kubernetes Web终端连接工具

当应用部署到Kubernetes集群中之后,如何提供Web终端的功能,以便开发人员调试?

该功能的核心就是实现kubernetes executor接口

exec.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package pod

import (
"context"
"errors"
"log"
"net/http"
"sync"

"github.com/gorilla/websocket"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/remotecommand"
)

// 封装websocket连接
type WsConnection struct {
wsSocket *websocket.Conn // 底层websocket
inChan chan *WsMessage // 读取队列
outChan chan *WsMessage // 发送队列
mutex sync.Mutex // 避免重复关闭管道
isClosed bool
closeChan chan byte // 关闭通知
}

// web终端发来的包
type xtermMessage struct {
MsgType string `json:"type"` // 类型:resize客户端调整终端, input客户端输入
Input string `json:"input"` // msgtype=input情况下使用
Rows uint16 `json:"rows"` // msgtype=resize情况下使用
Cols uint16 `json:"cols"` // msgtype=resize情况下使用
}

// websocket消息
type WsMessage struct {
MessageType int
Data []byte
}

// 关闭连接
func (wsConn *WsConnection) WsClose() {
wsConn.wsSocket.Close()
wsConn.mutex.Lock()
defer wsConn.mutex.Unlock()
if !wsConn.isClosed {
wsConn.isClosed = true
close(wsConn.closeChan)
}
}

// ssh流式处理器
type streamHandler struct {
wsConn *WsConnection
resizeEvent chan remotecommand.TerminalSize
}

// executor回调获取web是否resize
func (handler *streamHandler) Next() (size *remotecommand.TerminalSize) {
ret := <-handler.resizeEvent
size = &ret
return
}

// 发送返回消息到协程
func (wsConn *WsConnection) WsWrite(messageType int, data []byte) (err error) {
select {
case wsConn.outChan <- &WsMessage{messageType, data}:
case <-wsConn.closeChan:
err = errors.New("WsWrite websocket closed")
break
}
return
}

// 读取协程
func (wsConn *WsConnection) wsReadLoop() {
for {
// 读一条message
// ReadMessage返回的messageType只可能是:TextMessage BinaryMessage
msgType, data, err := wsConn.wsSocket.ReadMessage()
if err != nil {
log.Println(err)
break
}
//log.Print(string(data))
// 放入请求队列
wsConn.inChan <- &WsMessage{
msgType,
data,
}
}
}

// 发送协程
func (wsConn *WsConnection) wsWriteLoop() {
// 服务端返回给页面的数据
for {
select {
// 取一个应答
case msg := <-wsConn.outChan:
//log.Print(string(msg.Data))
// 写给web websocket
if err := wsConn.wsSocket.WriteMessage(msg.MessageType, msg.Data); err != nil {
log.Println(err)
break
}
case <-wsConn.closeChan:
wsConn.WsClose()
}
}
}

// 读取 页面消息到协程
func (wsConn *WsConnection) WsRead() (msg *WsMessage, err error) {
select {
case msg = <-wsConn.inChan:
return
case <-wsConn.closeChan:
err = errors.New("WsRead websocket closed")
break
}
return
}

// executor回调读取web端的输入
func (handler *streamHandler) Read(p []byte) (size int, err error) {
// 读web发来的输入
msg, err := handler.wsConn.WsRead()
if err != nil {
handler.wsConn.WsClose()
return
}

xtermMsg := &xtermMessage{
//MsgType: string(msg.MessageType),
Input: string(msg.Data),
}
// 放到channel里,等remotecommand executor调用我们的Next取走
handler.resizeEvent <- remotecommand.TerminalSize{Width: xtermMsg.Cols, Height: xtermMsg.Rows}
size = len(xtermMsg.Input)
copy(p, xtermMsg.Input)
return

}

// executor回调向web端输出
func (handler *streamHandler) Write(p []byte) (size int, err error) {
// 产生副本
copyData := make([]byte, len(p))
copy(copyData, p)
size = len(p)
err = handler.wsConn.WsWrite(websocket.TextMessage, copyData)
return
}

func ContainerExec(ctx context.Context, r *http.Request, w http.ResponseWriter, cluster, namespace, podID, container string) error {

// todo 获取k8s信息部分 需要替换成自己的
ctxName := meta.GetContextName(cluster)
kclient, err := k8s.GetClient(ctxName)
if err != nil {
log.Println(err)
return err
}
cmds := []string{"sh", "-c", "test -f /bin/bash && bash || sh"}
option := &corev1.PodExecOptions{
Command: cmds,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: true,
Container: container,
}
//subCtx, cancel := context.WithTimeout(ctx, models.READ_LOG_TIMEOUT)
//defer cancel()
req := kclient.CoreV1().RESTClient().
Post().
Resource("pods").
Name(podID).
Namespace(namespace).
SubResource("exec").
VersionedParams(option, scheme.ParameterCodec).Timeout(models.READ_LOG_TIMEOUT)
wsSocket, err := upGrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return err
}
wsConn := &WsConnection{
wsSocket: wsSocket,
inChan: make(chan *WsMessage, 1000),
outChan: make(chan *WsMessage, 1000),
closeChan: make(chan byte),
isClosed: false,
}
// 获取kube config配置
config, err := k8s.GetClientConfig(ctxName)
if err != nil {
wsConn.WsClose()
log.Println(err)
return err
}
// 创建到容器的连接
executor, err := remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL())
if err != nil {
wsConn.WsClose()
log.Println(err)
return err
}
// 页面读入输入 协程
go wsConn.wsReadLoop()
// 服务端返回数据 协程
go wsConn.wsWriteLoop()

// 配置与容器之间的数据流处理回调
handler := &streamHandler{wsConn: wsConn, resizeEvent: make(chan remotecommand.TerminalSize)}
if err = executor.Stream(remotecommand.StreamOptions{
Stdin: handler,
Stdout: handler,
Stderr: handler,
TerminalSizeQueue: handler,
Tty: true,
}); err != nil {
log.Println("handler", err)
return err
}
return err
}

参考资料:

https://github.com/jiankunking/k8-web-terminal

欢迎关注我的其它发布渠道