Etcd put 请求过程:clientv3 发送请求到 EtcdServer

看一下 Etcd clientv3 将请求发送给 EtcdServer 的过程,本文基于的 Etcd 版本为 3.4。

从 clientv3 说起

在《在本地安装 Etcd 集群进行测试》 中介绍了 etcd clientv3 客户端的使用。大概有三个小步骤:1)配置一个 client config;2)通过 config 生成一个 client;3)通过这个 client 生成一个 KV client,通过这个 KV client 进行 put 、get 操作。

config = clientv3.Config {
	Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
	DialTimeout: 5 * time.Second,
}
if client, err = clientv3.New(config); err != nil {
	// panic(err), handle error
}
defer client.Close()

kvClient := clientv3.NewKV(client)
kvClient.Put(ctx, "k1", "v1")
getResp, err := kvClient.Get(ctx, "k1")

这里重点看下执行 get、put 请求的 kvClient。clientv3.NewKV(client) 返回了一个 KV 接口,该接口包含了六个方法,即操作数据库的基本方法。在 clientv3 中,实现该接口的为 kv 结构体。也就是具体请求的发起都是由这个 kv 结构体执行的。

// clientv3 中定义的 KV 接口,用来实现数据的 CRUD 操作
type KV interface {
	Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
	Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
	Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
	Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
	Do(ctx context.Context, op Op) (OpResponse, error)
	Txn(ctx context.Context) Txn
}

kv 结构体的实现如下,可以看到只有两个字段,一个是 pb.KVClient 类型,另一个是 []grpc.CallOption,后者是发起请求时的一些选项配置。前者是用来发起请求的,前者是一个 grpc 接口的实现。

type kv struct {
	remote   pb.KVClient
	callOpts []grpc.CallOption
}

在 grpc 的实现中,我们知道首先要定义一个 proto 文件,指定服务的接口,然后使用 protoc 等工具编译 proto 文件,为对应的服务生成客户端和服务端接口,假如定义的服务为 KV,则生成的接口分别为 KVClient 以及 KVServer,对于 KVClient 接口,一般生成的 *.pb.go go 代码中有默认实现,对于 KVServer 接口,则需要在服务端自己实现。

在 Etcd 代码中,对应的 proto 文件为 rpc.proto,在 Etcd 仓库中的位置为:etcd/etcdserver/etcdserverpb/rpc.proto。生成的 go 文件也在对应的目录下面。

对于上面这个 kv 结构体,其字段 remote 就是 grpc 中 KVClient 的实现,其没有使用 grpc 自动生成的客户端,而是通过 RetryKVClient 对其封装了一层,大概看了一下,感觉差不多,好像没有区别。顺便提一些 grpc 接口中定义的 KVClient 接口,跟上面的 KV 接口不一样,如下,也就是说 kv 接口对 grpc 中的客户端接口进行了适配。grpc 中定义的 KVClient 接口如下。

type KVClient interface {
	Range(ctx context.Context, in *RangeRequest, opts ...grpc.CallOption) (*RangeResponse, error)
	Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error)
	DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error)
	Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error)
	Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
}

retryKVClient 对 protoc 自动生成的 client 进行了封装:

type retryKVClient struct {
	kc pb.KVClient
}
// 具体封装适配的代码如下:
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
	var err error
	switch op.t {
	case tRange:
		var resp *pb.RangeResponse
		resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
		if err == nil {
			return OpResponse{get: (*GetResponse)(resp)}, nil
		}
	case tPut:  //...
	case tDeleteRange:
	case tTxn:
	default:
		panic("Unknown op")
	}
	return OpResponse{}, toErr(ctx, err)
}

gRPC 服务端

上面介绍了 gRPC 客户端发起一些数据操作,现在看服务端的处理函数,因为是通过 gRPC 定义的,我们主要看是哪个结构体实现了 gRPC 的服务即可。proto 文件的定义路径为 etcdserverpb/rpc.proto,我们找一下 KVServer Interface 的实现,一个简单的方法是看其 RegisterKVServer 方法在哪里调用了,看下哪里将其注册为 gRPC server。发现相关代码为:

