日志复制

日志对象

etcd把一条日志做为一个entry,每一个里面都会有几个属性:

  • Termleader的任期,这个属性的目的乃是follower接收到msgApp类型的消息的时候,会与本地维护的Term进行对比,防止出现不是最新的leader,以便可以重新发起选举
  • Indexleader维护的日志信息当中最大的索引
  • Type,日志类型,因为etcd当中最早使用的是entryConfChange现在升级到2版本,主要加了事务的功能;
  • Data,日志信息,是使用entryConfChange2结构体进行序列化的结果
type Entry struct {
	Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
	Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
	Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
	Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
	XXX_unrecognized []byte    `json:"-"`
}

日志复制过程

发起

clientetcd-server发起一条msg的时候(比如etcdctl put key value),这个时候的会通过rpc调用至etcdserveretcdserver在处理的时候,会将这条数据进行封装

  • 类型为MsgProp
  • 消息体为Entry
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

然后将这条消息放入到node的propcchannel当中

// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	......
	ch := n.propc
	pm := msgWithResult{m: m}
	if wait {
		pm.result = make(chan error, 1)
	}
	select {
    // 将pm写入channel,node监听拿到pm,这就是go channel的好处哇;
	case ch <- pm:
		if !wait {
			return nil
		}
	.....
}

node run的时候会有一个forever的进程,一直在监听着channel,直接就进入leaderstepLeader方法当中

		case pm := <-propc:
			m := pm.m
			m.From = r.id
			// 进入r.Step,因为leader的Term比较大,所以直接就进行StepLeader
			err := r.Step(m)
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}

Leader处理过程

Leader本地处理
  • 对日志进行unmarshal
  • 判断日志是否可以进行追加
    • 已经存在待追加的消息
    • 已经joint,事务还未提交(根据节点维护的ProgressTracker记录着follower当前的状态来判断)
    • 存在正在提交事务
  • 若可以则进行将pendingConfIndex+1,并进行日志追加广播
    • 追加日志广播的时候一样的逻辑,消息类型为msgApp,消息体为entry,并带上自身的LogTermIndex

leader send entry

case pb.MsgProp:
		......
		// 开始处理entry
		for i := range m.Entries {
      // 反序列化得到消息
			e := &m.Entries[i]
      ......
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			if cc != nil {
        // 判断是否可以追加
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0

				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}

				if refused != "" {
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil
Follower远程处理

follower接收到消息之后的逻辑

  • 比对一个Term,看看leader是不是最新的
  • 开始尝试追加
  • 最后返回给leader一个MsgAppResp的消息类型

follower resp

func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
	}
}
follower追加逻辑

对比本地indexTerm;每一个节点上面都会维护一个raftLog,它里面包含已存储的日志Storage,已提交的日志索引(Commited)、未提交的unstable,已同意提交的applied(其中这个属性的值会<=committed)

  • entryindex需要比unstablestorage的最大值还要大;

  • entryterm需要比unstablestorageTerm最大值还要大,都取最后一条日志的上一条的Term进行对比;这也是为什么etcd能够做到数据版本历史的原因

  • 若以上条件都满足,对每个entry是否已经被包含了,然后再进行一次commit,这个逻辑比较简单,直接将committed修改为最新需要commitindex即可

  • 做完这一切会向Leader发送一个MsgAppResp的消息,包括自己的id,最新的日志信息index;若是拒绝的,则会发送自身最新的日志信息,将其放在RejectHint属性当中

Leader接收MsgAppResp

stepLeader function当中可以看到收到MsgAppResp的消息后,Leader做了以下处理:

  • 若是拒绝消息,因为有可能follower的日志比较旧,跟不上leader,那么leader就会根据RejectHint来降低自己的日志索引,然后发给follower,同时将progress里面follower的状态维护为StateProbe,意思是这个伙计现在有点问题,直至恢复

  • 若是成功消息,则对此followerprogress里面的状态进行判断

    • 若为Probe,修改为Replicate
    • 若为snapshot,说明是需要做快照了
    • 若为replicate,则需要开始提交commit了;
    • 若没有问题,将leader的commit索引更新,然后广播可以提交的append消息,只不过这一次发送的时候progress的状态不同;
    				if r.maybeCommit() {
    				// 广播发送
    					r.bcastAppend()
    
    		m.Type = pb.MsgApp
    		m.Index = pr.Next - 1
    		m.LogTerm = term
    		m.Entries = ents
    		m.Commit = r.raftLog.committed
    		if n := len(m.Entries); n != 0 {
    			switch pr.State {
    			// optimistically increase the next when in StateReplicate
          // 再次发送时,state为replicate
    			case tracker.StateReplicate:
    				last := m.Entries[n-1].Index
    				pr.OptimisticUpdate(last)
    				pr.Inflights.Add(last)
           // 第一次发送是为probe状态
    			case tracker.StateProbe:
    				pr.ProbeSent = true
    			default:
    				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
    			}
    		}
    

可以看到当etcdserver接受到命令的时候,并不是直接就执行,而是先放在本地的unstable里面,然后再向follower发送,等大多数的follower返回结果,自己才会进行更新至commit里面;而整个过程都是维护在ProgressTracker当中;

type ProgressTracker struct {
	Config
  Progress ProgressMap 
	Votes map[uint64]bool  
	MaxInflight int
}
  • ProgressMap是以follower的id为key,对应的value为一个Progress的指针,维护着follower的状态
    • 状态类型,probereplicatesnapshot
type Progress struct {
	Match, Next uint64
	State StateType
  // 等待快照的数量
	PendingSnapshot uint64
  // 是否活跃,即收到了任何的message都会设置为true
	RecentActive bool
  // 当为true时,follower就无法向其发送信息,直到follower修改状态
	ProbeSent bool
  // 这个是航班,每开始同步日志的时候,就会将其尾部的去掉,将新的日志index加入进去;
	Inflights *Inflights
  // 刚开始的节点或者角色发生变化 后,状态为learner;
	IsLearner bool
}