Skip to content

Real-Time: WebSockets, SSE, and Polling

Build instant, interactive experiences.

Communication Methods

| Method | Use Case | Pros | Cons |
| --- | --- | --- | --- |
| WebSocket | Bidirectional real-time | Full duplex, low latency | Complex, stateful |
| SSE | Server-to-client streaming | Simple, auto-reconnect | One-way only |
| Polling | Simple updates | Easy to implement | Inefficient, higher latency |
| Long Polling | Near real-time | Works everywhere | Resource intensive |

WebSockets

FastAPI WebSocket Server

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
import json

# Application object on which the WebSocket route below is registered.
app = FastAPI()

class ConnectionManager:
    """Track accepted WebSocket connections, grouped by room name.

    The registry lives in memory, so it only knows about sockets handled by
    the process that owns this instance.
    """

    def __init__(self):
        # room name -> sockets currently registered for that room
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room: str):
        """Accept the handshake and register ``websocket`` under ``room``."""
        await websocket.accept()
        self.active_connections.setdefault(room, set()).add(websocket)

    def disconnect(self, websocket: WebSocket, room: str):
        """Remove ``websocket`` from ``room``.

        Unknown rooms and sockets that were never registered are ignored, so
        the call is safe to repeat.
        """
        connections = self.active_connections.get(room)
        if connections is None:
            return
        connections.discard(websocket)
        if not connections:
            # Drop empty rooms so the mapping does not grow with every room
            # name that was ever used.
            del self.active_connections[room]

    async def broadcast(self, room: str, message: dict):
        """Send ``message`` as JSON to every socket registered for ``room``.

        A socket whose send fails because the peer is gone is unregistered
        and the broadcast continues with the remaining sockets.
        """
        # Iterate over a snapshot: every ``await`` yields to the event loop,
        # where connect()/disconnect() may mutate the live set and would
        # otherwise raise "Set changed size during iteration".
        for connection in list(self.active_connections.get(room, ())):
            try:
                await connection.send_json(message)
            except (WebSocketDisconnect, RuntimeError):
                # The peer disconnected or the socket was already closed.
                self.disconnect(connection, room)

# Single manager instance used by the /ws/{room} endpoint below.
manager = ConnectionManager()

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    """Relay every JSON message received from a client to its whole room.

    The socket is registered with ``manager`` on entry. Whatever ends the
    receive loop, the socket is unregistered and the remaining members of
    the room are sent a ``user_left`` event.
    """
    await manager.connect(websocket, room)
    try:
        while True:
            data = await websocket.receive_json()
            # Process message
            await manager.broadcast(room, {
                "type": "message",
                "data": data,
            })
    except WebSocketDisconnect:
        # Normal client departure; the cleanup below handles it.
        pass
    finally:
        # Runs for every exit path, not only WebSocketDisconnect: a handler
        # that dies on e.g. malformed JSON must not leave its socket in the
        # room, or later broadcasts would keep targeting a dead connection.
        manager.disconnect(websocket, room)
        await manager.broadcast(room, {
            "type": "user_left",
            "room": room,
        })

React WebSocket Hook

import { useEffect, useRef, useState, useCallback } from 'react';

/** Optional callbacks and reconnection settings accepted by `useWebSocket`. */
interface UseWebSocketOptions {
  /** Called with each incoming message after it has been parsed with `JSON.parse`. */
  onMessage?: (data: any) => void;
  /** Called when the socket's `open` handler fires. */
  onOpen?: () => void;
  /** Called when the socket's `close` handler fires. */
  onClose?: () => void;
  /** Called with the event passed to the socket's `error` handler. */
  onError?: (error: Event) => void;
  /** Reconnect after the socket closes; treated as enabled unless explicitly `false`. */
  reconnect?: boolean;
  /** Delay in milliseconds before a reconnect attempt; 3000 when omitted. */
  reconnectInterval?: number;
}

/**
 * Open a WebSocket to `url` and keep it alive for the lifetime of the component.
 *
 * Incoming messages are parsed as JSON and exposed through `lastMessage` and
 * `options.onMessage`. Unless `options.reconnect` is `false`, an unexpected
 * close schedules a new connection after `options.reconnectInterval`
 * milliseconds (3000 by default).
 *
 * @param url     WebSocket URL; changing it closes the current socket and opens a new one.
 * @param options Callbacks and reconnection settings. The latest values are always
 *                used, and passing a new object on every render does not reconnect.
 * @returns `isConnected`, the last parsed message, and `sendMessage`, which
 *          JSON-encodes its argument and sends it only while the socket is open.
 */
