In Kubernetes, evicting a launcher pod can be turned into a live migration of the virtual machine. In short, KubeVirt hooks into the eviction through the Kubernetes webhook mechanism: when an eviction is issued against a launcher pod, the webhook sets the relevant field on the VMI, which in turn triggers a live migration. This section walks through what KubeVirt does once an evict operation is issued for a launcher pod.
DisruptionBudget for launcher pod
To keep launcher pods from being disrupted by evictions, KubeVirt creates a DisruptionBudget with MinAvailable: 1 for every launcher pod whose VMI has an eviction/migration strategy configured. The logic lives in pkg/virt-controller/watch/drain/disruptionbudget/disruptionbudget.go. Because of this PodDisruptionBudget, the launcher pod cannot simply be evicted through the Eviction API.
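For illustration, the object this controller creates looks roughly like the one below. This is a minimal sketch built on standard client-go types, not the actual KubeVirt code; in particular, the kubevirt.io/created-by selector label and the omitted owner references are assumptions about how the launcher pod is matched.
// Minimal sketch of the PodDisruptionBudget created for a VMI with an
// eviction strategy. Illustrative only.
import (
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func pdbForVMI(namespace, vmiUID string) *policyv1.PodDisruptionBudget {
	one := intstr.FromInt(1)
	return &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "kubevirt-disruption-budget-",
			Namespace:    namespace,
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			// MinAvailable: 1 for a single launcher pod means the Eviction API
			// will always answer 429 for this pod while the PDB exists.
			MinAvailable: &one,
			Selector: &metav1.LabelSelector{
				// assumption: launcher pods are labeled with the UID of their VMI
				MatchLabels: map[string]string{"kubevirt.io/created-by": vmiUID},
			},
		},
	}
}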
ValidatingWebhookConfiguration
Deploying the ValidatingWebhookConfiguration
During the KubeVirt deployment phase, virt-operator creates the virt-api-validator ValidatingWebhookConfiguration:
pkg/virt-operator/resource/generate/components/webhooks.go
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  ...
  name: virt-api-validator
  ...
webhooks:
- admissionReviewVersions:
  - v1
  - v1beta1
  clientConfig:
    caBundle: ...
    service:
      name: virt-api
      namespace: kubevirt
      path: /launcher-eviction-validate
      port: 443
  failurePolicy: Ignore
  matchPolicy: Equivalent
  name: virt-launcher-eviction-interceptor.kubevirt.io
  namespaceSelector: {}
  objectSelector: {}
  rules:
  - apiGroups:
    - ""
    apiVersions:
    - v1
    operations:
    - '*'
    resources:
    - pods/eviction
    scope: '*'
  sideEffects: NoneOnDryRun
  timeoutSeconds: 10
virt-api webhook implementation
The webhook is implemented on the virt-api side:
//pkg/virt-api/webhooks/validating-webhook/validating-webhook.go
func ServePodEvictionInterceptor(resp http.ResponseWriter, req *http.Request, clusterConfig *virtconfig.ClusterConfig, virtCli kubecli.KubevirtClient) {
validating_webhooks.Serve(resp, req, admitters.NewPodEvictionAdmitter(clusterConfig, virtCli, virtCli.GeneratedKubeVirtClient()))
}
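The path /launcher-eviction-validate declared in the ValidatingWebhookConfiguration above is what routes eviction AdmissionReviews to this handler. The following is only a sketch of that wiring with a plain net/http mux; virt-api's real server setup is more involved, and the TLS file paths here are placeholders.
// Illustrative wiring of the webhook path to the handler; not virt-api's
// actual server setup.
func serveEvictionWebhook(clusterConfig *virtconfig.ClusterConfig, virtClient kubecli.KubevirtClient) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/launcher-eviction-validate", func(w http.ResponseWriter, r *http.Request) {
		validating_webhook.ServePodEvictionInterceptor(w, r, clusterConfig, virtClient)
	})
	// admission webhooks must be reachable over HTTPS (port 443 in the config above)
	return http.ListenAndServeTLS(":8443", "/etc/virt-api/tls.crt", "/etc/virt-api/tls.key", mux)
}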
//pkg/util/webhooks/validating-webhooks/validating-webhook.go
//webhook server implementation
func Serve(resp http.ResponseWriter, req *http.Request, admitter Admitter) {
review, err := webhooks.GetAdmissionReview(req)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
return
}
response := admissionv1.AdmissionReview{
TypeMeta: v1.TypeMeta{
// match the request version to be
// backwards compatible with v1beta1
APIVersion: review.APIVersion,
Kind: "AdmissionReview",
},
}
//the admitter implements the actual webhook logic
reviewResponse := admitter.Admit(review)
if reviewResponse != nil {
response.Response = reviewResponse
response.Response.UID = review.Request.UID
}
// reset the Object and OldObject, they are not needed in admitter response.
review.Request.Object = runtime.RawExtension{}
review.Request.OldObject = runtime.RawExtension{}
responseBytes, err := json.Marshal(response)
if err != nil {
log.Log.Reason(err).Errorf("failed json encode webhook response")
resp.WriteHeader(http.StatusBadRequest)
return
}
if _, err := resp.Write(responseBytes); err != nil {
log.Log.Reason(err).Errorf("failed to write webhook response")
resp.WriteHeader(http.StatusBadRequest)
return
}
}
PodEvictionAdmitter implements the actual webhook logic: if the evicted pod is a launcher pod, it looks up the EvictionStrategy of the corresponding VMI (the strategy can also be configured in the cluster config). When an EvictionStrategy is set, markVMI patches /status/evacuationNodeName on the VMI, setting it to vmi.Status.NodeName.
//pkg/virt-api/webhooks/validating-webhook/admitters/pod-eviction-admitter.go
func (admitter *PodEvictionAdmitter) Admit(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
pod, err := admitter.kubeClient.CoreV1().Pods(ar.Request.Namespace).Get(context.Background(), ar.Request.Name, metav1.GetOptions{})
if err != nil {
return validating_webhooks.NewPassingAdmissionResponse()
}
//only act on launcher pods
if !isVirtLauncher(pod) {
return validating_webhooks.NewPassingAdmissionResponse()
}
vmiName, exists := pod.GetAnnotations()[virtv1.DomainAnnotation]
if !exists {
return validating_webhooks.NewPassingAdmissionResponse()
}
vmi, err := admitter.virtClient.KubevirtV1().VirtualMachineInstances(ar.Request.Namespace).Get(context.Background(), vmiName, metav1.GetOptions{})
if err != nil {
return denied(fmt.Sprintf("kubevirt failed getting the vmi: %s", err.Error()))
}
evictionStrategy := migrations.VMIEvictionStrategy(admitter.clusterConfig, vmi)
if evictionStrategy == nil {
// we don't act on VMIs without an eviction strategy
return validating_webhooks.NewPassingAdmissionResponse()
}
markForEviction := false
switch *evictionStrategy {
case virtv1.EvictionStrategyLiveMigrate:
if !vmi.IsMigratable() {
return denied(fmt.Sprintf("VMI %s is configured with an eviction strategy but is not live-migratable", vmi.Name))
}
markForEviction = true
case virtv1.EvictionStrategyLiveMigrateIfPossible:
if vmi.IsMigratable() {
markForEviction = true
}
case virtv1.EvictionStrategyExternal:
markForEviction = true
}
if markForEviction && !vmi.IsMarkedForEviction() && vmi.Status.NodeName == pod.Spec.NodeName {
dryRun := ar.Request.DryRun != nil && *ar.Request.DryRun == true
err := admitter.markVMI(vmi.Namespace, vmi.Name, vmi.Status.NodeName, dryRun)
if err != nil {
// As with the previous case, it is up to the user to issue a retry.
return denied(fmt.Sprintf("kubevirt failed marking the vmi for eviction: %s", err.Error()))
}
return denied(fmt.Sprintf("Eviction triggered evacuation of VMI \"%s/%s\"", vmi.Namespace, vmi.Name))
}
// We can let the request go through because the pod is protected by a PDB if the VMI wants to be live-migrated on
// eviction. Otherwise, we can just evict it.
return validating_webhooks.NewPassingAdmissionResponse()
}
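markVMI itself is not shown above. Conceptually it applies a JSON patch to the VMI, writing the source node name into /status/evacuationNodeName. The sketch below uses the generated clientset the admitter already holds and leaves out the dry-run handling, so it is not the literal KubeVirt implementation.
// Rough sketch of marking a VMI for eviction: set /status/evacuationNodeName
// to the node the VMI currently runs on. The EvacuationController reacts to
// this field. Dry-run support and error wrapping are omitted.
func (admitter *PodEvictionAdmitter) markVMISketch(namespace, name, nodeName string) error {
	patch := fmt.Sprintf(`[{"op": "add", "path": "/status/evacuationNodeName", "value": %q}]`, nodeName)
	_, err := admitter.virtClient.KubevirtV1().VirtualMachineInstances(namespace).Patch(
		context.Background(), name, types.JSONPatchType, []byte(patch), metav1.PatchOptions{})
	return err
}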
Note the detail that when the pod eviction requires KubeVirt to perform a migration, denied returns status code 429 (Too Many Requests):
func denied(message string) *admissionv1.AdmissionResponse {
return &admissionv1.AdmissionResponse{
Allowed: false,
Result: &metav1.Status{
Message: message,
Code: http.StatusTooManyRequests,
},
}
}
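For whoever issued the eviction (kubectl drain, the descheduler, a node-maintenance controller), this 429 looks exactly like an eviction blocked by a PodDisruptionBudget: try again later. A minimal client-side sketch using client-go (names are placeholders):
// Sketch of an eviction caller observing the webhook's 429 denial.
func tryEvict(ctx context.Context, client kubernetes.Interface, namespace, podName string) error {
	eviction := &policyv1.Eviction{
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: podName},
	}
	err := client.CoreV1().Pods(namespace).EvictV1(ctx, eviction)
	if apierrors.IsTooManyRequests(err) {
		// a PDB or the KubeVirt webhook refused the eviction for now;
		// the caller is expected to back off and retry
		return fmt.Errorf("eviction of %s/%s deferred: %w", namespace, podName, err)
	}
	return err
}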
EvacuationController
The EvacuationController is part of virt-controller and holds a vmiInformer, migrationInformer, nodeInformer and vmiPodInformer; whenever one of these resources changes, the associated node is added to the work queue.
The actual processing happens in sync:
It first determines which VMIs on the node need to be migrated (based on the node drain taint, vmi.status.evacuationNodeName, and so on) and then, limited by the number of migrations already in flight, creates VirtualMachineInstanceMigration CRs for the VMs that have to move. Each of these migrations carries the kubevirt.io/evacuationMigration annotation, which the informer's addMigration handler uses to enqueue the node that a VirtualMachineInstanceMigration with this annotation belongs to.
//pkg/virt-controller/watch/drain/evacuation/evacuation.go
func (c *EvacuationController) sync(node *k8sv1.Node, vmisOnNode []*virtv1.VirtualMachineInstance, activeMigrations []*virtv1.VirtualMachineInstanceMigration) error {
// If the node has no drain taint, we have nothing to do
taintKey := *c.clusterConfig.GetMigrationConfiguration().NodeDrainTaintKey
taint := &k8sv1.Taint{
Key: taintKey,
Effect: k8sv1.TaintEffectNoSchedule,
}
vmisToMigrate := vmisToMigrate(node, vmisOnNode, taint)
if len(vmisToMigrate) == 0 {
return nil
}
migrationCandidates, nonMigrateable := c.filterRunningNonMigratingVMIs(vmisToMigrate, activeMigrations)
if len(migrationCandidates) == 0 && len(nonMigrateable) == 0 {
return nil
}
runningMigrations := migrationutils.FilterRunningMigrations(activeMigrations)
activeMigrationsFromThisSourceNode := c.numOfVMIMForThisSourceNode(vmisOnNode, runningMigrations)
maxParallelMigrationsPerOutboundNode :=
int(*c.clusterConfig.GetMigrationConfiguration().ParallelOutboundMigrationsPerNode)
maxParallelMigrations := int(*c.clusterConfig.GetMigrationConfiguration().ParallelMigrationsPerCluster)
freeSpotsPerCluster := maxParallelMigrations - len(runningMigrations)
freeSpotsPerThisSourceNode := maxParallelMigrationsPerOutboundNode - activeMigrationsFromThisSourceNode
freeSpots := int(math.Min(float64(freeSpotsPerCluster), float64(freeSpotsPerThisSourceNode)))
if freeSpots <= 0 {
c.Queue.AddAfter(node.Name, 5*time.Second)
return nil
}
diff := int(math.Min(float64(freeSpots), float64(len(migrationCandidates))))
remaining := freeSpots - diff
remainingForNonMigrateableDiff := int(math.Min(float64(remaining), float64(len(nonMigrateable))))
if remainingForNonMigrateableDiff > 0 {
// for all non-migrating VMIs which would get a spot emit a warning
for _, vmi := range nonMigrateable[0:remainingForNonMigrateableDiff] {
c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, FailedCreateVirtualMachineInstanceMigrationReason, "VirtualMachineInstance is not migrateable")
}
}
if diff == 0 {
if remainingForNonMigrateableDiff > 0 {
// Let's ensure that some warnings will stay in the event log and periodically update
// In theory the warnings could disappear after one hour if nothing else updates
c.Queue.AddAfter(node.Name, 1*time.Minute)
}
// nothing to do
return nil
}
// TODO: should the order be randomized?
selectedCandidates := migrationCandidates[0:diff]
log.DefaultLogger().Infof("node: %v, migrations: %v, candidates: %v, selected: %v", node.Name, len(activeMigrations), len(migrationCandidates), len(selectedCandidates))
wg := &sync.WaitGroup{}
wg.Add(diff)
errChan := make(chan error, diff)
c.migrationExpectations.ExpectCreations(node.Name, diff)
for _, vmi := range selectedCandidates {
go func(vmi *virtv1.VirtualMachineInstance) {
defer wg.Done()
createdMigration, err := c.clientset.VirtualMachineInstanceMigration(vmi.Namespace).Create(context.Background(), GenerateNewMigration(vmi.Name, node.Name), v1.CreateOptions{})
if err != nil {
c.migrationExpectations.CreationObserved(node.Name)
c.recorder.Eventf(vmi, k8sv1.EventTypeWarning, FailedCreateVirtualMachineInstanceMigrationReason, "Error creating a Migration: %v", err)
errChan <- err
return
} else {
c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, SuccessfulCreateVirtualMachineInstanceMigrationReason, "Created Migration %s", createdMigration.Name)
}
}(vmi)
}
wg.Wait()
select {
case err := <-errChan:
return err
default:
}
return nil
}
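GenerateNewMigration, called in the loop above, essentially builds a VirtualMachineInstanceMigration for the VMI and tags it with the evacuation annotation. The sketch below shows its shape; it is close to, but not guaranteed to be identical with, the actual helper in evacuation.go.
// Sketch of the migration object created per evacuated VMI. The
// kubevirt.io/evacuationMigration annotation (value: the drained node's name)
// is what addMigration uses to enqueue the node again.
func generateEvacuationMigration(vmiName, nodeName string) *virtv1.VirtualMachineInstanceMigration {
	return &virtv1.VirtualMachineInstanceMigration{
		ObjectMeta: v1.ObjectMeta{
			GenerateName: "kubevirt-evacuation-",
			Annotations: map[string]string{
				"kubevirt.io/evacuationMigration": nodeName,
			},
		},
		Spec: virtv1.VirtualMachineInstanceMigrationSpec{
			VMIName: vmiName,
		},
	}
}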
descheduler in kubevirt
KubeVirt has partial support for the descheduler, mainly as follows.
As shown above:
- when a pod eviction requires KubeVirt to perform a migration, the webhook returns 429;
- the VirtualMachineInstanceMigration objects created by the EvacuationController carry the kubevirt.io/evacuationMigration annotation.
In addition:
- in the migration controller, if a new VirtualMachineInstanceMigration carries the kubevirt.io/evacuationMigration annotation, the source pod gets the descheduler.alpha.kubernetes.io/eviction-in-progress annotation; if the migration fails, this annotation is removed from the source pod again (see the sketch after this list);
- when the vmi controller creates a launcher pod, it adds the descheduler.alpha.kubernetes.io/request-evict-only annotation.
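A sketch of how such an annotation can be added to and removed from the source launcher pod with a strategic-merge patch; this is illustrative, not the migration controller's actual code, and the annotation value is an assumption (only the key matters to the descheduler).
// Add descheduler.alpha.kubernetes.io/eviction-in-progress while an evacuation
// migration is running; a null value in a strategic-merge patch removes it
// again when the migration fails.
const evictionInProgressAnnotation = "descheduler.alpha.kubernetes.io/eviction-in-progress"

func setEvictionInProgress(ctx context.Context, client kubernetes.Interface, namespace, podName string, inProgress bool) error {
	value := `"kubevirt"` // assumed value; the key is what the descheduler checks
	if !inProgress {
		value = "null" // strategic-merge patch with null deletes the annotation
	}
	patch := []byte(`{"metadata":{"annotations":{"` + evictionInProgressAnnotation + `":` + value + `}}}`)
	_, err := client.CoreV1().Pods(namespace).Patch(
		ctx, podName, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	return err
}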
Because the KubeVirt webhook denies the eviction requests it converts into migrations, the descheduler counts them as failed evictions and moves on to evict other pods, which can leave the cluster in an abnormal state. KubeVirt therefore wants the descheduler to add logic for this case:
- descheduler.alpha.kubernetes.io/request-evict-only means the descheduler should handle a denied eviction request for such a pod specially instead of treating it as a plain failure;
- descheduler.alpha.kubernetes.io/eviction-in-progress means a migration is already in progress, so the descheduler should not count the failed eviction request against its eviction-control counters again.
The descheduler has not implemented this logic; as of now, the descheduler community is still discussing whether such an implementation is feasible.
References:
https://github.com/kubevirt/kubevirt/blob/main/docs/handling-eviction-requests.md
https://github.com/kubevirt/community/pull/258
https://github.com/kubevirt/community/blob/main/design-proposals/descheduler-support.md
https://github.com/kubernetes-sigs/descheduler/pull/1354
https://github.com/kubernetes-sigs/descheduler/blob/master/keps/753-descheduling-framework/README.md