K8s 队列之基本队列实现

这里的基本队列是指实现了如下接口的队列,代码路径为:client-go/util/workqueue/queue.go,区别于DelayingInterface,以及RateLimitingInterfaceDelayingInterface实现延时加入队列的功能,RateLimitingInterface要配合限速器使用,实现了加入队列时的速率控制。后两者都内嵌了Interface,是基于此队列实现的。

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShuttingDown() bool
}

先介绍下接口包含的方法:

  • Add: 添加一个元素到队列,可以是任意类型,但是实际上只添加K8s资源的key(由namespace和name组成的字符串),具体处理某个元素时,再从informer cache中根据key取出元素进行处理。
  • Len: 返回queue队列的长度。
  • Get: 获取queue队列头部的一个元素,将此元素从queue中删除。
  • Done: 标记一个元素刚刚被处理完了,从processing集合中删除,同时,如果dirty集合中有此元素,则添加到queue队列中
  • ShutDown: 关闭队列
  • ShuttingDown: 查询队列是否关闭

K8s代码里对此接口的实现结构体是Type,相关字段如下。核心字段就是一个队列queue,两个setdirtyprocessing。之所以设置这么多集合,是为了保证一个元素在同一个时刻,只有一个worker在处理(也就是一个controller goroutine),在有多个worker的controller中,会有多个worker从queue中取元素。在有一个worker在处理一个元素的时候,其他worker不会拿到这个元素,即使这个元素需要再被处理。因为只要有worker在处理元素,就会把这个元素添加到dirty集合中。

另外需要注意,这个queue并不完全是FIFO的,比如我们有两个元素A, B,这两个元素发生的事件的顺序为A1、A2、B1,当A2发生的时候,A1还在处理,此时A2只能被添加到dirty集合中,假设B1事件又发生了,这时A1事件还在处理,因为processing集合中没有B的key,因此B可以直接被加入到queue队列中,等到A1处理完了,再把A2从processingdirty中删除,并添加到queue中, 因此是先处理B1,再处理A2。

type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	// 一个有序队列,
	queue []t
	// dirty defines all of the items that need to be processed.
	dirty set
	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set
	cond *sync.Cond
	shuttingDown bool
	metrics queueMetrics
	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.Clock
}

type empty struct{}
type t interface{}
type set map[t]empty

下面具体分析一下每个方法的实现,顺便说一下,Type的所有操作都是加锁的,不管是读操作还是写操作,因此也是线程安全的,三个集合queuedirtyprocessing在同一时刻,只能有一个goroutine在处理,所以能保证三个集合状态的一致性。

Add

Add标记一个元素需要被处理,当调用Add的时候,所添加的元素一定被添加到dirty集合中,如果当前被添加的元素正在被处理(也就是说在processing集合中)那么这个元素不需要被添加到queue队列中了,因为当前正在被某个goroutine处理的元素,在处理完调用Done方法时,会检查该元素是否在processing集合中,如果在processing集合中,则将其添加到queue队列中。

如果不在processing集合中,则添加到queue队列中。

另外,如果有goroutine阻塞在Get调用,则调用q.cond.Signal唤醒一个goroutine。

func (q *Type) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return
	}
	if q.dirty.has(item) {
		return
	}

	q.metrics.add(item)

	q.dirty.insert(item)
	if q.processing.has(item) {
		return
	}

	q.queue = append(q.queue, item)
	q.cond.Signal()
}

Get

Get是一个阻塞式的调用,如果queue中没有元素可以取,则调用sync.Cond.Wait()进行阻塞(注意Wait要写在一个for循环中,因为被唤醒之后,还要争锁)。

Get从queue中取出第一个元素,并加入到processing集合中,表示这个元素当前正在被某个goroutine处理。另外如果dirty中有此元素,则从dirty中将元素删除。

func (q *Type) Get() (item interface{}, shutdown bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}

	item, q.queue = q.queue[0], q.queue[1:]

	q.metrics.get(item)

	q.processing.insert(item)
	q.dirty.delete(item)

	return item, false
}

Done

标记一个元素刚刚被处理完了,因为这个元素刚刚在被处理,所以在processing集合中,处理完了,需要从此集合中删除。另外检查dirty集合,如果在处理的时候这个元素又被添加进来了,会被放到dirty集合中,如果dirty集合中有此元素,则添加到queue队列中。

在处理完一个元素的时候,不管处理成功还是失败,都要调用Done来标记此次处理结束,如果处理失败,可以选择Forget忽略元素或者使用接口AddRateLimited重新加入到队列中,下次继续处理。

func (q *Type) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	q.metrics.done(item)

	q.processing.delete(item)
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

ShutDown

调用q.cond.Broadcast唤醒所有的goroutine,被唤醒的goroutine不需要再争夺锁。同时修改shuttingDown标志位。

func (q *Type) ShutDown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}