Skill

FastAPI Production


name: fastapi-production-skill
description: Production-ready FastAPI application patterns for enterprise APIs. Use when building FastAPI applications with (1) JWT authentication, (2) background task processing, (3) WebSocket real-time updates, (4) structured logging for observability, (5) health check endpoints, (6) rate limiting, (7) error handling, or (8) OpenAPI documentation. Triggers on FastAPI, async Python API, authentication, WebSocket backend.

FastAPI Production Patterns

Project Structure

app/
├── __init__.py
├── main.py              # FastAPI app entry
├── config.py            # Settings from env/secrets
├── dependencies.py      # Dependency injection
├── api/
│   ├── __init__.py
│   ├── health.py
│   ├── auth.py
│   └── v1/              # Versioned endpoints
├── models/              # Pydantic schemas
├── services/            # Business logic
├── db/
│   ├── session.py
│   └── repositories/
└── middleware/
    ├── logging.py
    └── rate_limit.py

Application Factory

from fastapi import FastAPI
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage process-wide resources for the lifetime of the application.

    Opens the database pool and then Redis before the application starts
    serving, and releases both when it stops. Cleanup runs in ``finally``
    blocks so that it still happens when an exception (or cancellation) is
    raised at the ``yield`` point, and so that the database pool is released
    if Redis initialisation fails.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    # Startup
    await init_db_pool()
    try:
        await init_redis()
        try:
            yield
        finally:
            # Shutdown: release in reverse order of acquisition.
            await close_redis()
    finally:
        await close_db_pool()

def create_app() -> FastAPI:
    """Build the FastAPI application and attach its routers.

    Returns:
        The configured application, with the health router mounted at the
        root and the auth and jobs routers mounted under ``/api/v1``.
    """
    api_prefix = "/api/v1"
    application = FastAPI(
        title="ProYaro API",
        version="3.1.0",
        lifespan=lifespan,
        docs_url="/api/docs",
        redoc_url="/api/redoc",
    )

    # The health router is mounted without the version prefix.
    application.include_router(health_router)
    for versioned_router in (auth_router, jobs_router):
        application.include_router(versioned_router, prefix=api_prefix)

    return application

Settings from Secrets

from pydantic_settings import BaseSettings
from functools import lru_cache
from pathlib import Path

class Settings(BaseSettings):
    """Application settings.

    Plain fields carry defaults and are declared as ``BaseSettings`` fields.
    Secret values are exposed as read-only properties that read a file under
    ``/run/secrets`` every time they are accessed (nothing is cached here).
    """

    # From environment
    debug: bool = False
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    # From Docker secrets (file-based)
    @property
    def db_password(self) -> str:
        """Return the contents of the ``postgres_password`` secret file."""
        return self._read_secret("postgres_password")

    @property
    def jwt_secret(self) -> str:
        """Return the contents of the ``jwt_secret`` secret file."""
        return self._read_secret("jwt_secret")

    def _read_secret(self, name: str) -> str:
        """Read ``/run/secrets/<name>`` and return it with whitespace stripped.

        Args:
            name: File name of the secret inside ``/run/secrets``.

        Returns:
            The file contents with leading/trailing whitespace removed.

        Raises:
            ValueError: If the secret file does not exist.
        """
        path = Path(f"/run/secrets/{name}")
        # Read first and handle absence, rather than exists()-then-read, so a
        # file removed between the two calls still maps to ValueError.
        try:
            return path.read_text().strip()
        except (FileNotFoundError, NotADirectoryError) as err:
            raise ValueError(f"Secret {name} not found") from err

@lru_cache
def get_settings() -> Settings:
    """Return the shared ``Settings`` instance.

    The call takes no arguments and is wrapped in ``lru_cache``, so
    ``Settings()`` is constructed on the first call and reused afterwards.
    """
    settings = Settings()
    return settings

JWT Authentication

from datetime import datetime, timedelta
from jose import jwt, JWTError
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer

# Password-hashing context configured with the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer security scheme; get_current_user receives its result via Depends().
security = HTTPBearer()

def create_access_token(sub: str, expires_delta: timedelta = timedelta(minutes=15)) -> str:
    """Create a signed HS256 access token for ``sub``.

    Args:
        sub: Value stored in the token's ``sub`` claim.
        expires_delta: Lifetime of the token, added to the current UTC time
            to form the ``exp`` claim. Defaults to 15 minutes.

    Returns:
        The encoded JWT, carrying ``sub``, ``exp`` and ``type="access"``.
    """
    # Local import keeps this snippet self-contained; the surrounding import
    # block only brings in ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Timezone-aware "now": datetime.utcnow() returns a naive value and is
    # deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(
        {"sub": sub, "exp": expire, "type": "access"},
        get_settings().jwt_secret,
        algorithm="HS256"
    )

