在 DataVolume(dv)克隆中,dv 的 Status.Progress 字段显示了当前的克隆进度。这部分功能主要借助 prometheus 监控克隆流程,本文将分别从监控数据的生成方 cloner pod,以及数据的接收和展示方 DataVolume controller 这两个角度进行分析。
progress 数据的生成方 cloner pod 的主要代码逻辑位于 cmd/cdi-cloner/clone-source.go,在 main() 函数中:
1
|
// Build the outgoing stream: getInputStream opens the clone source, createProgressReader
// wraps it for progress reporting, and pipeToSnappy snappy-compresses the result.
reader := pipeToSnappy(createProgressReader(getInputStream(preallocation), ownerUID, uploadBytes))
|
这段代码返回了一个 io.ReadCloser。最底层的 io.ReadCloser 由 getInputStream(preallocation) 获得,负责读取源数据(source data);之后再经过两层包装:createProgressReader 加上进度监控,pipeToSnappy 加上 snappy 压缩。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
//cmd/cdi-cloner/clone-source.go
// getInputStream opens the clone source selected by the package-level contentType.
//
// For "filesystem-clone" it returns the reader built by newTarReader(preallocation);
// for "blockdevice-clone" it returns the opened mountPoint. Any open failure, or an
// unrecognised content type, is reported with klog.Fatalf.
func getInputStream(preallocation bool) io.ReadCloser {
	switch contentType {
	case "filesystem-clone":
		tarStream, err := newTarReader(preallocation)
		if err != nil {
			klog.Fatalf("Error creating tar reader for %q: %+v", mountPoint, err)
		}
		return tarStream
	case "blockdevice-clone":
		device, err := os.Open(mountPoint)
		if err != nil {
			klog.Fatalf("Error opening block device %q: %+v", mountPoint, err)
		}
		return device
	}
	klog.Fatalf("Invalid content-type %q", contentType)
	return nil
}
|
createProgressReader() 为 reader 加上了进度统计:先将其封装成 util.CountingReader,再进一步封装成 prometheusutil.NewProgressReader。这部分代码位于 pkg/util/prometheus/prometheus.go 以及 pkg/util/util.go 中。其中 util.CountingReader 的 Read 方法会同步更新 Current 和 Done 字段:Current 表示已经读取的字节数,Done 表示最近一次读取是否返回了 io.EOF(即读取完成)。
StartTimedUpdate 会每秒更新一次进度(见下方的 updateProgress)。progress 为 util.CountingReader 已读取字节数占 total 字节数的百分比。由于 prometheus Counter 只能递增,代码只在新进度大于当前指标值时,把两者的差值 Add 到 progress 监控指标中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
//pkg/util/util.go
// CountingReader is a reader that keeps track of how much has been read.
//
// It wraps Reader and, on every Read, adds the number of bytes returned to
// Current and records in Done whether that Read reported io.EOF.
type CountingReader struct {
	// Reader is the underlying stream being counted.
	Reader io.ReadCloser
	// Current is the running total of bytes returned by Read so far.
	Current uint64
	// Done reports whether the most recent Read returned io.EOF.
	Done bool
}

