CDI DV progress 展示功能代码分析

在 dv clone 中,dv 的 Status.Progress 字段显示了当前的克隆进度。这部分代码主要使用 prometheus 监控克隆流程,本文将从 cloner pod(监控数据的生成方)以及 clone-controller(数据的接收和展示方)两方面进行分析。

数据生成

对于 progress 数据的生成方 cloner pod,主要的代码逻辑位于 cmd/cdi-cloner/clone-source.go 的 main() 函数中:

1
reader := pipeToSnappy(createProgressReader(getInputStream(preallocation), ownerUID, uploadBytes))

这段代码返回了一个 io.ReadCloser。最内层的 io.ReadCloser 由 getInputStream(preallocation) 获得,负责读取 source data;随后经过两层包装,分别加上了进度监控(createProgressReader)以及压缩(pipeToSnappy)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
//cmd/cdi-cloner/clone-source.go
// getInputStream opens the clone source selected by the package-level
// contentType and returns it as a stream.
//
// "filesystem-clone" yields the reader produced by newTarReader(preallocation);
// "blockdevice-clone" opens mountPoint directly with os.Open. Any other
// content type, or a failure to open the source, is reported with klog.Fatalf.
func getInputStream(preallocation bool) (rc io.ReadCloser) {
	var openErr error
	switch contentType {
	case "blockdevice-clone":
		if rc, openErr = os.Open(mountPoint); openErr != nil {
			klog.Fatalf("Error opening block device %q: %+v", mountPoint, openErr)
		}
	case "filesystem-clone":
		if rc, openErr = newTarReader(preallocation); openErr != nil {
			klog.Fatalf("Error creating tar reader for %q: %+v", mountPoint, openErr)
		}
	default:
		klog.Fatalf("Invalid content-type %q", contentType)
	}
	return rc
}

createProgressReader

createProgressReader() 为 reader 加上了进度监控:先封装成 util.CountingReader,再通过 prometheusutil.NewProgressReader 进一步封装。这部分代码位于 pkg/util/prometheus/prometheus.go 以及 pkg/util/util.go 中。其中 util.CountingReader 的 Read 方法会在每次读取时同步更新 Current 和 Done 字段:Current 表示已经读取的字节数,Done 表示是否读取完成(即是否已读到 io.EOF)。

StartTimedUpdate 会每秒根据 util.CountingReader 已经读取的字节数更新一次进度:progress 是已读取字节数占 total 字节数的百分比,其增量会通过 Add 累加到监控指标 progress 中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
//pkg/util/util.go
// CountingReader wraps an io.ReadCloser and records how many bytes have been
// read through it and whether the end of the stream has been seen.
type CountingReader struct {
	// Reader is the underlying stream that Read delegates to.
	Reader io.ReadCloser
	// Current is the running total of bytes returned by Read so far.
	Current uint64
	// Done reports whether the most recent Read returned io.EOF.
	Done bool
}