async def get_current_user(token: str = Depends(security)) -> User:
    """Resolve the authenticated user from the request's bearer token.

    Args:
        token: Value produced by the ``security`` dependency.
            NOTE(review): annotated as ``str`` but ``.credentials`` is read
            from it below, so the dependency evidently supplies a credentials
            object; consider tightening the annotation.

    Returns:
        The user returned by ``get_user_by_id`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, is not an access
            token, has no ``sub`` claim, or refers to an unknown user.
    """
    # Only the decode can raise JWTError, so keep the try body to that call.
    try:
        payload = jwt.decode(token.credentials, get_settings().jwt_secret, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token type")

    # A validly signed token without "sub" must be rejected as unauthorized
    # rather than surfacing a KeyError as a 500.
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid token")

    user = await get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=401, detail="User not found")
    return user

Health Checks

from fastapi import APIRouter, Response
from enum import Enum

class HealthStatus(str, Enum):
    """Overall service status values reported by the health endpoint.

    Mixes in ``str``, so each member is also a string equal to its value.
    """
    HEALTHY = "healthy"      # every dependency check reported "up"
    DEGRADED = "degraded"    # some, but not all, checks reported "up"
    UNHEALTHY = "unhealthy"  # no check reported "up"

@router.get("/health")
async def health_check(response: Response):
    # Probe each dependency in turn, then roll the results up into a single
    # overall status. Comments are used instead of a docstring so the
    # endpoint's generated API description is left as it was.
    checks = {}
    checks["database"] = await check_db()
    checks["redis"] = await check_redis()
    checks["gpu"] = await check_gpu()

    def _is_up(result):
        return result["status"] == "up"

    if all(map(_is_up, checks.values())):
        overall = HealthStatus.HEALTHY
    elif any(map(_is_up, checks.values())):
        overall = HealthStatus.DEGRADED
        response.status_code = 200  # Still operational
    else:
        overall = HealthStatus.UNHEALTHY
        response.status_code = 503

    return {"status": overall, "checks": checks}

Structured Logging

import json
import logging
import time
from datetime import datetime, timezone
from uuid import uuid4

from fastapi import Request

class JSONFormatter(logging.Formatter):
    """Render each log record as a single JSON object.

    Emitted keys: ``timestamp``, ``level``, ``message``, ``logger`` and
    ``request_id`` (``None`` when the record carries no such attribute).
    When the record has exception information, an ``exception`` key holds
    the formatted traceback.
    """

    def format(self, record):
        """Return ``record`` serialised as a JSON string."""
        # Use the record's own creation time (not the moment of formatting)
        # and make it timezone-aware so the ISO string carries its UTC offset.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        payload = {
            "timestamp": created.isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
            "request_id": getattr(record, "request_id", None),
        }
        # Without this, logger.exception(...) would lose its traceback.
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)

