Analysis of the KubeVirt operator initialization flow

When deploying the KubeVirt components, the YAML only explicitly creates the operator's Deployment. virt-operator is responsible for creating the remaining components of the application and keeping them running. Walking through the operator's execution flow gives us at least a rough picture of how to build an application on Kubernetes.

The code entry point is

cmd/virt-operator/virt-operator.go

func main() {
	virt_operator.Execute()
}

The main body of the code is in

pkg/virt-operator/application.go

VirtOperatorApp initialization

The operator is instantiated as VirtOperatorApp. The Execute function first performs some initialization on the app: Setup injects the service parameters; a clientset is created from the kubeconfig; the leaderElection configuration is initialized; an informerFactory is created, which in turn creates the relevant informers and their caches; prepareCertManagers initializes the cert managers; the app checks whether the ServiceMonitor and PrometheusRule resources (used for monitoring) are available and creates either a real informer or a dummy informer accordingly. Finally it creates the Recorder, the KubeVirtController (which handles the KubeVirt components), and the clusterConfig (built from the CRD and KubeVirt informers).

// Initialization work for the operator
func Execute() {
	var err error
	app := VirtOperatorApp{}
//Note: this flag is not set when the operator starts normally. When the running operator needs to create the install-strategy ConfigMap, it creates a job from the operator image that runs with the "dump-install-strategy" flag.
	dumpInstallStrategy := pflag.Bool("dump-install-strategy", false, "Dump install strategy to configmap and exit")
    
	service.Setup(&app)

	log.InitializeLogging(VirtOperator)

	err = util.VerifyEnv()
	if err != nil {
		golog.Fatal(err)
	}
//Write the pass-through fields from the pod's container spec into this process's environment variables
	// apply any passthrough environment to this operator as well
	for k, v := range util.GetPassthroughEnv() {
		os.Setenv(k, v)
	}

	config, err := kubecli.GetKubevirtClientConfig()
	if err != nil {
		panic(err)
	}
//Create the clientsets
	app.aggregatorClient = aggregatorclient.NewForConfigOrDie(config)
    
	app.clientSet, err = kubecli.GetKubevirtClient()

	if err != nil {
		golog.Fatal(err)
	}

	app.restClient = app.clientSet.RestClient()

	app.LeaderElection = leaderelectionconfig.DefaultLeaderElectionConfiguration()

	app.operatorNamespace, err = clientutil.GetNamespace()
	if err != nil {
		golog.Fatalf("Error searching for namespace: %v", err)
	}
//The dump-install-strategy path: dump the install strategy to a ConfigMap and exit, which ends the job.
	if *dumpInstallStrategy {
		err = install.DumpInstallStrategyToConfigMap(app.clientSet, app.operatorNamespace)
		if err != nil {
			golog.Fatal(err)
		}
		os.Exit(0)
	}
//Create the informerFactory used to build the various informers
	app.informerFactory = controller.NewKubeInformerFactory(app.restClient, app.clientSet, app.aggregatorClient, app.operatorNamespace)

	app.kubeVirtInformer = app.informerFactory.KubeVirt()
	app.kubeVirtCache = app.kubeVirtInformer.GetStore()

	app.informers = util.Informers{
		ServiceAccount:           app.informerFactory.OperatorServiceAccount(),
		ClusterRole:              app.informerFactory.OperatorClusterRole(),
...
		Namespace:                app.informerFactory.Namespace(),
		Secrets:                  app.informerFactory.Secrets(),
		ConfigMap:                app.informerFactory.OperatorConfigMap(),
	}

	app.stores = util.Stores{
		ServiceAccountCache:           app.informerFactory.OperatorServiceAccount().GetStore(),
		ClusterRoleCache:              app.informerFactory.OperatorClusterRole().GetStore(),
	...
		PodDisruptionBudgetCache:      app.informerFactory.OperatorPodDisruptionBudget().GetStore(),
		NamespaceCache:                app.informerFactory.Namespace().GetStore(),
		SecretCache:                   app.informerFactory.Secrets().GetStore(),
		ConfigMapCache:                app.informerFactory.OperatorConfigMap().GetStore(),
	}
//OpenShift-specific handling
...
	serviceMonitorEnabled, err := util.IsServiceMonitorEnabled(app.clientSet)
	if err != nil {
		golog.Fatalf("Error checking for ServiceMonitor: %v", err)
	}
	if serviceMonitorEnabled {
	...
	}

	prometheusRuleEnabled, err := util.IsPrometheusRuleEnabled(app.clientSet)
	if err != nil {
		golog.Fatalf("Error checking for PrometheusRule: %v", err)
	}
	if prometheusRuleEnabled {
		...
	}

	app.prepareCertManagers()
//Initialize the KubeVirt controller
	app.kubeVirtRecorder = app.getNewRecorder(k8sv1.NamespaceAll, VirtOperator)
	app.kubeVirtController = *NewKubeVirtController(app.clientSet, app.aggregatorClient.ApiregistrationV1().APIServices(), app.kubeVirtInformer, app.kubeVirtRecorder, app.stores, app.informers, app.operatorNamespace)

	image := os.Getenv(util.OperatorImageEnvName)
	if image == "" {
		golog.Fatalf("Error getting operator's image: %v", err)
	}
	log.Log.Infof("Operator image: %s", image)
//Initialize the KubeVirt cluster config from the information gathered above
    app.clusterConfig = virtconfig.NewClusterConfig(app.informerFactory.CRD(),
		app.informerFactory.KubeVirt(),
		app.operatorNamespace)

	app.Run()
}
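
To make the informerFactory / store pattern above more concrete, here is a minimal, self-contained sketch using the plain client-go SharedInformerFactory. It is not KubeVirt's own NewKubeInformerFactory wrapper; the ConfigMap resource and the 30s resync period are illustrative assumptions.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Build an in-cluster client; KubeVirt uses kubecli.GetKubevirtClient instead.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// A shared informer factory hands out informers backed by a common watch cache,
	// which is the role app.informerFactory plays for the operator.
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	cmInformer := factory.Core().V1().ConfigMaps().Informer()
	cmStore := cmInformer.GetStore() // local read-only cache, analogous to app.stores

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)                                // like app.informerFactory.Start(stop)
	cache.WaitForCacheSync(stop, cmInformer.HasSynced) // like the WaitForCacheSync calls in Run

	_ = cmStore // controllers read objects from the store instead of hitting the API server
}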

VirtOperatorApp.Run

In VirtOperatorApp.Run, the app first exposes the /metrics endpoint (for Prometheus scraping) along with the profiler start/stop/dump endpoints, plus the /kubevirt-validate-delete and KubeVirt update validation webhook endpoints, and then creates and runs the leaderElector. When the leaderElector acquires leadership, its callback is invoked, which runs the KubeVirtController.

//The app.Run called above
func (app *VirtOperatorApp) Run() {
	promTLSConfig := webhooks.SetupPromTLS(app.operatorCertManager)

	go func() {
// Set up the metrics and profiler endpoints
		mux := http.NewServeMux()
		mux.Handle("/metrics", promhttp.Handler())

		webService := new(restful.WebService)
		webService.Path("/").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON)

		componentProfiler := profiler.NewProfileManager(app.clusterConfig)
		webService.Route(webService.GET("/start-profiler").To(componentProfiler.HandleStartProfiler).Doc("start profiler endpoint"))
		webService.Route(webService.GET("/stop-profiler").To(componentProfiler.HandleStopProfiler).Doc("stop profiler endpoint"))
		webService.Route(webService.GET("/dump-profiler").To(componentProfiler.HandleDumpProfiler).Doc("dump profiler results endpoint"))

		restfulContainer := restful.NewContainer()
		restfulContainer.ServeMux = mux
		restfulContainer.Add(webService)

		server := http.Server{
			Addr:      app.ServiceListen.Address(),
			Handler:   mux,
			TLSConfig: promTLSConfig,
		}
		if err := server.ListenAndServeTLS("", ""); err != nil {
			golog.Fatal(err)
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	endpointName := VirtOperator

	recorder := app.getNewRecorder(k8sv1.NamespaceAll, endpointName)

	id, err := os.Hostname()
	if err != nil {
		golog.Fatalf("unable to get hostname: %v", err)
	}
//Create the resource lock used for leader election
	rl, err := resourcelock.New(app.LeaderElection.ResourceLock,
		app.operatorNamespace,
		endpointName,
		app.clientSet.CoreV1(),
		app.clientSet.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: recorder,
		})
	if err != nil {
		golog.Fatal(err)
	}

	apiAuthConfig := app.informerFactory.ApiAuthConfigMap()

	stop := ctx.Done()
//Start every informer registered with the informerFactory
	app.informerFactory.Start(stop)
	cache.WaitForCacheSync(stop, apiAuthConfig.HasSynced)

	go app.operatorCertManager.Start()

	caManager := webhooks.NewKubernetesClientCAManager(apiAuthConfig.GetStore())

	tlsConfig := webhooks.SetupTLSWithCertManager(caManager, app.operatorCertManager, tls.VerifyClientCertIfGiven)
//Create the admission webhook server
	webhookServer := &http.Server{
		Addr:      fmt.Sprintf("%s:%d", app.BindAddress, 8444),
		TLSConfig: tlsConfig,
	}

	var mux http.ServeMux
	mux.HandleFunc("/kubevirt-validate-delete", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		validating_webhooks.Serve(w, r, operator_webhooks.NewKubeVirtDeletionAdmitter(app.clientSet))
	}))
	mux.HandleFunc(components.KubeVirtUpdateValidatePath, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		validating_webhooks.Serve(w, r, operator_webhooks.NewKubeVirtUpdateAdmitter(app.clientSet, app.clusterConfig))
	}))
	webhookServer.Handler = &mux
	go func() {
		err := webhookServer.ListenAndServeTLS("", "")
		if err != nil {
			panic(err)
		}
	}()
//Create the leaderElector; the OnStartedLeading callback below is the logic run
//once this instance wins the election: it starts watching the kubevirt-config
//ConfigMap and launches the KubeVirtController.
	leaderElector, err := leaderelection.NewLeaderElector(
		leaderelection.LeaderElectionConfig{
			Lock:          rl,
			LeaseDuration: app.LeaderElection.LeaseDuration.Duration,
			RenewDeadline: app.LeaderElection.RenewDeadline.Duration,
			RetryPeriod:   app.LeaderElection.RetryPeriod.Duration,
			Callbacks: leaderelection.LeaderCallbacks{
				OnStartedLeading: func(ctx context.Context) {
					leaderGauge.Set(1)
					log.Log.Infof("Started leading")

					log.Log.V(5).Info("start monitoring the kubevirt-config configMap")
					app.kubeVirtController.checkIfConfigMapStillExists(log.Log, stop)

					// run app
					go app.kubeVirtController.Run(controllerThreads, stop)
				},
				OnStoppedLeading: func() {
					leaderGauge.Set(0)
					log.Log.V(5).Info("stop monitoring the kubevirt-config configMap")
					golog.Fatal("leaderelection lost")
				},
			},
		})
	if err != nil {
		golog.Fatal(err)
	}

	readyGauge.Set(1)
	log.Log.Infof("Attempting to acquire leader status")
	//Start the election loop
	leaderElector.Run(ctx)
	panic("unreachable")

}
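
Run flips leaderGauge and readyGauge and serves /metrics through promhttp. Below is a minimal sketch of that wiring with the Prometheus client library; the gauge name, help text, and listen address are assumptions (and the sketch uses plain HTTP, whereas the operator serves metrics over TLS).

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// A gauge in the spirit of leaderGauge above; the name and help text are made up here.
var leaderGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "example_virt_operator_leading",
	Help: "1 while this operator instance holds the leader lease, 0 otherwise",
})

func main() {
	prometheus.MustRegister(leaderGauge)
	leaderGauge.Set(0) // flips to 1 in OnStartedLeading, back to 0 in OnStoppedLeading

	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler()) // same handler wiring as in app.Run
	if err := http.ListenAndServe(":8443", mux); err != nil {
		panic(err)
	}
}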

LeaderElector.Run

In LeaderElector.Run, acquire keeps retrying until it obtains the leader lock.

// Run starts the leader election loop. Run will not return
// before leader election loop is stopped by ctx or it has
// stopped holding the leader lease
//acquire keeps retrying for the leader lock; under the hood it uses wait.JitterUntil
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		le.config.Callbacks.OnStoppedLeading()
	}()

	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go le.config.Callbacks.OnStartedLeading(ctx)
	le.renew(ctx)
}
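
Since the comment above points out that acquire is built on wait.JitterUntil, here is a tiny standalone sketch of that primitive. The 2-second period and 1.2 jitter factor are arbitrary; in the real leader-election code they come from the configured RetryPeriod.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Second)
		close(stop) // closing the channel ends the retry loop, like ctx.Done() in acquire
	}()

	// Call the function every ~2s (with jitter) until stop is closed; acquire retries
	// its lock attempt with the same primitive.
	wait.JitterUntil(func() {
		fmt.Println("trying to acquire the leader lock")
	}, 2*time.Second, 1.2, true, stop)
}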

KubeVirtController.Run

Run first waits for the relevant informer caches to sync, then its workers repeatedly Execute KubeVirt custom resource objects taken from the work queue.


func (c *KubeVirtController) Run(threadiness int, stopCh <-chan struct{}) {
	defer controller.HandlePanic()
	defer c.queue.ShutDown()
	log.Log.Info("Starting KubeVirt controller.")

	// Wait for cache sync before we start the controller
	//Wait for the relevant informer caches to sync
	cache.WaitForCacheSync(stopCh, c.kubeVirtInformer.HasSynced)
...
	cache.WaitForCacheSync(stopCh, c.informers.Secrets.HasSynced)
	cache.WaitForCacheSync(stopCh, c.informers.ConfigMap.HasSynced)

	// Start the actual work
	//Start threadiness workers that repeatedly call c.runWorker
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	<-stopCh
	log.Log.Info("Stopping KubeVirt controller.")
}

func (c *KubeVirtController) runWorker() {
	for c.Execute() {
	}
}
//Pop a key (for a KubeVirt custom resource) from the queue and hand it to c.execute
func (c *KubeVirtController) Execute() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)
	err := c.execute(key.(string))

	if err != nil {
		log.Log.Reason(err).Errorf("reenqueuing KubeVirt %v", key)
		c.queue.AddRateLimited(key)
	} else {
		log.Log.V(4).Infof("processed KubeVirt %v", key)
		c.queue.Forget(key)
	}
	return true
}
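
The Execute/runWorker pair above is the standard client-go workqueue consumer loop. A compact, self-contained sketch of that loop follows; processKey is a hypothetical stand-in for c.execute, and the key value is just an example.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// processKey stands in for c.execute; it reconciles whatever the key refers to.
func processKey(key string) error {
	fmt.Println("reconciling", key)
	return nil
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	queue.Add("kubevirt/kubevirt") // informer event handlers enqueue namespace/name keys
	queue.ShutDown()               // a real controller shuts down via the stop channel instead

	for {
		item, quit := queue.Get()
		if quit {
			return
		}
		key := item.(string)
		if err := processKey(key); err != nil {
			queue.AddRateLimited(key) // back off and retry later, as Execute does on error
		} else {
			queue.Forget(key) // clear the rate-limiter history on success
		}
		queue.Done(item)
	}
}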

c.execute

execute decides, based on the state of the object, whether a create, update, or delete is needed.

func (c *KubeVirtController) execute(key string) error {
//Look up the KubeVirt object in the cache
   // Fetch the latest KubeVirt from cache
   obj, exists, err := c.kubeVirtInformer.GetStore().GetByKey(key)

   if err != nil {
      return err
   }

   if !exists {
      // when the resource is gone, deletion was handled already
      log.Log.Infof("KubeVirt resource not found")
      c.kubeVirtExpectations.DeleteExpectations(key)
      return nil
   }

   kv := obj.(*v1.KubeVirt)
   logger := log.Log.Object(kv)

   // this must be first step in execution. Writing the object
   // when api version changes ensures our api stored version is updated.
   if !controller.ObservedLatestApiVersionAnnotation(kv) {
      kv := kv.DeepCopy()
      controller.SetLatestApiVersionAnnotation(kv)
      _, err = c.clientset.KubeVirt(kv.ObjectMeta.Namespace).Update(kv)
      if err != nil {
         logger.Reason(err).Errorf("Could not update the KubeVirt resource.")
      }

      return err
   }

   // If we can't extract the key we can't do anything
   _, err = controller.KeyFunc(kv)
   if err != nil {
      log.Log.Reason(err).Errorf("Could not extract the key from the custom resource, will do nothing and not requeue.")
      return nil
   }

   logger.Info("Handling KubeVirt resource")

   // only process the kubevirt deployment if all expectations are satisfied.
   needsSync := c.kubeVirtExpectations.SatisfiedExpectations(key)
   if !needsSync {
      logger.Info("Waiting for expectations to be fulfilled")
      return nil
   }

   // Adds of all types are not done in one go. We need to set an expectation of 0 so that we can add something
   c.kubeVirtExpectations.ResetExpectations(key)

   var syncError error
   kvCopy := kv.DeepCopy()
//Decide between deletion and installation/update; the code that follows updates the KubeVirt object itself
   if kv.DeletionTimestamp != nil {
      syncError = c.syncDeletion(kvCopy)
   } else {
      syncError = c.syncInstallation(kvCopy)
   }

   // set timestamps on conditions if they changed
   operatorutil.SetConditionTimestamps(kv, kvCopy)

   // If we detect a change on KubeVirt we update it
   if !equality.Semantic.DeepEqual(kv.Status, kvCopy.Status) {
      if err := c.statusUpdater.UpdateStatus(kvCopy); err != nil {
         logger.Reason(err).Errorf("Could not update the KubeVirt resource status.")
         return err
      }
   }

   // If we detect a change on KubeVirt finalizers we update them
   // Note: we don't own the metadata section so we need to use Patch() and not Update()
   if !equality.Semantic.DeepEqual(kv.Finalizers, kvCopy.Finalizers) {
      finalizersJson, err := json.Marshal(kvCopy.Finalizers)
      if err != nil {
         return err
      }
      patch := fmt.Sprintf(`[{"op": "replace", "path": "/metadata/finalizers", "value": %s}]`, string(finalizersJson))
      _, err = c.clientset.KubeVirt(kvCopy.ObjectMeta.Namespace).Patch(kvCopy.Name, types.JSONPatchType, []byte(patch), &metav1.PatchOptions{})
      if err != nil {
         logger.Reason(err).Errorf("Could not patch the KubeVirt finalizers.")
         return err
      }
   }

   return syncError
}
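
execute patches finalizers with a JSON patch rather than updating the whole object, because the operator does not own the metadata section. Below is a minimal sketch of the same patch technique applied to a ConfigMap, using a recent client-go Patch signature (which takes a context, unlike the KubeVirt client call above); the namespace, name, and finalizer value are made-up examples.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Replace only /metadata/finalizers, leaving the rest of the object untouched,
	// which mirrors how execute() patches the KubeVirt CR's finalizers above.
	patch := []byte(`[{"op": "replace", "path": "/metadata/finalizers", "value": ["example.com/protect"]}]`)
	cm, err := clientset.CoreV1().ConfigMaps("default").Patch(
		context.TODO(), "example-configmap", types.JSONPatchType, patch, metav1.PatchOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("finalizers after patch:", cm.Finalizers)
}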

install/update

Install or update according to the install strategy.

func (c *KubeVirtController) syncInstallation(kv *v1.KubeVirt) error {
   var targetStrategy *install.Strategy
   var targetPending bool
   var err error

   if err := c.checkForActiveInstall(kv); err != nil {
      log.DefaultLogger().Reason(err).Error("Will ignore the install request until the situation is resolved.")
      util.UpdateConditionsFailedExists(kv)
      return nil
   }

   logger := log.Log.Object(kv)
   logger.Infof("Handling deployment")

   config := operatorutil.GetTargetConfigFromKV(kv)

   // Record current operator version to status section
   util.SetOperatorVersion(kv)

   // Record the version we're targeting to install
   config.SetTargetDeploymentConfig(kv)

   if kv.Status.Phase == "" {
      kv.Status.Phase = v1.KubeVirtPhaseDeploying
   }
//Set the relevant conditions depending on whether this is an install or an update
   if isUpdating(kv) {
      util.UpdateConditionsUpdating(kv)
   } else {
      util.UpdateConditionsDeploying(kv)
   }
//Load the install strategy; note this is where the dump-install-strategy job mentioned earlier comes in
   targetStrategy, targetPending, err = c.loadInstallStrategy(kv)
   if err != nil {
      return err
   }

   // we're waiting on a job to finish and the config map to be created
   if targetPending {
      return nil
   }

   // add finalizer to prevent deletion of CR before KubeVirt was undeployed
   util.AddFinalizer(kv)

   // once all the install strategies are loaded, garbage collect any
   // install strategy jobs that were created.
   err = c.garbageCollectInstallStrategyJobs()
   if err != nil {
      return err
   }
//Create the reconciler
   reconciler, err := apply.NewReconciler(kv, targetStrategy, c.stores, c.clientset, c.aggregatorClient, &c.kubeVirtExpectations, c.recorder)
   if err != nil {
      // deployment failed
      util.UpdateConditionsFailedError(kv, err)
      logger.Errorf("Failed to create reconciler: %v", err)
      return err
   }
// Drive the cluster toward the desired KubeVirt state; the install/update flow for all components lives here
   synced, err := reconciler.Sync(c.queue)

   if err != nil {
      // deployment failed
      util.UpdateConditionsFailedError(kv, err)
      logger.Errorf("Failed to create all resources: %v", err)
      return err
   }

   // the entire sync can't always occur within a single control loop execution.
   // when synced==true that means SyncAll() has completed and has nothing left to wait on.
   if synced {
      // record the version that has been completely installed
      config.SetObservedDeploymentConfig(kv)

      // update conditions
      util.UpdateConditionsCreated(kv)
      logger.Info("All KubeVirt resources created")

      // check if components are ready
      if c.isReady(kv) {
         logger.Info("All KubeVirt components ready")
         kv.Status.Phase = v1.KubeVirtPhaseDeployed
         util.UpdateConditionsAvailable(kv)
         kv.Status.ObservedGeneration = &kv.ObjectMeta.Generation
         return nil
      }
   }

   logger.Info("Processed deployment for this round")
   return nil
}
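
loadInstallStrategy above either reads the install-strategy ConfigMap or waits while a job, started from the operator image with the dump-install-strategy flag mentioned at the top, produces it. A rough sketch of what creating such a job with client-go could look like; the image, names, namespace, and command here are placeholder assumptions, not KubeVirt's actual generated spec.

package main

import (
	"context"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// A job that reruns the operator image with --dump-install-strategy so it writes
	// the install-strategy ConfigMap and exits (all names below are placeholders).
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "example-install-strategy-dump"},
		Spec: batchv1.JobSpec{
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:    "install-strategy-dump",
						Image:   "example.registry/virt-operator:placeholder",
						Command: []string{"virt-operator", "--dump-install-strategy"},
					}},
				},
			},
		},
	}
	if _, err := clientset.BatchV1().Jobs("kubevirt").Create(context.TODO(), job, metav1.CreateOptions{}); err != nil {
		panic(err)
	}
}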