K8s Informer机制

informerk8s里面的重要通讯机制,理解了它,有助于我们对k8s进行二次开发,像operator等;看一下简单的实现

package main

import (
	"context"
	"fmt"
	"log"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// 简单起见硬编码相关配置
	configPath := "/Users/admin/Documents/etc/k8s/config/kubeconfig.yaml"
	masterURL := "https://10.0.4.175:6443"

	// 初始化config
	config, err := clientcmd.BuildConfigFromFlags(masterURL, configPath)
	if err != nil {
		panic(err)
	}

	// 初始化client
	kubeClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	depList, _ := kubeClient.AppsV1().Deployments("idcos").List(context.Background(), metav1.ListOptions{LabelSelector: ""})

	for _, v := range depList.Items {
		log.Printf("namespace: %s, name: %s", v.Namespace, v.Name)
	}

	fmt.Printf("connection k8s success \n")

	// 获取工厂实例, 通过这个工厂实例可获取到所有资源的 Informer
	factory := informers.NewSharedInformerFactory(kubeClient, 0)
	// 创建Pod Informer
	podInformer := factory.Core().V1().Pods()
	informer := podInformer.Informer()

	// 创建ns informer
	ns := factory.Core().V1().Namespaces()
	nsformer := ns.Informer()

	stopCh := make(chan struct{})
	defer close(stopCh)
	go factory.Start(stopCh)

	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		log.Fatal("sync failed")
	}

	nsformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    // 当有namespace创建的时候,会打印出来
		AddFunc: func(obj interface{}) {
			ns := obj.(*v1.Namespace)
			log.Println("get a namesapce:", ns.Name)
		},
    // 当有namespace更新的时候,会打印出来
		UpdateFunc: func(oldObj, newObj interface{}) {
			log.Printf("update namespace %s -> %s", oldObj.(*v1.Namespace).Name, newObj.(*v1.Namespace).Name)
		},
    // 当有namespace删除的时候,会打印出来
		DeleteFunc: func(obj interface{}) {
			log.Println("delete a namespace", obj.(*v1.Namespace).Name)
		},
	})

  // 拿到所有的namespace
	nsLister := ns.Lister()
	nsList, err := nsLister.List(labels.Everything())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(nsList)

	<-stopCh
}

代码当中的配置文件是在k8s集群当中的cat ~/.kube/config当中,将其copy至本地即可开始debug

这个例子里面的大概意思可以描述为用户通过kubectl命令操作ApiServer的种种行为,都可以被监听到,用户可以对这些事件通过注册Handler的方式进行扩展操作;那他怎么实现的呢?先看一张官方的图,有一个大概的了解,后面会详情地理一理;

k8s informer

这张图刚看起来可能比较抽象;我的思路是以obj为主体,跟踪它的流转情形;当看完源码后,再来看这张图就会比较清晰了;

Delta

当我们使用了kubectl create ns idcos这样一条命令的时候,肯定是先调用至apiserver,若apiserver将这些变动封装成对象的对象,通知其他模块我做了update/delete/add等操作后,其他模块即可继续向下走,完成自己的逻辑,所以本质上就是对这些update/delete/add的事件操作;

这些事件都会被封装为一个一个的DeltaType主要包括五种,add/update/delete/replace/sync

type Delta struct {
	Type   DeltaType
	Object interface{}
}

Reflector

ApiServer接收发送请求会非常的多,肯定有一个模块是用来监听它的,这个便是Reflector;它有两个方法,一个是List,用于监听全量的信息;一个是Watch用于监听增量的信息;

// staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {}
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {}

Watch出来的event会有add/update/delete操作,这些数据会被封装为Delta,然后进入到Delta FIFO queue当中

Controller

这个Controller是与我们k8s里面的controller不是一回事,只是代码起了这个名字,它在初始化完成后,会有一个loop,将Delta FIFO queue里面的obj popInformer

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

Pop的时候会有一个process,这个对应的便是HandleDeltas

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
		......
		err := process(item)
		......
}

Indexer

这些obj对象需要变更,存储,否则没有办法知道现在是什么状况;Indexer的职责就是用于存储这些obj,当informer接收到obj后,通过HandleDeltas会将add/update/delete的操作给Indexer,这时Indexer就会进行存储;

Add为例,这个时候的操作,是更新至内存当中,其实现方法为:

// staging/src/k8s.io/client-go/tools/cache/store.go 
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Add(key, obj)
	return nil
}

持久化对象结构乃是一个map

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

Informer

HandleDeltasobj进行处理,通过processor进行distributelistener,同时将obj转换为notification添加至listeneraddChannel当中;

  • processor: informer的一个对象,在初始化的时候会创建出来;它的作用是对obj进行处理,通过listner干活
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				// indexer进行更新
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				// processor进行distribute
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
			// indexer进行添加
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			// indexer进行删除
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
  • listener:在processor的时候创建,它的作用是:将obj对象分发至注册的add/update/delete handles ,它有两个方法run\pop
// k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
  • run是用来处理notification给注册的handlers
  • pop是用来监听addChannel里面的notification,交由run来处理;

Obj流转图

k8s delta obj

  • 红色的箭头即为obj的流转过程
  • 最上面的client\factory\handle即为开发人员的操作过程;初始化创建等;

总结

  • apiserverk8s的主要入口,所有的操作都是由ApiServer来执行的;
  • 通过Informer的处理,大大提升了k8s的扩展性,这也就衍生出了Operator的开发框架,像 operator-framework
  • FIFO queue保证了消息的缓冲,大大提高了运行的性能;

参考