K8s Apiserver max-requests-inflight 流控实现

了解下,Kubernetes Apiserver 中的 --max-mutating-requests-inflight(Default: 200)以及 --max-requests-inflight(Default: 400)是怎么实现的。

代码路径为:kubernetes/staging/src/k8s.io/apiserver/pkg/server/config.go

Apiserver 中,handler 有众多中间件,在 http server 中间件的实现中,其参数是一个 handler,返回值也是一个 handler,但是这个返回的 handler 是对参数传进来的 handler 的一个封装,添加一些业务逻辑,最常见的包括:添加审计日志、添加监控等。等审计日志记录了,或者监控添加好了,就可以调用参数传递进来的 handler 来进行真正的业务处理。

本示例研究的是服务端流控,那就是在真正调用参数传递进来的 handler 之前,要进行请求数的检查,这个请求数是 inflight 请求,也就是正在处理的请求数检查,如果超过了指定的值,则返回 429 StatusTooManyRequests 错误。没有超过则正常处理。

在 config.go 文件中,可以看到 Apiserver 有众多中间件

func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
	handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
	failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithPanicRecovery(handler)
	return handler
}

上面看到了很多跟鉴权相关的,比较眼熟的一个是 withAudit,一看就是添加审计日志用的,还有个 genericfilters.WithMaxInFlightLimit,就是流量控制了,我们看一下这个中间件的实现。

另一个需要关心的是这些 handler 的处理顺序(套娃 handler),也就是来了一个请求,哪一个 handler 最先被执行?我们可以把问题简化,对于一个中间件来说,其大致框架如下:

func WithMaxInFlightLimit(handler http.Handler) http.Handler {
    
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // before 处理逻辑
        // ... ...

        handler.ServeHTTP(w, r)
        
        // after 处理逻辑
        // ... ...
    }
}

也就是说 before 逻辑是早于参数传递进来的 handler 执行的,但是 after 逻辑是晚于 handler 执行的,但是另一方面 Apiserver 的 handler 一般都是 before 逻辑,因此可以认为,越在下面的 handler 越早执行

apiserver 流控的完整代码如下,除去跟监控相关的代码,代码也比较少。思路就是用一个 channel 作为流控队列,能写进去就正常处理,不能写进去就直接返回(当前还要写错误码以及错误信息,相当于把请求拦截了。)

// WithMaxInFlightLimit limits the number of in-flight requests to buffer size of the passed in channel.
func WithMaxInFlightLimit(
	handler http.Handler,
	nonMutatingLimit int,
	mutatingLimit int,
	longRunningRequestCheck apirequest.LongRunningRequestCheck,
) http.Handler {
	startOnce.Do(startRecordingUsage)
	if nonMutatingLimit == 0 && mutatingLimit == 0 {
		return handler
    }
    
    // 声明的全局的 channel,用来作为请求的阻塞队列,初始化大小
	var nonMutatingChan chan bool
	var mutatingChan chan bool
	if nonMutatingLimit != 0 {
		nonMutatingChan = make(chan bool, nonMutatingLimit)
	}
	if mutatingLimit != 0 {
		mutatingChan = make(chan bool, mutatingLimit)
	}

    // 中间件的返回参数,也是一个 handler,封装原来的 handler
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
		if !ok {
			handleError(w, r, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
			return
		}

		// Skip tracking long running events.
		if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) {
			handler.ServeHTTP(w, r)
			return
		}

		var c chan bool
		isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
		if isMutatingRequest {
			c = mutatingChan
		} else {
			c = nonMutatingChan
		}

		if c == nil { // 这里是对应的 verb 没有设置 limit(比如是 mutate 请求,但是mutatingLimit参数置为0),直接转发到原来的 http handler 处理,不做限制
			handler.ServeHTTP(w, r)
		} else {

			select {
                //尝试向阻塞队列里写数据,能写进去,表示正在处理的请求没有把 channel 打满
			case c <- true:
				var mutatingLen, readOnlyLen int
				if isMutatingRequest {
					mutatingLen = len(mutatingChan)
				} else {
					readOnlyLen = len(nonMutatingChan)
				}

				defer func() {
                    // handler 处理完了之后,从阻塞队列 channel 中读出一个元素,表示正在处理的请求少了一个
					<-c
					if isMutatingRequest {
						watermark.recordMutating(mutatingLen)
					} else {
						watermark.recordReadOnly(readOnlyLen)
					}

                }()
                // 能写进去说明当前未达到限制,可以直接处理
				handler.ServeHTTP(w, r)

			default:
                // 省去了跟监控相关的代码。

                // 下面的 tooManyRequest 就是 channel 写不进去的处理逻辑,也就是当前所有处理请求数量已经达到限制了, 这个限制就是 channel 的长度。
				tooManyRequests(r, w)
			}
		}
	})
}

// 处理请求太多了怎么办?返回错误码、header、body,直接返回
func tooManyRequests(req *http.Request, w http.ResponseWriter) {
	// Return a 429 status indicating "Too Many Requests"
	w.Header().Set("Retry-After", acquireReteyAfterTime())
	http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
}

// 下面是 http.Error() 的实现
// Error replies to the request with the specified error message and HTTP code.
// It does not otherwise end the request; the caller should ensure no further
// writes are done to w.
// The error message should be plain text.
func Error(w ResponseWriter, error string, code int) {
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.Header().Set("X-Content-Type-Options", "nosniff")
	w.WriteHeader(code)
	fmt.Fprintln(w, error)
}