
Kubernetes Operators: Extending Kubernetes with Custom Resources and Controllers

Kubernetes has revolutionized container orchestration, but managing complex stateful applications takes more than Deployments and Services. Enter Kubernetes Operators: a method of packaging, deploying, and managing applications that extends Kubernetes through custom resources and intelligent controllers. This guide shows you how to build operators that encode operational knowledge in software.

Understanding Kubernetes Operators

An Operator is a software extension to Kubernetes that uses custom resources to manage applications and their components. Operators follow Kubernetes principles, notably the control loop pattern, to maintain the desired state of your applications.

The Operator Pattern

┌─────────────────────────────────────────────────────┐
│                   Kubernetes API                     │
└────────────────┬───────────────┬────────────────────┘
                 │               │
         ┌───────▼───────┐       │
         │   Custom      │       │
         │  Resource     │       │
         │ Definition    │       │
         └───────┬───────┘       │
                 │               │
         ┌───────▼───────────────▼────────┐
         │         Controller              │
         │  ┌─────────────────────────┐   │
         │  │   Reconciliation Loop   │   │
         │  │   1. Observe            │   │
         │  │   2. Analyze            │   │
         │  │   3. Act                │   │
         │  └─────────────────────────┘   │
         └─────────────┬──────────────────┘

              ┌────────▼────────┐
              │   Application   │
              │   Resources     │
              └─────────────────┘

Creating Custom Resource Definitions (CRDs)

CRDs allow you to store and retrieve structured data in Kubernetes. Let’s create a CRD for a database cluster:

# database-crd.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: databases.example.com
spec:
  group: example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              required: ['engine', 'version', 'replicas']
              properties:
                engine:
                  type: string
                  enum: ['postgres', 'mysql', 'mongodb']
                version:
                  type: string
                replicas:
                  type: integer
                  minimum: 1
                  maximum: 9
                storage:
                  type: object
                  properties:
                    size:
                      type: string
                      pattern: '^[0-9]+Gi$'
                    class:
                      type: string
                backup:
                  type: object
                  properties:
                    enabled:
                      type: boolean
                    schedule:
                      type: string
                    retention:
                      type: integer
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ['Creating', 'Running', 'Failed', 'Deleting']
                ready:
                  type: boolean
                replicas:
                  type: integer
                endpoint:
                  type: string
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      type:
                        type: string
                      status:
                        type: string
                      reason:
                        type: string
                      message:
                        type: string
                      lastTransitionTime:
                        type: string
      additionalPrinterColumns:
        - name: Engine
          type: string
          jsonPath: .spec.engine
        - name: Version
          type: string
          jsonPath: .spec.version
        - name: Replicas
          type: integer
          jsonPath: .spec.replicas
        - name: Status
          type: string
          jsonPath: .status.phase
        - name: Age
          type: date
          jsonPath: .metadata.creationTimestamp
  scope: Namespaced
  names:
    plural: databases
    singular: database
    kind: Database
    shortNames:
      - db
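With the CRD applied, users can create `Database` objects that the schema above validates. A sample manifest (names such as `orders-db` and `fast-ssd` are placeholders):

```yaml
apiVersion: example.com/v1
kind: Database
metadata:
  name: orders-db
spec:
  engine: postgres
  version: "14"
  replicas: 3
  storage:
    size: 20Gi
    class: fast-ssd
  backup:
    enabled: true
    schedule: "0 2 * * *"
    retention: 7
```

Thanks to `shortNames`, `kubectl get db` works, and the `additionalPrinterColumns` determine which fields that command displays.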

Building a Controller with Kubebuilder

Let’s build a controller using Kubebuilder, a framework for building Kubernetes APIs:

1. Initialize the Project

# Install kubebuilder
curl -L -o kubebuilder https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)
chmod +x kubebuilder && sudo mv kubebuilder /usr/local/bin/

# Create project
mkdir database-operator && cd database-operator
kubebuilder init --domain example.com --repo github.com/example/database-operator

# Create API
kubebuilder create api --group apps --version v1 --kind Database

2. Define the API Types

// api/v1/database_types.go
package v1


import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DatabaseSpec defines the desired state of Database
type DatabaseSpec struct {
    // Engine is the database engine to use
    // +kubebuilder:validation:Enum=postgres;mysql;mongodb
    Engine string `json:"engine"`

    // Version is the database version
    Version string `json:"version"`

    // Replicas is the number of database instances
    // +kubebuilder:validation:Minimum=1
    // +kubebuilder:validation:Maximum=9
    Replicas int32 `json:"replicas"`

    // Storage configuration
    Storage StorageSpec `json:"storage,omitempty"`

    // Backup configuration
    Backup BackupSpec `json:"backup,omitempty"`

    // Resources for the database pods
    Resources corev1.ResourceRequirements `json:"resources,omitempty"`
}

type StorageSpec struct {
    // Size of the persistent volume
    // +kubebuilder:validation:Pattern="^[0-9]+Gi$"
    Size string `json:"size,omitempty"`

    // StorageClass to use
    Class string `json:"class,omitempty"`
}

type BackupSpec struct {
    // Enabled determines if backups are enabled
    Enabled bool `json:"enabled,omitempty"`

    // Schedule in cron format
    Schedule string `json:"schedule,omitempty"`

    // Retention days
    Retention int32 `json:"retention,omitempty"`
}

// DatabaseStatus defines the observed state of Database
type DatabaseStatus struct {
    // Phase represents the current phase of database
    Phase DatabasePhase `json:"phase,omitempty"`

    // Ready indicates if the database is ready
    Ready bool `json:"ready,omitempty"`

    // Replicas is the number of ready replicas
    Replicas int32 `json:"replicas,omitempty"`

    // Endpoint for database connection
    Endpoint string `json:"endpoint,omitempty"`

    // Conditions represent the latest available observations
    Conditions []metav1.Condition `json:"conditions,omitempty"`
}

// +kubebuilder:validation:Enum=Creating;Running;Failed;Deleting
type DatabasePhase string

const (
    DatabasePhaseCreating DatabasePhase = "Creating"
    DatabasePhaseRunning  DatabasePhase = "Running"
    DatabasePhaseFailed   DatabasePhase = "Failed"
    DatabasePhaseDeleting DatabasePhase = "Deleting"
)

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Engine",type=string,JSONPath=`.spec.engine`
// +kubebuilder:printcolumn:name="Version",type=string,JSONPath=`.spec.version`
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.phase`
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=`.metadata.creationTimestamp`

// Database is the Schema for the databases API
type Database struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   DatabaseSpec   `json:"spec,omitempty"`
    Status DatabaseStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// DatabaseList contains a list of Database
type DatabaseList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []Database `json:"items"`
}

func init() {
    SchemeBuilder.Register(&Database{}, &DatabaseList{})
}

3. Implement the Controller

// controllers/database_controller.go
package controllers


import (
    "context"
    "fmt"

    "github.com/go-logr/logr"
    kapps "k8s.io/api/apps/v1" // aliased so it does not clash with the operator's appsv1 API group
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

    appsv1 "github.com/example/database-operator/api/v1"
)

// DatabaseReconciler reconciles a Database object
type DatabaseReconciler struct {
    client.Client
    Log    logr.Logger
    Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=apps.example.com,resources=databases,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps.example.com,resources=databases/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch;create;update;patch;delete

func (r *DatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := r.Log.WithValues("database", req.NamespacedName)

    // Fetch the Database instance
    database := &appsv1.Database{}
    if err := r.Get(ctx, req.NamespacedName, database); err != nil {
        if errors.IsNotFound(err) {
            log.Info("Database resource not found. Ignoring since object must be deleted")
            return ctrl.Result{}, nil
        }
        log.Error(err, "Failed to get Database")
        return ctrl.Result{}, err
    }

    // Add finalizer for graceful cleanup
    finalizerName := "database.example.com/finalizer"
    if database.ObjectMeta.DeletionTimestamp.IsZero() {
        if !controllerutil.ContainsFinalizer(database, finalizerName) {
            controllerutil.AddFinalizer(database, finalizerName)
            if err := r.Update(ctx, database); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // Handle deletion
        if controllerutil.ContainsFinalizer(database, finalizerName) {
            if err := r.deleteExternalResources(database); err != nil {
                return ctrl.Result{}, err
            }

            controllerutil.RemoveFinalizer(database, finalizerName)
            if err := r.Update(ctx, database); err != nil {
                return ctrl.Result{}, err
            }
        }
        return ctrl.Result{}, nil
    }

    // Update status phase
    database.Status.Phase = appsv1.DatabasePhaseCreating
    if err := r.Status().Update(ctx, database); err != nil {
        log.Error(err, "Failed to update Database status")
        return ctrl.Result{}, err
    }

    // Create or update ConfigMap
    configMap := r.configMapForDatabase(database)
    if err := ctrl.SetControllerReference(database, configMap, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }

    foundConfigMap := &corev1.ConfigMap{}
    err := r.Get(ctx, types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}, foundConfigMap)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating a new ConfigMap", "ConfigMap.Namespace", configMap.Namespace, "ConfigMap.Name", configMap.Name)
        err = r.Create(ctx, configMap)
        if err != nil {
            return ctrl.Result{}, err
        }
    } else if err != nil {
        return ctrl.Result{}, err
    }

    // Create or update Service
    service := r.serviceForDatabase(database)
    if err := ctrl.SetControllerReference(database, service, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }

    foundService := &corev1.Service{}
    err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating a new Service", "Service.Namespace", service.Namespace, "Service.Name", service.Name)
        err = r.Create(ctx, service)
        if err != nil {
            return ctrl.Result{}, err
        }
    } else if err != nil {
        return ctrl.Result{}, err
    }

    // Create or update StatefulSet
    statefulSet := r.statefulSetForDatabase(database)
    if err := ctrl.SetControllerReference(database, statefulSet, r.Scheme); err != nil {
        return ctrl.Result{}, err
    }

    foundStatefulSet := &kapps.StatefulSet{}
    err = r.Get(ctx, types.NamespacedName{Name: statefulSet.Name, Namespace: statefulSet.Namespace}, foundStatefulSet)
    if err != nil && errors.IsNotFound(err) {
        log.Info("Creating a new StatefulSet", "StatefulSet.Namespace", statefulSet.Namespace, "StatefulSet.Name", statefulSet.Name)
        err = r.Create(ctx, statefulSet)
        if err != nil {
            return ctrl.Result{}, err
        }
        return ctrl.Result{Requeue: true}, nil
    } else if err != nil {
        return ctrl.Result{}, err
    }

    // Update StatefulSet if needed
    if foundStatefulSet.Spec.Replicas == nil || *foundStatefulSet.Spec.Replicas != database.Spec.Replicas {
        foundStatefulSet.Spec.Replicas = &database.Spec.Replicas
        err = r.Update(ctx, foundStatefulSet)
        if err != nil {
            return ctrl.Result{}, err
        }
        return ctrl.Result{Requeue: true}, nil
    }

    // Update status
    database.Status.Replicas = foundStatefulSet.Status.ReadyReplicas
    database.Status.Ready = foundStatefulSet.Status.ReadyReplicas == database.Spec.Replicas
    if database.Status.Ready {
        database.Status.Phase = appsv1.DatabasePhaseRunning
        database.Status.Endpoint = fmt.Sprintf("%s.%s.svc.cluster.local:%d",
            service.Name, database.Namespace, r.getPort(database))
    }

    // Update conditions
    r.updateConditions(database, foundStatefulSet)

    if err := r.Status().Update(ctx, database); err != nil {
        log.Error(err, "Failed to update Database status")
        return ctrl.Result{}, err
    }

    return ctrl.Result{}, nil
}

func (r *DatabaseReconciler) statefulSetForDatabase(db *appsv1.Database) *kapps.StatefulSet {
    labels := map[string]string{
        "app":      "database",
        "database": db.Name,
        "engine":   db.Spec.Engine,
    }

    replicas := db.Spec.Replicas

    statefulSet := &kapps.StatefulSet{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-" + db.Spec.Engine,
            Namespace: db.Namespace,
            Labels:    labels,
        },
        Spec: kapps.StatefulSetSpec{
            Replicas: &replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            ServiceName: db.Name + "-" + db.Spec.Engine,
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  db.Spec.Engine,
                        Image: r.getImage(db),
                        Ports: []corev1.ContainerPort{{
                            ContainerPort: r.getPort(db),
                            Name:          db.Spec.Engine,
                        }},
                        VolumeMounts: []corev1.VolumeMount{{
                            Name:      "data",
                            MountPath: r.getDataPath(db),
                        }},
                        Env: r.getEnvVars(db),
                        Resources: db.Spec.Resources,
                    }},
                },
            },
            VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{
                ObjectMeta: metav1.ObjectMeta{
                    Name: "data",
                },
                Spec: corev1.PersistentVolumeClaimSpec{
                    AccessModes: []corev1.PersistentVolumeAccessMode{
                        corev1.ReadWriteOnce,
                    },
                    StorageClassName: &db.Spec.Storage.Class,
                    Resources: corev1.ResourceRequirements{
                        Requests: corev1.ResourceList{
                            corev1.ResourceStorage: resource.MustParse(db.Spec.Storage.Size),
                        },
                    },
                },
            }},
        },
    }

    return statefulSet
}

func (r *DatabaseReconciler) getImage(db *appsv1.Database) string {
    switch db.Spec.Engine {
    case "postgres":
        return fmt.Sprintf("postgres:%s", db.Spec.Version)
    case "mysql":
        return fmt.Sprintf("mysql:%s", db.Spec.Version)
    case "mongodb":
        return fmt.Sprintf("mongo:%s", db.Spec.Version)
    default:
        return ""
    }
}

func (r *DatabaseReconciler) getPort(db *appsv1.Database) int32 {
    switch db.Spec.Engine {
    case "postgres":
        return 5432
    case "mysql":
        return 3306
    case "mongodb":
        return 27017
    default:
        return 0
    }
}

func (r *DatabaseReconciler) getDataPath(db *appsv1.Database) string {
    switch db.Spec.Engine {
    case "postgres":
        return "/var/lib/postgresql/data"
    case "mysql":
        return "/var/lib/mysql"
    case "mongodb":
        return "/data/db"
    default:
        return "/data"
    }
}

func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&appsv1.Database{}).
        Owns(&kapps.StatefulSet{}).
        Owns(&corev1.Service{}).
        Owns(&corev1.ConfigMap{}).
        Complete(r)
}
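Because `Reconcile` registered a finalizer, deleting a `Database` does not remove it immediately: the API server only sets `deletionTimestamp`, and the object lingers until the reconciler finishes `deleteExternalResources` and strips the finalizer. During deletion the stored object conceptually looks like this (field values illustrative):

```yaml
metadata:
  name: orders-db
  finalizers:
    - database.example.com/finalizer
  # Set by the API server when deletion is requested; the object is
  # actually removed only once the finalizers list is empty.
  deletionTimestamp: "2024-03-09T14:30:05Z"
```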

Advanced Operator Patterns

1. Backup Controller

// controllers/backup_controller.go
package controllers


import (
    "context"
    "fmt"
    "time"

    // CronJob lives in batch/v1 since Kubernetes 1.21; batch/v1beta1 was removed in 1.25.
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    ctrl "sigs.k8s.io/controller-runtime"

    appsv1 "github.com/example/database-operator/api/v1"
)

func (r *DatabaseReconciler) ensureBackupCronJob(ctx context.Context, db *appsv1.Database) error {
    if !db.Spec.Backup.Enabled {
        // Delete CronJob if backup is disabled
        cronJob := &batchv1.CronJob{}
        err := r.Get(ctx, types.NamespacedName{
            Name:      db.Name + "-backup",
            Namespace: db.Namespace,
        }, cronJob)

        if err == nil {
            return r.Delete(ctx, cronJob)
        }
        return nil
    }

    cronJob := &batchv1.CronJob{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-backup",
            Namespace: db.Namespace,
        },
        Spec: batchv1.CronJobSpec{
            Schedule: db.Spec.Backup.Schedule,
            JobTemplate: batchv1.JobTemplateSpec{
                Spec: batchv1.JobSpec{
                    Template: corev1.PodTemplateSpec{
                        Spec: corev1.PodSpec{
                            RestartPolicy: corev1.RestartPolicyOnFailure,
                            Containers: []corev1.Container{{
                                Name:  "backup",
                                Image: r.getBackupImage(db),
                                Command: r.getBackupCommand(db),
                                Env: []corev1.EnvVar{
                                    {
                                        Name:  "DATABASE_HOST",
                                        Value: db.Status.Endpoint,
                                    },
                                    {
                                        Name: "DATABASE_PASSWORD",
                                        ValueFrom: &corev1.EnvVarSource{
                                            SecretKeyRef: &corev1.SecretKeySelector{
                                                LocalObjectReference: corev1.LocalObjectReference{
                                                    Name: db.Name + "-secret",
                                                },
                                                Key: "password",
                                            },
                                        },
                                    },
                                    {
                                        Name:  "S3_BUCKET",
                                        Value: "database-backups",
                                    },
                                    {
                                        Name:  "RETENTION_DAYS",
                                        Value: fmt.Sprintf("%d", db.Spec.Backup.Retention),
                                    },
                                },
                            }},
                        },
                    },
                },
            },
        },
    }

    if err := ctrl.SetControllerReference(db, cronJob, r.Scheme); err != nil {
        return err
    }

    found := &batchv1.CronJob{}
    err := r.Get(ctx, types.NamespacedName{Name: cronJob.Name, Namespace: cronJob.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        return r.Create(ctx, cronJob)
    }

    return err
}

func (r *DatabaseReconciler) getBackupCommand(db *appsv1.Database) []string {
    // NOTE: this timestamp is fixed when the CronJob is created; for a
    // per-run timestamp, compute the date inside the shell command instead.
    timestamp := time.Now().Format("20060102-150405")

    switch db.Spec.Engine {
    case "postgres":
        return []string{
            "/bin/bash",
            "-c",
            fmt.Sprintf(`
                pg_dump -h $DATABASE_HOST -U postgres -d postgres | \
                gzip | \
                aws s3 cp - s3://$S3_BUCKET/%s/backup-%s.sql.gz
            `, db.Name, timestamp),
        }
    case "mysql":
        return []string{
            "/bin/bash",
            "-c",
            fmt.Sprintf(`
                mysqldump -h $DATABASE_HOST -u root -p$DATABASE_PASSWORD --all-databases | \
                gzip | \
                aws s3 cp - s3://$S3_BUCKET/%s/backup-%s.sql.gz
            `, db.Name, timestamp),
        }
    case "mongodb":
        return []string{
            "/bin/bash",
            "-c",
            fmt.Sprintf(`
                mongodump --uri="mongodb://$DATABASE_HOST" --archive | \
                gzip | \
                aws s3 cp - s3://$S3_BUCKET/%s/backup-%s.archive.gz
            `, db.Name, timestamp),
        }
    default:
        return []string{}
    }
}

2. Monitoring Integration

// controllers/monitoring.go
package controllers


import (
    "context"
    "fmt"

    monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/intstr"
    ctrl "sigs.k8s.io/controller-runtime"

    appsv1 "github.com/example/database-operator/api/v1"
)

func (r *DatabaseReconciler) ensureServiceMonitor(ctx context.Context, db *appsv1.Database) error {
    serviceMonitor := &monitoringv1.ServiceMonitor{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-monitor",
            Namespace: db.Namespace,
            Labels: map[string]string{
                "app":      "database",
                "database": db.Name,
            },
        },
        Spec: monitoringv1.ServiceMonitorSpec{
            Selector: metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app":      "database",
                    "database": db.Name,
                },
            },
            Endpoints: []monitoringv1.Endpoint{
                {
                    Port:     "metrics",
                    Interval: "30s",
                    Path:     "/metrics",
                },
            },
        },
    }

    if err := ctrl.SetControllerReference(db, serviceMonitor, r.Scheme); err != nil {
        return err
    }

    found := &monitoringv1.ServiceMonitor{}
    err := r.Get(ctx, types.NamespacedName{
        Name:      serviceMonitor.Name,
        Namespace: serviceMonitor.Namespace,
    }, found)

    if err != nil && errors.IsNotFound(err) {
        return r.Create(ctx, serviceMonitor)
    }

    return err
}

// Prometheus rules for alerting
func (r *DatabaseReconciler) ensurePrometheusRule(ctx context.Context, db *appsv1.Database) error {
    rule := &monitoringv1.PrometheusRule{
        ObjectMeta: metav1.ObjectMeta{
            Name:      db.Name + "-rules",
            Namespace: db.Namespace,
        },
        Spec: monitoringv1.PrometheusRuleSpec{
            Groups: []monitoringv1.RuleGroup{
                {
                    Name: db.Name + "-alerts",
                    Rules: []monitoringv1.Rule{
                        {
                            Alert: "DatabaseDown",
                            Expr:  intstr.FromString(fmt.Sprintf(`up{job="%s-monitor"} == 0`, db.Name)),
                            For:   "5m",
                            Labels: map[string]string{
                                "severity": "critical",
                                "database": db.Name,
                            },
                            Annotations: map[string]string{
                                "summary":     "Database {{ $labels.database }} is down",
                                "description": "Database {{ $labels.database }} has been down for more than 5 minutes.",
                            },
                        },
                        {
                            Alert: "DatabaseHighConnections",
                            Expr:  intstr.FromString(fmt.Sprintf(`database_connections{database="%s"} > 80`, db.Name)),
                            For:   "10m",
                            Labels: map[string]string{
                                "severity": "warning",
                                "database": db.Name,
                            },
                            Annotations: map[string]string{
                                "summary":     "High connection count on {{ $labels.database }}",
                                "description": "Database {{ $labels.database }} has more than 80 active connections.",
                            },
                        },
                    },
                },
            },
        },
    }

    if err := ctrl.SetControllerReference(db, rule, r.Scheme); err != nil {
        return err
    }

    // Create only if the rule does not already exist, mirroring ensureServiceMonitor
    found := &monitoringv1.PrometheusRule{}
    err := r.Get(ctx, types.NamespacedName{Name: rule.Name, Namespace: rule.Namespace}, found)
    if err != nil && errors.IsNotFound(err) {
        return r.Create(ctx, rule)
    }

    return err
}

3. Webhook Validation

// api/v1/database_webhook.go
package v1


import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
    "sigs.k8s.io/controller-runtime/pkg/webhook"
)

var databaselog = logf.Log.WithName("database-resource")

func (r *Database) SetupWebhookWithManager(mgr ctrl.Manager) error {
    return ctrl.NewWebhookManagedBy(mgr).
        For(r).
        Complete()
}

// +kubebuilder:webhook:path=/mutate-apps-example-com-v1-database,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=databases,verbs=create;update,versions=v1,name=mdatabase.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Defaulter = &Database{}

// Default implements webhook.Defaulter
func (r *Database) Default() {
    databaselog.Info("default", "name", r.Name)

    // Set default values
    if r.Spec.Storage.Size == "" {
        r.Spec.Storage.Size = "10Gi"
    }

    if r.Spec.Storage.Class == "" {
        r.Spec.Storage.Class = "standard"
    }

    if r.Spec.Backup.Schedule == "" && r.Spec.Backup.Enabled {
        r.Spec.Backup.Schedule = "0 2 * * *" // 2 AM daily
    }

    if r.Spec.Backup.Retention == 0 && r.Spec.Backup.Enabled {
        r.Spec.Backup.Retention = 7
    }
}

// +kubebuilder:webhook:path=/validate-apps-example-com-v1-database,mutating=false,failurePolicy=fail,sideEffects=None,groups=apps.example.com,resources=databases,verbs=create;update,versions=v1,name=vdatabase.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Validator = &Database{}

// ValidateCreate implements webhook.Validator
func (r *Database) ValidateCreate() error {
    databaselog.Info("validate create", "name", r.Name)

    // Validate engine-specific constraints
    switch r.Spec.Engine {
    case "postgres":
        if r.Spec.Version < "12" {
            return fmt.Errorf("PostgreSQL version must be 12 or higher")
        }
    case "mysql":
        if r.Spec.Version < "8.0" {
            return fmt.Errorf("MySQL version must be 8.0 or higher")
        }
    case "mongodb":
        if r.Spec.Version < "4.4" {
            return fmt.Errorf("MongoDB version must be 4.4 or higher")
        }
    }

    // Validate storage size
    size, err := resource.ParseQuantity(r.Spec.Storage.Size)
    if err != nil {
        return fmt.Errorf("invalid storage size: %v", err)
    }

    minSize := resource.MustParse("1Gi")
    if size.Cmp(minSize) < 0 {
        return fmt.Errorf("storage size must be at least 1Gi")
    }

    return nil
}

// ValidateUpdate implements webhook.Validator
func (r *Database) ValidateUpdate(old runtime.Object) error {
    databaselog.Info("validate update", "name", r.Name)

    oldDB := old.(*Database)

    // Prevent engine changes
    if r.Spec.Engine != oldDB.Spec.Engine {
        return fmt.Errorf("database engine cannot be changed")
    }

    // Prevent downgrades
    if r.Spec.Version < oldDB.Spec.Version {
        return fmt.Errorf("database version cannot be downgraded")
    }

    // Prevent storage shrinking
    newSize, _ := resource.ParseQuantity(r.Spec.Storage.Size)
    oldSize, _ := resource.ParseQuantity(oldDB.Spec.Storage.Size)

    if newSize.Cmp(oldSize) < 0 {
        return fmt.Errorf("storage size cannot be reduced")
    }

    return nil
}

// ValidateDelete implements webhook.Validator
func (r *Database) ValidateDelete() error {
    databaselog.Info("validate delete", "name", r.Name)

    // Could add logic to prevent deletion of databases with active connections
    // or require confirmation annotation

    if r.Annotations["confirm-delete"] != "true" {
        return fmt.Errorf("database deletion requires 'confirm-delete: true' annotation")
    }

    return nil
}

Testing Your Operator

1. Unit Tests

// controllers/database_controller_test.go
package controllers


import (
    "context"
    "time"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"

    dbv1 "github.com/example/database-operator/api/v1"
)

var _ = Describe("Database Controller", func() {
    const (
        DatabaseName      = "test-database"
        DatabaseNamespace = "default"

        timeout  = time.Second * 10
        duration = time.Second * 10
        interval = time.Millisecond * 250
    )

    Context("When creating Database", func() {
        It("Should create StatefulSet and Service", func() {
            ctx := context.Background()
            database := &dbv1.Database{
                ObjectMeta: metav1.ObjectMeta{
                    Name:      DatabaseName,
                    Namespace: DatabaseNamespace,
                },
                Spec: dbv1.DatabaseSpec{
                    Engine:   "postgres",
                    Version:  "14",
                    Replicas: 3,
                    Storage: dbv1.StorageSpec{
                        Size:  "10Gi",
                        Class: "fast-ssd",
                    },
                },
            }

            Expect(k8sClient.Create(ctx, database)).Should(Succeed())

            databaseLookupKey := types.NamespacedName{Name: DatabaseName, Namespace: DatabaseNamespace}
            createdDatabase := &dbv1.Database{}

            Eventually(func() bool {
                err := k8sClient.Get(ctx, databaseLookupKey, createdDatabase)
                return err == nil
            }, timeout, interval).Should(BeTrue())

            // Check StatefulSet creation
            Eventually(func() bool {
                statefulSet := &appsv1.StatefulSet{}
                err := k8sClient.Get(ctx, types.NamespacedName{
                    Name:      DatabaseName + "-postgres",
                    Namespace: DatabaseNamespace,
                }, statefulSet)

                if err != nil {
                    return false
                }

                return *statefulSet.Spec.Replicas == 3
            }, timeout, interval).Should(BeTrue())

            // Check Service creation
            Eventually(func() bool {
                service := &corev1.Service{}
                err := k8sClient.Get(ctx, types.NamespacedName{
                    Name:      DatabaseName + "-postgres",
                    Namespace: DatabaseNamespace,
                }, service)

                return err == nil
            }, timeout, interval).Should(BeTrue())

            // Check status update
            Eventually(func() string {
                err := k8sClient.Get(ctx, databaseLookupKey, createdDatabase)
                if err != nil {
                    return ""
                }
                return string(createdDatabase.Status.Phase)
            }, timeout, interval).Should(Equal(string(dbv1.DatabasePhaseCreating)))
        })
    })
})

2. Integration Tests

// test/e2e/database_test.go
package e2e

import (
    "testing"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"

    dbv1 "github.com/example/database-operator/api/v1"
)

// kubeconfig, dbClient, and the waitFor*/getDatabaseEndpoint/testDatabaseConnection
// helpers are assumed to be defined elsewhere in the e2e suite.
func TestDatabaseOperator(t *testing.T) {
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        t.Fatal(err)
    }

    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        t.Fatal(err)
    }

    // Create test database
    database := &dbv1.Database{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "e2e-test-db",
            Namespace: "default",
        },
        Spec: dbv1.DatabaseSpec{
            Engine:   "postgres",
            Version:  "14",
            Replicas: 1,
            Storage: dbv1.StorageSpec{
                Size: "5Gi",
            },
            Backup: dbv1.BackupSpec{
                Enabled:   true,
                Schedule:  "*/5 * * * *", // Every 5 minutes for testing
                Retention: 1,
            },
        },
    }

    // Create database
    _, err = dbClient.Create(database)
    if err != nil {
        t.Fatal(err)
    }

    // Wait for database to be ready
    err = waitForDatabaseReady(dbClient, database.Name, 5*time.Minute)
    if err != nil {
        t.Fatal(err)
    }

    // Test database connection
    endpoint, err := getDatabaseEndpoint(dbClient, database.Name)
    if err != nil {
        t.Fatal(err)
    }

    err = testDatabaseConnection(endpoint)
    if err != nil {
        t.Fatal(err)
    }

    // Test backup creation
    err = waitForBackupJob(clientset, database.Name, 10*time.Minute)
    if err != nil {
        t.Fatal(err)
    }

    // Clean up
    err = dbClient.Delete(database.Name, &metav1.DeleteOptions{})
    if err != nil {
        t.Fatal(err)
    }
}

Deployment and Operations

1. Building and Publishing

# Dockerfile
FROM golang:1.19 as builder

WORKDIR /workspace
COPY go.mod go.sum ./
RUN go mod download

COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/

RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go

FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER 65532:65532

ENTRYPOINT ["/manager"]
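
Build and push the image with `docker build`/`docker push` (kubebuilder projects scaffold `make docker-build docker-push IMG=...` targets for this), then point the deployment at the pushed image. With kustomize, that is typically done via an image override; a minimal sketch, assuming an illustrative registry and tag:

```yaml
# config/manager/kustomization.yaml
resources:
  - manager.yaml
images:
  - name: database-operator
    newName: registry.example.com/database-operator
    newTag: v0.1.0
```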

2. Deployment Manifest

# config/manager/manager.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: database-operator-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: database-operator-controller-manager
  namespace: database-operator-system
spec:
  selector:
    matchLabels:
      control-plane: controller-manager
  replicas: 1
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      securityContext:
        runAsNonRoot: true
      containers:
        - command:
            - /manager
          args:
            - --leader-elect
          image: database-operator:latest
          name: manager
          securityContext:
            allowPrivilegeEscalation: false
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8081
            initialDelaySeconds: 15
            periodSeconds: 20
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8081
            initialDelaySeconds: 5
            periodSeconds: 10
          resources:
            limits:
              cpu: 500m
              memory: 128Mi
            requests:
              cpu: 10m
              memory: 64Mi
      serviceAccountName: database-operator-controller-manager
      terminationGracePeriodSeconds: 10
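
The deployment above references a service account, which needs RBAC permissions for every resource the controller manages. A minimal sketch (the `db.example.com` API group is illustrative; the exact rules depend on your CRD and the resources your reconciler creates):

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: database-operator-controller-manager
  namespace: database-operator-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: database-operator-manager-role
rules:
  - apiGroups: ["db.example.com"]
    resources: ["databases", "databases/status"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: ["apps"]
    resources: ["statefulsets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
  - apiGroups: [""]
    resources: ["services", "configmaps", "secrets"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: database-operator-manager-rolebinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: database-operator-manager-role
subjects:
  - kind: ServiceAccount
    name: database-operator-controller-manager
    namespace: database-operator-system
```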

3. Monitoring with Prometheus

# Prometheus ServiceMonitor for operator metrics
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: database-operator-metrics
  namespace: database-operator-system
spec:
  endpoints:
    - path: /metrics
      port: https
      scheme: https
      tlsConfig:
        insecureSkipVerify: true
  selector:
    matchLabels:
      control-plane: controller-manager

Best Practices

  1. Resource Ownership: Always set owner references to ensure proper garbage collection
  2. Status Updates: Keep status separate from spec and update it regularly
  3. Idempotency: Ensure reconciliation logic is idempotent
  4. Error Handling: Use exponential backoff for retries
  5. Observability: Export metrics and use structured logging
  6. Testing: Write comprehensive unit and integration tests
  7. Security: Run with minimal privileges and use RBAC properly

Conclusion

Kubernetes Operators represent a powerful pattern for extending Kubernetes to manage complex applications. By encoding operational knowledge into software, operators automate tasks that would otherwise require manual intervention. Whether you’re managing databases, message queues, or custom applications, the operator pattern provides a robust framework for cloud-native automation.

The combination of Custom Resource Definitions and controllers gives you the flexibility to define your own abstractions while leveraging Kubernetes’ proven reconciliation loop pattern. As you build operators, remember that the goal is to make the complex simple—abstracting away operational complexity while providing a declarative API that fits naturally into the Kubernetes ecosystem.
