Real-Time: WebSockets, SSE, and Polling
Build instant, interactive experiences.
Communication Methods
| Method | Use Case | Pros | Cons |
|---|---|---|---|
| WebSocket | Bidirectional real-time | Full duplex, low latency | Complex, stateful |
| SSE | Server-to-client streaming | Simple, auto-reconnect | One-way only |
| Polling | Simple updates | Easy to implement | Inefficient, higher latency |
| Long Polling | Near real-time | Works everywhere | Resource intensive |
WebSockets
FastAPI WebSocket Server
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
import json
app = FastAPI()
class ConnectionManager:
    """Track accepted WebSocket connections per room and fan messages out to them."""

    def __init__(self):
        # Room name -> sockets currently joined to that room.
        self.active_connections: Dict[str, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room: str):
        """Accept the handshake and register ``websocket`` under ``room``."""
        await websocket.accept()
        self.active_connections.setdefault(room, set()).add(websocket)

    def disconnect(self, websocket: WebSocket, room: str):
        """Remove ``websocket`` from ``room``.

        Unknown rooms and sockets that were never registered are ignored, so
        calling this twice for the same socket is harmless.
        """
        members = self.active_connections.get(room)
        if members is None:
            return
        members.discard(websocket)
        if not members:
            # Drop empty rooms so the mapping does not grow without bound.
            del self.active_connections[room]

    async def broadcast(self, room: str, message: dict):
        """Send ``message`` as JSON to every socket currently in ``room``.

        A room with no members (or an unknown room) is a no-op.
        """
        # Iterate over a snapshot: every ``await`` yields to the event loop, and
        # another handler may connect or disconnect in the meantime, which would
        # raise "Set changed size during iteration" on the live set.
        for connection in tuple(self.active_connections.get(room, ())):
            await connection.send_json(message)
manager = ConnectionManager()
@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    """Relay every JSON message a client sends to all sockets in ``room``.

    When the client disconnects, the remaining members of the room receive a
    ``user_left`` event.
    """
    await manager.connect(websocket, room)
    try:
        try:
            while True:
                data = await websocket.receive_json()
                # Process message
                await manager.broadcast(room, {
                    "type": "message",
                    "data": data,
                })
        finally:
            # Always unregister, including when the loop ends for a reason
            # other than a disconnect (e.g. a frame that is not valid JSON),
            # so the room never keeps a reference to a dead socket.
            manager.disconnect(websocket, room)
    except WebSocketDisconnect:
        await manager.broadcast(room, {
            "type": "user_left",
            "room": room,
        })
React WebSocket Hook
import { useEffect, useRef, useState, useCallback } from 'react';
/** Callbacks and reconnect settings accepted by `useWebSocket`. */
interface UseWebSocketOptions {
  /** Called with each incoming message after it has been JSON-parsed. */
  onMessage?: (data: any) => void;
  /** Called when the socket opens. */
  onOpen?: () => void;
  /** Called when the socket closes on its own (not when the hook tears it down). */
  onClose?: () => void;
  /** Called with the socket's error event. */
  onError?: (error: Event) => void;
  /** Reconnect after the socket closes; enabled unless explicitly `false`. */
  reconnect?: boolean;
  /** Delay before a reconnect attempt, in milliseconds (default 3000). */
  reconnectInterval?: number;
}

/**
 * Open a WebSocket to `url` and keep it alive for the lifetime of the component.
 *
 * Returns the connection state, the most recent parsed message, and a
 * `sendMessage` function that JSON-encodes its argument and sends it when the
 * socket is open (it is a no-op otherwise).
 */
