Watch使用

watch有两种操作,一种是key,一种是range,即监听一段的key,从一个测试用例里面看看,主要包括几个操作

  • 初始化一个watchablestore
  • 初始化watchStream
  • 使用watchStream创建一个watch对象,监听一个Key
  • 使用put操作进行测试即可完成整个流程
func TestWatch(t *testing.T) {
	b, tmpPath := backend.NewDefaultTmpBackend()
	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})

	defer func() {
		s.store.Close()
		os.Remove(tmpPath)
	}()

	testKey := []byte("foo")
	testValue := []byte("bar")
	s.Put(testKey, testValue, lease.NoLease)

	w := s.NewWatchStream()
	w.Watch(0, testKey, nil, 0)

	if !s.synced.contains(string(testKey)) {
		// the key must have had an entry in synced
		t.Errorf("existence = false, want true")
	}
}

Watch结构

event

监听即为事件信息,对事件进行封装,里面结构如下

  • type:操作类型,DELETPUT
  • KVkey\value
  • PrevKV:上一版本的kv

watcher

使用watch命令的时候即为使用此对象来存放所监听的key,并负责对事件最终的发送;里面重要结构如下:

  • keykey列表
  • end:若是监听一段key的变化,则会有一个区间形式的操作,[start,end)
  • victim:翻译为牺牲者,当有事件可以发送时,client可能存在一定的问题未发送成功(比如说通道满了)是否存在未发送成功的事件,是一个标记
  • compacted:对事件进行压缩
  • fcs:这是一个函数过滤器列表,过滤出client只想监听的key
  • ch:还有一个watchResponse,用于将监听的事件发送出去,形成消息通道

watcherGroup

watcher的集合,正因为这样的设计,所以etcd支持一段keywatch,里面结构如下:

  • keyWatchers:一个map对象,key为需要监听的keyvaluewatcherSet
  • ranges:红黑树(adt),用于对key->watcher结构进行快速查询,存储、删除操作
  • watcherswatcher列表

watchstream

创建watchserver的时候,会创建一个watchStreamstream里面起了一个for循环,会监听rpc消息过来的watch事件信息:创建watch、取消watch,处理watch

func (sws *serverWatchStream) recvLoop() error {
	switch uv := req.RequestUnion.(type) {
		case *pb.WatchRequest_CreateRequest:
		case *pb.WatchRequest_CancelRequest:
		case *pb.WatchRequest_ProgressRequest:
}

watchstore

初始化server的时候生成,里面主要包含几个东西

  • store:对boltdb进行封装,实现底层数据的操作
  • victimc:翻译为牺牲者,实际上是发送给client失败后的一些事件信息
  • unsynced:本质上是waitGroup,未发送给client的事件,创建key的监听时,监听keyrev若小于库里面的rev存放于此
  • synced:本质上是waitGroup,需要发送给client的事件都在此存放

工作原理

初始化

  • 初始化server

  • 初始化mvcc

  • 初始化watchstore

    • 启动监听watcher循环
    • 启动监听victims循环
  	go s.syncWatchersLoop()
  	go s.syncVictimsLoop()

syncWatchers

  • kvsToEvents:将变更的kv进行event化处理,此时会从watcherGroup当中是否包含这个key,从红黑树里面拿到结果

  • key的rev与storerev进行比对,判断此watch是否需要进行sync,还是放在unsync当中

  • 组装watchResponse,发送至client;若发送失败,将event放至victims里面,走syncVictimsLoop逻辑

client put&&end

put操作最终会进行事务提交,所以最后会走到mvcc模块,这里面会有put操作

func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
......
	tw.trace.Step("marshal mvccpb.KeyValue")
	tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
	tw.s.kvindex.Put(key, idxRev)
	tw.changes = append(tw.changes, kv)
	tw.trace.Step("store kv pair into bolt db")
......
}

kv的数据信息放至changes里面;将key与revision信息重新存放至btree里面

put完成之后,会有end操作

func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
	tw := wv.kv.Write(traceutil.TODO())
	defer tw.End()
	return tw.Put(key, value, lease)
}

watchableStroe end

end函数里面会对kv进行event化处理,最后notifywatchstreamchannel当中

func (tw *watchableStoreTxnWrite) End() {
	changes := tw.Changes()
	if len(changes) == 0 {
		tw.TxnWrite.End()
		return
	}

	rev := tw.Rev() + 1
	evs := make([]mvccpb.Event, len(changes))
	for i, change := range changes {
		evs[i].Kv = &changes[i]
		if change.CreateRevision == 0 {
			evs[i].Type = mvccpb.DELETE
			evs[i].Kv.ModRevision = rev
		} else {
			evs[i].Type = mvccpb.PUT
		}
	}

	tw.s.mu.Lock()
  // notify
	tw.s.notify(rev, evs)
	tw.TxnWrite.End()
	tw.s.mu.Unlock()
}

sendWatchResponse

eventkeywatcherGroup当中查找到对应的watcher,然后将数据变更信息发送至client

参考