etcd 中的 etcd-raft
模块是一个 raft 的标准实现,raftexample 通过这个 etcd-raft
模块构建了一个分布式的一致性的存储驱动,通过 raftexample 这个例子可以看下使用 raft 的设计思想。从这例子中可以看出 raft 是跟业务无关的,它只关心数据在几个 raft 实例之间的一致性,具体是什么数据它不关心。同时它也不关心数据持久化的问题,他只是提供一个 committed 状态,表示这个数据已经在多个实例之间达成共识,需要上层应用来实现持久化,也就是 apply。
etcd-raft
的实现主要在 etcd 源码 raft 包中,主要是 raft/node.go,raft/raft.go 这两个文件,前者暴露接口,后者是实际实现。本文暂不介绍 raft 实现,先从宏观上看下怎么使用 raft,以及 raft 是如何跟上层应用交互的。
raftexample
的 github 文档为:raftexample,根据文档,raftexample
主要包括三部分:
- a raft-backed key-value store:这个 key-value 存储就是
kvstore
,具体是在contrib/raftexample/kvstore.go
文件中。在 raftexample 这个示例中,扮演持久化存储的角色,实际实现是用 map 保存在内存中的。 - a REST API server:这个是一个 Http server,对外暴露了数据的 get、put 方法,用于数据的查询,添加。同时也暴露了 raft 集群配置更新的接口,比如添加实例、删除实例等。其实现主要是
contrib/raftexample/httpapi.go
中的httpKVAPI
。 - a raft consensus server:这个主要是说 raftNode,其实现为
contrib/raftexample/raft.go
,raftNode 是对 etcd raft 模块的封装,后者代表一个 raft 实例,raftNode 还有其他功能,比如:WAL 日志管理、快照管理、网络层相关功能。
对外的 GET/PUT 请求首先发送到 REST server,然后 REST server 调用 kvStore 的 Lookup
以及 Propose
方法,完成数据的查询以及写入,我们知道,kvStore 不是直接操作其 map 进行读写,毕竟要经过 raft 模块的共识处理。上层应用通知 raft 模块来进行共识处理,主要是通过 channel 来完成的,在 raftexample 中,主要的 channel 有:proposeC、confChangeC、commitC。在 raftexample 中,其实现流程是:httpServer 收到 put 请求时,调用 kvStore 的 Propose 方法,后者将数据写入 proposeC 这个 channel,raftNode 模块消费这个 channel,并最终调用 etcd raft 模块的 Propose 进行处理。其中 raftNode 调用 raft 模块主要是通过 Node 这个接口来进行的,Node 接口定义的文件为 raft/node.go
,定义大概如下:
// Node represents a node in a raft cluster.
type Node interface {
Tick()
Campaign(ctx context.Context) error
Propose(ctx context.Context, data []byte) error
Ready() <-chan Ready
Advance()
ReadIndex(ctx context.Context, rctx []byte) error
// 省去了其他方法和注释
}
参考《etcd 技术内幕》, raftexample 的整体架构如下。
下面大概从三个方面介绍 raftexample,也就是上面提到的三个模块 httpserver、kvstore、raftNode。另外,也通过 raftexample 来看下 snapshot 以及 wal 的实现逻辑,这两者作为单独的模块分析。
httpserver
httpserver 的实现较为简单,其定义如下,包含了持久存储 kvstore,以及一个处理配置变化的 channel。
type httpKVAPI struct {
store *kvstore
confChangeC chan<- raftpb.ConfChange
}
其定义了几个请求处理方法,严格来说是根据请求的 verb 来处理请求:
- PUT: 存储数据。
- GET:查询数据。
- POST: 集群配置变化。
- DELETE: 集群删除节点。
列举一下主要的逻辑,省略其他逻辑:
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
defer r.Body.Close()
switch {
case r.Method == "PUT":
v, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
// 调用 store 的 Propose 方法,后者写 proposeC
h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
case r.Method == "GET":
// 调用 lookup 查询
if v, ok := h.store.Lookup(key); ok {
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case r.Method == "POST":
url, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on POST (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
nodeId, err := strconv.ParseUint(key[1:], 0, 64)
if err != nil {
log.Printf("Failed to convert ID for conf change (%v)\n", err)
http.Error(w, "Failed on POST", http.StatusBadRequest)
return
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
// 直接写 confChangeC channel
h.confChangeC <- cc
}
}
raftNode
raftNode 是对 etcd raft 模块的封装,从上对接应用层(或者本身是个应用层,这个要看你怎么理解了),从下对接 raft 模块。从上对接主要是通过一些列 channel 来完成的,与 raft 通信,主要是通过 raft.Node 接口。raftNode 实现比较复杂,其定义如下:
// A key-value stream backed by raft
type raftNode struct {
proposeC <-chan string // proposed messages (k,v)
confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
commitC chan<- *string // entries committed to log (k,v)
errorC chan<- error // errors from raft session
id int // client ID for raft session
peers []string // raft peer URLs
join bool // node is joining an existing cluster
waldir string // path to WAL directory
snapdir string // path to snapshot directory
getSnapshot func() ([]byte, error)
lastIndex uint64 // index of log at start
confState raftpb.ConfState
snapshotIndex uint64
appliedIndex uint64
// raft backing for the commit/error channel
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
stopc chan struct{} // signals proposal channel closed
httpstopc chan struct{} // signals http server to shutdown
httpdonec chan struct{} // signals http server shutdown complete
}
上面有几个比较重要的字段是 channel,也都介绍过了,还有就是 raft.Node
也就是跟 etcd raft 通信的接口,还有就是 *rafthttp.Transport
负责网络层。其中 serveChannels 主要用于各个 channel 的生产和消费。其中一个 goroutine 读取 proposeC 和 confChangeC,另一个 channel 通过调用 raft 模块的 Ready() 方法获取 ready 实例,并放到 commitC 同道中,用于 kvstore 消费。
func (rc *raftNode) serveChannels() {
snap, err := rc.raftStorage.Snapshot()
if err != nil {
panic(err)
}
rc.confState = snap.Metadata.ConfState
rc.snapshotIndex = snap.Metadata.Index
rc.appliedIndex = snap.Metadata.Index
defer rc.wal.Close()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
// send proposals over raft
go func() {
confChangeCount := uint64(0)
for rc.proposeC != nil && rc.confChangeC != nil {
select {
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount++
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
rc.node.Advance()
case err := <-rc.transport.ErrorC:
rc.writeError(err)
return
case <-rc.stopc:
rc.stop()
return
}
}
}
kvstore
kvstore 扮演持久化存储的角色,其定义如下,proposeC 就是跟 raftNOde 通信的 channel。
// a key-value store backed by raft
type kvstore struct {
proposeC chan<- string // channel for proposing updates
mu sync.RWMutex
kvStore map[string]string // current committed key-value pairs
snapshotter *snap.Snapshotter
}
// 查询方法就是直接读 kvStore
func (s *kvstore) Lookup(key string) (string, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
v, ok := s.kvStore[key]
return v, ok
}
// 添加方法是写 proposeC 通道。
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}
其中 kvStore 的 readCommits 方法就是通过读取 commitC 通道,并写入 map.
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
for data := range commitC {
if data == nil {
// done replaying log; new data incoming
// OR signaled to load snapshot
snapshot, err := s.snapshotter.Load()
if err == snap.ErrNoSnapshot {
return
}
if err != nil {
log.Panic(err)
}
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
continue
}
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
// 写 map
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
快照
wal 日志
raftexample 同样没有实现自己的 wal,而是直接使用了 etcd 中的 wal 实现,wal 作为 raftNode 的一个字段。这里只关注 wal 是怎么使用的,写 wal 的时机是什么。在 raftNode 的定义中,关于 wal 有两个字段:
// A key-value stream backed by raft
type raftNode struct {
waldir string // path to WAL directory
wal *wal.WAL
// 省略其他字段
}
第一个是目录,第二个是 wal 的定义,从 WAL 的定义看,包含了很多东西,暂时先不关注 WAL 的定义。 从下面代码看,从 raft 模块的 Ready() 接口读到数据之后,立刻写 wal,也就是经过共识的数据写 wal,具体如下:
func (rc *raftNode) serveChannels() {
// 省略代码
// event loop on raft state machine updates
for {
select {
case <-ticker.C:
rc.node.Tick()
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
// 写 wal
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
rc.node.Advance()
// 省略代码
}
}
}
后面再详细分析下 snapshot 以及 wal 的实现,明天要开工了,暂时先不分析了。