[k8s源码]4.informer

07-21 1579阅读

Informer 是 client-go 库中的一个核心组件,它提供了一种高效的方式来监视 Kubernetes 集群中资源的变化。Informer 通过 Watch 机制与 API Server 建立长连接,初次同步时会获取资源的完整列表,之后只接收增量更新,大大减少了网络流量。

使用informer可以减少 API 调用: 避免频繁轮询,降低 API Server 压力。

  1. 实时性: 能快速获知资源变化
  2. 缓存: Informer 在本地维护资源缓存,减少查询延迟
  3. 事件处理: 提供 Add/Update/Delete 事件的回调机制

从下图我们可以看到 Informer 包含2个蓝色组件,分别是Reflector,Indexer。其中 Reflector 是用来和 apiserver 建立链接,实时获取最新的数据并交给 Informer,Informer 拿到数据存放到 Indexer 中并通知下游 controller。

[k8s源码]4.informer

代码工厂

这里的podinformer是一个专门用来监视和更新pod信息的informer,是通过工厂informer来实例化的一个informer,以下是一个实例来解释代码工厂的概念:

这里的studentInformer是一种特殊的informer, 里面有一个informerfactory和一个自有的函数,这里使用函数类型func()作为字段,可以在结构体中保存一些动态生成的逻辑或者回调函数,使得结构体的行为可以根据这些函数的不同实现而变化。这种方式可以帮助实现更灵活和可扩展的程序设计。

package main
import (
    "fmt"
)
type Student struct {
    Name string
    Age  int
}
type InformerFactory struct {
    // 一些工厂的字段
}
type StudentInformer struct {
    factory InformerFactory
    defaultInformer func() string
}
func (s *StudentInformer) Informer() string {
    return s.factory.InformerFor(s.defaultInformer)
}
func (f InformerFactory) InformerFor(informerFunc func() string) string {
    // 这里我们简单返回调用的结果
    return informerFunc()
}
func defaultStudentInformer() string {
    return "Student Informer Initialized"
}
func main() {
    factory := InformerFactory{}
    studentInformer := StudentInformer{
        factory: factory,
        defaultInformer: defaultStudentInformer,
    }
    
    result := studentInformer.Informer()
    fmt.Println(result) // 输出: Student Informer Initialized
}

这里InformerFor 方法需要在未来根据不同的情况返回不同的 informer 结果,而不是简单地调用一个预定义的函数,这种设计就显得更加合理。此外,通过 InformerFor 方法,可以对 InformerFactory 进行更多的管理和配置,比如添加日志记录、性能监控等逻辑,而不需要修改 StudentInformer 结构体本身。所以这里使用了informer和informerFor两个方法。

 PodInformer

通过上面的例子,可以更好的理解这里的代码逻辑,PodInformer使用了一个工厂方法internalinterfaces.SharedInformerFactory,他的informer方法调用了SharedInformerFactory中的informerFor方法。

type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}
type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}
// 返回一个 sharedIndexInformer ,本质上是调用了 sharedInformerFactory 的 InformerFor 方法,
//我在下面贴出来了,sharedInformerFactory 是个工厂方法,这里初始化了 podInformer 并存储在 //factory 的 informers map 中
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
// 返回一个 PodList , 可以用来列出 indexer 中的数据,这里的 indexer 是本地的,
//因此是拿出的pod是只读的,无法修改
func (f *podInformer) Lister() v1.PodLister {
	return v1.NewPodLister(f.Informer().GetIndexer())
}
// 根据传入的对象类型 obj 和创建函数 newFunc,返回一个对应的 SharedIndexInformer 实例。
//如果已经存在相同类型的 informer,则直接返回现有的实例;否则,根据参数创建一个新的实例,
//并在需要时进行存储,以便后续重复使用,从而提高效率和资源利用率。
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	informer = newFunc(f.client, resyncPeriod)
	informer.SetTransform(f.transform)
	f.informers[informerType] = informer
	return informer
}
Informer构造方法

构造方法(NewPodInformer 和 NewFilteredPodInformer)是创建实际 Informer 的地方,而 podInformer 结构体则是对这个 Informer 的一个封装,提供了更高层次的抽象和额外的功能(如 Lister)。podInformer 的 defaultInformer 方法是连接这两部分的桥梁,它在需要时调用构造方法来创建新的 Informer。这种设计允许灵活地创建和管理 Informer,同时提供了一致的接口(PodInformer)供客户端代码使用。

