K8s Informer机制
informer是k8s里面的重要通讯机制,理解了它,有助于我们对k8s进行二次开发,像operator等;看一下简单的实现
package main
import (
"context"
"fmt"
"log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 简单起见硬编码相关配置
configPath := "/Users/admin/Documents/etc/k8s/config/kubeconfig.yaml"
masterURL := "https://10.0.4.175:6443"
// 初始化config
config, err := clientcmd.BuildConfigFromFlags(masterURL, configPath)
if err != nil {
panic(err)
}
// 初始化client
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}
depList, _ := kubeClient.AppsV1().Deployments("idcos").List(context.Background(), metav1.ListOptions{LabelSelector: ""})
for _, v := range depList.Items {
log.Printf("namespace: %s, name: %s", v.Namespace, v.Name)
}
fmt.Printf("connection k8s success \n")
// 获取工厂实例, 通过这个工厂实例可获取到所有资源的 Informer
factory := informers.NewSharedInformerFactory(kubeClient, 0)
// 创建Pod Informer
podInformer := factory.Core().V1().Pods()
informer := podInformer.Informer()
// 创建ns informer
ns := factory.Core().V1().Namespaces()
nsformer := ns.Informer()
stopCh := make(chan struct{})
defer close(stopCh)
go factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
log.Fatal("sync failed")
}
nsformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// 当有namespace创建的时候,会打印出来
AddFunc: func(obj interface{}) {
ns := obj.(*v1.Namespace)
log.Println("get a namesapce:", ns.Name)
},
// 当有namespace更新的时候,会打印出来
UpdateFunc: func(oldObj, newObj interface{}) {
log.Printf("update namespace %s -> %s", oldObj.(*v1.Namespace).Name, newObj.(*v1.Namespace).Name)
},
// 当有namespace删除的时候,会打印出来
DeleteFunc: func(obj interface{}) {
log.Println("delete a namespace", obj.(*v1.Namespace).Name)
},
})
// 拿到所有的namespace
nsLister := ns.Lister()
nsList, err := nsLister.List(labels.Everything())
if err != nil {
log.Fatal(err)
}
fmt.Println(nsList)
<-stopCh
}
代码当中的配置文件是在
k8s集群当中的cat ~/.kube/config当中,将其copy至本地即可开始debug
这个例子里面的大概意思可以描述为用户通过kubectl命令操作ApiServer的种种行为,都可以被监听到,用户可以对这些事件通过注册Handler的方式进行扩展操作;那他怎么实现的呢?先看一张官方的图,有一个大概的了解,后面会详情地理一理;

这张图刚看起来可能比较抽象;我的思路是以obj为主体,跟踪它的流转情形;当看完源码后,再来看这张图就会比较清晰了;
Delta
当我们使用了kubectl create ns idcos这样一条命令的时候,肯定是先调用至apiserver,若apiserver将这些变动封装成对象的对象,通知其他模块我做了update/delete/add等操作后,其他模块即可继续向下走,完成自己的逻辑,所以本质上就是对这些update/delete/add的事件操作;
这些事件都会被封装为一个一个的Delta,Type主要包括五种,add/update/delete/replace/sync
type Delta struct {
Type DeltaType
Object interface{}
}
Reflector
ApiServer接收发送请求会非常的多,肯定有一个模块是用来监听它的,这个便是Reflector;它有两个方法,一个是List,用于监听全量的信息;一个是Watch用于监听增量的信息;
// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {}
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {}
Watch出来的event会有add/update/delete操作,这些数据会被封装为Delta,然后进入到Delta FIFO queue当中
Controller
这个Controller是与我们k8s里面的controller不是一回事,只是代码起了这个名字,它在初始化完成后,会有一个loop,将Delta FIFO queue里面的obj pop给Informer
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
在Pop的时候会有一个process,这个对应的便是HandleDeltas
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
......
err := process(item)
......
}
Indexer
这些obj对象需要变更,存储,否则没有办法知道现在是什么状况;Indexer的职责就是用于存储这些obj,当informer接收到obj后,通过HandleDeltas会将add/update/delete的操作给Indexer,这时Indexer就会进行存储;
以Add为例,这个时候的操作,是更新至内存当中,其实现方法为:
// staging/src/k8s.io/client-go/tools/cache/store.go
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, obj)
return nil
}
持久化对象结构乃是一个map;
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
Informer
HandleDeltas对obj进行处理,通过processor进行distribute至listener,同时将obj转换为notification添加至listener的addChannel当中;
processor: informer的一个对象,在初始化的时候会创建出来;它的作用是对obj进行处理,通过listner干活
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// indexer进行更新
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// processor进行distribute
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// indexer进行添加
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
// indexer进行删除
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
listener:在processor的时候创建,它的作用是:将obj对象分发至注册的add/update/delete handles,它有两个方法run\pop
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
- run是用来处理
notification给注册的handlers pop是用来监听addChannel里面的notification,交由run来处理;
Obj流转图

- 红色的箭头即为
obj的流转过程 - 最上面的
client\factory\handle即为开发人员的操作过程;初始化创建等;
总结
apiserver是k8s的主要入口,所有的操作都是由ApiServer来执行的;- 通过
Informer的处理,大大提升了k8s的扩展性,这也就衍生出了Operator的开发框架,像 operator-framework FIFO queue保证了消息的缓冲,大大提高了运行的性能;