controllerruntime 中的 cache 与 client 实现

目录

cache 问题概述

近期我们需要将组件部署到客户机房,客户机房对权限要求比较严格,需要我们提供一个 ClusterRole 以及 Role 列表,在梳理 ClusterRole 列表时,发现一个问题,controllerruntime 的缓存机制默认是缓存集群内所有资源的。比如我只想关注 test-ns 命名空间下的 pod 资源,但是缓存默认缓存所有命名空间下的 pod 资源,因此需要 ClusterRole 权限。

问题的解决方式是在初始化 Manager 的时候指定一个命名空间列表,表示只缓存特定命名空间下的资源。本文通过这个问题,研究下 controllerruntime 中的 cache 与client 实现。

mgr, err := ctrl.NewManager(restCfg, ctrl.Options{
	Host:                   "0.0.0.0",
	Port:                   9443,
	MetricsBindAddress:     metricsAddr,
	Cache: cache.Options{
		// 只缓存 test-ns 下面的资源 
		Namespaces: []string{"test-ns"},
	},
})

controllerruntime 的核心接口

在分析 cache 之前,先大概看下 controllerruntime 中的相关结构体,有个感性的认识。

Manager 接口

Manager 大概分两部分:Cluster 接口;其他工具接口,其实现者为 controllerManager 结构体。因为内嵌了 cluster.Cluster 接口,因此实际上 Manager 的大多数接口都是 Cluster 接口提供的,比如 cache 和 client 相关接口。

// Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables.
// A Manager is required to create Controllers.
type Manager interface {
	// Cluster holds a variety of methods to interact with a cluster.
	cluster.Cluster
	Add(Runnable) error
	Elected() <-chan struct{}
	AddHealthzCheck(name string, check healthz.Checker) error
	AddReadyzCheck(name string, check healthz.Checker) error
	Start(ctx context.Context) error
	GetWebhookServer() webhook.Server
	GetLogger() logr.Logger
	GetControllerOptions() config.Controller
}

Cluster 接口

Cluster 接口是 controllerruntime 实现的主要接口,包括缓存、client 等,其实现者是 cluster 结构体。下面接口中我们先关注两个接口:

1)GetClient():我们最常用的接口(或者说是必用),这个接口默认情况从缓存拿资源,如果资源的缓存不存在,则主动构建一个,这里的缓存是只 informercache。举例来说,如果我们通过下面的语句拿一个 StatefulSet,控制器发现没有缓存 StatefulSet 时,会在后台默认给我们生成一个,这个是按需生成的。

sts := &appsv1.StatefulSet{}
if err := r.Client.Get(context.TODO(), types.NamespacedName{
	Name:      "web",
	Namespace: "default",
}, sts); err != nil {
	fmt.Printf("list statefulset err: %v", err)
}

2)GetAPIReader():这个接口生成一个直接跟 apiserver 交互的 reader,只有 get 与 list 接口。

// Cluster provides various methods to interact with a cluster.
type Cluster interface {
	GetHTTPClient() *http.Client
	GetConfig() *rest.Config
	GetCache() cache.Cache
	GetScheme() *runtime.Scheme
	// 获取 client,默认情况下,这个 client 从缓存读数据
	GetClient() client.Client
	GetFieldIndexer() client.FieldIndexer
	GetEventRecorderFor(name string) record.EventRecorder
	GetRESTMapper() meta.RESTMapper
	// 获取一个 Reader
	GetAPIReader() client.Reader
	Start(ctx context.Context) error
}

初始化过程

在实现一个控制器时,项目的初始化一般分两步:1)初始化一个 Manager;2)用我们的 Manager 初始化一个控制器。本文我们不关注如何实现一个控制器,只关注控制器中的 cache 以及 client 的实现。

// 第一步初始化 Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
	Scheme:                 scheme,
	MetricsBindAddress:     metricsAddr,
	Port:                   9443,
	HealthProbeBindAddress: probeAddr,
	Cache: cache.Options{
		Namespaces: []string{"test-ns"},
	},
})
// 第二步初始化我们的自定义控制器
if err = (&controllers.GuestbookReconciler{
	Client: mgr.GetClient(),
	Reader: mgr.GetAPIReader(),
	Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
	setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
	os.Exit(1)
}

cache 初始化

因为 client 的初始化是依赖 cache 的,因此我们先关注 cache 的初始化。cache 的初始化是在 Cluster 接口中做的。

// pkg/cluster/cluster.go
func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
	if options.NewCache == nil {
		options.NewCache = cache.New
	}
	// ...
}

上面代码中的 cache.New 是默认方法,位于文件 pkg/cache/cache.go 中。下面是其代码实现,在下面情况下,只是配置了一个 namespace 的情况,多个命名空间会走 newMultiNamespaceCache 方法。

