本文介绍了如何对Kubernetes 核心组件、扩展机制与API Resource设计概念,以及如何使用定制资源(Custom Resource)与定制控制器(Custom Controller)实现对Kubernetes API Resource的扩展
Kubernetes架构设计
Kubernetes核心组件
Kubernetes系统架构整体采用的是C/S 的架构,其中Master节点作为 Server,各Worker 节点作为 Client。
- Master Node:
- kube-apiserver:提供了资源操作的唯一入口,提供REST API接口,并提供认证、授权、访问控制、API 注册和发现等机制;常见的与kube-apiserver进行交互可以通过kubectl 、client-go
- kube-scheduler:负责kubernetes内资源的调度,按照调度策略将Pod调度到相应的Node上
- kube-controller-manager:控制器管理器,提供了一些内置的Controller,自动化地管理集群状态,包括了资源状态,节点状态等;核心功能是确保集群始终处于预期状态,控制Kubernetes中的资源它们向
spec
配置的期望状态进行收敛 - etcd:持久化集群状态、元数据、集群资源对象
- kube-proxy:负责 Kubernetes 中 Service 的服务发现和负载均衡功能实现
- Worker Node:
- kubelet:负责管理Worker Node上Pod资源的生命周期,kubelet相当于一个代理执行器,接收到kube-apiserver的请求后执行Pod的管理逻辑,定期监控资源的使用状态上报kube-apiserver,以及如下接口的管理
- CRI(Container Runtime Interface):容器运行时接口,提供计算资源
- CNI(Container Network Interface):容器网络接口,提供网络资源
- CSI(Container Storage Interface):容器存储接口,提供存储资源
- Container Runtime: 负责镜像管理及 Pod 和容器运行时接口实现(CRI)
- kubelet:负责管理Worker Node上Pod资源的生命周期,kubelet相当于一个代理执行器,接收到kube-apiserver的请求后执行Pod的管理逻辑,定期监控资源的使用状态上报kube-apiserver,以及如下接口的管理
Kubernetes扩展性
参考:https://kubernetes.io/zh/docs/concepts/extend-kubernetes/
Kubernetes的架构设计是高度可配置、可扩展的,这里主要关注Kubernetes自身的扩展性,即扩充 Kubernetes 的能力并深度集成软件组件。
扩展模式:
- 控制器模式:编写Kubernetes 的客户端程序的一种特定模式,控制器通常读取一个对象的
spec
字段,做出对应的处理,然后更新对象的status
字段。 - Webhook模式:Kubernetes作为客户端调用远程服务的模式。
- Binary Plugin模式:Kubernetes作为客户端执行一个二进制插件程序。
扩展kubernetes的核心组件:
- kube-controller-manager与API Resource扩展:通过使用crd与custom controller、operator framework实现,本文主要讲解这方面的扩展实现。
- kube-apiserver扩展:通过使用API Aggregation layer在不修改 Kubernetes 核心代码的同时扩展 Kubernetes api-server,即将第三方服务注册成Kubernetes api-server提供服务。
- kube-scheduler扩展:通过Kubernetes Scheduling Framework扩展Kubernetes调度机制。
扩展Kubernetes API Resource相关概念
资源(Resource): Resource是 Kubernetes API中的一个端点,用于存储某个类别的API对象的一个集合;如YAML中的kind:Pod、CronJob…
定制资源(Custom Resource): 自定义 API 资源,Custom Resource是对 Kubernetes API 的扩展,定制资源所代表的是对特定Kubernetes API的一种定制。
定制控制器(Custom Controller): Custom Resource本身只能用来存取结构化的数据,需要将Custom Resource与Custom Controller结合,Custom Controller负责监控Custom Resource的变化(创建、删除…)并执行具体的动作。
CRD(CustomResourceDefinitions): Custom Resource的定义,Kubernetes CustomResourceDefinition API资源允许自定义Custom Resource。 定义CRD对象的操作会使用你所设定的名字和模式定义(Schema)创建一个新的Custom Resource,Kubernetes API 会为Custom Resource提供存储和访问服务。
简单的说,可以通过CRD定制自定义API资源即Custom Resource,动态注册到Kubernetes集群中;注册完成后用户可以通过 kubectl 来创建访问自定义API资源对象,类似于操作 Pod 一样。CRD 仅仅只是做资源的定义,需要配合控制器即Custom Controller 去监听Custom Resource的事件触发执行对应的处理逻辑。
Kubernetes 的 Resource 设计概念
通过上文的介绍不难看出,Kubernetes是一个以资源为中心容器编排平台,核心组件kube-api-server通过REST API暴漏资源操作接口、kube-controller-manager控制管理资源的状态、kube-scheduler进行资源的调度。
Group/Version/Resource
kubernetes在资源的概念上进行了分组与版本化,,一个 API对象在Etcd 里的完整资源路径,是由:Group(API 组)、Version(API 版本)和 Resource(API 资源类型)三个部分组成,如下图所示。
资源间的关系如下:
- Kubernetes支持多个 Group(资源组)
- 每个Group支持多个Version(资源版本)
- 每个Version支持多种Resource(资源),部分资源还拥有自己的子资源
- Kind 与 Resource 属于同一级概念,Kind 用于描述 Resource 的种类,一般情况下Kind与Resource是一一对应的关系,例如
pods
Resource 对应于Pod
Kind
定位资源的形式如下:
<GROUP>/<VERSION>/<RESOURCE>[/<SUBSOURCE>]
# 以Deployment为例子
apps/v1/deployments/status
资源对象(资源描述)即Resource Object描述如下:
<GROUP>/<VERSION>, Kind=<RESOURCE_NAME>
# 以Deployment为例子
apps/v1, Kind=Deployment
Group
- 资源组的划分依据是资源的功能,Kubernetes支持不同资源组中拥有不同资源版本,方便组内资源迭代升级
- 对于 Kubernetes 里的核心 API 对象(如:Pod…)是无组名Group(即:Group 是“”),在 /api 这个层级下;对于Kubernetes里的非核心 API 对象(如:CronJob…)是有组名Group,在 /apis 这个层级下
有组名 Group 资源: .../apis/<GROUP>/<VERSION>/<RESOURCE>
无组名 Group 资源: .../api/<VERSION>/<RESOURCE>
Version
Kubernetes 的资源版本 Version 采用语义化的版本号
- Alpha 阶段:内部测试版本,Alpha 版本中的功能默认情况下会被禁用,常见命名方式如 v1alpha1。
- Beta 阶段:相对稳定版本,经过了官方和社区的测试,Beta 阶段下的功能默认是开启的,常见命名方式如 v2beta1。
- Stable 阶段:正式发布版本,命名方式如 v1、v2 。
Resouce
Resource 是 Kubernetes 中的核心概念
- Resource 实例化后称为一个 Resource Object。
- Kubernetes 中所有的 Resource Object 都称为 Entity。
- 可以通过 Kubenetes API Server 去操作 Resource Object。
Kubernetes 目前的 Entity 分为两大类:
- Persistent Entity:持久化实体,Resource Object 创建后会持久存在,如 Deployment / Service。
- Ephemeral Entity: 短暂实体,Resource Object 创建后不稳定,如出现故障/调度失败后不再重建,如Pod。
资源操作方法
Kubernetes资源YAML文件提交给kube-apiserver后,会被转换为Resouce Object,序列化后持久化到Etcd中;对资源的操作方法主要有如下8种:
- create:Resource Object 创建
- delete:Resource Object 删除
- deletecollection:多个 Resource Objects 删除
- patch:Resource Object 局部字段更新
- update:Resource Object 整体更新
- get:Resource Object 获取
- list:多个 Resource Objects 获取
- watch:Resource Objects 监控
如何创建CRD
参考文档:
https://kubernetes.io/zh/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
# 名字必需与下面的 spec 字段匹配,并且格式为 '<名称的复数形式>.<组名>'
name: crontabs.stable.example.com
spec:
# 组名称,用于 REST API: /apis/<组>/<版本>
group: stable.example.com
# 列举此 CustomResourceDefinition 所支持的版本
versions:
- name: v1
# 每个版本都可以通过 served 标志来独立启用或禁止
served: true
# 其中一个且只有一个版本必需被标记为存储版本
storage: true
schema:
openAPIV3Schema:
type: object
properties:
spec:
type: object
properties:
cronSpec:
type: string
image:
type: string
replicas:
type: integer
# 可以是 Namespaced 或 Cluster
scope: Namespaced
names:
# 名称的复数形式,用于 URL:/apis/<组>/<版本>/<名称的复数形式>
plural: crontabs
# 名称的单数形式,作为命令行使用时和显示时的别名
singular: crontab
# kind 通常是单数形式的帕斯卡编码(PascalCased)形式。你的资源清单会使用这一形式。
kind: CronTab
# shortNames 允许你在命令行使用较短的字符串来匹配资源
shortNames:
- ct
如上CRD所示, 指定group为stable.example.com
,version为v1
,CustomResource为CronTab,scope为Namespaced(CronTab属于Namespace的对象),然后需要设置CustomResource的对象描述,包括:Spec、Status …;
通过以下命令创建CRD后,会创建一个新的 namespace 级别的 RESTful API 就会被创建:/apis/stable.example.com/v1/namespaces/*/crontabs/...
,用以创建和管理CustomResource CronTab
。在创建CRD时,Kubernetes 会对我们提交的声明文件进行校验(基于 OpenAPI v3 schem 进行规范)。如果想要更加复杂的校验,需要通过 Kubernetes 的 admission webhook 进行实现。
kubectl apply -f crd.yaml
创建完CRD后,可以通过如下方式定义一个CronTab资源对象
apiVersion: "stable.example.com/v1"
kind: CronTab
metadata:
name: my-cron-object
spec:
cronSpec: "* * * * */5"
image: my-image
创建完CronTab资源对象后,就可以通过kubectl来管理自定义资源对象
$ kubectl get crontab
NAME AGE
my-cron-tab 93s
$ kubectl get crontab -o yaml
...
如何创建Custom Controller(sample-controller示例)
参考:
kubernetes官方Custom Controller示例:https://github.com/kubernetes/sample-controller
示例演示了:
- 如何使用 CustomResourceDefinition注册类型的新自定义资源(自定义资源类型)
Foo
- 如何创建/获取/列出新资源类型的实例
Foo
- 如何基于控制器处理资源
Foo
创建/更新/删除事件
示例控制器基于client-go 库 进行Controller开发;该项目的tools/cache目录下包含了开发Custom Controller 使用的各种工具、机制。
Client-go Client对象
- RESTClient:client-go 中最基础的客户端,其它 client 都基于 RESTClient 实现,RESTClient 实现了 RESTful 风格的 API 请求封装,可以实现对任意 Kubernetes 资源(包括内置资源及 CRDs)的 RESTful 风格交互,如 Post() / Delete() / Put() / Get(),同时支持 Json 和 protobuf;
- ClientSet:与 Kubernetes 内置资源对象交互最常用的 Client,强调只能处理 Kubernetes 内置资源,不包括 CRD 自定义资源,使用时需要指定 Group、指定 Version,然后根据 Resource 获取。ClientSet 的操作代码是通过 client-gen 代码生成器自动生成的;
- DynamicClient:DynamicClient 能处理包括 CRD 自定义资源在内的任意 kubernetes 资源。如果一个 Controller 中需要控制所有的 API,可以使用Dynamic Client,DynamicClient 只支持JSON;
- DiscoveryClient:用于发现 kube-apiserver 支持的 Group / Version / Resource 信息;
Client-go 组件工作流程与以及与Custom Controller的交互点
client-go组件:
- Reflector:通过 Kubernetes API 监控 Kubernetes 的资源类型,通过List/Watch 机制, 获取&监听资源对象实例的变化,添加 object 对象到 FIFO 队列,后续Informer 会从队列中进行数据获取。
- Informer:controller 机制的基础,控制循环(Control Loop)处理,从队列中取出数据,添加到 Indexer 进行数据缓存,提供对象监听事件回调处理的 handler 接口,通过 给Informer 添加 ResourceEventHandler 实例的回调函数,通过实现OnAdd(obj interface{})、 OnUpdate(oldObj, newObj interface{}) 和 OnDelete(obj interface{}) 方法处理资源的创建、更新和删除操作。
- Indexer:提供 object 对象的索引,缓存对象信息,indexer是线程安全的存储。
Custom Controller组件:
- Informer与Indexer的reference: 通过client-go 提供的
NewIndexerInformer
函数进行创建。 - Resource Event Handlers:Informer在要将object 对象传递给Custom Controller 时将调用的回调函数,Resource Event Handlers 被回调后会将Object Key写入到Work queue中。
- Process Item:从Work queue中取出Object key(事件通知) 进行后续处理。
Simaple-controller核心逻辑
- 项目结构
└── sample-controller
├── artifacts # yaml示例,如crd.yaml、example-foo.yaml
│ └── examples
├── code-of-conduct.md
├── CONTRIBUTING.md
├── controller.go # custom controller实现,核心逻辑
├── controller_test.go
├── docs
│ ├── controller-client-go.md
│ └── images
├── go.mod
├── go.sum
├── hack # code generation 工具类
│ ├── boilerplate.go.txt
│ ├── custom-boilerplate.go.txt
│ ├── tools.go
│ ├── update-codegen.sh
│ └── verify-codegen.sh
├── LICENSE
├── main.go # 启动函数,参数配置与初始化逻辑
├── OWNERS
├── pkg # 资源定义文件与自动生成的代码
│ ├── apis
│ ├── generated
│ └── signals
├── README.md
└── SECURITY_CONTACTS
- 功能
使用 Custom Resource Foo
的deploymentName、replicas 属性与Custom Resource Controller 创建、更新 Deployment。
- 代码生成
此项目利用k8s.io/code-generator中的生成器 来生成typed client、informers、listers 和deep-copy functions。自动生成了如下文件与目录
pkg/apis/samplecontroller/v1alpha1/zz_generated.deepcopy.go
pkg/generated/
- 核心逻辑-main.go(pseudocode)
func main() {
...
// 创建clientset,kubeClient(操作除自定义资源组外的其他资源)、exampleClient(操作自定义资源组)
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("error building kubernetes clientset: %s", err.Error())
}
exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building example clientset: %s", err.Error())
}
// 创建Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
// 创建 controller,传入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// 运行 Informer,Start方法非阻塞,运行在单独的 goroutine 中
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
//运行Custom Controller
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
- 核心逻辑controller.go (pseudocode)
/*
*** main.go
*/
// 创建 clientset
kubeClient, err := kubernetes.NewForConfig(cfg) // k8s clientset, "k8s.io/client-go/kubernetes"
exampleClient, err := clientset.NewForConfig(cfg) // sample clientset, "k8s.io/sample-controller/pkg/generated/clientset/versioned"
// 创建 Informer
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) // k8s informer, "k8s.io/client-go/informers"
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30) // sample informer, "k8s.io/sample-controller/pkg/generated/informers/externalversions"
// 创建 controller,传入 clientset 和 informer
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
// 运行 Informer,Start 方法为非阻塞,会运行在单独的 goroutine 中
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
// 运行 controller
controller.Run(2, stopCh)
/*
*** controller.go
*/
NewController() *Controller {}
// 将 CRD 资源类型定义加入到 Kubernetes 的 Scheme 中,以便 Events 可以记录 CRD 的事件
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
//创建 Event Broadcaster
eventBroadcaster := record.NewBroadcaster()
// ... ...
// 监听 CRD 类型Foo变化并注册 ResourceEventHandler方法,当Foo的实例变化时获取Foo资源并将其转换为 namespace/name字符(Key),然后将其放入工作队列中。
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
// 监听Deployment变化并注册ResourceEventHandler方法,
// 当它的 ownerReferences 为 Foo 类型实例时,将该Foo资源加入 work queue
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {}
// 在启动 worker 前等待缓存同步
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
// 运行两个 worker 来处理资源
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
// 不断的调用 processNextWorkItem 处理下一个对象
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// 从workqueue中获取下一个对象并进行处理,通过调用 syncHandler
func (c *Controller) processNextWorkItem() bool {
//从workqueue获取obj
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
// 调用 workqueue.Done(obj) 方法告诉 workqueue 当前项已经处理完毕,
// 如果我们不想让当前项重新入队,就需要调用 workqueue.Forget(obj)。
// 当我们没有调用Forget时,当前项会重新入队 workqueue 并在一段时间后重新被获取。
defer c.workqueue.Done(obj)
var key string
var ok bool
// 格式校验,我们期望的是 key 'namespace/name' 格式的 string
if key, ok = obj.(string); !ok {
// 无效的项调用Forget方法,避免重新入队。
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
//运行 syncHandler,传递Foo资源的namespace/name字符串
if err := c.syncHandler(key); err != nil {
// 放回workqueue避免偶发的异常
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// 如果没有异常,Forget当前项,同步成功
c.workqueue.Forget(obj)
klog.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// 将实际状态与期望的状态进行比较,然后尝试将两者收敛,然后它用资源的当前状态更新Foo资源的Status块。
func (c *Controller) syncHandler(key string) error {
// 通过 workqueue 中的 key 解析出 namespace 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 调用 lister 接口通过 namespace 和 name 获取 Foo 实例
foo, err := c.foosLister.Foos(namespace).Get(name)
deploymentName := foo.Spec.DeploymentName
// 获取 Foo 实例中定义的 deploymentname
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
//如果没有发现对应的 deployment,创建一个新的deployment。并还在资源上设置适当的 OwnerReferences,以便 handleObject 可以发现“拥有”它的 Foo 资源。
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
}
// deployment OwnerReferences 不是 Foo 实例,warning并返回错误
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
}
// deployment 中 的配置和 Foo 实例中 Spec 的配置不一致,即更新 deployment
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
}
// 更新 Foo 实例状态
err = c.updateFooStatus(foo, deployment)
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
}
- 运行
# assumes you have a working kubeconfig, not required if operating in-cluster
go build -o sample-controller .
./sample-controller -kubeconfig=$HOME/.kube/config
# create a CustomResourceDefinition
kubectl create -f artifacts/examples/crd-status-subresource.yaml
# create a custom resource of type Foo
kubectl create -f artifacts/examples/example-foo.yaml
# check deployments created through the custom resource
kubectl get deployments
Kubernetes Controller 间通讯方式
Kubernetes 三大核心组建之一的kube-controller-manager,是运行Controller组建进程的控制平面组件,包含了如下Controller集合
Kubernetes中不同的Controller间也会进行通讯,以Deployment Controller为例子:
- 用户通过 Kubectl 创建 Deployment,APIServer接收到请求后会对该请求进行权限、准入校验,鉴权通过后将 Deployment 的资源信息存储到 Etcd中。
- Deployment Controller 基于List/Watch机制,收到Deployment资源的Add事件并处理,为该 Deployment 创建 Replicaset。
- APIServer 接收到 Replicaset创建请求后,Replicaset的Add事件将被发布,随后ReplicaSet Controller接收到该事件,进行对应的处理逻辑:创建 Pod。
可以看到,Kubernetes Controller基于事件订阅-分发的工作方式,进行Controller间的通信、协调操作;也因为其开放的工作机制,让我们可以自由的定制、开发自己的Custom Controller。