这里的NewPodInformer其实是调用了另一个informer但是自动将 tweakListOptions 参数设为 nil

func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}

 这里的 NewFilteredPodInformer方法定义了wach和list两个函数,核心是client.CoreV1().Pods(namespace).Watch(context.TODO(), options),client.CoreV1().Pods(namespace).List(context.TODO(), options)

&cache.ListWatch{...} 创建一个 ListWatch 结构体的指针。

ListFunc: 是 ListWatch 结构体的一个字段,它期望一个函数作为值。

func(options metav1.ListOptions) (runtime.Object, error) { ... } 是一个匿名函数,它:

接受一个 metav1.ListOptions 参数

返回一个 runtime.Object 和一个 error

这个函数的内容:

如果 tweakListOptions 不为 nil,就调用它来修改 options

然后调用 client.CoreV1().Pods(namespace).List() 来获取 Pod 列表

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

&corev1.Pod{},

        resyncPeriod,

        indexers,

这三行简单的代码本身并不直接完成所有复杂的操作,而是为 cache.NewSharedIndexInformer 函数提供了必要的参数。这个函数内部会使用这些参数来设置和配置 Informer。

&corev1.Pod{}:这只是一个类型指示器。NewSharedIndexInformer 函数内部会使用反射来检查这个类型,以确定如何处理从 API 服务器接收的对象。

resyncPeriod:这个参数被传递给 Informer 的内部定时器。Informer 会创建一个 goroutine,定期触发重新同步操作。

indexers:这个参数用于初始化 Informer 的索引器。例如像这样:

cache.Indexers{
    cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
}

这个 Indexers 映射会被传递给 Informer 的内部存储(通常是一个 cache.ThreadSafeStore),用于设置索引函数。 

可以看到最后定义了defaultInformer,是上面构造podInformer的时候,Informer方法中调用InformerFor的时候传递的一个函数。

总结以上内容

这段代码定义了 PodInformer 接口及其实现,采用了工厂模式和延迟初始化策略来管理 Kubernetes Pod 资源的 Informer。核心是 Informer() 方法,它通过调用 factory.InformerFor() 来获取或创建 cache.SharedIndexInformer 实例。这个过程的本质是构造一个 cache.SharedIndexInformer 类型的 informer。

具体流程如下:Informer() 方法调用 InformerFor(),并传递一个 defaultInformer 函数作为参数。这个 defaultInformer 函数实际上返回一个 NewFilteredPodInformer,而 NewFilteredPodInformer 在构造时会创建一个 cache.SharedIndexInformer 类型的对象。InformerFor() 函数首先检查是否已经存在对应类型的 informer 实例。如果已存在,直接返回该实例;如果不存在,则使用提供的 defaultInformer 函数创建新的实例。这种设计确保了 Informer 的单一实例,避免重复创建,从而提高了资源利用效率。

sharedIndexInformer

它包含了以下重要组件:

indexer: 用于存储和索引对象的本地缓存。

controller: 负责管理 Reflector,处理对象的添加、更新和删除。

processor: 处理事件分发给注册的事件处理器。

listerWatcher: 用于列出和监视资源的接口。

objectType: 指定了 Informer 处理的对象类型。

resyncCheckPeriod 和 defaultEventHandlerResyncPeriod: 控制重新同步的周期。

started 和 stopped: 控制 Informer 的生命周期。

watchErrorHandler: 处理监视过程中的错误。

transform: 允许在将对象添加到存储之前对其进行转换。

这个结构体的定义揭示了 SharedIndexInformer 的内部工作机制:

它维护了一个本地缓存(indexer),用于快速检索对象。

通过 controller 和 listerWatcher,它能够持续监视 Kubernetes API 服务器的变化。

使用 processor 来分发事件,确保所有注册的处理器都能接收到对象的变更通知。

通过 resync 机制,它可以周期性地重新同步所有对象,确保本地缓存与服务器状态一致。 

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller
	processor             *sharedProcessor
	cacheMutationDetector MutationDetector
	listerWatcher ListerWatcher
	// objectType is an example object of the type this informer is expected to handle. If set, an event
	// with an object with a mismatching type is dropped instead of being delivered to listeners.
	objectType runtime.Object
	// objectDescription is the description of this informer's objects. This typically defaults to
	objectDescription string
	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock
	started, stopped bool
	startedLock      sync.Mutex
	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
	transform TransformFunc
}

除了这个结构,sharedIndexInformer 还实现了许多重要的方法来完成其功能。如:Run(stopCh

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]