The relationship between launcher pod eviction and KubeVirt VM migration

In Kubernetes, evicting a launcher pod can be turned into a live migration of the VM. In short, KubeVirt uses the Kubernetes webhook mechanism: when a pod eviction is initiated, the webhook adds the relevant field to the VMI, which triggers the live migration. This article walks through what KubeVirt does after an evict operation is issued against a launcher pod.
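
For orientation, here is a minimal client-side sketch (not KubeVirt code; the pod name and namespace are made up) of what such an eviction looks like with client-go: if the webhook described below decides to live-migrate the VM, the eviction call comes back with HTTP 429.

import (
   "context"
   "fmt"

   policyv1 "k8s.io/api/policy/v1"
   apierrors "k8s.io/apimachinery/pkg/api/errors"
   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
   "k8s.io/client-go/kubernetes"
)

// evictLauncherPod issues an eviction for a (hypothetical) virt-launcher pod.
func evictLauncherPod(ctx context.Context, client kubernetes.Interface) error {
   err := client.CoreV1().Pods("default").EvictV1(ctx, &policyv1.Eviction{
      ObjectMeta: metav1.ObjectMeta{Name: "virt-launcher-myvm-abcde", Namespace: "default"},
   })
   if apierrors.IsTooManyRequests(err) {
      // The virt-api webhook denied the eviction with 429 and marked the VMI
      // for evacuation; the live migration itself happens asynchronously.
      fmt.Println("eviction denied: evacuation of the VMI was triggered")
      return nil
   }
   return err
}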

DisruptionBudget for launcher pod

To keep launcher pods from being disrupted by evictions, KubeVirt creates a PodDisruptionBudget with MinAvailable set to 1 for every launcher pod whose VMI has an eviction/migration strategy configured. The logic lives in pkg/virt-controller/watch/drain/disruptionbudget/disruptionbudget.go. Because of this budget, such launcher pods cannot simply be evicted.
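
The generated object looks roughly like the sketch below. This is illustrative only: the GenerateName and the selector matching the launcher pod by the kubevirt.io/created-by label (set to the VMI's UID) are assumptions here, see the file above for the real construction.

import (
   policyv1 "k8s.io/api/policy/v1"
   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
   "k8s.io/apimachinery/pkg/util/intstr"
)

// pdbForVMI sketches the PodDisruptionBudget protecting a single launcher pod.
func pdbForVMI(vmiNamespace, vmiUID string) *policyv1.PodDisruptionBudget {
   minAvailable := intstr.FromInt(1)
   return &policyv1.PodDisruptionBudget{
      ObjectMeta: metav1.ObjectMeta{
         GenerateName: "kubevirt-disruption-budget-",
         Namespace:    vmiNamespace,
      },
      Spec: policyv1.PodDisruptionBudgetSpec{
         // MinAvailable=1 with exactly one matching pod means the eviction API
         // can never take the launcher pod down while this PDB exists.
         MinAvailable: &minAvailable,
         Selector: &metav1.LabelSelector{
            MatchLabels: map[string]string{"kubevirt.io/created-by": vmiUID},
         },
      },
   }
}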

ValidatingWebhookConfiguration

Deploying the ValidatingWebhookConfiguration

During KubeVirt deployment, virt-operator creates the virt-api-validator ValidatingWebhookConfiguration:

pkg/virt-operator/resource/generate/components/webhooks.go

apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
...
  name: virt-api-validator
...
webhooks:
- admissionReviewVersions:
  - v1
  - v1beta1
  clientConfig:
    caBundle: 
    ...
    service:
      name: virt-api
      namespace: kubevirt
      path: /launcher-eviction-validate
      port: 443
  failurePolicy: Ignore
  matchPolicy: Equivalent
  name: virt-launcher-eviction-interceptor.kubevirt.io
  namespaceSelector: {}
  objectSelector: {}
  rules:
  - apiGroups:
    - ""
    apiVersions:
    - v1
    operations:
    - '*'
    resources:
    - pods/eviction
    scope: '*'
  sideEffects: NoneOnDryRun
  timeoutSeconds: 10

virt-api webhook implementation

The webhook endpoint is implemented on the virt-api side:

//pkg/virt-api/webhooks/validating-webhook/validating-webhook.go
func ServePodEvictionInterceptor(resp http.ResponseWriter, req *http.Request, clusterConfig *virtconfig.ClusterConfig, virtCli kubecli.KubevirtClient) {
   validating_webhooks.Serve(resp, req, admitters.NewPodEvictionAdmitter(clusterConfig, virtCli, virtCli.GeneratedKubeVirtClient()))
}
//pkg/util/webhooks/validating-webhooks/validating-webhook.go
// generic webhook serving helper
func Serve(resp http.ResponseWriter, req *http.Request, admitter Admitter) {
   review, err := webhooks.GetAdmissionReview(req)
   if err != nil {
      resp.WriteHeader(http.StatusBadRequest)
      return
   }

   response := admissionv1.AdmissionReview{
      TypeMeta: v1.TypeMeta{
         // match the request version to be
         // backwards compatible with v1beta1
         APIVersion: review.APIVersion,
         Kind:       "AdmissionReview",
      },
   }
   // the admitter implements the actual webhook logic
   reviewResponse := admitter.Admit(review)
   if reviewResponse != nil {
      response.Response = reviewResponse
      response.Response.UID = review.Request.UID
   }
   // reset the Object and OldObject, they are not needed in admitter response.
   review.Request.Object = runtime.RawExtension{}
   review.Request.OldObject = runtime.RawExtension{}

   responseBytes, err := json.Marshal(response)
   if err != nil {
      log.Log.Reason(err).Errorf("failed json encode webhook response")
      resp.WriteHeader(http.StatusBadRequest)
      return
   }
   if _, err := resp.Write(responseBytes); err != nil {
      log.Log.Reason(err).Errorf("failed to write webhook response")
      resp.WriteHeader(http.StatusBadRequest)
      return
   }
}

PodEvictionAdmitter implements the actual webhook logic: if the pod being evicted is a launcher pod, it looks up the EvictionStrategy of the corresponding VMI (a cluster-wide default can also be set in the cluster config). When an eviction strategy is present and the VMI should be marked for eviction, markVMI patches the VMI's /status/evacuationNodeName, setting it to vmi.Status.NodeName:

//pkg/virt-api/webhooks/validating-webhook/admitters/pod-eviction-admitter.go
func (admitter *PodEvictionAdmitter) Admit(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {

   pod, err := admitter.kubeClient.CoreV1().Pods(ar.Request.Namespace).Get(context.Background(), ar.Request.Name, metav1.GetOptions{})
   if err != nil {
      return validating_webhooks.NewPassingAdmissionResponse()
   }
   // only act on virt-launcher pods
   if !isVirtLauncher(pod) {
      return validating_webhooks.NewPassingAdmissionResponse()
   }

   vmiName, exists := pod.GetAnnotations()[virtv1.DomainAnnotation]
   if !exists {
      return validating_webhooks.NewPassingAdmissionResponse()
   }

   vmi, err := admitter.virtClient.KubevirtV1().VirtualMachineInstances(ar.Request.Namespace).Get(context.Background(), vmiName, metav1.GetOptions{})
   if err != nil {
      return denied(fmt.Sprintf("kubevirt failed getting the vmi: %s", err.Error()))
   }

   evictionStrategy := migrations.VMIEvictionStrategy(admitter.clusterConfig, vmi)
   if evictionStrategy == nil {
      // we don't act on VMIs without an eviction strategy
      return validating_webhooks.NewPassingAdmissionResponse()
   }

   markForEviction := false

   switch *evictionStrategy {
   case virtv1.EvictionStrategyLiveMigrate:
      if !vmi.IsMigratable() {
         return denied(fmt.Sprintf("VMI %s is configured with an eviction strategy but is not live-migratable", vmi.Name))
      }
      markForEviction = true
   case virtv1.EvictionStrategyLiveMigrateIfPossible:
      if vmi.IsMigratable() {
         markForEviction = true
      }
   case virtv1.EvictionStrategyExternal:
      markForEviction = true
   }

   if markForEviction && !vmi.IsMarkedForEviction() && vmi.Status.NodeName == pod.Spec.NodeName {
      dryRun := ar.Request.DryRun != nil && *ar.Request.DryRun == true
      err := admitter.markVMI(vmi.Namespace, vmi.Name, vmi.Status.NodeName, dryRun)
      if err != nil {
         // As with the previous case, it is up to the user to issue a retry.
         return denied(fmt.Sprintf("kubevirt failed marking the vmi for eviction: %s", err.Error()))
      }

      return denied(fmt.Sprintf("Eviction triggered evacuation of VMI \"%s/%s\"", vmi.Namespace, vmi.Name))
   }

   // We can let the request go through because the pod is protected by a PDB if the VMI wants to be live-migrated on
   // eviction. Otherwise, we can just evict it.
   return validating_webhooks.NewPassingAdmissionResponse()
}
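
markVMI itself is not shown above; essentially it records the source node in the VMI status with a JSON patch. A minimal sketch (imports omitted as in the other listings, details simplified; the dry-run flag from the eviction request is propagated to the patch):

// Sketch of markVMI: add /status/evacuationNodeName to the VMI via a JSON patch.
func (admitter *PodEvictionAdmitter) markVMI(namespace, name, nodeName string, dryRun bool) error {
   patch := fmt.Sprintf(`[{"op": "add", "path": "/status/evacuationNodeName", "value": "%s"}]`, nodeName)
   opts := metav1.PatchOptions{}
   if dryRun {
      opts.DryRun = []string{metav1.DryRunAll}
   }
   _, err := admitter.virtClient.KubevirtV1().VirtualMachineInstances(namespace).
      Patch(context.Background(), name, types.JSONPatchType, []byte(patch), opts)
   return err
}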

In detail, when the pod eviction requires KubeVirt to perform a migration, denied responds with status code 429 (Too Many Requests):

func denied(message string) *admissionv1.AdmissionResponse {
   return &admissionv1.AdmissionResponse{
      Allowed: false,
      Result: &metav1.Status{
         Message: message,
         Code:    http.StatusTooManyRequests,
      },
   }
}

EvacuationController

The EvacuationController is part of virt-controller. It uses a vmiInformer, migrationInformer, nodeInformer, and vmiPodInformer; whenever one of these resources changes, the associated node is added to the work queue.
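
The different watched resources all funnel into one node-keyed queue. The sketch below is an assumed helper, not the actual controller code (it also assumes the kubevirt.io/api/core/v1 import path for virtv1); it shows how each object type can be resolved to that node key.

import (
   k8sv1 "k8s.io/api/core/v1"

   virtv1 "kubevirt.io/api/core/v1"
)

// nodeKeyFor resolves a watched object to the node name used as the queue key.
func nodeKeyFor(obj interface{}) (string, bool) {
   switch o := obj.(type) {
   case *k8sv1.Node:
      return o.Name, true
   case *virtv1.VirtualMachineInstance:
      return o.Status.NodeName, o.Status.NodeName != ""
   case *virtv1.VirtualMachineInstanceMigration:
      // evacuation migrations carry the source node name in the
      // kubevirt.io/evacuationMigration annotation (see below)
      node, ok := o.Annotations["kubevirt.io/evacuationMigration"]
      return node, ok
   case *k8sv1.Pod:
      return o.Spec.NodeName, o.Spec.NodeName != ""
   }
   return "", false
}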

The actual processing happens in sync:

sync first works out which VMIs on the node need to be migrated (based on the node drain taint, vmi.status.evacuationNodeName, and so on), then, taking the number of migrations already running into account, creates a VirtualMachineInstanceMigration CR for each VM that has to move. Each of these migrations gets the kubevirt.io/evacuationMigration annotation, which the informer's addMigration handler also uses to enqueue the node that a VirtualMachineInstanceMigration with this annotation belongs to.

//pkg/virt-controller/watch/drain/evacuation/evacuation.go
func (c *EvacuationController) sync(node *k8sv1.Node, vmisOnNode []*virtv1.VirtualMachineInstance, activeMigrations []*virtv1.VirtualMachineInstanceMigration) error {
   // If the node has no drain taint, we have nothing to do
   taintKey := *c.clusterConfig.GetMigrationConfiguration().NodeDrainTaintKey
   taint := &k8sv1.Taint{
      Key:    taintKey,
      Effect: k8sv1.TaintEffectNoSchedule,
   }

   vmisToMigrate := vmisToMigrate(node, vmisOnNode, taint)
   if len(vmisToMigrate) == 0 {
      return nil
   }

   migrationCandidates, nonMigrateable := c.filterRunningNonMigratingVMIs(vmisToMigrate, activeMigrations)
   if len(migrationCandidates) == 0 && len(nonMigrateable) == 0 {
      return nil
   }

   runningMigrations := migrationutils.FilterRunningMigrations(activeMigrations)
   activeMigrationsFromThisSourceNode := c.numOfVMIMForThisSourceNode(vmisOnNode, runningMigrations)
   maxParallelMigrationsPerOutboundNode :=
      int(*c.clusterConfig.GetMigrationConfiguration().ParallelOutboundMigrationsPerNode)
   maxParallelMigrations := int(*c.clusterConfig.GetMigrationConfiguration().ParallelMigrationsPerCluster)
   freeSpotsPerCluster := maxParallelMigrations - len(runningMigrations)
   freeSpotsPerThisSourceNode := maxParallelMigrationsPerOutboundNode - activeMigrationsFromThisSourceNode
   freeSpots := int(math.Min(float64(freeSpotsPerCluster), float64(freeSpotsPerThisSourceNode)))
   if freeSpots <= 0 {
      c.Queue.AddAfter(node.Name, 5*time.Second)
      return nil
   }

   diff := int(math.Min(float64(freeSpots), float64(len(migrationCandidates))))
   remaining := freeSpots - diff
   remainingForNonMigrateableDiff := int(math.Min(float64(remaining), float64(len(nonMigrateable))))

   if remainingForNonMigrateableDiff > 0 {
      // for all non-migrating VMIs which would get a spot, emit a warning
      for _, vmi := range nonMigrateable[0:remainingForNonMigrateableDiff] {
         c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, FailedCreateVirtualMachineInstanceMigrationReason, "VirtualMachineInstance is not migrateable")
      }

   }

   if diff == 0 {
      if remainingForNonMigrateableDiff > 0 {
         // Let's ensure that some warnings will stay in the event log and periodically update
         // In theory the warnings could disappear after one hour if nothing else updates
         c.Queue.AddAfter(node.Name, 1*time.Minute)
      }
      // nothing to do
      return nil
   }

   // TODO: should the order be randomized?
   selectedCandidates := migrationCandidates[0:diff]

   log.DefaultLogger().Infof("node: %v, migrations: %v, candidates: %v, selected: %v", node.Name, len(activeMigrations), len(migrationCandidates), len(selectedCandidates))

   wg := &sync.WaitGroup{}
   wg.Add(diff)

   errChan := make(chan error, diff)

   c.migrationExpectations.ExpectCreations(node.Name, diff)
   for _, vmi := range selectedCandidates {
      go func(vmi *virtv1.VirtualMachineInstance) {
         defer wg.Done()
         createdMigration, err := c.clientset.VirtualMachineInstanceMigration(vmi.Namespace).Create(context.Background(), GenerateNewMigration(vmi.Name, node.Name), v1.CreateOptions{})
         if err != nil {
            c.migrationExpectations.CreationObserved(node.Name)
            c.recorder.Eventf(vmi, k8sv1.EventTypeWarning, FailedCreateVirtualMachineInstanceMigrationReason, "Error creating a Migration: %v", err)
            errChan <- err
            return
         } else {
            c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, SuccessfulCreateVirtualMachineInstanceMigrationReason, "Created Migration %s", createdMigration.Name)
         }
      }(vmi)
   }

   wg.Wait()

   select {
   case err := <-errChan:
      return err
   default:
   }
   return nil
}
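
The GenerateNewMigration helper used above is what attaches the kubevirt.io/evacuationMigration annotation. A sketch of it, reconstructed from the behavior described earlier (the exact fields may differ; the annotation value is the source node name, which is also what addMigration uses to re-enqueue the node):

func GenerateNewMigration(vmiName string, nodeName string) *virtv1.VirtualMachineInstanceMigration {
   return &virtv1.VirtualMachineInstanceMigration{
      ObjectMeta: v1.ObjectMeta{
         GenerateName: "kubevirt-evacuation-",
         Annotations: map[string]string{
            // kubevirt.io/evacuationMigration -> name of the node being drained
            "kubevirt.io/evacuationMigration": nodeName,
         },
      },
      Spec: virtv1.VirtualMachineInstanceMigrationSpec{
         VMIName: vmiName,
      },
   }
}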

descheduler in kubevirt

KubeVirt has some support for cooperating with the descheduler, mainly as follows.

First, as shown above:

  1. When a pod eviction requires KubeVirt to perform a migration, the webhook responds with 429.
  2. The VirtualMachineInstanceMigration objects created by the EvacuationController carry the kubevirt.io/evacuationMigration annotation.

Second:

  1. In the migration controller, when a new VirtualMachineInstanceMigration carries the kubevirt.io/evacuationMigration annotation, the descheduler.alpha.kubernetes.io/eviction-in-progress annotation is added to the source pod; if the migration fails, that annotation is removed from the source pod again (see the sketch after this list).
  2. When the VMI controller creates a launcher pod, it adds the descheduler.alpha.kubernetes.io/request-evict-only annotation.
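
A minimal sketch of how such an annotation can be set and cleared on the source pod (illustrative only; the helper name is made up and this is not the actual migration-controller code):

import (
   "context"
   "fmt"

   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
   "k8s.io/apimachinery/pkg/types"
   "k8s.io/client-go/kubernetes"
)

// setEvictionInProgress adds (or, with inProgress=false, removes) the
// descheduler.alpha.kubernetes.io/eviction-in-progress annotation on the
// source virt-launcher pod via a strategic-merge patch.
func setEvictionInProgress(ctx context.Context, c kubernetes.Interface, namespace, pod string, inProgress bool) error {
   value := `"kubevirt"` // any non-empty value marks the eviction as in progress
   if !inProgress {
      value = "null" // a null value deletes the annotation in a strategic-merge patch
   }
   patch := fmt.Sprintf(`{"metadata":{"annotations":{"descheduler.alpha.kubernetes.io/eviction-in-progress":%s}}}`, value)
   _, err := c.CoreV1().Pods(namespace).Patch(ctx, pod, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
   return err
}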

Because the KubeVirt webhook denies (returns false for) these migration-related eviction requests, the descheduler goes on to evict other pods instead, which can leave the cluster in an abnormal state.

KubeVirt therefore wants the descheduler to add logic for the two annotations:

descheduler.alpha.kubernetes.io/request-evict-only means the descheduler should treat a denied eviction request for a pod carrying this annotation as a special case rather than an ordinary failure.

descheduler.alpha.kubernetes.io/eviction-in-progress means an eviction (migration) is already in progress, so the descheduler should not count the failed eviction request again towards its eviction-control counters.

The descheduler has not implemented this logic; so far the descheduler community is still discussing whether such an implementation is feasible.

References:

https://github.com/kubevirt/kubevirt/blob/main/docs/handling-eviction-requests.md

https://github.com/kubevirt/community/pull/258

https://github.com/kubevirt/community/blob/main/design-proposals/descheduler-support.md

https://github.com/kubernetes-sigs/descheduler/pull/1354

https://github.com/kubernetes-sigs/descheduler/blob/master/keps/753-descheduling-framework/README.md
