目录
local volume provisioner,一个提供本地存储的组件,主要是不用管理员手动创建pv 了,可以根据目录自动发现并创建 pv,梳理一下其工作过程。
main 函数
main 函数做的工作不多,就是启动 controller,并注册了一些 prometheus 监控项。
go controller.StartLocalController(client, procTable, &common.UserConfig{
Node: node,
// 忽略其他代码
})
prometheus.MustRegister([]prometheus.Collector{
metrics.PersistentVolumeDiscoveryTotal,
// 忽略其他代码
}...)
Local Controller 主流程
StartLocalController
主要做两件事:pv的发现和删除。此外,还维护了一份 pv 的缓存,以及通过一个 job 来辅助删除 pod。先列出整个流程,然后逐步分析
func StartLocalController(client *kubernetes.Clientset, ptable deleter.ProcTable, config *common.UserConfig) {
// 生成pv缓存
populator.NewPopulator(runtimeConfig)
// 根据配置决定是否启用job controller
var jobController deleter.JobController
if runtimeConfig.UseJobForCleaning {
jobController, err = deleter.NewJobController(labels, runtimeConfig)
}
cleanupTracker := &deleter.CleanupStatusTracker{ProcTable: ptable, JobController: jobController}
// 发现
discoverer, err := discovery.NewDiscoverer(runtimeConfig, cleanupTracker)
// 删除
deleter := deleter.NewDeleter(runtimeConfig, cleanupTracker)
// Run controller logic.
if jobController != nil {
go jobController.Run(wait.NeverStop)
}
for {
deleter.DeletePVs()
discoverer.DiscoverLocalVolumes()
time.Sleep(10 * time.Second)
}
}
流程简单清晰,现在逐步看一下。
生成 pv 缓存
这个本质上没什么好说的,就是通过 informer 机制来同步缓存。这部分逻辑如下。
func NewPopulator(config *common.RuntimeConfig) *Populator {
p := &Populator{RuntimeConfig: config}
// 定制化一个listwatch,只缓存label值等于value的pv
optionsModifier := func(options *metav1.ListOptions) {
options.LabelSelector = apis.LabelHostname + "=" + config.Node.Name
}
// 使用factory生成一个informer,对于同一种类型,可以调用多次InformerFor,但是factory只会生成一个,后续会返回之前生成的
pvInformer := config.InformerFactory.InformerFor(
&v1.PersistentVolume{},
func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
cache.NewFilteredListWatchFromClient(client.CoreV1().RESTClient(),"persistentvolumes","",optionsModifier,),
&v1.PersistentVolume{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
},
)
// 监听事件,来更新pv缓存,
pvInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p.handlePVUpdate(pv) //更新缓存的pv
},
UpdateFunc: func(oldObj, newObj interface{}) {
p.handlePVUpdate(newPV)
},
DeleteFunc: func(obj interface{}) {
p.handlePVDelete(pv) // 从缓存中删除pv。
},
})
return p
}
pv 缓存的构建过程就这样结束了。
发现 pv
发现过程分两步,先构建一个discoverer,再调用 discoverer 的DiscoverLocalVolumes
方法。下面重点看一下后者
func (d *Discoverer) discoverVolumesAtPath(class string, config common.MountConfig) {
// 从缓存中拿storageclass的回收策略,默认是delete
reclaimPolicy, err := d.getReclaimPolicyFromStorageClass(class)
files, err := d.VolUtil.ReadDir(config.MountDir)
// 获取系统的所有挂载点
mountPoints, err := d.RuntimeConfig.Mounter.List()
if err != nil {
klog.Errorf("Error retreiving mountpoints: %v", err)
return
}
// 将所有挂载点放到一个集合中,便于查找
type empty struct{}
mountPointMap := make(map[string]empty)
for _, mp := range mountPoints {
mountPointMap[mp.Path] = empty{}
}
for _, file := range files {
startTime := time.Now()
filePath := filepath.Join(config.MountDir, file)
// 判断是块设备还是文件系统
volMode, err := common.GetVolumeMode(d.VolUtil, filePath)
// 查看pv是否存在,特定目录生成的pv名字是固定的
pvName := generatePVName(file, d.Node.Name, class)
pv, exists := d.Cache.GetPV(pvName)
if exists {
continue
}
usejob := false
if volMode == v1.PersistentVolumeBlock {
usejob = d.RuntimeConfig.UseJobForCleaning
}
// pv正在被删除,暂时不创建
if d.CleanupTracker.InProgress(pvName, usejob) {
klog.Infof("PV %s is still being cleaned, not going to recreate it", pvName)
continue
}
// cleanup old status,忽略错误
d.CleanupTracker.RemoveStatus(pvName, usejob)
mountOptions, err := d.getMountOptionsFromStorageClass(class)
if err != nil {
klog.Errorf("Failed to get mount options from storage class %s: %v", class, err)
continue
}
var capacityByte int64
desireVolumeMode := v1.PersistentVolumeMode(config.VolumeMode)
switch volMode {
case v1.PersistentVolumeBlock:
capacityByte, err = d.VolUtil.GetBlockCapacityByte(filePath) // 查看块设备大小
case v1.PersistentVolumeFilesystem:
capacityByte, err = d.VolUtil.GetFsCapacityByte(filePath) // 该目录是挂载点,获取容量
default:
klog.Errorf("Path %q has unexpected volume type %q", filePath, volMode)
continue
}
// 调用client-go创建pv
d.createPV(file, class, reclaimPolicy, mountOptions, config, capacityByte, desireVolumeMode, startTime)
}
}
删除 pv
删除pv的过程可能要麻烦一点。首先是从pv缓存中枚举pv,只删除phase状态为Released的pv,其他状态的pv不处理,并且在storageclass的回收策略为Delete的时候才删除。
这里有个疑问,当sc回收策略为delete时,pv是被谁删的?
大概浏览了一下pv controller的代码,如果plugin没有实现deletable接口(local volume plugin没有实现),pv controller是不会删除pv的。所以这里pv的删除工作还是交给local volume provisioner了。具体是成功清除目录里的数据后会删除pv,但是根据pv的发现机制,马上会创建一个同名的pv出来,并且状态为Available。
删除pv时,这里可以通过起一个job来删除,或者直接起一个goroutine,判断的条件是:
func (d *Deleter) shouldRunJob(mode v1.PersistentVolumeMode) bool {
return mode == v1.PersistentVolumeBlock && d.RuntimeConfig.UseJobForCleaning
}
另一方面,删除时,会通过statustracker来记录删除状态。这个一会分析下。这里先看下上面两种删除方式。
使用goroutine删除,起一个goroutine来异步删除。
func (d *Deleter) runProcess(pv *v1.PersistentVolume, volMode v1.PersistentVolumeMode, mountPath string,config common.MountConfig) error {
// Run as exec script.
err := d.CleanupStatus.ProcTable.MarkRunning(pv.Name)
go d.asyncCleanPV(pv, volMode, mountPath, config)
return nil
}
func (d *Deleter) asyncCleanPV(pv *v1.PersistentVolume, volMode v1.PersistentVolumeMode, mountPath string,config common.MountConfig) {
// 这个是清空目录,不再跟进去了。
err := d.cleanPV(pv, volMode, mountPath, config)
if err != nil {
// 标记为失败
if err := d.CleanupStatus.ProcTable.MarkFailed(pv.Name); err != nil {
klog.Error(err)
}
return
}
// 标记为成功
if err := d.CleanupStatus.ProcTable.MarkSucceeded(pv.Name); err != nil {
klog.Error(err)
}
}
标记 pv 回收流程,用来标记pv回收流程的接口和结构体如下:
// ProcTable Interface for tracking running processes
type ProcTable interface {
// CleanupBlockPV deletes block based PV
IsRunning(pvName string) bool
IsEmpty() bool
MarkRunning(pvName string) error
MarkFailed(pvName string) error
MarkSucceeded(pvName string) error
RemoveEntry(pvName string) (CleanupState, *time.Time, error)
Stats() ProcTableStats
}
// ProcEntry represents an entry in the proc table
type ProcEntry struct {
StartTime time.Time
Status CleanupState // state有三种:Running,Succeeded,Failed
}
// ProcTableImpl Implementation of BLockCleaner interface
type ProcTableImpl struct {
mutex sync.RWMutex
procTable map[string]ProcEntry // 带锁的map
succeeded int // 统计success的数目
failed int // 统计failed的数目
}
工作流程大概如下:
- 删除前检查该pv是否正在执行删除任务(pv的删除状态为Running或者存在对应job),如果是,直接返回.
- 检查当前状态(同时删除success或者failed状态,如果有),如果succeed,就使用client-go删除pv。如果failed或者notExist则开始删除任务。
- 开始删除时,标记为running。
- 删除结束,标记为失败或者成功。
- discover在创建pv时,如果正在执行删除,则不创建,(实际情况可能更复杂一点)