众所周知,在 K8s 的 informer 缓存里,通过添加 indexer 能加快查询,这里看看为什么。
以pv controller为例,在初始化pv controller的时候,创建了两份本地cache,这个cache实际上跟informer cache工作原理一致,其中一个添加了索引,看一下这个缓存是怎么使用,以及怎么实现的。
controller := &PersistentVolumeController{
// 根据accessmode添加索引
volumes: newPersistentVolumeOrderedIndex(),
claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
// ... 忽略其他代码
}
func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
}
首先需要解释一下,为什么要额外设置一份pv的缓存,在pv informer已经帮我们缓存了一份的情况下。根据注释:当将pv与pvc绑定的时候,将产生4个event,大概是下面顺序,但是根据goroutine的快慢可能完全不一致。
- volume.Spec更新
- volume.Status更新
- claim.Spec更新
- claim.Status更新
因为做了本地缓存,在收到这些事件的时候,controller可以根据本地缓存在检查当前的状态,而不必依赖informer的同步,因为informer同步有快慢,举例说明:假设volume.Spec这个事件先到了,因为其他事件还没到,它看到的volume.Status以及claim状态是旧的(因为informer还没同步过来),它看到pvc还没有绑定,于是开始bind操作,其实是不必要的。
由上面可以看出,这个缓存的CRUD是由controller自己控制的,用来防止informer同步导致的异步问题。
本地缓存在pv controller中的使用
首先是初始化,方法时initializeCaches
,在控制器run
之前运行的,就是把informer里面的cache深拷贝一遍。
pv的同步逻辑主要是下面方法
func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
volumeList, err := volumeLister.List(labels.Everything())
if err != nil { return }
for _, volume := range volumeList {
// 从informer的cache中取出来,放到私有缓存中
volumeClone := volume.DeepCopy()
if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
klog.Errorf("error updating volume cache: %v", err)
}
}
}
上面有个方法storeVolumeUpdate
是主要的更新以及添加本地缓存的方法。
func (ctrl *PersistentVolumeController) storeVolumeUpdate(volume interface{}) (bool, error) {
return storeObjectUpdate(ctrl.volumes.store, volume, "volume")
}
// storeObjectUpdate 会在两种情况下更新本地缓存:
// 1. Informer回调函数(来自etcd的事件)
// 2. controller自己修改了资源,更新了pv之后,在update到apiserver之后也要同步更新到本地缓存
func storeObjectUpdate(store cache.Store, obj interface{}, className string) (bool, error) {
// 首先尝试从缓存中获取
oldObj, found, err := store.Get(obj)
if err != nil {
return false, fmt.Errorf("Error finding %s %q in controller cache: %v", className, objName, err)
}
// 返回 metav1.Object类型,用来访问资源的元信息
objAccessor, _ := meta.Accessor(obj)
// 本地缓存中不存在
if !found {
// 打印resourceVersion,并记录到本地缓存中,并直接返回
klog.V(4).Infof("storeObjectUpdate: adding %s %q, version %s", className, objName, objAccessor.GetResourceVersion())
_ = store.Add(obj)
return true, nil
}
oldObjAccessor, err := meta.Accessor(oldObj)
if err != nil {
return false, err
}
// 获取新旧资源的ResourceVersion
objResourceVersion, _ := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
oldObjResourceVersion, _ := strconv.ParseInt(oldObjAccessor.GetResourceVersion(), 10, 64)
// 旧的资源RV更高,直接返回
if oldObjResourceVersion > objResourceVersion {
return false, nil
}
// 否则就更新本地缓存
if err = store.Update(obj); err != nil {
return false, fmt.Errorf("Error updating %s %q in controller cache: %v", className, objName, err)
}
return true, nil
}
本地缓存的删除是在同步到apiserver的删除事件之后做的。添加和删除以及更新都很容易理解。这里再找下,controller是怎么查询并使用本地缓存的,毕竟缓存的主要意义在这里。
看了一下,在syncClaim
方法中,在需要查找volume的地方都是从私有缓存查的,以syncUnboundClaim
为例,findBestMatchForCliam
就是从本地缓存取的。
if claim.Spec.VolumeName == "" {
// User did not care which PV they get.
delayBinding, _ := ctrl.shouldDelayBinding(claim)
// 从本地缓存取volume
volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
if err != nil {
klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
}
}
syncBoundClaim
方法中,对于volume的查找也是从私有缓存进行的。那么informer cache在pv controller运行中起了什么作用呢?访问informer pv cache主要是通过controller结构体中的volumeLister
进行的,
type PersistentVolumeController struct {
volumeLister corelisters.PersistentVolumeLister
volumeListerSynced cache.InformerSynced
// 省去其他代码
}
该字段除了初始化,还有三个地方使用到了,分别是:
- 初始化私有缓存的时候,把informer的cache全部深拷贝到私有缓存中去
- 同步volume的时候,判断volume的事件类型,在informer中表示 update/sync/add事件,不在informer中表示delete事件
- 执行resync的时候,把informer中的所有元素入队,默认resync周期是15s
为什么indexer能加快查询
首先看这个cache的初始化,直接调用newPersistentVolumeOrderedIndex()
没有任何输入参数,
func newPersistentVolumeOrderedIndex() persistentVolumeOrderedIndex {
return persistentVolumeOrderedIndex{cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc})}
}
accessModesIndexFunc
是一个生成索引字段的函数,输入为资源,输出为一个[]string,表示这个资源对应的索引。其必须是IndexFunc类型。
type IndexFunc func(obj interface{}) ([]string, error)
volume indexer的主要实现是persistentVolumeOrderedIndex
,定义如下:
// persistentVolumeOrderedIndex 根据 accessModes 进行索引,并根据存储容量排序。
type persistentVolumeOrderedIndex struct {
store cache.Indexer
}
相对于普通的store接口,Indexer接口的定义如下:
// Indexer 是一个存储接口,允许通过多个索引函数来list资源
type Indexer interface {
Store
// 使用预配置的索引名,以及期望资源的模板,枚举所有匹配的资源
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the set of keys that match on the named indexing function.
IndexKeys(indexName, indexKey string) ([]string, error)
// ListIndexFuncValues returns the list of generated values of an Index func
ListIndexFuncValues(indexName string) []string
// ByIndex lists object that match on the named indexing function with the exact key
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}
对于我们pv来说,主要作用的方法是listByAccessModes
// listByAccessModes 列出缓存中所有与给定的pv的accessModes相等的 pv
func (pvIndex *persistentVolumeOrderedIndex) listByAccessModes(modes []v1.PersistentVolumeAccessMode) ([]*v1.PersistentVolume, error) {
// 使用pv构造期望的accessMode
pv := &v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
AccessModes: modes,
},
}
// 枚举
objs, err := pvIndex.store.Index("accessmodes", pv)
if err != nil {
return nil, err
}
// 类型转义
volumes := make([]*v1.PersistentVolume, len(objs))
for i, obj := range objs {
volumes[i] = obj.(*v1.PersistentVolume)
}
return volumes, nil
}
上面Index对应到底层是threadSafeMap
的Index方法,如下
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
// 根据预配置的索引名,从map中取出索引函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 使用索引函数,以及给定的期望模板,计算出索引值,返回值类型是 []string
indexKeys, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]
var returnKeySet sets.String
if len(indexKeys) == 1 {
// 在大多数情况下,indexKeys里面只有一个元素,也就是说,我们自定义的索引函数,针对给定
// 的资源,只生成一个索引值,[]string里面只含有一个元素。这里把这个path单独拎出来优化一下。
returnKeySet = index[indexKeys[0]]
} else {
// 如果对应的资源有多个索引值,每个索引值都对应一个结果集合,表示这个结果集合匹配特定的索引值
returnKeySet = sets.String{}
for _, indexKey := range indexKeys {
for key := range index[indexKey] {
returnKeySet.Insert(key)
}
}
}
// 根据索引保存的元素的key,取出元素并返回。
list := make([]interface{}, 0, returnKeySet.Len())
for absoluteKey := range returnKeySet {
list = append(list, c.items[absoluteKey])
}
return list, nil
}
由上面方法看出,给缓存设定索引,其实就是在底层根据索引函数,把元素的key保存了一边,根据索引查询时,直接根据索引返回key的集合,再根据这些key取出元素,梳理一下相应的数据结构。
type threadSafeMap struct {
lock sync.RWMutex
// 存放资源
items map[string]interface{}
// 索引名字到索引函数的映射,这个是提前注册的
indexers Indexers
// 这个就是存放索引的集合了,是个二维的map,外层的map是根据索引名,取出该索引名下分类好的
// 各个key集合,每个key集合对应该索引下的特定索引值
indices Indices
}
// Index maps the indexed value to a set of keys in the store that match on that value
// Index 映射 索引值到 key的集合
type Index map[string]sets.String
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index
总结一下:
每个Index(索引)都有缓存数据的一份全集,当然只是absoluteKey的全集就可以了,详细的资源全集只存放在items
结构体中。这里的absoluteKey就是初始化cache就需要指定的,一般是DeletionHandlingMetaNamespaceKeyFunc
,通俗地说,每个index都会根据indexFunc生成的key将所有资源划分若干子集,每个子集的元素都是absoluteKey,当根据key来list资源时,时间复杂度就变成O(1)了,因为是map。也由此可以看出,indexer的好处是利于list,是一种使用空间换时间的方式,额外消耗的空间是所有资源的absoluteKey重新复制一遍。(map的key也是消耗空间的)
// DeletionHandlingMetaNamespaceKeyFunc checks for
// DeletedFinalStateUnknown objects before calling
// MetaNamespaceKeyFunc.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
return MetaNamespaceKeyFunc(obj)
}