Inside Kubernetes controller-02

网友投稿 650 2022-05-29

二 低层架构-client-go

2.1 Informer

Informer watches 对象事件(added,updated,deleted)

当控制器每次向api服务器查询对象状态时,监视对象更改,api服务器高负载。

informer watch api,通过controller设置关联cache。

func main() { ... clientset, err := kubernetes.NewForConfig(config) // Create InformerFactory informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30) // Create pod informer by informerFactory podInformer := informerFactory.Core().V1().Pods() // Add EventHandler to informer podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(new interface{}) { log.Println("Added") }, UpdateFunc: func(old, new interface{}) { log.Println("Updated") }, DeleteFunc: func(old interface{}) { log.Println("Deleted") }, }) // Start Go routines informerFactory.Start(wait.NeverStop) // Wait until finish caching with List API informerFactory.WaitForCacheSync(wait.NeverStop) // Create Pod Lister podLister := podInformer.Lister() // Get List of pods _, err = podLister.List(labels.Nothing()) … }

当我们使用Informer,我们不直接使用informer自己,而是使用共享informer。

共享informer 共享相同的资源在一个单独的二进制文件中。

Reflector list & watch api-server

Reflector将对象添加到Delta FIFO 队列中

informer 从Delta FIFO中弹出对象

将对象添加到Indexer

Indexer 存储对象到安全线程存储中

informer 设置事件reference

用户自定义的Controller,事件Handlers 将对象添加至workqueue,

进程获取key,process item

获取对象的key从indexer reference中。

Informer 整体架构

reflector 开始listAndWatch api对象中的用户创建的心资源pod对象。

informer 从DeltaFIFO 队列中Pop初事件。

HandleDelta

indexer.Add 增加事件到内存存储中

开始循环弹出事件并进行处理

后期get or list 请求都从indexer中获取。

Informer 和其组件

Informer:watch 一个对象事件并且存储在内存缓存中

Inside Kubernetes controller-02

Reflector:listAndWatch api-server

DeltaFIFO:FIFO 队列存储对象临时

Indexer:设置或者获取对象

Store:存储对象

Lister:获取对象在内存通过index

2.2 WorkQueue

WorkQueue不同于DeltaFIFO队列,

WorkQueue是使用存储Contrl Loop 的item。

调谐将执行WorkQWueue中的内容。

感觉的控制存储enqueues item 到工作队列中,当事件发生时。

func main() { ... clientset, err := kubernetes.NewForConfig(config) // Create InformerFactory informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30) // Create pod informer by informerFactory podInformer := informerFactory.Core().V1().Pods() // Create RateLimitQueue queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) // shutdown when process ends defer queue.ShutDown() // Add EventHandler to informer podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(old interface{}) { var key string var err error if key, err = cache.MetaNamespaceKeyFunc(old); err != nil { runtime.HandleError(err) return } queue.Add(key) log.Println("Added: " + key) }, UpdateFunc: func(old, new interface{}) { … }, DeleteFunc: func(old interface{}) { … }, }) … }

整体示意图

Reflector watch k8s-api 将其中资源发送到DeltaFIFO中

Informer EventHandler 从DeltaFIFO中pop出数据。

Informer EventHandler 调用资源事件函数,例如:AddFunc、UpdateFunc、DeleteFunc来将事件对象放进WorkQueue中。

注意:再次添加的key为资源的唯一标示,后期使用的时候从index中之间获取,从而减轻api-server 的压力。

workqueue.Add 进入WorkQueue中,例如namespace/name、default/nginx

循环3-4,处理事件放入队列中。

用户自定义的controller 逻辑从WorkQueue中获取对象进行处理。

Informer 重新同步周期

重新同步周期对于Informer是可选的。

Informer监视api服务器上的对象事件。

再同步期过后,无论发生什么事件,UpdateFunc被回调。结果,调谐将再次执行。

2.3 Controller Cycle Main Logic

完整示意图:

假设用户创建了两个pod,生成了两个update 事件

informer 执行UpdateFunc

Informer 调用workQueue.Add 函数 增加事件到WorkQueue中。

程序处理调用workqueue.Get 获取事件

调谐处理具体更新业务逻辑。

当调谐成功后,处理程序进行workQueue.Forget/workQueue.Done,这个item从workQueue中移除。

当调谐是错误结束后,workQueue 增加速率限制,并且控制器重新入队item,并且开始新一轮调谐。

每次事件发生时,项都会继续存储在工作队列中,控制器处理工作队列中的项并执行协调。该循环持续不断,直到控制器停止。

从内存中读取,写向api-server。

但是,如果我们直接更新缓存中的对象,很难保证其一致性。

所以,我们在更新对象时使用DeepCopy(获取克隆数据)。

例如代码:kubernetes/pkg/controller/replicaset/replica_set.go

rs = rs.DeepCopy() newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) // Always updates status as pods come up or die. updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1(). ⇨ ReplicaSets(rs.Namespace), rs, newStatus)

worker:无限循环 processNextWorkItem。

processNet WorkItem:操作WorkQueue(Get, Add)并且调用调谐逻辑。

syncHandler:这就是调谐具体逻辑。

基于K8s v1.16

worker:https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/replicaset/replica_set.go#L432

processNextWorkItem:https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/replicaset/replica_set.go#L437

syncReplicaSet:https://github.com/kubernetes/kubernetes/blob/release-1.16/pkg/controller/replicaset/replica_set.go#L562

其他

Informer 从etcd 同步对象数据到内存中。

再次需要考虑内存数据是否和etcd数据不相同呢。

这是没有问题的,对象有resourceVersion,如果etcd的resourceVersion 于内存缓存中的resourceVersion不同,Controller在更新对象状态时出错,控制器请求调谐,直到迁移完成。

review

Informer:通过eventandler将Control Loop的项添加到WorkQueue中

Lister:通过Indexer从内存缓存中获取对象数据

WorkQueue:存储控制循环项的队列。该项是Reconcile Logic的目标。如果在调解结束时发生错误,控制器将项请求到工作队列。控制器再次执行Reconcile。

Kubernetes

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:【C++快速上手】十、volatile学习笔记
下一篇:结构化数据,我该拿你怎么办?
相关文章