func New(config *rest.Config, opts Options) (Cache, error) {
	// ...
	return &informerCache{
		scheme: opts.Scheme,
		Informers: internal.NewInformers(config, &internal.InformersOpts{
			HTTPClient:   opts.HTTPClient,
			Scheme:       opts.Scheme,
			Mapper:       opts.Mapper,
			ResyncPeriod: *opts.SyncPeriod,
			Namespace:    opts.Namespaces[0],
			ByGVK:        byGVK,
		}),
	}, nil
}

因此,我们知道 controllerruntime 的 cache 实现是 informerCache,后面我们看 cache 相关接口时,可以直接看 informerCache 的相关实现。

client 初始化

client 的初始化同样是在初始化 Cluster 接口时候做的,下面的 client.New 是在 pkg/client/client.go 方法中实现的。在该方法的注释中,说生成的不带 cache 的 client,这个其实是有误的(看代码的时候给我造成了很大的误解)。因为如果 Options 中带有 cache,则走的是 cache,而且默认情况下是走 cache 的。

// pkg/cluster/cluster.go
func setOptionsDefaults(options Options, config *rest.Config) (Options, error) {
	if options.NewClient == nil {
		options.NewClient = client.New
	}
	// ...
	return options, nil
}

// pkg/client/client.go,注意下面注释有误

// New returns a new Client using the provided config and Options.
// The returned client reads *and* writes directly from the server
// (it doesn't use object caches).
func New(config *rest.Config, options Options) (c Client, err error) {
	c, err = newClient(config, options)
	if err == nil && options.DryRun != nil && *options.DryRun {
		c = NewDryRunClient(c)
	}
	return c, err
}

在 Cluster 接口的初始化时,生成两个两个 client:clientWriter 和 clientReader。后者的实现比较明确,是直接跟 K8s apiserver 交互的,不会走 cache。 我们重点看下 clientWriter 的实现。需要注意的是 clientWriter 最终赋值给了 cluster 的 client 字段,也是我们 Manager 接口 GetClient() 方法最终返回的字段。

// New constructs a brand new cluster.
func New(config *rest.Config, opts ...Option) (Cluster, error) {
	options, _ := setOptionsDefaults(options, config)
	cache, _ := options.NewCache(config, cacheOpts)

	// Create the client, and default its options.
	clientOpts := options.Client
	{
		if clientOpts.Cache.Reader == nil {
			clientOpts.Cache.Reader = cache
		}
	}
	// Writer 走 cache
	clientWriter, err := options.NewClient(config, clientOpts)
	// Reader 不走 cache
	clientReader, err := client.New(config, client.Options{
		HTTPClient: options.HTTPClient,
		Scheme:     options.Scheme,
		Mapper:     mapper,
	})
	return &cluster{
		cache:            cache,
		fieldIndexes:     cache,
		client:           clientWriter,
		apiReader:        clientReader,
		// ... 只关注 client 和 cache 字段
	}, nil
}

newClient 的实现如下,如果 options 中没有配置 cache,则直接跟 apiserver 交互。否在在调用 Get 方法的时候,从 cache 里拿。

func newClient(config *rest.Config, options Options) (*client, error) {
	c := &client{
		typedClient: typedClient{
			resources:  resources,
			paramCodec: runtime.NewParameterCodec(options.Scheme),
		},
		unstructuredClient: unstructuredClient{
			resources:  resources,
			paramCodec: noConversionParamCodec{},
		},
	}
	if options.Cache == nil || options.Cache.Reader == nil {
		return c, nil
	}
	// client 中的 cache,就是我们上面实现的 informerCache 结构体,但是只用了 Get 和 List
	c.cache = options.Cache.Reader
	return c, nil
}
func (c *client) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error {
	if isUncached, err := c.shouldBypassCache(obj); err != nil {
		return err
	} else if !isUncached {
		return c.cache.Get(ctx, key, obj, opts...)
	}
	// ...
}

informerCache 的 Get 接口

通过上面的分析,我们知道通过 client.Get 接口拿资源时,默认走的是 cache,也就是 informerCache 的接口。其中下面的 ic.Informers.Get 方法中,如果没有 gvk 对应的 informer,则会动态创建一个。

// Get implements Reader.
func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error {
	gvk, err := apiutil.GVKForObject(out, ic.scheme)
	if err != nil {
		return err
	}

	started, cache, err := ic.Informers.Get(ctx, gvk, out)
	if err != nil {
		return err
	}

	if !started {
		return &ErrCacheNotStarted{}
	}
	return cache.Reader.Get(ctx, key, out)
}

总结

上面我们提到过,在控制器代码中,如果我们使用 client.Get 接口拿资源时,默认是走的 informerCache。还有另一个中情况是,我们在构建一个控制器时,通常会通过 For 或者 Owns 来指定 watch 的资源,这这种情况下,也是共用上面的 informerCache,因此如果我们在初始化 Manager 时指定了命名空间,也会影响到我们的 Controller 可同步到的资源。

// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&webappv1.Guestbook{}).
		Complete(r)
}