Skip to main content

Custom Middleware

info

Learn how to create and use custom middleware in Fluvius Framework.

Basic Middleware

Request Logging

from fastapi import Request
import logging

logger = logging.getLogger(__name__)

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Log every request line and the status code of its response.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request further down the stack
            and returns the resulting response.

    Returns:
        The response returned by ``call_next``, unmodified.
    """
    # Lazy %-style arguments: the message is only formatted when INFO is
    # actually enabled, which matters because this runs on every request.
    logger.info("%s %s", request.method, request.url)

    # Hand the request to the rest of the application.
    response = await call_next(request)

    logger.info("Status: %s", response.status_code)

    return response

Timing Middleware

import time

@app.middleware("http")
async def timing_middleware(request: Request, call_next):
    """Measure request handling time and expose it in a response header.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request further down the stack
            and returns the resulting response.

    Returns:
        The response from ``call_next`` with an ``X-Process-Time`` header
        holding the elapsed time in seconds (as a string).
    """
    # perf_counter() is monotonic and high-resolution; time.time() follows
    # the wall clock and can jump (even backwards) when the clock is adjusted,
    # which would yield wrong or negative durations.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(duration)
    return response

Authentication Middleware

JWT Authentication

from fastapi import HTTPException, status
from fastapi.security import HTTPBearer

security = HTTPBearer()

@app.middleware("http")
async def jwt_middleware(request: Request, call_next):
    """Reject unauthenticated requests and attach the user to the request.

    Requests whose path starts with ``/public`` are passed through without
    any check. All other requests must carry an ``Authorization`` header that
    ``validate_jwt_token`` accepts.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request further down the stack
            and returns the resulting response.

    Returns:
        A 401 JSON response when the header is missing or the token is
        rejected; otherwise the response from ``call_next``. On success the
        validated user is stored on ``request.state.user``.
    """
    # Imported locally so this snippet is self-contained.
    from fastapi.responses import JSONResponse

    # Skip authentication for public endpoints.
    if request.url.path.startswith("/public"):
        return await call_next(request)

    # Responses are returned rather than raising HTTPException: an exception
    # raised inside an HTTP middleware is not routed through FastAPI's
    # exception handlers and would reach the client as a 500 instead of 401.
    token = request.headers.get("Authorization")
    if not token:
        return JSONResponse(
            status_code=401,
            content={"detail": "Not authenticated"},
        )

    # NOTE(review): the raw header value (including any "Bearer " scheme
    # prefix) is passed through unchanged -- confirm that
    # validate_jwt_token expects the full header and not the bare token.
    user = validate_jwt_token(token)
    if not user:
        return JSONResponse(
            status_code=401,
            content={"detail": "Invalid token"},
        )

    # Make the authenticated user available to downstream handlers.
    request.state.user = user

    return await call_next(request)

Error Handling Middleware

Global Error Handler

from fastapi import Request
from fastapi.responses import JSONResponse

@app.middleware("http")
async def error_handler(request: Request, call_next):
    """Convert exceptions escaping the application into JSON error responses.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request further down the stack
            and returns the resulting response.

    Returns:
        The response from ``call_next`` on success; a 400 JSON response
        carrying the exception message for ``ValueError``; a generic 500 JSON
        response for any other exception.
    """
    try:
        return await call_next(request)
    except ValueError as e:
        # The exception text is sent to the client as-is.
        return JSONResponse(
            status_code=400,
            content={"error": str(e)},
        )
    except Exception:
        # Top-level boundary: record the traceback, but do not leak details
        # of the failure to the client.
        logger.exception("Unhandled error")
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error"},
        )

CORS Middleware

CORS Configuration

from fastapi.middleware.cors import CORSMiddleware

# Register CORSMiddleware on the application.
# NOTE(review): FastAPI's CORS documentation says wildcard values should not
# be combined with allow_credentials=True -- confirm whether the methods and
# headers below need to be listed explicitly.
app.add_middleware(
CORSMiddleware,
# Only this origin is listed as allowed.
allow_origins=["https://example.com"],
# Credentials are enabled for cross-origin requests.
allow_credentials=True,
# Wildcard: no restriction on HTTP methods is listed.
allow_methods=["*"],
# Wildcard: no restriction on request headers is listed.
allow_headers=["*"],
)

Rate Limiting Middleware

Rate Limiting

from collections import defaultdict
from datetime import datetime, timedelta

# Sliding-window settings: at most RATE_LIMIT_MAX_REQUESTS requests per
# client within RATE_LIMIT_WINDOW.
RATE_LIMIT_MAX_REQUESTS = 100
RATE_LIMIT_WINDOW = timedelta(minutes=1)

# Maps a client key (IP address) to the timestamps of its recent requests.
# This is plain in-memory module state: it is not shared between processes
# and is lost on restart.
rate_limits = defaultdict(list)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject clients that exceed the per-window request limit.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request further down the stack
            and returns the resulting response.

    Returns:
        A 429 JSON response when the client already has
        ``RATE_LIMIT_MAX_REQUESTS`` requests inside the window; otherwise the
        response from ``call_next``.
    """
    # request.client can be None when the peer address is not available;
    # such requests share a single fallback bucket instead of crashing on
    # ``None.host``.
    client_ip = request.client.host if request.client else "unknown"
    now = datetime.now()

    # Drop timestamps that have fallen out of the window.
    recent = [
        timestamp
        for timestamp in rate_limits[client_ip]
        if now - timestamp < RATE_LIMIT_WINDOW
    ]
    rate_limits[client_ip] = recent

    # Rejected requests are not recorded, so they do not extend the block.
    if len(recent) >= RATE_LIMIT_MAX_REQUESTS:
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded"},
        )

    # Count the current request against the client's budget.
    recent.append(now)

    return await call_next(request)

Next Steps