Real-Time AI Applications with LangChain and WebSockets

Streaming output as it is generated makes AI applications feel responsive. This guide shows how to stream LangChain output over WebSockets and Server-Sent Events (SSE) to build a real-time AI assistant. We’ll cover basic streaming responses, connection management, queue handling, and scaling considerations for production.

Why Real-Time AI Matters

Traditional request-response patterns for AI applications can lead to poor user experiences, especially when dealing with large language models that take seconds to generate complete responses. Real-time streaming solves this by:

  • Immediate feedback: Users see responses as they’re generated
  • Better perceived performance: Even slower models feel responsive
  • Enhanced interactivity: Enable features like real-time collaboration
  • Resource efficiency: The server forwards chunks as they arrive instead of buffering complete responses

Understanding Streaming Technologies

Before diving into implementation, let’s understand the three main approaches for real-time communication in web applications:

WebSockets

WebSockets provide full-duplex communication channels over a single TCP connection, perfect for bidirectional real-time data flow.

Advantages:

  • Bidirectional communication
  • Low latency
  • Efficient for frequent small messages
  • Persistent connections

Use cases:

  • Collaborative features
  • Real-time notifications
  • Interactive AI assistants

Server-Sent Events (SSE)

SSE enables servers to push data to web clients over HTTP, ideal for unidirectional streaming from server to client.

Advantages:

  • Simple implementation
  • Automatic reconnection
  • Works over standard HTTP
  • Built-in event IDs, which let a client resume after a reconnect (sent back as Last-Event-ID)

Use cases:

  • Streaming AI responses
  • Live updates
  • Progress notifications
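
To make the SSE wire format concrete, here is a minimal sketch of how a single frame is assembled. `formatSSE` and the `SSEFrame` type are hypothetical helpers introduced for illustration; only the `event:`, `id:`, and `data:` field names and the blank-line terminator come from the SSE format itself.

```typescript
// Hypothetical helper (not part of Express or LangChain): serialize one SSE frame.
interface SSEFrame {
  event?: string // optional event name
  id?: string // optional event ID, echoed back by browsers as Last-Event-ID
  data: string // payload, typically a JSON string
}

function formatSSE({ event, id, data }: SSEFrame): string {
  const lines: string[] = []
  if (event) lines.push(`event: ${event}`)
  if (id) lines.push(`id: ${id}`)
  // A payload containing newlines is sent as one data: line per line of text.
  for (const line of data.split('\n')) {
    lines.push(`data: ${line}`)
  }
  // A blank line terminates the frame.
  return lines.join('\n') + '\n\n'
}

const frame = formatSSE({
  event: 'stream_chunk',
  id: '7',
  data: JSON.stringify({ content: 'Hi' }),
})
```

Here `frame` holds three field lines followed by the empty line that tells the client the event is complete.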

Long Polling

While not true real-time, long polling can be a fallback option for environments where WebSockets and SSE aren’t available.

Setting Up the Backend Infrastructure

Let’s build a complete real-time AI assistant using LangChain, WebSockets, and proper connection management. We’ll use Node.js with Express for the backend and React for the frontend.

Installing Dependencies

# @filename: script.sh
# Backend dependencies
npm install express ws langchain @langchain/core @langchain/openai dotenv cors
npm install --save-dev @types/ws @types/express typescript nodemon

# Additional utilities (node-statsd is used by the metrics collector later on)
npm install uuid winston bull redis node-statsd

Basic WebSocket Server with LangChain

Here’s our foundational WebSocket server with LangChain integration:

// @filename: app.js
// src/server.ts
import express from 'express'
import { createServer } from 'http'
import WebSocket from 'ws'
import { ChatOpenAI } from '@langchain/openai'
import { StringOutputParser } from '@langchain/core/output_parsers'
import { ChatPromptTemplate } from '@langchain/core/prompts'
import dotenv from 'dotenv'
import { v4 as uuidv4 } from 'uuid'

dotenv.config()

const app = express()
const server = createServer(app)
const wss = new WebSocket.Server({ server })

// Initialize LangChain with streaming
const model = new ChatOpenAI({
  modelName: 'gpt-3.5-turbo',
  temperature: 0.7,
  streaming: true,
  openAIApiKey: process.env.OPENAI_API_KEY,
})

const outputParser = new StringOutputParser()

// Connection tracking
const connections = new Map<string, WebSocket>()

wss.on('connection', (ws: WebSocket) => {
  const connectionId = uuidv4()
  connections.set(connectionId, ws)

  console.log(`New connection: ${connectionId}`)

  // Send connection established message
  ws.send(
    JSON.stringify({
      type: 'connection',
      connectionId,
      status: 'connected',
    })
  )

  ws.on('message', async (message: WebSocket.RawData) => {
    try {
      const data = JSON.parse(message.toString())

      if (data.type === 'chat') {
        await handleChatMessage(ws, data)
      } else if (data.type === 'ping') {
        ws.send(JSON.stringify({ type: 'pong' }))
      }
    } catch (error) {
      console.error('Error processing message:', error)
      ws.send(
        JSON.stringify({
          type: 'error',
          error: 'Failed to process message',
        })
      )
    }
  })

  ws.on('close', () => {
    connections.delete(connectionId)
    console.log(`Connection closed: ${connectionId}`)
  })

  ws.on('error', (error) => {
    console.error(`WebSocket error for ${connectionId}:`, error)
  })
})

async function handleChatMessage(ws: WebSocket, data: any) {
  const { message, conversationId } = data

  try {
    // Create prompt template
    const prompt = ChatPromptTemplate.fromTemplate(
      'You are a helpful AI assistant. Respond to the following: {input}'
    )

    // Create streaming chain
    const chain = prompt.pipe(model).pipe(outputParser)

    // Send start of stream
    ws.send(
      JSON.stringify({
        type: 'stream_start',
        conversationId,
        timestamp: new Date().toISOString(),
      })
    )

    // Stream the response
    const stream = await chain.stream({
      input: message,
    })

    for await (const chunk of stream) {
      ws.send(
        JSON.stringify({
          type: 'stream_chunk',
          conversationId,
          content: chunk,
        })
      )
    }

    // Send end of stream
    ws.send(
      JSON.stringify({
        type: 'stream_end',
        conversationId,
        timestamp: new Date().toISOString(),
      })
    )
  } catch (error) {
    console.error('Error in chat handler:', error)
    ws.send(
      JSON.stringify({
        type: 'error',
        conversationId,
        error: 'Failed to generate response',
      })
    )
  }
}

server.listen(3001, () => {
  console.log('WebSocket server running on port 3001')
})
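
The handler above accepts `data: any`, so a malformed payload only fails once it reaches LangChain. A minimal sketch of validating incoming messages first could look like the following. `ClientMessage` and `parseClientMessage` are hypothetical names; the message shapes are taken from the `chat` and `ping` branches of the server code.

```typescript
// Messages the client may send (shapes taken from the handler above).
type ClientMessage =
  | { type: 'chat'; message: string; conversationId: string }
  | { type: 'ping' }

// Hypothetical validator: returns a typed message, or null if the input is malformed.
function parseClientMessage(raw: string): ClientMessage | null {
  let data: unknown
  try {
    data = JSON.parse(raw)
  } catch {
    return null // not valid JSON
  }
  if (typeof data !== 'object' || data === null) return null
  const msg = data as Record<string, unknown>

  if (msg.type === 'ping') return { type: 'ping' }
  if (
    msg.type === 'chat' &&
    typeof msg.message === 'string' &&
    msg.message.trim().length > 0 &&
    typeof msg.conversationId === 'string'
  ) {
    return {
      type: 'chat',
      message: msg.message,
      conversationId: msg.conversationId,
    }
  }
  return null // unknown type or missing fields
}
```

In the `message` handler, a `null` result could be answered with the existing `error` message type instead of being passed on to `handleChatMessage`.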

Implementing Server-Sent Events Alternative

For scenarios where WebSockets aren’t suitable, here’s an SSE implementation:

// @filename: server.js
// src/sse-handler.ts
import { Response } from 'express'
import { ChatOpenAI } from '@langchain/openai'
import { StringOutputParser } from '@langchain/core/output_parsers'
import { ChatPromptTemplate } from '@langchain/core/prompts'

export async function handleSSEChat(req: any, res: Response) {
  // Set headers for SSE
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')
  res.setHeader('Access-Control-Allow-Origin', '*')

  // Send initial connection event
  res.write(
    `event: connected\ndata: ${JSON.stringify({ status: 'connected' })}\n\n`
  )

  const { message } = req.body

  try {
    const model = new ChatOpenAI({
      modelName: 'gpt-3.5-turbo',
      temperature: 0.7,
      streaming: true,
    })

    const prompt = ChatPromptTemplate.fromTemplate(
      'You are a helpful AI assistant. Respond to: {input}'
    )

    const chain = prompt.pipe(model).pipe(new StringOutputParser())

    // Send stream start event
    res.write(
      `event: stream_start\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`
    )

    const stream = await chain.stream({ input: message })

    for await (const chunk of stream) {
      // Send each chunk as an SSE event
      res.write(
        `event: stream_chunk\ndata: ${JSON.stringify({ content: chunk })}\n\n`
      )
    }

    // Send stream end event
    res.write(
      `event: stream_end\ndata: ${JSON.stringify({ timestamp: new Date().toISOString() })}\n\n`
    )

    res.end()
  } catch (error) {
    res.write(
      `event: error\ndata: ${JSON.stringify({ error: 'Failed to generate response' })}\n\n`
    )
    res.end()
  }
}

// Register the route in src/server.ts, where `app` and `express` are in scope
app.post('/api/chat/sse', express.json(), handleSSEChat)
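
Because the browser `EventSource` API only issues GET requests, a POST endpoint like this one has to be read with `fetch()` and a stream reader, which means parsing the frames by hand. The following is a minimal sketch of that parsing step. `parseSSEBuffer` and `SSEEvent` are hypothetical names, and the sketch assumes `\n` line endings, as written by the handler above.

```typescript
interface SSEEvent {
  event: string
  data: string
}

// Split a text buffer into complete SSE events plus the unparsed remainder.
function parseSSEBuffer(buffer: string): { events: SSEEvent[]; rest: string } {
  const events: SSEEvent[] = []
  const frames = buffer.split('\n\n')
  // The last piece is either empty or an incomplete frame; keep it for the next read.
  const rest = frames.pop() ?? ''

  for (const frame of frames) {
    let event = 'message' // default event name when no event: field is present
    const dataLines: string[] = []
    for (const line of frame.split('\n')) {
      if (line.startsWith('event:')) {
        event = line.slice(6).trim()
      } else if (line.startsWith('data:')) {
        // Drop the single optional space after the colon.
        dataLines.push(line.slice(5).replace(/^ /, ''))
      }
    }
    if (dataLines.length > 0) {
      events.push({ event, data: dataLines.join('\n') })
    }
  }
  return { events, rest }
}
```

On the client, each chunk read from `response.body` would be decoded with a `TextDecoder`, appended to `rest`, and passed through `parseSSEBuffer` again, so that frames split across network reads are reassembled.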

Advanced Connection Management

Robust connection management is crucial for production applications. Let’s implement connection tracking, heartbeat monitoring, and per-user broadcasting on the server; reconnection logic lives in the client hook shown later:

// @filename: connection-manager.ts
// src/connection-manager.ts
import WebSocket from 'ws'
import { EventEmitter } from 'events'
import winston from 'winston'

interface ConnectionInfo {
  id: string
  ws: WebSocket
  userId?: string
  lastActivity: Date
  reconnectAttempts: number
}

export class ConnectionManager extends EventEmitter {
  private connections: Map<string, ConnectionInfo> = new Map()
  private heartbeatInterval: NodeJS.Timeout
  private logger: winston.Logger

  constructor() {
    super()

    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.json(),
      transports: [
        new winston.transports.File({ filename: 'connections.log' }),
        new winston.transports.Console(),
      ],
    })

    // Start heartbeat monitoring
    this.heartbeatInterval = setInterval(() => {
      this.checkHeartbeats()
    }, 30000) // Check every 30 seconds
  }

  addConnection(id: string, ws: WebSocket, userId?: string): void {
    const connectionInfo: ConnectionInfo = {
      id,
      ws,
      userId,
      lastActivity: new Date(),
      reconnectAttempts: 0,
    }

    this.connections.set(id, connectionInfo)

    // Setup ping/pong for connection health
    ws.on('pong', () => {
      const conn = this.connections.get(id)
      if (conn) {
        conn.lastActivity = new Date()
      }
    })

    this.logger.info('Connection added', { id, userId })
    this.emit('connection:added', connectionInfo)
  }

  removeConnection(id: string): void {
    const conn = this.connections.get(id)
    if (conn) {
      this.connections.delete(id)
      this.logger.info('Connection removed', { id, userId: conn.userId })
      this.emit('connection:removed', conn)
    }
  }

  private checkHeartbeats(): void {
    const now = Date.now()
    const timeout = 60000 // 60 seconds without a pong means the peer is gone

    this.connections.forEach((conn, id) => {
      const idleFor = now - conn.lastActivity.getTime()

      if (conn.ws.readyState !== WebSocket.OPEN || idleFor > timeout) {
        // Closed or unresponsive: drop the socket and forget the connection
        conn.ws.terminate()
        this.removeConnection(id)
        return
      }

      // Still healthy: ping it; the 'pong' handler refreshes lastActivity
      conn.ws.ping()
    })
  }

  getConnectionsByUserId(userId: string): ConnectionInfo[] {
    return Array.from(this.connections.values()).filter(
      (conn) => conn.userId === userId
    )
  }

  broadcastToUser(userId: string, message: any): void {
    const userConnections = this.getConnectionsByUserId(userId)
    const messageStr = JSON.stringify(message)

    userConnections.forEach((conn) => {
      if (conn.ws.readyState === WebSocket.OPEN) {
        conn.ws.send(messageStr)
      }
    })
  }

  getMetrics() {
    return {
      totalConnections: this.connections.size,
      activeConnections: Array.from(this.connections.values()).filter(
        (conn) => conn.ws.readyState === WebSocket.OPEN
      ).length,
      userCount: new Set(
        Array.from(this.connections.values())
          .map((conn) => conn.userId)
          .filter(Boolean)
      ).size,
    }
  }

  destroy(): void {
    clearInterval(this.heartbeatInterval)
    this.connections.forEach((conn) => {
      if (conn.ws.readyState === WebSocket.OPEN) {
        conn.ws.close()
      }
    })
    this.connections.clear()
  }
}
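
The heartbeat policy can be reasoned about separately from the sockets themselves. As a minimal sketch, assuming the policy "ping every live connection on each sweep, drop any that is closed or has been silent longer than the timeout", the decision can be written as a pure function. `TrackedConnection`, `SweepAction`, and `planHeartbeatSweep` are hypothetical names, not part of the `ws` library.

```typescript
// Minimal shape of what the sweep needs to know about each connection.
interface TrackedConnection {
  id: string
  isOpen: boolean
  lastActivity: number // epoch milliseconds of the last pong
}

type SweepAction = { id: string; action: 'ping' | 'drop' }

// Decide, for each connection, whether to ping it or drop it.
function planHeartbeatSweep(
  conns: TrackedConnection[],
  now: number,
  timeoutMs: number
): SweepAction[] {
  return conns.map((conn): SweepAction => {
    const idleFor = now - conn.lastActivity
    const dead = !conn.isOpen || idleFor > timeoutMs
    return { id: conn.id, action: dead ? 'drop' : 'ping' }
  })
}

const plan = planHeartbeatSweep(
  [
    { id: 'a', isOpen: true, lastActivity: 90_000 }, // answered 10 s ago
    { id: 'b', isOpen: true, lastActivity: 10_000 }, // silent for 90 s
    { id: 'c', isOpen: false, lastActivity: 95_000 }, // already closed
  ],
  100_000,
  60_000
)
```

Keeping the decision pure makes the timeout behavior easy to unit test without opening real sockets.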

Queue Management for Concurrent Requests

When building real-time AI applications at scale, managing concurrent requests becomes critical. Let’s implement a robust queue system using Bull and Redis:

// @filename: queue-manager.ts
// src/queue-manager.ts
import Bull from 'bull'
import { createClient } from 'redis'
import WebSocket from 'ws'
import { ChatOpenAI } from '@langchain/openai'
import { StringOutputParser } from '@langchain/core/output_parsers'
import { ChatPromptTemplate } from '@langchain/core/prompts'
import { ConnectionManager } from './connection-manager'

interface ChatJob {
  connectionId: string
  message: string
  conversationId: string
  userId?: string
  priority?: number
}

export class QueueManager {
  private chatQueue: Bull.Queue<ChatJob>
  private redis: ReturnType<typeof createClient>
  private connectionManager: ConnectionManager

  constructor(connectionManager: ConnectionManager) {
    this.connectionManager = connectionManager

    // Initialize Redis (node-redis v4+: promise-based API, explicit connect)
    this.redis = createClient({
      socket: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379', 10),
      },
    })
    this.redis.on('error', (err) => console.error('Redis error:', err))
    this.redis
      .connect()
      .catch((err) => console.error('Redis connect failed:', err))

    // Initialize Bull queue
    this.chatQueue = new Bull<ChatJob>('chat-processing', {
      redis: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379'),
      },
      defaultJobOptions: {
        removeOnComplete: true,
        removeOnFail: false,
      },
    })

    // Process jobs
    this.setupQueueProcessing()

    // Monitor queue events
    this.setupQueueMonitoring()
  }

  private setupQueueProcessing(): void {
    // Process with concurrency based on available resources
    const concurrency = parseInt(process.env.QUEUE_CONCURRENCY || '5')

    this.chatQueue.process(concurrency, async (job) => {
      const { connectionId, message, conversationId, userId } = job.data

      try {
        // Update job progress
        await job.progress(10)

        const model = new ChatOpenAI({
          modelName: 'gpt-3.5-turbo',
          temperature: 0.7,
          streaming: true,
        })

        const prompt = ChatPromptTemplate.fromTemplate(
          'You are a helpful AI assistant. Respond to: {input}'
        )

        const chain = prompt.pipe(model).pipe(new StringOutputParser())

        // Get WebSocket connection
        const connections = userId
          ? this.connectionManager.getConnectionsByUserId(userId)
          : []

        if (connections.length === 0) {
          throw new Error('No active connections for user')
        }

        await job.progress(30)

        // Send to all user connections
        connections.forEach((conn) => {
          conn.ws.send(
            JSON.stringify({
              type: 'stream_start',
              conversationId,
              jobId: job.id,
            })
          )
        })

        const stream = await chain.stream({ input: message })
        let chunkCount = 0

        for await (const chunk of stream) {
          connections.forEach((conn) => {
            if (conn.ws.readyState === WebSocket.OPEN) {
              conn.ws.send(
                JSON.stringify({
                  type: 'stream_chunk',
                  conversationId,
                  content: chunk,
                })
              )
            }
          })

          chunkCount++
          if (chunkCount % 10 === 0) {
            await job.progress(30 + Math.min(60, chunkCount))
          }
        }

        // Send completion
        connections.forEach((conn) => {
          conn.ws.send(
            JSON.stringify({
              type: 'stream_end',
              conversationId,
              jobId: job.id,
            })
          )
        })

        await job.progress(100)

        return { success: true, conversationId }
      } catch (error) {
        console.error(`Job ${job.id} failed:`, error)
        throw error
      }
    })
  }

  private setupQueueMonitoring(): void {
    this.chatQueue.on('completed', (job, result) => {
      console.log(`Job ${job.id} completed`, result)
    })

    this.chatQueue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed`, err)

      // Notify user of failure
      const { userId, conversationId } = job.data
      if (userId) {
        this.connectionManager.broadcastToUser(userId, {
          type: 'error',
          conversationId,
          error: 'Failed to process message',
          jobId: job.id,
        })
      }
    })

    this.chatQueue.on('stalled', (job) => {
      console.warn(`Job ${job.id} stalled`)
    })
  }

  async addChatJob(data: ChatJob): Promise<Bull.Job<ChatJob>> {
    // Add priority queue support
    const options: Bull.JobOptions = {}

    if (data.priority) {
      options.priority = data.priority
    }

    // Rate limiting per user
    if (data.userId) {
      const userKey = `rate:${data.userId}`
      const count = await this.redis.incr(userKey)

      if (count === 1) {
        await this.redis.expire(userKey, 60) // 60 second window
      }

      if (count > 10) {
        // Max 10 requests per minute
        throw new Error('Rate limit exceeded')
      }
    }

    return this.chatQueue.add(data, options)
  }

  async getQueueMetrics() {
    const [waiting, active, completed, failed, delayed] = await Promise.all([
      this.chatQueue.getWaitingCount(),
      this.chatQueue.getActiveCount(),
      this.chatQueue.getCompletedCount(),
      this.chatQueue.getFailedCount(),
      this.chatQueue.getDelayedCount(),
    ])

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      total: waiting + active + delayed,
    }
  }

  async clearQueue(): Promise<void> {
    await this.chatQueue.empty()
  }

  async close(): Promise<void> {
    await this.chatQueue.close()
    this.redis.quit()
  }
}
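
The per-user rate limit in `addChatJob` is a fixed-window counter: the first request in a window starts the count and sets the expiry, and later requests only increment. As a minimal sketch of the same logic without Redis, here is an in-memory version. `FixedWindowLimiter` is a hypothetical class for illustration and local testing; it is not shared across processes, which is the reason the article's version uses Redis.

```typescript
// In-memory stand-in for the INCR + EXPIRE pattern used in addChatJob.
class FixedWindowLimiter {
  private windows = new Map<string, { count: number; expiresAt: number }>()
  private limit: number
  private windowMs: number
  private now: () => number

  // `now` is injectable so that tests can control the clock.
  constructor(limit: number, windowMs: number, now: () => number = Date.now) {
    this.limit = limit
    this.windowMs = windowMs
    this.now = now
  }

  // Returns true if the request is allowed, false if the key is over its limit.
  allow(key: string): boolean {
    const t = this.now()
    const current = this.windows.get(key)

    if (!current || current.expiresAt <= t) {
      // First request in a new window: start counting and set the expiry.
      this.windows.set(key, { count: 1, expiresAt: t + this.windowMs })
      return true
    }

    current.count++
    return current.count <= this.limit
  }
}
```

A fixed window is simple but allows short bursts around a window boundary; a sliding window or token bucket smooths that out at the cost of more state.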

Frontend Integration with React

Now let’s build a React frontend that integrates with our LangChain WebSocket backend:

// @filename: useWebSocket.tsx
// src/hooks/useWebSocket.ts
import { useEffect, useRef, useState, useCallback } from 'react'

interface WebSocketMessage {
  type: string
  [key: string]: any
}

interface UseWebSocketOptions {
  url: string
  reconnect?: boolean
  reconnectInterval?: number
  maxReconnectAttempts?: number
  onMessage?: (message: WebSocketMessage) => void
  onConnect?: () => void
  onDisconnect?: () => void
  onError?: (error: Event) => void
}

export function useWebSocket({
  url,
  reconnect = true,
  reconnectInterval = 5000,
  maxReconnectAttempts = 10,
  onMessage,
  onConnect,
  onDisconnect,
  onError,
}: UseWebSocketOptions) {
  const ws = useRef<WebSocket | null>(null)
  const reconnectCount = useRef(0)
  const reconnectTimeout = useRef<ReturnType<typeof setTimeout> | undefined>(
    undefined
  )
  const [isConnected, setIsConnected] = useState(false)
  const [connectionId, setConnectionId] = useState<string | null>(null)

  // Keep the latest callbacks in a ref. Callers often pass inline functions;
  // listing them as dependencies of `connect` would close and reopen the
  // socket on every render.
  const handlers = useRef({ onMessage, onConnect, onDisconnect, onError })
  useEffect(() => {
    handlers.current = { onMessage, onConnect, onDisconnect, onError }
  })

  const connect = useCallback(() => {
    try {
      const socket = new WebSocket(url)
      ws.current = socket

      socket.onopen = () => {
        console.log('WebSocket connected')
        setIsConnected(true)
        reconnectCount.current = 0
        handlers.current.onConnect?.()
      }

      socket.onmessage = (event) => {
        try {
          const message = JSON.parse(event.data) as WebSocketMessage

          if (message.type === 'connection') {
            setConnectionId(message.connectionId)
          }

          handlers.current.onMessage?.(message)
        } catch (error) {
          console.error('Failed to parse WebSocket message:', error)
        }
      }

      socket.onclose = () => {
        // Ignore sockets that disconnect() closed or that were replaced
        if (ws.current !== socket) return

        console.log('WebSocket disconnected')
        setIsConnected(false)
        setConnectionId(null)
        handlers.current.onDisconnect?.()

        // Attempt reconnection
        if (reconnect && reconnectCount.current < maxReconnectAttempts) {
          reconnectTimeout.current = setTimeout(() => {
            reconnectCount.current++
            console.log(`Reconnecting... Attempt ${reconnectCount.current}`)
            connect()
          }, reconnectInterval)
        }
      }

      socket.onerror = (error) => {
        console.error('WebSocket error:', error)
        handlers.current.onError?.(error)
      }
    } catch (error) {
      console.error('Failed to create WebSocket:', error)
    }
  }, [url, reconnect, reconnectInterval, maxReconnectAttempts])

  const disconnect = useCallback(() => {
    if (reconnectTimeout.current) {
      clearTimeout(reconnectTimeout.current)
    }

    const socket = ws.current
    if (socket) {
      // Clear the ref first so onclose treats this close as intentional
      // and does not schedule a reconnect
      ws.current = null
      socket.close()
      setIsConnected(false)
      setConnectionId(null)
      handlers.current.onDisconnect?.()
    }
  }, [])

  const sendMessage = useCallback((message: WebSocketMessage) => {
    if (ws.current && ws.current.readyState === WebSocket.OPEN) {
      ws.current.send(JSON.stringify(message))
    } else {
      console.error('WebSocket is not connected')
    }
  }, [])

  useEffect(() => {
    connect()

    // Cleanup on unmount
    return () => {
      disconnect()
    }
  }, [connect, disconnect])

  // Heartbeat to maintain connection
  useEffect(() => {
    const heartbeatInterval = setInterval(() => {
      if (isConnected) {
        sendMessage({ type: 'ping' })
      }
    }, 30000) // Ping every 30 seconds

    return () => clearInterval(heartbeatInterval)
  }, [isConnected, sendMessage])

  return {
    isConnected,
    connectionId,
    sendMessage,
    disconnect,
    reconnect: connect,
  }
}
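
The hook retries at a fixed `reconnectInterval`. When many clients lose their connection at once, for example during a server restart, a common refinement is exponential backoff with jitter so that they do not all reconnect at the same moment. The following is a minimal sketch of such a delay calculation. `backoffDelay` and its default values are assumptions for illustration, not part of the hook above.

```typescript
// Hypothetical helper: delay before reconnect attempt number `attempt` (0-based).
function backoffDelay(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
  random: () => number = Math.random // injectable for deterministic tests
): number {
  // Double the delay on each attempt, capped at maxMs.
  const exp = Math.min(maxMs, baseMs * 2 ** attempt)
  // "Equal jitter": keep half of the delay fixed and randomize the other half.
  return Math.round(exp / 2 + random() * (exp / 2))
}
```

To use it, the `setTimeout` call in `onclose` could pass `backoffDelay(reconnectCount.current)` instead of the fixed interval.
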
// @filename: RealtimeChat.tsx
// src/components/RealtimeChat.tsx
import React, { useState, useEffect, useRef } from 'react'
import { useWebSocket } from '../hooks/useWebSocket'
import { v4 as uuidv4 } from 'uuid'

interface Message {
  id: string
  content: string
  role: 'user' | 'assistant'
  timestamp: Date
  streaming?: boolean
}

export function RealtimeChat() {
  const [messages, setMessages] = useState<Message[]>([])
  const [input, setInput] = useState('')
  const [isStreaming, setIsStreaming] = useState(false)
  const messagesEndRef = useRef<HTMLDivElement>(null)
  const currentStreamRef = useRef<string>('')
  const conversationIdRef = useRef<string>(uuidv4())

  const { isConnected, sendMessage } = useWebSocket({
    url: 'ws://localhost:3001',
    onMessage: (message) => {
      handleWebSocketMessage(message)
    },
    onConnect: () => {
      console.log('Connected to AI assistant')
    },
    onDisconnect: () => {
      console.log('Disconnected from AI assistant')
    },
  })

  const handleWebSocketMessage = (message: any) => {
    switch (message.type) {
      case 'stream_start':
        setIsStreaming(true)
        currentStreamRef.current = ''

        // Add assistant message placeholder
        setMessages((prev) => [
          ...prev,
          {
            id: message.conversationId,
            content: '',
            role: 'assistant',
            timestamp: new Date(),
            streaming: true,
          },
        ])
        break

      case 'stream_chunk':
        currentStreamRef.current += message.content

        // Update streaming message
        setMessages((prev) =>
          prev.map((msg) =>
            msg.id === message.conversationId
              ? { ...msg, content: currentStreamRef.current }
              : msg
          )
        )
        break

      case 'stream_end':
        setIsStreaming(false)

        // Mark message as complete
        setMessages((prev) =>
          prev.map((msg) =>
            msg.id === message.conversationId
              ? { ...msg, streaming: false }
              : msg
          )
        )
        break

      case 'error':
        setIsStreaming(false)
        console.error('Chat error:', message.error)

        // Add error message
        setMessages((prev) => [
          ...prev,
          {
            id: uuidv4(),
            content: `Error: ${message.error}`,
            role: 'assistant',
            timestamp: new Date(),
            streaming: false,
          },
        ])
        break
    }
  }

  const sendChatMessage = () => {
    if (!input.trim() || !isConnected || isStreaming) return

    const userMessage: Message = {
      id: uuidv4(),
      content: input,
      role: 'user',
      timestamp: new Date(),
    }

    setMessages((prev) => [...prev, userMessage])

    sendMessage({
      type: 'chat',
      message: input,
      conversationId: conversationIdRef.current,
    })

    setInput('')
    conversationIdRef.current = uuidv4() // New ID for next conversation
  }

  useEffect(() => {
    messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
  }, [messages])

  return (
    <div className="chat-container">
      <div className="connection-status">
        {isConnected ? (
          <span className="connected">Connected</span>
        ) : (
          <span className="disconnected">Disconnected</span>
        )}
      </div>

      <div className="messages">
        {messages.map((message) => (
          <div key={message.id} className={`message ${message.role}`}>
            <div className="message-content">
              {message.content}
              {message.streaming && <span className="cursor-blink">▊</span>}
            </div>
            <div className="message-time">
              {message.timestamp.toLocaleTimeString()}
            </div>
          </div>
        ))}
        <div ref={messagesEndRef} />
      </div>

      <div className="input-area">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && sendChatMessage()}
          placeholder="Type your message..."
          disabled={!isConnected || isStreaming}
        />
        <button
          onClick={sendChatMessage}
          disabled={!isConnected || isStreaming || !input.trim()}
        >
          {isStreaming ? 'Generating...' : 'Send'}
        </button>
      </div>
    </div>
  )
}
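
The streaming state transitions in `handleWebSocketMessage` can also be expressed as a pure function, which makes them testable without rendering the component. This is a minimal sketch under that assumption. `ChatMessage`, `StreamEvent`, and `applyStreamEvent` are hypothetical names, and unlike the component it accumulates content from the previous state instead of a ref.

```typescript
interface ChatMessage {
  id: string
  content: string
  role: 'user' | 'assistant'
  streaming: boolean
}

type StreamEvent =
  | { type: 'stream_start'; conversationId: string }
  | { type: 'stream_chunk'; conversationId: string; content: string }
  | { type: 'stream_end'; conversationId: string }

// Pure state transition: returns a new message list for each server event.
function applyStreamEvent(
  messages: ChatMessage[],
  event: StreamEvent
): ChatMessage[] {
  switch (event.type) {
    case 'stream_start':
      // Add an empty assistant placeholder that later chunks fill in.
      return [
        ...messages,
        { id: event.conversationId, content: '', role: 'assistant', streaming: true },
      ]
    case 'stream_chunk': {
      const { conversationId, content } = event
      return messages.map((m) =>
        m.id === conversationId ? { ...m, content: m.content + content } : m
      )
    }
    case 'stream_end': {
      const { conversationId } = event
      return messages.map((m) =>
        m.id === conversationId ? { ...m, streaming: false } : m
      )
    }
  }
}
```

A function of this shape can be passed to `useReducer`, or called inside `setMessages((prev) => applyStreamEvent(prev, event))`.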

Performance Metrics and Monitoring

Monitoring is crucial for maintaining reliable real-time AI applications. Let’s implement comprehensive metrics collection:

// @filename: server.js
// src/metrics-collector.ts
import { EventEmitter } from 'events'
import winston from 'winston'
import { StatsD } from 'node-statsd'

interface MetricEvent {
  name: string
  value: number
  tags?: Record<string, string>
  timestamp: Date
}

export class MetricsCollector extends EventEmitter {
  private statsd: StatsD
  private logger: winston.Logger
  private metrics: Map<string, MetricEvent[]> = new Map()

  constructor() {
    super()

    // Initialize StatsD client
    this.statsd = new StatsD({
      host: process.env.STATSD_HOST || 'localhost',
      port: parseInt(process.env.STATSD_PORT || '8125'),
      prefix: 'realtime_ai.',
    })

    // Initialize logger
    this.logger = winston.createLogger({
      level: 'info',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      ),
      transports: [
        new winston.transports.File({ filename: 'metrics.log' }),
        new winston.transports.Console(),
      ],
    })

    // Start periodic aggregation every minute; unref() keeps this timer
    // from holding the process open during shutdown
    setInterval(() => this.aggregateMetrics(), 60000).unref()
  }

  // WebSocket metrics
  trackConnection(event: 'connect' | 'disconnect', metadata?: any): void {
    this.statsd.increment(`websocket.${event}`)
    this.logger.info(`WebSocket ${event}`, metadata)
  }

  trackMessageLatency(latency: number, messageType: string): void {
    this.statsd.timing('websocket.message.latency', latency, [
      `type:${messageType}`,
    ])

    const metric: MetricEvent = {
      name: 'message_latency',
      value: latency,
      tags: { type: messageType },
      timestamp: new Date(),
    }

    this.addMetric('latency', metric)
  }

  // LangChain streaming metrics
  trackStreamingPerformance(data: {
    conversationId: string
    duration: number
    tokenCount: number
    chunkCount: number
    model: string
  }): void {
    const { duration, tokenCount, chunkCount, model } = data

    // Track various streaming metrics
    this.statsd.timing('langchain.stream.duration', duration, [
      `model:${model}`,
    ])
    this.statsd.gauge('langchain.stream.tokens', tokenCount, [`model:${model}`])
    this.statsd.gauge('langchain.stream.chunks', chunkCount, [`model:${model}`])

    // Calculate tokens per second
    const tokensPerSecond = tokenCount / (duration / 1000)
    this.statsd.gauge('langchain.stream.tokens_per_second', tokensPerSecond, [
      `model:${model}`,
    ])

    this.logger.info('Streaming performance', data)
  }

  // Queue metrics
  trackQueueMetrics(metrics: any): void {
    Object.entries(metrics).forEach(([key, value]) => {
      this.statsd.gauge(`queue.${key}`, value as number)
    })
  }

  // Error tracking
  trackError(error: Error, context: Record<string, any>): void {
    this.statsd.increment('errors.total', [`type:${error.name}`])

    this.logger.error('Application error', {
      error: {
        name: error.name,
        message: error.message,
        stack: error.stack,
      },
      context,
    })
  }

  // Performance metrics
  trackResponseTime(duration: number, endpoint: string): void {
    this.statsd.timing('api.response_time', duration, [`endpoint:${endpoint}`])
  }

  // Memory and resource metrics
  collectSystemMetrics(): void {
    const memoryUsage = process.memoryUsage()

    this.statsd.gauge('system.memory.rss', memoryUsage.rss)
    this.statsd.gauge('system.memory.heap_total', memoryUsage.heapTotal)
    this.statsd.gauge('system.memory.heap_used', memoryUsage.heapUsed)
    this.statsd.gauge('system.memory.external', memoryUsage.external)

    // CPU usage
    const cpuUsage = process.cpuUsage()
    this.statsd.gauge('system.cpu.user', cpuUsage.user)
    this.statsd.gauge('system.cpu.system', cpuUsage.system)
  }

  // Custom metric tracking
  private addMetric(category: string, metric: MetricEvent): void {
    if (!this.metrics.has(category)) {
      this.metrics.set(category, [])
    }

    const categoryMetrics = this.metrics.get(category)!
    categoryMetrics.push(metric)

    // Keep only last hour of metrics
    const oneHourAgo = new Date(Date.now() - 3600000)
    this.metrics.set(
      category,
      categoryMetrics.filter((m) => m.timestamp > oneHourAgo)
    )
  }

  // Aggregate and report metrics
  private aggregateMetrics(): void {
    this.metrics.forEach((metrics, category) => {
      if (metrics.length === 0) return

      const values = metrics.map((m) => m.value)
      const avg = values.reduce((a, b) => a + b, 0) / values.length
      const min = Math.min(...values)
      const max = Math.max(...values)

      this.statsd.gauge(`aggregate.${category}.avg`, avg)
      this.statsd.gauge(`aggregate.${category}.min`, min)
      this.statsd.gauge(`aggregate.${category}.max`, max)

      this.logger.info(`Metric aggregation: ${category}`, {
        count: metrics.length,
        avg,
        min,
        max,
      })
    })

    // Collect system metrics
    this.collectSystemMetrics()
  }

  // Get current metrics snapshot
  getSnapshot(): Record<string, any> {
    const snapshot: Record<string, any> = {}

    this.metrics.forEach((metrics, category) => {
      const values = metrics.map((m) => m.value)
      snapshot[category] = {
        count: metrics.length,
        average:
          values.length > 0
            ? values.reduce((a, b) => a + b, 0) / values.length
            : 0,
        min: values.length > 0 ? Math.min(...values) : 0,
        max: values.length > 0 ? Math.max(...values) : 0,
      }
    })

    return snapshot
  }

  close(): void {
    this.statsd.close()
  }
}

// Integration with main server
const metricsCollector = new MetricsCollector()

// Track WebSocket connections
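wss.on('connection', (ws) => {
  const connectedAt = Date.now()
  metricsCollector.trackConnection('connect')

  ws.on('message', () => {
    // Caution: this is the time elapsed since the connection opened, not
    // per-message latency. To time message handling, take timestamps around
    // handleChatMessage and pass the difference to trackMessageLatency.
    const sinceConnect = Date.now() - connectedAt
    metricsCollector.trackMessageLatency(sinceConnect, 'chat')
  })

  ws.on('close', () => {
    metricsCollector.trackConnection('disconnect')
  })
})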

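// Expose metrics endpoint
app.get('/metrics', async (req, res) => {
  try {
    res.json({
      snapshot: metricsCollector.getSnapshot(),
      connections: connectionManager.getMetrics(),
      // getQueueMetrics() is async; without await it would serialize as {}
      queue: await queueManager.getQueueMetrics(),
    })
  } catch (error) {
    res.status(500).json({ error: 'Failed to collect metrics' })
  }
})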
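
`aggregateMetrics` reports the average, minimum, and maximum, which can hide tail latency: a few very slow responses barely move the average. As a minimal sketch, a nearest-rank percentile helper could be added alongside those aggregates. `percentile` is a hypothetical function, not part of `node-statsd`.

```typescript
// Nearest-rank percentile over a list of samples (p in the range 0..100).
function percentile(samples: number[], p: number): number {
  if (samples.length === 0) return 0
  // Sort a copy so the caller's array is left untouched.
  const sorted = [...samples].sort((a, b) => a - b)
  const rank = Math.ceil((p / 100) * sorted.length)
  const index = Math.min(sorted.length - 1, Math.max(0, rank - 1))
  return sorted[index] ?? 0
}

const latencies = [120, 80, 95, 2400, 110]
const p50 = percentile(latencies, 50)
const p95 = percentile(latencies, 95)
```

With these samples the median stays near typical values while the 95th percentile surfaces the single slow response.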

Scaling Considerations

When building production-ready real-time AI applications, consider these scaling strategies:

Horizontal Scaling with Redis Pub/Sub

// @filename: index.ts
// src/scaled-websocket.ts
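import { createClient } from 'redis'
import { v4 as uuidv4 } from 'uuid'

// For multiple server instances (node-redis v4+ API)
const pubClient = createClient({ url: 'redis://localhost:6379' })
const subClient = pubClient.duplicate()

// Whatever delivers a message to this server's local sockets. The
// ConnectionManager shown earlier has no broadcastToAll method, so either
// add one there or adapt this interface.
interface LocalBroadcaster {
  broadcastToAll(message: unknown): void
}

// Implement cross-server message broadcasting
export class ScaledWebSocketManager {
  private serverId: string = uuidv4()
  private connectionManager: LocalBroadcaster

  constructor(connectionManager: LocalBroadcaster) {
    this.connectionManager = connectionManager
  }

  // Both clients must be connected before publishing or subscribing
  async connect(): Promise<void> {
    await Promise.all([pubClient.connect(), subClient.connect()])
  }

  async broadcastToAllServers(message: any): Promise<void> {
    // Publish to Redis channel
    await pubClient.publish(
      'websocket:broadcast',
      JSON.stringify({
        serverId: this.serverId,
        message,
        timestamp: new Date().toISOString(),
      })
    )
  }

  async setupSubscriptions(): Promise<void> {
    await subClient.subscribe('websocket:broadcast', (data) => {
      const { serverId, message } = JSON.parse(data)

      // Don't process our own messages
      if (serverId === this.serverId) return

      // Broadcast to local connections
      this.connectionManager.broadcastToAll(message)
    })
  }
}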

Load Balancing Strategies

  1. Sticky Sessions: Ensure WebSocket connections stay with the same server
  2. Connection State in Redis: Store connection metadata centrally
  3. Queue-based distribution: Use message queues for work distribution

Resource Optimization

// @filename: index.ts
// Implement instance pooling for LangChain models
import { ChatOpenAI } from '@langchain/openai'

class LangChainPool {
  private pool: ChatOpenAI[] = []
  private maxSize: number = 10

  async acquire(): Promise<ChatOpenAI> {
    if (this.pool.length > 0) {
      return this.pool.pop()!
    }

    return new ChatOpenAI({
      modelName: 'gpt-3.5-turbo',
      temperature: 0.7,
      streaming: true,
    })
  }

  release(model: ChatOpenAI): void {
    if (this.pool.length < this.maxSize) {
      this.pool.push(model)
    }
  }
}

Deployment Best Practices

When deploying your LangChain WebSocket application:

  1. Use a reverse proxy (Nginx) for WebSocket support
  2. Enable SSL/TLS for secure WebSocket connections (wss://)
  3. Configure proper CORS headers for cross-origin requests
  4. Implement rate limiting to prevent abuse
  5. Set up monitoring with tools like Prometheus and Grafana
  6. Use container orchestration (Kubernetes) for scaling

For detailed deployment guidance, check out our guides on deploying Node.js applications and Kubernetes best practices.

Conclusion

Building real-time AI applications with LangChain streaming and WebSockets opens up exciting possibilities for creating responsive, engaging user experiences. By implementing proper connection management, queue systems, and monitoring, you can build scalable applications that handle thousands of concurrent users.

Key takeaways:

  • Use WebSockets for bidirectional real-time communication
  • Implement robust reconnection logic and heartbeat monitoring
  • Queue concurrent requests to manage load effectively
  • Monitor performance metrics for optimization
  • Plan for horizontal scaling from the start

The combination of LangChain’s streaming capabilities with WebSocket technology enables you to build the next generation of AI-powered applications that feel instantaneous and responsive to users.