// 注册 grpc server 的代码
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
    // 将 NewQuotaKVServer 的返回值作为 grpc KV server
	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
	pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
    return grpcServer
}
// QuotaKVServer 对 kvServer 进行了封装,覆盖了 Put 以及 Txn 方法,进行了 Quota 检查。
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
	return &quotaKVServer{
		NewKVServer(s),
		quotaAlarmer{etcdserver.NewBackendQuota(s, "kv"), s, s.ID()},
	}
}
// kvServer 结构体实现了 grpc 代码中的 KVServer 接口
type kvServer struct {
	hdr header
	kv  etcdserver.RaftKV
	maxTxnOps uint
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
	return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}

从上面的方法可以看出,最终注册为 gRPC server 的是 quotaKVServer,也就是我们客户端发出的请求,最终由其处理,其内嵌了 kvServer 结构体,并覆盖了 kvServer 结构体的 Put 以及 Txn 方法。

在上面的代码中,我们看到是 kvServer 模块实现了 grpc server 来处理 client 的请求,同时在构造 kvServer 的时候,传入了 etcdserver.EtcdServer 参数。下面我们看一下 EtcdServer 在 kvServer中的作用,及 kvServer 是如何跟 EtcdServer 交互的。这里直接贴一下 kvServer 的实现(代码路径为:etcd/etcdserver/api/v3rpc/key.go),相关逻辑可以直接看注释。从代码中可以看出,kvServer 又是对 EtcdServer 的封装。

type kvServer struct {
	hdr header
	kv  etcdserver.RaftKV
	maxTxnOps uint
}
// 构造 KVServer 的时候,EtcdServer 赋值给 kv feild 字段
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
	return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
// kvServer 的 Range 方法直接调用 etcdServer 的 Range 方法
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
	if err := checkRangeRequest(r); err != nil {
		return nil, err
	}

	resp, err := s.kv.Range(ctx, r)
	if err != nil {
		return nil, togRPCError(err)
	}

	s.hdr.fill(resp.Header)
	return resp, nil
}
// Put、DeleteRange 等方法类似,不再贴出

从上面可以看出,基本上,kvServer 都是直接调用 EtcdServer 的方法实现的。在下一篇文件中,将分析 EtcdServer 是具体如何处理逻辑的。

在 Etcd 代码中,EtcdServer 是处理 client 请求并返回结果的模块,EtcdServer 不处理 raft 相关逻辑,关于一致性相关逻辑会调用 raft 模块进行处理,其交互过程可以参考下图。 java-javascript

在 go.mod 中引入 clientv3

另外在使用 Etcd clientv3 的时候,一直想使用 v3.4.3 版本,但是怎么都无法正常的 import,一直提示 invalid version: unknown revision 3.4.3,只能通过下面方式引入 v3.5 版本或者 v3.3 版本。后来查了下,在 v3.4 Etcd 代码的 go.mod 文件中 module 的定义有问题,需要将 module go.etcd.io/etcd 改为 module go.etcd.io/etcd/v3

这个问题可以参考《Etcd使用go module的灾难》以及 go get error for 3.4.0 #11154,解决方式有: 1)自己 fork 一份代码,然后改下 go.mod 文件。2)通过 commit 号引入,go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738,获取时间戳和 commit 号的命令如下:

git clone --branch v3.4.3 https://github.com/etcd-io/etcd.git
cd etcd
echo -n v0.0.0-
TZ=UTC git --no-pager show \
  --quiet \
  --abbrev=12 \
  --date='format-local:%Y%m%d%H%M%S' \
  --format="%cd-%h"

通过版本号,只能引入 v3.3 和 v3.5 版本。

// 引入 3.5 版本
import (
	clientv3 "go.etcd.io/etcd/client/v3"
)
// go.mod 的配置如下
require (
	go.etcd.io/etcd/client/v3 v3.5.6
)
// ------------------

// 引入 3.3 版本
import (
	clientv3 "go.etcd.io/etcd/clientv3"
)

// go.mod 配置如下
require (
		go.etcd.io/etcd v3.3.27+incompatible
)
replace (
	google.golang.org/grpc => google.golang.org/grpc v1.26.0
)