export function useWebSocket(url: string, options: UseWebSocketOptions = {}) {
  const [isConnected, setIsConnected] = useState(false);
  const [lastMessage, setLastMessage] = useState<any>(null);
  const wsRef = useRef<WebSocket | null>(null);
  // `ReturnType<typeof setTimeout>` is correct in both browser and Node typings.
  const reconnectTimeoutRef = useRef<ReturnType<typeof setTimeout>>();

  // Callers usually pass `options` as an inline object literal, i.e. a new
  // object on every render. Reading it through a ref keeps the socket effect
  // keyed on `url` alone, so a re-render no longer closes and reopens the
  // connection, while handlers still see the latest callbacks.
  const optionsRef = useRef(options);
  optionsRef.current = options;

  useEffect(() => {
    // Set by the cleanup below; tells handlers this socket generation is dead.
    let disposed = false;

    const connect = () => {
      const ws = new WebSocket(url);
      wsRef.current = ws;

      ws.onopen = () => {
        setIsConnected(true);
        optionsRef.current.onOpen?.();
      };

      ws.onclose = () => {
        // `close()` in the cleanup fires this handler asynchronously. Without
        // this guard it would schedule a reconnect after unmount.
        if (disposed) return;
        setIsConnected(false);
        optionsRef.current.onClose?.();
        if (optionsRef.current.reconnect !== false) {
          reconnectTimeoutRef.current = setTimeout(
            connect,
            optionsRef.current.reconnectInterval ?? 3000
          );
        }
      };

      ws.onmessage = (event) => {
        let data: any;
        try {
          data = JSON.parse(event.data);
        } catch (e) {
          // A non-JSON frame should not throw out of the event handler.
          console.error('Failed to parse WebSocket message', e);
          return;
        }
        setLastMessage(data);
        optionsRef.current.onMessage?.(data);
      };

      ws.onerror = (error) => {
        optionsRef.current.onError?.(error);
      };
    };

    connect();

    return () => {
      disposed = true;
      clearTimeout(reconnectTimeoutRef.current);
      wsRef.current?.close();
      // The guarded `onclose` will not run, so reset the flag here.
      setIsConnected(false);
    };
  }, [url]);

  const sendMessage = useCallback((data: any) => {
    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify(data));
    }
  }, []);

  return { isConnected, lastMessage, sendMessage };
}
// Usage
/** Example chat view: collects `message` events for one room and shows the connection status. */
function Chat({ roomId }: { roomId: string }) {
  const [messages, setMessages] = useState<Message[]>([]);

  const socketUrl = `wss://api.example.com/ws/${roomId}`;

  // Only `message` events carry chat content; everything else is ignored.
  const handleIncoming = (payload: any) => {
    if (payload.type !== 'message') return;
    setMessages(existing => existing.concat([payload.data]));
  };

  const { isConnected, sendMessage } = useWebSocket(socketUrl, {
    onMessage: handleIncoming,
  });

  const statusLabel = isConnected ? 'Connected' : 'Disconnected';

  return (
    <div>
      <div>Status: {statusLabel}</div>
      {/* ... */}
    </div>
  );
}
Server-Sent Events (SSE)
FastAPI SSE Endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def event_generator(user_id: str):
    """Yield SSE frames for ``user_id`` forever.

    Each pass emits one ``data:`` frame per pending notification, then a
    comment frame as a keep-alive, and then pauses for one second.
    """
    while True:
        # Fetch whatever is waiting for this user.
        pending = await get_pending_notifications(user_id)
        for item in pending:
            payload = json.dumps(item)
            yield f"data: {payload}\n\n"

        # Comment line: keeps the connection open without delivering an event.
        yield ": keepalive\n\n"
        await asyncio.sleep(1)
@app.get("/events/{user_id}")
async def sse_endpoint(user_id: str):
    """Return a streaming ``text/event-stream`` response fed by ``event_generator``."""
    stream = event_generator(user_id)
    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        stream,
        media_type="text/event-stream",
        headers=sse_headers,
    )
React SSE Hook
import { useEffect, useState } from 'react';
/**
 * Subscribe to a Server-Sent Events stream at `url`.
 *
 * Returns the most recent JSON-parsed message (`data`), the last connection
 * error (or `null` while the stream is healthy), and whether the stream is
 * currently open. The stream is closed when `url` changes or the component
 * unmounts.
 */
