Skip to content

WebSockets and Real-Time Features

🎯 What You’ll Learn

  • What WebSockets are and how they differ from HTTP
  • How to create a WebSocket endpoint in FastAPI
  • How to broadcast real-time task updates to connected clients
  • Best practices for managing connections and scaling real-time features

🌐 Step 1: WebSocket Basics

🧠 What is a WebSocket?

  • HTTP is request/response: the client asks, the server replies, then the connection closes.
  • WebSockets are persistent, two-way connections: once established, both client and server can send messages at any time.

This makes WebSockets ideal for:

  • Chat applications
  • Live notifications
  • Real-time dashboards
  • Task updates in our Task Manager

📄 Example: Simple WebSocket Echo

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws/echo")
async def websocket_echo(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Echo: {data}")

🔍 What’s happening?

  • @app.websocket("/ws/echo") defines a WebSocket endpoint.
  • websocket.accept() establishes the connection.
  • The server listens for messages (receive_text) and responds (send_text).

⚡ Step 2: Real-Time Task Updates

Let’s extend this to our Task Manager. When a task is created or updated, we want connected clients to receive live updates.

📄 Managing Connections

We’ll keep track of connected clients in memory:

from fastapi import APIRouter, WebSocket
from typing import List

router = APIRouter(prefix="/ws", tags=["WebSockets"])

active_connections: List[WebSocket] = []

async def broadcast(message: str):
    for connection in active_connections:
        await connection.send_text(message)

@router.websocket("/tasks")
async def task_updates(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            await websocket.receive_text()  # keep alive
    except:
        active_connections.remove(websocket)

🔍 What’s happening?

  • active_connections stores all connected clients.
  • broadcast() sends a message to every client.
  • Each client stays connected until it disconnects.

📄 Broadcasting Task Events

Now, in your task router (routers/tasks.py), you can notify clients when tasks change:

from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import Session, select
from db import get_session
from models.task import Task
from models.user import User
from core.dependencies import get_current_user
from routers.ws import broadcast

router = APIRouter(prefix="/tasks", tags=["Tasks"])

@router.post("/", status_code=201)
async def create_task(task: Task, session: Session = Depends(get_session), user: User = Depends(get_current_user)):
    task.user_id = user.id
    session.add(task)
    session.commit()
    session.refresh(task)
    await broadcast(f"Task created: {task.title}")
    return task

@router.patch("/{task_id}")
async def update_task(task_id: int, task: Task, session: Session = Depends(get_session), user: User = Depends(get_current_user)):
    db_task = session.get(Task, task_id)
    if not db_task or db_task.user_id != user.id:
        raise HTTPException(status_code=404, detail="Task not found")
    db_task.title = task.title or db_task.title
    db_task.completed = task.completed or db_task.completed
    session.add(db_task)
    session.commit()
    session.refresh(db_task)
    await broadcast(f"Task updated: {db_task.title}")
    return db_task

🧪 Step 3: Testing Real-Time Updates

  1. Open a WebSocket client (e.g. browser, Postman, or websocat) and connect to:

    ws://localhost:8000/ws/tasks
    
  2. Create or update tasks via the REST API.

  3. Watch the WebSocket client receive live updates instantly.

🧠 Best Practices

  • Connection management: Always remove disconnected clients to avoid memory leaks.
  • Authentication: Secure WebSocket endpoints with JWT tokens (e.g. pass token in query params or headers).
  • Scaling: For production, use a message broker (Redis, Kafka) to broadcast across multiple server instances.
  • Error handling: Wrap broadcast() calls in try/except to handle broken connections gracefully.

🧪 Practice Challenge

  1. Add authentication to the WebSocket endpoint so only logged-in users can connect.
  2. Extend broadcast() to send structured JSON messages (e.g. { "event": "task_created", "task": {...} }).
  3. Build a simple frontend that listens to /ws/tasks and updates the task list live.

🧠 Recap

You now know how to:

  • Create WebSocket endpoints in FastAPI
  • Manage active connections and broadcast messages
  • Send real-time task updates to clients