Kubevirt vnc 流程分析

在kubevirt中,virtctl 命令提供 vnc 的功能,virtctl 会向virt-api发送vnc请求,virt-api将请求转发给virt-handler,virt-handler在节点上利用对应虚拟机的vnc socket 与虚机建立vnc连接。

代码入口在pkg/virtctl/root.go vnc.NewCommand(clientConfig)

pkg/virt-api/api.go subresourceApp.VNCRequestHandler

cmd/virt-handler/virt-handler.go consoleHandler.VNCHandler

virtctl

virtctl vnc 命令接受listenAddressproxyOnlycustomPort三个参数,并根据config 创建了一个 VNC{} ,然后执行 VNC.Run(cmd, args)

VNC.Run

在准备阶段中,根据config 生成 virtCli。接着与vmi创建websocket连接。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
virtCli, err := kubecli.GetKubevirtClientFromClientConfig(o.clientConfig)
if err != nil {
	return err
}

// setup connection with VM
vnc, err := virtCli.VirtualMachineInstance(namespace).VNC(vmi)
if err != nil {
	return fmt.Errorf("Can't access VMI %s: %s", vmi, err.Error())
}

VNC方法会去请求相应的资源(virt-api)建立 websocket 长连接

1
2
3
4
	u.Path = path.Join(
		u.Path,
		fmt.Sprintf("/apis/subresources.kubevirt.io/%s/namespaces/%s/%s/%s/%s", v1.ApiStorageVersion, namespace, resource, name, subresource),
	)

VNC 是wsStreamer,Stream 方法做了一个转发,从option.in 拿数据并将conn的数据转发给 option.out

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
//kubevirt.io/client-go/kubecli/streamer.go
func (ws *wsStreamer) Stream(options StreamOptions) error {
	copyErr := make(chan error, 1)

	go func() {
		_, err := CopyTo(ws.conn, options.In)
		copyErr <- err
	}()

	go func() {
		_, err := CopyFrom(options.Out, ws.conn)
		copyErr <- err
	}()

	defer ws.streamDone()
	return <-copyErr
}

根据ip和端口的输入参数,生成 tcpaddr,并监听该地址,该server 是用来将websocket 转发给client用的。接着创建两个pipe,将pipeInReader的数据传给VNC ws conn,将VNC ws conn的数据传给pipeOutWriter。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
	// Format the listening address to account for the port (ex: 127.0.0.0:5900)
	// Set listenAddress to localhost if proxy-only flag is not set
	if !proxyOnly {
		listenAddress = "127.0.0.1"
		glog.V(2).Infof("--proxy-only is set to false, listening on %s\n", listenAddress)
	}
	listenAddressFmt = listenAddress + ":%d"
	lnAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(listenAddressFmt, customPort))
	if err != nil {
		return fmt.Errorf("Can't resolve the address: %s", err.Error())
	}

	// The local tcp server is used to proxy the podExec websock connection to vnc client
	ln, err := net.ListenTCP("tcp", lnAddr)
	if err != nil {
		return fmt.Errorf("Can't listen on unix socket: %s", err.Error())
	}
	// End of pre-flight checks. Everything looks good, we can start
	// the goroutines and let the data flow

	//                                       -> pipeInWriter  -> pipeInReader
	// remote-viewer -> unix sock connection
	//                                       <- pipeOutReader <- pipeOutWriter
	pipeInReader, pipeInWriter := io.Pipe()
	pipeOutReader, pipeOutWriter := io.Pipe()

	k8ResChan := make(chan error)
	listenResChan := make(chan error)
	viewResChan := make(chan error)
	stopChan := make(chan struct{}, 1)
	doneChan := make(chan struct{}, 1)
	writeStop := make(chan error)
	readStop := make(chan error)

	go func() {
		// transfer data from/to the VM
		k8ResChan <- vnc.Stream(kubecli.StreamOptions{
			In:  pipeInReader,
			Out: pipeOutWriter,
		})
	}()

