Watch使用
watch有两种操作,一种是key,一种是range,即监听一段的key,从一个测试用例里面看看,主要包括几个操作
- 初始化一个
watchablestore - 初始化
watchStream - 使用
watchStream创建一个watch对象,监听一个Key - 使用
put操作进行测试即可完成整个流程
func TestWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
os.Remove(tmpPath)
}()
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)
w := s.NewWatchStream()
w.Watch(0, testKey, nil, 0)
if !s.synced.contains(string(testKey)) {
// the key must have had an entry in synced
t.Errorf("existence = false, want true")
}
}
Watch结构
event
监听即为事件信息,对事件进行封装,里面结构如下
type:操作类型,DELET、PUTKV:key\valuePrevKV:上一版本的kv
watcher
使用watch命令的时候即为使用此对象来存放所监听的key,并负责对事件最终的发送;里面重要结构如下:
key:key列表end:若是监听一段key的变化,则会有一个区间形式的操作,[start,end)victim:翻译为牺牲者,当有事件可以发送时,client可能存在一定的问题未发送成功(比如说通道满了)是否存在未发送成功的事件,是一个标记compacted:对事件进行压缩fcs:这是一个函数过滤器列表,过滤出client只想监听的keych:还有一个watchResponse,用于将监听的事件发送出去,形成消息通道
watcherGroup
watcher的集合,正因为这样的设计,所以etcd支持一段key的watch,里面结构如下:
keyWatchers:一个map对象,key为需要监听的key,value为watcherSetranges:红黑树(adt),用于对key->watcher结构进行快速查询,存储、删除操作watchers:watcher列表
watchstream
创建watchserver的时候,会创建一个watchStream;stream里面起了一个for循环,会监听rpc消息过来的watch事件信息:创建watch、取消watch,处理watch;
func (sws *serverWatchStream) recvLoop() error {
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
case *pb.WatchRequest_CancelRequest:
case *pb.WatchRequest_ProgressRequest:
}
watchstore
初始化server的时候生成,里面主要包含几个东西
store:对boltdb进行封装,实现底层数据的操作victimc:翻译为牺牲者,实际上是发送给client失败后的一些事件信息unsynced:本质上是waitGroup,未发送给client的事件,创建key的监听时,监听key的rev若小于库里面的rev存放于此synced:本质上是waitGroup,需要发送给client的事件都在此存放
工作原理
初始化
-
初始化
server -
初始化
mvcc -
初始化
watchstore- 启动监听
watcher循环 - 启动监听
victims循环
- 启动监听
go s.syncWatchersLoop()
go s.syncVictimsLoop()
syncWatchers
-
kvsToEvents:将变更的kv进行event化处理,此时会从watcherGroup当中是否包含这个key,从红黑树里面拿到结果 -
将
key的rev与store的rev进行比对,判断此watch是否需要进行sync,还是放在unsync当中 -
组装
watchResponse,发送至client;若发送失败,将event放至victims里面,走syncVictimsLoop逻辑
client put&&end
put操作最终会进行事务提交,所以最后会走到mvcc模块,这里面会有put操作
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
......
tw.trace.Step("marshal mvccpb.KeyValue")
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")
......
}
将kv的数据信息放至changes里面;将key与revision信息重新存放至btree里面
在put完成之后,会有end操作
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write(traceutil.TODO())
defer tw.End()
return tw.Put(key, value, lease)
}
watchableStroe end
end函数里面会对kv进行event化处理,最后notify至watchstream的channel当中
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
tw.s.mu.Lock()
// notify
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}
sendWatchResponse
将event的key在watcherGroup当中查找到对应的watcher,然后将数据变更信息发送至client