CDI's core job is to import an image from a source into the PVC that backs a DataVolume (the PVC shares the DV's name), so it can serve as the VM's operating-system disk. Roughly: the datavolume-controller creates the PVC, the import-controller creates an importer pod for it, and inside that pod the code under pkg/importer wraps the configured source in a standard data source and processes its contents. This post walks through the import flow using the HTTP source as the example.

To start the flow, we create a DataVolume whose source is http:
```yaml
apiVersion: cdi.kubevirt.io/v1beta1
kind: DataVolume
metadata:
  name: "example-import-dv"
spec:
  source:
    http:
      url: "https://download.cirros-cloud.net/0.4.0/cirros-0.4.0-x86_64-disk.img" # Or S3
      secretRef: "" # Optional
      certConfigMap: "" # Optional
  pvc:
    accessModes:
      - ReadWriteOnce # ReadWriteOnce, ReadOnlyMany, or ReadWriteMany
    resources:
      requests:
        storage: 20Gi
    storageClassName: *** # fixed value
    volumeMode: Block
```
As covered earlier for datavolume-controller.go, when the source is not a PVC, the controller creates a PVC for the DataVolume from its spec.pvc field, then enters reconcileDataVolumeStatus to update the DataVolume's status.
The flow then moves into the import stage. What the import-controller watches are PVCs that have a controlling owner (i.e. PVCs created for a DataVolume).
In its reconcile logic it first fetches that PVC, then decides whether to proceed into reconcilePvc; in the normal case it does.
In the import-controller's reconcilePvc flow, it first looks up the pod associated with the PVC; if none exists, it creates one from the PVC via r.createImporterPod(pvc).
Inside createImporterPod, r.createImportEnvVar(pvc) first builds the relevant environment variables and podArgs, then the pod itself is created: pod, err := createImporterPod(r.log, r.client, podArgs, r.installerLabels).
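To make that find-or-create shape concrete, here is a minimal controller-runtime sketch of the pattern. It is an illustration, not CDI's actual code; importPodNameFromPvc is a hypothetical helper (CDI derives the importer pod name from the PVC's name and annotations).

```go
package importsketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// ImportReconciler is a stand-in for CDI's import-controller reconciler.
type ImportReconciler struct {
	client client.Client
}

// importPodNameFromPvc is hypothetical; CDI derives the importer pod name
// from the PVC's name and annotations.
func importPodNameFromPvc(pvc *corev1.PersistentVolumeClaim) string {
	return "importer-" + pvc.Name
}

// createImporterPod stands in for the env-var/podArgs build plus pod create.
func (r *ImportReconciler) createImporterPod(ctx context.Context, pvc *corev1.PersistentVolumeClaim) error {
	return nil // elided
}

// reconcilePvc shows the find-or-create shape described above.
func (r *ImportReconciler) reconcilePvc(ctx context.Context, pvc *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
	pod := &corev1.Pod{}
	key := types.NamespacedName{Namespace: pvc.Namespace, Name: importPodNameFromPvc(pvc)}
	if err := r.client.Get(ctx, key, pod); err != nil {
		if !k8serrors.IsNotFound(err) {
			return reconcile.Result{}, err
		}
		// No importer pod yet: build env vars / podArgs and create one.
		return reconcile.Result{}, r.createImporterPod(ctx, pvc)
	}
	// Pod exists: sync its phase and progress back onto the PVC (elided).
	return reconcile.Result{}, nil
}
```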
The importer pod runs the importer program; its entry point is /cmd/cdi-importer/importer.go, and the bazel scripts in the same directory package it into the importer image. The init function there is straightforward.
In main, an HTTP import enters handleImport(source, contentType, volumeMode, imageSize, filesystemOverhead, preallocation) and continues from there.
handleImport wraps the source into the standard data source (an interface) and creates a DataProcessor; the real processing happens in processor.ProcessData().
```go
func handleImport(...) error {
	...
	// wrap the configured source in the standard data source interface
	ds := newDataSource(source, contentType, volumeMode)
	defer ds.Close()
	// create the DataProcessor that drives the phase state machine
	processor := newDataProcessor(contentType, volumeMode, ds, imageSize, filesystemOverhead, preallocation)
	err := processor.ProcessData()
	...
}
```
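Conceptually, newDataSource is a dispatch on the configured source type. A toy, self-contained sketch of that idea follows; the names are illustrative, not CDI's (the real switch in cmd/cdi-importer/importer.go reads its configuration from env vars).

```go
package main

import "fmt"

// dataSource is a toy stand-in for CDI's data source interface.
type dataSource interface{ Name() string }

type httpSource struct{}

func (httpSource) Name() string { return "http" }

type registrySource struct{}

func (registrySource) Name() string { return "registry" }

// newDataSource dispatches on the configured source type.
func newDataSource(kind string) (dataSource, error) {
	switch kind {
	case "http":
		return httpSource{}, nil
	case "registry":
		return registrySource{}, nil
	default:
		return nil, fmt.Errorf("unknown source %q", kind)
	}
}

func main() {
	ds, err := newDataSource("http")
	if err != nil {
		panic(err)
	}
	fmt.Println(ds.Name()) // http
}
```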
The DataProcessor and data source code live under pkg/importer. processor.ProcessData() drives the DataProcessor through its phases via the phaseExecutors map, which maps each processor phase to its execution function.
```go
// phaseExecutors is a mapping from the given processing phase to its execution function. The function returns the next processing phase or error.
phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error)
```
NewDataProcessor() calls dp.initDefaultPhases() to register a set of executor functions, one per processing phase:
```go
func (dp *DataProcessor) initDefaultPhases() {
	dp.phaseExecutors = make(map[ProcessingPhase]func() (ProcessingPhase, error))
	dp.RegisterPhaseExecutor(ProcessingPhaseInfo, func() (ProcessingPhase, error) {
		pp, err := dp.source.Info()
		if err != nil {
			err = errors.Wrap(err, "Unable to obtain information about data source")
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseTransferScratch, func() (ProcessingPhase, error) {
		pp, err := dp.source.Transfer(dp.scratchDataDir)
		if err == ErrInvalidPath {
			// Passed in invalid scratch space path, return scratch space needed error.
			err = ErrRequiresScratchSpace
		} else if err != nil {
			err = errors.Wrap(err, "Unable to transfer source data to scratch space")
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataDir, func() (ProcessingPhase, error) {
		pp, err := dp.source.Transfer(dp.dataDir)
		if err != nil {
			err = errors.Wrap(err, "Unable to transfer source data to target directory")
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataFile, func() (ProcessingPhase, error) {
		pp, err := dp.source.TransferFile(dp.dataFile)
		if err != nil {
			err = errors.Wrap(err, "Unable to transfer source data to target file")
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseValidatePause, func() (ProcessingPhase, error) {
		pp := ProcessingPhasePause
		err := dp.validate(dp.source.GetURL())
		if err != nil {
			pp = ProcessingPhaseError
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseConvert, func() (ProcessingPhase, error) {
		pp, err := dp.convert(dp.source.GetURL())
		if err != nil {
			err = errors.Wrap(err, "Unable to convert source data to target format")
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseResize, func() (ProcessingPhase, error) {
		pp, err := dp.resize()
		if err != nil {
			err = errors.Wrap(err, "Unable to resize disk image to requested size")
		}
		return pp, err
	})
	dp.RegisterPhaseExecutor(ProcessingPhaseMergeDelta, func() (ProcessingPhase, error) {
		pp, err := dp.merge()
		if err != nil {
			err = errors.Wrap(err, "Unable to apply delta to base image")
		}
		return pp, err
	})
}
```
ProcessData() delegates to dp.ProcessDataWithPause(), which walks the registered phase functions to drive the processing flow.
```go
// ProcessData is the main synchronous processing loop
func (dp *DataProcessor) ProcessData() error {
	if size, _ := util.GetAvailableSpace(dp.scratchDataDir); size > int64(0) {
		// Clean up before trying to write, in case a previous attempt left a mess. Note the deferred cleanup is intentional.
		if err := CleanDir(dp.scratchDataDir); err != nil {
			return errors.Wrap(err, "Failure cleaning up temporary scratch space")
		}
		// Attempt to be a good citizen and clean up my mess at the end.
		defer CleanDir(dp.scratchDataDir)
	}
	if size, _ := util.GetAvailableSpace(dp.dataDir); size > int64(0) && dp.needsDataCleanup {
		// Clean up data dir before trying to write in case a previous attempt failed and left some stuff behind.
		if err := CleanDir(dp.dataDir); err != nil {
			return errors.Wrap(err, "Failure cleaning up target space")
		}
	}
	return dp.ProcessDataWithPause()
}
```
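Stripped of CDI specifics, the state machine that ProcessDataWithPause drives can be sketched in a few lines. This is a standalone illustration of the phase-executor loop, not the actual implementation:

```go
package main

import "fmt"

type ProcessingPhase string

const (
	PhaseInfo     ProcessingPhase = "Info"
	PhaseConvert  ProcessingPhase = "Convert"
	PhaseComplete ProcessingPhase = "Complete"
)

type DataProcessor struct {
	currentPhase   ProcessingPhase
	phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error)
}

func (dp *DataProcessor) RegisterPhaseExecutor(p ProcessingPhase, f func() (ProcessingPhase, error)) {
	dp.phaseExecutors[p] = f
}

// run executes the executor for the current phase and follows the
// returned phase until the processor reaches Complete (or errors out).
func (dp *DataProcessor) run() error {
	for dp.currentPhase != PhaseComplete {
		exec, ok := dp.phaseExecutors[dp.currentPhase]
		if !ok {
			return fmt.Errorf("no executor registered for phase %s", dp.currentPhase)
		}
		next, err := exec()
		if err != nil {
			return err
		}
		dp.currentPhase = next
	}
	return nil
}

func main() {
	dp := &DataProcessor{
		currentPhase:   PhaseInfo,
		phaseExecutors: map[ProcessingPhase]func() (ProcessingPhase, error){},
	}
	dp.RegisterPhaseExecutor(PhaseInfo, func() (ProcessingPhase, error) { return PhaseConvert, nil })
	dp.RegisterPhaseExecutor(PhaseConvert, func() (ProcessingPhase, error) { return PhaseComplete, nil })
	fmt.Println(dp.run()) // <nil>
}
```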
From initDefaultPhases() the default phases can be summarized as Info -> TransferScratch -> TransferDataDir -> TransferDataFile -> ValidatePause -> Convert -> Resize -> MergeDelta; each executor returns the next phase, so a given import only walks a subset of this chain.
In NewHTTPDataSource:
```go
// cmd/cdi-importer/importer.go
ds, err := importer.NewHTTPDataSource(ep, acc, sec, certDir, cdiv1.DataVolumeContentType(contentType))
// In our example:
//   ep          = https://download.cirros-cloud.net/0.4.0/cirros-0.4.0-x86_64-disk.img
//   contentType = kubevirt

// pkg/importer/http-datasource.go
// NewHTTPDataSource creates a new instance of the http data provider.
func NewHTTPDataSource(endpoint, accessKey, secKey, certDir string, contentType cdiv1.DataVolumeContentType) (*HTTPDataSource, error) {
	ep, err := ParseEndpoint(endpoint)
	if err != nil {
		return nil, errors.Wrapf(err, fmt.Sprintf("unable to parse endpoint %q", endpoint))
	}
	ctx, cancel := context.WithCancel(context.Background())
	extraHeaders, secretExtraHeaders, err := getExtraHeaders()
	if err != nil {
		cancel()
		return nil, errors.Wrap(err, "Error getting extra headers for HTTP client")
	}
	// This yields the variables (contentLength, brokenForQemuImg) that steer the Info phase.
	httpReader, contentLength, brokenForQemuImg, err := createHTTPReader(ctx, ep, accessKey, secKey, certDir, extraHeaders, secretExtraHeaders)
	if err != nil {
		cancel()
		return nil, err
	}
	httpSource := &HTTPDataSource{
		ctx:              ctx,
		cancel:           cancel,
		httpReader:       httpReader,
		contentType:      contentType,
		endpoint:         ep,
		customCA:         certDir,
		brokenForQemuImg: brokenForQemuImg,
		contentLength:    contentLength,
	}
	httpSource.n = createNbdkitCurl(nbdkitPid, accessKey, secKey, certDir, nbdkitSocket, extraHeaders, secretExtraHeaders)
	// We know this is a counting reader, so no need to check.
	countingReader := httpReader.(*util.CountingReader)
	go httpSource.pollProgress(countingReader, 10*time.Minute, time.Second)
	return httpSource, nil
}
```
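The pollProgress goroutine relies on wrapping the HTTP body in a counting reader. A minimal self-contained sketch of that idea (CDI's util.CountingReader is the richer, real version):

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// CountingReader wraps a reader so the number of bytes read so far can be
// sampled concurrently by a progress poller.
type CountingReader struct {
	r io.Reader
	n int64
}

func (c *CountingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	atomic.AddInt64(&c.n, int64(n))
	return n, err
}

func (c *CountingReader) Count() int64 { return atomic.LoadInt64(&c.n) }

func main() {
	cr := &CountingReader{r: strings.NewReader("pretend image bytes")}
	_, _ = io.Copy(io.Discard, cr)
	fmt.Println(cr.Count(), "bytes read") // 19 bytes read
}
```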
From this we can establish:
- contentType = kubevirt
- dest = /dev/cdi-block-volume
- volumeMode = Block

The phases ProcessData runs through are implemented by each concrete data source under /pkg/importer; here we follow http-datasource.
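Judging from the calls seen above (Info, Transfer, TransferFile, GetURL, Close), the interface every data source implements has roughly this shape; the real definition is in pkg/importer and may carry extra methods depending on the CDI version:

```go
package importer

import "net/url"

// ProcessingPhase names the phases driven by the DataProcessor.
type ProcessingPhase string

// DataSourceInterface, reconstructed from the calls seen above.
type DataSourceInterface interface {
	// Info inspects the source and returns the next processing phase.
	Info() (ProcessingPhase, error)
	// Transfer moves the data into the given directory (scratch or data dir).
	Transfer(path string) (ProcessingPhase, error)
	// TransferFile writes the data directly to the given file or block device.
	TransferFile(fileName string) (ProcessingPhase, error)
	// GetURL tells the processor where to read from when converting.
	GetURL() *url.URL
	// Close releases readers and any other open resources.
	Close() error
}
```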

The Info phase obtains information about the data source and uses the contentType to decide what happens next; a series of checks then routes processing into different flows, which is why the data source's parameters matter. In short, the source is probed: if qemu-img cannot download from the endpoint directly, the TransferScratch route is taken; otherwise TransferDataFile.
Since we know dest is /dev/cdi-block-volume, TransferDataFile is used here (a sketch of this branching follows; the Transfer implementation is shown after it).
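A paraphrase of that branching as code; this is an illustration of the decision described above, not HTTPDataSource.Info itself, which inspects more state than shown here:

```go
package main

import "fmt"

type ProcessingPhase string

const (
	ProcessingPhaseTransferScratch  ProcessingPhase = "TransferScratch"
	ProcessingPhaseTransferDataDir  ProcessingPhase = "TransferDataDir"
	ProcessingPhaseTransferDataFile ProcessingPhase = "TransferDataFile"
)

// nextPhaseAfterInfo paraphrases the branching described above.
func nextPhaseAfterInfo(isKubeVirtContent, brokenForQemuImg bool) ProcessingPhase {
	if !isKubeVirtContent {
		// archive content gets unpacked straight into the data directory
		return ProcessingPhaseTransferDataDir
	}
	if brokenForQemuImg {
		// qemu-img cannot fetch the endpoint itself, so stage via scratch space
		return ProcessingPhaseTransferScratch
	}
	return ProcessingPhaseTransferDataFile
}

func main() {
	fmt.Println(nextPhaseAfterInfo(true, false)) // TransferDataFile
}
```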
```go
func (hs *HTTPDataSource) Transfer(path string) (ProcessingPhase, error) {
	if hs.contentType == cdiv1.DataVolumeKubeVirt {
		size, err := util.GetAvailableSpace(path)
		if size <= int64(0) {
			// Path provided is invalid.
			return ProcessingPhaseError, ErrInvalidPath
		}
		file := filepath.Join(path, tempFile)
		err = util.StreamDataToFile(hs.readers.TopReader(), file)
		if err != nil {
			return ProcessingPhaseError, err
		}
		// If we successfully wrote to the file, then the parse will succeed.
		hs.url, _ = url.Parse(file)
		return ProcessingPhaseConvert, nil
	}
	...
}
```
Because contentType is kubevirt, StreamDataToFile(hs.readers.TopReader(), file) streams the data from the io.Reader to the given path, the data source's url is set to /dev/cdi-block-volume/tmpimage, and processing moves on to the Convert phase (its code is shown after the sketch below).
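As a generic sketch of what "stream the reader to a path" means; CDI's util.StreamDataToFile additionally handles block devices, preallocation, and progress accounting:

```go
package main

import (
	"io"
	"os"
	"strings"
)

// streamToFile copies everything from r to the file at path.
func streamToFile(r io.Reader, path string) error {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	// io.Copy streams in chunks, so large images never need to fit in memory.
	_, err = io.Copy(f, r)
	return err
}

func main() {
	_ = streamToFile(strings.NewReader("pretend image bytes"), "/tmp/tmpimage")
}
```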
```go
// convert is called to convert the image at the url into a RAW disk image.
// Source formats include RAW/QCOW2 (raw-to-raw conversion is a plain copy).
func (dp *DataProcessor) convert(url *url.URL) (ProcessingPhase, error) {
	err := dp.validate(url)
	if err != nil {
		return ProcessingPhaseError, err
	}
	klog.V(3).Infoln("Converting to Raw")
	// dataFile here is the dest set by getImporterDestPath(contentType, volumeMode),
	// i.e. common.WriteBlockPath = /dev/cdi-block-volume
	err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation)
	if err != nil {
		return ProcessingPhaseError, errors.Wrap(err, "Conversion to Raw failed")
	}
	dp.preallocationApplied = dp.preallocation
	return ProcessingPhaseResize, nil
}
```
ConvertToRawStream(url, dp.dataFile, dp.preallocation) lives in pkg/image/qemu.go:
```go
func convertToRaw(src, dest string, preallocate bool) error {
	args := []string{"convert", "-t", "writeback", "-p", "-O", "raw", src, dest}
	var err error
	if preallocate {
		err = addPreallocation(args, convertPreallocationMethods, func(args []string) ([]byte, error) {
			return qemuExecFunction(nil, reportProgress, "qemu-img", args...)
		})
	} else {
		klog.V(3).Infof("Running qemu-img convert with args: %v", args)
		_, err = qemuExecFunction(nil, reportProgress, "qemu-img", args...)
	}
	if err != nil {
		os.Remove(dest)
		errorMsg := "could not convert image to raw"
		if nbdkitLog, err := ioutil.ReadFile(common.NbdkitLogPath); err == nil {
			errorMsg += " " + string(nbdkitLog)
		}
		return errors.Wrap(err, errorMsg)
	}
	return nil
}

func (o *qemuOperations) ConvertToRawStream(url *url.URL, dest string, preallocate bool) error {
	if len(url.Scheme) > 0 && url.Scheme != "nbd+unix" {
		return fmt.Errorf("not valid schema %s", url.Scheme)
	}
	return convertToRaw(url.String(), dest, preallocate)
}
```
If nothing goes wrong, processing moves on to the Resize phase; once resize completes without error, the import is marked complete.
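For a file-backed raw image, the Resize step essentially boils down to invoking qemu-img resize. A hedged sketch follows, assuming the image path and target size are already known; the real implementation lives alongside convertToRaw in pkg/image/qemu.go, computes the target size from the PVC request, and treats block devices differently:

```go
package main

import (
	"fmt"
	"os/exec"
)

// resizeRaw grows the raw image at the given path to the given size.
func resizeRaw(image, size string) error {
	// -f raw: the previous Convert phase already produced a raw image
	out, err := exec.Command("qemu-img", "resize", "-f", "raw", image, size).CombinedOutput()
	if err != nil {
		return fmt.Errorf("qemu-img resize failed: %v: %s", err, out)
	}
	return nil
}

func main() {
	// hypothetical file path; the block-device case is handled differently by CDI
	fmt.Println(resizeRaw("/data/disk.img", "20G"))
}
```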