Watch使用
watch
有两种操作,一种是key
,一种是range
,即监听一段的key
,从一个测试用例里面看看,主要包括几个操作
- 初始化一个
watchablestore
- 初始化
watchStream
- 使用
watchStream
创建一个watch
对象,监听一个Key
- 使用
put
操作进行测试即可完成整个流程
func TestWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})
defer func() {
s.store.Close()
os.Remove(tmpPath)
}()
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue, lease.NoLease)
w := s.NewWatchStream()
w.Watch(0, testKey, nil, 0)
if !s.synced.contains(string(testKey)) {
// the key must have had an entry in synced
t.Errorf("existence = false, want true")
}
}
Watch结构
event
监听即为事件信息,对事件进行封装,里面结构如下
type
:操作类型,DELET
、PUT
KV
:key\value
PrevKV
:上一版本的kv
watcher
使用watch
命令的时候即为使用此对象来存放所监听的key
,并负责对事件最终的发送;里面重要结构如下:
key
:key
列表end
:若是监听一段key
的变化,则会有一个区间形式的操作,[start,end)victim
:翻译为牺牲者,当有事件可以发送时,client
可能存在一定的问题未发送成功(比如说通道满了)是否存在未发送成功的事件,是一个标记compacted
:对事件进行压缩fcs
:这是一个函数过滤器列表,过滤出client
只想监听的key
ch
:还有一个watchResponse
,用于将监听的事件发送出去,形成消息通道
watcherGroup
watcher
的集合,正因为这样的设计,所以etcd
支持一段key
的watch
,里面结构如下:
keyWatchers
:一个map
对象,key
为需要监听的key
,value
为watcherSet
ranges
:红黑树(adt
),用于对key
->watcher
结构进行快速查询,存储、删除操作watchers
:watcher
列表
watchstream
创建watchserver
的时候,会创建一个watchStream
;stream
里面起了一个for
循环,会监听rpc
消息过来的watch
事件信息:创建watch
、取消watch
,处理watch
;
func (sws *serverWatchStream) recvLoop() error {
switch uv := req.RequestUnion.(type) {
case *pb.WatchRequest_CreateRequest:
case *pb.WatchRequest_CancelRequest:
case *pb.WatchRequest_ProgressRequest:
}
watchstore
初始化server
的时候生成,里面主要包含几个东西
store
:对boltdb
进行封装,实现底层数据的操作victimc
:翻译为牺牲者,实际上是发送给client失败后的一些事件信息unsynced
:本质上是waitGroup
,未发送给client
的事件,创建key
的监听时,监听key
的rev
若小于库里面的rev
存放于此synced
:本质上是waitGroup
,需要发送给client
的事件都在此存放
工作原理
初始化
-
初始化
server
-
初始化
mvcc
-
初始化
watchstore
- 启动监听
watcher
循环 - 启动监听
victims
循环
- 启动监听
go s.syncWatchersLoop()
go s.syncVictimsLoop()
syncWatchers
-
kvsToEvents
:将变更的kv
进行event
化处理,此时会从watcherGroup
当中是否包含这个key
,从红黑树里面拿到结果 -
将
key
的rev与store
的rev
进行比对,判断此watch
是否需要进行sync
,还是放在unsync
当中 -
组装
watchResponse
,发送至client
;若发送失败,将event
放至victims
里面,走syncVictimsLoop
逻辑
client put&&end
put
操作最终会进行事务提交,所以最后会走到mvcc
模块,这里面会有put
操作
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
......
tw.trace.Step("marshal mvccpb.KeyValue")
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")
......
}
将kv
的数据信息放至changes
里面;将key
与revision信息重新存放至btree
里面
在put
完成之后,会有end
操作
func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
tw := wv.kv.Write(traceutil.TODO())
defer tw.End()
return tw.Put(key, value, lease)
}
watchableStroe end
end
函数里面会对kv
进行event
化处理,最后notify
至watchstream
的channel
当中
func (tw *watchableStoreTxnWrite) End() {
changes := tw.Changes()
if len(changes) == 0 {
tw.TxnWrite.End()
return
}
rev := tw.Rev() + 1
evs := make([]mvccpb.Event, len(changes))
for i, change := range changes {
evs[i].Kv = &changes[i]
if change.CreateRevision == 0 {
evs[i].Type = mvccpb.DELETE
evs[i].Kv.ModRevision = rev
} else {
evs[i].Type = mvccpb.PUT
}
}
tw.s.mu.Lock()
// notify
tw.s.notify(rev, evs)
tw.TxnWrite.End()
tw.s.mu.Unlock()
}
sendWatchResponse
将event
的key
在watcherGroup
当中查找到对应的watcher
,然后将数据变更信息发送至client