# Middleware for request logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Attach a request ID to each request and log its outcome.

    The ID is taken from the incoming ``X-Request-ID`` header, or generated
    when the header is absent or empty. It is stored on ``request.state``,
    included in the log record, and echoed back in the response header.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The downstream response with ``X-Request-ID`` set.
    """
    # ``or`` (rather than a .get default) also covers an empty header value,
    # which would otherwise produce an empty request ID.
    request_id = request.headers.get("X-Request-ID") or str(uuid4())
    request.state.request_id = request_id

    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start

    logger.info(
        "%s %s",
        request.method,
        request.url.path,
        extra={
            "request_id": request_id,
            "method": request.method,
            "path": request.url.path,
            "status": response.status_code,
            "duration_ms": round(duration * 1000, 2),
        }
    )
    response.headers["X-Request-ID"] = request_id
    return response

WebSocket with Authentication

from fastapi import WebSocket, WebSocketDisconnect

class ConnectionManager:
    """Track the WebSocket currently registered for each job."""

    def __init__(self):
        # One socket per job_id: a later connect() for the same job replaces
        # the earlier entry.
        self.active: dict[str, WebSocket] = {}

    async def connect(self, job_id: str, websocket: WebSocket):
        """Accept ``websocket`` and register it under ``job_id``."""
        await websocket.accept()
        self.active[job_id] = websocket

    def disconnect(self, job_id: str, websocket: "WebSocket | None" = None):
        """Remove the registration for ``job_id``; a no-op if there is none.

        Args:
            job_id: Job whose registration should be removed.
            websocket: When given, the entry is removed only if it is this
                exact socket, so a stale connection cleaning up after itself
                cannot evict a newer connection for the same job. When
                omitted, the entry is removed unconditionally.
        """
        if websocket is not None and self.active.get(job_id) is not websocket:
            return
        self.active.pop(job_id, None)

    async def send_progress(self, job_id: str, progress: int):
        """Send a progress message to the socket registered for ``job_id``.

        Does nothing when no socket is registered for that job.
        """
        if ws := self.active.get(job_id):
            await ws.send_json({"type": "progress", "value": progress})

@router.websocket("/ws/jobs/{job_id}")
async def job_websocket(websocket: WebSocket, job_id: str, token: str):
    """Serve a job's WebSocket: authenticate, register, answer pings.

    Args:
        websocket: The incoming WebSocket connection.
        job_id: Job identifier taken from the URL path.
        token: JWT supplied by the client as a query parameter.
    """
    # Validate JWT from query param
    # NOTE(review): the validated identity is not compared against the job,
    # so any holder of a valid token can subscribe to any job_id. Confirm
    # whether an ownership check is needed.
    try:
        await validate_token(token)
    except Exception:
        # ``Exception`` rather than a bare except, so task cancellation and
        # interpreter-exit signals are not swallowed.
        await websocket.close(code=4001)
        return

    await manager.connect(job_id, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            if data == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        pass
    finally:
        # Deregister on every exit path, but only when the registered socket
        # is still this one: a newer connection for the same job may have
        # replaced it.
        if manager.active.get(job_id) is websocket:
            manager.disconnect(job_id)

Rate Limiting

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Rate limiter keyed by the value get_remote_address() derives from a request.
limiter = Limiter(key_func=get_remote_address)
# Expose the limiter on application state and register the handler that
# turns RateLimitExceeded into an HTTP response.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Job submission endpoint, limited to "10/minute" by the module-level limiter
# (whose key function is get_remote_address).
# NOTE(review): `request` is not used in the visible body; presumably the
# limiter decorator needs it in the signature. Confirm against slowapi docs.
@router.post("/jobs")
@limiter.limit("10/minute")
async def submit_job(request: Request, job: JobCreate):
    ...

Error Handling

from fastapi import HTTPException
from fastapi.responses import JSONResponse

class AppException(Exception):
    """Application-level error carrying a machine-readable code.

    Attributes are read by the exception handler to build the JSON error
    response.

    Args:
        code: Short machine-readable error identifier.
        message: Human-readable description of the error.
        status_code: HTTP status to respond with. Defaults to 400.
    """

    def __init__(self, code: str, message: str, status_code: int = 400):
        # Initialise the base Exception so str(exc), exc.args and tracebacks
        # carry the message instead of being empty.
        super().__init__(message)
        self.code = code
        self.message = message
        self.status_code = status_code

@app.exception_handler(AppException)
async def app_exception_handler(request: Request, exc: AppException):
    """Convert an ``AppException`` into a JSON error response.

    The response uses the exception's status code and carries its code and
    message, plus the request ID from ``request.state`` when one is set.
    """
    error_detail = {"code": exc.code, "message": exc.message}
    body = {
        "error": error_detail,
        "request_id": getattr(request.state, "request_id", None),
    }
    return JSONResponse(status_code=exc.status_code, content=body)

@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Log an unhandled exception and return a generic 500 response.

    The response never exposes exception details. It includes the request ID
    (``None`` when none was set) so a client report can be matched to the log.
    """
    # getattr with a default: request.state has no request_id when the error
    # occurred before the request-logging middleware set it, and this handler
    # must not fail itself.
    request_id = getattr(request.state, "request_id", None)
    logger.exception("Unhandled exception", extra={"request_id": request_id})
    return JSONResponse(
        status_code=500,
        content={
            "error": {"code": "INTERNAL_ERROR", "message": "An unexpected error occurred"},
            "request_id": request_id,
        }
    )

Prometheus Metrics

from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

# Request counter, labelled by HTTP method, endpoint and response status.
REQUEST_COUNT = Counter("http_requests_total", "Total requests", ["method", "endpoint", "status"])
# Request duration histogram, labelled by endpoint; observed values are the
# time.perf_counter() differences (seconds) measured in the metrics middleware.
REQUEST_LATENCY = Histogram("http_request_duration_seconds", "Request latency", ["endpoint"])

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    """Record a request count and latency observation for each HTTP request.

    The ``endpoint`` label is the matched route's path template where one is
    available, not the raw URL, so paths with parameters (for example a job
    ID) do not create a new time series per distinct value.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The downstream response, unchanged.
    """
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start

    # Routing has run by now. NOTE(review): this relies on the router storing
    # the matched route under scope["route"] and the handler under
    # scope["endpoint"]; confirm for the framework version in use.
    route = request.scope.get("route")
    endpoint = getattr(route, "path", None)
    if not endpoint:
        if "endpoint" in request.scope:
            # Matched, but no template available: fall back to the raw path.
            endpoint = request.url.path
        else:
            # Nothing matched (e.g. 404 probes): use one fixed label value so
            # arbitrary URLs cannot inflate label cardinality.
            endpoint = "unmatched"

    REQUEST_COUNT.labels(request.method, endpoint, response.status_code).inc()
    REQUEST_LATENCY.labels(endpoint).observe(duration)
    return response

@router.get("/metrics")
async def metrics():
    # Serialise the current metrics and serve them with the exposition
    # content type. Comments are used instead of a docstring so the
    # endpoint's generated API description is left as it was.
    payload = generate_latest()
    return Response(payload, media_type=CONTENT_TYPE_LATEST)

Production Checklist

  • Sensitive settings (DB password, JWT secret) loaded from secret files, not environment variables
  • JWT with short-lived access tokens (15 min)
  • Refresh token rotation implemented
  • Health check covers all dependencies
  • Structured JSON logging enabled
  • Request ID tracking through middleware
  • Rate limiting on public endpoints
  • Global exception handler with logging
  • Prometheus metrics exposed
  • WebSocket authentication via token
  • API versioning in URL path

ProYaro AI Infrastructure Documentation • Version 1.2