export function useWebSocket(url: string, options: UseWebSocketOptions = {}) {
  const [isConnected, setIsConnected] = useState(false);
  const [lastMessage, setLastMessage] = useState<any>(null);
  const wsRef = useRef<WebSocket | null>(null);
  const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout> | undefined>(undefined);

  // Callers usually pass an inline object literal, which has a new identity on
  // every render. Reading the options through a ref keeps `connect` stable, so
  // a re-render (including the one caused by `setLastMessage`) no longer tears
  // the socket down and reconnects.
  const optionsRef = useRef(options);
  useEffect(() => {
    optionsRef.current = options;
  });

  const connect = useCallback(() => {
    const ws = new WebSocket(url);
    wsRef.current = ws;
    // Covers the URL-change case, where the previous socket's close event is
    // deliberately ignored below.
    setIsConnected(false);

    ws.onopen = () => {
      setIsConnected(true);
      optionsRef.current.onOpen?.();
    };

    ws.onclose = () => {
      optionsRef.current.onClose?.();

      // The effect cleanup clears `wsRef` before closing, and a URL change
      // replaces it. In both cases this socket is no longer the current one,
      // so it must neither flip the state nor schedule a reconnect.
      if (wsRef.current !== ws) {
        return;
      }

      setIsConnected(false);
      if (optionsRef.current.reconnect !== false) {
        reconnectTimeoutRef.current = setTimeout(
          connect,
          optionsRef.current.reconnectInterval ?? 3000
        );
      }
    };

    ws.onmessage = (event) => {
      let data: any;
      try {
        data = JSON.parse(event.data);
      } catch (e) {
        // A single malformed frame should not throw out of the event handler.
        console.error('Failed to parse WebSocket message', e);
        return;
      }
      setLastMessage(data);
      optionsRef.current.onMessage?.(data);
    };

    ws.onerror = (error) => {
      optionsRef.current.onError?.(error);
    };
  }, [url]);

  useEffect(() => {
    connect();
    return () => {
      clearTimeout(reconnectTimeoutRef.current);
      const ws = wsRef.current;
      // Clear the ref first so the close handler recognises an intentional close.
      wsRef.current = null;
      ws?.close();
    };
  }, [connect]);

  const sendMessage = useCallback((data: any) => {
    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify(data));
    }
  }, []);

  return { isConnected, lastMessage, sendMessage };
}

// Usage
/** Example component: collects chat messages for one room and shows the connection status. */
function Chat({ roomId }: { roomId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);

  // Only payloads tagged as chat messages are appended; other types are ignored.
  const appendIncoming = (data: any) => {
    if (data.type !== 'message') {
      return;
    }
    setMessages(prev => [...prev, data.data]);
  };

  const socketUrl = `wss://api.example.com/ws/${roomId}`;
  const { isConnected, sendMessage } = useWebSocket(socketUrl, {
    onMessage: appendIncoming,
  });
  const statusLabel = isConnected ? 'Connected' : 'Disconnected';

  return (
    <div>
      <div>Status: {statusLabel}</div>
      {/* ... */}
    </div>
  );
}

Server-Sent Events (SSE)

FastAPI SSE Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

# Application object on which the SSE route below is registered.
app = FastAPI()

async def event_generator(user_id: str):
    """Yield an endless Server-Sent Events stream for ``user_id``.

    Each cycle fetches the user's pending notifications, emits every one as
    a JSON ``data:`` frame, emits a keep-alive frame, and then sleeps for one
    second before checking again.
    """
    while True:
        pending = await get_pending_notifications(user_id)

        for item in pending:
            payload = json.dumps(item)
            yield f"data: {payload}\n\n"

        # A line starting with ":" is an SSE comment; it carries no event but
        # keeps traffic flowing on the connection.
        yield ": keepalive\n\n"

        await asyncio.sleep(1)

@app.get("/events/{user_id}")
async def sse_endpoint(user_id: str):
    """Stream the events produced by ``event_generator`` for ``user_id``."""
    stream = event_generator(user_id)
    stream_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        stream,
        media_type="text/event-stream",
        headers=stream_headers,
    )

React SSE Hook

import { useEffect, useState } from 'react';