// Read forwards to the wrapped Reader, adds the number of bytes read to
// Current and sets Done to true only when the underlying call returned
// io.EOF. The byte count and error are passed through unchanged.
func (r *CountingReader) Read(p []byte) (int, error) {
	count, readErr := r.Reader.Read(p)
	r.Current += uint64(count)
	// Done mirrors the latest call only: any non-EOF result resets it to false.
	r.Done = (readErr == io.EOF)
	return count, readErr
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
//pkg/util/prometheus/prometheus.go
// updateProgress recomputes the completion percentage from r.Current and
// r.total and raises the progress counter labelled with r.ownerUID to that
// value if it is currently lower.
//
// The percentage is 100 once the read is finished (r.final && r.Done) or once
// r.Current has reached r.total; otherwise it is Current/total*100. The
// counter is only ever increased, by the difference to its present value.
//
// It returns false when r.total is not positive or the read is finished, and
// true otherwise (presumably telling the caller to keep updating; confirm
// against StartTimedUpdate).
func (r *ProgressReader) updateProgress() bool {
	if !(r.total > 0) {
		return false
	}

	done := r.final && r.Done
	percent := 100.0
	if !done && r.Current < r.total {
		percent = float64(r.Current) / float64(r.total) * 100.0
	}

	// Read back the value already reported so that only the delta is added.
	counter := r.progress.WithLabelValues(r.ownerUID)
	snapshot := &dto.Metric{}
	counter.Write(snapshot)
	if reported := *snapshot.Counter.Value; percent > reported {
		counter.Add(percent - reported)
	}

	klog.V(1).Infoln(fmt.Sprintf("%.2f", percent))
	return !done
}

pipeToSnappy

pipeToSnappy方法继续封装,实现了存储块级别的分段压缩。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
//cmd/cdi-cloner/clone-source.go
// pipeToSnappy compresses everything read from reader with snappy and returns
// the read end of an in-memory pipe that yields the compressed bytes.
//
// The copy runs in a background goroutine: data is pulled from reader, written
// through a snappy buffered writer into the pipe, and the snappy writer and the
// pipe writer are closed once reader is exhausted. A failure in the copy or in
// either close is reported with klog.Fatalf. reader itself is not closed here.
func pipeToSnappy(reader io.ReadCloser) io.ReadCloser {
	pr, pw := io.Pipe()
	sbw := snappy.NewBufferedWriter(pw)

	go func() {
		n, err := io.Copy(sbw, reader)
		if err != nil {
			// The writer is snappy, not gzip; name it correctly so the
			// failure is attributed to the right stage.
			klog.Fatalf("Error %s piping to snappy", err)
		}
		// Close the snappy writer before the pipe so buffered data reaches
		// the pipe before the consumer sees end-of-stream.
		if err = sbw.Close(); err != nil {
			klog.Fatalf("Error closing snappy writer %+v", err)
		}
		if err = pw.Close(); err != nil {
			klog.Fatalf("Error closing pipe writer %+v", err)
		}
		// n is the byte count returned by io.Copy, i.e. bytes read from reader.
		klog.Infof("Wrote %d bytes\n", n)
	}()

	return pr
}

startPrometheus

接下来在 main() 函数 中 startPrometheus() 启动了一个服务器。

1
2
3
4
5
6
7
8
9
//cmd/cdi-cloner/clone-source.go
// startPrometheus creates a fresh temporary directory (prefix "certsdir") and
// hands it to prometheusutil.StartPrometheusEndpoint as the place to store its
// certificate files. Failure to create the directory is reported with
// klog.Fatalf.
func startPrometheus() {
	tlsDir, mkErr := ioutil.TempDir("", "certsdir")
	if mkErr != nil {
		klog.Fatalf("Error %s creating temp dir", mkErr)
	}

	prometheusutil.StartPrometheusEndpoint(tlsDir)
}

更里层的代码向外暴露8443端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//pkg/util/prometheus/prometheus.go
// StartPrometheusEndpoint starts an HTTPS server on port 8443 that serves the
// prometheus metrics handler at /metrics.
//
// A self-signed certificate and key are generated and written to
// certsDirectory as tls.crt and tls.key (mode 0600) before the server is
// started. The server runs in a background goroutine, so this function returns
// immediately. It has no return value: every failure, including a failure of
// the server itself, is logged together with the underlying error and the
// endpoint is simply not (or no longer) served.
//
// The handler is registered with http.Handle, i.e. on the default serve mux,
// so this is meant to be called once per process.
func StartPrometheusEndpoint(certsDirectory string) {
	certBytes, keyBytes, err := cert.GenerateSelfSignedCertKey("cloner_target", nil, nil)
	if err != nil {
		klog.Errorf("Error generating cert for prometheus: %v", err)
		return
	}

	certFile := path.Join(certsDirectory, "tls.crt")
	if err = ioutil.WriteFile(certFile, certBytes, 0600); err != nil {
		klog.Errorf("Error writing cert file: %v", err)
		return
	}

	keyFile := path.Join(certsDirectory, "tls.key")
	if err = ioutil.WriteFile(keyFile, keyBytes, 0600); err != nil {
		klog.Errorf("Error writing key file: %v", err)
		return
	}

	go func() {
		http.Handle("/metrics", promhttp.Handler())
		// ListenAndServeTLS only returns on failure; log it instead of
		// dropping it so a missing metrics endpoint can be diagnosed.
		if err := http.ListenAndServeTLS(":8443", certFile, keyFile, nil); err != nil {
			klog.Errorf("Error serving prometheus endpoint: %v", err)
		}
	}()
}

数据接收与展示

progress 数据的接收方是 dv,代码主要位于 pkg/controller/datavolume-controller.go:reconcileProgressUpdate 调用了 updateProgressUsingPod 方法,后者首先从相关 pod 的 metrics endpoint 拿到数据,然后通过正则进行过滤,得到具体的进度数值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//pkg/controller/datavolume-controller.go
// reconcileProgressUpdate refreshes datavolume.Status.Progress from the pod
// associated with pvc and decides whether the data volume must be requeued.
//
// An empty progress is initialised to "N/A". The pod is looked up in the
// source PVC's namespace when the data volume has a PVC source, otherwise in
// the data volume's own namespace. No requeue is requested once the data
// volume is Succeeded or Failed, or when the pod is not in the Running phase.
// If the pod cannot be fetched, or after a successful update, a re-reconcile
// is requested in 2 seconds. An error from updateProgressUsingPod is returned
// as is.
func (r *DatavolumeReconciler) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
	if datavolume.Status.Progress == "" {
		datavolume.Status.Progress = "N/A"
	}

	podNamespace := datavolume.Namespace
	if sourcePVC := datavolume.Spec.Source.PVC; sourcePVC != nil {
		podNamespace = sourcePVC.Namespace
	}

	switch datavolume.Status.Phase {
	case cdiv1.Succeeded, cdiv1.Failed:
		// Terminal phase either way: stop queueing the data volume.
		r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
		return reconcile.Result{}, nil
	}

	// Not done yet: ask for another reconcile in 2 seconds to pick up an update.
	retrySoon := reconcile.Result{RequeueAfter: 2 * time.Second}

	pod, lookupErr := r.getPodFromPvc(podNamespace, pvc)
	if lookupErr != nil {
		return retrySoon, nil
	}
	if pod.Status.Phase != corev1.PodRunning {
		// Skip the HTTP GET (long timeouts, error traces) when the pod is already gone.
		return reconcile.Result{}, nil
	}
	if updateErr := updateProgressUsingPod(datavolume, pod); updateErr != nil {
		return reconcile.Result{}, updateErr
	}
	return retrySoon, nil
}

