本文介绍了如何对Kubernetes 核心组件、扩展机制与API Resource设计概念,以及如何使用定制资源(Custom Resource)与定制控制器(Custom Controller)实现对Kubernetes API Resource的扩展

Kubernetes架构设计

Kubernetes核心组件

Kubernetes系统架构整体采用的是C/S 的架构,其中Master节点作为 Server,各Worker 节点作为 Client。

  • Master Node:
    • kube-apiserver:提供了资源操作的唯一入口,提供REST API接口,并提供认证、授权、访问控制、API 注册和发现等机制;常见的与kube-apiserver进行交互可以通过kubectl 、client-go
    • kube-scheduler:负责kubernetes内资源的调度,按照调度策略将Pod调度到相应的Node上
    • kube-controller-manager:控制器管理器,提供了一些内置的Controller,自动化地管理集群状态,包括了资源状态,节点状态等;核心功能是确保集群始终处于预期状态,控制Kubernetes中的资源它们向 spec 配置的期望状态进行收敛
    • etcd:持久化集群状态、元数据、集群资源对象
    • kube-proxy:负责 Kubernetes 中 Service 的服务发现和负载均衡功能实现
  • Worker Node:
    • kubelet:负责管理Worker Node上Pod资源的生命周期,kubelet相当于一个代理执行器,接收到kube-apiserver的请求后执行Pod的管理逻辑,定期监控资源的使用状态上报kube-apiserver,以及如下接口的管理
      • CRI(Container Runtime Interface):容器运行时接口,提供计算资源
      • CNI(Container Network Interface):容器网络接口,提供网络资源
      • CSI(Container Storage Interface):容器存储接口,提供存储资源
    • Container Runtime: 负责镜像管理及 Pod 和容器运行时接口实现(CRI)

image

Kubernetes扩展性

参考:https://kubernetes.io/zh/docs/concepts/extend-kubernetes/

Kubernetes的架构设计是高度可配置、可扩展的,这里主要关注Kubernetes自身的扩展性,即扩充 Kubernetes 的能力并深度集成软件组件。

扩展模式:

  • 控制器模式:编写Kubernetes 的客户端程序的一种特定模式,控制器通常读取一个对象的 spec 字段,做出对应的处理,然后更新对象的 status 字段。
  • Webhook模式:Kubernetes作为客户端调用远程服务的模式。
  • Binary Plugin模式:Kubernetes作为客户端执行一个二进制插件程序。

image

扩展kubernetes的核心组件:

  • kube-controller-manager与API Resource扩展:通过使用crd与custom controller、operator framework实现,本文主要讲解这方面的扩展实现。
  • kube-apiserver扩展:通过使用API Aggregation layer在不修改 Kubernetes 核心代码的同时扩展 Kubernetes api-server,即将第三方服务注册成Kubernetes api-server提供服务。
  • kube-scheduler扩展:通过Kubernetes Scheduling Framework扩展Kubernetes调度机制。

扩展Kubernetes API Resource相关概念

资源(Resource): Resource是 Kubernetes API中的一个端点,用于存储某个类别的API对象的一个集合;如YAML中的kind:Pod、CronJob…

定制资源(Custom Resource): 自定义 API 资源,Custom Resource是对 Kubernetes API 的扩展,定制资源所代表的是对特定Kubernetes API的一种定制。

定制控制器(Custom Controller): Custom Resource本身只能用来存取结构化的数据,需要将Custom Resource与Custom Controller结合,Custom Controller负责监控Custom Resource的变化(创建、删除…)并执行具体的动作。

CRD(CustomResourceDefinitions): Custom Resource的定义,Kubernetes CustomResourceDefinition API资源允许自定义Custom Resource。 定义CRD对象的操作会使用你所设定的名字和模式定义(Schema)创建一个新的Custom Resource,Kubernetes API 会为Custom Resource提供存储和访问服务。

简单的说,可以通过CRD定制自定义API资源即Custom Resource,动态注册到Kubernetes集群中;注册完成后用户可以通过 kubectl 来创建访问自定义API资源对象,类似于操作 Pod 一样。CRD 仅仅只是做资源的定义,需要配合控制器即Custom Controller 去监听Custom Resource的事件触发执行对应的处理逻辑。

