Request Lifecycle
Understanding the request lifecycle helps you debug issues and optimize your Fluvius applications.
Overview
The request lifecycle in Fluvius follows a clear path from HTTP request to response:
HTTP Request
│
▼
FastAPI Middleware
│
▼
FastAPI Endpoint
│
▼
Domain Command/Query
│
├──► Command Path
│ │
│ ▼
│ Command Processor
│ │
│ ▼
│ Aggregate Action
│ │
│ ├──► Business Logic
│ │
│ ├──► State Update
│ │
│ └──► Event Generation
│ │
│ ├──► Event Store
│ │
│ └──► State Store Update
│
└──► Query Path
│
▼
State Manager
│
▼
State Store
│
└──► Response
Command Request Lifecycle
1. HTTP Request
Client sends HTTP request:
POST /api/users
Content-Type: application/json
{
"name": "John Doe",
"email": "john@example.com"
}
2. FastAPI Middleware
Middleware processes request:
@middleware
async def logging_middleware(request, call_next):
# Log request
logger.info(f"Request: {request.method} {request.url}")
response = await call_next(request)
# Log response
return response
3. FastAPI Endpoint
Endpoint receives request:
@app.post("/api/users")
async def create_user(request: CreateUserRequest):
# Validate request
# Create command
# Process command
pass
4. Domain Command Creation
Domain creates command:
command = domain.create_command('create-user', {
'name': request.name,
'email': request.email
})
5. Command Processing
Command processor routes to aggregate:
response = await domain.process_command(command)
6. Aggregate Action
Aggregate executes business logic:
@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
# Business logic
self._state = UserState(name=name, email=email)
return self._state
7. Event Generation
Event is automatically generated:
# Event 'user-created' is generated with:
# - key: 'user-created'
# - payload: {'name': name, 'email': email}
# - timestamp: current time
# - aggregate_id: user_id
8. Event Storage
Event is stored in event store:
# Event is appended to event log
await event_store.append(event)
9. State Update
State store is updated:
# Current state is updated
await state_store.update('user', user_id, new_state)
10. Response
Response is returned to client:
return {
"id": user_id,
"name": "John Doe",
"email": "john@example.com"
}
Query Request Lifecycle
1. HTTP Request
Client sends HTTP GET request:
GET /api/users/123
2. FastAPI Middleware
Middleware processes request (same as command)
3. FastAPI Endpoint
Endpoint receives request:
@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
# Query state
user = await domain.statemgr.fetch('user', user_id)
return user
4. State Manager Query
State manager queries state store:
user = await domain.statemgr.fetch('user', user_id)
5. State Store Query
State store returns current state:
# Query optimized state store
state = await state_store.fetch('user', user_id)
6. Response
Response is returned to client:
return {
"id": user_id,
"name": "John Doe",
"email": "john@example.com",
"active": True
}
Lifecycle Hooks
Before Command Processing
class UserDomain(Domain):
async def before_process_command(self, command):
# Pre-processing logic
logger.info(f"Processing command: {command.name}")
After Command Processing
class UserDomain(Domain):
async def after_process_command(self, command, response):
# Post-processing logic
logger.info(f"Command processed: {command.name}")
Before Event Storage
class UserDomain(Domain):
async def before_store_event(self, event):
# Pre-storage logic
# Validate event
pass
After Event Storage
class UserDomain(Domain):
async def after_store_event(self, event):
# Post-storage logic
# Trigger side effects
pass
Error Handling
Aggregate Errors
Errors in aggregates are caught and handled:
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
if not email or '@' not in email:
raise ValueError("Invalid email") # Caught by command processor
Command Processing Errors
Errors are handled by command processor:
try:
response = await domain.process_command(command)
except ValueError as e:
# Handle validation error
return {"error": str(e)}
State Query Errors
Errors in queries are handled:
try:
user = await domain.statemgr.fetch('user', user_id)
except NotFoundError:
raise HTTPException(status_code=404, detail="User not found")
Transaction Management
Command Transactions
Commands are processed in transactions:
# Command processing is transactional
async with domain.transaction():
response = await domain.process_command(command)
# If error occurs, transaction is rolled back
Event Storage Transactions
Events are stored atomically:
# Event storage is atomic
# Either all events are stored or none
await event_store.append_many(events)
Performance Considerations
Command Processing
- Synchronous: Commands are processed synchronously
- Transactional: Each command is in a transaction
- Event Generation: Events are generated synchronously
Query Processing
- Optimized: Queries use optimized state store
- Cached: Optional caching layer
- Indexed: State store is indexed for fast queries
Event Storage
- Append-Only: Events are appended (fast writes)
- Batchable: Multiple events can be batched
- Async: Event storage can be async
Monitoring
Request Metrics
Track request metrics:
@middleware
async def metrics_middleware(request, call_next):
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
metrics.record_request(request.path, duration)
return response
Command Metrics
Track command processing:
class UserDomain(Domain):
async def process_command(self, command):
start_time = time.time()
try:
response = await super().process_command(command)
metrics.record_command(command.name, time.time() - start_time)
return response
except Exception as e:
metrics.record_command_error(command.name, str(e))
raise
Next Steps
- Learn about Messaging & Events
- Explore Data Persistence
- Check Auth & Security