Kubevirt VirtController 初始化流程分析

VirtController 是 KubeVirt 中负责处理控制逻辑的主要组件,它的初始化流程具有一定代表性。(其实是在整理 operator 流程的文档时看错了代码,才顺便整理了这一篇。)

初始化

首先进行一些初始化工作,

1.创建VirtControllerApp;

2.生成并注入 leader 选举的参数,对 VirtControllerApp 注入环境参数;

3.根据 ClientConfig 生成 clientSet,restClient;

4.NewKubeInformerFactory,创建并启动 CRD 和 KubeVirt informer 获取 clusterConfig,根据clusterConfig 注册一些回调函数;

5.创建相关的 webService;

6.创建各种 informer;

7.创建各种 controller;

8.go app.Run()。

VirtController app.Run

在 app.Run 中可以清楚地看到监控(metrics)端口的创建和运行,以及 leader 选举的开启;选举相关的细节藏在 vca.setupLeaderElector() 中,当获得 leader 后 vca 还会执行一系列动作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Run starts the metrics endpoint and then enters leader election.
//
// The sequence is:
//  1. create the file-based certificate manager for the metrics endpoint and
//     start it in the background;
//  2. serve "/metrics" over TLS from a separate goroutine;
//  3. set up the leader elector, terminating the process on failure;
//  4. mark the process ready and run the leader elector with vca.ctx.
//
// Run is not expected to return: if the leader elector's Run ever comes back,
// the ready gauge is reset and the function panics.
func (vca *VirtControllerApp) Run() {
	// Capture the base logger up front so the goroutine below uses the value
	// that was current when Run was entered.
	baseLogger := log.Log

	certManager := bootstrap.NewFileCertificateManager(vca.promCertFilePath, vca.promKeyFilePath)
	go certManager.Start()
	tlsConfig := kvtls.SetupPromTLS(certManager, vca.clusterConfig)

	// serveMetrics registers the Prometheus handler on the default mux and
	// blocks serving it over TLS.
	serveMetrics := func() {
		svcLogger := baseLogger.With("service", "http")
		// The result of the log call is deliberately discarded.
		_ = svcLogger.Level(log.INFO).Log("action", "listening", "interface", vca.BindAddress, "port", vca.Port)

		http.Handle("/metrics", promhttp.Handler())

		srv := &http.Server{
			Addr:      vca.Address(),
			Handler:   http.DefaultServeMux,
			TLSConfig: tlsConfig,
		}
		// Empty cert/key paths: the certificates are presumably provided
		// through TLSConfig — confirm against kvtls.SetupPromTLS.
		err := srv.ListenAndServeTLS("", "")
		if err != nil {
			golog.Fatal(err)
		}
	}
	go serveMetrics()

	err := vca.setupLeaderElector()
	if err != nil {
		golog.Fatal(err)
	}

	// Ready for the duration of the leader-election loop.
	readyGauge.Set(1)
	vca.leaderElector.Run(vca.ctx)
	readyGauge.Set(0)

	// Only reached if the leader elector's Run returns.
	panic("unreachable")
}

在setupLeaderElector() 的 vca.onStartedLeading()中,具体定义了该 vca 当选 leader 后会执行的动作。

onStartedLeading

当选 leader 后,首先启动(Start)所有的 informer,并注册各类监控指标的采集器,接着运行(Run)相关的 controller。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

// onStartedLeading builds the callback that performs the leader-only start-up
// work: it starts the informers, registers the metrics collectors, launches
// every controller in its own goroutine and finally signals readiness.
//
// NOTE(review): presumably this is registered as the "started leading"
// callback inside setupLeaderElector — that wiring is not shown here.
func (vca *VirtControllerApp) onStartedLeading() func(ctx context.Context) {
	return func(ctx context.Context) {
		// Everything started below shares this stop channel, which is closed
		// when ctx is cancelled.
		stop := ctx.Done()
		vca.informerFactory.Start(stop)

		// Log the configured worker counts. Only seven of the controllers
		// started below are included in this message.
		golog.Printf("STARTING controllers with following threads : "+
			"node %d, vmi %d, replicaset %d, vm %d, migration %d, evacuation %d, disruptionBudget %d",
			vca.nodeControllerThreads, vca.vmiControllerThreads, vca.rsControllerThreads,
			vca.vmControllerThreads, vca.migrationControllerThreads, vca.evacuationControllerThreads,
			vca.disruptionBudgetControllerThreads)

		// Register the metrics collectors backed by the VMI / VM informers.
		vmiprom.SetupVMICollector(vca.vmiInformer, vca.clusterConfig)
		vmprom.SetupVMCollector(vca.vmInformer)
		perfscale.RegisterPerfScaleMetrics(vca.vmiInformer)
		// Fall back to the factory's migration informer when none has been
		// set on the app yet.
		if vca.migrationInformer == nil {
			vca.migrationInformer = vca.informerFactory.VirtualMachineInstanceMigration()
		}
		// Debug output of the informer value.
		golog.Printf("\nvca.migrationInformer :%v\n", vca.migrationInformer)
		migration.RegisterMigrationMetrics(vca.migrationInformer)
		migrationstats.SetupMigrationsCollector(vca.migrationInformer)

		// Each controller runs in its own goroutine with its configured
		// thread count and the shared stop channel.
		go vca.evacuationController.Run(vca.evacuationControllerThreads, stop)
		go vca.disruptionBudgetController.Run(vca.disruptionBudgetControllerThreads, stop)
		go vca.nodeController.Run(vca.nodeControllerThreads, stop)
		go vca.vmiController.Run(vca.vmiControllerThreads, stop)
		go vca.rsController.Run(vca.rsControllerThreads, stop)
		go vca.poolController.Run(vca.poolControllerThreads, stop)
		go vca.vmController.Run(vca.vmControllerThreads, stop)
		go vca.migrationController.Run(vca.migrationControllerThreads, stop)
		// The snapshot, restore, export and clone controllers return an
		// error from Run; it is logged as a warning and nothing else is
		// stopped because of it.
		go func() {
			if err := vca.snapshotController.Run(vca.snapshotControllerThreads, stop); err != nil {
				log.Log.Warningf("error running the snapshot controller: %v", err)
			}
		}()
		go func() {
			if err := vca.restoreController.Run(vca.restoreControllerThreads, stop); err != nil {
				log.Log.Warningf("error running the restore controller: %v", err)
			}
		}()
		go func() {
			if err := vca.exportController.Run(vca.exportControllerThreads, stop); err != nil {
				log.Log.Warningf("error running the export controller: %v", err)
			}
		}()
		// These two take no thread count: the workload updater gets only the
		// stop channel, the node topology updater an update period.
		go vca.workloadUpdateController.Run(stop)
		go vca.nodeTopologyUpdater.Run(vca.nodeTopologyUpdatePeriod, stop)
		go func() {
			if err := vca.vmCloneController.Run(vca.cloneControllerThreads, stop); err != nil {
				log.Log.Warningf("error running the clone controller: %v", err)
			}
		}()

		// Only the PersistentVolumeClaim informer is waited on here before
		// readiness is signalled by closing readyChan and setting the leader
		// gauge to 1.
		// NOTE(review): the return value of WaitForCacheSync is ignored, so
		// readiness is signalled even if the wait ended without a sync —
		// confirm this is intended.
		cache.WaitForCacheSync(stop, vca.persistentVolumeClaimInformer.HasSynced)
		close(vca.readyChan)
		leaderGauge.Set(1)
	}
}
updated 2023-03-02