Kubernetes 的 Resource 设计概念

通过上文的介绍不难看出,Kubernetes是一个以资源为中心容器编排平台,核心组件kube-api-server通过REST API暴漏资源操作接口、kube-controller-manager控制管理资源的状态、kube-scheduler进行资源的调度。

Group/Version/Resource

kubernetes在资源的概念上进行了分组与版本化,,一个 API对象在Etcd 里的完整资源路径,是由:Group(API 组)、Version(API 版本)和 Resource(API 资源类型)三个部分组成,如下图所示。

image

资源间的关系如下:

  • Kubernetes支持多个 Group(资源组)
  • 每个Group支持多个Version(资源版本)
  • 每个Version支持多种Resource(资源),部分资源还拥有自己的子资源
  • Kind 与 Resource 属于同一级概念,Kind 用于描述 Resource 的种类,一般情况下Kind与Resource是一一对应的关系,例如pods Resource 对应于 Pod Kind

定位资源的形式如下:

<GROUP>/<VERSION>/<RESOURCE>[/<SUBSOURCE>]

# 以Deployment为例子
apps/v1/deployments/status

资源对象(资源描述)即Resource Object描述如下:

<GROUP>/<VERSION>, Kind=<RESOURCE_NAME>

# 以Deployment为例子
apps/v1, Kind=Deployment

Group

  • 资源组的划分依据是资源的功能,Kubernetes支持不同资源组中拥有不同资源版本,方便组内资源迭代升级
  • 对于 Kubernetes 里的核心 API 对象(如:Pod…)是无组名Group(即:Group 是“”),在 /api 这个层级下;对于Kubernetes里的非核心 API 对象(如:CronJob…)是有组名Group,在 /apis 这个层级下
有组名 Group 资源: .../apis/<GROUP>/<VERSION>/<RESOURCE>
无组名 Group 资源: .../api/<VERSION>/<RESOURCE>

Version

Kubernetes 的资源版本 Version 采用语义化的版本号

  • Alpha 阶段:内部测试版本,Alpha 版本中的功能默认情况下会被禁用,常见命名方式如 v1alpha1。
  • Beta 阶段:相对稳定版本,经过了官方和社区的测试,Beta 阶段下的功能默认是开启的,常见命名方式如 v2beta1。
  • Stable 阶段:正式发布版本,命名方式如 v1、v2 。

Resouce

Resource 是 Kubernetes 中的核心概念

  • Resource 实例化后称为一个 Resource Object。
  • Kubernetes 中所有的 Resource Object 都称为 Entity。
  • 可以通过 Kubenetes API Server 去操作 Resource Object。

Kubernetes 目前的 Entity 分为两大类:

  • Persistent Entity:持久化实体,Resource Object 创建后会持久存在,如 Deployment / Service。
  • Ephemeral Entity: 短暂实体,Resource Object 创建后不稳定,如出现故障/调度失败后不再重建,如Pod。

资源操作方法

Kubernetes资源YAML文件提交给kube-apiserver后,会被转换为Resouce Object,序列化后持久化到Etcd中;对资源的操作方法主要有如下8种:

  • create:Resource Object 创建
  • delete:Resource Object 删除
  • deletecollection:多个 Resource Objects 删除
  • patch:Resource Object 局部字段更新
  • update:Resource Object 整体更新
  • get:Resource Object 获取
  • list:多个 Resource Objects 获取
  • watch:Resource Objects 监控

如何创建CRD

参考文档:

https://kubernetes.io/zh/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # 名字必需与下面的 spec 字段匹配,并且格式为 '<名称的复数形式>.<组名>'
  name: crontabs.stable.example.com
