看一下 Etcd clientv3 将请求发送给 EtcdServer 的过程,本文基于的 Etcd 版本为 3.4。
从 clientv3 说起
在《在本地安装 Etcd 集群进行测试》 中介绍了 etcd clientv3 客户端的使用。大概有三个小步骤:1)配置一个 client config;2)通过 config 生成一个 client;3)通过这个 client 生成一个 KV client,通过这个 KV client 进行 put 、get 操作。
config = clientv3.Config {
Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
DialTimeout: 5 * time.Second,
}
if client, err = clientv3.New(config); err != nil {
// panic(err), handle error
}
defer client.Close()
kvClient := clientv3.NewKV(client)
kvClient.Put(ctx, "k1", "v1")
getResp, err := kvClient.Get(ctx, "k1")
这里重点看下执行 get、put 请求的 kvClient。clientv3.NewKV(client)
返回了一个 KV 接口,该接口包含了六个方法,即操作数据库的基本方法。在 clientv3 中,实现该接口的为 kv
结构体。也就是具体请求的发起都是由这个 kv
结构体执行的。
// clientv3 中定义的 KV 接口,用来实现数据的 CRUD 操作
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
Do(ctx context.Context, op Op) (OpResponse, error)
Txn(ctx context.Context) Txn
}
kv
结构体的实现如下,可以看到只有两个字段,一个是 pb.KVClient
类型,另一个是 []grpc.CallOption
,后者是发起请求时的一些选项配置。前者是用来发起请求的,前者是一个 grpc 接口的实现。
type kv struct {
remote pb.KVClient
callOpts []grpc.CallOption
}
在 grpc 的实现中,我们知道首先要定义一个 proto
文件,指定服务的接口,然后使用 protoc
等工具编译 proto
文件,为对应的服务生成客户端和服务端接口,假如定义的服务为 KV
,则生成的接口分别为 KVClient
以及 KVServer
,对于 KVClient
接口,一般生成的 *.pb.go
go 代码中有默认实现,对于 KVServer
接口,则需要在服务端自己实现。
在 Etcd 代码中,对应的 proto 文件为 rpc.proto
,在 Etcd 仓库中的位置为:etcd/etcdserver/etcdserverpb/rpc.proto
。生成的 go 文件也在对应的目录下面。
对于上面这个 kv
结构体,其字段 remote
就是 grpc 中 KVClient
的实现,其没有使用 grpc 自动生成的客户端,而是通过 RetryKVClient
对其封装了一层,大概看了一下,感觉差不多,好像没有区别。顺便提一些 grpc 接口中定义的 KVClient
接口,跟上面的 KV
接口不一样,如下,也就是说 kv
接口对 grpc 中的客户端接口进行了适配。grpc 中定义的 KVClient 接口如下。
type KVClient interface {
Range(ctx context.Context, in *RangeRequest, opts ...grpc.CallOption) (*RangeResponse, error)
Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error)
DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error)
Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error)
Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
}
retryKVClient 对 protoc 自动生成的 client 进行了封装:
type retryKVClient struct {
kc pb.KVClient
}
// 具体封装适配的代码如下:
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut: //...
case tDeleteRange:
case tTxn:
default:
panic("Unknown op")
}
return OpResponse{}, toErr(ctx, err)
}
gRPC 服务端
上面介绍了 gRPC 客户端发起一些数据操作,现在看服务端的处理函数,因为是通过 gRPC 定义的,我们主要看是哪个结构体实现了 gRPC 的服务即可。proto
文件的定义路径为 etcdserverpb/rpc.proto
,我们找一下 KVServer
Interface 的实现,一个简单的方法是看其 RegisterKVServer
方法在哪里调用了,看下哪里将其注册为 gRPC server。发现相关代码为:
// 注册 grpc server 的代码
func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
// 将 NewQuotaKVServer 的返回值作为 grpc KV server
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
return grpcServer
}
// QuotaKVServer 对 kvServer 进行了封装,覆盖了 Put 以及 Txn 方法,进行了 Quota 检查。
func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return "aKVServer{
NewKVServer(s),
quotaAlarmer{etcdserver.NewBackendQuota(s, "kv"), s, s.ID()},
}
}
// kvServer 结构体实现了 grpc 代码中的 KVServer 接口
type kvServer struct {
hdr header
kv etcdserver.RaftKV
maxTxnOps uint
}
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
从上面的方法可以看出,最终注册为 gRPC server 的是 quotaKVServer,也就是我们客户端发出的请求,最终由其处理,其内嵌了 kvServer 结构体,并覆盖了 kvServer 结构体的 Put 以及 Txn 方法。
在上面的代码中,我们看到是 kvServer
模块实现了 grpc server 来处理 client 的请求,同时在构造 kvServer
的时候,传入了 etcdserver.EtcdServer
参数。下面我们看一下 EtcdServer
在 kvServer中的作用,及 kvServer 是如何跟 EtcdServer
交互的。这里直接贴一下 kvServer 的实现(代码路径为:etcd/etcdserver/api/v3rpc/key.go
),相关逻辑可以直接看注释。从代码中可以看出,kvServer 又是对 EtcdServer 的封装。
type kvServer struct {
hdr header
kv etcdserver.RaftKV
maxTxnOps uint
}
// 构造 KVServer 的时候,EtcdServer 赋值给 kv feild 字段
func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
}
// kvServer 的 Range 方法直接调用 etcdServer 的 Range 方法
func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if err := checkRangeRequest(r); err != nil {
return nil, err
}
resp, err := s.kv.Range(ctx, r)
if err != nil {
return nil, togRPCError(err)
}
s.hdr.fill(resp.Header)
return resp, nil
}
// Put、DeleteRange 等方法类似,不再贴出
从上面可以看出,基本上,kvServer
都是直接调用 EtcdServer
的方法实现的。在下一篇文件中,将分析 EtcdServer 是具体如何处理逻辑的。
在 Etcd 代码中,EtcdServer 是处理 client 请求并返回结果的模块,EtcdServer 不处理 raft 相关逻辑,关于一致性相关逻辑会调用 raft 模块进行处理,其交互过程可以参考下图。
在 go.mod 中引入 clientv3
另外在使用 Etcd clientv3 的时候,一直想使用 v3.4.3 版本,但是怎么都无法正常的 import,一直提示 invalid version: unknown revision 3.4.3
,只能通过下面方式引入 v3.5 版本或者 v3.3 版本。后来查了下,在 v3.4 Etcd 代码的 go.mod 文件中 module 的定义有问题,需要将 module go.etcd.io/etcd
改为 module go.etcd.io/etcd/v3
,
这个问题可以参考《Etcd使用go module的灾难》以及 go get error for 3.4.0 #11154,解决方式有: 1)自己 fork 一份代码,然后改下 go.mod 文件。2)通过 commit 号引入,go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
,获取时间戳和 commit 号的命令如下:
git clone --branch v3.4.3 https://github.com/etcd-io/etcd.git
cd etcd
echo -n v0.0.0-
TZ=UTC git --no-pager show \
--quiet \
--abbrev=12 \
--date='format-local:%Y%m%d%H%M%S' \
--format="%cd-%h"
通过版本号,只能引入 v3.3 和 v3.5 版本。
// 引入 3.5 版本
import (
clientv3 "go.etcd.io/etcd/client/v3"
)
// go.mod 的配置如下
require (
go.etcd.io/etcd/client/v3 v3.5.6
)
// ------------------
// 引入 3.3 版本
import (
clientv3 "go.etcd.io/etcd/clientv3"
)
// go.mod 配置如下
require (
go.etcd.io/etcd v3.3.27+incompatible
)
replace (
google.golang.org/grpc => google.golang.org/grpc v1.26.0
)