Lease使用

先声明一个指定过期时间ttllease,再将lease绑定到一个key上面,ttl到期后将key移除,这中间有哪些操作呢?

Lease构成

lessor

lessor是一个接口,实现了对leasegrantrevokeattach等一系列的操作;在server.go NewServer的时候,会新建一个lessor

	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
	srv.lessor = lease.NewLessor(
		srv.getLogger(),
    // 这个be是一个后端的backend,可以认为是存储操作,是内存或者boltdb
		srv.be,
		lease.LessorConfig{
			MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
			CheckpointInterval:         cfg.LeaseCheckpointInterval,
			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
		})

lease queue

租约队列,实现了go heap的接口,有poppushswaplen操作,本质上是一个数组 []*LeaseWithTime,其构成如下

  • id:leaseId,由系统生成,int64
  • time:即ttl(Time To Live),还有多久过期
  • index:在queue当中的序号,当队列重新排队的时候,会被修改
type LeaseWithTime struct {
   id LeaseID
   // Unix nanos timestamp.
   time  int64
   index int
}

LeaseExpiredNotifier

这是一个对象,包含lease queue和一个map,这个mapleaseId: LeaseWithTime,方便进行索引

实现过程

grant

  • 创建一个lease对象
  • 组装LeaseWithTime
  • 将其放入queue当中,在lease queue当中实现对元素的push和重排序操作
    • 为什么会有重排序?因为有可能添加了续约的时间,这个时间time被更新了
  • 落盘,根据创建时的backend进行持久化操作
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
  ......
	le.leaseMap[id] = l
	item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
	le.leaseExpiredNotifier.RegisterOrUpdate(item)
	l.persistTo(le.b)
  .....
}

ttl处理

  • leasor启动的时候,早早地创建好了loop,对queue里面的元素进行监听
func (le *lessor) runLoop() {
	defer close(le.doneC)

	for {
		le.revokeExpiredLeases()
		le.checkpointScheduledLeases()

		select {
		case <-time.After(500 * time.Millisecond):
		case <-le.stopC:
			return
		}
	}
}
  • 每500ms对lease queue进行一次revoke expired lease操作,这个逻辑也比较简单
    • 查看lease queue里面是否存在过期的lease,其实就是取第一个,因为每次写入操作都会对队列进行排序,所以第一个就是快过期的;
    • 判断时间是否过期,若过期,则将其从lease queue当中pop