spec:
  # 组名称,用于 REST API: /apis/<组>/<版本>
  group: stable.example.com
  # 列举此 CustomResourceDefinition 所支持的版本
  versions:
    - name: v1
      # 每个版本都可以通过 served 标志来独立启用或禁止
      served: true
      # 其中一个且只有一个版本必需被标记为存储版本
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                cronSpec:
                  type: string
                image:
                  type: string
                replicas:
                  type: integer
  # 可以是 Namespaced 或 Cluster
  scope: Namespaced
  names:
    # 名称的复数形式,用于 URL:/apis/<组>/<版本>/<名称的复数形式>
    plural: crontabs
    # 名称的单数形式,作为命令行使用时和显示时的别名
    singular: crontab
    # kind 通常是单数形式的帕斯卡编码(PascalCased)形式。你的资源清单会使用这一形式。
    kind: CronTab
    # shortNames 允许你在命令行使用较短的字符串来匹配资源
    shortNames:
    - ct

如上CRD所示, 指定group为stable.example.com,version为v1,CustomResource为CronTab,scope为Namespaced(CronTab属于Namespace的对象),然后需要设置CustomResource的对象描述,包括:Spec、Status …;

通过以下命令创建CRD后,会创建一个新的 namespace 级别的 RESTful API 就会被创建:/apis/stable.example.com/v1/namespaces/*/crontabs/...,用以创建和管理CustomResource CronTab。在创建CRD时,Kubernetes 会对我们提交的声明文件进行校验(基于 OpenAPI v3 schem 进行规范)。如果想要更加复杂的校验,需要通过 Kubernetes 的 admission webhook 进行实现。

kubectl apply -f crd.yaml

创建完CRD后,可以通过如下方式定义一个CronTab资源对象

apiVersion: "stable.example.com/v1"
kind: CronTab
metadata:
  name: my-cron-object
spec:
  cronSpec: "* * * * */5"
  image: my-image

创建完CronTab资源对象后,就可以通过kubectl来管理自定义资源对象

$ kubectl get crontab
NAME          AGE
my-cron-tab   93s
$ kubectl get crontab -o yaml
...

如何创建Custom Controller(sample-controller示例)

参考:

kubernetes官方Custom Controller示例:https://github.com/kubernetes/sample-controller

示例演示了:

  • 如何使用 CustomResourceDefinition注册类型的新自定义资源(自定义资源类型)Foo
  • 如何创建/获取/列出新资源类型的实例Foo
  • 如何基于控制器处理资源Foo创建/更新/删除事件

示例控制器基于client-go 库 进行Controller开发;该项目的tools/cache目录下包含了开发Custom Controller 使用的各种工具、机制。

Client-go Client对象

  • RESTClient:client-go 中最基础的客户端,其它 client 都基于 RESTClient 实现,RESTClient 实现了 RESTful 风格的 API 请求封装,可以实现对任意 Kubernetes 资源(包括内置资源及 CRDs)的 RESTful 风格交互,如 Post() / Delete() / Put() / Get(),同时支持 Json 和 protobuf;
  • ClientSet:与 Kubernetes 内置资源对象交互最常用的 Client,强调只能处理 Kubernetes 内置资源,不包括 CRD 自定义资源,使用时需要指定 Group、指定 Version,然后根据 Resource 获取。ClientSet 的操作代码是通过 client-gen 代码生成器自动生成的;
  • DynamicClient:DynamicClient 能处理包括 CRD 自定义资源在内的任意 kubernetes 资源。如果一个 Controller 中需要控制所有的 API,可以使用Dynamic Client,DynamicClient 只支持JSON;
  • DiscoveryClient:用于发现 kube-apiserver 支持的 Group / Version / Resource 信息;

Client-go 组件工作流程与以及与Custom Controller的交互点

image

client-go组件:

  • Reflector:通过 Kubernetes API 监控 Kubernetes 的资源类型,通过List/Watch 机制, 获取&监听资源对象实例的变化,添加 object 对象到 FIFO 队列,后续Informer 会从队列中进行数据获取。
  • Informer:controller 机制的基础,控制循环(Control Loop)处理,从队列中取出数据,添加到 Indexer 进行数据缓存,提供对象监听事件回调处理的 handler 接口,通过 给Informer 添加 ResourceEventHandler 实例的回调函数,通过实现OnAdd(obj interface{})、 OnUpdate(oldObj, newObj interface{}) 和 OnDelete(obj interface{}) 方法处理资源的创建、更新和删除操作。
  • Indexer:提供 object 对象的索引,缓存对象信息,indexer是线程安全的存储。