// Read reads from the wrapped Reader, adds the number of bytes read to
// Current, and sets Done to whether this read returned io.EOF.
//
// Read itself does not touch any prometheus metric; progress reporting code
// reads Current and Done separately.
func (r *CountingReader) Read(p []byte) (n int, err error) {
	n, err = r.Reader.Read(p)
	r.Current += uint64(n)
	// The io.Reader contract requires io.EOF to be returned unwrapped, so a
	// direct comparison is the correct check here.
	r.Done = err == io.EOF
	return n, err
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
//pkg/util/prometheus/prometheus.go
// updateProgress publishes the current read progress, as a percentage of
// r.total, to the progress counter labelled with r.ownerUID.
//
// A prometheus Counter cannot decrease, so only the positive delta between
// the new percentage and the counter's current value is added. When
// r.final && r.Done the progress is reported as 100.
//
// Returns true while the transfer is not yet finished, and false once it has
// finished or when r.total is 0 (progress unknown).
func (r *ProgressReader) updateProgress() bool {
	if r.total > 0 {
		finished := r.final && r.Done
		currentProgress := 100.0
		if !finished && r.Current < r.total {
			currentProgress = float64(r.Current) / float64(r.total) * 100.0
		}

		counter := r.progress.WithLabelValues(r.ownerUID)
		metric := &dto.Metric{}
		if err := counter.Write(metric); err != nil {
			// Without the current value no safe delta can be computed; skip
			// this update rather than risk a nil dereference below.
			klog.Errorf("Error reading progress metric: %v", err)
			return !finished
		}
		// The generated getters are nil-safe, so a metric lacking a Counter
		// section reads as 0 instead of panicking.
		if previous := metric.GetCounter().GetValue(); currentProgress > previous {
			counter.Add(currentProgress - previous)
		}
		klog.V(1).Infof("%.2f", currentProgress)
		return !finished
	}
	return false
}
|
pipeToSnappy 方法继续封装:它在后台 goroutine 中把数据流经 snappy.NewBufferedWriter 压缩(snappy 帧格式,按块分段压缩)后写入 io.Pipe,并返回管道的读取端。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
//cmd/cdi-cloner/clone-source.go
// pipeToSnappy returns a reader that yields the contents of reader compressed
// with snappy (written through snappy.NewBufferedWriter).
//
// A background goroutine copies reader into the snappy writer, which writes
// into an io.Pipe; the returned value is the pipe's read end. After the copy
// completes, the snappy writer is flushed and closed, the pipe is closed (so
// the consumer sees EOF), and the source reader is closed. Copy or pipe
// failures are fatal; a failure closing the source is only logged.
func pipeToSnappy(reader io.ReadCloser) io.ReadCloser {
	pr, pw := io.Pipe()
	sbw := snappy.NewBufferedWriter(pw)
	go func() {
		n, err := io.Copy(sbw, reader)
		if err != nil {
			klog.Fatalf("Error %s piping to snappy", err)
		}
		// Close flushes any buffered compressed data before the pipe is closed.
		if err = sbw.Close(); err != nil {
			klog.Fatalf("Error closing snappy writer %+v", err)
		}
		if err = pw.Close(); err != nil {
			klog.Fatalf("Error closing pipe writer %+v", err)
		}
		// Release the source now that it has been drained. The data has
		// already been streamed in full, so a close failure is not fatal.
		if err = reader.Close(); err != nil {
			klog.Warningf("Error closing source reader %+v", err)
		}
		klog.Infof("Wrote %d bytes\n", n)
	}()
	return pr
}
|
接下来,main() 函数中的 startPrometheus() 会创建一个临时目录用于存放证书,并启动监控指标服务器。
1
2
3
4
5
6
7
8
9
|
//cmd/cdi-cloner/clone-source.go
// startPrometheus creates a temporary directory to hold the metrics server's
// self-signed TLS material and starts the prometheus endpoint with it.
// Failing to create the directory is reported with klog.Fatalf.
func startPrometheus() {
	dir, err := ioutil.TempDir("", "certsdir")
	if err != nil {
		// The endpoint needs somewhere to write its certificate and key.
		klog.Fatalf("Error %s creating temp dir", err)
	}
	prometheusutil.StartPrometheusEndpoint(dir)
}
|
更里层的 StartPrometheusEndpoint 会生成自签名证书并写入该目录,然后通过 HTTPS 在 8443 端口暴露 /metrics 接口。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
//pkg/util/prometheus/prometheus.go
// StartPrometheusEndpoint starts an http server providing a prometheus endpoint using the passed
// in directory to store the self signed certificates that will be generated before starting the
// http server.
//
// The certificate and key are written to certsDirectory as tls.crt and tls.key
// (mode 0600). /metrics is registered on http.DefaultServeMux and served over
// TLS on :8443 from a background goroutine, so this function does not block.
// Failures are logged, not returned.
//
// NOTE: http.Handle panics on a duplicate pattern, so this is expected to be
// called at most once per process.
func StartPrometheusEndpoint(certsDirectory string) {
	certBytes, keyBytes, err := cert.GenerateSelfSignedCertKey("cloner_target", nil, nil)
	if err != nil {
		klog.Errorf("Error generating cert for prometheus: %v", err)
		return
	}
	certFile := path.Join(certsDirectory, "tls.crt")
	if err = ioutil.WriteFile(certFile, certBytes, 0600); err != nil {
		klog.Errorf("Error writing cert file: %v", err)
		return
	}
	keyFile := path.Join(certsDirectory, "tls.key")
	if err = ioutil.WriteFile(keyFile, keyBytes, 0600); err != nil {
		klog.Errorf("Error writing key file: %v", err)
		return
	}
	go func() {
		http.Handle("/metrics", promhttp.Handler())
		// ListenAndServeTLS only returns on failure (never with a nil error),
		// so always surface why the metrics server stopped.
		if err := http.ListenAndServeTLS(":8443", certFile, keyFile, nil); err != nil {
			klog.Errorf("Error serving prometheus endpoint on :8443: %v", err)
		}
	}()
}
|
progress 数据的接收方是 dv,代码主要位于 pkg/controller/datavolume-controller.go:reconcileProgressUpdate 中调用了 updateProgressUsingPod 方法,该方法首先从相关 pod 的 metrics endpoint 拿到数据,然后用正则表达式进行过滤,得到具体的进度值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
//pkg/controller/datavolume-controller.go
// reconcileProgressUpdate refreshes datavolume.Status.Progress from the
// metrics endpoint of the pod associated with pvc and decides whether to poll
// again.
//
// Progress is initialised to "N/A" when empty. Nothing is requeued once the
// DataVolume has Succeeded or Failed, or when the pod is found but not
// Running. When the pod lookup fails, or the progress update succeeds,
// reconciliation is requeued after 2 seconds. An error from
// updateProgressUsingPod is returned to the caller.
func (r *DatavolumeReconciler) reconcileProgressUpdate(datavolume *cdiv1.DataVolume, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
	if datavolume.Status.Progress == "" {
		datavolume.Status.Progress = "N/A"
	}

	// With a PVC source the pod is looked up in the source PVC's namespace;
	// otherwise in the DataVolume's own namespace.
	podNamespace := datavolume.Namespace
	if sourcePVC := datavolume.Spec.Source.PVC; sourcePVC != nil {
		podNamespace = sourcePVC.Namespace
	}

	switch datavolume.Status.Phase {
	case cdiv1.Succeeded, cdiv1.Failed:
		// Data volume completed progress, or failed, either way stop queueing the data volume.
		r.log.Info("Datavolume finished, no longer updating progress", "Namespace", datavolume.Namespace, "Name", datavolume.Name, "Phase", datavolume.Status.Phase)
		return reconcile.Result{}, nil
	}

	// Not done yet: poll again in 2 seconds to get an update.
	pollAgain := reconcile.Result{RequeueAfter: 2 * time.Second}

	pod, lookupErr := r.getPodFromPvc(podNamespace, pvc)
	if lookupErr != nil {
		// Lookup errors are not surfaced; keep polling.
		return pollAgain, nil
	}
	if pod.Status.Phase != corev1.PodRunning {
		// Avoid long timeouts and error traces from HTTP get when pod is already gone.
		return reconcile.Result{}, nil
	}
	if updateErr := updateProgressUsingPod(datavolume, pod); updateErr != nil {
		return reconcile.Result{}, updateErr
	}
	return pollAgain, nil
}
|
progress 数据的更新需要请求 pod 的 /metrics 接口获取最新数据:匹配带有当前 dv UID 标签的 progress 指标,将其格式化为百分比后写入 Status.Progress。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
// updateProgressUsingPod scrapes the prometheus /metrics endpoint of pod and,
// if it exposes a "...progress" sample labelled with this DataVolume's UID,
// stores the value in dataVolumeCopy.Status.Progress formatted as "NN.NN%".
//
// Return values:
//   - the error from getPodMetricsPort, if the metrics port cannot be determined;
//   - the error from the HTTP GET or from reading the response body;
//   - nil without changing Progress when the pod has no IP yet, when the
//     connection is refused, or when no matching sample is found.
func updateProgressUsingPod(dataVolumeCopy *cdiv1.DataVolume, pod *corev1.Pod) error {
	httpClient := buildHTTPClient()
	// Example value: import_progress{ownerUID="b856691e-1038-11e9-a5ab-525500d15501"} 13.45
	//
	// The prometheus text encoder prints floats in Go's shortest 'g' form, so a
	// very small progress value appears in exponent notation (e.g. 5e-05). The
	// capture group therefore accepts an optional exponent; otherwise "5e-05"
	// would be captured as "5" and reported as 5.00%. The UID is quoted so it
	// is always matched literally.
	importRegExp := regexp.MustCompile(`progress\{ownerUID="` + regexp.QuoteMeta(string(dataVolumeCopy.UID)) +
		`"\} (\d+(?:\.\d*)?(?:[eE][-+]?\d+)?)`)

	port, err := getPodMetricsPort(pod)
	if err != nil || pod.Status.PodIP == "" {
		// No metrics port: report the lookup error. No IP yet: err is nil.
		return err
	}

	url := fmt.Sprintf("https://%s:%d/metrics", pod.Status.PodIP, port)
	resp, err := httpClient.Get(url)
	if err != nil {
		if errConnectionRefused(err) {
			// The metrics server is not listening (yet); try again later.
			return nil
		}
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	match := importRegExp.FindStringSubmatch(string(body))
	if match == nil {
		// No sample for this DataVolume yet.
		return nil
	}
	// The capture group only admits digits, a decimal point and an exponent,
	// so ParseFloat can only fail on out-of-range input, which is ignored.
	if f, err := strconv.ParseFloat(match[1], 64); err == nil {
		dataVolumeCopy.Status.Progress = cdiv1.DataVolumeProgress(fmt.Sprintf("%.2f%%", f))
	}
	return nil
}
|