WebSockets and Real-Time Features¶
🎯 What You’ll Learn¶
- What WebSockets are and how they differ from HTTP
- How to create a WebSocket endpoint in FastAPI
- How to broadcast real-time task updates to connected clients
- Best practices for managing connections and scaling real-time features
🌐 Step 1: WebSocket Basics¶
🧠 What is a WebSocket?¶
- HTTP is request/response: the client asks, the server replies, then the connection closes.
- WebSockets are persistent, two-way connections: once established, both client and server can send messages at any time.
This makes WebSockets ideal for:
- Chat applications
- Live notifications
- Real-time dashboards
- Task updates in our Task Manager
📄 Example: Simple WebSocket Echo¶
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws/echo")
async def websocket_echo(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
🔍 What’s happening?¶
@app.websocket("/ws/echo")defines a WebSocket endpoint.websocket.accept()establishes the connection.- The server listens for messages (
receive_text) and responds (send_text).
⚡ Step 2: Real-Time Task Updates¶
Let’s extend this to our Task Manager. When a task is created or updated, we want connected clients to receive live updates.
📄 Managing Connections¶
We’ll keep track of connected clients in memory:
from fastapi import APIRouter, WebSocket
from typing import List
router = APIRouter(prefix="/ws", tags=["WebSockets"])
active_connections: List[WebSocket] = []
async def broadcast(message: str):
for connection in active_connections:
await connection.send_text(message)
@router.websocket("/tasks")
async def task_updates(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
try:
while True:
await websocket.receive_text() # keep alive
except:
active_connections.remove(websocket)
🔍 What’s happening?¶
active_connectionsstores all connected clients.broadcast()sends a message to every client.- Each client stays connected until it disconnects.
📄 Broadcasting Task Events¶
Now, in your task router (routers/tasks.py), you can notify clients when tasks change:
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import Session, select
from db import get_session
from models.task import Task
from models.user import User
from core.dependencies import get_current_user
from routers.ws import broadcast
router = APIRouter(prefix="/tasks", tags=["Tasks"])
@router.post("/", status_code=201)
async def create_task(task: Task, session: Session = Depends(get_session), user: User = Depends(get_current_user)):
task.user_id = user.id
session.add(task)
session.commit()
session.refresh(task)
await broadcast(f"Task created: {task.title}")
return task
@router.patch("/{task_id}")
async def update_task(task_id: int, task: Task, session: Session = Depends(get_session), user: User = Depends(get_current_user)):
db_task = session.get(Task, task_id)
if not db_task or db_task.user_id != user.id:
raise HTTPException(status_code=404, detail="Task not found")
db_task.title = task.title or db_task.title
db_task.completed = task.completed or db_task.completed
session.add(db_task)
session.commit()
session.refresh(db_task)
await broadcast(f"Task updated: {db_task.title}")
return db_task
🧪 Step 3: Testing Real-Time Updates¶
-
Open a WebSocket client (e.g. browser, Postman, or
websocat) and connect to:ws://localhost:8000/ws/tasks -
Create or update tasks via the REST API.
- Watch the WebSocket client receive live updates instantly.
🧠 Best Practices¶
- Connection management: Always remove disconnected clients to avoid memory leaks.
- Authentication: Secure WebSocket endpoints with JWT tokens (e.g. pass token in query params or headers).
- Scaling: For production, use a message broker (Redis, Kafka) to broadcast across multiple server instances.
- Error handling: Wrap
broadcast()calls in try/except to handle broken connections gracefully.
🧪 Practice Challenge¶
- Add authentication to the WebSocket endpoint so only logged-in users can connect.
- Extend
broadcast()to send structured JSON messages (e.g.{ "event": "task_created", "task": {...} }). - Build a simple frontend that listens to
/ws/tasksand updates the task list live.
🧠 Recap¶
You now know how to:
- Create WebSocket endpoints in FastAPI
- Manage active connections and broadcast messages
- Send real-time task updates to clients