Serverless WASM Functions in Kubernetes: FaaS with WebAssembly Performance

The serverless paradigm reaches new heights when combined with WebAssembly’s performance characteristics. By deploying WASM functions in Kubernetes, we achieve the best of both worlds: serverless simplicity with near-native performance and millisecond-scale cold starts. This comprehensive guide explores building production-ready serverless WASM platforms on Kubernetes.

Table of Contents

  1. Serverless WASM Architecture
  2. Platform Comparison and Selection
  3. Knative with WASM Runtime
  4. OpenFaaS WASM Implementation
  5. Custom Serverless Platform
  6. Function Development Patterns
  7. Event-Driven Architecture
  8. Auto-Scaling and Performance
  9. Security and Isolation
  10. Monitoring and Observability
  11. CI/CD for Serverless WASM
  12. Production Operations

Serverless WASM Architecture

Traditional vs WASM Serverless Comparison

Traditional Serverless Architecture:
┌─────────────────────────────────────────────────┐
│                  API Gateway                    │
├─────────────────────────────────────────────────┤
│           Function Runtime (Node.js)            │
│  ┌─────────────┐  ┌─────────────┐  ┌──────────┐ │
│  │   Cold      │  │    Warm     │  │  Scale   │ │
│  │ Start: 2s   │  │ Start: 50ms │  │ Up: 30s  │ │
│  │ Mem: 128MB  │  │ Mem: 128MB  │  │          │ │
│  └─────────────┘  └─────────────┘  └──────────┘ │
├─────────────────────────────────────────────────┤
│            Container Runtime (Docker)           │
├─────────────────────────────────────────────────┤
│               Kubernetes/VMs                    │
└─────────────────────────────────────────────────┘

WASM Serverless Architecture:
┌─────────────────────────────────────────────────┐
│                  API Gateway                    │
├─────────────────────────────────────────────────┤
│             WASM Function Runtime               │
│  ┌─────────────┐  ┌─────────────┐  ┌──────────┐ │
│  │   Cold      │  │    Warm     │  │  Scale   │ │
│  │ Start: 1ms  │  │ Start: <1ms │  │ Up: <1s  │ │
│  │ Mem: 2MB    │  │ Mem: 2MB    │  │          │ │
│  └─────────────┘  └─────────────┘  └──────────┘ │
├─────────────────────────────────────────────────┤
│           WASM Runtime (Wasmtime)               │
├─────────────────────────────────────────────────┤
│               Kubernetes Nodes                  │
└─────────────────────────────────────────────────┘

Function Execution Model

# WASM Function Execution Lifecycle
execution_lifecycle:
  trigger_received:
    - event: 'HTTP request, queue message, timer'
    - latency: '0ms'

  function_lookup:
    - action: 'Find function definition'
    - latency: '<1ms'

  runtime_initialization:
    - cold_start: '1-5ms (WASM instantiation)'
    - warm_start: '<1ms (existing instance)'

  function_execution:
    - wasm_module_load: 'Pre-compiled, instant'
    - memory_allocation: 'Linear memory, isolated'
    - execution: 'Near-native performance'

  response_handling:
    - serialization: 'Minimal overhead'
    - cleanup: 'Automatic memory reclamation'

  scaling_decisions:
    - scale_up_trigger: 'Queue depth > 0'
    - scale_down_trigger: 'Idle for 30s'
    - scale_speed: 'Instantaneous'
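
The figures above rest on one property: compilation happens ahead of time, so a cold start is only module instantiation. A minimal sketch of measuring this with the wasmtime crate (the module path and the anyhow error handling are assumptions for illustration, not part of any platform described here):

use std::time::Instant;
use wasmtime::{Engine, Instance, Module, Store};

fn main() -> anyhow::Result<()> {
    let engine = Engine::default();

    // Compilation happens once, at build/deploy time, not per invocation.
    let module = Module::from_file(&engine, "handler.wasm")?;

    // The per-request "cold start" is only instantiation of the compiled module.
    let start = Instant::now();
    let mut store = Store::new(&engine, ());
    let _instance = Instance::new(&mut store, &module, &[])?;
    println!("instantiation took {:?}", start.elapsed()); // typically ~1ms

    Ok(())
}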

Resource Allocation Model

WASM Function Resource Allocation:
┌─────────────────────────────────────────────────┐
│                 Node Resources                  │
│  ┌─────────────────────────────────────────────┐│
│  │            Available: 32GB RAM              ││
│  │  ┌────┐┌────┐┌────┐┌────┐┌────┐┌────┐       ││
│  │  │2MB ││2MB ││2MB ││2MB ││2MB ││2MB │ ...   ││
│  │  │F1  ││F2  ││F3  ││F4  ││F5  ││F6  │       ││
│  │  └────┘└────┘└────┘└────┘└────┘└────┘       ││
│  │                                             ││
│  │  Theoretical Max: ~16,000 concurrent        ││
│  │  Practical Max: ~8,000 concurrent           ││
│  └─────────────────────────────────────────────┘│
└─────────────────────────────────────────────────┘

Traditional Container Functions:
┌─────────────────────────────────────────────────┐
│                 Node Resources                  │
│  ┌─────────────────────────────────────────────┐│
│  │            Available: 32GB RAM              ││
│  │  ┌────────┐┌────────┐┌────────┐             ││
│  │  │ 128MB  ││ 128MB  ││ 128MB  │             ││
│  │  │   F1   ││   F2   ││   F3   │             ││
│  │  └────────┘└────────┘└────────┘             ││
│  │                                             ││
│  │  Theoretical Max: ~256 concurrent           ││
│  │  Practical Max: ~100 concurrent             ││
│  └─────────────────────────────────────────────┘│
└─────────────────────────────────────────────────┘
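
The concurrency ceilings in both diagrams are plain division of node memory by per-instance footprint; the "practical" figures simply leave headroom for the OS, kubelet, and runtime overhead:

// Back-of-envelope density math behind the diagrams above.
fn max_instances(node_ram_gb: u64, instance_mb: u64) -> u64 {
    node_ram_gb * 1024 / instance_mb
}

fn main() {
    println!("WASM (2MB each):    ~{}", max_instances(32, 2));   // 16,384 -> "~16,000"
    println!("Containers (128MB): ~{}", max_instances(32, 128)); // 256
}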

Platform Comparison and Selection

Feature Matrix

Feature             Knative + WASM    OpenFaaS + WASM    Custom Platform
------------------  ----------------  -----------------  ---------------
Cold Start          <5ms              <10ms              <1ms
Kubernetes Native   Yes               Yes                Yes
Auto-scaling        Yes               Yes                Custom
Event Sources       20+               10+                Custom
Multi-tenancy       Yes               Yes                Custom
Observability       Rich              Good               Custom
Learning Curve      Medium            Low                High
Production Ready    Yes               Yes                Depends
Community           Large             Medium             None

Decision Framework

platform_selection:
  choose_knative_when:
    - need_enterprise_features: true
    - team_size: '> 10 developers'
    - compliance_requirements: 'strict'
    - event_sources: 'diverse (Kafka, Cloud Events, etc.)'
    - budget: 'high'

  choose_openfaas_when:
    - simplicity_priority: true
    - team_size: '< 10 developers'
    - quick_deployment: true
    - event_sources: 'basic (HTTP, queues)'
    - budget: 'medium'

  choose_custom_when:
    - unique_requirements: true
    - performance_critical: true
    - full_control_needed: true
    - team_expertise: 'high'
    - budget: 'flexible'

Knative with WASM Runtime

Installation and Setup

# @filename: script.sh
# Install Knative Serving
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.12.0/serving-core.yaml

# Install Knative Eventing
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.12.0/eventing-crds.yaml
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.12.0/eventing-core.yaml

# Install networking layer (Kourier)
kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.12.0/kourier.yaml

# Configure Knative to use Kourier
kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}'

# Install WASM runtime class
kubectl apply -f https://github.com/spinkube/spin-operator/releases/latest/download/spin-operator.runtime-class.yaml

WASM Runtime Configuration

# wasm-runtime-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-features
  namespace: knative-serving
data:
  # Enable WASM runtime support
  kubernetes.podspec-runtimeclassname: 'enabled'
  kubernetes.podspec-securitycontext: 'enabled'

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-deployment
  namespace: knative-serving
data:
  # WASM-specific deployment settings
  progressDeadline: '120s'
  registriesSkippingTagResolving: 'ko.local,dev.local'

  # Queue proxy settings for WASM
  queueSidecarImage: 'gcr.io/knative-releases/knative.dev/serving/cmd/queue@sha256:...'

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-autoscaler
  namespace: knative-serving
data:
  # Optimized for WASM cold starts
  enable-scale-to-zero: 'true'
  scale-to-zero-grace-period: '30s'
  stable-window: '60s'

  # WASM-optimized scaling
  max-scale-up-rate: '1000.0' # WASM can scale very quickly
  max-scale-down-rate: '2.0'
  target-burst-capacity: '200'

  # Concurrency settings
  container-concurrency-target-default: '100'
  container-concurrency-target-percentage: '70'

  # Panic thresholds (lower for WASM due to fast startup)
  panic-window-percentage: '10.0'
  panic-threshold-percentage: '200.0'
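
To see how these settings interact: the KPA roughly divides observed concurrency by the effective per-pod target (target x target-percentage). A simplified sketch of that sizing arithmetic; the real autoscaler additionally applies stable/panic windows and the rate limits configured above:

// Simplified KPA sizing under the ConfigMap above (windowing omitted).
fn desired_replicas(observed_concurrency: f64, target: f64, target_percentage: f64) -> u32 {
    let effective_target = target * (target_percentage / 100.0); // 100 * 0.70 = 70
    (observed_concurrency / effective_target).ceil() as u32
}

fn main() {
    // 1,000 in-flight requests -> ceil(1000 / 70) = 15 pods.
    println!("{} pods", desired_replicas(1000.0, 100.0, 70.0));
}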

WASM Function Service

# wasm-function.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: image-processor
  namespace: default
spec:
  template:
    metadata:
      annotations:
        # Use WASM runtime
        autoscaling.knative.dev/class: 'kpa.autoscaling.knative.dev'
        autoscaling.knative.dev/target: '100'
        autoscaling.knative.dev/minScale: '0'
        autoscaling.knative.dev/maxScale: '1000'

        # Cloud Run-specific annotations; harmless but ignored by
        # self-hosted Knative installations
        run.googleapis.com/cpu-throttling: 'false'
        run.googleapis.com/execution-environment: 'gen2'
    spec:
      # Use WASM runtime class
      runtimeClassName: wasmtime

      containers:
        - name: function
          image: registry.example.com/image-processor-wasm:v1.0.0

          ports:
            - containerPort: 8080
              protocol: TCP

          env:
            - name: RUST_LOG
              value: 'info'
            - name: MAX_IMAGE_SIZE
              value: '10MB'
            - name: SUPPORTED_FORMATS
              value: 'jpeg,png,webp'

          resources:
            requests:
              cpu: '50m'
              memory: '32Mi'
            limits:
              cpu: '1000m'
              memory: '128Mi'

          # Health checks optimized for WASM
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 1
            periodSeconds: 10

          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 1
            periodSeconds: 1

          # Security context for WASM
          securityContext:
            runAsNonRoot: true
            runAsUser: 65534
            readOnlyRootFilesystem: true
            allowPrivilegeEscalation: false
            capabilities:
              drop:
                - ALL

---
# Traffic routing
apiVersion: serving.knative.dev/v1
kind: Route
metadata:
  name: image-processor-route
  namespace: default
spec:
  traffic:
    - revisionName: image-processor-v1
      percent: 90
    - revisionName: image-processor-v2
      percent: 10
      tag: canary
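
Conceptually, the Route above makes the ingress perform a weighted choice per request: 90% of traffic lands on v1 and 10% on the canary. A toy illustration of the split (the real selection happens in Knative's networking layer, never in your function code):

// Toy model of the 90/10 revision split declared in the Route.
fn pick_revision(percent_v1: u32, roll: u32) -> &'static str {
    // `roll` is assumed uniform in 0..100, e.g. derived from a request hash
    if roll < percent_v1 { "image-processor-v1" } else { "image-processor-v2" }
}

fn main() {
    let v1_hits = (0..100u32).filter(|r| pick_revision(90, *r) == "image-processor-v1").count();
    assert_eq!(v1_hits, 90); // 90% of uniform rolls reach v1
}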

Event-Driven Function Triggers

# event-triggers.yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: image-upload-trigger
  namespace: default
spec:
  broker: default
  filter:
    attributes:
      type: com.example.storage.upload
      source: /storage/bucket/images
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: image-processor

---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: scheduled-cleanup-trigger
  namespace: default
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.sources.ping
      source: /cleanup/scheduler
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: cleanup-function

---
# Kafka event source
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: user-events-source
  namespace: default
spec:
  consumerGroup: image-processor-group
  bootstrapServers:
    - kafka-broker-1.kafka.svc.cluster.local:9092
    - kafka-broker-2.kafka.svc.cluster.local:9092
  topics:
    - user-uploads
    - user-deletions
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: image-processor
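
Whichever trigger fires, Knative delivers the event to the subscriber as a CloudEvent over HTTP: the attributes the Triggers filter on (type, source) arrive as ce-* headers in binary content mode, with the payload in the request body. A hedged sketch of what the image-processor receives (the struct and helper are illustrative, not Knative APIs):

use std::collections::HashMap;

// Attributes carried as ce-* headers under the CloudEvents HTTP binding.
struct IncomingEvent {
    event_type: String, // "ce-type", e.g. "com.example.storage.upload"
    source: String,     // "ce-source", e.g. "/storage/bucket/images"
    id: String,         // "ce-id", unique per event
    body: Vec<u8>,      // event payload = HTTP request body
}

fn parse_cloudevent(headers: &HashMap<String, String>, body: Vec<u8>) -> Option<IncomingEvent> {
    Some(IncomingEvent {
        event_type: headers.get("ce-type")?.clone(),
        source: headers.get("ce-source")?.clone(),
        id: headers.get("ce-id")?.clone(),
        body,
    })
}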

OpenFaaS WASM Implementation

OpenFaaS Installation with WASM Support

# @filename: script.sh
# Install OpenFaaS
curl -sL https://cli.openfaas.com | sudo sh

# Create OpenFaaS namespace
kubectl apply -f https://raw.githubusercontent.com/openfaas/faas-netes/master/namespaces.yml

# Add OpenFaaS Helm repository
helm repo add openfaas https://openfaas.github.io/faas-netes/
helm repo update

# Install OpenFaaS with WASM support
helm upgrade openfaas \
  --install openfaas/openfaas \
  --namespace openfaas \
  --set functionNamespace=openfaas-fn \
  --set generateBasicAuth=true \
  --set serviceType=LoadBalancer \
  --set faasIdler.dryRun=false \
  --set prometheus.create=true \
  --set alertmanager.create=true

WASM Function Template

# template/wasm-rust/template.yml
language: wasm-rust
fprocess: ./handler
welcome_message: |
  You have successfully created a new function which uses WASM runtime.

  Languages supported: Rust -> WASM

  Features:
  - Fast cold starts (<5ms)
  - Memory efficient (<10MB)
  - Secure sandboxing
  - Near-native performance
# @filename: Dockerfile
# template/wasm-rust/Dockerfile
FROM --platform=${BUILDPLATFORM:-linux/amd64} rust:1.75 as builder

ARG TARGETPLATFORM
ARG BUILDPLATFORM

WORKDIR /usr/src/app
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && echo "fn main() {}" > src/main.rs
RUN rustup target add wasm32-wasi
RUN cargo build --target wasm32-wasi --release

COPY src ./src
RUN touch src/main.rs
RUN cargo build --target wasm32-wasi --release

# Optimize WASM binary
RUN curl -L https://github.com/WebAssembly/binaryen/releases/download/version_116/binaryen-version_116-x86_64-linux.tar.gz | tar xz
RUN ./binaryen-version_116/bin/wasm-opt -Os target/wasm32-wasi/release/handler.wasm -o handler.wasm

FROM scratch
COPY --from=builder /usr/src/app/handler.wasm ./handler
CMD ["./handler"]

WASM Function Implementation

// @filename: main.rs
// src/main.rs - Image processing function
use std::io::{self, Read};
use image::{DynamicImage, ImageFormat};
use serde::{Deserialize, Serialize};
use base64::{Engine as _, engine::general_purpose};

#[derive(Deserialize)]
struct FunctionInput {
    image_data: String,      // Base64 encoded image
    operation: String,       // resize, rotate, blur, etc.
    parameters: Parameters,
}

#[derive(Deserialize)]
struct Parameters {
    width: Option<u32>,
    height: Option<u32>,
    angle: Option<f32>,
    blur_radius: Option<f32>,
    quality: Option<u8>,
}

#[derive(Serialize)]
struct FunctionOutput {
    success: bool,
    image_data: Option<String>,
    error: Option<String>,
    metadata: ImageMetadata,
}

#[derive(Serialize)]
struct ImageMetadata {
    original_size: (u32, u32),
    processed_size: (u32, u32),
    format: String,
    processing_time_ms: u64,
}

fn main() {
    let mut input = String::new();
    io::stdin().read_to_string(&mut input).expect("Failed to read input");

    let start_time = std::time::Instant::now();

    let result = match serde_json::from_str::<FunctionInput>(&input) {
        Ok(request) => process_image(request, start_time),
        Err(e) => FunctionOutput {
            success: false,
            image_data: None,
            error: Some(format!("Invalid input: {}", e)),
            metadata: ImageMetadata {
                original_size: (0, 0),
                processed_size: (0, 0),
                format: "unknown".to_string(),
                processing_time_ms: start_time.elapsed().as_millis() as u64,
            },
        },
    };

    println!("{}", serde_json::to_string(&result).unwrap());
}

fn process_image(input: FunctionInput, start_time: std::time::Instant) -> FunctionOutput {
    // Decode base64 image
    let image_bytes = match general_purpose::STANDARD.decode(&input.image_data) {
        Ok(bytes) => bytes,
        Err(e) => return error_response(format!("Base64 decode error: {}", e), start_time),
    };

    // Load image
    let mut image = match image::load_from_memory(&image_bytes) {
        Ok(img) => img,
        Err(e) => return error_response(format!("Image load error: {}", e), start_time),
    };

    let original_size = (image.width(), image.height());

    // Apply the requested operation. `image` is moved into the operation,
    // so on failure we return an error response instead of the original.
    let result = match input.operation.as_str() {
        "resize" => resize_image(image, &input.parameters),
        "rotate" => rotate_image(image, &input.parameters),
        "blur" => blur_image(image, &input.parameters),
        "grayscale" => Ok(image.grayscale()),
        "flip_horizontal" => Ok(image.fliph()),
        "flip_vertical" => Ok(image.flipv()),
        "brighten" => brighten_image(image, &input.parameters),
        "contrast" => adjust_contrast(image, &input.parameters),
        _ => Err(format!("Unsupported operation: {}", input.operation)),
    };

    image = match result {
        Ok(img) => img,
        Err(e) => return error_response(format!("Processing error: {}", e), start_time),
    };

    let processed_size = (image.width(), image.height());

    // Encode result
    let mut output_bytes = Vec::new();
    let format = ImageFormat::Png; // Default to PNG for lossless

    if let Err(e) = image.write_to(&mut std::io::Cursor::new(&mut output_bytes), format) {
        return error_response(format!("Image encode error: {}", e), start_time);
    }

    let encoded_image = general_purpose::STANDARD.encode(&output_bytes);

    FunctionOutput {
        success: true,
        image_data: Some(encoded_image),
        error: None,
        metadata: ImageMetadata {
            original_size,
            processed_size,
            format: "png".to_string(),
            processing_time_ms: start_time.elapsed().as_millis() as u64,
        },
    }
}

fn resize_image(image: DynamicImage, params: &Parameters) -> Result<DynamicImage, String> {
    let width = params.width.ok_or("Width parameter required for resize")?;
    let height = params.height.ok_or("Height parameter required for resize")?;

    Ok(image.resize(width, height, image::imageops::FilterType::Lanczos3))
}

fn rotate_image(image: DynamicImage, params: &Parameters) -> Result<DynamicImage, String> {
    let angle = params.angle.ok_or("Angle parameter required for rotate")?;

    match angle as i32 {
        90 | -270 => Ok(image.rotate90()),
        180 | -180 => Ok(image.rotate180()),
        270 | -90 => Ok(image.rotate270()),
        _ => Err(format!("Unsupported rotation angle: {}. Use 90, 180, or 270", angle)),
    }
}

fn blur_image(image: DynamicImage, params: &Parameters) -> Result<DynamicImage, String> {
    let radius = params.blur_radius.ok_or("Blur radius parameter required")?;

    if radius < 0.0 || radius > 100.0 {
        return Err("Blur radius must be between 0 and 100".to_string());
    }

    Ok(image.blur(radius))
}

fn brighten_image(image: DynamicImage, params: &Parameters) -> Result<DynamicImage, String> {
    // Use blur_radius as brightness value for simplicity
    let brightness = params.blur_radius.unwrap_or(0.0) as i32;

    Ok(image.brighten(brightness))
}

fn adjust_contrast(image: DynamicImage, params: &Parameters) -> Result<DynamicImage, String> {
    // Use blur_radius as contrast value for simplicity
    let contrast = params.blur_radius.unwrap_or(1.0);

    if contrast < 0.0 {
        return Err("Contrast must be positive".to_string());
    }

    Ok(image.adjust_contrast(contrast))
}

fn error_response(error: String, start_time: std::time::Instant) -> FunctionOutput {
    FunctionOutput {
        success: false,
        image_data: None,
        error: Some(error),
        metadata: ImageMetadata {
            original_size: (0, 0),
            processed_size: (0, 0),
            format: "unknown".to_string(),
            processing_time_ms: start_time.elapsed().as_millis() as u64,
        },
    }
}

Function Configuration

# stack.yml
provider:
  name: openfaas
  gateway: http://127.0.0.1:8080

functions:
  image-processor:
    lang: wasm-rust
    handler: ./image-processor
    image: registry.example.com/image-processor-wasm:latest

    # WASM-specific configuration
    constraints:
      - 'kubernetes.io/arch=amd64'

    # Resource limits optimized for WASM
    limits:
      memory: '128Mi'
      cpu: '1000m'
    requests:
      memory: '32Mi'
      cpu: '50m'

    # Environment variables
    environment:
      RUST_LOG: 'info'
      MAX_IMAGE_SIZE: '10485760' # 10MB
      ALLOWED_FORMATS: 'jpeg,png,gif,webp'

    # Annotations for WASM runtime
    annotations:
      prometheus.io/scrape: 'true'
      prometheus.io/path: '/metrics'
      prometheus.io/port: '8080'

    # Labels
    labels:
      runtime: 'wasm'
      language: 'rust'
      category: 'image-processing'

  # Additional functions
  thumbnail-generator:
    lang: wasm-rust
    handler: ./thumbnail-generator
    image: registry.example.com/thumbnail-generator-wasm:latest

    environment:
      THUMBNAIL_SIZES: '150x150,300x300,600x600'

  image-metadata-extractor:
    lang: wasm-rust
    handler: ./metadata-extractor
    image: registry.example.com/metadata-extractor-wasm:latest

    environment:
      EXTRACT_GPS: 'true'
      EXTRACT_CAMERA_INFO: 'true'

Deployment and Management

# @filename: script.sh
# Build and deploy functions
faas-cli template pull https://github.com/openfaas/templates
faas-cli build -f stack.yml
faas-cli push -f stack.yml
faas-cli deploy -f stack.yml

# Test function
echo '{"image_data":"iVBORw0KGgoAAAANSUhEUgAA...","operation":"resize","parameters":{"width":300,"height":200}}' | \
faas-cli invoke image-processor

# Scale function
faas-cli scale image-processor --replicas=10

# Monitor function
faas-cli logs image-processor --since=5m --follow

# Update function
faas-cli deploy -f stack.yml --update=true

Custom Serverless Platform

Platform Architecture

// @filename: main.rs
// src/main.rs - Custom serverless platform controller
use futures::StreamExt;
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
use k8s_openapi::api::core::v1::{
    Container, EnvVar, PodSpec, ResourceRequirements, Service, ServicePort, ServiceSpec,
};
use kube::{
    Api, Client,
    api::{Patch, PatchParams, PostParams},
    runtime::{Controller, controller::Action, watcher::Config},
    CustomResource, Resource, ResourceExt,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::collections::BTreeMap;
use tokio::time::Duration;
use tracing::*;

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(group = "serverless.example.com", version = "v1", kind = "WasmFunction")]
#[kube(namespaced, status = "WasmFunctionStatus", schema = "disabled")]
#[serde(rename_all = "camelCase")]
pub struct WasmFunctionSpec {
    pub image: String,
    pub runtime: String,
    pub min_replicas: Option<i32>,
    pub max_replicas: Option<i32>,
    pub target_cpu: Option<i32>,
    pub target_memory: Option<i32>,
    pub timeout: Option<String>,
    pub environment: Option<Vec<EnvVar>>,
    pub triggers: Vec<FunctionTrigger>,
    pub resources: Option<ResourceRequirements>,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FunctionTrigger {
    pub trigger_type: String,  // http, kafka, cron, etc.
    pub config: BTreeMap<String, String>,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WasmFunctionStatus {
    pub replicas: Option<i32>,
    pub ready_replicas: Option<i32>,
    pub conditions: Vec<FunctionCondition>,
    pub endpoint: Option<String>,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FunctionCondition {
    pub condition_type: String,
    pub status: String,
    pub reason: Option<String>,
    pub message: Option<String>,
}

impl WasmFunction {
    pub fn to_deployment(&self) -> Deployment {
        let name = self.name_any();
        let namespace = self.namespace().unwrap_or_default();

        let mut labels = BTreeMap::new();
        labels.insert("app".to_string(), name.clone());
        labels.insert("runtime".to_string(), "wasm".to_string());
        labels.insert("managed-by".to_string(), "wasm-serverless".to_string());

        let container = Container {
            name: "function".to_string(),
            image: Some(self.spec.image.clone()),
            ports: Some(vec![k8s_openapi::api::core::v1::ContainerPort {
                container_port: 8080,
                protocol: Some("TCP".to_string()),
                ..Default::default()
            }]),
            env: self.spec.environment.clone(),
            resources: self.spec.resources.clone(),
            security_context: Some(k8s_openapi::api::core::v1::SecurityContext {
                run_as_non_root: Some(true),
                run_as_user: Some(65534),
                read_only_root_filesystem: Some(true),
                allow_privilege_escalation: Some(false),
                capabilities: Some(k8s_openapi::api::core::v1::Capabilities {
                    drop: Some(vec!["ALL".to_string()]),
                    ..Default::default()
                }),
                ..Default::default()
            }),
            liveness_probe: Some(k8s_openapi::api::core::v1::Probe {
                http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
                    path: Some("/health".to_string()),
                    port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
                    ..Default::default()
                }),
                initial_delay_seconds: Some(1),
                period_seconds: Some(10),
                ..Default::default()
            }),
            readiness_probe: Some(k8s_openapi::api::core::v1::Probe {
                http_get: Some(k8s_openapi::api::core::v1::HTTPGetAction {
                    path: Some("/ready".to_string()),
                    port: k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080),
                    ..Default::default()
                }),
                initial_delay_seconds: Some(1),
                period_seconds: Some(1),
                ..Default::default()
            }),
            ..Default::default()
        };

        Deployment {
            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
                name: Some(name.clone()),
                namespace: Some(namespace.clone()),
                labels: Some(labels.clone()),
                owner_references: Some(vec![self.controller_owner_ref(&()).unwrap()]),
                ..Default::default()
            },
            spec: Some(DeploymentSpec {
                replicas: Some(self.spec.min_replicas.unwrap_or(1)),
                selector: k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector {
                    match_labels: Some(labels.clone()),
                    ..Default::default()
                },
                template: k8s_openapi::api::core::v1::PodTemplateSpec {
                    metadata: Some(k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
                        labels: Some(labels),
                        ..Default::default()
                    }),
                    spec: Some(PodSpec {
                        runtime_class_name: Some(self.spec.runtime.clone()),
                        containers: vec![container],
                        restart_policy: Some("Always".to_string()),
                        ..Default::default()
                    }),
                },
                ..Default::default()
            }),
            ..Default::default()
        }
    }

    pub fn to_service(&self) -> Service {
        let name = self.name_any();
        let namespace = self.namespace().unwrap_or_default();

        let mut labels = BTreeMap::new();
        labels.insert("app".to_string(), name.clone());

        Service {
            metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
                name: Some(format!("{}-service", name)),
                namespace: Some(namespace),
                owner_references: Some(vec![self.controller_owner_ref(&()).unwrap()]),
                ..Default::default()
            },
            spec: Some(ServiceSpec {
                selector: Some(labels),
                ports: Some(vec![ServicePort {
                    name: Some("http".to_string()),
                    port: 80,
                    target_port: Some(k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(8080)),
                    protocol: Some("TCP".to_string()),
                    ..Default::default()
                }]),
                type_: Some("ClusterIP".to_string()),
                ..Default::default()
            }),
            ..Default::default()
        }
    }
}

#[derive(Clone)]
pub struct Context {
    pub client: Client,
}

pub async fn reconcile(wasm_function: Arc<WasmFunction>, ctx: Arc<Context>) -> Result<Action, kube::Error> {
    let name = wasm_function.name_any();
    let namespace = wasm_function.namespace().unwrap_or_default();

    info!("Reconciling WasmFunction {}/{}", namespace, name);

    let deployments: Api<Deployment> = Api::namespaced(ctx.client.clone(), &namespace);
    let services: Api<Service> = Api::namespaced(ctx.client.clone(), &namespace);
    let functions: Api<WasmFunction> = Api::namespaced(ctx.client.clone(), &namespace);

    // Create or update deployment
    let deployment = wasm_function.to_deployment();
    let deployment_name = deployment.metadata.name.as_ref().unwrap();

    match deployments.get_opt(deployment_name).await? {
        Some(_existing) => {
            info!("Updating deployment {}/{}", namespace, deployment_name);
            deployments.patch(
                deployment_name,
                &PatchParams::apply("wasm-serverless-controller"),
                &Patch::Apply(&deployment),
            ).await?;
        },
        None => {
            info!("Creating deployment {}/{}", namespace, deployment_name);
            deployments.create(&PostParams::default(), &deployment).await?;
        }
    }

    // Create or update service
    let service = wasm_function.to_service();
    let service_name = service.metadata.name.as_ref().unwrap();

    match services.get_opt(service_name).await? {
        Some(_existing) => {
            info!("Updating service {}/{}", namespace, service_name);
            services.patch(
                service_name,
                &PatchParams::apply("wasm-serverless-controller"),
                &Patch::Apply(&service),
            ).await?;
        },
        None => {
            info!("Creating service {}/{}", namespace, service_name);
            services.create(&PostParams::default(), &service).await?;
        }
    }

    // Update status
    let status = WasmFunctionStatus {
        replicas: Some(1), // This should be read from actual deployment
        ready_replicas: Some(1),
        conditions: vec![FunctionCondition {
            condition_type: "Ready".to_string(),
            status: "True".to_string(),
            reason: Some("Deployed".to_string()),
            message: Some("Function deployed successfully".to_string()),
        }],
        endpoint: Some(format!("http://{}.{}.svc.cluster.local", service_name, namespace)),
    };

    let mut updated_function = (*wasm_function).clone();
    updated_function.status = Some(status);

    functions.patch_status(
        &name,
        &PatchParams::default(),
        &Patch::Merge(&updated_function),
    ).await?;

    Ok(Action::requeue(Duration::from_secs(60)))
}

pub fn error_policy(_wasm_function: Arc<WasmFunction>, error: &kube::Error, _ctx: Arc<Context>) -> Action {
    error!("Reconciliation error: {:?}", error);
    Action::requeue(Duration::from_secs(30))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    let client = Client::try_default().await?;
    let context = Arc::new(Context { client: client.clone() });

    let functions = Api::<WasmFunction>::all(client);

    Controller::new(functions, Config::default())
        .run(reconcile, error_policy, context)
        .for_each(|_| async {})
        .await;

    Ok(())
}

Function CRD Definition

# crd/wasmfunction.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: wasmfunctions.serverless.example.com
spec:
  group: serverless.example.com
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                image:
                  type: string
                  description: 'WASM function container image'
                runtime:
                  type: string
                  description: 'WASM runtime class name'
                  default: 'wasmtime'
                minReplicas:
                  type: integer
                  minimum: 0
                  default: 0
                maxReplicas:
                  type: integer
                  minimum: 1
                  default: 100
                targetCpu:
                  type: integer
                  minimum: 1
                  maximum: 100
                  default: 70
                targetMemory:
                  type: integer
                  minimum: 1
                  maximum: 100
                  default: 80
                timeout:
                  type: string
                  default: '30s'
                environment:
                  type: array
                  items:
                    type: object
                    properties:
                      name:
                        type: string
                      value:
                        type: string
                triggers:
                  type: array
                  items:
                    type: object
                    properties:
                      triggerType:
                        type: string
                        enum: ['http', 'kafka', 'cron', 'storage']
                      config:
                        type: object
                        additionalProperties:
                          type: string
            status:
              type: object
              properties:
                replicas:
                  type: integer
                readyReplicas:
                  type: integer
                conditions:
                  type: array
                  items:
                    type: object
                    properties:
                      conditionType:
                        type: string
                      status:
                        type: string
                      reason:
                        type: string
                      message:
                        type: string
                endpoint:
                  type: string
      subresources:
        status: {}
        scale:
          specReplicasPath: .spec.minReplicas
          statusReplicasPath: .status.replicas
  scope: Namespaced
  names:
    plural: wasmfunctions
    singular: wasmfunction
    kind: WasmFunction
    shortNames:
      - wfn
      - wfunc

Function Deployment Example

# example-function.yaml
apiVersion: serverless.example.com/v1
kind: WasmFunction
metadata:
  name: image-resizer
  namespace: default
spec:
  image: 'registry.example.com/image-resizer:v1.0.0'
  runtime: 'wasmtime'

  # Scaling configuration
  minReplicas: 0
  maxReplicas: 1000
  targetCpu: 70
  targetMemory: 80

  # Function configuration
  timeout: '30s'
  environment:
    - name: MAX_IMAGE_SIZE
      value: '10485760'
    - name: ALLOWED_FORMATS
      value: 'jpeg,png,webp'
    - name: RUST_LOG
      value: 'info'

  # Event triggers
  triggers:
    - triggerType: 'http'
      config:
        path: '/resize'
        methods: 'POST'
    - triggerType: 'kafka'
      config:
        topic: 'image-upload-events'
        consumerGroup: 'image-resizer-group'
    - triggerType: 'storage'
      config:
        bucket: 'user-uploads'
        events: 'object.created'

  # Resource requirements
  resources:
    requests:
      cpu: '50m'
      memory: '32Mi'
    limits:
      cpu: '2000m'
      memory: '256Mi'

---
apiVersion: serverless.example.com/v1
kind: WasmFunction
metadata:
  name: log-analyzer
  namespace: monitoring
spec:
  image: 'registry.example.com/log-analyzer:v1.2.0'
  runtime: 'wasmtime'

  minReplicas: 1 # Always running for log processing
  maxReplicas: 50

  triggers:
    - triggerType: 'kafka'
      config:
        topic: 'application-logs'
        consumerGroup: 'log-analyzer'
    - triggerType: 'cron'
      config:
        schedule: '*/5 * * * *' # Every 5 minutes

  environment:
    - name: LOG_LEVEL
      value: 'warn'
    - name: ALERT_THRESHOLD
      value: '100'
    - name: ELASTICSEARCH_URL
      valueFrom:
        secretKeyRef:
          name: elasticsearch-credentials
          key: url

Function Development Patterns

Multi-Format Handler Pattern

// @filename: main.rs
// src/main.rs - Multi-format function handler
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{self, Read};

#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum FunctionInput {
    // HTTP request format
    HttpRequest {
        method: String,
        path: String,
        headers: HashMap<String, String>,
        body: String,
        query_parameters: Option<HashMap<String, String>>,
    },

    // Event format (Kafka, SQS, etc.)
    Event {
        source: String,
        event_type: String,
        data: serde_json::Value,
        timestamp: String,
        metadata: Option<HashMap<String, String>>,
    },

    // Simple function call
    DirectCall {
        function: String,
        parameters: serde_json::Value,
    },

    // Batch processing
    Batch {
        items: Vec<serde_json::Value>,
        batch_id: String,
        configuration: Option<HashMap<String, String>>,
    },
}

#[derive(Debug, Serialize)]
pub struct FunctionOutput {
    pub success: bool,
    pub data: Option<serde_json::Value>,
    pub error: Option<String>,
    pub metadata: OutputMetadata,
}

#[derive(Debug, Serialize)]
pub struct OutputMetadata {
    pub processing_time_ms: u64,
    pub memory_used_bytes: usize,
    pub items_processed: usize,
    pub function_version: String,
}

pub trait FunctionHandler {
    fn handle_http(&self, request: HttpRequest) -> Result<serde_json::Value, String>;
    fn handle_event(&self, event: Event) -> Result<serde_json::Value, String>;
    fn handle_direct_call(&self, call: DirectCall) -> Result<serde_json::Value, String>;
    fn handle_batch(&self, batch: Batch) -> Result<serde_json::Value, String>;
}

pub struct ImageProcessor {
    max_size: usize,
    allowed_formats: Vec<String>,
}

impl ImageProcessor {
    pub fn new() -> Self {
        let max_size = std::env::var("MAX_IMAGE_SIZE")
            .unwrap_or_else(|_| "10485760".to_string())
            .parse()
            .unwrap_or(10485760);

        let allowed_formats = std::env::var("ALLOWED_FORMATS")
            .unwrap_or_else(|_| "jpeg,png,webp".to_string())
            .split(',')
            .map(|s| s.trim().to_string())
            .collect();

        Self { max_size, allowed_formats }
    }
}

impl FunctionHandler for ImageProcessor {
    fn handle_http(&self, request: HttpRequest) -> Result<serde_json::Value, String> {
        match request.method.as_str() {
            "POST" => {
                if request.path == "/resize" {
                    self.process_resize_request(&request.body)
                } else if request.path == "/thumbnail" {
                    self.process_thumbnail_request(&request.body)
                } else {
                    Err(format!("Unsupported path: {}", request.path))
                }
            },
            "GET" => {
                if request.path == "/health" {
                    Ok(serde_json::json!({"status": "healthy", "uptime": get_uptime()}))
                } else if request.path == "/metrics" {
                    Ok(serde_json::json!(get_metrics()))
                } else {
                    Err(format!("Unsupported GET path: {}", request.path))
                }
            },
            _ => Err(format!("Unsupported method: {}", request.method)),
        }
    }

    fn handle_event(&self, event: Event) -> Result<serde_json::Value, String> {
        match event.event_type.as_str() {
            "image.uploaded" => {
                let image_data = event.data.get("image_data")
                    .and_then(|v| v.as_str())
                    .ok_or("Missing image_data in event")?;

                self.process_image_from_base64(image_data)
            },
            "batch.process" => {
                let images = event.data.get("images")
                    .and_then(|v| v.as_array())
                    .ok_or("Missing images array in event")?;

                self.process_image_batch(images)
            },
            _ => Err(format!("Unsupported event type: {}", event.event_type)),
        }
    }

    fn handle_direct_call(&self, call: DirectCall) -> Result<serde_json::Value, String> {
        match call.function.as_str() {
            "resize_image" => {
                let params = call.parameters.as_object()
                    .ok_or("Parameters must be an object")?;

                let image_data = params.get("image_data")
                    .and_then(|v| v.as_str())
                    .ok_or("Missing image_data parameter")?;

                let width = params.get("width")
                    .and_then(|v| v.as_u64())
                    .ok_or("Missing width parameter")? as u32;

                let height = params.get("height")
                    .and_then(|v| v.as_u64())
                    .ok_or("Missing height parameter")? as u32;

                self.resize_image_base64(image_data, width, height)
            },
            _ => Err(format!("Unsupported function: {}", call.function)),
        }
    }

    fn handle_batch(&self, batch: Batch) -> Result<serde_json::Value, String> {
        let mut results = Vec::new();
        let mut errors = Vec::new();

        for (index, item) in batch.items.iter().enumerate() {
            match self.process_batch_item(item) {
                Ok(result) => results.push(serde_json::json!({
                    "index": index,
                    "result": result
                })),
                Err(error) => errors.push(serde_json::json!({
                    "index": index,
                    "error": error
                })),
            }
        }

        Ok(serde_json::json!({
            "batch_id": batch.batch_id,
            "total_items": batch.items.len(),
            "successful_items": results.len(),
            "failed_items": errors.len(),
            "results": results,
            "errors": errors
        }))
    }
}

impl ImageProcessor {
    fn process_resize_request(&self, body: &str) -> Result<serde_json::Value, String> {
        // Implementation for resize request
        let request: serde_json::Value = serde_json::from_str(body)
            .map_err(|e| format!("Invalid JSON: {}", e))?;

        // Extract parameters and process
        Ok(serde_json::json!({"processed": true}))
    }

    fn process_thumbnail_request(&self, body: &str) -> Result<serde_json::Value, String> {
        // Implementation for thumbnail generation
        Ok(serde_json::json!({"thumbnail_generated": true}))
    }

    fn process_image_from_base64(&self, image_data: &str) -> Result<serde_json::Value, String> {
        // Implementation for processing base64 image
        Ok(serde_json::json!({"image_processed": true}))
    }

    fn process_image_batch(&self, images: &[serde_json::Value]) -> Result<serde_json::Value, String> {
        // Implementation for batch processing
        Ok(serde_json::json!({"batch_processed": images.len()}))
    }

    fn resize_image_base64(&self, image_data: &str, width: u32, height: u32) -> Result<serde_json::Value, String> {
        // Implementation for resizing image
        Ok(serde_json::json!({
            "resized": true,
            "width": width,
            "height": height
        }))
    }

    fn process_batch_item(&self, item: &serde_json::Value) -> Result<serde_json::Value, String> {
        // Implementation for processing individual batch item
        Ok(serde_json::json!({"item_processed": true}))
    }
}

// Helper functions
fn get_uptime() -> u64 {
    // Placeholder: returns the current Unix timestamp rather than true
    // process uptime (WASI exposes no direct uptime API)
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs()
}

fn get_metrics() -> serde_json::Value {
    serde_json::json!({
        "memory_usage": get_memory_usage(),
        "cpu_usage": get_cpu_usage(),
        "requests_processed": get_request_count(),
        "errors": get_error_count()
    })
}

fn get_memory_usage() -> usize {
    // Return current memory usage in bytes
    0 // Placeholder
}

fn get_cpu_usage() -> f64 {
    // Return current CPU usage percentage
    0.0 // Placeholder
}

fn get_request_count() -> u64 {
    // Return total requests processed
    0 // Placeholder
}

fn get_error_count() -> u64 {
    // Return total errors encountered
    0 // Placeholder
}

// Main function entry point
fn main() {
    let start_time = std::time::Instant::now();

    // Read input from stdin
    let mut input = String::new();
    io::stdin().read_to_string(&mut input)
        .expect("Failed to read input");

    // Parse input
    let function_input: FunctionInput = serde_json::from_str(&input)
        .unwrap_or_else(|_| FunctionInput::DirectCall {
            function: "unknown".to_string(),
            parameters: serde_json::json!({}),
        });

    // Create handler
    let handler = ImageProcessor::new();

    // Process request
    let result = match function_input {
        FunctionInput::HttpRequest { method, path, headers, body, query_parameters } => {
            handler.handle_http(HttpRequest { method, path, headers, body, query_parameters })
        },
        FunctionInput::Event { source, event_type, data, timestamp, metadata } => {
            handler.handle_event(Event { source, event_type, data, timestamp, metadata })
        },
        FunctionInput::DirectCall { function, parameters } => {
            handler.handle_direct_call(DirectCall { function, parameters })
        },
        FunctionInput::Batch { items, batch_id, configuration } => {
            handler.handle_batch(Batch { items, batch_id, configuration })
        },
    };

    // Create output
    let output = match result {
        Ok(data) => FunctionOutput {
            success: true,
            data: Some(data),
            error: None,
            metadata: OutputMetadata {
                processing_time_ms: start_time.elapsed().as_millis() as u64,
                memory_used_bytes: get_memory_usage(),
                items_processed: 1,
                function_version: env!("CARGO_PKG_VERSION").to_string(),
            },
        },
        Err(error) => FunctionOutput {
            success: false,
            data: None,
            error: Some(error),
            metadata: OutputMetadata {
                processing_time_ms: start_time.elapsed().as_millis() as u64,
                memory_used_bytes: get_memory_usage(),
                items_processed: 0,
                function_version: env!("CARGO_PKG_VERSION").to_string(),
            },
        },
    };

    // Output result
    println!("{}", serde_json::to_string(&output).unwrap());
}

// Concrete payload types handed to the FunctionHandler trait. The variants
// of FunctionInput mirror these field-for-field, so main() can destructure
// a variant and rebuild the matching struct.
#[derive(Debug)]
pub struct HttpRequest {
    pub method: String,
    pub path: String,
    pub headers: HashMap<String, String>,
    pub body: String,
    pub query_parameters: Option<HashMap<String, String>>,
}

#[derive(Debug)]
pub struct Event {
    pub source: String,
    pub event_type: String,
    pub data: serde_json::Value,
    pub timestamp: String,
    pub metadata: Option<HashMap<String, String>>,
}

#[derive(Debug)]
pub struct DirectCall {
    pub function: String,
    pub parameters: serde_json::Value,
}

#[derive(Debug)]
pub struct Batch {
    pub items: Vec<serde_json::Value>,
    pub batch_id: String,
    pub configuration: Option<HashMap<String, String>>,
}

Testing Framework

// @filename: integration_test.rs
// tests/integration_test.rs
use serde_json::json;
use std::process::{Command, Stdio};
use std::io::Write;

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_http_resize_request() {
        let input = json!({
            "method": "POST",
            "path": "/resize",
            "headers": {
                "content-type": "application/json"
            },
            "body": json!({
                "image_data": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==",
                "width": 300,
                "height": 200
            }).to_string(),
            "query_parameters": {}
        });

        let result = run_function(input);
        assert!(result["success"].as_bool().unwrap());
        assert!(result["metadata"]["processing_time_ms"].as_u64().unwrap() > 0);
    }

    #[test]
    fn test_event_processing() {
        let input = json!({
            "source": "storage.bucket",
            "event_type": "image.uploaded",
            "data": {
                "image_data": "base64-encoded-image",
                "bucket": "user-uploads",
                "key": "images/photo.jpg"
            },
            "timestamp": "2024-01-24T10:00:00Z",
            "metadata": {
                "user_id": "12345"
            }
        });

        let result = run_function(input);
        assert!(result["success"].as_bool().unwrap());
    }

    #[test]
    fn test_batch_processing() {
        let input = json!({
            "items": [
                {"image_data": "image1", "operation": "resize"},
                {"image_data": "image2", "operation": "thumbnail"},
                {"image_data": "image3", "operation": "rotate"}
            ],
            "batch_id": "batch_123",
            "configuration": {
                "parallel": "true",
                "timeout": "30s"
            }
        });

        let result = run_function(input);
        assert!(result["success"].as_bool().unwrap());
        assert_eq!(result["data"]["total_items"].as_u64().unwrap(), 3);
    }

    #[test]
    fn test_error_handling() {
        let input = json!({
            "method": "POST",
            "path": "/invalid",
            "headers": {},
            "body": "",
            "query_parameters": {}
        });

        let result = run_function(input);
        assert!(!result["success"].as_bool().unwrap());
        assert!(result["error"].as_str().unwrap().contains("Unsupported path"));
    }

    #[test]
    fn test_performance_requirements() {
        let input = json!({
            "function": "resize_image",
            "parameters": {
                "image_data": "small-test-image",
                "width": 100,
                "height": 100
            }
        });

        let start = std::time::Instant::now();
        let result = run_function(input);
        let duration = start.elapsed();

        assert!(result["success"].as_bool().unwrap());
        assert!(duration.as_millis() < 100, "Function took too long: {:?}", duration);
        assert!(result["metadata"]["processing_time_ms"].as_u64().unwrap() < 100);
    }

    pub(super) fn run_function(input: serde_json::Value) -> serde_json::Value {
        let mut child = Command::new("wasmtime")
            .arg("run")
            .arg("target/wasm32-wasi/release/image_processor.wasm")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .expect("Failed to start WASM function");

        let stdin = child.stdin.as_mut().expect("Failed to get stdin");
        stdin.write_all(input.to_string().as_bytes())
            .expect("Failed to write to stdin");
        stdin.flush().expect("Failed to flush stdin");

        let output = child.wait_with_output()
            .expect("Failed to read output");

        if !output.status.success() {
            panic!("Function failed: {}", String::from_utf8_lossy(&output.stderr));
        }

        let stdout = String::from_utf8(output.stdout)
            .expect("Invalid UTF-8 output");

        serde_json::from_str(&stdout)
            .expect("Invalid JSON output")
    }
}

// Load testing
#[cfg(test)]
mod load_tests {
    use super::*;
    use super::tests::run_function;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::time::{Duration, timeout};

    #[tokio::test]
    async fn test_concurrent_requests() {
        let request_count = Arc::new(AtomicUsize::new(0));
        let success_count = Arc::new(AtomicUsize::new(0));
        let error_count = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();

        // Spawn 100 concurrent requests
        for i in 0..100 {
            let request_count = Arc::clone(&request_count);
            let success_count = Arc::clone(&success_count);
            let error_count = Arc::clone(&error_count);

            let handle = tokio::spawn(async move {
                let input = json!({
                    "function": "resize_image",
                    "parameters": {
                        "image_data": format!("test-image-{}", i),
                        "width": 200,
                        "height": 200
                    }
                });

                request_count.fetch_add(1, Ordering::SeqCst);

                // Set timeout for each request
                let result = timeout(Duration::from_secs(5), async {
                    run_async_function(input).await
                }).await;

                match result {
                    Ok(Ok(response)) => {
                        if response["success"].as_bool().unwrap_or(false) {
                            success_count.fetch_add(1, Ordering::SeqCst);
                        } else {
                            error_count.fetch_add(1, Ordering::SeqCst);
                        }
                    },
                    Ok(Err(_)) | Err(_) => {
                        error_count.fetch_add(1, Ordering::SeqCst);
                    }
                }
            });
            handles.push(handle);
        }

        // Wait for all requests to complete
        for handle in handles {
            handle.await.expect("Task panicked");
        }

        let total_requests = request_count.load(Ordering::SeqCst);
        let successful_requests = success_count.load(Ordering::SeqCst);
        let failed_requests = error_count.load(Ordering::SeqCst);

        println!("Load test results:");
        println!("  Total requests: {}", total_requests);
        println!("  Successful: {}", successful_requests);
        println!("  Failed: {}", failed_requests);
        println!("  Success rate: {:.2}%", (successful_requests as f64 / total_requests as f64) * 100.0);

        // Assert at least 95% success rate
        assert!(successful_requests as f64 / total_requests as f64 >= 0.95);
        assert_eq!(total_requests, 100);
    }

    async fn run_async_function(input: serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
        // Async version of run_function
        tokio::task::spawn_blocking(move || {
            run_function(input)
        }).await.map_err(|e| e.into())
    }
}

Event-Driven Architecture

Multi-Source Event Processing

# event-sources.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: event-router-config
  namespace: serverless
data:
  config.yaml: |
    event_sources:
      # HTTP API Gateway
      - name: "api-gateway"
        type: "http"
        config:
          host: "0.0.0.0"
          port: 8080
          routes:
            - path: "/api/v1/process"
              methods: ["POST", "PUT"]
              target_function: "data-processor"
            - path: "/api/v1/images/*"
              methods: ["POST"]
              target_function: "image-processor"
            - path: "/api/v1/notifications"
              methods: ["POST"]
              target_function: "notification-sender"
      
      # Kafka event streams
      - name: "kafka-events"
        type: "kafka"
        config:
          brokers: ["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"]
          consumer_group: "wasm-functions"
          topics:
            - topic: "user.events"
              target_function: "user-processor"
              filter: "event_type == 'user.created' || event_type == 'user.updated'"
            - topic: "order.events"
              target_function: "order-processor"
              dead_letter_topic: "order.events.dlq"
            - topic: "payment.events"
              target_function: "payment-processor"
              retry_attempts: 3
      
      # Cloud storage events
      - name: "storage-events"
        type: "storage"
        config:
          provider: "aws_s3"
          buckets:
            - name: "user-uploads"
              events: ["object.created", "object.deleted"]
              target_function: "file-processor"
              filters:
                - suffix: ".jpg"
                - suffix: ".png"
                - suffix: ".pdf"
            - name: "data-exports"
              events: ["object.created"]
              target_function: "export-processor"
      
      # Scheduled events
      - name: "scheduler"
        type: "cron"
        config:
          schedules:
            - name: "cleanup-temp-files"
              schedule: "0 2 * * *"  # Daily at 2 AM
              target_function: "cleanup-function"
              timezone: "UTC"
            - name: "generate-reports"
              schedule: "0 9 * * MON"  # Monday at 9 AM
              target_function: "report-generator"
            - name: "health-check"
              schedule: "*/5 * * * *"  # Every 5 minutes
              target_function: "system-health-check"
      
      # Database change streams
      - name: "database-changes"
        type: "database"
        config:
          type: "postgresql"
          connection_string: "${DATABASE_URL}"
          tables:
            - name: "users"
              operations: ["INSERT", "UPDATE", "DELETE"]
              target_function: "user-sync"
            - name: "orders" 
              operations: ["INSERT", "UPDATE"]
              target_function: "order-tracking"
      
      # Message queues
      - name: "message-queues"
        type: "sqs"
        config:
          region: "us-east-1"
          queues:
            - name: "image-processing-queue"
              target_function: "image-processor"
              batch_size: 10
              visibility_timeout: 300
            - name: "email-queue"
              target_function: "email-sender"
              max_receives: 3
              dead_letter_queue: "email-dlq"

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: event-router
  namespace: serverless
spec:
  replicas: 3
  selector:
    matchLabels:
      app: event-router
  template:
    metadata:
      labels:
        app: event-router
    spec:
      containers:
        - name: event-router
          image: registry.example.com/event-router:v1.0.0
          ports:
            - containerPort: 8080
            - containerPort: 9090 # Metrics
          env:
            - name: CONFIG_PATH
              value: '/etc/config/config.yaml'
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: database-credentials
                  key: url
            - name: KAFKA_BROKERS
              value: 'kafka-1:9092,kafka-2:9092,kafka-3:9092'
            - name: AWS_REGION
              value: 'us-east-1'
          volumeMounts:
            - name: config
              mountPath: /etc/config
            - name: aws-credentials
              mountPath: /var/secrets/aws
          resources:
            requests:
              cpu: '200m'
              memory: '256Mi'
            limits:
              cpu: '1000m'
              memory: '512Mi'
      volumes:
        - name: config
          configMap:
            name: event-router-config
        - name: aws-credentials
          secret:
            secretName: aws-credentials
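
The router consumes this ConfigMap at startup via CONFIG_PATH. As a minimal sketch of that loading step (assuming serde and serde_yaml as dependencies; the struct and field names below are illustrative, not the event router's actual API):

// config_loader.rs - hypothetical sketch, not part of the event router codebase
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct RouterConfig {
    event_sources: Vec<EventSource>,
}

#[derive(Debug, Deserialize)]
struct EventSource {
    name: String,
    #[serde(rename = "type")]
    source_type: String,
    // Source-specific settings stay untyped here and are parsed per source type
    config: serde_yaml::Value,
}

fn load_config(path: &str) -> Result<RouterConfig, Box<dyn std::error::Error>> {
    let raw = std::fs::read_to_string(path)?;
    Ok(serde_yaml::from_str(&raw)?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // CONFIG_PATH matches the environment variable set in the Deployment above
    let path = std::env::var("CONFIG_PATH")
        .unwrap_or_else(|_| "/etc/config/config.yaml".to_string());
    let config = load_config(&path)?;
    for source in &config.event_sources {
        println!("registered event source: {} ({})", source.name, source.source_type);
    }
    Ok(())
}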

Event Processing Implementation

// src/event_router.rs
use tokio::sync::mpsc;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::*;

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Event {
    pub id: String,
    pub source: String,
    pub event_type: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub data: serde_json::Value,
    pub metadata: HashMap<String, String>,
}

#[derive(Debug, Clone)]
pub struct FunctionInvocation {
    pub function_name: String,
    pub event: Event,
    pub retry_count: u32,
    pub timeout: std::time::Duration,
}

pub struct EventRouter {
    function_registry: Arc<FunctionRegistry>,
    event_queue: mpsc::UnboundedSender<FunctionInvocation>,
    metrics: Arc<RouterMetrics>,
}

impl EventRouter {
    pub fn new(function_registry: Arc<FunctionRegistry>) -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        let metrics = Arc::new(RouterMetrics::new());

        // Start event processor
        let processor = EventProcessor::new(rx, Arc::clone(&function_registry), Arc::clone(&metrics));
        tokio::spawn(async move {
            processor.run().await;
        });

        Self {
            function_registry,
            event_queue: tx,
            metrics,
        }
    }

    pub async fn route_event(&self, event: Event) -> Result<(), RouterError> {
        info!("Routing event: {} from {}", event.event_type, event.source);

        // Find target function based on event
        let function_name = self.determine_target_function(&event)?;

        // Create invocation
        let invocation = FunctionInvocation {
            function_name: function_name.clone(),
            event: event.clone(),
            retry_count: 0,
            timeout: std::time::Duration::from_secs(30),
        };

        // Queue for processing
        self.event_queue.send(invocation)
            .map_err(|_| RouterError::QueueFull)?;

        // Update metrics
        self.metrics.events_routed.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        info!("Event {} routed to function {}", event.id, function_name);
        Ok(())
    }

    fn determine_target_function(&self, event: &Event) -> Result<String, RouterError> {
        // Route based on source and event type
        let routing_key = format!("{}:{}", event.source, event.event_type);

        match routing_key.as_str() {
            "storage.bucket:object.created" => {
                // Route to different functions based on file type
                if let Some(key) = event.metadata.get("object_key") {
                    if key.ends_with(".jpg") || key.ends_with(".png") {
                        Ok("image-processor".to_string())
                    } else if key.ends_with(".pdf") {
                        Ok("document-processor".to_string())
                    } else {
                        Ok("file-processor".to_string())
                    }
                } else {
                    Ok("file-processor".to_string())
                }
            },
            "api.gateway:http.request" => {
                // Route based on path
                if let Some(path) = event.metadata.get("path") {
                    if path.starts_with("/api/v1/images") {
                        Ok("image-processor".to_string())
                    } else if path.starts_with("/api/v1/users") {
                        Ok("user-processor".to_string())
                    } else {
                        Ok("default-handler".to_string())
                    }
                } else {
                    Ok("default-handler".to_string())
                }
            },
            "kafka:user.created" | "kafka:user.updated" => {
                Ok("user-processor".to_string())
            },
            "kafka:order.created" | "kafka:order.updated" => {
                Ok("order-processor".to_string())
            },
            "cron:scheduled" => {
                // Route based on schedule name
                if let Some(schedule) = event.metadata.get("schedule_name") {
                    match schedule.as_str() {
                        "cleanup-temp-files" => Ok("cleanup-function".to_string()),
                        "generate-reports" => Ok("report-generator".to_string()),
                        "health-check" => Ok("system-health-check".to_string()),
                        _ => Err(RouterError::UnknownSchedule(schedule.clone())),
                    }
                } else {
                    Err(RouterError::MissingScheduleName)
                }
            },
            _ => {
                warn!("No routing rule for event: {}", routing_key);
                Err(RouterError::NoRoute(routing_key))
            }
        }
    }
}

pub struct EventProcessor {
    receiver: mpsc::UnboundedReceiver<FunctionInvocation>,
    function_registry: Arc<FunctionRegistry>,
    metrics: Arc<RouterMetrics>,
    active_invocations: Arc<tokio::sync::Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
}

impl EventProcessor {
    pub fn new(
        receiver: mpsc::UnboundedReceiver<FunctionInvocation>,
        function_registry: Arc<FunctionRegistry>,
        metrics: Arc<RouterMetrics>,
    ) -> Self {
        Self {
            receiver,
            function_registry,
            metrics,
            active_invocations: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
        }
    }

    pub async fn run(mut self) {
        info!("Event processor starting");

        while let Some(invocation) = self.receiver.recv().await {
            let invocation_id = format!("{}:{}", invocation.event.id, invocation.function_name);

            // Check if function exists
            if !self.function_registry.function_exists(&invocation.function_name).await {
                error!("Function {} not found", invocation.function_name);
                self.metrics.function_not_found.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                continue;
            }

            // Spawn function invocation
            let function_registry = Arc::clone(&self.function_registry);
            let metrics = Arc::clone(&self.metrics);
            let active_invocations = Arc::clone(&self.active_invocations);
            // Clone the ID: the spawned task takes ownership of its copy,
            // while the original is still needed below to track the handle
            let task_invocation_id = invocation_id.clone();

            let handle = tokio::spawn(async move {
                let start_time = std::time::Instant::now();

                match Self::invoke_function(&function_registry, &invocation).await {
                    Ok(result) => {
                        info!("Function {} completed successfully: {:?}", invocation.function_name, result);
                        metrics.successful_invocations.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    },
                    Err(error) => {
                        error!("Function {} failed: {}", invocation.function_name, error);
                        metrics.failed_invocations.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

                        // Retry logic
                        if invocation.retry_count < 3 {
                            let mut retry_invocation = invocation;
                            retry_invocation.retry_count += 1;

                            info!("Retrying function {} (attempt {})", retry_invocation.function_name, retry_invocation.retry_count);

                            // Schedule retry with exponential backoff
                            let delay = std::time::Duration::from_secs(2_u64.pow(retry_invocation.retry_count));
                            tokio::time::sleep(delay).await;

                            // Re-queue for retry (simplified - in production use proper retry queue)
                            if let Err(e) = Self::invoke_function(&function_registry, &retry_invocation).await {
                                error!("Retry failed for function {}: {}", retry_invocation.function_name, e);
                            }
                        }
                    }
                }

                let duration = start_time.elapsed();
                metrics.invocation_duration.observe(duration.as_secs_f64());

                // Remove from active invocations
                let mut active = active_invocations.lock().await;
                active.remove(&task_invocation_id);
            });

            // Track active invocation
            {
                let mut active = self.active_invocations.lock().await;
                active.insert(invocation_id, handle);
            }
        }

        info!("Event processor stopped");
    }

    async fn invoke_function(
        function_registry: &FunctionRegistry,
        invocation: &FunctionInvocation,
    ) -> Result<serde_json::Value, InvocationError> {
        // Look up function metadata (validates registration; unused in this sketch)
        let _function_info = function_registry.get_function(&invocation.function_name).await?;

        // Prepare function input
        let input = serde_json::json!({
            "source": invocation.event.source,
            "event_type": invocation.event.event_type,
            "data": invocation.event.data,
            "timestamp": invocation.event.timestamp,
            "metadata": invocation.event.metadata
        });

        // Invoke function with timeout
        let result = tokio::time::timeout(
            invocation.timeout,
            function_registry.invoke_function(&invocation.function_name, input)
        ).await;

        match result {
            Ok(Ok(response)) => Ok(response),
            Ok(Err(e)) => Err(InvocationError::FunctionError(e.to_string())),
            Err(_) => Err(InvocationError::Timeout),
        }
    }
}

// Supporting types and implementations
#[derive(Debug)]
pub enum RouterError {
    QueueFull,
    NoRoute(String),
    UnknownSchedule(String),
    MissingScheduleName,
}

#[derive(Debug)]
pub enum InvocationError {
    FunctionNotFound,
    FunctionError(String),
    Timeout,
}

pub struct RouterMetrics {
    pub events_routed: std::sync::atomic::AtomicU64,
    pub successful_invocations: std::sync::atomic::AtomicU64,
    pub failed_invocations: std::sync::atomic::AtomicU64,
    pub function_not_found: std::sync::atomic::AtomicU64,
    pub invocation_duration: prometheus::Histogram,
}

impl RouterMetrics {
    pub fn new() -> Self {
        Self {
            events_routed: std::sync::atomic::AtomicU64::new(0),
            successful_invocations: std::sync::atomic::AtomicU64::new(0),
            failed_invocations: std::sync::atomic::AtomicU64::new(0),
            function_not_found: std::sync::atomic::AtomicU64::new(0),
            invocation_duration: prometheus::Histogram::with_opts(
                prometheus::HistogramOpts::new("function_invocation_duration_seconds", "Function invocation duration")
                    .buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0])
            ).unwrap(),
        }
    }
}

// Mock implementations for completeness
pub struct FunctionRegistry;

impl FunctionRegistry {
    pub async fn function_exists(&self, _name: &str) -> bool {
        true // Mock implementation
    }

    pub async fn get_function(&self, name: &str) -> Result<FunctionInfo, InvocationError> {
        Ok(FunctionInfo {
            name: name.to_string(),
            image: format!("registry.example.com/{}:latest", name),
            timeout: std::time::Duration::from_secs(30),
        })
    }

    pub async fn invoke_function(&self, _name: &str, _input: serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
        // Mock successful invocation
        Ok(serde_json::json!({"success": true, "result": "processed"}))
    }
}

pub struct FunctionInfo {
    pub name: String,
    pub image: String,
    pub timeout: std::time::Duration,
}

impl std::fmt::Display for RouterError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl std::error::Error for RouterError {}

impl std::fmt::Display for InvocationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl std::error::Error for InvocationError {}
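
A minimal sketch of wiring the router together (a hypothetical binary in the same crate, reusing the mock FunctionRegistry defined above) routes a Kafka user event to user-processor via the rules in determine_target_function:

// Hypothetical usage sketch; the event values are illustrative
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Arc::new(FunctionRegistry);
    let router = EventRouter::new(registry);

    let event = Event {
        id: "evt-123".to_string(),
        source: "kafka".to_string(),
        event_type: "user.created".to_string(),
        timestamp: chrono::Utc::now(),
        data: serde_json::json!({ "user_id": 42 }),
        metadata: HashMap::new(),
    };

    // Matches the "kafka:user.created" routing rule -> "user-processor"
    router.route_event(event).await?;
    Ok(())
}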

Auto-Scaling and Performance

Advanced Scaling Configuration

# advanced-scaling.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: wasm-function-hpa
  namespace: serverless
spec:
  scaleTargetRef:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: image-processor
  minReplicas: 0 # Scale to zero (requires the HPAScaleToZero feature gate)
  maxReplicas: 1000

  # Multiple scaling metrics
  metrics:
    # CPU-based scaling
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70

    # Memory-based scaling
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

    # Request rate scaling
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: '100'

    # Queue depth scaling
    - type: External
      external:
        metric:
          name: sqs_queue_depth
          selector:
            matchLabels:
              queue: image-processing
        target:
          type: AverageValue
          averageValue: '5'

    # Custom latency metric
    - type: External
      external:
        metric:
          name: function_p95_latency_seconds
          selector:
            matchLabels:
              function: image-processor
        target:
          type: Value
          value: '0.1' # Scale up if p95 > 100ms

  # Scaling behavior
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30 # Fast scale up for WASM
      policies:
        - type: Percent
          value: 1000 # Up to 10x increase
          periodSeconds: 15
        - type: Pods
          value: 100 # Or 100 pods at once
          periodSeconds: 15
      selectPolicy: Max

    scaleDown:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 50 # Scale down by 50%
          periodSeconds: 30
        - type: Pods
          value: 10 # Or 10 pods at once
          periodSeconds: 30
      selectPolicy: Min

---
# KEDA ScaledObject for advanced scaling
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: wasm-function-scaler
  namespace: serverless
spec:
  scaleTargetRef:
    name: image-processor
  pollingInterval: 5 # Check every 5 seconds
  cooldownPeriod: 30 # Wait 30s before scaling down
  idleReplicaCount: 0 # Scale to zero when idle
  minReplicaCount: 0
  maxReplicaCount: 2000

  triggers:
    # Kafka lag scaling
    - type: kafka
      metadata:
        bootstrapServers: kafka-1:9092,kafka-2:9092,kafka-3:9092
        consumerGroup: image-processor-group
        topic: image-events
        lagThreshold: '10'
        offsetResetPolicy: latest
      authenticationRef:
        name: kafka-auth

    # Prometheus metrics scaling
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: function_queue_length
        threshold: '20'
        query: sum(function_queue_length{function="image-processor"})

    # HTTP request scaling
    - type: prometheus
      metadata:
        serverAddress: http://prometheus:9090
        metricName: http_requests_rate
        threshold: '1000'
        query: sum(rate(http_requests_total{function="image-processor"}[1m]))

    # Custom business metric
    - type: external-push
      metadata:
        scalerAddress: custom-scaler:8080

    # Schedule-based scaling
    - type: cron
      metadata:
        timezone: America/New_York
        start: '0 8 * * MON-FRI' # Scale up at 8 AM weekdays
        end: '0 18 * * MON-FRI' # Scale down at 6 PM weekdays
        desiredReplicas: '50'

---
# Vertical Pod Autoscaler for WASM functions
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: wasm-function-vpa
  namespace: serverless
spec:
  targetRef:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: image-processor
  updatePolicy:
    updateMode: 'Auto'

  # WASM-optimized resource policy
  resourcePolicy:
    containerPolicies:
      - containerName: user-container
        minAllowed:
          cpu: 10m # Very low CPU for WASM
          memory: 16Mi # Minimal memory
        maxAllowed:
          cpu: 2000m
          memory: 512Mi
        controlledResources: ['cpu', 'memory']
        controlledValues: RequestsAndLimits

        # Let VPA apply recommendations to this container automatically
        mode: Auto

---
# Predictive scaling with machine learning
apiVersion: v1
kind: ConfigMap
metadata:
  name: predictive-scaler-config
  namespace: serverless
data:
  config.yaml: |
    predictive_scaling:
      enabled: true
      model_endpoint: "http://ml-predictor:8080/predict"
      
      # Historical patterns
      patterns:
        - name: "daily_pattern"
          description: "Higher load during business hours"
          schedule: "0 6-18 * * MON-FRI"
          scale_factor: 2.0
          
        - name: "weekly_pattern" 
          description: "Lower load on weekends"
          schedule: "0 * * * SAT-SUN"
          scale_factor: 0.5
          
        - name: "monthly_pattern"
          description: "End of month reporting spike"
          schedule: "0 * 28-31 * *"
          scale_factor: 3.0
      
      # ML model configuration
      model:
        features:
          - "hour_of_day"
          - "day_of_week"
          - "requests_last_hour"
          - "queue_depth"
          - "cpu_utilization"
          - "memory_utilization"
        prediction_window: "300s"  # 5 minute ahead prediction
        confidence_threshold: 0.8
        
      # Scaling decisions
      scaling:
        scale_up_threshold: 0.7    # Scale up at 70% predicted capacity
        scale_down_threshold: 0.3  # Scale down at 30% predicted capacity
        max_scale_up_rate: 200%    # Don't increase by more than 200%
        max_scale_down_rate: 50%   # Don't decrease by more than 50%
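
Under the hood, the scaling decision can be a small pure function over the model's output. The sketch below is a hypothetical illustration of applying the thresholds and rate caps from the ConfigMap above; the Prediction type and its fields are assumptions, not part of any real scaler:

// Hypothetical predictive-scaling decision sketch; not a real scaler component
#[derive(Debug)]
struct Prediction {
    // Model's predicted load as a fraction of current capacity (0.0..=1.0+)
    predicted_capacity: f64,
    confidence: f64,
}

fn desired_replicas(current: u32, p: &Prediction) -> u32 {
    // Below confidence_threshold (0.8): keep the current replica count
    if p.confidence < 0.8 {
        return current;
    }
    if p.predicted_capacity > 0.7 {
        // scale_up_threshold: grow proportionally, capped by max_scale_up_rate (+200%)
        let target = ((current as f64) * p.predicted_capacity / 0.7).ceil() as u32;
        target.min(current.saturating_mul(3))
    } else if p.predicted_capacity < 0.3 {
        // scale_down_threshold: shrink proportionally, capped by max_scale_down_rate (-50%)
        let target = ((current as f64) * p.predicted_capacity / 0.3).ceil() as u32;
        target.max(current / 2).max(1)
    } else {
        current
    }
}

fn main() {
    let p = Prediction { predicted_capacity: 0.85, confidence: 0.9 };
    println!("replicas: 10 -> {}", desired_replicas(10, &p)); // 10 -> 13
}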

Performance Optimization Pipeline

// src/performance_optimizer.rs
use prometheus::{Histogram, Counter, Gauge};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{Duration, interval};
use tracing::*;

pub struct PerformanceOptimizer {
    metrics_collector: Arc<MetricsCollector>,
    scaling_controller: Arc<ScalingController>,
    resource_optimizer: Arc<ResourceOptimizer>,
    configuration: OptimizerConfig,
}

#[derive(Clone)]
pub struct OptimizerConfig {
    pub target_p95_latency_ms: f64,
    pub target_cpu_utilization: f64,
    pub target_memory_utilization: f64,
    pub min_throughput_rps: f64,
    pub optimization_interval: Duration,
    pub cold_start_threshold_ms: f64,
}

impl PerformanceOptimizer {
    pub fn new(config: OptimizerConfig) -> Self {
        Self {
            metrics_collector: Arc::new(MetricsCollector::new()),
            scaling_controller: Arc::new(ScalingController::new()),
            resource_optimizer: Arc::new(ResourceOptimizer::new()),
            configuration: config,
        }
    }

    pub async fn start_optimization_loop(&self) {
        let mut interval = interval(self.configuration.optimization_interval);

        loop {
            interval.tick().await;

            if let Err(e) = self.run_optimization_cycle().await {
                error!("Optimization cycle failed: {}", e);
            }
        }
    }

    async fn run_optimization_cycle(&self) -> Result<(), OptimizationError> {
        info!("Starting optimization cycle");

        // Collect current metrics
        let metrics = self.metrics_collector.collect_metrics().await?;

        // Analyze performance
        let analysis = self.analyze_performance(&metrics).await?;

        // Generate optimization recommendations
        let recommendations = self.generate_recommendations(&analysis).await?;

        // Apply optimizations
        self.apply_optimizations(&recommendations).await?;

        // Log optimization results
        info!("Optimization cycle completed: {:?}", recommendations);

        Ok(())
    }

    async fn analyze_performance(&self, metrics: &PerformanceMetrics) -> Result<PerformanceAnalysis, OptimizationError> {
        let mut issues = Vec::new();
        let mut recommendations = Vec::new();

        // Latency analysis
        if metrics.p95_latency_ms > self.configuration.target_p95_latency_ms {
            issues.push(PerformanceIssue::HighLatency {
                current: metrics.p95_latency_ms,
                target: self.configuration.target_p95_latency_ms,
            });

            // Check if high latency is due to cold starts
            if metrics.cold_start_ratio > 0.1 {  // More than 10% cold starts
                recommendations.push(OptimizationRecommendation::ReduceColdStarts {
                    current_ratio: metrics.cold_start_ratio,
                    suggested_min_replicas: (metrics.avg_concurrent_requests * 1.2) as u32,
                });
            }

            // Check if high latency is due to resource constraints
            if metrics.cpu_utilization > 0.8 {
                recommendations.push(OptimizationRecommendation::IncreaseResources {
                    resource_type: "cpu".to_string(),
                    current_utilization: metrics.cpu_utilization,
                    suggested_increase_factor: 1.5,
                });
            }
        }

        // Throughput analysis
        if metrics.requests_per_second < self.configuration.min_throughput_rps {
            issues.push(PerformanceIssue::LowThroughput {
                current: metrics.requests_per_second,
                target: self.configuration.min_throughput_rps,
            });

            // Analyze bottlenecks
            if metrics.avg_queue_time_ms > 10.0 {
                recommendations.push(OptimizationRecommendation::IncreaseReplicas {
                    current_replicas: metrics.current_replicas,
                    suggested_replicas: (metrics.current_replicas as f64 * 1.5) as u32,
                    reason: "High queue time".to_string(),
                });
            }
        }

        // Resource utilization analysis
        if metrics.memory_utilization > self.configuration.target_memory_utilization {
            issues.push(PerformanceIssue::HighMemoryUsage {
                current: metrics.memory_utilization,
                target: self.configuration.target_memory_utilization,
            });

            recommendations.push(OptimizationRecommendation::OptimizeMemory {
                current_usage_mb: metrics.memory_usage_mb,
                suggested_limit_mb: (metrics.memory_usage_mb * 1.2) as u32,
            });
        }

        // Error rate analysis
        if metrics.error_rate > 0.01 {  // More than 1% errors
            issues.push(PerformanceIssue::HighErrorRate {
                current: metrics.error_rate,
                threshold: 0.01,
            });

            // Analyze error patterns
            for (error_type, count) in &metrics.error_breakdown {
                if *count > 10 {
                    match error_type.as_str() {
                        "timeout" => {
                            recommendations.push(OptimizationRecommendation::IncreaseTimeout {
                                current_timeout_ms: metrics.avg_timeout_ms,
                                suggested_timeout_ms: metrics.avg_timeout_ms * 1.5,
                            });
                        },
                        "memory_limit" => {
                            recommendations.push(OptimizationRecommendation::IncreaseMemoryLimit {
                                current_limit_mb: metrics.memory_limit_mb,
                                suggested_limit_mb: metrics.memory_limit_mb * 1.3,
                            });
                        },
                        _ => {}
                    }
                }
            }
        }

        // Scaling efficiency analysis
        let scaling_efficiency = self.calculate_scaling_efficiency(metrics).await?;
        if scaling_efficiency.up_efficiency < 0.8 {
            recommendations.push(OptimizationRecommendation::TuneScaling {
                component: "scale_up".to_string(),
                current_efficiency: scaling_efficiency.up_efficiency,
                suggested_config: ScalingConfig {
                    stabilization_window_seconds: 15,  // Faster scale up
                    max_scale_up_rate: 300.0,          // More aggressive
                },
            });
        }

        Ok(PerformanceAnalysis {
            issues,
            recommendations,
            overall_score: self.calculate_performance_score(metrics),
            optimization_priority: self.determine_optimization_priority(&issues),
        })
    }

    async fn generate_recommendations(&self, analysis: &PerformanceAnalysis) -> Result<Vec<OptimizationAction>, OptimizationError> {
        let mut actions = Vec::new();

        for recommendation in &analysis.recommendations {
            match recommendation {
                OptimizationRecommendation::IncreaseReplicas { current_replicas, suggested_replicas, reason } => {
                    actions.push(OptimizationAction::ScaleReplicas {
                        from: *current_replicas,
                        to: *suggested_replicas,
                        reason: reason.clone(),
                    });
                },
                OptimizationRecommendation::IncreaseResources { resource_type, suggested_increase_factor, .. } => {
                    actions.push(OptimizationAction::AdjustResources {
                        resource_type: resource_type.clone(),
                        factor: *suggested_increase_factor,
                    });
                },
                OptimizationRecommendation::ReduceColdStarts { suggested_min_replicas, .. } => {
                    actions.push(OptimizationAction::SetMinReplicas {
                        min_replicas: *suggested_min_replicas,
                    });
                },
                OptimizationRecommendation::TuneScaling { component, suggested_config, .. } => {
                    actions.push(OptimizationAction::UpdateScalingConfig {
                        component: component.clone(),
                        config: suggested_config.clone(),
                    });
                },
                _ => {}
            }
        }

        // Sort actions by priority
        actions.sort_by_key(|action| action.priority());

        Ok(actions)
    }

    async fn apply_optimizations(&self, actions: &[OptimizationAction]) -> Result<(), OptimizationError> {
        for action in actions {
            match action {
                OptimizationAction::ScaleReplicas { from, to, reason } => {
                    info!("Scaling replicas from {} to {} ({})", from, to, reason);
                    self.scaling_controller.scale_replicas(*to).await?;
                },
                OptimizationAction::AdjustResources { resource_type, factor } => {
                    info!("Adjusting {} resources by factor {}", resource_type, factor);
                    self.resource_optimizer.adjust_resources(resource_type, *factor).await?;
                },
                OptimizationAction::SetMinReplicas { min_replicas } => {
                    info!("Setting minimum replicas to {}", min_replicas);
                    self.scaling_controller.set_min_replicas(*min_replicas).await?;
                },
                OptimizationAction::UpdateScalingConfig { component, config } => {
                    info!("Updating scaling config for {}: {:?}", component, config);
                    self.scaling_controller.update_config(component, config).await?;
                },
            }

            // Wait between actions to avoid overwhelming the system
            tokio::time::sleep(Duration::from_secs(5)).await;
        }

        Ok(())
    }

    async fn calculate_scaling_efficiency(&self, metrics: &PerformanceMetrics) -> Result<ScalingEfficiency, OptimizationError> {
        // Calculate how efficiently the system scales up and down
        // This would analyze historical scaling events and their effectiveness
        Ok(ScalingEfficiency {
            up_efficiency: 0.85,   // Mock values - real implementation would calculate from metrics
            down_efficiency: 0.92,
            avg_scale_up_time_seconds: 15.0,
            avg_scale_down_time_seconds: 45.0,
        })
    }

    fn calculate_performance_score(&self, metrics: &PerformanceMetrics) -> f64 {
        // Calculate overall performance score (0-100)
        let latency_score = if metrics.p95_latency_ms <= self.configuration.target_p95_latency_ms {
            100.0
        } else {
            100.0 * (self.configuration.target_p95_latency_ms / metrics.p95_latency_ms)
        };

        let throughput_score = if metrics.requests_per_second >= self.configuration.min_throughput_rps {
            100.0
        } else {
            100.0 * (metrics.requests_per_second / self.configuration.min_throughput_rps)
        };

        let error_score = 100.0 * (1.0 - metrics.error_rate.min(1.0));

        let resource_score = 100.0 * (1.0 - (metrics.cpu_utilization - self.configuration.target_cpu_utilization).abs());

        (latency_score + throughput_score + error_score + resource_score) / 4.0
    }

    fn determine_optimization_priority(&self, issues: &[PerformanceIssue]) -> OptimizationPriority {
        let has_critical_issues = issues.iter().any(|issue| match issue {
            PerformanceIssue::HighErrorRate { current, .. } => *current > 0.05,  // >5% error rate
            PerformanceIssue::HighLatency { current, target } => *current > *target * 2.0,  // 2x target latency
            _ => false,
        });

        if has_critical_issues {
            OptimizationPriority::Critical
        } else if !issues.is_empty() {
            OptimizationPriority::High
        } else {
            OptimizationPriority::Low
        }
    }
}

// Supporting types and structures
#[derive(Debug)]
pub struct PerformanceMetrics {
    pub p95_latency_ms: f64,
    pub requests_per_second: f64,
    pub error_rate: f64,
    pub cpu_utilization: f64,
    pub memory_utilization: f64,
    pub memory_usage_mb: f64,
    pub memory_limit_mb: f64,
    pub current_replicas: u32,
    pub cold_start_ratio: f64,
    pub avg_concurrent_requests: f64,
    pub avg_queue_time_ms: f64,
    pub avg_timeout_ms: f64,
    pub error_breakdown: HashMap<String, u32>,
}

#[derive(Debug)]
pub struct PerformanceAnalysis {
    pub issues: Vec<PerformanceIssue>,
    pub recommendations: Vec<OptimizationRecommendation>,
    pub overall_score: f64,
    pub optimization_priority: OptimizationPriority,
}

#[derive(Debug)]
pub enum PerformanceIssue {
    HighLatency { current: f64, target: f64 },
    LowThroughput { current: f64, target: f64 },
    HighMemoryUsage { current: f64, target: f64 },
    HighErrorRate { current: f64, threshold: f64 },
}

#[derive(Debug)]
pub enum OptimizationRecommendation {
    IncreaseReplicas { current_replicas: u32, suggested_replicas: u32, reason: String },
    IncreaseResources { resource_type: String, current_utilization: f64, suggested_increase_factor: f64 },
    ReduceColdStarts { current_ratio: f64, suggested_min_replicas: u32 },
    OptimizeMemory { current_usage_mb: f64, suggested_limit_mb: u32 },
    IncreaseTimeout { current_timeout_ms: f64, suggested_timeout_ms: f64 },
    IncreaseMemoryLimit { current_limit_mb: f64, suggested_limit_mb: f64 },
    TuneScaling { component: String, current_efficiency: f64, suggested_config: ScalingConfig },
}

#[derive(Debug)]
pub enum OptimizationAction {
    ScaleReplicas { from: u32, to: u32, reason: String },
    AdjustResources { resource_type: String, factor: f64 },
    SetMinReplicas { min_replicas: u32 },
    UpdateScalingConfig { component: String, config: ScalingConfig },
}

impl OptimizationAction {
    fn priority(&self) -> u8 {
        match self {
            OptimizationAction::ScaleReplicas { .. } => 1,      // Highest priority
            OptimizationAction::AdjustResources { .. } => 2,
            OptimizationAction::SetMinReplicas { .. } => 3,
            OptimizationAction::UpdateScalingConfig { .. } => 4, // Lowest priority
        }
    }
}

#[derive(Debug, Clone)]
pub struct ScalingConfig {
    pub stabilization_window_seconds: u32,
    pub max_scale_up_rate: f64,
}

#[derive(Debug)]
pub struct ScalingEfficiency {
    pub up_efficiency: f64,
    pub down_efficiency: f64,
    pub avg_scale_up_time_seconds: f64,
    pub avg_scale_down_time_seconds: f64,
}

#[derive(Debug)]
pub enum OptimizationPriority {
    Critical,
    High,
    Medium,
    Low,
}

#[derive(Debug)]
pub enum OptimizationError {
    MetricsCollectionFailed(String),
    AnalysisFailed(String),
    OptimizationApplyFailed(String),
}

impl std::fmt::Display for OptimizationError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl std::error::Error for OptimizationError {}

// Mock implementations for supporting components
pub struct MetricsCollector;

impl MetricsCollector {
    pub fn new() -> Self {
        Self
    }

    pub async fn collect_metrics(&self) -> Result<PerformanceMetrics, OptimizationError> {
        // Mock implementation - real version would collect from Prometheus, etc.
        Ok(PerformanceMetrics {
            p95_latency_ms: 85.0,
            requests_per_second: 1250.0,
            error_rate: 0.015,
            cpu_utilization: 0.65,
            memory_utilization: 0.72,
            memory_usage_mb: 128.0,
            memory_limit_mb: 256.0,
            current_replicas: 10,
            cold_start_ratio: 0.08,
            avg_concurrent_requests: 25.0,
            avg_queue_time_ms: 5.0,
            avg_timeout_ms: 30000.0,
            error_breakdown: [
                ("timeout".to_string(), 5),
                ("memory_limit".to_string(), 2),
                ("validation".to_string(), 8),
            ].iter().cloned().collect(),
        })
    }
}

pub struct ScalingController;

impl ScalingController {
    pub fn new() -> Self {
        Self
    }

    pub async fn scale_replicas(&self, replicas: u32) -> Result<(), OptimizationError> {
        info!("Scaling to {} replicas", replicas);
        Ok(())
    }

    pub async fn set_min_replicas(&self, min_replicas: u32) -> Result<(), OptimizationError> {
        info!("Setting minimum replicas to {}", min_replicas);
        Ok(())
    }

    pub async fn update_config(&self, component: &str, config: &ScalingConfig) -> Result<(), OptimizationError> {
        info!("Updating {} config: {:?}", component, config);
        Ok(())
    }
}

pub struct ResourceOptimizer;

impl ResourceOptimizer {
    pub fn new() -> Self {
        Self
    }

    pub async fn adjust_resources(&self, resource_type: &str, factor: f64) -> Result<(), OptimizationError> {
        info!("Adjusting {} by factor {}", resource_type, factor);
        Ok(())
    }
}
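
Wiring the optimizer into a service is then just constructing a config with the targets used throughout this section and starting the loop from an async binary; a minimal sketch:

// Hypothetical entry point; target values are illustrative
#[tokio::main]
async fn main() {
    let config = OptimizerConfig {
        target_p95_latency_ms: 100.0,
        target_cpu_utilization: 0.7,
        target_memory_utilization: 0.8,
        min_throughput_rps: 1000.0,
        optimization_interval: Duration::from_secs(60),
        cold_start_threshold_ms: 5.0,
    };

    let optimizer = PerformanceOptimizer::new(config);
    optimizer.start_optimization_loop().await; // runs until the process is stopped
}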

Security and Isolation

WASM Security Model

# security-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: wasm-function-network-policy
  namespace: serverless
spec:
  podSelector:
    matchLabels:
      runtime: wasm
  policyTypes:
    - Ingress
    - Egress

  # Ingress rules
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              name: api-gateway
        - namespaceSelector:
            matchLabels:
              name: monitoring
      ports:
        - protocol: TCP
          port: 8080

  # Egress rules
  egress:
    # Allow DNS
    - to: []
      ports:
        - protocol: UDP
          port: 53
    # Allow database access
    - to:
        - namespaceSelector:
            matchLabels:
              name: database
      ports:
        - protocol: TCP
          port: 5432
    # Allow external APIs
    - to: []
      ports:
        - protocol: TCP
          port: 443

---
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: wasm-functions-mtls
  namespace: serverless
spec:
  selector:
    matchLabels:
      runtime: wasm
  mtls:
    mode: STRICT

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: wasm-function-authz
  namespace: serverless
spec:
  selector:
    matchLabels:
      runtime: wasm
  rules:
    # Allow monitoring access
    - from:
        - source:
            principals: ['cluster.local/ns/monitoring/sa/prometheus']
    # Allow API gateway access
    - from:
        - source:
            namespaces: ['api-gateway']
    # Function-to-function communication
    - from:
        - source:
            principals: ['cluster.local/ns/serverless/sa/function-invoker']
    # Specific path-based rules
    - to:
        - operation:
            methods: ['GET']
            paths: ['/health', '/metrics']
    - to:
        - operation:
            methods: ['POST']
            paths: ['/invoke']
      when:
        - key: request.headers[authorization]
          values: ['Bearer *']

Capability-Based Security

// src/security.rs - WASM capability management
use std::collections::HashSet;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WasmCapabilities {
    pub network: NetworkCapabilities,
    pub filesystem: FilesystemCapabilities,
    pub environment: EnvironmentCapabilities,
    pub system: SystemCapabilities,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkCapabilities {
    pub outbound_hosts: Vec<String>,
    pub allowed_ports: Vec<u16>,
    pub protocols: Vec<String>,
    pub max_connections: u32,
    pub timeout_seconds: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FilesystemCapabilities {
    pub readable_paths: Vec<String>,
    pub writable_paths: Vec<String>,
    pub max_file_size: u64,
    pub max_files: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnvironmentCapabilities {
    pub allowed_env_vars: Vec<String>,
    pub can_read_secrets: bool,
    pub max_memory_mb: u32,
    pub max_cpu_ms: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemCapabilities {
    pub can_spawn_processes: bool,
    pub can_access_clock: bool,
    pub can_generate_random: bool,
    pub can_access_crypto: bool,
}

impl Default for WasmCapabilities {
    fn default() -> Self {
        Self {
            network: NetworkCapabilities {
                outbound_hosts: vec![],
                allowed_ports: vec![80, 443],
                protocols: vec!["https".to_string()],
                max_connections: 10,
                timeout_seconds: 30,
            },
            filesystem: FilesystemCapabilities {
                readable_paths: vec!["/tmp".to_string()],
                writable_paths: vec!["/tmp".to_string()],
                max_file_size: 10 * 1024 * 1024,  // 10MB
                max_files: 100,
            },
            environment: EnvironmentCapabilities {
                allowed_env_vars: vec!["PATH".to_string()],
                can_read_secrets: false,
                max_memory_mb: 128,
                max_cpu_ms: 30000,
            },
            system: SystemCapabilities {
                can_spawn_processes: false,
                can_access_clock: true,
                can_generate_random: true,
                can_access_crypto: false,
            },
        }
    }
}

pub struct CapabilityEnforcer {
    capabilities: WasmCapabilities,
    active_connections: std::sync::Arc<std::sync::Mutex<HashSet<String>>>,
    file_handles: std::sync::Arc<std::sync::Mutex<HashSet<String>>>,
}

impl CapabilityEnforcer {
    pub fn new(capabilities: WasmCapabilities) -> Self {
        Self {
            capabilities,
            active_connections: std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())),
            file_handles: std::sync::Arc::new(std::sync::Mutex::new(HashSet::new())),
        }
    }

    pub fn check_network_access(&self, host: &str, port: u16) -> Result<(), SecurityError> {
        // Check if host is allowed
        if !self.capabilities.network.outbound_hosts.is_empty() &&
           !self.capabilities.network.outbound_hosts.iter().any(|allowed| {
               host.ends_with(allowed) || allowed == "*"
           }) {
            return Err(SecurityError::NetworkAccessDenied(format!("Host {} not allowed", host)));
        }

        // Check if port is allowed
        if !self.capabilities.network.allowed_ports.contains(&port) {
            return Err(SecurityError::NetworkAccessDenied(format!("Port {} not allowed", port)));
        }

        // Check connection limits
        let active_connections = self.active_connections.lock().unwrap();
        if active_connections.len() >= self.capabilities.network.max_connections as usize {
            return Err(SecurityError::ResourceLimitExceeded("Max connections reached".to_string()));
        }

        Ok(())
    }

    pub fn check_file_access(&self, path: &str, write: bool) -> Result<(), SecurityError> {
        let allowed_paths = if write {
            &self.capabilities.filesystem.writable_paths
        } else {
            &self.capabilities.filesystem.readable_paths
        };

        // Check if path is allowed
        let path_allowed = allowed_paths.iter().any(|allowed| {
            path.starts_with(allowed) || allowed == "*"
        });

        if !path_allowed {
            return Err(SecurityError::FileSystemAccessDenied(
                format!("Path {} not allowed for {} access", path, if write { "write" } else { "read" })
            ));
        }

        // Check file handle limits
        let file_handles = self.file_handles.lock().unwrap();
        if file_handles.len() >= self.capabilities.filesystem.max_files as usize {
            return Err(SecurityError::ResourceLimitExceeded("Max files reached".to_string()));
        }

        Ok(())
    }

    pub fn check_environment_access(&self, var_name: &str) -> Result<(), SecurityError> {
        if !self.capabilities.environment.allowed_env_vars.is_empty() &&
           !self.capabilities.environment.allowed_env_vars.contains(&var_name.to_string()) &&
           !self.capabilities.environment.allowed_env_vars.contains(&"*".to_string()) {
            return Err(SecurityError::EnvironmentAccessDenied(
                format!("Environment variable {} not allowed", var_name)
            ));
        }

        Ok(())
    }

    pub fn check_system_capability(&self, capability: &str) -> Result<(), SecurityError> {
        match capability {
            "spawn_process" => {
                if !self.capabilities.system.can_spawn_processes {
                    return Err(SecurityError::SystemCapabilityDenied("Process spawning not allowed".to_string()));
                }
            },
            "access_clock" => {
                if !self.capabilities.system.can_access_clock {
                    return Err(SecurityError::SystemCapabilityDenied("Clock access not allowed".to_string()));
                }
            },
            "generate_random" => {
                if !self.capabilities.system.can_generate_random {
                    return Err(SecurityError::SystemCapabilityDenied("Random generation not allowed".to_string()));
                }
            },
            "access_crypto" => {
                if !self.capabilities.system.can_access_crypto {
                    return Err(SecurityError::SystemCapabilityDenied("Crypto access not allowed".to_string()));
                }
            },
            _ => return Err(SecurityError::UnknownCapability(capability.to_string())),
        }

        Ok(())
    }
}

#[derive(Debug)]
pub enum SecurityError {
    NetworkAccessDenied(String),
    FileSystemAccessDenied(String),
    EnvironmentAccessDenied(String),
    SystemCapabilityDenied(String),
    ResourceLimitExceeded(String),
    UnknownCapability(String),
}

impl std::fmt::Display for SecurityError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self)
    }
}

impl std::error::Error for SecurityError {}

// Integration with WASM runtime
pub struct SecureWasmRunner {
    enforcer: CapabilityEnforcer,
}

impl SecureWasmRunner {
    pub fn new(capabilities: WasmCapabilities) -> Self {
        Self {
            enforcer: CapabilityEnforcer::new(capabilities),
        }
    }

    pub async fn run_function(&self, wasm_bytes: &[u8], _input: serde_json::Value) -> Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>> {
        // Set up WASM runtime with security constraints; async support is
        // required for call_async and lets the timeout below cancel execution
        let mut config = wasmtime::Config::new();
        config.async_support(true);
        let engine = wasmtime::Engine::new(&config)?;
        let module = wasmtime::Module::new(&engine, wasm_bytes)?;

        let mut linker = wasmtime::Linker::new(&engine);

        // Add secure host functions
        self.add_secure_host_functions(&mut linker)?;

        let mut store = wasmtime::Store::new(&engine, self);
        let instance = linker.instantiate_async(&mut store, &module).await?;

        // Get main function
        let main_func = instance.get_typed_func::<(), ()>(&mut store, "main")?;

        // Execute with the function's CPU budget as a wall-clock timeout
        tokio::time::timeout(
            std::time::Duration::from_millis(self.enforcer.capabilities.environment.max_cpu_ms as u64),
            main_func.call_async(&mut store, ()),
        ).await??;

        Ok(serde_json::json!({"success": true}))
    }

    fn add_secure_host_functions(&self, linker: &mut wasmtime::Linker<&SecureWasmRunner>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Add file operations with capability checks
        linker.func_wrap("env", "file_open", |mut caller: wasmtime::Caller<'_, &SecureWasmRunner>, path_ptr: i32, path_len: i32| -> i32 {
            // Look up the guest's exported linear memory first (needs &mut Caller)
            let memory = match caller.get_export("memory") {
                Some(wasmtime::Extern::Memory(memory)) => memory,
                _ => return -1, // No memory export
            };

            // Extract path from WASM memory
            let path_bytes = memory.data(&caller)[path_ptr as usize..(path_ptr + path_len) as usize].to_vec();
            let path = String::from_utf8(path_bytes).unwrap_or_default();

            // Check capability against the enforcer stored in the Store
            match caller.data().enforcer.check_file_access(&path, false) {
                Ok(()) => 1, // Success
                Err(_) => -1, // Access denied
            }
        })?;

        // Add network operations with capability checks
        linker.func_wrap("env", "http_request", |mut caller: wasmtime::Caller<'_, &SecureWasmRunner>, url_ptr: i32, url_len: i32| -> i32 {
            // Look up the guest's exported linear memory first (needs &mut Caller)
            let memory = match caller.get_export("memory") {
                Some(wasmtime::Extern::Memory(memory)) => memory,
                _ => return -1, // No memory export
            };

            // Extract URL from WASM memory
            let url_bytes = memory.data(&caller)[url_ptr as usize..(url_ptr + url_len) as usize].to_vec();
            let url = String::from_utf8(url_bytes).unwrap_or_default();

            // Parse URL to get host and port (scheme default: 443 for https, 80 for http)
            if let Ok(parsed_url) = url::Url::parse(&url) {
                let host = parsed_url.host_str().unwrap_or("");
                let port = parsed_url.port_or_known_default().unwrap_or(80);

                // Check capability against the enforcer stored in the Store
                match caller.data().enforcer.check_network_access(host, port) {
                    Ok(()) => 1, // Success
                    Err(_) => -1, // Access denied
                }
            } else {
                -1 // Invalid URL
            }
        })?;

        Ok(())
    }
}
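
Exercising the enforcer directly shows the model in action. A short sketch that tightens the defaults for a function needing HTTPS access to a single API host (the host names are illustrative):

// Hypothetical usage sketch for the capability enforcer above
fn main() {
    let mut caps = WasmCapabilities::default();
    caps.network.outbound_hosts = vec!["api.example.com".to_string()];
    caps.environment.max_memory_mb = 64;

    let enforcer = CapabilityEnforcer::new(caps);

    // Allowed: host is in the allow-list and 443 is an allowed port
    assert!(enforcer.check_network_access("api.example.com", 443).is_ok());
    // Denied: host is not in the allow-list
    assert!(enforcer.check_network_access("internal.example.net", 443).is_err());
    // Denied by the default filesystem policy: writes outside /tmp
    assert!(enforcer.check_file_access("/etc/passwd", true).is_err());
}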

Monitoring and Observability

Comprehensive Monitoring Stack

# monitoring-stack.yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: wasm-functions-monitor
  namespace: monitoring
spec:
  selector:
    matchLabels:
      runtime: wasm
  endpoints:
    - port: metrics
      interval: 15s
      path: /metrics

---
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: wasm-function-alerts
  namespace: monitoring
spec:
  groups:
    - name: wasm-functions
      interval: 30s
      rules:
        # Cold start alerts
        - alert: WasmHighColdStartRate
          expr: |
            (
              sum(rate(wasm_function_cold_starts_total[5m])) /
              sum(rate(wasm_function_invocations_total[5m]))
            ) > 0.1
          for: 2m
          labels:
            severity: warning
            component: wasm-functions
          annotations:
            summary: 'High WASM function cold start rate'
            description: 'Cold start rate is {{ $value | humanizePercentage }} over the last 5 minutes'

        # Function execution time
        - alert: WasmFunctionHighLatency
          expr: |
            histogram_quantile(0.95,
              sum(rate(wasm_function_duration_seconds_bucket[5m])) by (le, function)
            ) > 1
          for: 5m
          labels:
            severity: critical
            component: wasm-functions
          annotations:
            summary: 'WASM function {{ $labels.function }} has high latency'
            description: '95th percentile latency is {{ $value }}s'

        # Memory usage
        - alert: WasmFunctionHighMemoryUsage
          expr: |
            (
              wasm_function_memory_usage_bytes /
              wasm_function_memory_limit_bytes
            ) > 0.9
          for: 10m
          labels:
            severity: warning
            component: wasm-functions
          annotations:
            summary: 'WASM function {{ $labels.function }} high memory usage'
            description: 'Memory usage is {{ $value | humanizePercentage }} of limit'

        # Error rate
        - alert: WasmFunctionHighErrorRate
          expr: |
            (
              sum(rate(wasm_function_errors_total[5m])) by (function) /
              sum(rate(wasm_function_invocations_total[5m])) by (function)
            ) > 0.05
          for: 3m
          labels:
            severity: critical
            component: wasm-functions
          annotations:
            summary: 'WASM function {{ $labels.function }} high error rate'
            description: 'Error rate is {{ $value | humanizePercentage }}'

        # Scaling alerts
        - alert: WasmFunctionScalingIssue
          expr: |
            (
              wasm_function_desired_replicas -
              wasm_function_available_replicas
            ) > 5
          for: 5m
          labels:
            severity: warning
            component: wasm-functions
          annotations:
            summary: 'WASM function {{ $labels.function }} scaling issue'
            description: '{{ $value }} replicas are not available (desired vs actual)'

---
# Grafana dashboard ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: wasm-functions-dashboard
  namespace: monitoring
data:
  dashboard.json: |
    {
      "dashboard": {
        "id": null,
        "title": "WASM Serverless Functions",
        "tags": ["wasm", "serverless", "functions"],
        "style": "dark",
        "timezone": "UTC",
        "refresh": "30s",
        "time": {
          "from": "now-1h",
          "to": "now"
        },
        "panels": [
          {
            "id": 1,
            "title": "Function Invocations",
            "type": "graph",
            "targets": [
              {
                "expr": "sum(rate(wasm_function_invocations_total[5m])) by (function)",
                "legendFormat": "{{ function }}"
              }
            ],
            "yAxes": [
              {
                "label": "Invocations/sec",
                "min": 0
              }
            ]
          },
          {
            "id": 2, 
            "title": "Cold Start Rate",
            "type": "stat",
            "targets": [
              {
                "expr": "sum(rate(wasm_function_cold_starts_total[5m])) / sum(rate(wasm_function_invocations_total[5m])) * 100",
                "legendFormat": "Cold Start %"
              }
            ],
            "fieldConfig": {
              "defaults": {
                "unit": "percent",
                "thresholds": {
                  "steps": [
                    {"color": "green", "value": 0},
                    {"color": "yellow", "value": 5},
                    {"color": "red", "value": 15}
                  ]
                }
              }
            }
          },
          {
            "id": 3,
            "title": "Function Duration",
            "type": "graph", 
            "targets": [
              {
                "expr": "histogram_quantile(0.50, sum(rate(wasm_function_duration_seconds_bucket[5m])) by (le, function))",
                "legendFormat": "{{ function }} p50"
              },
              {
                "expr": "histogram_quantile(0.95, sum(rate(wasm_function_duration_seconds_bucket[5m])) by (le, function))",
                "legendFormat": "{{ function }} p95"
              },
              {
                "expr": "histogram_quantile(0.99, sum(rate(wasm_function_duration_seconds_bucket[5m])) by (le, function))",
                "legendFormat": "{{ function }} p99"
              }
            ],
            "yAxes": [
              {
                "label": "Duration (seconds)",
                "min": 0
              }
            ]
          },
          {
            "id": 4,
            "title": "Memory Usage",
            "type": "graph",
            "targets": [
              {
                "expr": "wasm_function_memory_usage_bytes / 1024 / 1024",
                "legendFormat": "{{ function }} Memory (MB)"
              }
            ]
          },
          {
            "id": 5,
            "title": "Active Functions",
            "type": "stat",
            "targets": [
              {
                "expr": "sum(wasm_function_available_replicas)",
                "legendFormat": "Active Instances"
              }
            ]
          },
          {
            "id": 6,
            "title": "Error Breakdown",
            "type": "piechart",
            "targets": [
              {
                "expr": "sum(rate(wasm_function_errors_total[5m])) by (error_type)",
                "legendFormat": "{{ error_type }}"
              }
            ]
          }
        ]
      }
    }
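
For these alerts and panels to fire, the platform has to emit the wasm_function_* series. A minimal sketch of registering them with the prometheus crate (label sets and bucket boundaries here are assumptions, not a fixed schema):

// Hypothetical metric registration sketch matching the series names above
use prometheus::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec};

fn register_function_metrics() -> (IntCounterVec, IntCounterVec, HistogramVec) {
    let invocations = register_int_counter_vec!(
        "wasm_function_invocations_total",
        "Total function invocations",
        &["function"]
    ).unwrap();

    let cold_starts = register_int_counter_vec!(
        "wasm_function_cold_starts_total",
        "Invocations that required instantiating a new WASM instance",
        &["function"]
    ).unwrap();

    let duration = register_histogram_vec!(
        "wasm_function_duration_seconds",
        "Function execution duration in seconds",
        &["function"],
        vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
    ).unwrap();

    (invocations, cold_starts, duration)
}

fn main() {
    let (invocations, cold_starts, duration) = register_function_metrics();
    invocations.with_label_values(&["image-processor"]).inc();
    cold_starts.with_label_values(&["image-processor"]).inc();
    duration.with_label_values(&["image-processor"]).observe(0.012);
}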

Distributed Tracing Implementation

// src/tracing.rs - WASM function tracing
use opentelemetry::{
    global,
    trace::{TraceContextExt, Tracer, Status, SpanKind},
    Context, KeyValue,
};
use opentelemetry_jaeger::JaegerPropagator;
use tracing::{info, error, span, Level, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use std::collections::HashMap;

pub struct WasmFunctionTracer {
    // Concrete SDK tracer: the `Tracer` trait has an associated `Span` type,
    // so it cannot be stored as a boxed trait object
    tracer: opentelemetry::sdk::trace::Tracer,
}

impl WasmFunctionTracer {
    pub fn new(service_name: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        global::set_text_map_propagator(JaegerPropagator::new());

        let tracer = opentelemetry_jaeger::new_agent_pipeline()
            .with_service_name(service_name)
            // The agent pipeline ships spans over UDP, not the HTTP collector port
            .with_endpoint("jaeger-agent:6831")
            .with_tags(vec![
                KeyValue::new("runtime", "wasm"),
                KeyValue::new("platform", "kubernetes"),
            ])
            .install_batch(opentelemetry::runtime::Tokio)?;

        Ok(Self { tracer })
    }

    pub async fn trace_function_invocation<F, R>(
        &self,
        function_name: &str,
        event_id: &str,
        headers: &HashMap<String, String>,
        operation: F,
    ) -> R
    where
        F: std::future::Future<Output = R> + Send,
    {
        // Extract parent context from headers
        let parent_context = global::get_text_map_propagator(|propagator| {
            propagator.extract(&HeaderExtractor::new(headers))
        });

        // Create span for function invocation
        let span = self.tracer
            .span_builder(format!("wasm.function.{}", function_name))
            .with_kind(SpanKind::Server)
            .with_attributes(vec![
                KeyValue::new("function.name", function_name.to_string()),
                KeyValue::new("event.id", event_id.to_string()),
                KeyValue::new("runtime", "wasm"),
            ])
            .start_with_context(&self.tracer, &parent_context);

        let cx = Context::current_with_span(span);

        // Execute function with tracing context
        let result = operation.instrument(tracing::info_span!(
            "wasm_function",
            function.name = function_name,
            event.id = event_id
        )).await;

        // Update span with result
        let span = cx.span();
        span.set_status(Status::Ok);
        span.end();

        result
    }

    pub fn trace_function_execution(
        &self,
        operation_name: &str,
        function_name: &str,
        attributes: Vec<KeyValue>,
    ) -> FunctionSpan {
        let span = self.tracer
            .span_builder(operation_name.to_string())
            .with_kind(SpanKind::Internal)
            .with_attributes({
                let mut attrs = vec![
                    KeyValue::new("function.name", function_name.to_string()),
                ];
                attrs.extend(attributes);
                attrs
            })
            .start(&self.tracer);

        FunctionSpan::new(span)
    }
}

pub struct FunctionSpan {
    span: opentelemetry::sdk::trace::Span,
}

impl FunctionSpan {
    fn new(span: opentelemetry::sdk::trace::Span) -> Self {
        Self { span }
    }

    pub fn add_event(&mut self, name: &str, attributes: Vec<KeyValue>) {
        self.span.add_event(name.to_string(), attributes);
    }

    pub fn set_attribute(&mut self, key: &str, value: &str) {
        self.span.set_attribute(KeyValue::new(key.to_string(), value.to_string()));
    }

    pub fn record_error(&mut self, error: &dyn std::error::Error) {
        self.span.record_error(error);
        self.span.set_status(Status::error(error.to_string()));
    }

    pub fn finish(self) {
        self.span.end();
    }
}

struct HeaderExtractor<'a> {
    headers: &'a HashMap<String, String>,
}

impl<'a> HeaderExtractor<'a> {
    fn new(headers: &'a HashMap<String, String>) -> Self {
        Self { headers }
    }
}

impl<'a> opentelemetry::propagation::Extractor for HeaderExtractor<'a> {
    fn get(&self, key: &str) -> Option<&str> {
        self.headers.get(key).map(|s| s.as_str())
    }

    fn keys(&self) -> Vec<&str> {
        self.headers.keys().map(|s| s.as_str()).collect()
    }
}

// Integration with function runtime
pub async fn traced_function_handler(
    tracer: &WasmFunctionTracer,
    function_name: &str,
    event: FunctionEvent,
) -> Result<FunctionResult, FunctionError> {
    tracer.trace_function_invocation(
        function_name,
        &event.id,
        &event.headers,
        async {
            let mut execution_span = tracer.trace_function_execution(
                "function.execute",
                function_name,
                vec![
                    KeyValue::new("event.source", event.source.clone()),
                    KeyValue::new("event.type", event.event_type.clone()),
                ],
            );

            // Add custom events
            execution_span.add_event("function.started", vec![
                KeyValue::new("timestamp", chrono::Utc::now().to_rfc3339()),
            ]);

            // Execute the actual function
            let result = match execute_wasm_function(function_name, &event).await {
                Ok(result) => {
                    execution_span.add_event("function.completed", vec![
                        KeyValue::new("success", true),
                        KeyValue::new("duration_ms", result.execution_time_ms as i64),
                    ]);

                    execution_span.set_attribute("result.success", "true");
                    execution_span.set_attribute("result.size_bytes", &result.output_size.to_string());

                    Ok(result)
                },
                Err(error) => {
                    execution_span.record_error(&error);
                    execution_span.add_event("function.failed", vec![
                        KeyValue::new("error", error.to_string()),
                    ]);

                    error!("Function {} failed: {}", function_name, error);
                    Err(error)
                }
            };

            execution_span.finish();
            result
        }
    ).await
}

// Mock types for completeness
#[derive(Debug)]
pub struct FunctionEvent {
    pub id: String,
    pub source: String,
    pub event_type: String,
    pub headers: HashMap<String, String>,
    pub payload: serde_json::Value,
}

#[derive(Debug)]
pub struct FunctionResult {
    pub output: serde_json::Value,
    pub execution_time_ms: u64,
    pub output_size: usize,
}

#[derive(Debug)]
pub struct FunctionError {
    pub message: String,
    pub error_type: String,
}

impl std::fmt::Display for FunctionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.error_type, self.message)
    }
}

impl std::error::Error for FunctionError {}

async fn execute_wasm_function(
    _function_name: &str,
    _event: &FunctionEvent,
) -> Result<FunctionResult, FunctionError> {
    // Mock implementation
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;

    Ok(FunctionResult {
        output: serde_json::json!({"processed": true}),
        execution_time_ms: 50,
        output_size: 24,
    })
}
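
Wiring the tracer into a runtime entrypoint then takes only a few lines. The sketch below assumes a Tokio runtime and reuses the types defined above; the service name and event values are illustrative.

// Example entrypoint (illustrative sketch)
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // One tracer per runtime process, shared across invocations
    let tracer = WasmFunctionTracer::new("wasm-function-runtime")?;

    let event = FunctionEvent {
        id: "evt-123".to_string(),
        source: "http".to_string(),
        event_type: "request".to_string(),
        // In production these headers would carry the upstream trace context
        headers: HashMap::new(),
        payload: serde_json::json!({"name": "world"}),
    };

    let result = traced_function_handler(&tracer, "hello-wasm", event).await;
    println!("function result: {:?}", result);

    // Flush buffered spans before the process exits
    opentelemetry::global::shutdown_tracer_provider();
    Ok(())
}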

CI/CD for Serverless WASM

Complete Pipeline Configuration

# .github/workflows/serverless-wasm-cicd.yml
name: Serverless WASM CI/CD Pipeline

on:
  push:
    branches: [ main, develop ]
    paths: [ 'functions/**' ]
  pull_request:
    branches: [ main ]

env:
  REGISTRY: ghcr.io
  ORGANIZATION: example

jobs:
  # Discovery job to find changed functions
  discover-functions:
    runs-on: ubuntu-latest
    outputs:
      functions: ${{ steps.discover.outputs.functions }}
      matrix: ${{ steps.discover.outputs.matrix }}
    steps:
    - uses: actions/checkout@v4
      with:
        fetch-depth: 0

    - name: Discover changed functions
      id: discover
      run: |
        # Find changed function directories
        CHANGED_FUNCTIONS=$(git diff --name-only ${{ github.event.before }}..${{ github.sha }} | grep '^functions/' | cut -d/ -f2 | sort -u | jq -R . | jq -s -c .)

        if [ "$CHANGED_FUNCTIONS" = "[]" ]; then
          # If no functions changed, build all for main branch
          if [ "${{ github.ref }}" = "refs/heads/main" ]; then
            CHANGED_FUNCTIONS=$(find functions -maxdepth 1 -type d -not -name functions | cut -d/ -f2 | jq -R . | jq -s -c .)
          fi
        fi

        echo "functions=$CHANGED_FUNCTIONS" >> $GITHUB_OUTPUT
        echo "matrix={'function': $CHANGED_FUNCTIONS}" >> $GITHUB_OUTPUT

        echo "Changed functions: $CHANGED_FUNCTIONS"

  # Test and build functions in parallel
  build-functions:
    needs: discover-functions
    if: needs.discover-functions.outputs.functions != '[]'
    runs-on: ubuntu-latest
    strategy:
      matrix: ${{ fromJson(needs.discover-functions.outputs.matrix) }}
      fail-fast: false

    steps:
    - uses: actions/checkout@v4

    - name: Setup Rust
      uses: actions-rs/toolchain@v1
      with:
        toolchain: stable
        target: wasm32-wasi
        override: true
        components: rustfmt, clippy

    - name: Cache Rust dependencies
      uses: actions/cache@v3
      with:
        path: |
          ~/.cargo/registry
          ~/.cargo/git
          functions/${{ matrix.function }}/target
        key: ${{ runner.os }}-cargo-${{ matrix.function }}-${{ hashFiles(format('functions/{0}/Cargo.lock', matrix.function)) }}

    - name: Function-specific setup
      working-directory: functions/${{ matrix.function }}
      run: |
        # Install function-specific dependencies
        if [ -f "setup.sh" ]; then
          chmod +x setup.sh
          ./setup.sh
        fi

    - name: Lint and format check
      working-directory: functions/${{ matrix.function }}
      run: |
        cargo fmt -- --check
        cargo clippy -- -D warnings

    - name: Unit tests
      working-directory: functions/${{ matrix.function }}
      run: |
        # Run tests natively; executing wasm32-wasi test binaries would
        # require a configured wasm runner such as wasmtime
        cargo test

    - name: Build WASM module
      working-directory: functions/${{ matrix.function }}
      run: |
        cargo build --target wasm32-wasi --release

        # Optimize WASM binary
        if command -v wasm-opt &> /dev/null; then
          wasm-opt -Os target/wasm32-wasi/release/${{ matrix.function }}.wasm -o ${{ matrix.function }}.wasm
        else
          cp target/wasm32-wasi/release/${{ matrix.function }}.wasm ${{ matrix.function }}.wasm
        fi

    - name: Build container image
      working-directory: functions/${{ matrix.function }}
      run: |
        # Create optimized Dockerfile
        cat > Dockerfile << 'EOF'
        FROM scratch
        COPY ${{ matrix.function }}.wasm /function.wasm
        ENTRYPOINT ["/function.wasm"]
        EOF

        # Build and tag image
        IMAGE_TAG="${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/${{ matrix.function }}:${{ github.sha }}"
        docker build -t "$IMAGE_TAG" .

        # Also tag as latest for main branch
        if [ "${{ github.ref }}" = "refs/heads/main" ]; then
          docker tag "$IMAGE_TAG" "${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/${{ matrix.function }}:latest"
        fi

    - name: Security scan
      uses: anchore/scan-action@v3
      with:
        image: "${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/${{ matrix.function }}:${{ github.sha }}"
        fail-build: true
        severity-cutoff: high

    - name: Performance benchmarks
      working-directory: functions/${{ matrix.function }}
      run: |
        if [ -f "bench.sh" ]; then
          chmod +x bench.sh
          ./bench.sh
        fi

    - name: Push container image
      if: github.event_name == 'push'
      run: |
        echo ${{ secrets.GITHUB_TOKEN }} | docker login ${{ env.REGISTRY }} -u ${{ github.actor }} --password-stdin

        IMAGE_TAG="${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/${{ matrix.function }}:${{ github.sha }}"
        docker push "$IMAGE_TAG"

        if [ "${{ github.ref }}" = "refs/heads/main" ]; then
          docker push "${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/${{ matrix.function }}:latest"
        fi

  # Integration tests
  integration-tests:
    needs: [discover-functions, build-functions]
    if: needs.discover-functions.outputs.functions != '[]'
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: testdb
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
    - uses: actions/checkout@v4

    - name: Setup test environment
      run: |
        # Install wasmtime for local testing
        curl https://wasmtime.dev/install.sh -sSf | bash
        echo "$HOME/.wasmtime/bin" >> $GITHUB_PATH

        # Install testing tools
        npm install -g newman  # Postman CLI
        pip install pytest requests

    - name: Run integration tests
      env:
        REDIS_URL: redis://localhost:6379
        DATABASE_URL: postgresql://postgres:postgres@localhost:5432/testdb
      run: |
        # Download built functions
        for function in $(echo '${{ needs.discover-functions.outputs.functions }}' | jq -r '.[]'); do
          echo "Testing function: $function"

          # Pull container image
          docker pull "${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/$function:${{ github.sha }}"

          # Pick one host port and reuse it for the health check below
          PORT=$((8000 + RANDOM % 1000))

          # Run function container; host.docker.internal lets the function
          # reach the service containers published on the runner's localhost
          docker run -d \
            --name "test-$function" \
            -p "$PORT:8080" \
            --add-host=host.docker.internal:host-gateway \
            -e REDIS_URL="redis://host.docker.internal:6379" \
            -e DATABASE_URL="postgresql://postgres:postgres@host.docker.internal:5432/testdb" \
            "${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/$function:${{ github.sha }}"

          # Wait for the function to be ready
          timeout 30 bash -c "until curl -f http://localhost:$PORT/health; do sleep 1; done"
        done

        # Run API tests
        if [ -f "tests/integration/api_tests.json" ]; then
          newman run tests/integration/api_tests.json \
            --environment tests/integration/test_env.json \
            --reporters cli,json \
            --reporter-json-export test-results.json
        fi

        # Run custom integration tests
        if [ -f "tests/integration/test_suite.py" ]; then
          pytest tests/integration/ -v --junitxml=pytest-results.xml
        fi

    - name: Upload test results
      uses: actions/upload-artifact@v3
      if: always()
      with:
        name: integration-test-results
        path: |
          test-results.json
          pytest-results.xml

  # Deploy to staging
  deploy-staging:
    needs: [discover-functions, build-functions, integration-tests]
    if: github.ref == 'refs/heads/develop' && needs.discover-functions.outputs.functions != '[]'
    runs-on: ubuntu-latest
    environment: staging

    steps:
    - uses: actions/checkout@v4

    - name: Setup kubectl
      uses: azure/setup-kubectl@v3

    - name: Configure kubeconfig
      run: |
        echo "${{ secrets.KUBE_CONFIG_STAGING }}" | base64 -d > kubeconfig
        export KUBECONFIG=kubeconfig

    - name: Deploy functions to staging
      run: |
        for function in $(echo '${{ needs.discover-functions.outputs.functions }}' | jq -r '.[]'); do
          echo "Deploying $function to staging"

          # Update function manifest
          sed -i "s|image:.*|image: ${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/$function:${{ github.sha }}|" \
            "deployments/staging/$function.yaml"

          # Apply to Kubernetes
          kubectl apply -f "deployments/staging/$function.yaml"

          # Wait for rollout
          kubectl rollout status deployment "$function" -n staging --timeout=300s
        done

    - name: Run smoke tests
      run: |
        # Get staging endpoints and run basic health checks
        for function in $(echo '${{ needs.discover-functions.outputs.functions }}' | jq -r '.[]'); do
          ENDPOINT=$(kubectl get ingress "$function-ingress" -n staging -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')

          # Health check
          curl -f "https://$ENDPOINT/health" || exit 1

          # Basic functionality test
          if [ -f "tests/smoke/$function.sh" ]; then
            chmod +x "tests/smoke/$function.sh"
            "tests/smoke/$function.sh" "$ENDPOINT"
          fi
        done

  # Deploy to production
  deploy-production:
    needs: [discover-functions, build-functions, integration-tests]
    if: github.ref == 'refs/heads/main' && needs.discover-functions.outputs.functions != '[]'
    runs-on: ubuntu-latest
    environment: production

    steps:
    - uses: actions/checkout@v4

    - name: Setup kubectl
      uses: azure/setup-kubectl@v3

    - name: Configure kubeconfig
      run: |
        echo "${{ secrets.KUBE_CONFIG_PRODUCTION }}" | base64 -d > kubeconfig
        export KUBECONFIG=kubeconfig

    - name: Blue-Green deployment
      run: |
        for function in $(echo '${{ needs.discover-functions.outputs.functions }}' | jq -r '.[]'); do
          echo "Deploying $function to production with blue-green strategy"

          # Create green deployment
          sed -i "s|image:.*|image: ${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/$function:${{ github.sha }}|" \
            "deployments/production/$function-green.yaml"

          kubectl apply -f "deployments/production/$function-green.yaml"
          kubectl rollout status deployment "$function-green" -n production --timeout=600s

          # Run production validation tests
          if [ -f "tests/production/$function.sh" ]; then
            chmod +x "tests/production/$function.sh"
            "tests/production/$function.sh"
          fi

          # Switch traffic to green
          kubectl patch service "$function-service" -n production \
            -p '{"spec":{"selector":{"version":"green"}}}'

          # Monitor for 5 minutes
          sleep 300

          # Check error rates
          ERROR_RATE=$(kubectl exec -n monitoring deployment/prometheus -- \
            promtool query instant http://localhost:9090 'rate(wasm_function_errors_total{function="'$function'"}[5m]) / rate(wasm_function_invocations_total{function="'$function'"}[5m])' | \
            grep -o '[0-9.]*$' || echo "0")

          if (( $(echo "$ERROR_RATE > 0.01" | bc -l) )); then
            echo "High error rate detected ($ERROR_RATE), rolling back"
            kubectl patch service "$function-service" -n production \
              -p '{"spec":{"selector":{"version":"blue"}}}'
            exit 1
          fi

          # Clean up the old blue deployment
          kubectl delete deployment "$function-blue" -n production || true

          # Deployment names and label selectors are immutable, so green cannot
          # simply be renamed. Recreate blue from its manifest at the new image,
          # shift traffic back to it, then remove green
          sed -i "s|image:.*|image: ${{ env.REGISTRY }}/${{ env.ORGANIZATION }}/$function:${{ github.sha }}|" \
            "deployments/production/$function-blue.yaml"
          kubectl apply -f "deployments/production/$function-blue.yaml"
          kubectl rollout status deployment "$function-blue" -n production --timeout=300s

          kubectl patch service "$function-service" -n production \
            -p '{"spec":{"selector":{"version":"blue"}}}'
          kubectl delete deployment "$function-green" -n production || true
        done

  # Performance benchmarks in production
  performance-tests:
    needs: deploy-production
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v4

    - name: Install k6
      run: |
        wget https://github.com/grafana/k6/releases/latest/download/k6-linux-amd64.tar.gz
        tar -xzf k6-linux-amd64.tar.gz
        sudo mv k6*/k6 /usr/local/bin/

    - name: Run performance tests
      run: |
        for function in $(echo '${{ needs.discover-functions.outputs.functions }}' | jq -r '.[]'); do
          if [ -f "tests/performance/$function.js" ]; then
            echo "Running performance tests for $function"
            k6 run "tests/performance/$function.js" \
              --env FUNCTION_ENDPOINT="https://api.example.com/$function" \
              --out json=results-$function.json
          fi
        done

    - name: Analyze results
      run: |
        python scripts/analyze-performance.py results-*.json > performance-report.md

    - name: Upload performance results
      uses: actions/upload-artifact@v3
      with:
        name: performance-results
        path: |
          results-*.json
          performance-report.md

  # Cleanup and notification
  cleanup:
    needs: [discover-functions, deploy-production, performance-tests]
    if: always()
    runs-on: ubuntu-latest

    steps:
    - name: Slack notification
      uses: 8398a7/action-slack@v3
      with:
        status: ${{ job.status }}
        channel: '#serverless-deployments'
        webhook_url: ${{ secrets.SLACK_WEBHOOK }}
        fields: repo,message,commit,author,action,eventName,ref,workflow
        custom_payload: |
          {
            text: "Serverless WASM Deployment",
            attachments: [{
              color: '${{ job.status }}' === 'success' ? 'good' : 'danger',
              fields: [
                {
                  title: 'Repository',
                  value: '${{ github.repository }}',
                  short: true
                },
                {
                  title: 'Functions Updated',
                  value: '${{ needs.discover-functions.outputs.functions }}',
                  short: true
                },
                {
                  title: 'Environment',
                  value: '${{ github.ref }}' === 'refs/heads/main' ? 'Production' : 'Staging',
                  short: true
                },
                {
                  title: 'Status',
                  value: '${{ job.status }}',
                  short: true
                }
              ]
            }]
          }
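
The staging and production deploy steps above patch the image line in per-function manifests under deployments/. A minimal sketch of what deployments/staging/<function>.yaml might contain follows; the function name, RuntimeClass, and resource figures are illustrative assumptions.

# deployments/staging/image-processor.yaml (illustrative sketch)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: image-processor
  namespace: staging
  labels:
    app: image-processor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: image-processor
  template:
    metadata:
      labels:
        app: image-processor
    spec:
      # Assumes a containerd WASM shim registered as a RuntimeClass
      runtimeClassName: wasmtime
      containers:
        - name: function
          # Rewritten by the pipeline's sed to the commit-tagged build
          image: ghcr.io/example/image-processor:latest
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: 16Mi
              cpu: 10m
            limits:
              memory: 64Mi
              cpu: 250m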

Production Operations

Operational Runbooks

# operations/runbooks.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: serverless-wasm-runbooks
  namespace: operations
data:
  incident-response.md: |
    # Serverless WASM Incident Response Runbook

    ## High Error Rate (> 5%)

    ### Investigation Steps:
    1. **Check function logs**:
       ```bash
       kubectl logs -l app=<function-name> -n serverless --tail=100
       ```

    2. **Check metrics**:
       ```bash
       # Error breakdown
       promtool query instant http://localhost:9090 'sum by (error_type, function) (rate(wasm_function_errors_total[5m]))'
       
       # Recent deployments
       kubectl get events -n serverless --sort-by='.firstTimestamp'
       ```

    3. **Identify error patterns**:
       - Memory limits exceeded
       - Timeout errors
       - Invalid input validation
       - Network connectivity issues

    ### Mitigation Actions:
    - **Memory errors**: Increase memory limits or optimize function
    - **Timeout errors**: Increase timeout or optimize performance
    - **Validation errors**: Check input schemas and validation logic
    - **Network errors**: Verify network policies and external service health

    ### Rollback Procedure:
    ```bash
    # Emergency rollback to previous version
    kubectl rollout undo deployment/<function-name> -n serverless
    kubectl rollout status deployment/<function-name> -n serverless
    ```

    ## High Latency (p95 > 1s)

    ### Investigation:
    1. **Check cold start ratio**:
       ```bash
       promtool query instant http://localhost:9090 'rate(wasm_function_cold_starts_total[5m]) / rate(wasm_function_invocations_total[5m])'
       ```

    2. **Check resource utilization**:
       ```bash
       kubectl top pods -l app=<function-name> -n serverless
       ```

    3. **Check scaling metrics**:
       ```bash
       kubectl get hpa -n serverless
       kubectl describe hpa <function-name>-hpa -n serverless
       ```

    ### Mitigation:
    - **High cold starts**: Increase minimum replicas
    - **Resource constraints**: Scale up or increase resource limits
    - **Slow dependencies**: Check database/API response times

    ## Zero Replicas (Scale-to-Zero Issues)

    ### Investigation:
    ```bash
    # Check autoscaler status
    kubectl describe hpa <function-name>-hpa -n serverless

    # Check metrics server
    kubectl get --raw /apis/metrics.k8s.io/v1beta1/pods

    # Check scaling events
    kubectl get events --field-selector reason=SuccessfulRescale -n serverless
    ```

    ### Mitigation:
    ```bash
    # Temporarily set minimum replicas
    kubectl patch hpa <function-name>-hpa -n serverless -p '{"spec":{"minReplicas":1}}'

    # Or manually scale
    kubectl scale deployment <function-name> --replicas=3 -n serverless
    ```

  capacity-planning.md: |
    # Capacity Planning for Serverless WASM

    ## Resource Estimation

    ### Memory Requirements:
    - **Base WASM runtime**: 2-5MB per instance
    - **Function code**: 1-10MB depending on complexity
    - **Working memory**: Varies by function logic (10-100MB typical)
    - **Buffer for scaling**: 20% overhead

    ### CPU Requirements:
    - **Idle CPU**: ~1-5m per instance
    - **Active CPU**: Depends on workload
    - **Burst capacity**: 100-500m per instance

    ### Scaling Calculations:
    ```
    Peak RPS: 10,000 requests/second
    Average response time: 50ms
    Concurrent requests = Peak RPS × Response Time = 10,000 × 0.05 = 500

    With 100 RPS per instance:
    Required instances = 10,000 / 100 = 100 instances

    With buffer (20%): 100 × 1.2 = 120 instances
    ```

    ## Node Sizing:

    ### For WASM Functions:
    - **Small functions** (< 10MB memory): 500-1000 per node
    - **Medium functions** (10-50MB): 100-500 per node  
    - **Large functions** (50-200MB): 20-100 per node

    ### Recommended Node Sizes:
    - **CPU-optimized**: 16-32 vCPU, 32-64GB RAM
    - **Memory-optimized**: 8-16 vCPU, 64-128GB RAM
    - **Balanced**: 12-24 vCPU, 48-96GB RAM

  disaster-recovery.md: |
    # Disaster Recovery Procedures

    ## Complete Platform Failure

    ### Recovery Steps:
    1. **Assess scope**:
       ```bash
       # Check cluster status
       kubectl cluster-info
       kubectl get nodes
       kubectl get namespaces
       ```

    2. **Restore from backup**:
       ```bash
       # Using Velero
       velero restore create --from-backup serverless-backup-20240124
       velero restore describe serverless-restore-001
       ```

    3. **Verify function deployments**:
       ```bash
       kubectl get deployments -n serverless
       kubectl get services -n serverless
       kubectl get ingresses -n serverless
       ```

    4. **Test function endpoints**:
       ```bash
       curl -f https://api.example.com/function1/health
       curl -f https://api.example.com/function2/health
       ```

    ## Data Corruption

    ### Steps:
    1. **Isolate affected functions**:
       ```bash
       kubectl scale deployment <affected-function> --replicas=0 -n serverless
       ```

    2. **Restore from clean backup**:
       ```bash
       # Restore specific function
       velero restore create \
         --from-backup function-backup-20240124 \
         --include-resources deployments,services,configmaps \
         --selector app=<function-name>
       ```

    3. **Validate data integrity**:
       ```bash
       # Run data validation tests
       kubectl exec -it <function-pod> -- /bin/sh
       # Run integrity checks
       ```

    ## Network Partitioning

    ### Multi-Region Failover:
    ```bash
    # Switch traffic to backup region
    kubectl patch ingress <function-ingress> -n serverless \
      --patch '{"spec":{"rules":[{"host":"api.example.com","http":{"paths":[{"path":"/","backend":{"service":{"name":"backup-function-service","port":{"number":80}}}}]}}]}}'

    # Monitor failover
    watch kubectl get pods -n serverless -o wide
    ```

---
# Monitoring and alerting during incidents
apiVersion: v1
kind: ConfigMap
metadata:
  name: incident-monitoring
  namespace: operations
data:
  alert-escalation.yaml: |
    escalation_policies:
      - name: serverless-critical
        triggers:
          - alert: WasmFunctionDown
            severity: critical
          - alert: WasmHighErrorRate
            severity: critical
            threshold: "error_rate > 10%"
        
        escalation_steps:
          - level: 1
            delay: 0m
            contacts: ["oncall-engineer"]
            actions: ["page", "slack"]
          
          - level: 2  
            delay: 15m
            contacts: ["platform-team-lead"]
            actions: ["phone", "email"]
          
          - level: 3
            delay: 30m  
            contacts: ["engineering-manager"]
            actions: ["phone", "escalation-email"]
      
      - name: serverless-warning
        triggers:
          - alert: WasmFunctionHighLatency
            severity: warning
          - alert: WasmFunctionScalingIssue
            severity: warning
            
        escalation_steps:
          - level: 1
            delay: 0m
            contacts: ["platform-team"]
            actions: ["slack"]
          
          - level: 2
            delay: 60m
            contacts: ["oncall-engineer"] 
            actions: ["email"]

    notification_channels:
      slack:
        webhook_url: "${SLACK_WEBHOOK_URL}"
        channel: "#serverless-alerts"
      
      email:
        smtp_server: "smtp.example.com"
        from: "alerts@example.com"
        
      pagerduty:
        service_key: "${PAGERDUTY_SERVICE_KEY}"

Conclusion

Serverless WebAssembly functions in Kubernetes represent a paradigm shift that combines the best aspects of serverless computing with WASM’s revolutionary performance characteristics. This comprehensive guide has covered the complete journey from basic concepts to production-ready implementations.

Key Achievements

  • Sub-millisecond cold starts vs seconds for traditional containers
  • 20x higher density - thousands of functions per node
  • Unified development model across multiple platforms
  • Production-grade security with capability-based isolation
  • Comprehensive observability with distributed tracing
  • Automated CI/CD pipelines for function deployment
  • Advanced scaling strategies with predictive algorithms

Performance Summary

| Metric | Traditional Serverless | WASM Serverless | Improvement |
|--------|------------------------|-----------------|-------------|
| Cold Start | 1-5 seconds | 1-5 milliseconds | 1000x faster |
| Memory Footprint | 128MB+ | 2-10MB | 90% reduction |
| Startup Overhead | High | Minimal | Near-instant |
| Scaling Speed | 30+ seconds | <1 second | 30x faster |
| Resource Density | 10-50 per node | 500-5000 per node | 20x higher |

Platform Comparison Summary

Choose Knative when:

  • Enterprise features and compliance are critical
  • Complex event sources and integrations needed
  • Large development teams with varied skill levels
  • Budget allows for comprehensive platform investment

Choose OpenFaaS when:

  • Simplicity and quick deployment are priorities
  • Small to medium teams with focused requirements
  • Cost-conscious implementations
  • Basic serverless patterns sufficient

Choose Custom Platform when:

  • Unique requirements not met by existing platforms
  • Performance is absolutely critical
  • Full control over platform behavior needed
  • Development team has deep Kubernetes expertise

Best Practices Recap

  1. Function Design:

    • Keep functions small and focused (single responsibility)
    • Minimize dependencies and external calls
    • Implement proper error handling and timeouts
    • Use structured logging for observability
  2. Security:

    • Apply capability-based security from day one
    • Use network policies to restrict function communication
    • Implement proper authentication and authorization
    • Regular security audits and vulnerability scanning
  3. Performance:

    • Profile functions to identify bottlenecks
    • Implement predictive scaling for consistent performance
    • Use connection pooling for database access
    • Cache frequently accessed data appropriately
  4. Operations:

    • Comprehensive monitoring and alerting
    • Automated deployment pipelines with thorough testing
    • Disaster recovery procedures and regular drills
    • Capacity planning based on actual usage patterns

Future Developments

The serverless WASM ecosystem continues rapid evolution:

  • Enhanced runtimes with better performance and more capabilities
  • Improved tooling for development, debugging, and monitoring
  • Standard interfaces for cross-platform compatibility
  • AI/ML integration for intelligent scaling and optimization
  • Edge computing expansion with geo-distributed deployments

Next Steps

Ready to explore the future of container technology? Our final article examines the convergence of WASM and containers, exploring emerging patterns and future architectural possibilities!

The serverless revolution has been accelerated by WebAssembly. Welcome to the era of truly efficient serverless computing! 🚀
