CDI image import flow analysis

CDI's basic job is to import an image from an image source into the PVC that backs a DV (created with the same name), so it can serve as the VM's operating system disk. Roughly: the dv-controller creates the PVC, the import-controller creates an importer pod for it, and inside that pod the code under pkg/importer wraps the configured source into a standard DataSource and processes its content into the PVC. This article walks through the import flow using an http source as the example.

Import flow

The flow starts when we create a DV whose source is http.

apiVersion: cdi.kubevirt.io/v1beta1
kind: DataVolume
metadata:
  name: "example-import-dv"
spec:
  source:
    http:
      url: "https://download.cirros-cloud.net/0.4.0/cirros-0.4.0-x86_64-disk.img" # Or S3
      secretRef: "" # Optional
      certConfigMap: "" # Optional
  pvc:
    accessModes:
      - ReadWriteOnce  # ReadWriteOnce, ReadOnlyMany, or ReadWriteMany
    resources:
      requests:
        storage: 20Gi
    storageClassName: ***   # fixed value
    volumeMode: Block

createImporterPod

As discussed earlier for datavolume-controller.go, when the source is not a PVC the controller creates a PVC for the DV from its spec.pvc field, then enters reconcileDataVolumeStatus to update the DV's status.

Next the import flow begins. The import-controller watches PVCs that carry a controller owner reference (i.e. PVCs owned by a DV).

In the reconcile logic we first fetch that PVC, then decide whether to proceed into reconcilePvc; in the normal case we do.

In the import-controller's reconcilePvc flow, it first looks for the pod corresponding to the PVC; if none exists it creates one from the PVC via r.createImporterPod(pvc).

Inside createImporterPod, r.createImportEnvVar(pvc) first builds the relevant environment variables and podArgs, and then the pod is created: pod, err := createImporterPod(r.log, r.client, podArgs, r.installerLabels).
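To make the find-or-create pattern concrete, here is a minimal self-contained sketch of the reconcilePvc logic described above. The map-backed lookup stands in for the real API-server query, and apart from the names reconcilePvc/createImporterPod everything here is a simplification, not CDI's actual code.

package main

import "fmt"

// Stand-ins for corev1.PersistentVolumeClaim / corev1.Pod.
type pvc struct{ name string }
type pod struct{ name string }

// pods stands in for a client lookup against the API server.
var pods = map[string]*pod{}

func findImporterPod(p *pvc) *pod { return pods[p.name] }

func createImporterPod(p *pvc) *pod {
	// The real controller first builds env vars and podArgs
	// (r.createImportEnvVar), then creates the pod via the client.
	np := &pod{name: "importer-" + p.name}
	pods[p.name] = np
	return np
}

func reconcilePvc(p *pvc) {
	if existing := findImporterPod(p); existing != nil {
		fmt.Println("importer pod already exists:", existing.name)
		return // nothing to do; wait for the pod to finish
	}
	fmt.Println("created importer pod:", createImporterPod(p).name)
}

func main() {
	p := &pvc{name: "example-import-dv"}
	reconcilePvc(p) // first reconcile creates the pod
	reconcilePvc(p) // later reconciles find it and do nothing
}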

ImporterPod

The importer pod runs the importer program; the entry point is /cmd/cdi-importer/importer.go, and the bazel script in the same directory packages it into the importer image. The init function here is fairly simple.

In main, an http import enters handleImport(source, contentType, volumeMode, imageSize, filesystemOverhead, preallocation) and the flow continues from there.

handleImport turns the source into a standard DataSource (an interface) and creates a DataProcessor; the real processing happens in the processor.ProcessData() method.

func handleImport(...) {
    ...
    ds := newDataSource(source, contentType, volumeMode)
    defer ds.Close()
    // newDataProcessor builds the processor that drives the phase state machine
    processor := newDataProcessor(contentType, volumeMode, ds, imageSize, filesystemOverhead, preallocation)
    err := processor.ProcessData()
    ...
}

The DataProcessor and DataSource code lives under pkg/importer. processor.ProcessData() drives the DataProcessor's phaseExecutors map, which maps each processing phase to its execution function.

// phaseExecutors is a mapping from the given processing phase to its execution function. The function returns the next processing phase or error.
phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error)

DataProcessor

NewDataProcessor() calls dp.initDefaultPhases() to register a set of executor functions, each keyed by the processing phase it handles.

func (dp *DataProcessor) initDefaultPhases() {
    dp.phaseExecutors = make(map[ProcessingPhase]func() (ProcessingPhase, error))
    dp.RegisterPhaseExecutor(ProcessingPhaseInfo, func() (ProcessingPhase, error) {
        pp, err := dp.source.Info()
        if err != nil {
            err = errors.Wrap(err, "Unable to obtain information about data source")
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseTransferScratch, func() (ProcessingPhase, error) {
        pp, err := dp.source.Transfer(dp.scratchDataDir)
        if err == ErrInvalidPath {
            // Passed in invalid scratch space path, return scratch space needed error.
            err = ErrRequiresScratchSpace
        } else if err != nil {
            err = errors.Wrap(err, "Unable to transfer source data to scratch space")
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataDir, func() (ProcessingPhase, error) {
        pp, err := dp.source.Transfer(dp.dataDir)
        if err != nil {
            err = errors.Wrap(err, "Unable to transfer source data to target directory")
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataFile, func() (ProcessingPhase, error) {
        pp, err := dp.source.TransferFile(dp.dataFile)
        if err != nil {
            err = errors.Wrap(err, "Unable to transfer source data to target file")
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseValidatePause, func() (ProcessingPhase, error) {
        pp := ProcessingPhasePause
        err := dp.validate(dp.source.GetURL())
        if err != nil {
            pp = ProcessingPhaseError
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseConvert, func() (ProcessingPhase, error) {
        pp, err := dp.convert(dp.source.GetURL())
        if err != nil {
            err = errors.Wrap(err, "Unable to convert source data to target format")
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseResize, func() (ProcessingPhase, error) {
        pp, err := dp.resize()
        if err != nil {
            err = errors.Wrap(err, "Unable to resize disk image to requested size")
        }
        return pp, err
    })
    dp.RegisterPhaseExecutor(ProcessingPhaseMergeDelta, func() (ProcessingPhase, error) {
        pp, err := dp.merge()
        if err != nil {
            err = errors.Wrap(err, "Unable to apply delta to base image")
        }
        return pp, err
    })
}

The ProcessData() method delegates to dp.ProcessDataWithPause(), which walks the registered executors phase by phase until processing completes.

// ProcessData is the main synchronous processing loop
func (dp *DataProcessor) ProcessData() error {
    if size, _ := util.GetAvailableSpace(dp.scratchDataDir); size > int64(0) {
        // Clean up before trying to write, in case a previous attempt left a mess. Note the deferred cleanup is intentional.
        if err := CleanDir(dp.scratchDataDir); err != nil {
            return errors.Wrap(err, "Failure cleaning up temporary scratch space")
        }
        // Attempt to be a good citizen and clean up my mess at the end.
        defer CleanDir(dp.scratchDataDir)
    }

    if size, _ := util.GetAvailableSpace(dp.dataDir); size > int64(0) && dp.needsDataCleanup {
        // Clean up data dir before trying to write in case a previous attempt failed and left some stuff behind.
        if err := CleanDir(dp.dataDir); err != nil {
            return errors.Wrap(err, "Failure cleaning up target space")
        }
    }
    return dp.ProcessDataWithPause()
}

From initDefaultPhases() the default phases are: Info -> TransferScratch -> TransferDataDir -> TransferDataFile -> ValidatePause -> Convert -> Resize -> MergeDelta.
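The loop that walks this map is easy to picture. Below is a self-contained sketch of the idea behind dp.ProcessDataWithPause(): each executor returns the next phase, and the loop keeps dispatching until a terminal phase is reached. The phase names and three-step chain are illustrative, not the real implementation.

package main

import "fmt"

type ProcessingPhase string

const (
	PhaseInfo     ProcessingPhase = "Info"
	PhaseConvert  ProcessingPhase = "Convert"
	PhaseResize   ProcessingPhase = "Resize"
	PhaseComplete ProcessingPhase = "Complete"
)

func main() {
	// Each executor does its work and returns the next phase, just like the
	// entries registered in initDefaultPhases above.
	executors := map[ProcessingPhase]func() (ProcessingPhase, error){
		PhaseInfo:    func() (ProcessingPhase, error) { return PhaseConvert, nil },
		PhaseConvert: func() (ProcessingPhase, error) { return PhaseResize, nil },
		PhaseResize:  func() (ProcessingPhase, error) { return PhaseComplete, nil },
	}

	for phase := PhaseInfo; phase != PhaseComplete; {
		executor, ok := executors[phase]
		if !ok {
			fmt.Println("no executor registered for phase", phase)
			return
		}
		next, err := executor()
		if err != nil {
			fmt.Println("phase", phase, "failed:", err)
			return
		}
		fmt.Println(phase, "->", next)
		phase = next
	}
}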

DataSource

Inside NewHTTPDataSource:

//cmd/cdi-importer/importer.go
ds, err := importer.NewHTTPDataSource(ep, acc, sec, certDir, cdiv1.DataVolumeContentType(contentType))
// ep = https://download.cirros-cloud.net/0.4.0/cirros-0.4.0-x86_64-disk.img
// contentType = kubevirt
// pkg/importer/http-datasource.go
// NewHTTPDataSource creates a new instance of the http data provider.
func NewHTTPDataSource(endpoint, accessKey, secKey, certDir string, contentType cdiv1.DataVolumeContentType) (*HTTPDataSource, error) {
    ep, err := ParseEndpoint(endpoint)
    if err != nil {
        return nil, errors.Wrapf(err, fmt.Sprintf("unable to parse endpoint %q", endpoint))
    }
    ctx, cancel := context.WithCancel(context.Background())

    extraHeaders, secretExtraHeaders, err := getExtraHeaders()
    if err != nil {
        cancel()
        return nil, errors.Wrap(err, "Error getting extra headers for HTTP client")
    }
    // This call yields the values (contentLength, brokenForQemuImg) that later steer the Info phase
    httpReader, contentLength, brokenForQemuImg, err := createHTTPReader(ctx, ep, accessKey, secKey, certDir, extraHeaders, secretExtraHeaders)
    if err != nil {
        cancel()
        return nil, err
    }

    httpSource := &HTTPDataSource{
        ctx:              ctx,
        cancel:           cancel,
        httpReader:       httpReader,
        contentType:      contentType,
        endpoint:         ep,
        customCA:         certDir,
        brokenForQemuImg: brokenForQemuImg,
        contentLength:    contentLength,
    }
    httpSource.n = createNbdkitCurl(nbdkitPid, accessKey, secKey, certDir, nbdkitSocket, extraHeaders, secretExtraHeaders)
    // We know this is a counting reader, so no need to check.
    countingReader := httpReader.(*util.CountingReader)
    go httpSource.pollProgress(countingReader, 10*time.Minute, time.Second)
    return httpSource, nil
}

At this point we can pin down the following (a sketch of how dest falls out of these values follows the list):

  • contentType = kubevirt
  • dest = /dev/cdi-block-volume
  • volumeMode = Block
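As a hedged sketch of where dest comes from: the convert code quoted later sets dest := getImporterDestPath(contentType, volumeMode), which for a block volume resolves to common.WriteBlockPath. The filesystem-mode path below is illustrative, not quoted from CDI.

package main

import "fmt"

const (
	writeBlockPath   = "/dev/cdi-block-volume" // block PVCs expose a raw device (CDI's common.WriteBlockPath)
	importerWriteDir = "/data"                 // filesystem PVCs mount a directory (illustrative)
)

// getImporterDestPath mirrors the call mentioned later in this article;
// the real CDI helper also takes contentType into account.
func getImporterDestPath(volumeMode string) string {
	if volumeMode == "Block" {
		return writeBlockPath
	}
	return importerWriteDir
}

func main() {
	fmt.Println(getImporterDestPath("Block"))      // /dev/cdi-block-volume
	fmt.Println(getImporterDestPath("Filesystem")) // /data
}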

ProcessData(http)

The ProcessData phases are implemented by each DataSource under /pkg/importer; here we follow http-datasource.

ProcessData(http) flow

The Info phase obtains the data source's info and, based on the contentType in use, decides what happens next; a series of checks routes the flow into different paths, so the data source's parameters matter here. In short, the source is probed first: if qemu-img cannot download it directly the flow goes through TransferScratch, otherwise TransferDataFile.
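A simplified sketch of that branching follows; the real HTTPDataSource.Info() inspects more state (format readers, archive content, and so on), so treat this as an illustration of the decision, not CDI's exact code.

package main

import "fmt"

// infoPhase condenses the routing described above into one function.
func infoPhase(contentType string, brokenForQemuImg bool) string {
	if contentType != "kubevirt" {
		return "TransferDataDir" // e.g. archive content is unpacked into the data dir
	}
	if brokenForQemuImg {
		// qemu-img cannot pull from this server directly, so the image is
		// first staged in scratch space.
		return "TransferScratch"
	}
	return "TransferDataFile"
}

func main() {
	fmt.Println(infoPhase("kubevirt", false)) // TransferDataFile -- our case
	fmt.Println(infoPhase("kubevirt", true))  // TransferScratch
}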

Since we know dest is /dev/cdi-block-volume, the flow takes TransferDataFile.

func (hs *HTTPDataSource) Transfer(path string) (ProcessingPhase, error) {
    if hs.contentType == cdiv1.DataVolumeKubeVirt {
        size, err := util.GetAvailableSpace(path)
        if size <= int64(0) {
            //Path provided is invalid.
            return ProcessingPhaseError, ErrInvalidPath
        }
        file := filepath.Join(path, tempFile)
        err = util.StreamDataToFile(hs.readers.TopReader(), file)
        if err != nil {
            return ProcessingPhaseError, err
        }
        // If we successfully wrote to the file, then the parse will succeed.
        hs.url, _ = url.Parse(file)
        return ProcessingPhaseConvert, nil
    }
    ...
}

Because the contentType is kubevirt, StreamDataToFile(hs.readers.TopReader(), file) streams the data from the io.Reader into the given path, the data source's url is set to /dev/cdi-block-volume/tmpimage, and the flow moves on to Convert.
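What a helper like util.StreamDataToFile does can be sketched as a plain reader-to-file copy; the real CDI helper additionally handles preallocation and write progress, so the version below only shows the core idea.

package main

import (
	"io"
	"os"
	"strings"
)

// streamDataToFile copies everything from r into the file at path.
func streamDataToFile(r io.Reader, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(f, r)
	return err
}

func main() {
	// Illustrative only: in the importer, r is the HTTP reader stack and
	// path is /dev/cdi-block-volume/tmpimage.
	_ = streamDataToFile(strings.NewReader("image bytes"), "/tmp/tmpimage")
}

Back in the DataProcessor, the Convert phase then runs: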

// convert is called when convert the image from the url to a RAW disk image. Source formats include RAW/QCOW2 (Raw to raw conversion is a copy)
func (dp *DataProcessor) convert(url *url.URL) (ProcessingPhase, error) {
    err := dp.validate(url)
    if err != nil {
        return ProcessingPhaseError, err
    }
    klog.V(3).Infoln("Converting to Raw")
    // dataFile here is the dest set via dest := getImporterDestPath(contentType, volumeMode),
    // i.e. common.WriteBlockPath = /dev/cdi-block-volume
    err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation)
    if err != nil {
        return ProcessingPhaseError, errors.Wrap(err, "Conversion to Raw failed")
    }
    dp.preallocationApplied = dp.preallocation

    return ProcessingPhaseResize, nil
}

ConvertToRawStream(url, dp.dataFile, dp.preallocation) lives in pkg/image/qemu.go:

func convertToRaw(src, dest string, preallocate bool) error {
    args := []string{"convert", "-t", "writeback", "-p", "-O", "raw", src, dest}
    var err error

    if preallocate {
        err = addPreallocation(args, convertPreallocationMethods, func(args []string) ([]byte, error) {
            return qemuExecFunction(nil, reportProgress, "qemu-img", args...)
        })
    } else {
        klog.V(3).Infof("Running qemu-img convert with args: %v", args)
        _, err = qemuExecFunction(nil, reportProgress, "qemu-img", args...)
    }
    if err != nil {
        os.Remove(dest)
        errorMsg := "could not convert image to raw"
        if nbdkitLog, err := ioutil.ReadFile(common.NbdkitLogPath); err == nil {
            errorMsg += " " + string(nbdkitLog)
        }
        return errors.Wrap(err, errorMsg)
    }

    return nil
}

func (o *qemuOperations) ConvertToRawStream(url *url.URL, dest string, preallocate bool) error {
    if len(url.Scheme) > 0 && url.Scheme != "nbd+unix" {
        return fmt.Errorf("not valid schema %s", url.Scheme)
    }
    return convertToRaw(url.String(), dest, preallocate)
}

If nothing goes wrong, the flow proceeds to Resize; once resize also succeeds, processing is complete.
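As a final hedged sketch: the Resize phase ultimately relies on qemu-img resize to grow the raw image to the requested size (for a block device destination the device capacity is already fixed, so there is nothing to grow). The invocation below illustrates the idea; the actual argument handling in pkg/image/qemu.go differs.

package main

import (
	"fmt"
	"os/exec"
)

// resizeImage shells out to qemu-img to grow a raw image file to size.
func resizeImage(dest, size string) error {
	out, err := exec.Command("qemu-img", "resize", "-f", "raw", dest, size).CombinedOutput()
	if err != nil {
		return fmt.Errorf("qemu-img resize failed: %v: %s", err, out)
	}
	return nil
}

func main() {
	// Illustrative path and size, matching the example DV's 20Gi request.
	if err := resizeImage("/tmp/disk.img", "20G"); err != nil {
		fmt.Println(err)
	}
}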
