KubeVirt virt-launcher 流程分析

virt-launcher 是 KubeVirt 与虚拟机(VM)之间的接口层,其中运行着 QEMU、libvirt 等组件,用来与虚拟机交互。

代码入口

cmd/virt-launcher/virt-launcher.go main()

virt-launcher 可以传入非常多的参数。启动时首先确定日志级别,接着启动 libvirtd(较新版本中为 virtqemud)、virtlogd 等守护进程(daemon);然后创建 domainManager,封装 libvirt 的接口;再创建 command server,向外暴露操作虚拟机的接口;最后设置 shutdown 回调。


func main() {
	// main is the virt-launcher entry point (excerpt: most flag declarations
	// are elided with "..."). In order, it:
	//   1. waits for the requested hook sidecars,
	//   2. prepares local directories and the ephemeral disk creator,
	//   3. starts libvirtd and virtlogd and opens a libvirt connection,
	//   4. builds the libvirt-backed domain manager and starts the command server,
	//   5. forwards domain events to virt-handler and traps termination signals,
	//   6. marks the container ready, then monitors the qemu process until it exits,
	//   7. closes stopChan and waits for the command server to finish.
	//
	// Flag definitions (most are elided in this excerpt).
	qemuTimeout := pflag.Duration("qemu-timeout", defaultStartTimeout, "Amount of time to wait for qemu")
...
	libvirtLogFilters := pflag.String("libvirt-log-filters", "", "Set custom log filters for libvirt")
...
	// Block until all requested hookSidecars are ready
	hookManager := hooks.GetManager()
	err := hookManager.Collect(*hookSidecars, *qemuTimeout)
	if err != nil {
		panic(err)
	}

	vmi := v1.NewVMIReferenceWithUUID(*namespace, *name, types.UID(*uid))

	// Initialize local and shared directories, then prepare the creator
	// used for ephemeral disks under <ephemeralDiskDir>/disk-data.
	initializeDirs(*ephemeralDiskDir, *containerDiskDir, *hotplugDiskDir, *uid)
	ephemeralDiskCreator := ephemeraldisk.NewEphemeralDiskCreator(filepath.Join(*ephemeralDiskDir, "disk-data"))
	if err := ephemeralDiskCreator.Init(); err != nil {
		panic(err)
	}

	// Start libvirtd, virtlogd, and establish libvirt connection.
	// stopChan is closed near the end of main to stop these background services.
	stopChan := make(chan struct{})

	l := util.NewLibvirtWrapper(*runWithNonRoot)
	// Per the original notes, SetupLibvirt copies configuration files shipped
	// under cmd/virt-launcher to the locations libvirt expects (not shown here).
	err = l.SetupLibvirt(libvirtLogFilters)
	if err != nil {
		panic(err)
	}
	// Spawn libvirtd as a child process; StartLibvirt restarts it in a
	// background loop until stopChan is closed.
	l.StartLibvirt(stopChan)
	// only single domain should be present
	domainName := api.VMINamespaceKeyFunc(vmi)
	// Start virtlogd for this domain.
	util.StartVirtlog(stopChan, domainName, *runWithNonRoot)

	domainConn := createLibvirtConnection(*runWithNonRoot)
	defer domainConn.Close()

	var agentStore = agentpoller.NewAsyncAgentStore()

	notifier := notifyclient.NewNotifier(*virtShareDir)
	defer notifier.Close()
	// domainManager wraps the libvirt connection and implements the VM
	// operations; it is handed to the command server started below.
	domainManager, err := virtwrap.NewLibvirtDomainManager(domainConn, *virtShareDir, *ephemeralDiskDir, &agentStore, *ovmfPath, ephemeralDiskCreator)
	if err != nil {
		panic(err)
	}

	// Start the virt-launcher command service.
	// Clients can use this service to tell virt-launcher
	// to start/stop virtual machines

	options := cmdserver.NewServerOptions(*allowEmulation)
	cmdclient.SetLegacyBaseDir(*virtShareDir)
	// cmdclient.UninitializedSocketOnGuest() is presumably the socket path the
	// command server listens on (the original notes describe it as a gRPC
	// server). cmdServerDone is awaited during shutdown at the end of main.
	cmdServerDone := startCmdServer(cmdclient.UninitializedSocketOnGuest(), domainManager, stopChan, options)

	// gracefulShutdownCallback asks the domain manager to mark the VMI for
	// graceful shutdown and logs whether that succeeded.
	gracefulShutdownCallback := func() {
		// NOTE(review): wait.PollImmediate stops polling as soon as the
		// condition returns a non-nil error, so returning (false, err) below
		// makes this a single attempt; the 1s/15s retry window is only used if
		// the condition returns (false, nil). Confirm which is intended.
		err := wait.PollImmediate(time.Second, 15*time.Second, func() (bool, error) {
			err := domainManager.MarkGracefulShutdownVMI(vmi)
			if err != nil {
				log.Log.Reason(err).Errorf("Unable to signal graceful shutdown")
				return false, err
			}

			return true, nil
		})

		if err != nil {
			log.Log.Reason(err).Errorf("Gave up attempting to signal graceful shutdown")
		} else {
			log.Log.Object(vmi).Info("Successfully signaled graceful shutdown")
		}
	}

	// finalShutdownCallback asks libvirt to kill the domain; if that fails and
	// a qemu pid is known (non-zero), it falls back to sending SIGTERM.
	finalShutdownCallback := func(pid int) {
		if err := domainManager.KillVMI(vmi); err != nil {
			log.Log.Reason(err).Errorf("Unable to stop qemu with libvirt")
			if pid != 0 {
				log.Log.Warning("Falling back to SIGTERM")
				if err := syscall.Kill(pid, syscall.SIGTERM); err != nil {
					log.Log.Reason(err).Errorf("Unable to kill PID %d", pid)
				}
			}
		}
	}

	// Buffered (capacity 2) channel of domain watch events, shared by the
	// event monitor, waitForDomainUUID and waitForFinalNotify.
	events := make(chan watch.Event, 2)
	// Send domain notifications to virt-handler (via notifier).
	startDomainEventMonitoring(notifier, *virtShareDir, domainConn, events, vmi, domainName, &agentStore, *qemuAgentSysInterval, *qemuAgentFileInterval, *qemuAgentUserInterval, *qemuAgentVersionInterval, *qemuAgentFSFreezeStatusInterval)
	// Shutdown handling: trap termination signals.
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT,
	)

	// signalStopChan is closed once, on the first trapped signal.
	signalStopChan := make(chan struct{})
	go func() {
		s := <-c
		log.Log.Infof("Received signal %s", s.String())
		close(signalStopChan)
	}()

	// Marking Ready allows the container's readiness check to pass.
	// This informs virt-controller that virt-launcher is ready to handle
	// managing virtual machines.
	markReady()

	// Presumably returns nil if no domain appears within qemuTimeout or a
	// signal arrives first; TODO confirm in waitForDomainUUID.
	domain := waitForDomainUUID(*qemuTimeout, events, signalStopChan, domainManager)
	if domain != nil {
		// libvirt's qemu pid directory differs between root and non-root mode.
		var pidDir string
		if *runWithNonRoot {
			pidDir = "/run/libvirt/qemu/run"
		} else {
			pidDir = "/run/libvirt/qemu"
		}
		// Monitor the liveness of the domain's qemu process inside the container,
		// invoking the shutdown callbacks defined above.
		mon := virtlauncher.NewProcessMonitor(domainName,
			pidDir,
			*gracePeriodSeconds,
			finalShutdownCallback,
			gracefulShutdownCallback)

		// This is a wait loop that monitors the qemu pid. When the pid
		// exits, the wait loop breaks.
		mon.RunForever(*qemuTimeout, signalStopChan)

		// Now that the pid has exited, we wait for the final delete notification to be
		// sent back to virt-handler. This delete notification contains the reason the
		// domain exited.
		waitForFinalNotify(events, domainManager, vmi)
	}

	// Stop background services (e.g. the libvirtd restart loop kills libvirtd
	// and returns) and wait for the command server to finish.
	close(stopChan)
	<-cmdServerDone

	log.Log.Info("Exiting...")
}
// StartLibvirt launches libvirtd as a child process from a background
// goroutine and keeps it running. Whenever libvirtd exits, its exit error is
// logged and it is restarted after a one-second back-off. When stopChan is
// closed, the current libvirtd process is killed and the goroutine returns.
//
// libvirtd's stderr is streamed line by line into the virt-launcher log.
// StartLibvirt itself returns immediately. If the stderr pipe cannot be
// created or libvirtd cannot be started, the background goroutine panics,
// which terminates virt-launcher.
func (l LibvirtWrapper) StartLibvirt(stopChan chan struct{}) {
	// we spawn libvirt from virt-launcher in order to ensure the libvirtd+qemu process
	// doesn't exit until virt-launcher is ready for it to. Virt-launcher traps signals
	// to perform special shutdown logic. These processes need to live in the same
	// container.

	go func() {
		for {
			args := []string{"-f", "/var/run/libvirt/libvirtd.conf"}
			cmd := exec.Command("/usr/sbin/libvirtd", args...)
			if l.user != 0 {
				// Presumably the non-root mode selected via NewLibvirtWrapper.
				// Ambient capabilities (Linux-only) are kept by the child
				// across exec. CAP_NET_BIND_SERVICE permits binding ports
				// below 1024; CAP_SYS_PTRACE is granted as well.
				cmd.SysProcAttr = &syscall.SysProcAttr{
					AmbientCaps: []uintptr{unix.CAP_NET_BIND_SERVICE, unix.CAP_SYS_PTRACE},
				}
			}

			// connect libvirt's stderr to our own stdout in order to see the logs in the container logs
			reader, err := cmd.StderrPipe()
			if err != nil {
				log.Log.Reason(err).Error("failed to start libvirtd")
				panic(err)
			}

			go func() {
				scanner := bufio.NewScanner(reader)
				// Allow log lines of up to 512 KiB. A longer line stops the
				// scanner and is reported by scanner.Err() below.
				scanner.Buffer(make([]byte, 1024), 512*1024)
				for scanner.Scan() {
					log.LogLibvirtLogLine(log.Log, scanner.Text())
				}

				if err := scanner.Err(); err != nil {
					log.Log.Reason(err).Error("failed to read libvirt logs")
				}
			}()

			err = cmd.Start()
			if err != nil {
				log.Log.Reason(err).Error("failed to start libvirtd")
				panic(err)
			}

			// Buffered so the waiter goroutine can always deliver its result
			// and exit, even when we return via stopChan without receiving it.
			exitChan := make(chan error, 1)
			go func() {
				// NOTE(review): os/exec documents that Wait closes the stderr
				// pipe once the process exits, so the final lines libvirtd
				// wrote may be dropped if the scanner has not read them yet.
				exitChan <- cmd.Wait()
			}()

			select {
			case <-stopChan:
				if err := cmd.Process.Kill(); err != nil {
					log.Log.Reason(err).Warning("failed to kill libvirtd")
				}
				return
			case err := <-exitChan:
				// Surface the exit status/signal so crash loops are diagnosable.
				if err != nil {
					log.Log.Reason(err).Errorf("libvirtd exited, restarting")
				} else {
					log.Log.Errorf("libvirtd exited, restarting")
				}
			}

			// this sleep is to avoid consuming all resources in the
			// event of a libvirtd crash loop.
			time.Sleep(time.Second)
		}
	}()
}
updated 2023-03-31