/**
 * Subscribe to a Server-Sent Events stream and expose the latest event.
 *
 * @param url Stream URL; changing it closes the current `EventSource` and opens a new one.
 * @returns `data` (the most recent message parsed as JSON, or `null`), `error`
 *          (set while the connection is failing, `null` once it is open again)
 *          and `isConnected`.
 */
export function useSSE<T>(url: string) {
  const [data, setData] = useState<T | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    const eventSource = new EventSource(url);

    eventSource.onopen = () => {
      setIsConnected(true);
      // EventSource reconnects on its own; without this reset a single
      // transient failure would leave `error` set for the rest of the session.
      setError(null);
    };

    eventSource.onmessage = (event) => {
      try {
        const parsed = JSON.parse(event.data);
        setData(parsed);
      } catch (e) {
        console.error('Failed to parse SSE data', e);
      }
    };

    eventSource.onerror = () => {
      setError(new Error('SSE connection failed'));
      setIsConnected(false);
    };

    return () => {
      eventSource.close();
    };
  }, [url]);

  return { data, error, isConnected };
}

// Usage
/** Example component: shows a toast for each notification received over SSE; renders nothing. */
function Notifications() {
  const stream = useSSE<Notification>('/api/events/notifications');
  const notification = stream.data;

  useEffect(() => {
    // Nothing to show until the first event has arrived.
    if (!notification) {
      return;
    }
    toast.info(notification.message);
  }, [notification]);

  return null;
}

Polling

Smart Polling

import { useEffect, useRef, useCallback } from 'react';

/** Settings accepted by `usePolling`. */
interface UsePollingOptions {
  /** Delay passed to `setTimeout` between the end of one poll and the start of the next. */
  interval: number;
  /** Whether polling runs; treated as `true` when omitted. */
  enabled?: boolean;
  /** Called with the value thrown by the fetcher when a poll fails. */
  onError?: (error: Error) => void;
}

/**
 * Call `fetcher` repeatedly, waiting `options.interval` milliseconds after each
 * attempt finishes before starting the next one.
 *
 * @param fetcher Async function that produces the polled value. The latest
 *                function passed is used for each attempt; a new identity on
 *                every render does not restart polling.
 * @param options Polling settings. A changed `interval` applies from the next
 *                scheduled poll onwards; toggling `enabled` stops or restarts polling.
 * @returns `data` (the last successful result, or `null`) and `isLoading`,
 *          which stays `true` until the first successful result.
 */
export function usePolling<T>(
  fetcher: () => Promise<T>,
  options: UsePollingOptions
) {
  const { interval, enabled = true, onError } = options;
  const [data, setData] = useState<T | null>(null);
  const [isLoading, setIsLoading] = useState(true);

  // Keep the newest inputs in a ref so the polling loop can read them without
  // being listed as effect dependencies. Inline `fetcher`/`onError` functions
  // would otherwise restart the loop, and fire an immediate request, on
  // every render.
  const latest = useRef({ fetcher, interval, onError });
  useEffect(() => {
    latest.current = { fetcher, interval, onError };
  });

  useEffect(() => {
    if (!enabled) {
      return;
    }

    // Set by the cleanup so a request that is still in flight neither updates
    // state nor schedules another poll after unmount or disable.
    let cancelled = false;
    let timeoutId: ReturnType<typeof setTimeout> | undefined;

    const poll = async () => {
      try {
        const result = await latest.current.fetcher();
        if (cancelled) {
          return;
        }
        setData(result);
        setIsLoading(false);
      } catch (error) {
        if (cancelled) {
          return;
        }
        latest.current.onError?.(error as Error);
      }

      timeoutId = setTimeout(poll, latest.current.interval);
    };

    poll();

    return () => {
      cancelled = true;
      clearTimeout(timeoutId);
    };
  }, [enabled]);

  return { data, isLoading };
}

// Usage with exponential backoff on error
/**
 * Poll `fetcher` once per second, doubling the delay after every failed
 * attempt (capped at 30 seconds) and returning to one second after the next
 * successful attempt.
 *
 * @param fetcher Async function that produces the polled value.
 * @returns The `{ data, isLoading }` pair from `usePolling`.
 */
function usePollingWithBackoff<T>(fetcher: () => Promise<T>) {
  const baseDelay = 1000;
  const maxDelay = 30000; // Max 30s

  // Named `delay`/`setDelay` so the state setter does not shadow the global
  // `setInterval` timer function.
  const [delay, setDelay] = useState(baseDelay);

  // Without a reset, a single failure would keep the poller slowed down forever.
  const fetchAndReset = useCallback(async () => {
    const result = await fetcher();
    setDelay(baseDelay);
    return result;
  }, [fetcher]);

  return usePolling(fetchAndReset, {
    interval: delay,
    onError: () => {
      setDelay(prev => Math.min(prev * 2, maxDelay));
    },
  });
}

