WebSockets and Real-Time Features

🎯 What You’ll Learn

  • What WebSockets are and how they differ from HTTP
  • How to create a WebSocket endpoint in FastAPI
  • How to broadcast real-time task updates to connected clients
  • Best practices for managing connections and scaling real-time features

🌐 Step 1: WebSocket Basics

🧠 What is a WebSocket?

  • HTTP is request/response: the client asks and the server replies. The server cannot send anything the client has not requested.
  • WebSockets are persistent, two-way connections: once established, both client and server can send messages at any time.

This makes WebSockets ideal for:

  • Chat applications
  • Live notifications
  • Real-time dashboards
  • Task updates in our Task Manager
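
To make the difference from HTTP concrete: a WebSocket connection begins as an ordinary HTTP GET request carrying an Upgrade: websocket header, and the server confirms the upgrade by hashing the client's Sec-WebSocket-Key together with a fixed GUID (RFC 6455). FastAPI and its server handle this for you. The sketch below only reproduces the calculation with the standard library, and the function name websocket_accept_key is a hypothetical helper, not part of FastAPI.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

def websocket_accept_key(client_key: str) -> str:
    """Compute the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")

# Sample key taken from the handshake example in RFC 6455.
print(websocket_accept_key("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Once this exchange succeeds, the same TCP connection stops speaking HTTP and carries WebSocket frames in both directions.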

📄 Example: Simple WebSocket Echo

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/echo")
async def websocket_echo(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # The client closed the connection; leave the handler quietly.
        pass

🔍 What’s happening?

  • @app.websocket("/ws/echo") defines a WebSocket endpoint.
  • websocket.accept() establishes the connection.
  • The server listens for messages (receive_text) and responds (send_text).

⚡ Step 2: Real-Time Task Updates

Let’s extend this to our Task Manager. When a task is created or updated, we want connected clients to receive live updates.

📄 Managing Connections

We’ll keep track of connected clients in memory.

Create the following file.

📄 routers/ws.py

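from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from typing import List

router = APIRouter(prefix="/ws", tags=["WebSockets"])

# In-memory registry of connected clients (per server process).
active_connections: List[WebSocket] = []

async def broadcast(message: str):
    # Iterate over a copy: the list can change while we await each send.
    for connection in list(active_connections):
        await connection.send_text(message)

@router.websocket("/tasks")
async def task_updates(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # Wait for client messages; raises WebSocketDisconnect on close.
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass  # client disconnected
    finally:
        # Always unregister, whatever ended the loop.
        active_connections.remove(websocket)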

🔍 What’s happening?

  • active_connections stores all connected clients.
  • broadcast() sends a message to every client.
  • Each client stays registered until it disconnects, at which point it is removed from the list.
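
As the number of helpers grows, the list and the functions around it can be grouped into one object. The following is a sketch of that idea, not the tutorial's required structure: ConnectionManager, TextSender, and FakeConnection are hypothetical names, and the manager accepts any object with an async send_text method so it can be exercised here without a running server.

```python
import asyncio
from typing import List, Protocol, Tuple

class TextSender(Protocol):
    """Anything with an async send_text method, such as a FastAPI WebSocket."""
    async def send_text(self, data: str) -> None: ...

class ConnectionManager:
    def __init__(self) -> None:
        self._connections: List[TextSender] = []

    def connect(self, connection: TextSender) -> None:
        self._connections.append(connection)

    def disconnect(self, connection: TextSender) -> None:
        # Safe to call twice: only remove the connection if it is still registered.
        if connection in self._connections:
            self._connections.remove(connection)

    def __len__(self) -> int:
        return len(self._connections)

    async def broadcast(self, message: str) -> int:
        """Send to every connection, drop the ones that fail, return the delivery count."""
        delivered = 0
        # Iterate over a snapshot so the list can change while we await.
        for connection in list(self._connections):
            try:
                await connection.send_text(message)
                delivered += 1
            except Exception:
                self.disconnect(connection)
        return delivered

class FakeConnection:
    """Stand-in for a WebSocket, used only to exercise the manager."""
    def __init__(self, broken: bool = False) -> None:
        self.broken = broken
        self.received: List[str] = []

    async def send_text(self, data: str) -> None:
        if self.broken:
            raise RuntimeError("connection lost")
        self.received.append(data)

async def demo() -> Tuple[int, int, List[str]]:
    manager = ConnectionManager()
    healthy, broken = FakeConnection(), FakeConnection(broken=True)
    manager.connect(healthy)
    manager.connect(broken)
    delivered = await manager.broadcast("Task 1 updated")
    return delivered, len(manager), healthy.received

print(asyncio.run(demo()))  # (1, 1, ['Task 1 updated'])
```

In the endpoint, connect() would be called after websocket.accept() and disconnect() when the receive loop ends.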

📄 Broadcasting Task Events

Now, in your task router (routers/tasks.py), you can notify clients when tasks change:

from fastapi import APIRouter, Depends
from sqlmodel import Session
from db import get_session
from models.task import Task
from models.user import User
from services import task_service
from core.dependencies import require_active_user, get_current_user

# import broadcast
from routers.ws import broadcast

# ...

@router.post("/", status_code=201, dependencies=[Depends(require_active_user)])
async def create(task: Task, session: Session = Depends(get_session), user: User = Depends(get_current_user)):
    created = task_service.create_task(task, session, user)
    # Broadcast only after the task has been saved successfully.
    await broadcast(f"Task created: {task.title}")
    return created

# ...

@router.put("/{task_id}", dependencies=[Depends(require_active_user)])
async def update(task_id: int, updated: Task, session: Session = Depends(get_session), user: User = Depends(get_current_user)):
    result = task_service.update_task(task_id, updated, session, user)
    # Broadcast only after the update has succeeded.
    await broadcast(f"Task {task_id} updated")
    return result

🧪 Step 3: Testing Real-Time Updates

  1. Open a WebSocket client (e.g. browser, Postman, or websocat) and connect to:

    ws://localhost:8000/ws/tasks
    
  2. Create or update tasks via the REST API.

  3. Watch the WebSocket client receive live updates instantly.

🧠 Best Practices

  • Connection management: Always remove disconnected clients to avoid memory leaks.
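  • Authentication: Secure WebSocket endpoints with JWT tokens (e.g. pass the token in a query parameter or a header). Note that the browser WebSocket API cannot set custom headers, so browser clients usually rely on a query parameter or cookie.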
  • Scaling: For production, use a message broker (Redis, Kafka) to broadcast across multiple server instances.
  • Error handling: Wrap each send inside broadcast() in try/except so that one broken connection does not stop delivery to the remaining clients.

🧪 Practice Challenge

  1. Add authentication to the WebSocket endpoint so only logged-in users can connect.
  2. Extend broadcast() to send structured JSON messages (e.g. { "event": "task_created", "task": {...} }).
  3. Build a simple frontend that listens to /ws/tasks and updates the task list live.
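
As a possible starting point for the second challenge, the message can be built as a dictionary and serialized to JSON before it is passed to broadcast(). The helper name make_event and the sample task fields are assumptions for illustration only.

```python
import json
from typing import Any, Dict

def make_event(event: str, task: Dict[str, Any]) -> str:
    """Serialize an event name and a task payload into one JSON text message."""
    return json.dumps({"event": event, "task": task})

message = make_event("task_created", {"id": 1, "title": "Write docs"})
print(message)

# A client would parse the message back into a dict and branch on "event".
decoded = json.loads(message)
print(decoded["event"])
```

The resulting string can be sent with send_text() as before. Starlette's WebSocket also offers send_json(), which performs the serialization step for a single connection.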

🧠 Recap

You now know how to:

  • Create WebSocket endpoints in FastAPI
  • Manage active connections and broadcast messages
  • Send real-time task updates to clients