export function useSSE<T>(url: string) {
  const [data, setData] = useState<T | null>(null);
  const [error, setError] = useState<Error | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    const eventSource = new EventSource(url);

    eventSource.onopen = () => {
      setIsConnected(true);
      // EventSource reconnects on its own after an error. Clear the stale
      // error so the hook does not report "connected" and "failed" together.
      setError(null);
    };

    eventSource.onmessage = (event) => {
      try {
        const parsed = JSON.parse(event.data);
        setData(parsed);
      } catch (e) {
        console.error('Failed to parse SSE data', e);
      }
    };

    eventSource.onerror = () => {
      setError(new Error('SSE connection failed'));
      setIsConnected(false);
    };

    return () => {
      eventSource.close();
    };
  }, [url]);

  return { data, error, isConnected };
}
// Usage
/** Renders nothing; shows a toast whenever the notification stream delivers a new value. */
function Notifications() {
  const stream = useSSE<Notification>('/api/events/notifications');
  const notification = stream.data;

  useEffect(() => {
    // Nothing has arrived yet.
    if (!notification) return;
    toast.info(notification.message);
  }, [notification]);

  return null;
}
Polling
Smart Polling
import { useEffect, useRef, useState, useCallback } from 'react';
/** Settings for `usePolling`. */
interface UsePollingOptions {
  /** Delay between the end of one poll and the start of the next, in milliseconds. */
  interval: number;
  /** Polling runs only while this is true (default `true`). */
  enabled?: boolean;
  /** Called with the error when a poll fails. */
  onError?: (error: Error) => void;
}

/**
 * Call `fetcher` repeatedly, waiting `interval` ms after each attempt finishes.
 *
 * Returns the latest successful result (`data`) and `isLoading`, which stays
 * `true` until the first successful poll.
 *
 * `fetcher`, `onError` and `interval` are read through refs, so passing inline
 * functions or changing the interval does not restart the loop. A new interval
 * takes effect the next time a poll is scheduled.
 */
export function usePolling<T>(
  fetcher: () => Promise<T>,
  options: UsePollingOptions
) {
  const { interval, enabled = true, onError } = options;
  const [data, setData] = useState<T | null>(null);
  const [isLoading, setIsLoading] = useState(true);
  const timeoutRef = useRef<ReturnType<typeof setTimeout>>();

  // Latest values, readable from the polling loop without being effect
  // dependencies. With them as dependencies, an inline `fetcher`/`onError`
  // (new identity each render) restarted the loop on every render, and each
  // restart polled immediately.
  const fetcherRef = useRef(fetcher);
  const onErrorRef = useRef(onError);
  const intervalRef = useRef(interval);
  fetcherRef.current = fetcher;
  onErrorRef.current = onError;
  intervalRef.current = interval;

  useEffect(() => {
    if (!enabled) return;

    // Set by cleanup so a poll that is still in flight neither updates state
    // nor schedules another round after the loop was stopped.
    let cancelled = false;

    const poll = async () => {
      try {
        const result = await fetcherRef.current();
        if (cancelled) return;
        setData(result);
        setIsLoading(false);
      } catch (error) {
        if (cancelled) return;
        onErrorRef.current?.(error as Error);
      }
      timeoutRef.current = setTimeout(poll, intervalRef.current);
    };

    poll();

    return () => {
      cancelled = true;
      clearTimeout(timeoutRef.current);
    };
  }, [enabled]);

  return { data, isLoading };
}
// Usage with exponential backoff on error
/**
 * Poll with exponential backoff: the delay starts at 1 s, doubles after every
 * failed poll up to 30 s, and returns to 1 s after the next successful poll.
 */