接着处理监听到的请求,接收到vnc请求后,创建 Socket 将 VNC ws conn的数据通过pipeOutReader 传给socket的fd,将socket fd的数据传给pipeInWriter,完成了vnc client 端到 VNC proxy的数据交换。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
go func() {
		start := time.Now()
		glog.Infof("connection timeout: %v", LISTEN_TIMEOUT)
		// Don't set deadline if only proxy is running and VNC is to be connected manually
		if !proxyOnly {
			// exit early if spawning vnc client fails
			ln.SetDeadline(time.Now().Add(LISTEN_TIMEOUT))
		}
		fd, err := ln.Accept()
		if err != nil {
			glog.V(2).Infof("Failed to accept unix sock connection. %s", err.Error())
			listenResChan <- err
		}
		defer fd.Close()

		glog.V(2).Infof("VNC Client connected in %v", time.Now().Sub(start))
		templates.PrintWarningForPausedVMI(virtCli, vmi, namespace)

		// write to FD <- pipeOutReader
		go func() {
			_, err := io.Copy(fd, pipeOutReader)
			readStop <- err
		}()

		// read from FD -> pipeInWriter
		go func() {
			_, err := io.Copy(pipeInWriter, fd)
			writeStop <- err
		}()

		// don't terminate until vnc client is done
		<-doneChan
		listenResChan <- err
	}()

若为proxyOnly就返回监听的端口,否则进入checkAndRunVNCViewer 打开一个vnc client应用程序

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
if proxyOnly {
		defer close(doneChan)
		optionString, err := json.Marshal(struct {
			Port int `json:"port"`
		}{port})
		if err != nil {
			return fmt.Errorf("Error encountered: %s", err.Error())
		}
		fmt.Fprintln(cmd.OutOrStdout(), string(optionString))
	} else {
		// execute VNC Viewer
		go checkAndRunVNCViewer(doneChan, viewResChan, port)
	}

checkAndRunVNCViewer主要检查所在host的操作系统和存在的vnc应用,选择一个进行vnc连接

如:vncviewer vnc://127.0.0.1:XXXX --debug

vncBin = TIGER_VNC
args = tigerVncArgs(port)
...
cmnd := exec.Command(vncBin, args...)

最后在对chan 进行处理,优雅退出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
	go func() {
		defer close(stopChan)
		interrupt := make(chan os.Signal, 1)
		signal.Notify(interrupt, os.Interrupt)
		<-interrupt
	}()

	select {
	case <-stopChan:
	case err = <-readStop:
	case err = <-writeStop:
	case err = <-k8ResChan:
	case err = <-viewResChan:
	case err = <-listenResChan:
	}

	if err != nil {
		return fmt.Errorf("Error encountered: %s", err.Error())
	}
	return nil

virt-api

在vnc中会去请求fmt.Sprintf("/apis/subresources.kubevirt.io/%s/namespaces/%s/%s/%s/%s", v1.ApiStorageVersion, namespace, resource, name, subresource)建立长连接。 该URI的服务端是virt-api

1
2
3
4
5
6
//pkg/virt-api/api.go
		subws.Route(subws.GET(definitions.NamespacedResourcePath(subresourcesvmiGVR) + definitions.SubResourcePath("vnc")).
			To(subresourceApp.VNCRequestHandler).
			Param(definitions.NamespaceParam(subws)).Param(definitions.NameParam(subws)).
			Operation(version.Version + "VNC").
			Doc("Open a websocket connection to connect to VNC on the specified VirtualMachineInstance."))

VNC handler 创建 streamer 去处理 request,其中streamer.dial 为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (app *SubresourceAPIApp) VNCRequestHandler(request *restful.Request, response *restful.Response) {
	activeConnectionMetric := apimetrics.NewActiveVNCConnection(request.PathParameter("namespace"), request.PathParameter("name"))
	defer activeConnectionMetric.Dec()

	streamer := NewRawStreamer(
		app.FetchVirtualMachineInstance,
		validateVMIForVNC,
		app.virtHandlerDialer(func(vmi *v1.VirtualMachineInstance, conn kubecli.VirtHandlerConn) (string, error) {
			return conn.VNCURI(vmi)
		}),
	)

	streamer.Handle(request, response)
}


