这两天在家看了一下《Kubernetes 源码剖析》,主要是关于 Apiserver / etcd 的部分,感觉受益颇多,这里整理一下。总的来说,对于特定资源的处理函数我们只需要关注两个文件就可以了。参考下图,以 Deployment 为例,一个是处理文件 storage.go
,定义了该资源的 Storage.Interface
实现,其实一般用的是默认的 CacherStorage
,只有个别资源的个别操作比较特殊时,才在这个文件中实现。比如,Deployment 的 storage.go
中定义了一些对 Deployment Status 的处理,以及对 Deployment 扩缩容的处理等,这个是 Deployment 相对于其他资源比较特殊的地方。
对于 Deployment,另一个需要注意的文件是 storage_apps.go
这个文件定义了 Deployment 资源的 REST 接口,在 Kube-Apiserver 启动进行初始化时,这些 REST 都会被注册到 Http server 中。
关于 Kube-apiserver 底层存储启动,一共有两个:CacherStorage 以及 RawStorage,前者是后者的封装,添加了对读请求的缓存,后者是直接跟 Etcd 打交道。对于特定资源的处理,我们先看 storage.go
文件,再来看 cacher.go
文件(后面有 cacher.go 文件的路径)。
RESTStorage 存储服务通用接口
k8s 中的所有资源都必须实现 RESTStorage
接口(所有通过 RESTful API 对外暴露的资源都必须实现 RESTStorage 接口),在文件 vendor/k8s.io/apiserver/pkg/registry/rest/rest.go 中,该接口定义为:
type Storage interface {
New() runtime.Object
}
Kubernetes 中每种资源实现的 RESTstorage 接口一般定义在 pkg/registry/<资源组>/<资源>/storage/storage.go
中,他们通过 NewStorage
函数或 NewREST
函数实例化。以 Deployment
资源为例,代码示例如下:
代码路径:pkg/registry/apps/deployment/storage/storage.go
func NewStorage(optsGetter generic.RESTOptionsGetter) (DeploymentStorage, error) {
// 调用下面的 NewREST 函数创建 REST 接口
deploymentRest, deploymentStatusRest, deploymentRollbackRest, err := NewREST(optsGetter)
if err != nil {
return DeploymentStorage{}, err
}
// 省略部分代码...
return DeploymentStorage{
// 省略部分代码
}, nil
}
// NewREST returns a RESTStorage object that will work against deployments.
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apps.Deployment{} },
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),
// 这些 Stragegy 类似 hook 操作,在实际执行更新/创建操作时,进行一些校验等操作。
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
// 这个 CompleteWithOptions 方法会补充一些配置,比如初始化 Storage 等
if err := store.CompleteWithOptions(options); err != nil {
return nil, nil, nil, err
}
// statusREST 跟 Status 更新有关,RollbackREST 跟扩缩容有关
statusStore := *store
statusStore.UpdateStrategy = deployment.StatusStrategy
return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}
// 实现 deployment 资源的 RESTStorage 接口
type REST struct {
*genericregistry.Store
categories []string
}
// 用于实现 deployment/status 子资源的 RESTStorage 接口
type StatusREST struct {
store *genericregistry.Store
}
上面 REST
以及 StatusREST
都是对 RegistryStore
操作进行封装。比如对 deployment/status
子资源的 Get 操作,实际执行的是 RegistryStore
操作,代码如下:
func (r *StatusREST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
return r.store.Get(ctx, name, options)
}
RegistryStore 存储服务通用操作
RegistryStore
的代码路径为 vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go。
RegistryStore
注册了两类函数:
- Before Func: 也称
Strategy
预处理,它被定义为在创建资源对象之前调用,做一些预处理的操作。 - After Func: 它被定义在创建资源对象之后调用,做一些收尾工作。(当前 K8s 未实现该功能)。
RegistryStore
结构体如下,根据注释,其实现了 rest.StandardStorage
,并被设计成了可嵌入式的,允许针对特定的资源类型实现特定的(non-generic)处理函数。Storage
字段是 RegistryStore
对 Storage.Interface
通用存储接口进行的封装,实现了对 Etcd 集群的读写操作。
// Store implements pkg/api/rest.StandardStorage. It's intended to be
// embeddable and allows the consumer to implement any non-generic functions
// that are required. This object is intended to be copyable so that it can be
// used in different ways but share the same underlying behavior.
//
// All fields are required unless specified.
//
// The intended use of this type is embedding within a Kind specific
// RESTStorage implementation. This type provides CRUD semantics on a Kubelike
// resource, handling details like conflict detection with ResourceVersion and
// semantics. The RESTCreateStrategy, RESTUpdateStrategy, and
// RESTDeleteStrategy are generic across all backends, and encapsulate logic
// specific to the API.
type Store struct {
// CreateStrategy implements resource-specific behavior during creation.
CreateStrategy rest.RESTCreateStrategy
// AfterCreate implements a further operation to run after a resource is
// created and before it is decorated, optional.
AfterCreate ObjectFunc
// 省略了很多字段
// Storage is the interface for the underlying storage for the
// resource. It is wrapped into a "DryRunnableStorage" that will
// either pass-through or simply dry-run.
Storage DryRunnableStorag
}
Storage.Interface 通用存储接口
Storage.Interface
通用存储接口定义了资源的操作方法,其代码路径为:vendor/k8s.io/apiserver/pkg/storage/interface.go,这个接口方法比较多,但是都比较重要,也都比较容器理解的。
// Interface offers a common interface for object marshaling/unmarshaling operations and
// hides all the storage-related operations behind it.
type Interface interface {
Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
// watch 单个 key
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
// watch 多个 key (当前目录及目录下的所有 key)
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// 保证执行成功的 update 方法,一直执行 tryUpdate
GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
}
Storage.Interface
是通用存储接口,实现通用存储接口的分布是 CacherStorage
以及 UnderlyingStorage
资源存储对象,分别介绍如下:
- CacherStorage: 带有缓存功能的资源存储对象,定义在 vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go 中
- UnderlyingStorage: 底层存储对象,真正与
Etcd
集群交互的资源存储对象,它定义在 vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go 中。
关注下 CacherStorage
以及 UnderlyingStorage
的初始化过程。
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig,
// 默认是初始化一个 UndecoratedStorage,也就是调用 NewRawStorage 初始化一个 etcd3 存储
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
// 如果启用了 watchCache,则初始化 StorageWithCacher,这个其实是对 RawStorage 的封装,对“读”操作做了缓存
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// depending on cache size this might return an undecorated storage
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
我们以 CacherStorage
的 Get
方法为例,看下实现方式是怎样的,只关注几行核心代码:
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
// 如果 resourceVersion 没有指定,则直接调用 etcd 的接口
if resourceVersion == "" {
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
}
// 省略一堆代码
// 这个地方看上去是在等待特定的 resourceVersion,这个有空研究一下
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
if err != nil {
return err
}
return nil
}
Etcd 在 Kube-apiserver 中的层次可以参考下图: