1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
// Run is the app.Run entry point referred to earlier in this walkthrough.
//
// It launches two HTTPS listeners in background goroutines (one for metrics
// and profiler endpoints, one for the validating webhooks on port 8444),
// starts the registered informers and the operator certificate manager, and
// then blocks in leader election. The KubeVirt controller is only started
// from the OnStartedLeading callback, i.e. once this instance holds the lock.
//
// Run is not expected to return: setup failures terminate the process via
// golog.Fatal/Fatalf, losing leadership calls golog.Fatal, and a return from
// the leader elector falls through to a panic.
func (app *VirtOperatorApp) Run() {
	promTLSConfig := webhooks.SetupPromTLS(app.operatorCertManager)

	// Metrics and profiler endpoints, served over TLS in the background.
	go func() {
		metricsMux := http.NewServeMux()
		metricsMux.Handle("/metrics", promhttp.Handler())

		prof := profiler.NewProfileManager(app.clusterConfig)

		ws := new(restful.WebService)
		ws.Path("/").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON)
		ws.Route(ws.GET("/start-profiler").To(prof.HandleStartProfiler).Doc("start profiler endpoint"))
		ws.Route(ws.GET("/stop-profiler").To(prof.HandleStopProfiler).Doc("stop profiler endpoint"))
		ws.Route(ws.GET("/dump-profiler").To(prof.HandleDumpProfiler).Doc("dump profiler results endpoint"))

		// The container shares metricsMux, which is also the server's handler;
		// presumably this is what exposes the profiler routes next to /metrics.
		container := restful.NewContainer()
		container.ServeMux = metricsMux
		container.Add(ws)

		metricsServer := &http.Server{
			Addr:      app.ServiceListen.Address(),
			Handler:   metricsMux,
			TLSConfig: promTLSConfig,
		}
		// Empty cert/key paths: the certificates have to come from TLSConfig.
		if err := metricsServer.ListenAndServeTLS("", ""); err != nil {
			golog.Fatal(err)
		}
	}()

	// stop is closed when ctx is cancelled, which only happens when Run returns.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stop := ctx.Done()

	lockName := VirtOperator
	recorder := app.getNewRecorder(k8sv1.NamespaceAll, lockName)

	hostname, err := os.Hostname()
	if err != nil {
		golog.Fatalf("unable to get hostname: %v", err)
	}

	// Build the resource lock used for leader election; the hostname is this
	// candidate's identity.
	lockConfig := resourcelock.ResourceLockConfig{
		Identity:      hostname,
		EventRecorder: recorder,
	}
	lock, err := resourcelock.New(
		app.LeaderElection.ResourceLock,
		app.operatorNamespace,
		lockName,
		app.clientSet.CoreV1(),
		app.clientSet.CoordinationV1(),
		lockConfig,
	)
	if err != nil {
		golog.Fatal(err)
	}

	// Start every informer registered with the informer factory, then wait for
	// the API auth config map informer to sync. The wait's result is discarded.
	authConfigInformer := app.informerFactory.ApiAuthConfigMap()
	app.informerFactory.Start(stop)
	cache.WaitForCacheSync(stop, authConfigInformer.HasSynced)

	go app.operatorCertManager.Start()

	caManager := webhooks.NewKubernetesClientCAManager(authConfigInformer.GetStore())
	webhookTLSConfig := webhooks.SetupTLSWithCertManager(caManager, app.operatorCertManager, tls.VerifyClientCertIfGiven)

	// Webhook server: validates deletion and update of the KubeVirt resource.
	// A new admitter is constructed for every incoming request.
	webhookMux := http.NewServeMux()
	webhookMux.HandleFunc("/kubevirt-validate-delete", func(w http.ResponseWriter, r *http.Request) {
		validating_webhooks.Serve(w, r, operator_webhooks.NewKubeVirtDeletionAdmitter(app.clientSet))
	})
	webhookMux.HandleFunc(components.KubeVirtUpdateValidatePath, func(w http.ResponseWriter, r *http.Request) {
		validating_webhooks.Serve(w, r, operator_webhooks.NewKubeVirtUpdateAdmitter(app.clientSet, app.clusterConfig))
	})

	webhookServer := &http.Server{
		Addr:      fmt.Sprintf("%s:%d", app.BindAddress, 8444),
		Handler:   webhookMux,
		TLSConfig: webhookTLSConfig,
	}
	go func() {
		// Unlike the metrics server, a listener failure here panics.
		if err := webhookServer.ListenAndServeTLS("", ""); err != nil {
			panic(err)
		}
	}()

	// Leader-election callbacks. OnStartedLeading holds the logic that runs
	// once this instance has acquired leadership.
	callbacks := leaderelection.LeaderCallbacks{
		// The context handed to the callback is not used; the outer stop
		// channel is passed to the controller instead.
		OnStartedLeading: func(_ context.Context) {
			leaderGauge.Set(1)
			log.Log.Infof("Started leading")

			log.Log.V(5).Info("start monitoring the kubevirt-config configMap")
			app.kubeVirtController.checkIfConfigMapStillExists(log.Log, stop)

			// run app
			go app.kubeVirtController.Run(controllerThreads, stop)
		},
		// Losing leadership is fatal for the process.
		OnStoppedLeading: func() {
			leaderGauge.Set(0)
			log.Log.V(5).Info("stop monitoring the kubevirt-config configMap")
			golog.Fatal("leaderelection lost")
		},
	}

	leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: app.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: app.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   app.LeaderElection.RetryPeriod.Duration,
		Callbacks:     callbacks,
	})
	if err != nil {
		golog.Fatal(err)
	}

	readyGauge.Set(1)
	log.Log.Infof("Attempting to acquire leader status")

	// Start the election; the code treats a return from this call as impossible.
	leaderElector.Run(ctx)
	panic("unreachable")
}
|