Custom Controller组件:

  • Informer与Indexer的reference: 通过client-go 提供的NewIndexerInformer函数进行创建。
  • Resource Event Handlers:Informer在要将object 对象传递给Custom Controller 时将调用的回调函数,Resource Event Handlers 被回调后会将Object Key写入到Work queue中。
  • Process Item:从Work queue中取出Object key(事件通知) 进行后续处理。

Simaple-controller核心逻辑

  • 项目结构
└── sample-controller
    ├── artifacts 				# yaml示例,如crd.yaml、example-foo.yaml
    │   └── examples
    ├── code-of-conduct.md
    ├── CONTRIBUTING.md
    ├── controller.go			# custom controller实现,核心逻辑
    ├── controller_test.go
    ├── docs
    │   ├── controller-client-go.md
    │   └── images
    ├── go.mod
    ├── go.sum
    ├── hack					# code generation 工具类
    │   ├── boilerplate.go.txt
    │   ├── custom-boilerplate.go.txt
    │   ├── tools.go
    │   ├── update-codegen.sh
    │   └── verify-codegen.sh
    ├── LICENSE
    ├── main.go					# 启动函数,参数配置与初始化逻辑
    ├── OWNERS
    ├── pkg						# 资源定义文件与自动生成的代码
    │   ├── apis
    │   ├── generated
    │   └── signals
    ├── README.md
    └── SECURITY_CONTACTS
  • 功能

使用 Custom Resource Foo 的deploymentName、replicas 属性与Custom Resource Controller 创建、更新 Deployment。

  • 代码生成

此项目利用k8s.io/code-generator中的生成器 来生成typed client、informers、listers 和deep-copy functions。自动生成了如下文件与目录

pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go

pkg/generated/

  • 核心逻辑-main.go(pseudocode)
func main() {
	...
	// 创建clientset,kubeClient(操作除自定义资源组外的其他资源)、exampleClient(操作自定义资源组)
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("error building kubernetes clientset: %s", err.Error())
	}

	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}
	// 创建Informer
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	// 创建 controller,传入 clientset 和 informer
	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// 运行 Informer,Start方法非阻塞,运行在单独的 goroutine 中
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)
	
	//运行Custom Controller
	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}
  • 核心逻辑controller.go (pseudocode)
/*
*** main.go
*/
// 创建 clientset
kubeClient, err := kubernetes.NewForConfig(cfg)		// k8s clientset, "k8s.io/client-go/kubernetes"
exampleClient, err := clientset.NewForConfig(cfg)	// sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned"

// 创建 Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)		// k8s informer, "k8s.io/client-go/informers"
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)		// sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions"

// 创建 controller,传入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// 运行 Informer,Start 方法为非阻塞,会运行在单独的 goroutine 中
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)

// 运行 controller
controller.Run(2, stopCh)

/*
*** controller.go 
*/
NewController() *Controller {}
	// 将 CRD 资源类型定义加入到 Kubernetes 的 Scheme 中,以便 Events 可以记录 CRD 的事件
	utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))

	//创建 Event Broadcaster
	eventBroadcaster := record.NewBroadcaster()
	// ... ...