function usePollingWithBackoff<T>(fetcher: () => Promise<T>) {
  const BASE_DELAY_MS = 1000;
  const MAX_DELAY_MS = 30000; // Max 30s

  // Named `delay`/`setDelay` so the setter does not shadow the global `setInterval`.
  const [delay, setDelay] = useState(BASE_DELAY_MS);

  // Wrap the fetcher so a success ends the backoff; otherwise a single error
  // would slow polling down permanently.
  const fetchAndReset = useCallback(async () => {
    const result = await fetcher();
    setDelay(BASE_DELAY_MS);
    return result;
  }, [fetcher]);

  return usePolling(fetchAndReset, {
    interval: delay,
    onError: () => {
      setDelay(prev => Math.min(prev * 2, MAX_DELAY_MS));
    },
  });
}
Presence and Typing Indicators
Backend
from datetime import datetime, timedelta
import asyncio
class PresenceManager:
    """Track per-room presence and typing state in Redis.

    Presence lives in one hash per room (``presence:{room_id}``) mapping user
    id to the ISO timestamp of the last heartbeat. Typing state is one
    short-lived key per user (``typing:{room_id}:{user_id}``).
    """

    def __init__(self, redis):
        self.redis = redis
        self.timeout = 30  # seconds

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Same value and stored format as ``datetime.utcnow()``, without calling
        the deprecated API.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _as_text(value):
        """Decode ``bytes`` replies so both client response modes are handled."""
        return value.decode() if isinstance(value, bytes) else value

    async def set_online(self, user_id: str, room_id: str):
        """Record a heartbeat for ``user_id`` in ``room_id``."""
        key = f"presence:{room_id}"
        await self.redis.hset(key, user_id, self._utcnow().isoformat())
        # The whole hash expires once nobody has refreshed it for 2x the timeout.
        await self.redis.expire(key, self.timeout * 2)

    async def set_typing(self, user_id: str, room_id: str):
        """Mark ``user_id`` as typing in ``room_id`` for the next 3 seconds."""
        await self.redis.setex(f"typing:{room_id}:{user_id}", 3, "1")

    async def get_online_users(self, room_id: str) -> list[str]:
        """Return the users whose last heartbeat is newer than ``self.timeout`` seconds."""
        users = await self.redis.hgetall(f"presence:{room_id}")
        cutoff = self._utcnow() - timedelta(seconds=self.timeout)
        return [
            self._as_text(user_id)
            for user_id, timestamp in users.items()
            if datetime.fromisoformat(self._as_text(timestamp)) > cutoff
        ]

    async def get_typing_users(self, room_id: str) -> list[str]:
        """Return the ids of users currently typing in ``room_id``, sorted."""
        prefix = f"typing:{room_id}:"
        typing = set()
        # SCAN walks the keyspace incrementally. KEYS scans it in one blocking
        # call, which stalls the server on large databases. SCAN may return a
        # key more than once, hence the set.
        async for raw_key in self.redis.scan_iter(match=f"{prefix}*"):
            # Strip the known prefix instead of splitting on ":", so user ids
            # that themselves contain ":" are returned intact.
            typing.add(self._as_text(raw_key)[len(prefix):])
        return sorted(typing)
