Custom Middleware
info
Learn how to create and use custom middleware in Fluvius Framework.
Basic Middleware
Request Logging
from fastapi import Request
import logging
logger = logging.getLogger(__name__)
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
# Log request
logger.info(f"{request.method} {request.url}")
# Process request
response = await call_next(request)
# Log response
logger.info(f"Status: {response.status_code}")
return response
Timing Middleware
import time
@app.middleware("http")
async def timing_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
response.headers["X-Process-Time"] = str(duration)
return response
Authentication Middleware
JWT Authentication
from fastapi import HTTPException, status
from fastapi.security import HTTPBearer
security = HTTPBearer()
@app.middleware("http")
async def jwt_middleware(request: Request, call_next):
# Skip authentication for public endpoints
if request.url.path.startswith("/public"):
return await call_next(request)
# Extract token
token = request.headers.get("Authorization")
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")
# Validate token
user = validate_jwt_token(token)
if not user:
raise HTTPException(status_code=401, detail="Invalid token")
# Add user to request
request.state.user = user
return await call_next(request)
Error Handling Middleware
Global Error Handler
from fastapi import Request
from fastapi.responses import JSONResponse
@app.middleware("http")
async def error_handler(request: Request, call_next):
try:
response = await call_next(request)
return response
except ValueError as e:
return JSONResponse(
status_code=400,
content={"error": str(e)}
)
except Exception as e:
logger.exception("Unhandled error")
return JSONResponse(
status_code=500,
content={"error": "Internal server error"}
)
CORS Middleware
CORS Configuration
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["https://example.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Rate Limiting Middleware
Rate Limiting
from collections import defaultdict
from datetime import datetime, timedelta
rate_limits = defaultdict(list)
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
now = datetime.now()
# Clean old entries
rate_limits[client_ip] = [
timestamp for timestamp in rate_limits[client_ip]
if now - timestamp < timedelta(minutes=1)
]
# Check rate limit
if len(rate_limits[client_ip]) >= 100:
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded"}
)
# Add current request
rate_limits[client_ip].append(now)
return await call_next(request)
Next Steps
- Learn about Error Handling
- Explore New Module
- Check Add API Endpoint