func (app *SubresourceAPIApp) virtHandlerDialer(getURL URLResolver) dialer {
	return handlerDial{
		getURL: getURL,
		app:    app,
	}
}

Handler 向virt-handler 使用s.dialer.DialUnderlying(namespace, name)方法创建一个 ws 连接

URL 模板如wss://%s:%v/v1/namespaces/%s/virtualmachineinstances/%s/vnc

并且将client端(virt-api)的连接处理 upgrade 为 websocket 请求(clientConnectionUpgrade(request, response)

然后将两个conn 的数据通过io.copy传递。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

func (s *Streamer) Handle(request *restful.Request, response *restful.Response) error {
	namespace := request.PathParameter(definitions.NamespaceParamName)
	name := request.PathParameter(definitions.NameParamName)
	serverConn, statusErr := s.dialer.DialUnderlying(namespace, name)

	if statusErr != nil {
		writeError(statusErr, response)
		return statusErr
	}

	clientConn, err := clientConnectionUpgrade(request, response)
	if err != nil {
		writeError(errors.NewBadRequest(err.Error()), response)
		return err
	}

	ctx, cancel := context.WithCancel(request.Request.Context())
	defer cancel()
	go s.cleanupOnClosedContext(ctx, clientConn, serverConn)

	if s.keepAliveClient != nil {
		go s.keepAliveClient(context.Background(), clientConn, cancel)
	}

	results := make(chan streamFuncResult, 2)
	defer close(results)

	go s.streamToClient(clientConn, serverConn, results)
	go s.streamToServer(clientConn, serverConn, results)

	result1 := <-results
	// start canceling on the first result to force all goroutines to terminate
	cancel()
	result2 := <-results

	if result1 != nil {
		return result1
	}
	return result2
}

virt-handler

virt-handler用来在节点上真正与虚机交互,virt-handler 的服务端代码入口在:

1
2
//cmd/virt-handler/virt-handler.go
ws.Route(ws.GET("/v1/namespaces/{namespace}/virtualmachineinstances/{name}/vnc").To(consoleHandler.VNCHandler))

VNCHandler 获取节点上的/proc/pid/root/var/run/kubevirt-private/uid/virt-vnc socket 文件,然后unixSocketDialer(vmi, unixSocketPath)根据socket文件创建conn,stream中将客户端conn中数据与vnc server conn中数据通过io.copy进行交换。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
func (t *ConsoleHandler) VNCHandler(request *restful.Request, response *restful.Response) {
   vmi, code, err := getVMI(request, t.vmiInformer)
   if err != nil {
      log.Log.Object(vmi).Reason(err).Error(failedRetrieveVMI)
      response.WriteError(code, err)
      return
   }
   unixSocketPath, err := t.getUnixSocketPath(vmi, "virt-vnc")
   if err != nil {
      log.Log.Object(vmi).Reason(err).Error("Failed finding unix socket for VNC console")
      response.WriteError(http.StatusBadRequest, err)
      return
   }
   uid := vmi.GetUID()
   stopChn := newStopChan(uid, t.vncLock, t.vncStopChans)
   defer deleteStopChan(uid, stopChn, t.vncLock, t.vncStopChans)
   t.stream(vmi, request, response, unixSocketDialer(vmi, unixSocketPath), stopChn)
}

VNC server

vnc server 在 virt-launcher 创建虚机时在XML中指定:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
//pkg/virt-launcher/virtwrap/converter/converter.go
domain.Spec.Devices.Graphics = []api.Graphics{
   {
      Listen: &api.GraphicsListen{
         Type:   "socket",
         Socket: fmt.Sprintf("/var/run/kubevirt-private/%s/virt-vnc", vmi.ObjectMeta.UID),
      },
      Type: "vnc",
   },
}

https://libvirt.org/formatdomain.html#graphical-framebuffers

updatedupdated2023-06-302023-06-30