Presence and Typing Indicators

Backend

from datetime import datetime, timedelta
import asyncio

class PresenceManager:
    """Track per-room presence and typing state in Redis.

    Presence is stored as one hash per room (``presence:{room_id}``) mapping
    user id to the ISO-8601 time the user was last marked online. Typing
    state is one short-lived key per user (``typing:{room_id}:{user_id}``).

    ``redis`` is expected to be an asyncio Redis client providing ``hset``,
    ``expire``, ``setex``, ``hgetall`` and ``scan_iter``. Replies may be
    ``str`` or ``bytes``; both are handled.
    """

    # Seconds a typing marker survives without being refreshed.
    TYPING_TTL = 3

    def __init__(self, redis):
        self.redis = redis
        self.timeout = 30  # seconds

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``."""
        # ``datetime.utcnow()`` is deprecated since Python 3.12. This yields
        # the same naive value, so stored timestamps keep their format.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _as_text(value) -> str:
        """Decode ``bytes`` replies; pass ``str`` replies through unchanged."""
        return value.decode() if isinstance(value, bytes) else value

    async def set_online(self, user_id: str, room_id: str):
        """Record that ``user_id`` is online in ``room_id`` as of now."""
        key = f"presence:{room_id}"
        await self.redis.hset(key, user_id, self._utc_now().isoformat())
        # Let the whole hash expire once nobody has refreshed it for a while.
        await self.redis.expire(key, self.timeout * 2)

    async def set_typing(self, user_id: str, room_id: str):
        """Mark ``user_id`` as typing in ``room_id`` for ``TYPING_TTL`` seconds."""
        await self.redis.setex(
            f"typing:{room_id}:{user_id}", self.TYPING_TTL, "1"
        )

    async def get_online_users(self, room_id: str) -> list[str]:
        """Return the users seen in ``room_id`` within the last ``timeout`` seconds.

        Entries whose timestamp cannot be parsed are treated as offline.
        """
        users = await self.redis.hgetall(f"presence:{room_id}")
        cutoff = self._utc_now() - timedelta(seconds=self.timeout)

        online = []
        for raw_user, raw_stamp in users.items():
            try:
                last_seen = datetime.fromisoformat(self._as_text(raw_stamp))
            except ValueError:
                # One corrupt entry must not hide everybody else in the room.
                continue
            if last_seen > cutoff:
                online.append(self._as_text(raw_user))

        return online

    async def get_typing_users(self, room_id: str) -> list[str]:
        """Return the users that currently have a typing marker in ``room_id``."""
        prefix = f"typing:{room_id}:"
        # SCAN walks the keyspace incrementally instead of blocking the server
        # the way KEYS does. It may report a key more than once, hence the
        # de-duplication through an insertion-ordered dict.
        found: dict[str, None] = {}
        async for raw_key in self.redis.scan_iter(match=f"{prefix}*"):
            key = self._as_text(raw_key)
            if key.startswith(prefix):
                # Strip the prefix rather than splitting on ":" so user ids
                # that contain a colon are returned intact.
                found[key[len(prefix):]] = None
        return list(found)

Frontend

/** Example component: shows who is typing in a room and reports the local user's typing. */
function ChatRoom({ roomId }: { roomId: string }) {
  const [typingUsers, setTypingUsers] = useState<string[]>([]);

  // The server reports the current list of typing users in `typing` messages.
  const handleIncoming = (data: any) => {
    if (data.type !== 'typing') {
      return;
    }
    setTypingUsers(data.users);
  };

  const { sendMessage } = useWebSocket(`/ws/${roomId}`, {
    onMessage: handleIncoming,
  });

  // Debounced typing indicator
  const sendTyping = useMemo(() => {
    const notifyTyping = () => {
      sendMessage({ type: 'typing' });
    };
    return debounce(notifyTyping, 300);
  }, [sendMessage]);

  const handleChange = () => {
    sendTyping();
  };

  const someoneIsTyping = typingUsers.length > 0;
  const names = typingUsers.join(', ');
  const verb = typingUsers.length === 1 ? 'is' : 'are';

  return (
    <div>
      {someoneIsTyping && (
        <div className="typing-indicator">
          {names} {verb} typing...
        </div>
      )}
      <input onChange={handleChange} />
    </div>
  );
}

Scaling WebSockets

Redis Pub/Sub for Multiple Servers

import asyncio
import json
from typing import Dict, Set

import aioredis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

# Application object on which the WebSocket route below is registered.
app = FastAPI()
# Redis client used by PubSubManager for both publishing and subscribing.
# NOTE(review): the standalone aioredis project has been folded into redis-py
# (`redis.asyncio`); consider migrating — confirm against the deployed version.
redis = aioredis.from_url("redis://localhost")

class PubSubManager:
    """Fan Redis pub/sub messages out to the WebSockets held by this server.

    Every channel with at least one local socket has one background task
    subscribed to the same-named Redis channel. Messages published by any
    server are forwarded to the local sockets of that channel.
    """

    def __init__(self):
        # channel -> sockets connected to this server
        self.local_connections: Dict[str, Set[WebSocket]] = {}
        # channel -> listener task. The event loop keeps only weak references
        # to tasks, so an unreferenced task can be garbage-collected while it
        # is still running; holding it here also allows it to be cancelled.
        self._subscriber_tasks: Dict[str, "asyncio.Task[None]"] = {}

    async def subscribe(self, channel: str, websocket: WebSocket):
        """Register ``websocket`` for ``channel``, starting the listener if needed."""
        # Local tracking
        if channel not in self.local_connections:
            self.local_connections[channel] = set()
            # Start Redis subscriber for this channel
            self._subscriber_tasks[channel] = asyncio.create_task(
                self._redis_subscriber(channel)
            )

        self.local_connections[channel].add(websocket)

    def unsubscribe(self, channel: str, websocket: WebSocket):
        """Remove ``websocket`` from ``channel``.

        When the last local socket leaves, the channel's listener task is
        cancelled. Unknown channels and sockets are ignored.
        """
        connections = self.local_connections.get(channel)
        if connections is None:
            return
        connections.discard(websocket)
        if connections:
            return

        del self.local_connections[channel]
        task = self._subscriber_tasks.pop(channel, None)
        if task is not None:
            task.cancel()

    async def publish(self, channel: str, message: dict):
        """Publish ``message`` as JSON on ``channel``."""
        # Publish to Redis (reaches all servers)
        await redis.publish(channel, json.dumps(message))

    async def _redis_subscriber(self, channel: str):
        """Forward messages from the Redis ``channel`` to its local sockets."""
        pubsub = redis.pubsub()
        await pubsub.subscribe(channel)

        try:
            async for message in pubsub.listen():
                if message["type"] != "message":
                    continue
                data = json.loads(message["data"])
                # Broadcast to local connections. Iterate over a snapshot
                # because the set can change while a send is awaited.
                for ws in list(self.local_connections.get(channel, ())):
                    try:
                        await ws.send_json(data)
                    except (WebSocketDisconnect, RuntimeError):
                        # A dead socket must not stop delivery to the others.
                        self.unsubscribe(channel, ws)
        finally:
            # Reached on cancellation (last socket left) or on failure:
            # release the Redis subscription and its connection.
            await pubsub.unsubscribe(channel)
            await pubsub.close()

# Single manager instance used by the /ws/{room} endpoint below.
manager = PubSubManager()

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    """Bridge one client socket to the ``room:{room}`` pub/sub channel.

    Every JSON message received from the client is published to the channel.
    The socket is removed from the manager whatever ends the receive loop.
    """
    channel = f"room:{room}"
    await websocket.accept()
    await manager.subscribe(channel, websocket)

    try:
        while True:
            data = await websocket.receive_json()
            await manager.publish(channel, data)
    except WebSocketDisconnect:
        # Normal client departure; the cleanup below handles it.
        pass
    finally:
        # Also reached when the loop dies on something other than a
        # disconnect (e.g. malformed JSON), so no dead socket stays subscribed.
        manager.unsubscribe(channel, websocket)

Best Practices

| Practice | Benefit |
| --- | --- |
| Use SSE for one-way streaming | Simpler than WebSocket |
| Implement reconnection | Handles network issues |
| Add heartbeats | Detects dead connections |
| Use Redis pub/sub | Scales horizontally |
| Debounce typing events | Reduces message volume |
| Authenticate connections | Improves security |
| Handle backpressure | Prevents memory issues |