progress 数据更新时,需要请求 pod 的 metrics 接口获取最新数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// updateProgressUsingPod fetches https://<pod IP>:<metrics port>/metrics from
// pod and, when the body contains a progress sample labelled with the data
// volume's UID, stores that value in dataVolumeCopy.Status.Progress formatted
// as "%.2f%%".
//
// The status is left untouched and nil is returned when the pod has no IP,
// when errConnectionRefused reports true for the GET error, when no matching
// sample is found, or when the matched value cannot be parsed as a float.
// Errors from getPodMetricsPort, from the HTTP GET and from reading the
// response body are returned to the caller.
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
	httpClient := buildHTTPClient()
	// Example value: import_progress{ownerUID="b856691e-1038-11e9-a5ab-525500d15501"} 13.45
	// The UID is quoted so it is always matched literally and can never be
	// interpreted as (or break) regular expression syntax.
	progressRegExp := regexp.MustCompile(`progress\{ownerUID\="` + regexp.QuoteMeta(string(dataVolumeCopy.UID)) + `"\} (\d{1,3}\.?\d*)`)

	port, err := getPodMetricsPort(pod)
	if err != nil {
		return err
	}
	if pod.Status.PodIP == "" {
		// No address to scrape; not an error.
		return nil
	}

	url := fmt.Sprintf("https://%s:%d/metrics", pod.Status.PodIP, port)
	resp, err := httpClient.Get(url)
	if err != nil {
		if errConnectionRefused(err) {
			return nil
		}
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	match := progressRegExp.FindStringSubmatch(string(body))
	if match == nil {
		// No sample for this data volume in the scrape.
		return nil
	}
	// match[1] is the captured numeric value of the sample.
	if f, err := strconv.ParseFloat(match[1], 64); err == nil {
		dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
	}
	return nil
}
updated 2023-02-28