Frontend
/** Example room view: shows who is typing and reports the local user's typing, debounced. */
function ChatRoom({ roomId }: { roomId: string }) {
  const [typingUsers, setTypingUsers] = useState<string[]>([]);

  // Only `typing` events update the indicator.
  const handleSocketMessage = (payload: any) => {
    if (payload.type !== 'typing') return;
    setTypingUsers(payload.users);
  };

  const { sendMessage } = useWebSocket(`/ws/${roomId}`, {
    onMessage: handleSocketMessage,
  });

  // Debounced typing indicator
  const sendTyping = useMemo(() => {
    const notifyTyping = () => {
      sendMessage({ type: 'typing' });
    };
    return debounce(notifyTyping, 300);
  }, [sendMessage]);

  const someoneIsTyping = typingUsers.length > 0;
  const verb = typingUsers.length === 1 ? 'is' : 'are';

  return (
    <div>
      {someoneIsTyping && (
        <div className="typing-indicator">
          {typingUsers.join(', ')} {verb} typing...
        </div>
      )}
      <input onChange={() => sendTyping()} />
    </div>
  );
}
Scaling WebSockets
Redis Pub/Sub for Multiple Servers
import aioredis
from fastapi import FastAPI, WebSocket
app = FastAPI()
redis = aioredis.from_url("redis://localhost")
class PubSubManager:
    """Bridge Redis pub/sub channels to the WebSockets connected to this server.

    Messages are published to Redis so every server instance sees them. Each
    instance runs one subscriber task per channel that forwards incoming
    messages to its own local sockets.
    """

    def __init__(self):
        # Channel name -> sockets on this server subscribed to it.
        self.local_connections: dict[str, set[WebSocket]] = {}
        # Channel name -> subscriber task. The event loop holds only weak
        # references to tasks, so an unreferenced task can be garbage-collected
        # while it is still running.
        self._subscriber_tasks = {}

    async def subscribe(self, channel: str, websocket: WebSocket):
        """Register ``websocket`` for ``channel``, starting its Redis subscriber if needed."""
        import asyncio

        # Local tracking
        if channel not in self.local_connections:
            self.local_connections[channel] = set()
            # Start Redis subscriber for this channel
            self._subscriber_tasks[channel] = asyncio.create_task(
                self._redis_subscriber(channel)
            )
        self.local_connections[channel].add(websocket)

    def unsubscribe(self, channel: str, websocket: WebSocket):
        """Remove ``websocket`` from ``channel``.

        When the last local socket leaves, the channel's subscriber task is
        cancelled. Unknown channels and sockets are ignored.
        """
        members = self.local_connections.get(channel)
        if members is None:
            return
        members.discard(websocket)
        if members:
            return
        del self.local_connections[channel]
        task = self._subscriber_tasks.pop(channel, None)
        if task is not None:
            task.cancel()

    async def publish(self, channel: str, message: dict):
        """Publish ``message`` as JSON on ``channel``."""
        import json

        # Publish to Redis (reaches all servers)
        await redis.publish(channel, json.dumps(message))

    async def _redis_subscriber(self, channel: str):
        """Forward every message received on ``channel`` to the local sockets."""
        import json

        pubsub = redis.pubsub()
        await pubsub.subscribe(channel)
        try:
            async for message in pubsub.listen():
                if message["type"] != "message":
                    continue
                data = json.loads(message["data"])
                # Broadcast to local connections. Iterate over a snapshot,
                # because sockets can join or leave while a send is awaited.
                for ws in tuple(self.local_connections.get(channel, ())):
                    await ws.send_json(data)
        finally:
            # Runs when the task is cancelled by ``unsubscribe``; releases the
            # Redis-side subscription.
            await pubsub.unsubscribe(channel)
manager = PubSubManager()
@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    """Publish every JSON message a client sends to the room's Redis channel."""
    # This snippet's import line brings in only FastAPI and WebSocket.
    from fastapi import WebSocketDisconnect

    channel = f"room:{room}"
    await websocket.accept()
    await manager.subscribe(channel, websocket)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.publish(channel, data)
    except WebSocketDisconnect:
        # The client going away is the normal way for this loop to end.
        pass
    finally:
        # Unregister on every exit path, not only on a clean disconnect, so a
        # failed handler never leaves its socket subscribed.
        manager.unsubscribe(channel, websocket)
Best Practices
| Practice | Benefit |
|---|---|
| Use SSE for one-way | Simpler than WebSocket |
| Implement reconnection | Handle network issues |
| Add heartbeats | Detect dead connections |
| Use Redis pub/sub | Scale horizontally |
| Debounce typing | Reduce message volume |
| Authenticate connections | Security |
| Handle backpressure | Prevent memory issues |