// 监听 CRD 类型Foo变化并注册 ResourceEventHandler方法,当Foo的实例变化时获取Foo资源并将其转换为 namespace/name字符(Key),然后将其放入工作队列中。
	fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueFoo,
		UpdateFunc: func(old, new interface{}) {
			controller.enqueueFoo(new)
		},
	})

	// 监听Deployment变化并注册ResourceEventHandler方法,
	// 当它的 ownerReferences 为 Foo 类型实例时,将该Foo资源加入 work queue
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.handleObject,
		UpdateFunc: func(old, new interface{}) {
			newDepl := new.(*appsv1.Deployment)
			oldDepl := old.(*appsv1.Deployment)
			if newDepl.ResourceVersion == oldDepl.ResourceVersion {
				return
			}
			controller.handleObject(new)
		},
		DeleteFunc: controller.handleObject,
	})

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {}
	// 在启动 worker 前等待缓存同步
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}
	// 运行两个 worker 来处理资源
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	// 不断的调用 processNextWorkItem 处理下一个对象
	func (c *Controller) runWorker() {
		for c.processNextWorkItem() {
		}
	}
	// 从workqueue中获取下一个对象并进行处理,通过调用 syncHandler
	func (c *Controller) processNextWorkItem() bool {
        //从workqueue获取obj
		obj, shutdown := c.workqueue.Get()
		if shutdown {
			return false
		}
		err := func(obj interface{}) error {
			// 调用 workqueue.Done(obj) 方法告诉 workqueue 当前项已经处理完毕,
			// 如果我们不想让当前项重新入队,就需要调用 workqueue.Forget(obj)。
			// 当我们没有调用Forget时,当前项会重新入队 workqueue 并在一段时间后重新被获取。
			defer c.workqueue.Done(obj)
			var key string
			var ok bool
			// 格式校验,我们期望的是 key 'namespace/name' 格式的 string
			if key, ok = obj.(string); !ok {
				// 无效的项调用Forget方法,避免重新入队。
				c.workqueue.Forget(obj)
				utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
				return nil
			}
            //运行 syncHandler,传递Foo资源的namespace/name字符串
			if err := c.syncHandler(key); err != nil {
				// 放回workqueue避免偶发的异常
				c.workqueue.AddRateLimited(key)
				return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
			}
            
			// 如果没有异常,Forget当前项,同步成功
			c.workqueue.Forget(obj)
			klog.Infof("Successfully synced '%s'", key)
			return nil
		}(obj)
        
		if err != nil {
			utilruntime.HandleError(err)
			return true
		}

		return true
	}
	// 将实际状态与期望的状态进行比较,然后尝试将两者收敛,然后它用资源的当前状态更新Foo资源的Status块。
	func (c *Controller) syncHandler(key string) error {
		// 通过 workqueue 中的 key 解析出 namespace 和 name
		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		// 调用 lister 接口通过 namespace 和 name 获取 Foo 实例
		foo, err := c.foosLister.Foos(namespace).Get(name)
		deploymentName := foo.Spec.DeploymentName
		// 获取 Foo 实例中定义的 deploymentname
		deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
		//如果没有发现对应的 deployment,创建一个新的deployment。并还在资源上设置适当的 OwnerReferences,以便 handleObject 可以发现“拥有”它的 Foo 资源。
		if errors.IsNotFound(err) {
			deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
		}
		// deployment OwnerReferences 不是 Foo 实例,warning并返回错误
		if !metav1.IsControlledBy(deployment, foo) {
			msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
			c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
			return fmt.Errorf(msg)
		}
		// deployment 中 的配置和 Foo 实例中 Spec 的配置不一致,即更新 deployment
		if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
			deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
		}
		// 更新 Foo 实例状态
		err = c.updateFooStatus(foo, deployment)
		c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	}
  • 运行
# assumes you have a working kubeconfig, not required if operating in-cluster
go build -o sample-controller .
./sample-controller -kubeconfig=$HOME/.kube/config

# create a CustomResourceDefinition
kubectl create -f artifacts/examples/crd-status-subresource.yaml

# create a custom resource of type Foo
kubectl create -f artifacts/examples/example-foo.yaml

# check deployments created through the custom resource
kubectl get deployments

Kubernetes Controller 间通讯方式

Kubernetes 三大核心组建之一的kube-controller-manager,是运行Controller组建进程的控制平面组件,包含了如下Controller集合

image

Kubernetes中不同的Controller间也会进行通讯,以Deployment Controller为例子:

  1. 用户通过 Kubectl 创建 Deployment,APIServer接收到请求后会对该请求进行权限、准入校验,鉴权通过后将 Deployment 的资源信息存储到 Etcd中。
  2. Deployment Controller 基于List/Watch机制,收到Deployment资源的Add事件并处理,为该 Deployment 创建 Replicaset。
  3. APIServer 接收到 Replicaset创建请求后,Replicaset的Add事件将被发布,随后ReplicaSet Controller接收到该事件,进行对应的处理逻辑:创建 Pod。

image

可以看到,Kubernetes Controller基于事件订阅-分发的工作方式,进行Controller间的通信、协调操作;也因为其开放的工作机制,让我们可以自由的定制、开发自己的Custom Controller。