当启动了一个etcd server的时候,会生成一些数据文件,这些文件便是etcd storage,主要分析walsnapshot

➜  tree
.
└── member
    ├── snap
    │   └── db
    └── wal
        ├── 0.tmp
        └── 0000000000000000-0000000000000000.wal

WAL

WAL是与磁盘打交道的模块,这个是用于数据持久化的;

存储结构

几个重要的属性

  • start: 快照开始读的地方
  • decoder、encoder:将entry进行序列化和反序列化
  • state:wal的头部
  • metadata:每个wal的头部信息,记录着NodeIdClusterID
// etcdserver/raft.go#startNode
metadata := pbutil.MustMarshal(
		&pb.Metadata{
			NodeID:    uint64(member.ID),
			ClusterID: uint64(cl.ID()),
		},
	)
  • locks: 文件锁

一个wal文件是由多个record组成,record的结构如下:

type Record struct {
  // 类型,可以是metadata、entry、state、crc、snapshot
	Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"`
  // 校验码
	Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
	Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
	XXX_unrecognized []byte `json:"-"`
}

record经过encode之后就被writefile对象当中

初始化

初始化过程从startNode开始,主要做了如下几个动作

  • 根据文件创建wal dir
  • 生成wal的名称文件
  • 对文件进行预置操作,默认是64M大小
➜  wal ll -h
total 8
-rw-------  1 admin  wheel    61M Dec 23 14:58 0.tmp
-rw-------  1 admin  wheel    61M Dec 23 14:58 0000000000000000-0000000000000000.wal
  • crc写入
  • metadata的数据写入进去
  • snapshot数据写入
  • 对文件进行重命名(0.tmp)
  • 将重命名操作同步至parent dir

其他操作

  • OpenWAL:当node重启,获取销毁建立的时候,会根据先加载snapshot,根据snapshotindex来到wal dir当中获取指定的wal
  • Verify:根据snapshotwal dir来获取指定的wal,对其crcmetadatasnapthot进行校验,看看这个wal文件是否中间是否存在中断的情况,这个主要是用于测试用例;
  • Save:在node start|restartdumprestore命令的时候会执行wal save操作,主要干了以下几个事情
    • 接收一个 HardState,包括:termvotecommit;同时接收所需要保存的entries
    • wal对象里面的state与传入的state进行对比,校验termvote是否相等
    • 开始保存entry,新建一个类型为entryrecord,然后序列化,落盘
    • 同时将wal对象里面的statestate类型的record进行封装,序列化后落盘
    • 若文件的大小< SegmentSizeBytes--64M,直接flush,若大于,需要进行cut操作
  • cut:将offset超出的部分,重新写入一个文件,再走一次create逻辑,这个时候wal的文件序列会+1,即1.tmp

Snapshot

snapshot是一个快照,用于保存数据信息,便于恢复;经过压缩大小会比wal小;本篇是基于etcdServer里面的逻辑写的,etcd里面还有一个raftexample/raft.go,这个是一个简易版本,逻辑差不多,方便读者理解raft的逻辑;

存储结构

pb文件里面可以看到存储结构如下:

message Snapshot {
	optional bytes            data     = 1;
	optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false];
}
message SnapshotMetadata {
	optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
	optional uint64    index      = 2 [(gogoproto.nullable) = false];
	optional uint64    term       = 3 [(gogoproto.nullable) = false];
}
message ConfState {
	// The voters in the incoming config. (If the configuration is not joint,
	// then the outgoing config is empty).
	repeated uint64 voters = 1;
	// The learners in the incoming config.
	repeated uint64 learners          = 2;
	// The voters in the outgoing config.
	repeated uint64 voters_outgoing   = 3;
	// The nodes that will become learners when the outgoing config is removed.
	// These nodes are necessarily currently in nodes_joint (or they would have
	// been added to the incoming config right away).
	repeated uint64 learners_next     = 4;
	// If set, the config is joint and Raft will automatically transition into
	// the final config (i.e. remove the outgoing config) when this is safe.
	optional bool   auto_leave        = 5 [(gogoproto.nullable) = false];
}

raft/storage.go当中可以看到Storeage是一个interface,目前只有一种基于内存的实现MemoryStorage

  • 里面有一个互斥锁
  • snapshot
  • 还有对应的entries
  • 这个hardState是用来记录这个快照的termvotecommit
type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	
	hardState pb.HardState
	snapshot  pb.Snapshot
	// ents[i] has raft log position i+snapshot.Metadata.Index
	ents []pb.Entry
}

初始化

NewServer的时候会初始化snapshotter

	ss := snap.New(cfg.Logger, cfg.SnapDir())

何时触发快照

server启动后,会有一个forever的处理,里面会有apply的操作,这个applyAll会触发快照的生成

// etcdserver/server.go
......
for {
		select {
		case ap := <-s.r.apply():
			f := func(context.Context) { s.applyAll(&ep, &ap) }
			sched.Schedule(f)
			......
}

而这个apply是在node start的时候,同样启动了一个forever,监听node Ready通道;当上层业务通过调用getSnapshot的时候就会触发此ready操作,Ready是一个状态集合,维护着各种状态,是否可以读,可以写等;

另外,可以看一下测试类里面的代码:etcdserver/server_test.go#TestTriggerSnap

生成快照

getSnapshot将用户需要做快照的数据进行传递过来,然后会进行两步操作

  • 保存在wal:用snapshot类型的record进行封装,然后序列化,落盘
  • 保存snapshot:用Snapshot对象进行封装,然后序列化,落盘至snapshot的文件当中,snapshot文件命名格式为:Term-Index.snap
	if err := rc.wal.SaveSnapshot(walSnap); err != nil {
		return err
	}
	if err := rc.snapshotter.SaveSnap(snap); err != nil {
		return err
	}

需要注意的:快照是follower在本地完成的,不需要leader向其发送消息;因为follower是完整的备份事项,leader只需要将snapshot所带的termindex传递过来即可;

func (r *raft) handleSnapshot(m pb.Message) {
	// message里面只是一个snapshot对象,包含index和term
   sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
   if r.restore(m.Snapshot) {
      r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
         r.id, r.raftLog.committed, sindex, sterm)
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
   } else {
      r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
         r.id, r.raftLog.committed, sindex, sterm)
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
   }
}