The findNodesThatFit Implementation in the K8s Scheduler

From points to lines, and from lines to planes. This article is based on Kubernetes version 1.16.0.

findNodesThatFit is where the scheduler runs the predicates: for each node it calls a series of predicate functions to check whether the node satisfies them, and it returns the list of nodes that pass. The priority phase is then run on that list. The complete code of the function is given at the end of this post.

podFitsOnNode

podFitsOnNode is the function that actually checks whether a node fits; it returns whether the node satisfies the predicates and, if not, the reasons why. It also takes a parameter, alwaysCheckAllPredicates, that controls whether the predicates are short-circuited: short-circuiting means that as soon as one predicate fails on a node, the remaining predicates are not executed. podFitsOnNode evaluates the predicates in a fixed, statically defined order, as shown below:

	for _, predicateKey := range predicates.Ordering() {
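
To make the short-circuit behavior concrete, here is a minimal, self-contained sketch (illustrative only: the toy predicateFunc type, the fitsNode helper, and the hard-coded failure reasons are not the real podFitsOnNode code):

package main

import "fmt"

// A toy predicate type used only for this illustration; the real scheduler
// predicates take a pod, predicate metadata, and a NodeInfo.
type predicateFunc func() (bool, string)

// fitsNode mimics podFitsOnNode's short-circuit logic: predicates run in a
// fixed order, and with alwaysCheckAllPredicates == false the loop stops at
// the first failure.
func fitsNode(ordering []string, preds map[string]predicateFunc, alwaysCheckAllPredicates bool) (bool, []string) {
	fit := true
	var reasons []string
	for _, key := range ordering {
		pred, ok := preds[key]
		if !ok {
			continue
		}
		passed, reason := pred()
		if !passed {
			fit = false
			reasons = append(reasons, reason)
			if !alwaysCheckAllPredicates {
				break // short-circuit: skip the remaining predicates
			}
		}
	}
	return fit, reasons
}

func main() {
	ordering := []string{"PodFitsHostPorts", "PodFitsResources"}
	preds := map[string]predicateFunc{
		"PodFitsHostPorts": func() (bool, string) { return false, "host port conflict" },
		"PodFitsResources": func() (bool, string) { return true, "" },
	}
	fit, reasons := fitsNode(ordering, preds, false)
	fmt.Println(fit, reasons) // false [host port conflict]
}

With alwaysCheckAllPredicates set to false (the default), PodFitsResources is never evaluated once PodFitsHostPorts has failed.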

ParallelizeUntil

ParallelizeUntil is a small framework for running work in parallel: it processes the tasks in a queue with a given number of workers (say k), by starting k goroutines that each read from a channel in a for-range loop and process whatever task they receive. The key call is the line below; the checkNode function runs all predicates against a single node (by calling podFitsOnNode).

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

// ParallelizeUntil is a framework that allows for parallelizing N
// independent pieces of work until done or the context is canceled.
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	// stop is a receive-only channel: the Done channel of the ctx that was passed in.
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}

	// toProcess is the queue (channel) holding the indices of all work pieces.
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	// close the channel once every piece index has been queued
	close(toProcess)

	if pieces < workers {
		workers = pieces
	}

	// wg waits for all workers to finish. The loop below starts `workers` goroutines; each one keeps
	// reading piece indices from the queue (channel) via for-range and passes each index to `doWorkPiece`.
	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				// return as soon as the stop channel is closed
				select {
				case <-stop:
					return
				default:
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}
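
Outside the scheduler, ParallelizeUntil can be used directly from k8s.io/client-go/util/workqueue. The toy program below (not scheduler code) runs 10 work pieces on 3 workers and waits for them all to finish:

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var processed int32
	// doWorkPiece receives the index of a work piece, just like checkNode
	// receives the index i in findNodesThatFit.
	doWorkPiece := func(piece int) {
		atomic.AddInt32(&processed, 1)
		fmt.Println("processed piece", piece)
	}

	// 3 workers, 10 pieces; the call returns only after all pieces are done
	// or the context is canceled.
	workqueue.ParallelizeUntil(ctx, 3, 10, doWorkPiece)
	fmt.Println("total processed:", atomic.LoadInt32(&processed))
}

Passing 16 workers and int(allNodes) pieces, as findNodesThatFit does, means at most 16 nodes are checked concurrently.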

context

findNodesThatFit contains one line that creates a context, which is used to stop the worker goroutines: they are stopped once enough feasible nodes have been found and no further nodes need to be examined. "Enough" is governed by two defaults, minFeasibleNodesToFind and DefaultPercentageOfNodesToScore, with default values 100 and 50% respectively: the minimum number of feasible nodes to find, and the fraction of all nodes that the feasible nodes found should account for.

	ctx, cancel := context.WithCancel(context.Background())

Once enough nodes have been found, the cancel function returned when the context was created is called; this closes the context's Done channel and signals the workers to stop.

	length := atomic.AddInt32(&filteredLen, 1)
	if length > numNodesToFind {
		cancel()
		atomic.AddInt32(&filteredLen, -1)
	} 
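
The threshold numNodesToFind used above comes from g.numFeasibleNodesToFind. Below is a simplified sketch of that calculation under the two defaults just mentioned (an approximation for illustration; the real 1.16 code also adapts the percentage to the cluster size and to a configurable percentageOfNodesToScore):

package main

import "fmt"

const (
	// Defaults described above, treated here as fixed values for illustration.
	minFeasibleNodesToFind          = 100
	defaultPercentageOfNodesToScore = 50
)

// numFeasibleNodesToFind (simplified sketch): small clusters are searched in
// full; for large clusters the search stops once a percentage of all nodes
// has passed the predicates, but never fewer than minFeasibleNodesToFind.
func numFeasibleNodesToFind(numAllNodes int32) int32 {
	if numAllNodes < minFeasibleNodesToFind {
		return numAllNodes
	}
	numNodes := numAllNodes * defaultPercentageOfNodesToScore / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}
	return numNodes
}

func main() {
	fmt.Println(numFeasibleNodesToFind(40))   // 40: fewer than 100 nodes, search them all
	fmt.Println(numFeasibleNodesToFind(150))  // 100: 50% would be 75, so the floor of 100 applies
	fmt.Println(numFeasibleNodesToFind(1000)) // 500: 50% of all nodes
}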

In addition, generic_scheduler allows the scheduler to be extended with extenders, which manage resources that are not managed directly by Kubernetes.

if len(filtered) > 0 && len(g.extenders) != 0 {
	// run the extender's filtering step
	filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
	// ... other code omitted
}

The complete code is as follows:

func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {

	var filtered []*v1.Node
	failedPredicateMap := FailedPredicateMap{}
	filteredNodesStatuses := framework.NodeToStatusMap{}

	if len(g.predicates) == 0 {
		filtered = nodeLister.ListNodes()
	} else {
		allNodes := int32(g.cache.NodeTree().NumNodes())
		numNodesToFind := g.numFeasibleNodesToFind(allNodes)

		// Create filtered list with enough space to avoid growing it
		// and allow assigning.
		filtered = make([]*v1.Node, numNodesToFind)
		errs := errors.MessageCountMap{}
		var (
			predicateResultLock sync.Mutex
			filteredLen         int32
		)

		ctx, cancel := context.WithCancel(context.Background())

		// We can use the same metadata producer for all nodes.
		meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)

		checkNode := func(i int) {
			nodeName := g.cache.NodeTree().Next()

			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				g.nodeInfoSnapshot.NodeInfoMap[nodeName],
				g.predicates,
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
			)
			if err != nil {
				predicateResultLock.Lock()
				errs[err.Error()]++
				predicateResultLock.Unlock()
				return
			}
			if fits {
				// Iterate each plugin to verify current node
				status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName)
				if !status.IsSuccess() {
					predicateResultLock.Lock()
					filteredNodesStatuses[nodeName] = status
					if status.Code() != framework.Unschedulable {
						errs[status.Message()]++
					}
					predicateResultLock.Unlock()
					return
				}

				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
				}
			} else {
				predicateResultLock.Lock()
				failedPredicateMap[nodeName] = failedPredicates
				predicateResultLock.Unlock()
			}
		}

		// Stops searching for more nodes once the configured number of feasible nodes
		// are found.
		workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

		filtered = filtered[:filteredLen]
		if len(errs) > 0 {
			return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs)
		}
	}

	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			if !extender.IsInterested(pod) {
				continue
			}
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.nodeInfoSnapshot.NodeInfoMap)
			if err != nil {
				if extender.IsIgnorable() {
					klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
						extender, err)
					continue
				}

				return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
			}

			for failedNodeName, failedMsg := range failedMap {
				if _, found := failedPredicateMap[failedNodeName]; !found {
					failedPredicateMap[failedNodeName] = []predicates.PredicateFailureReason{}
				}
				failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
			}
			filtered = filteredList
			if len(filtered) == 0 {
				break
			}
		}
	}
	return filtered, failedPredicateMap, filteredNodesStatuses, nil
}