K8s Apiserver 中的存储层调用链路梳理

这两天在家看了一下《Kubernetes 源码剖析》,主要是关于 Apiserver / etcd 的部分,感觉受益颇多,这里整理一下。总的来说,对于特定资源的处理函数我们只需要关注两个文件就可以了。参考下图,以 Deployment 为例,一个是处理文件 storage.go,定义了该资源的 Storage.Interface 实现,其实一般用的是默认的 CacherStorage,只有个别资源的个别操作比较特殊时,才在这个文件中实现。比如,Deployment 的 storage.go 中定义了一些对 Deployment Status 的处理,以及对 Deployment 扩缩容的处理等,这个是 Deployment 相对于其他资源比较特殊的地方。

对于 Deployment,另一个需要注意的文件是 storage_apps.go 这个文件定义了 Deployment 资源的 REST 接口,在 Kube-Apiserver 启动进行初始化时,这些 REST 都会被注册到 Http server 中。

java-javascript

关于 Kube-apiserver 底层存储启动,一共有两个:CacherStorage 以及 RawStorage,前者是后者的封装,添加了对读请求的缓存,后者是直接跟 Etcd 打交道。对于特定资源的处理,我们先看 storage.go 文件,再来看 cacher.go 文件(后面有 cacher.go 文件的路径)。

RESTStorage 存储服务通用接口

k8s 中的所有资源都必须实现 RESTStorage 接口(所有通过 RESTful API 对外暴露的资源都必须实现 RESTStorage 接口),在文件 vendor/k8s.io/apiserver/pkg/registry/rest/rest.go 中,该接口定义为:

type Storage interface {
    New() runtime.Object
}

Kubernetes 中每种资源实现的 RESTstorage 接口一般定义在 pkg/registry/<资源组>/<资源>/storage/storage.go 中,他们通过 NewStorage 函数或 NewREST 函数实例化。以 Deployment 资源为例,代码示例如下:

代码路径:pkg/registry/apps/deployment/storage/storage.go

func NewStorage(optsGetter generic.RESTOptionsGetter) (DeploymentStorage, error) {
	// 调用下面的 NewREST 函数创建 REST 接口
	deploymentRest, deploymentStatusRest, deploymentRollbackRest, err := NewREST(optsGetter)
	if err != nil {
		return DeploymentStorage{}, err
	}
	// 省略部分代码...
	return DeploymentStorage{
        // 省略部分代码
	}, nil
}

// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &apps.Deployment{} },
		NewListFunc:              func() runtime.Object { return &apps.DeploymentList{} },
		DefaultQualifiedResource: apps.Resource("deployments"),

		// 这些 Stragegy 类似 hook 操作,在实际执行更新/创建操作时,进行一些校验等操作。
		CreateStrategy: deployment.Strategy,
		UpdateStrategy: deployment.Strategy,
		DeleteStrategy: deployment.Strategy,
		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
    options := &generic.StoreOptions{RESTOptions: optsGetter}
	// 这个 CompleteWithOptions 方法会补充一些配置,比如初始化 Storage 等
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, nil, nil, err
	}
	// statusREST 跟 Status 更新有关,RollbackREST 跟扩缩容有关
	statusStore := *store
	statusStore.UpdateStrategy = deployment.StatusStrategy
	return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}

// 实现 deployment 资源的 RESTStorage 接口
type REST struct {
	*genericregistry.Store
	categories []string
}
// 用于实现 deployment/status 子资源的 RESTStorage 接口
type StatusREST struct {
	store *genericregistry.Store
}

上面 REST 以及 StatusREST 都是对 RegistryStore 操作进行封装。比如对 deployment/status 子资源的 Get 操作,实际执行的是 RegistryStore 操作,代码如下:

func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	return r.store.Get(ctx, name, options)
}

RegistryStore 存储服务通用操作

RegistryStore 的代码路径为 vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

RegistryStore 注册了两类函数:

  • Before Func: 也称 Strategy 预处理,它被定义为在创建资源对象之前调用,做一些预处理的操作。
  • After Func: 它被定义在创建资源对象之后调用,做一些收尾工作。(当前 K8s 未实现该功能)。

RegistryStore 结构体如下,根据注释,其实现了 rest.StandardStorage,并被设计成了可嵌入式的,允许针对特定的资源类型实现特定的(non-generic)处理函数。Storage 字段是 RegistryStoreStorage.Interface 通用存储接口进行的封装,实现了对 Etcd 集群的读写操作。

// Store implements pkg/api/rest.StandardStorage. It's intended to be
// embeddable and allows the consumer to implement any non-generic functions
// that are required. This object is intended to be copyable so that it can be
// used in different ways but share the same underlying behavior.
//
// All fields are required unless specified.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on a Kubelike
// resource, handling details like conflict detection with ResourceVersion and
// semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
// RESTDeleteStrategy are generic across all backends, and encapsulate logic
// specific to the API.
type Store struct {
	// CreateStrategy implements resource-specific behavior during creation.
	CreateStrategy rest.RESTCreateStrategy
	// AfterCreate implements a further operation to run after a resource is
	// created and before it is decorated, optional.
	AfterCreate ObjectFunc

	// 省略了很多字段

	// Storage is the interface for the underlying storage for the
	// resource. It is wrapped into a "DryRunnableStorage" that will
	// either pass-through or simply dry-run.
	Storage DryRunnableStorag
}

Storage.Interface 通用存储接口

Storage.Interface 通用存储接口定义了资源的操作方法,其代码路径为:vendor/k8s.io/apiserver/pkg/storage/interface.go,这个接口方法比较多,但是都比较重要,也都比较容器理解的。

// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
	// watch 单个 key
	Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	// watch 多个 key (当前目录及目录下的所有 key)
	WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
	GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	// 保证执行成功的 update 方法,一直执行 tryUpdate
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
}

Storage.Interface 是通用存储接口,实现通用存储接口的分布是 CacherStorage 以及 UnderlyingStorage 资源存储对象,分别介绍如下:

  • CacherStorage: 带有缓存功能的资源存储对象,定义在 vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go
  • UnderlyingStorage: 底层存储对象,真正与 Etcd 集群交互的资源存储对象,它定义在 vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go 中。

关注下 CacherStorage 以及 UnderlyingStorage 的初始化过程。

func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	ret := generic.RESTOptions{
		StorageConfig:           &f.Options.StorageConfig,
		// 默认是初始化一个 UndecoratedStorage,也就是调用 NewRawStorage 初始化一个 etcd3 存储
		Decorator:               generic.UndecoratedStorage,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		ResourcePrefix:          resource.Group + "/" + resource.Resource,
		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	}
	// 如果启用了 watchCache,则初始化 StorageWithCacher,这个其实是对 RawStorage 的封装,对“读”操作做了缓存
	if f.Options.EnableWatchCache { 
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		cacheSize, ok := sizes[resource]
		if !ok {
			cacheSize = f.Options.DefaultWatchCacheSize
		}
		// depending on cache size this might return an undecorated storage
		ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
	}
	return ret, nil
}

我们以 CacherStorageGet 方法为例,看下实现方式是怎样的,只关注几行核心代码:

func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	// 如果 resourceVersion 没有指定,则直接调用 etcd 的接口
	if resourceVersion == "" {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}
	// 省略一堆代码

	// 这个地方看上去是在等待特定的 resourceVersion,这个有空研究一下
	obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
	if err != nil {
		return err
	}
	return nil
}

Etcd 在 Kube-apiserver 中的层次可以参考下图: java-javascript