当启动了一个etcd server的时候,会生成一些数据文件,这些文件便是etcd storage,主要分析wal和snapshot
➜ tree
.
└── member
├── snap
│ └── db
└── wal
├── 0.tmp
└── 0000000000000000-0000000000000000.wal
WAL
WAL是与磁盘打交道的模块,这个是用于数据持久化的;
存储结构
几个重要的属性
- start: 快照开始读的地方
- decoder、encoder:将
entry进行序列化和反序列化 - state:wal的头部
- metadata:每个wal的头部信息,记录着
NodeId、ClusterID
// etcdserver/raft.go#startNode
metadata := pbutil.MustMarshal(
&pb.Metadata{
NodeID: uint64(member.ID),
ClusterID: uint64(cl.ID()),
},
)
- locks: 文件锁
一个wal文件是由多个record组成,record的结构如下:
type Record struct {
// 类型,可以是metadata、entry、state、crc、snapshot
Type int64 `protobuf:"varint,1,opt,name=type" json:"type"`
// 校验码
Crc uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
Data []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
record经过encode之后就被write到file对象当中
初始化
初始化过程从startNode开始,主要做了如下几个动作
- 根据文件创建
wal dir - 生成
wal的名称文件 - 对文件进行预置操作,默认是
64M大小
➜ wal ll -h
total 8
-rw------- 1 admin wheel 61M Dec 23 14:58 0.tmp
-rw------- 1 admin wheel 61M Dec 23 14:58 0000000000000000-0000000000000000.wal
- 将
crc写入 - 将
metadata的数据写入进去 - 将
snapshot数据写入 - 对文件进行重命名(0.tmp)
- 将重命名操作同步至
parent dir
其他操作
OpenWAL:当node重启,获取销毁建立的时候,会根据先加载snapshot,根据snapshot的index来到wal dir当中获取指定的walVerify:根据snapshot和wal dir来获取指定的wal,对其crc、metadata、snapthot进行校验,看看这个wal文件是否中间是否存在中断的情况,这个主要是用于测试用例;Save:在node start|restart、dump、restore命令的时候会执行wal save操作,主要干了以下几个事情- 接收一个
HardState,包括:term、vote、commit;同时接收所需要保存的entries wal对象里面的state与传入的state进行对比,校验term、vote是否相等- 开始保存
entry,新建一个类型为entry的record,然后序列化,落盘 - 同时将
wal对象里面的state用state类型的record进行封装,序列化后落盘 - 若文件的大小<
SegmentSizeBytes--64M,直接flush,若大于,需要进行cut操作
- 接收一个
cut:将offset超出的部分,重新写入一个文件,再走一次create逻辑,这个时候wal的文件序列会+1,即1.tmp
Snapshot
snapshot是一个快照,用于保存数据信息,便于恢复;经过压缩大小会比wal小;本篇是基于etcdServer里面的逻辑写的,etcd里面还有一个raftexample/raft.go,这个是一个简易版本,逻辑差不多,方便读者理解raft的逻辑;
存储结构
从pb文件里面可以看到存储结构如下:
message Snapshot {
optional bytes data = 1;
optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false];
}
message SnapshotMetadata {
optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
optional uint64 index = 2 [(gogoproto.nullable) = false];
optional uint64 term = 3 [(gogoproto.nullable) = false];
}
message ConfState {
// The voters in the incoming config. (If the configuration is not joint,
// then the outgoing config is empty).
repeated uint64 voters = 1;
// The learners in the incoming config.
repeated uint64 learners = 2;
// The voters in the outgoing config.
repeated uint64 voters_outgoing = 3;
// The nodes that will become learners when the outgoing config is removed.
// These nodes are necessarily currently in nodes_joint (or they would have
// been added to the incoming config right away).
repeated uint64 learners_next = 4;
// If set, the config is joint and Raft will automatically transition into
// the final config (i.e. remove the outgoing config) when this is safe.
optional bool auto_leave = 5 [(gogoproto.nullable) = false];
}
在raft/storage.go当中可以看到Storeage是一个interface,目前只有一种基于内存的实现MemoryStorage
- 里面有一个互斥锁
- 有
snapshot - 还有对应的
entries - 这个
hardState是用来记录这个快照的term、vote、commit
type MemoryStorage struct {
// Protects access to all fields. Most methods of MemoryStorage are
// run on the raft goroutine, but Append() is run on an application
// goroutine.
sync.Mutex
hardState pb.HardState
snapshot pb.Snapshot
// ents[i] has raft log position i+snapshot.Metadata.Index
ents []pb.Entry
}
初始化
在NewServer的时候会初始化snapshotter
ss := snap.New(cfg.Logger, cfg.SnapDir())
何时触发快照
server启动后,会有一个forever的处理,里面会有apply的操作,这个applyAll会触发快照的生成
// etcdserver/server.go
......
for {
select {
case ap := <-s.r.apply():
f := func(context.Context) { s.applyAll(&ep, &ap) }
sched.Schedule(f)
......
}
而这个apply是在node start的时候,同样启动了一个forever,监听node Ready通道;当上层业务通过调用getSnapshot的时候就会触发此ready操作,Ready是一个状态集合,维护着各种状态,是否可以读,可以写等;
另外,可以看一下测试类里面的代码:etcdserver/server_test.go#TestTriggerSnap
生成快照
getSnapshot将用户需要做快照的数据进行传递过来,然后会进行两步操作
- 保存在wal:用
snapshot类型的record进行封装,然后序列化,落盘 - 保存snapshot:用
Snapshot对象进行封装,然后序列化,落盘至snapshot的文件当中,snapshot文件命名格式为:Term-Index.snap
if err := rc.wal.SaveSnapshot(walSnap); err != nil {
return err
}
if err := rc.snapshotter.SaveSnap(snap); err != nil {
return err
}
需要注意的:快照是follower在本地完成的,不需要leader向其发送消息;因为follower是完整的备份事项,leader只需要将snapshot所带的term和index传递过来即可;
func (r *raft) handleSnapshot(m pb.Message) {
// message里面只是一个snapshot对象,包含index和term
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}