Kubernetes Operator Development in Practice: A Complete Engineering Guide from CRDs to Istio Service Mesh Integration
In 2026, cloud-native technology has entered the deep waters of large-scale adoption. The Kubernetes Operator, the core tool for automating application operations, is redefining how we manage complex distributed systems. This article starts from zero: it digs into the design philosophy behind Operators, works through the controller-runtime development framework, and finishes with a deep integration with the Istio service mesh.
Introduction: Why Do We Need Operators?
If you use Kubernetes, you have probably hit this wall: deploying a complex application (a database, a message queue, an AI training job) means hand-crafting a dozen or more native Kubernetes resources: Deployment, Service, ConfigMap, Secret, PVC, HPA, and so on. Worse, these resources have intricate dependencies and configuration constraints; one wrong parameter and the whole application falls over.
The core value of an Operator is turning this operational knowledge into code.
With an Operator we can:
- Manage declaratively: the user states "I want a MySQL cluster" and the Operator handles all the underlying resources
- Automate operations: self-healing, rolling upgrades, backup and restore, scaling, all without manual intervention
- Capture expertise: an operations expert's experience is encoded once, and ordinary developers can use it
This article takes a practical route: we build a production-grade Operator step by step and then look at how it integrates with the Istio service mesh.
Part 1: Operator Core Concepts in Depth
1.1 CRD and CR: The Key to Extending the Kubernetes API
A CRD (Custom Resource Definition) is the mechanism Kubernetes provides for defining your own resource types. Once a CRD is created, the Kubernetes API Server exposes a full RESTful API for the new type.
# A minimal CRD definition
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: applications.myapp.example.com
spec:
  group: myapp.example.com          # API group
  names:
    kind: Application               # resource kind
    plural: applications            # plural form
    singular: application
    shortNames: [app]               # short name
  scope: Namespaced                 # namespace-scoped
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 100
                image:
                  type: string
                resources:
                  type: object
                  properties:
                    cpu:
                      type: string
                    memory:
                      type: string
            status:
              type: object
              properties:
                availableReplicas:
                  type: integer
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      message:
                        type: string
A CR (Custom Resource) is a concrete instance of a CRD. Once the CRD exists, we can use the custom resource just like a Pod or a Deployment:
apiVersion: myapp.example.com/v1
kind: Application
metadata:
  name: my-webapp
  namespace: production
spec:
  replicas: 3
  image: nginx:1.25
  resources:
    cpu: "500m"
    memory: "512Mi"
1.2 Controller: The Brain of the Operator
If the CRD defines the desired state, the Controller is the engine that drives reality toward that state.
The controller's core working model is the Reconcile Loop:
┌─────────────────────────────────────────────────────────────┐
│                       Reconcile Loop                         │
│                                                              │
│  1. Watch: subscribe to change events on the CR and its      │
│     related resources                                        │
│          ↓                                                   │
│  2. Get Current State: read the current cluster state        │
│          ↓                                                   │
│  3. Compare: diff the desired state against the current one  │
│          ↓                                                   │
│  4. Reconcile: act to converge the two                       │
│          ↓                                                   │
│  5. Update Status: write back the CR's status fields         │
│          ↓                                                   │
│  6. Requeue: re-enqueue the request if needed                │
└─────────────────────────────────────────────────────────────┘
A key design principle: one controller per CRD. This keeps responsibilities clear and the code easy to maintain and extend.
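In controller-runtime terms, the whole loop collapses into a single function. Here is a minimal, purely illustrative skeleton; ensureDesiredState is a placeholder for steps 3 and 4, and Reconciler is assumed to embed client.Client as in the Kubebuilder scaffold:
// Illustrative reconcile skeleton; ensureDesiredState is a hypothetical helper.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    var obj myappv1.Application
    if err := r.Get(ctx, req.NamespacedName, &obj); err != nil { // step 2: current state
        return ctrl.Result{}, client.IgnoreNotFound(err) // a deleted object is not an error
    }
    if err := r.ensureDesiredState(ctx, &obj); err != nil { // steps 3-4: compare and act
        return ctrl.Result{}, err // a returned error requeues with backoff
    }
    if err := r.Status().Update(ctx, &obj); err != nil { // step 5: publish status
        return ctrl.Result{}, err
    }
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil // step 6: periodic requeue
}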
1.3 Operator = CRD + Controller + Operational Knowledge
A real Operator is more than a technical artifact; its value lies in the domain expert's operational knowledge it encodes. Take etcd-operator as an example:
// Core reconcile logic of etcd-operator (simplified)
func (r *EtcdClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 1. Fetch the desired state
    var cluster etcdv1.EtcdCluster
    if err := r.Get(ctx, req.NamespacedName, &cluster); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    // 2. Operational knowledge: does a new cluster need bootstrapping?
    if cluster.Status.Phase == "" {
        // Create the initial member pods and the bootstrap configuration
        return r.initializeCluster(ctx, &cluster)
    }
    // 3. Operational knowledge: monitor cluster health
    healthy, err := r.checkClusterHealth(ctx, &cluster)
    if err != nil {
        return ctrl.Result{RequeueAfter: 10 * time.Second}, err
    }
    // 4. Operational knowledge: automatic failure recovery
    if !healthy {
        return r.recoverUnhealthyMember(ctx, &cluster)
    }
    // 5. Operational knowledge: handle scaling
    if len(cluster.Status.Members) != cluster.Spec.Replicas {
        return r.resizeCluster(ctx, &cluster)
    }
    // 6. Operational knowledge: periodic backups
    if time.Since(cluster.Status.LastBackup.Time) > cluster.Spec.BackupInterval.Duration {
        return r.performBackup(ctx, &cluster)
    }
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
As this shows, an Operator's real value is codifying complex operational decisions, not wrapping simple CRUD.
Part 2: Environment Setup and Project Initialization
2.1 Toolchain
Before writing any code, make sure the following tools are installed:
| Tool | Version | Purpose |
|---|---|---|
| Go | 1.23+ | Implementation language |
| Docker | 24.0+ | Building and running containers |
| kubectl | 1.28+ | Cluster interaction |
| Kubebuilder | 4.x | Project scaffolding |
| kind/minikube | latest | Local Kubernetes cluster |
| make | - | Build automation |
Verify the installation:
# Check the Go version
go version
# go version go1.24.0 darwin/arm64
# Check Kubebuilder
kubebuilder version
# Version: main.version{KubeBuilderVersion:"4.0.0", KubernetesVendor:"1.31.0", GitCommit:"...", BuildDate:"...", GoOs:"darwin", GoArch:"arm64"}
# Create a local test cluster
kind create cluster --name operator-demo
2.2 Initializing the Project with Kubebuilder
Kubebuilder is the scaffolding tool recommended by the Kubernetes community for Operator development; the project layout it generates follows upstream best practices:
# Create the project directory
mkdir application-operator
cd application-operator
# Initialize the project (set the API domain)
kubebuilder init --domain example.com --repo github.com/example/application-operator
# Create an API (CRD + Controller)
kubebuilder create api --group myapp --version v1 --kind Application --resource --controller
# Project layout
tree .
.
├── Dockerfile                    # Container build file
├── Makefile                      # Build targets
├── PROJECT                       # Kubebuilder project metadata
├── README.md
├── cmd/
│   └── main.go                   # Entry point
├── config/
│   ├── crd/                      # CRD definitions
│   ├── default/                  # Default kustomization
│   ├── manager/                  # Controller manager config
│   ├── prometheus/               # Metrics configuration
│   ├── rbac/                     # RBAC manifests
│   └── samples/                  # Sample CRs
├── go.mod
├── go.sum
├── hack/
│   └── boilerplate.go.txt        # License header template
└── internal/
    └── controller/
        └── application_controller.go  # Controller implementation
2.3 Core Dependencies
A Kubebuilder-generated project depends on these core libraries:
// Key entries in go.mod
require (
    // Kubernetes client library
    k8s.io/client-go v0.31.0
    // Kubernetes API machinery
    k8s.io/apimachinery v0.31.0
    // controller-runtime: the Operator development framework
    sigs.k8s.io/controller-runtime v0.19.0
    // controller-tools: marker-driven code and manifest generation
    sigs.k8s.io/controller-tools v0.16.0
)
controller-runtime is the heart of Operator development. It provides the following building blocks (a wiring sketch follows the list):
- Manager: owns controller lifecycles, signal handling, and health checks
- Controller: implements the watch and reconcile machinery
- Cache: caches Kubernetes resources to reduce API Server load
- Event handling: event filtering and dispatch
- Webhook: CR validation and default injection
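To see how these pieces fit together, here is roughly the cmd/main.go that Kubebuilder scaffolds, simplified (error logging elided):
// Simplified sketch of the Kubebuilder-generated cmd/main.go: the Manager
// owns the shared cache and client, and controllers register against it.
package main

import (
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"

    myappv1 "github.com/example/application-operator/api/v1"
    "github.com/example/application-operator/internal/controller"
)

var scheme = runtime.NewScheme()

func init() {
    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
    utilruntime.Must(myappv1.AddToScheme(scheme))
}

func main() {
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        os.Exit(1)
    }
    if err := (&controller.ApplicationReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        os.Exit(1)
    }
    // Start blocks until the context is cancelled (SIGTERM/SIGINT).
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        os.Exit(1)
    }
}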
Part 3: Defining the CRD Schema
3.1 Designing the Application CRD
Let's design a complete Application CRD that declares everything a cloud-native application needs:
// api/v1/application_types.go
package v1

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ApplicationSpec defines the desired state
type ApplicationSpec struct {
    // +kubebuilder:validation:Required
    // +kubebuilder:validation:MinLength=1
    // Container image
    Image string `json:"image"`

    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=100
    // +kubebuilder:default=1
    // Replica count
    Replicas *int32 `json:"replicas,omitempty"`

    // Service configuration
    Service ServiceSpec `json:"service,omitempty"`

    // Resource requests and limits
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`

    // Environment variables
    Env []corev1.EnvVar `json:"env,omitempty"`

    // ConfigMap mounts
    ConfigMaps []ConfigMapMount `json:"configMaps,omitempty"`

    // Persistent storage
    Storage *StorageSpec `json:"storage,omitempty"`

    // Health checks
    HealthCheck *HealthCheckSpec `json:"healthCheck,omitempty"`

    // Autoscaling
    Autoscaling *AutoscalingSpec `json:"autoscaling,omitempty"`

    // Service mesh configuration
    ServiceMesh *ServiceMeshSpec `json:"serviceMesh,omitempty"`
}

// ServiceSpec configures the Service
type ServiceSpec struct {
    // +kubebuilder:validation:Enum=ClusterIP;NodePort;LoadBalancer
    // +kubebuilder:default=ClusterIP
    Type corev1.ServiceType `json:"type,omitempty"`

    // Port configuration
    Ports []ServicePort `json:"ports,omitempty"`
}

type ServicePort struct {
    Name string `json:"name"`
    Port int32  `json:"port"`
    // +kubebuilder:default=true
    EnableIstioProxy bool `json:"enableIstioProxy,omitempty"`
}

// ConfigMapMount describes a ConfigMap volume mount
type ConfigMapMount struct {
    Name      string `json:"name"`
    MountPath string `json:"mountPath"`
    ReadOnly  bool   `json:"readOnly,omitempty"`
}

// StorageSpec configures persistent storage
type StorageSpec struct {
    Size         string `json:"size"`
    StorageClass string `json:"storageClass,omitempty"`
    AccessMode   string `json:"accessMode,omitempty"`
}

// HealthCheckSpec configures probes
type HealthCheckSpec struct {
    LivenessProbe  *corev1.Probe `json:"livenessProbe,omitempty"`
    ReadinessProbe *corev1.Probe `json:"readinessProbe,omitempty"`
    StartupProbe   *corev1.Probe `json:"startupProbe,omitempty"`
}

// AutoscalingSpec configures the HPA
type AutoscalingSpec struct {
    Enabled      bool  `json:"enabled,omitempty"`
    MinReplicas  int32 `json:"minReplicas,omitempty"`
    MaxReplicas  int32 `json:"maxReplicas,omitempty"`
    TargetCPU    int32 `json:"targetCPU,omitempty"`
    TargetMemory int32 `json:"targetMemory,omitempty"`
}

// ServiceMeshSpec configures Istio
type ServiceMeshSpec struct {
    // Enable Istio sidecar injection
    Enabled bool `json:"enabled,omitempty"`
    // Traffic management policy
    TrafficPolicy *TrafficPolicy `json:"trafficPolicy,omitempty"`
    // Canary release configuration
    Canary *CanarySpec `json:"canary,omitempty"`
}

type TrafficPolicy struct {
    // Connection timeout
    ConnectionTimeout string `json:"connectionTimeout,omitempty"`
    // Load balancing strategy
    LoadBalancer string `json:"loadBalancer,omitempty"`
    // Retry policy
    RetryPolicy *RetryPolicy `json:"retryPolicy,omitempty"`
}

type RetryPolicy struct {
    Attempts      int32  `json:"attempts,omitempty"`
    PerTryTimeout string `json:"perTryTimeout,omitempty"`
    RetryOn       string `json:"retryOn,omitempty"`
}

type CanarySpec struct {
    Enabled       bool   `json:"enabled,omitempty"`
    CanaryVersion string `json:"canaryVersion,omitempty"`
    Weight        int32  `json:"weight,omitempty"`
}

// ApplicationStatus defines the observed state
type ApplicationStatus struct {
    // Current replica count
    Replicas int32 `json:"replicas"`
    // Available replica count
    AvailableReplicas int32 `json:"availableReplicas"`
    // Current phase
    // +kubebuilder:validation:Enum=Pending;Running;Failed;Updating
    Phase string `json:"phase,omitempty"`
    // Conditions
    Conditions []metav1.Condition `json:"conditions,omitempty"`
    // Last update time
    LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
    // Service endpoints
    Endpoints []string `json:"endpoints,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas
//+kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
//+kubebuilder:printcolumn:name="Replicas",type="integer",JSONPath=".status.replicas"
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"

// Application is the Schema for the applications API
type Application struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   ApplicationSpec   `json:"spec,omitempty"`
    Status ApplicationStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// ApplicationList contains a list of Application
type ApplicationList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []Application `json:"items"`
}

func init() {
    SchemeBuilder.Register(&Application{}, &ApplicationList{})
}
3.2 Kubebuilder Markers Explained
The code above leans heavily on Kubebuilder markers; controller-tools parses them and generates the corresponding CRD YAML:
| Marker | Effect |
|---|---|
| +kubebuilder:validation:Required | Field is required |
| +kubebuilder:validation:Minimum=1 | Numeric minimum |
| +kubebuilder:validation:Enum=A;B;C | Restrict to enum values |
| +kubebuilder:default=xxx | Default value |
| +kubebuilder:subresource:status | Enable the status subresource |
| +kubebuilder:subresource:scale | Enable the scale subresource (supports kubectl scale) |
| +kubebuilder:printcolumn | Columns shown by kubectl get |
Generate the code and CRD manifests:
make generate
make manifests
Part 4: Implementing the Controller Core Logic
4.1 Designing the Reconcile Function
The heart of the controller is the Reconcile function. A well-designed Reconcile follows these principles:
- Idempotent: running it multiple times yields the same result
- Stateless: it holds no external state; everything is read from the API Server
- Fast-returning: long waits become RequeueAfter instead of blocking
- Error-aware: distinguish retryable from non-retryable errors
// internal/controller/application_controller.go
package controller

import (
    "context"
    "fmt"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/predicate"

    myappv1 "github.com/example/application-operator/api/v1"
)

// ApplicationReconciler reconciles a Application object
type ApplicationReconciler struct {
    client.Client
    Scheme *runtime.Scheme
}

// RBAC permission declarations
//+kubebuilder:rbac:groups=myapp.example.com,resources=applications,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=myapp.example.com,resources=applications/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=myapp.example.com,resources=applications/finalizers,verbs=update
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.istio.io,resources=virtualservices;destinationrules,verbs=get;list;watch;create;update;patch;delete

// Reconcile is part of the main kubernetes reconciliation loop
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    logger := log.FromContext(ctx)

    // 1. Fetch the Application CR
    var app myappv1.Application
    if err := r.Get(ctx, req.NamespacedName, &app); err != nil {
        if errors.IsNotFound(err) {
            // The CR is gone; cleanup is handled by the finalizer
            logger.Info("Application resource not found, ignoring")
            return ctrl.Result{}, nil
        }
        // Any other error: retry
        logger.Error(err, "Failed to get Application")
        return ctrl.Result{}, err
    }

    // 2. Is the CR being deleted?
    if !app.DeletionTimestamp.IsZero() {
        // Run cleanup logic
        return r.handleDeletion(ctx, &app)
    }

    // 3. Make sure our finalizer is present
    if !controllerutil.ContainsFinalizer(&app, "myapp.example.com/finalizer") {
        controllerutil.AddFinalizer(&app, "myapp.example.com/finalizer")
        if err := r.Update(ctx, &app); err != nil {
            return ctrl.Result{}, err
        }
    }

    // 4. Initialize status
    if app.Status.Phase == "" {
        app.Status.Phase = "Pending"
        if err := r.Status().Update(ctx, &app); err != nil {
            return ctrl.Result{}, err
        }
    }

    // 5. Reconcile child resources
    result, err := r.reconcileResources(ctx, &app)
    if err != nil {
        return result, err
    }

    // 6. Update status
    if err := r.updateStatus(ctx, &app); err != nil {
        return ctrl.Result{}, err
    }

    return result, nil
}

// reconcileResources reconciles all child resources
func (r *ApplicationReconciler) reconcileResources(ctx context.Context, app *myappv1.Application) (ctrl.Result, error) {
    // 1. Ensure the ConfigMaps exist
    if err := r.reconcileConfigMaps(ctx, app); err != nil {
        return ctrl.Result{}, err
    }
    // 2. Ensure the PVC exists
    if app.Spec.Storage != nil {
        if err := r.reconcilePVC(ctx, app); err != nil {
            return ctrl.Result{}, err
        }
    }
    // 3. Ensure the Deployment exists
    if err := r.reconcileDeployment(ctx, app); err != nil {
        return ctrl.Result{}, err
    }
    // 4. Ensure the Service exists
    if err := r.reconcileService(ctx, app); err != nil {
        return ctrl.Result{}, err
    }
    // 5. Ensure the HPA exists
    if app.Spec.Autoscaling != nil && app.Spec.Autoscaling.Enabled {
        if err := r.reconcileHPA(ctx, app); err != nil {
            return ctrl.Result{}, err
        }
    }
    // 6. Ensure the Istio resources exist
    if app.Spec.ServiceMesh != nil && app.Spec.ServiceMesh.Enabled {
        if err := r.reconcileIstioResources(ctx, app); err != nil {
            return ctrl.Result{}, err
        }
    }
    // Re-reconcile periodically (health monitoring)
    return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
4.2 Deployment Reconciliation Logic
// reconcileDeployment keeps the Deployment in line with the desired state
func (r *ApplicationReconciler) reconcileDeployment(ctx context.Context, app *myappv1.Application) error {
    logger := log.FromContext(ctx)

    // Build the desired Deployment
    desiredDeployment := r.buildDeployment(app)

    // Set the OwnerReference to enable cascading deletion
    if err := controllerutil.SetControllerReference(app, desiredDeployment, r.Scheme); err != nil {
        return err
    }

    // Fetch the existing Deployment
    var existingDeployment appsv1.Deployment
    err := r.Get(ctx, types.NamespacedName{
        Name:      app.Name,
        Namespace: app.Namespace,
    }, &existingDeployment)
    if errors.IsNotFound(err) {
        // Does not exist yet: create it
        logger.Info("Creating Deployment", "name", app.Name)
        return r.Create(ctx, desiredDeployment)
    }
    if err != nil {
        return err
    }

    // Exists: check whether an update is needed
    if !r.deploymentEqual(&existingDeployment, desiredDeployment) {
        logger.Info("Updating Deployment", "name", app.Name)
        existingDeployment.Spec = desiredDeployment.Spec
        return r.Update(ctx, &existingDeployment)
    }
    return nil
}
// buildDeployment builds the Deployment object
func (r *ApplicationReconciler) buildDeployment(app *myappv1.Application) *appsv1.Deployment {
    replicas := int32(1)
    if app.Spec.Replicas != nil {
        replicas = *app.Spec.Replicas
    }

    container := corev1.Container{
        Name:  "app",
        Image: app.Spec.Image,
        Env:   app.Spec.Env,
        Resources: corev1.ResourceRequirements{
            Requests: app.Spec.Resources.Requests,
            Limits:   app.Spec.Resources.Limits,
        },
        VolumeMounts: r.buildVolumeMounts(app),
        Ports:        r.buildContainerPorts(app),
    }
    // HealthCheck is optional in the spec; guard against nil
    if hc := app.Spec.HealthCheck; hc != nil {
        container.LivenessProbe = hc.LivenessProbe
        container.ReadinessProbe = hc.ReadinessProbe
        container.StartupProbe = hc.StartupProbe
    }

    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Name,
            Namespace: app.Namespace,
            Labels: map[string]string{
                "app.kubernetes.io/name":       app.Name,
                "app.kubernetes.io/managed-by": "application-operator",
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app.kubernetes.io/name": app.Name,
                },
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app.kubernetes.io/name": app.Name,
                    },
                    // Istio sidecar injection is controlled via pod annotations
                    Annotations: r.buildPodAnnotations(app),
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{container},
                    Volumes:    r.buildVolumes(app),
                },
            },
        },
    }
}
// reconcileIstioResources creates or updates the Istio traffic objects
func (r *ApplicationReconciler) reconcileIstioResources(ctx context.Context, app *myappv1.Application) error {
    logger := log.FromContext(ctx)

    // 1. Create the VirtualService
    virtualService := r.buildVirtualService(app)
    if err := controllerutil.SetControllerReference(app, virtualService, r.Scheme); err != nil {
        return err
    }
    // ... create/update logic for the VirtualService

    // 2. Create the DestinationRule
    destinationRule := r.buildDestinationRule(app)
    // ... create/update logic for the DestinationRule

    logger.Info("Istio resources reconciled", "application", app.Name)
    return nil
}
4.3 Status Updates and Condition Management
// updateStatus refreshes the Application status
func (r *ApplicationReconciler) updateStatus(ctx context.Context, app *myappv1.Application) error {
    // Fetch the owned Deployment
    var deployment appsv1.Deployment
    err := r.Get(ctx, types.NamespacedName{
        Name:      app.Name,
        Namespace: app.Namespace,
    }, &deployment)
    if err != nil && !errors.IsNotFound(err) {
        return err
    }

    if err == nil {
        // Replica counts
        app.Status.Replicas = deployment.Status.Replicas
        app.Status.AvailableReplicas = deployment.Status.AvailableReplicas

        // Phase
        if deployment.Status.ReadyReplicas == deployment.Status.Replicas {
            app.Status.Phase = "Running"
            r.setCondition(app, "Ready", metav1.ConditionTrue, "All replicas are ready")
        } else if deployment.Status.ReadyReplicas > 0 {
            app.Status.Phase = "Updating"
            r.setCondition(app, "Ready", metav1.ConditionFalse, "Some replicas are not ready")
        } else {
            app.Status.Phase = "Pending"
            r.setCondition(app, "Ready", metav1.ConditionFalse, "No replicas are ready")
        }

        // Endpoints
        app.Status.Endpoints = r.getEndpoints(ctx, app)
    }

    // Timestamp
    app.Status.LastUpdateTime = metav1.Now()
    return r.Status().Update(ctx, app)
}

// setCondition sets a status condition
func (r *ApplicationReconciler) setCondition(app *myappv1.Application, conditionType string, status metav1.ConditionStatus, message string) {
    condition := metav1.Condition{
        Type:               conditionType,
        Status:             status,
        LastTransitionTime: metav1.Now(),
        Reason:             "Reconcile",
        Message:            message,
    }
    meta.SetStatusCondition(&app.Status.Conditions, condition)
}
4.4 SetupWithManager Configuration
// SetupWithManager registers the controller with the Manager
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&myappv1.Application{}).
        // Watch owned Deployments
        Owns(&appsv1.Deployment{}).
        // Watch owned Services
        Owns(&corev1.Service{}).
        // Watch owned ConfigMaps
        Owns(&corev1.ConfigMap{}).
        // Watch owned PVCs
        Owns(&corev1.PersistentVolumeClaim{}).
        // Filter: drop events where the resourceVersion did not change
        WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
        Complete(r)
}
Part 5: Integrating with the Istio Service Mesh
5.1 Istio Integration Architecture
When an Application enables Istio, the Operator needs to create the following resources:
┌─────────────────────────────────────────────────────────────┐
│ Application CR │
│ spec: │
│ serviceMesh: │
│ enabled: true │
│ trafficPolicy: │
│ connectionTimeout: 10s │
│ canary: │
│ enabled: true │
│ canaryVersion: v2 │
│ weight: 20 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│               Resources created by the Operator              │
│ │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Deployment │ │ Deployment │ │
│ │ (v1 stable) │ │ (v2 canary) │ │
│ └─────────────────┘ └─────────────────┘ │
│ │ │ │
│ └────────┬───────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Service │ │
│ │ selector: app=xxx (all versions) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ VirtualService │ │
│ │ routes: 80% -> v1, 20% -> v2 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ DestinationRule │ │
│ │ subsets: [v1 (stable), v2 (canary)] │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
5.2 Building the VirtualService
// buildVirtualService builds the Istio VirtualService.
// Istio publishes the spec types and the client types separately; this
// file assumes the imports:
//   istioapi "istio.io/api/networking/v1alpha3"
//   istioclient "istio.io/client-go/pkg/apis/networking/v1alpha3"
func (r *ApplicationReconciler) buildVirtualService(app *myappv1.Application) *istioclient.VirtualService {
    vs := &istioclient.VirtualService{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Name,
            Namespace: app.Namespace,
            Labels: map[string]string{
                "app.kubernetes.io/name": app.Name,
            },
        },
        Spec: istioapi.VirtualService{
            Hosts: []string{app.Name},
            // If a Gateway is involved, add it to Gateways here
        },
    }

    // Build the routing rules
    if app.Spec.ServiceMesh.Canary != nil && app.Spec.ServiceMesh.Canary.Enabled {
        // Canary release: split traffic by weight
        vs.Spec.Http = []*istioapi.HTTPRoute{
            {
                Route: []*istioapi.HTTPRouteDestination{
                    {
                        Destination: &istioapi.Destination{
                            Host:   app.Name,
                            Subset: "stable",
                        },
                        Weight: 100 - app.Spec.ServiceMesh.Canary.Weight,
                    },
                    {
                        Destination: &istioapi.Destination{
                            Host:   app.Name,
                            Subset: "canary",
                        },
                        Weight: app.Spec.ServiceMesh.Canary.Weight,
                    },
                },
            },
        }
    } else {
        // Normal traffic: route everything to stable
        vs.Spec.Http = []*istioapi.HTTPRoute{
            {
                Route: []*istioapi.HTTPRouteDestination{
                    {
                        Destination: &istioapi.Destination{
                            Host:   app.Name,
                            Subset: "stable",
                        },
                        Weight: 100,
                    },
                },
            },
        }
    }
    return vs
}
5.3 Building the DestinationRule
// buildDestinationRule builds the Istio DestinationRule
func (r *ApplicationReconciler) buildDestinationRule(app *myappv1.Application) *istioclient.DestinationRule {
    dr := &istioclient.DestinationRule{
        ObjectMeta: metav1.ObjectMeta{
            Name:      app.Name,
            Namespace: app.Namespace,
        },
        Spec: istioapi.DestinationRule{
            Host: app.Name,
            Subsets: []*istioapi.Subset{
                {
                    Name: "stable",
                    Labels: map[string]string{
                        "version": "stable",
                    },
                },
            },
            TrafficPolicy: r.buildIstioTrafficPolicy(app),
        },
    }

    // If canary is enabled, add the canary subset
    if app.Spec.ServiceMesh.Canary != nil && app.Spec.ServiceMesh.Canary.Enabled {
        dr.Spec.Subsets = append(dr.Spec.Subsets, &istioapi.Subset{
            Name: "canary",
            Labels: map[string]string{
                "version": app.Spec.ServiceMesh.Canary.CanaryVersion,
            },
        })
    }
    return dr
}

// buildIstioTrafficPolicy builds the Istio traffic policy.
// Also assumes durationpb "google.golang.org/protobuf/types/known/durationpb".
func (r *ApplicationReconciler) buildIstioTrafficPolicy(app *myappv1.Application) *istioapi.TrafficPolicy {
    if app.Spec.ServiceMesh.TrafficPolicy == nil {
        return nil
    }
    policy := &istioapi.TrafficPolicy{}

    // Connection timeout (durationpb has no MustParse; parse, then convert)
    if app.Spec.ServiceMesh.TrafficPolicy.ConnectionTimeout != "" {
        if d, err := time.ParseDuration(app.Spec.ServiceMesh.TrafficPolicy.ConnectionTimeout); err == nil {
            policy.ConnectionPool = &istioapi.ConnectionPoolSettings{
                Tcp: &istioapi.ConnectionPoolSettings_TCPSettings{
                    ConnectTimeout: durationpb.New(d),
                },
            }
        }
    }

    // Load balancing
    if app.Spec.ServiceMesh.TrafficPolicy.LoadBalancer != "" {
        policy.LoadBalancer = &istioapi.LoadBalancerSettings{
            LbPolicy: toIstioLbPolicy(app.Spec.ServiceMesh.TrafficPolicy.LoadBalancer),
        }
    }

    // Retry budget: per-route HTTP retries actually belong on the
    // VirtualService (HTTPRetry); here the attempt count is mapped onto
    // outlier detection as a circuit-breaking heuristic.
    if app.Spec.ServiceMesh.TrafficPolicy.RetryPolicy != nil {
        retry := app.Spec.ServiceMesh.TrafficPolicy.RetryPolicy
        policy.OutlierDetection = &istioapi.OutlierDetection{
            ConsecutiveErrors: retry.Attempts,
        }
    }
    return policy
}
5.4 Canary Promotion Workflow
The complete canary promotion flow:
// PromoteCanary promotes the canary version to stable
func (r *ApplicationReconciler) PromoteCanary(ctx context.Context, app *myappv1.Application) error {
    if app.Spec.ServiceMesh.Canary == nil || !app.Spec.ServiceMesh.Canary.Enabled {
        return fmt.Errorf("canary not enabled")
    }
    logger := log.FromContext(ctx)

    // 1. Copy the canary Deployment's image onto the stable Deployment
    var canaryDeployment appsv1.Deployment
    if err := r.Get(ctx, types.NamespacedName{Name: app.Name + "-canary", Namespace: app.Namespace}, &canaryDeployment); err != nil {
        return err
    }
    var deployment appsv1.Deployment
    if err := r.Get(ctx, types.NamespacedName{Name: app.Name + "-stable", Namespace: app.Namespace}, &deployment); err != nil {
        return err
    }
    deployment.Spec.Template.Spec.Containers[0].Image = canaryDeployment.Spec.Template.Spec.Containers[0].Image
    if err := r.Update(ctx, &deployment); err != nil {
        return err
    }

    // 2. Delete the canary Deployment
    if err := r.Delete(ctx, &canaryDeployment); err != nil {
        return err
    }

    // 3. Update the VirtualService: 100% of traffic to stable
    var vs istioclient.VirtualService
    if err := r.Get(ctx, types.NamespacedName{Name: app.Name, Namespace: app.Namespace}, &vs); err != nil {
        return err
    }
    if len(vs.Spec.Http) == 0 {
        return fmt.Errorf("virtualservice %s has no HTTP routes", app.Name)
    }
    vs.Spec.Http[0].Route = []*istioapi.HTTPRouteDestination{
        {
            Destination: &istioapi.Destination{Host: app.Name, Subset: "stable"},
            Weight:      100,
        },
    }
    if err := r.Update(ctx, &vs); err != nil {
        return err
    }

    // 4. Update the DestinationRule: drop the canary subset
    var dr istioclient.DestinationRule
    if err := r.Get(ctx, types.NamespacedName{Name: app.Name, Namespace: app.Namespace}, &dr); err != nil {
        return err
    }
    dr.Spec.Subsets = []*istioapi.Subset{
        {Name: "stable", Labels: map[string]string{"version": "stable"}},
    }
    if err := r.Update(ctx, &dr); err != nil {
        return err
    }

    // 5. Clear the canary configuration on the CR
    app.Spec.ServiceMesh.Canary.Enabled = false
    if err := r.Update(ctx, app); err != nil {
        return err
    }

    logger.Info("Canary promoted successfully", "application", app.Name)
    return nil
}
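PromoteCanary is exported, but nothing in the reconcile path calls it yet. One common pattern is to let users trigger promotion with an annotation on the CR; the key below is this project's own convention, not a Kubernetes or Istio API:
// Inside Reconcile: promote when the user annotates the CR, then remove
// the annotation so the promotion runs exactly once.
if app.Annotations["myapp.example.com/promote-canary"] == "true" {
    if err := r.PromoteCanary(ctx, &app); err != nil {
        return ctrl.Result{}, err
    }
    delete(app.Annotations, "myapp.example.com/promote-canary")
    if err := r.Update(ctx, &app); err != nil {
        return ctrl.Result{}, err
    }
}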
Part 6: Webhook Implementation and Validation
6.1 Why Webhooks?
Webhooks provide two capabilities:
- Defaulting: fill in defaults for fields the user did not set
- Validation: reject CRs that violate the rules on create or update
6.2 Creating the Webhook
kubebuilder create webhook --group myapp --version v1 --kind Application --defaulting --programmatic-validation
6.3 Defaulting Webhook
// api/v1/application_webhook.go
package v1

import (
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/webhook"
)

// log is for logging in this package.
var applicationlog = logf.Log.WithName("application-resource")

func (r *Application) SetupWebhookWithManager(mgr ctrl.Manager) error {
    return ctrl.NewWebhookManagedBy(mgr).
        For(r).
        Complete()
}

//+kubebuilder:webhook:path=/mutate-myapp-example-com-v1-application,mutating=true,failurePolicy=fail,sideEffects=None,groups=myapp.example.com,resources=applications,verbs=create;update,versions=v1,name=mapplication.kb.io,admissionReviewVersions=v1

// Note: recent controller-runtime versions replace this interface with
// webhook.CustomDefaulter; the legacy style is shown here for brevity.
var _ webhook.Defaulter = &Application{}

// Default implements webhook.Defaulter so a webhook will be registered for the type
func (r *Application) Default() {
    applicationlog.Info("default", "name", r.Name)

    // Default replica count
    if r.Spec.Replicas == nil {
        replicas := int32(1)
        r.Spec.Replicas = &replicas
    }

    // Default service type
    if r.Spec.Service.Type == "" {
        r.Spec.Service.Type = "ClusterIP"
    }

    // Default service port
    if len(r.Spec.Service.Ports) == 0 {
        r.Spec.Service.Ports = []ServicePort{
            {Name: "http", Port: 80, EnableIstioProxy: true},
        }
    }

    // Default health check
    if r.Spec.HealthCheck == nil {
        r.Spec.HealthCheck = &HealthCheckSpec{
            ReadinessProbe: &corev1.Probe{
                ProbeHandler: corev1.ProbeHandler{
                    HTTPGet: &corev1.HTTPGetAction{
                        Path: "/health",
                        Port: intstr.FromInt(80),
                    },
                },
                InitialDelaySeconds: 5,
                PeriodSeconds:       10,
            },
        }
    }

    // Default Istio traffic policy when the mesh is enabled
    if r.Spec.ServiceMesh != nil && r.Spec.ServiceMesh.Enabled {
        if r.Spec.ServiceMesh.TrafficPolicy == nil {
            r.Spec.ServiceMesh.TrafficPolicy = &TrafficPolicy{
                ConnectionTimeout: "10s",
                LoadBalancer:      "ROUND_ROBIN",
            }
        }
    }
}
6.4 Validation Webhook
//+kubebuilder:webhook:path=/validate-myapp-example-com-v1-application,mutating=false,failurePolicy=fail,sideEffects=None,groups=myapp.example.com,resources=applications,verbs=create;update,versions=v1,name=vapplication.kb.io,admissionReviewVersions=v1

// Also assumed in the import block above:
//   "fmt"
//   apierrors "k8s.io/apimachinery/pkg/api/errors"
//   "k8s.io/apimachinery/pkg/api/resource"
//   "k8s.io/apimachinery/pkg/util/validation/field"
//   "sigs.k8s.io/controller-runtime/pkg/webhook/admission"

var _ webhook.Validator = &Application{}

// ValidateCreate implements webhook.Validator so a webhook will be registered
// for the type. Since controller-runtime v0.15 the Validator methods also
// return admission.Warnings.
func (r *Application) ValidateCreate() (admission.Warnings, error) {
    applicationlog.Info("validate create", "name", r.Name)
    return nil, r.validateApplication()
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *Application) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
    applicationlog.Info("validate update", "name", r.Name)

    // Check immutable fields
    oldApp := old.(*Application)
    if oldApp.Spec.Storage != nil && r.Spec.Storage != nil {
        if oldApp.Spec.Storage.Size != r.Spec.Storage.Size {
            return nil, fmt.Errorf("storage size is immutable")
        }
    }
    return nil, r.validateApplication()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *Application) ValidateDelete() (admission.Warnings, error) {
    applicationlog.Info("validate delete", "name", r.Name)
    return nil, nil
}

// validateApplication checks the Application for validity
func (r *Application) validateApplication() error {
    var allErrs field.ErrorList

    // Validate the image
    if r.Spec.Image == "" {
        allErrs = append(allErrs, field.Invalid(
            field.NewPath("spec").Child("image"),
            r.Spec.Image,
            "image cannot be empty",
        ))
    }

    // Validate the replica range
    if r.Spec.Replicas != nil {
        if *r.Spec.Replicas < 1 || *r.Spec.Replicas > 100 {
            allErrs = append(allErrs, field.Invalid(
                field.NewPath("spec").Child("replicas"),
                *r.Spec.Replicas,
                "replicas must be between 1 and 100",
            ))
        }
    }

    // Validate the canary weight
    if r.Spec.ServiceMesh != nil && r.Spec.ServiceMesh.Canary != nil {
        if r.Spec.ServiceMesh.Canary.Weight < 0 || r.Spec.ServiceMesh.Canary.Weight > 100 {
            allErrs = append(allErrs, field.Invalid(
                field.NewPath("spec").Child("serviceMesh").Child("canary").Child("weight"),
                r.Spec.ServiceMesh.Canary.Weight,
                "canary weight must be between 0 and 100",
            ))
        }
    }

    // Validate the storage size format
    if r.Spec.Storage != nil {
        if _, err := resource.ParseQuantity(r.Spec.Storage.Size); err != nil {
            allErrs = append(allErrs, field.Invalid(
                field.NewPath("spec").Child("storage").Child("size"),
                r.Spec.Storage.Size,
                "invalid storage size format",
            ))
        }
    }

    if len(allErrs) > 0 {
        return apierrors.NewInvalid(
            r.GroupVersionKind().GroupKind(),
            r.Name,
            allErrs,
        )
    }
    return nil
}
Part 7: Testing and Debugging
7.1 Local Testing
Use envtest for local integration tests:
// internal/controller/application_controller_test.go
package controller

import (
    "context"
    "testing"
    "time"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/utils/pointer"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/envtest"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"

    myappv1 "github.com/example/application-operator/api/v1"
)

func TestApplicationReconciler(t *testing.T) {
    logf.SetLogger(zap.New(zap.UseDevMode(true)))

    // Start envtest (requires KUBEBUILDER_ASSETS; `make test` sets it up)
    testEnv := &envtest.Environment{
        CRDDirectoryPaths:     []string{"../../config/crd/bases"},
        ErrorIfCRDPathMissing: true,
    }
    cfg, err := testEnv.Start()
    if err != nil {
        t.Fatal(err)
    }
    defer testEnv.Stop()

    // Build the scheme
    scheme := runtime.NewScheme()
    _ = myappv1.AddToScheme(scheme)
    _ = corev1.AddToScheme(scheme)
    _ = appsv1.AddToScheme(scheme)

    // Create the manager and the controller
    k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
    if err != nil {
        t.Fatal(err)
    }
    k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{Scheme: scheme})
    if err != nil {
        t.Fatal(err)
    }
    reconciler := &ApplicationReconciler{
        Client: k8sManager.GetClient(),
        Scheme: scheme,
    }
    if err = reconciler.SetupWithManager(k8sManager); err != nil {
        t.Fatal(err)
    }
    go func() {
        if err := k8sManager.Start(ctrl.SetupSignalHandler()); err != nil {
            t.Error(err)
        }
    }()

    // Test case
    t.Run("should create deployment from application cr", func(t *testing.T) {
        ctx := context.Background()
        app := &myappv1.Application{
            ObjectMeta: metav1.ObjectMeta{
                Name:      "test-app",
                Namespace: "default",
            },
            Spec: myappv1.ApplicationSpec{
                Image:    "nginx:1.25",
                Replicas: pointer.Int32(3),
                Resources: corev1.ResourceRequirements{
                    Requests: corev1.ResourceList{
                        corev1.ResourceCPU:    resource.MustParse("100m"),
                        corev1.ResourceMemory: resource.MustParse("128Mi"),
                    },
                },
            },
        }
        // Create the Application
        if err := k8sClient.Create(ctx, app); err != nil {
            t.Fatal(err)
        }
        // Wait for the Deployment to appear
        eventually(t, func() bool {
            var deployment appsv1.Deployment
            err := k8sClient.Get(ctx, types.NamespacedName{
                Name:      "test-app",
                Namespace: "default",
            }, &deployment)
            return err == nil
        }, 10*time.Second, 1*time.Second, "Deployment should be created")

        // Verify the replica count
        var deployment appsv1.Deployment
        if err := k8sClient.Get(ctx, types.NamespacedName{
            Name:      "test-app",
            Namespace: "default",
        }, &deployment); err != nil {
            t.Fatal(err)
        }
        if *deployment.Spec.Replicas != 3 {
            t.Errorf("expected 3 replicas, got %d", *deployment.Spec.Replicas)
        }
    })
}

// eventually polls fn until it returns true or the timeout elapses.
func eventually(t *testing.T, fn func() bool, timeout, interval time.Duration, msg string) {
    t.Helper()
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if fn() {
            return
        }
        time.Sleep(interval)
    }
    t.Fatal(msg)
}
7.2 End-to-End Testing
// test/e2e/e2e_test.go
package e2e

import (
    "context"
    "testing"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/utils/pointer"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/client/config"

    myappv1 "github.com/example/application-operator/api/v1"
)

func TestApplicationEndToEnd(t *testing.T) {
    cfg, err := config.GetConfig()
    if err != nil {
        t.Skip("Skipping e2e test: no cluster config")
    }
    scheme := runtime.NewScheme()
    _ = myappv1.AddToScheme(scheme)
    _ = corev1.AddToScheme(scheme)
    k8sClient, err := client.New(cfg, client.Options{Scheme: scheme})
    if err != nil {
        t.Fatal(err)
    }

    ctx := context.Background()
    appName := "e2e-test-app"
    namespace := "default"

    // Clean up test resources
    defer func() {
        app := &myappv1.Application{ObjectMeta: metav1.ObjectMeta{Name: appName, Namespace: namespace}}
        _ = k8sClient.Delete(ctx, app)
    }()

    // Create the test Application
    app := &myappv1.Application{
        ObjectMeta: metav1.ObjectMeta{
            Name:      appName,
            Namespace: namespace,
        },
        Spec: myappv1.ApplicationSpec{
            Image:    "nginx:1.25",
            Replicas: pointer.Int32(2),
            Service: myappv1.ServiceSpec{
                Type: corev1.ServiceTypeClusterIP,
                Ports: []myappv1.ServicePort{
                    {Name: "http", Port: 80},
                },
            },
        },
    }
    if err := k8sClient.Create(ctx, app); err != nil {
        t.Fatal(err)
    }

    // Wait for the Application to reach Running (eventually as in 7.1)
    eventually(t, func() bool {
        if err := k8sClient.Get(ctx, types.NamespacedName{Name: appName, Namespace: namespace}, app); err != nil {
            return false
        }
        return app.Status.Phase == "Running"
    }, 5*time.Minute, 5*time.Second, "Application should reach Running state")

    // Verify the Service exists
    var svc corev1.Service
    if err := k8sClient.Get(ctx, types.NamespacedName{Name: appName, Namespace: namespace}, &svc); err != nil {
        t.Errorf("Service not found: %v", err)
    }
}
Part 8: Performance Optimization and Best Practices
8.1 Controlling Concurrency
// Assumed additional imports:
//   "k8s.io/client-go/util/workqueue"
//   "sigs.k8s.io/controller-runtime/pkg/controller"
//   "sigs.k8s.io/controller-runtime/pkg/reconcile"
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&myappv1.Application{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}).
        // Cap concurrency so the API Server is not overloaded
        WithOptions(controller.Options{
            MaxConcurrentReconciles: 10, // concurrent reconcile workers
            RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
                100*time.Millisecond, // minimum retry backoff
                1*time.Minute,        // maximum retry backoff
            ),
        }).
        Complete(r)
}
8.2 Event Filtering
import (
    "reflect"

    "sigs.k8s.io/controller-runtime/pkg/builder"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Only reconcile when the spec changes
var specChangedPredicate = predicate.Funcs{
    UpdateFunc: func(e event.UpdateEvent) bool {
        oldObj := e.ObjectOld.(*myappv1.Application)
        newObj := e.ObjectNew.(*myappv1.Application)
        return !reflect.DeepEqual(oldObj.Spec, newObj.Spec)
    },
    CreateFunc:  func(e event.CreateEvent) bool { return true },
    DeleteFunc:  func(e event.DeleteEvent) bool { return true },
    GenericFunc: func(e event.GenericEvent) bool { return false },
}

func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&myappv1.Application{}, builder.WithPredicates(specChangedPredicate)).
        Owns(&appsv1.Deployment{}).
        Complete(r)
}
8.3 Cache Optimization
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
    // Cache only what is actually needed
    return ctrl.NewControllerManagedBy(mgr).
        For(&myappv1.Application{}).
        Owns(&appsv1.Deployment{}).
        Owns(&corev1.Service{}, builder.OnlyMetadata). // cache metadata only
        Complete(r)
}
8.4 Avoiding Hot Loops
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // ...
    // When everything is healthy, use a generous resync interval
    // to avoid needless reconciles
    if app.Status.Phase == "Running" {
        return ctrl.Result{RequeueAfter: 5 * time.Minute}, nil
    }
    // Mid-update: check more frequently
    if app.Status.Phase == "Updating" {
        return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
    }
    // On failure, return the error instead (not shown): the workqueue
    // then applies exponential backoff automatically
    return ctrl.Result{}, nil
}
8.5 Metrics
import (
"github.com/prometheus/client_golang/prometheus"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
reconciliationCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "application_operator_reconciliations_total",
Help: "Total number of reconciliations",
},
[]string{"name", "namespace", "phase"},
)
reconciliationDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "application_operator_reconciliation_duration_seconds",
Help: "Duration of reconciliation",
Buckets: []float64{.1, .5, 1, 2, 5, 10},
},
[]string{"name", "namespace"},
)
)
func init() {
ctrlmetrics.Registry.MustRegister(reconciliationCount)
ctrlmetrics.Registry.MustRegister(reconciliationDuration)
}
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
start := time.Now()
defer func() {
reconciliationDuration.WithLabelValues(req.Name, req.Namespace).Observe(time.Since(start).Seconds())
}()
// ... reconciliation logic
reconciliationCount.WithLabelValues(app.Name, app.Namespace, app.Status.Phase).Inc()
return ctrl.Result{}, nil
}
Part 9: Deployment and Operations
9.1 Building and Pushing the Image
# Build the image
make docker-build IMG=example/application-operator:v1.0.0
# Push the image
make docker-push IMG=example/application-operator:v1.0.0
9.2 Deploying to the Cluster
# Install the CRDs
make install
# Deploy the controller
make deploy IMG=example/application-operator:v1.0.0
# Check the rollout
kubectl get pods -n application-operator-system
9.3 Creating a Sample Application
# config/samples/myapp_v1_application.yaml
apiVersion: myapp.example.com/v1
kind: Application
metadata:
  name: demo-app
  namespace: default
spec:
  image: nginx:1.25
  replicas: 3
  resources:
    requests:
      cpu: "100m"
      memory: "128Mi"
    limits:
      cpu: "500m"
      memory: "512Mi"
  service:
    type: ClusterIP
    ports:
      - name: http
        port: 80
        enableIstioProxy: true
  healthCheck:
    readinessProbe:
      httpGet:
        path: /
        port: 80
      initialDelaySeconds: 5
      periodSeconds: 10
  autoscaling:
    enabled: true
    minReplicas: 2
    maxReplicas: 10
    targetCPU: 70
  serviceMesh:
    enabled: true
    trafficPolicy:
      connectionTimeout: 10s
      loadBalancer: ROUND_ROBIN
kubectl apply -f config/samples/myapp_v1_application.yaml
9.4 Verifying the Deployment
# Check the Application status
kubectl get application demo-app -o wide
# List the child resources it created
kubectl get all -l app.kubernetes.io/name=demo-app
# Inspect the detailed status
kubectl describe application demo-app
Part 10: Summary and Outlook
10.1 Key Takeaways
This article walked through the full Operator development workflow:
- Concepts: a CRD defines desired state, a controller implements reconciliation, and Operator = CRD + Controller + operational knowledge
- Development: scaffold with Kubebuilder, build on controller-runtime, expose a declarative API
- Istio integration: canary releases and traffic management via VirtualService and DestinationRule
- Quality: webhook validation, unit tests, end-to-end tests
- Performance: concurrency control, event filtering, caching strategy, metrics
10.2 What a Production-Grade Operator Still Needs
A truly production-grade Operator also has to cover:
| Area | Key capabilities |
|---|---|
| Observability | Prometheus metrics, structured logging, distributed tracing |
| High availability | Leader election, multi-replica deployment, failure recovery |
| Security | Least-privilege RBAC, Secret encryption, image signature verification |
| Extensibility | Webhook extensions, plugin mechanisms, multi-version API support |
| Disaster recovery | Backup and restore, cross-cluster migration, data consistency guarantees |
10.3 Closing Thoughts
The Operator is the programming paradigm for operations in the cloud-native era. It encodes an expert's knowledge and experience so that managing complex distributed systems becomes simple, reliable, and repeatable. Mastering Operator development means you can "Kubernetes-ify" any complex system, one of the core skills of a cloud-native architect.
Author: 程序员茄子
Published: April 23, 2026