
Request Lifecycle

Understanding the request lifecycle helps you debug issues and optimize your Fluvius applications.

Overview

The request lifecycle in Fluvius follows a clear path from HTTP request to response:

HTTP Request
    │
    ▼
FastAPI Middleware
    │
    ▼
FastAPI Endpoint
    │
    ▼
Domain Command/Query
    │
    ├──► Command Path
    │        │
    │        ▼
    │    Command Processor
    │        │
    │        ▼
    │    Aggregate Action
    │        │
    │        ├──► Business Logic
    │        │
    │        ├──► State Update
    │        │
    │        └──► Event Generation
    │                 │
    │                 ├──► Event Store
    │                 │
    │                 └──► State Store Update
    │
    └──► Query Path
             │
             ▼
         State Manager
             │
             ▼
         State Store
             │
             └──► Response

Command Request Lifecycle

1. HTTP Request

The client sends an HTTP request:

POST /api/users
Content-Type: application/json

{
    "name": "John Doe",
    "email": "john@example.com"
}

2. FastAPI Middleware

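Middleware processes the request before it reaches the endpoint and sees the response on the way back:

@app.middleware("http")
async def logging_middleware(request, call_next):
    # Log request
    logger.info(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    # Log response
    logger.info(f"Response: {response.status_code}")
    return response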
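To make the ordering concrete, here is a framework-free sketch of the `call_next` contract. `FakeRequest`, `endpoint`, and `run_with_middleware` are hypothetical stand-ins written for this illustration, not FastAPI or Fluvius APIs.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeRequest:
    # Hypothetical stand-in for a framework request object
    method: str
    url: str


calls = []  # records the order in which each stage runs


async def endpoint(request):
    calls.append("endpoint")
    return {"status": 201}


async def logging_middleware(request, call_next):
    calls.append(f"before {request.method} {request.url}")
    response = await call_next(request)  # hand off to the next stage
    calls.append(f"after {response['status']}")
    return response


async def run_with_middleware(request):
    # A framework would pass the endpoint (or the next middleware) as call_next
    return await logging_middleware(request, endpoint)


response = asyncio.run(run_with_middleware(FakeRequest("POST", "/api/users")))
```

After the run, `calls` holds the "before" entry, then `"endpoint"`, then the "after" entry, which is the wrapping behavior the middleware step relies on.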

3. FastAPI Endpoint

The endpoint receives the request:

@app.post("/api/users")
async def create_user(request: CreateUserRequest):
    # Validate request
    # Create command
    # Process command
    pass

4. Domain Command Creation

The domain creates a command from the request data:

command = domain.create_command('create-user', {
    'name': request.name,
    'email': request.email
})

5. Command Processing

The command processor routes the command to its aggregate:

response = await domain.process_command(command)
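The routing step can be pictured as a lookup from command name to aggregate method. The sketch below is a minimal illustration under that assumption; `Command`, `CommandProcessor`, and the `_routes` table are hypothetical names, not the Fluvius implementation.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Command:
    # Hypothetical command object: a name plus keyword arguments for the action
    name: str
    payload: dict = field(default_factory=dict)


class UserAggregate:
    async def create_user(self, name, email):
        return {"name": name, "email": email}


class CommandProcessor:
    """Hypothetical router that maps a command name to an aggregate method."""

    def __init__(self, aggregate):
        self._aggregate = aggregate
        self._routes = {"create-user": "create_user"}

    async def process(self, command):
        try:
            method_name = self._routes[command.name]
        except KeyError:
            raise LookupError(f"No handler for command {command.name!r}") from None
        action = getattr(self._aggregate, method_name)
        return await action(**command.payload)


processor = CommandProcessor(UserAggregate())
command = Command("create-user", {"name": "John Doe", "email": "john@example.com"})
result = asyncio.run(processor.process(command))
```

An unknown command name fails fast with `LookupError` instead of reaching the aggregate.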

6. Aggregate Action

The aggregate executes the business logic:

@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
    # Business logic
    self._state = UserState(name=name, email=email)
    return self._state

7. Event Generation

An event is generated automatically:

# Event 'user-created' is generated with:
# - key: 'user-created'
# - payload: {'name': name, 'email': email}
# - timestamp: current time
# - aggregate_id: user_id
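The four fields listed above map naturally onto a small immutable record. This is a sketch only: the `Event` class and `make_event` helper are hypothetical and may not match how Fluvius represents events internally.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class Event:
    # Field names follow the list above; the class itself is an assumption
    key: str
    payload: dict
    aggregate_id: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def make_event(evt_key, aggregate_id, **payload):
    """Build an event from the action's key and its keyword arguments."""
    return Event(key=evt_key, payload=payload, aggregate_id=aggregate_id)


event = make_event("user-created", "123", name="John Doe", email="john@example.com")
```

Making the record frozen reflects the idea that an event describes something that already happened and should not be edited afterwards.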

8. Event Storage

The event is stored in the event store:

# Event is appended to event log
await event_store.append(event)
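As a mental model of the append-only log, consider the in-memory sketch below. `InMemoryEventStore` and its `read` method are hypothetical; a real event store would persist to a database and its API may differ.

```python
import asyncio


class InMemoryEventStore:
    """Hypothetical append-only log: events are added, never changed or removed."""

    def __init__(self):
        self._log = []

    async def append(self, event):
        self._log.append(event)
        return len(self._log) - 1  # position of the event in the log

    async def read(self, aggregate_id):
        # Replay the history of one aggregate in insertion order
        return [e for e in self._log if e["aggregate_id"] == aggregate_id]


async def demo():
    store = InMemoryEventStore()
    first = await store.append({"key": "user-created", "aggregate_id": "123"})
    second = await store.append({"key": "user-renamed", "aggregate_id": "123"})
    await store.append({"key": "user-created", "aggregate_id": "456"})
    history = await store.read("123")
    return first, second, [e["key"] for e in history]


first, second, keys = asyncio.run(demo())
```

The `user-renamed` key is invented for the demo; only `user-created` appears in this guide.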

9. State Update

The state store is updated with the new state:

# Current state is updated
await state_store.update('user', user_id, new_state)

10. Response

The response is returned to the client:

return {
    "id": user_id,
    "name": "John Doe",
    "email": "john@example.com"
}

Query Request Lifecycle

1. HTTP Request

The client sends an HTTP GET request:

GET /api/users/123

2. FastAPI Middleware

Middleware processes the request, exactly as on the command path.

3. FastAPI Endpoint

The endpoint receives the request:

@app.get("/api/users/{user_id}")
async def get_user(user_id: str):
    # Query state
    user = await domain.statemgr.fetch('user', user_id)
    return user

4. State Manager Query

The state manager queries the state store:

user = await domain.statemgr.fetch('user', user_id)

5. State Store Query

The state store returns the current state:

# Query the read-optimized state store
state = await state_store.fetch('user', user_id)
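The relationship between the state manager and the state store can be sketched as a thin delegation layer. Everything here is an assumption made for illustration: `InMemoryStateStore`, `StateManager`, and this `NotFoundError` are hypothetical classes, not the ones shipped with Fluvius.

```python
import asyncio


class NotFoundError(LookupError):
    """Hypothetical error raised when no state exists for the given id."""


class InMemoryStateStore:
    def __init__(self):
        self._rows = {}

    async def update(self, resource, item_id, state):
        self._rows[(resource, item_id)] = dict(state)

    async def fetch(self, resource, item_id):
        try:
            return dict(self._rows[(resource, item_id)])  # copy so callers cannot mutate
        except KeyError:
            raise NotFoundError(f"{resource} {item_id!r} not found") from None


class StateManager:
    """Read-side facade: queries never touch aggregates or the event log."""

    def __init__(self, store):
        self._store = store

    async def fetch(self, resource, item_id):
        return await self._store.fetch(resource, item_id)


async def demo():
    store = InMemoryStateStore()
    manager = StateManager(store)
    await store.update("user", "123", {"name": "John Doe", "active": True})
    found = await manager.fetch("user", "123")
    try:
        await manager.fetch("user", "999")
        missing = False
    except NotFoundError:
        missing = True
    return found, missing


found, missing = asyncio.run(demo())
```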

6. Response

The response is returned to the client:

return {
    "id": user_id,
    "name": "John Doe",
    "email": "john@example.com",
    "active": True
}

Lifecycle Hooks

Before Command Processing

class UserDomain(Domain):
    async def before_process_command(self, command):
        # Pre-processing logic
        logger.info(f"Processing command: {command.name}")

After Command Processing

class UserDomain(Domain):
    async def after_process_command(self, command, response):
        # Post-processing logic
        logger.info(f"Command processed: {command.name}")

Before Event Storage

class UserDomain(Domain):
    async def before_store_event(self, event):
        # Pre-storage logic
        # Validate event
        pass

After Event Storage

class UserDomain(Domain):
    async def after_store_event(self, event):
        # Post-storage logic
        # Trigger side effects
        pass
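A plausible firing order for the four hooks, inferred from the lifecycle steps earlier in this guide, is sketched below. The `Domain` base class here is a hypothetical stand-in and the ordering is an assumption; check it against your Fluvius version before relying on it.

```python
import asyncio


class Domain:
    """Hypothetical base class; only the hook names come from the sections above."""

    async def before_process_command(self, command): ...
    async def after_process_command(self, command, response): ...
    async def before_store_event(self, event): ...
    async def after_store_event(self, event): ...

    async def process_command(self, command):
        await self.before_process_command(command)
        response = {"handled": command}    # stand-in for the aggregate action
        event = {"key": "user-created"}    # stand-in for the generated event
        await self.before_store_event(event)
        # the event would be persisted here
        await self.after_store_event(event)
        await self.after_process_command(command, response)
        return response


class TracingDomain(Domain):
    def __init__(self):
        self.trace = []

    async def before_process_command(self, command):
        self.trace.append("before_process_command")

    async def after_process_command(self, command, response):
        self.trace.append("after_process_command")

    async def before_store_event(self, event):
        self.trace.append("before_store_event")

    async def after_store_event(self, event):
        self.trace.append("after_store_event")


domain = TracingDomain()
asyncio.run(domain.process_command("create-user"))
```

In this model the event hooks nest inside the command hooks, so `domain.trace` records them in that nested order.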

Error Handling

Aggregate Errors

Errors raised inside an aggregate action are caught by the command processor:

@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    if not email or '@' not in email:
        raise ValueError("Invalid email")  # Caught by command processor

Command Processing Errors

Errors that surface from command processing can be handled at the call site:

try:
    response = await domain.process_command(command)
except ValueError as e:
    # Handle validation error
    return {"error": str(e)}

State Query Errors

A failed lookup can be translated into an HTTP error:

try:
    user = await domain.statemgr.fetch('user', user_id)
except NotFoundError:
    raise HTTPException(status_code=404, detail="User not found")
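When several endpoints repeat the same try/except blocks, one option is to centralize the mapping from exception type to HTTP status. The sketch below is framework-free and hypothetical: `ERROR_STATUS`, `to_http_error`, and this `NotFoundError` are illustrative names, and the status codes chosen are a common convention, not something Fluvius prescribes.

```python
class NotFoundError(LookupError):
    """Hypothetical error for a missing record."""


# Ordered mapping: the first matching exception type wins
ERROR_STATUS = {
    ValueError: 400,      # validation failures raised by aggregates
    NotFoundError: 404,   # missing state in the state store
}


def to_http_error(exc):
    """Return (status_code, body) for an exception raised during a request."""
    for exc_type, status in ERROR_STATUS.items():
        if isinstance(exc, exc_type):
            return status, {"error": str(exc)}
    # Do not leak internal details for unexpected errors
    return 500, {"error": "Internal server error"}


validation = to_http_error(ValueError("Invalid email"))
missing = to_http_error(NotFoundError("User not found"))
unexpected = to_http_error(RuntimeError("connection reset"))
```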

Transaction Management

Command Transactions

Commands are processed inside a transaction:

# Command processing is transactional
async with domain.transaction():
    response = await domain.process_command(command)
    # If an error occurs, the transaction is rolled back
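The commit-or-rollback behavior of such a context manager can be sketched with `contextlib.asynccontextmanager`. `InMemoryUnitOfWork` is a hypothetical class used only to show the control flow; it is not how `domain.transaction()` is implemented.

```python
import asyncio
from contextlib import asynccontextmanager


class InMemoryUnitOfWork:
    """Hypothetical unit of work: changes become visible only on success."""

    def __init__(self):
        self.committed = []
        self._pending = []

    def add(self, item):
        self._pending.append(item)

    @asynccontextmanager
    async def transaction(self):
        self._pending = []
        try:
            yield self
        except Exception:
            self._pending = []  # roll back: discard everything staged in this block
            raise
        else:
            self.committed.extend(self._pending)  # commit staged changes
            self._pending = []


async def demo():
    uow = InMemoryUnitOfWork()
    async with uow.transaction():
        uow.add("user-created")
    try:
        async with uow.transaction():
            uow.add("user-renamed")
            raise ValueError("Invalid email")
    except ValueError:
        pass  # the failed block left no trace in uow.committed
    return uow.committed


committed = asyncio.run(demo())
```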

Event Storage Transactions

Events are stored atomically:

# Event storage is atomic
# Either all events are stored or none
await event_store.append_many(events)
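One simple way to get all-or-nothing behavior for a batch is to validate every event before touching the log. The sketch below assumes events are plain dictionaries with a required `key` field; `InMemoryBatchStore` is hypothetical, and a database-backed store would typically rely on a database transaction instead.

```python
import asyncio


class InMemoryBatchStore:
    """Hypothetical store illustrating an atomic append_many."""

    def __init__(self):
        self.log = []

    async def append_many(self, events):
        batch = list(events)
        # Validate the whole batch first so a bad event cannot leave a partial write
        for event in batch:
            if "key" not in event:
                raise ValueError(f"Event is missing 'key': {event!r}")
        self.log.extend(batch)
        return len(batch)


async def demo():
    store = InMemoryBatchStore()
    stored = await store.append_many([{"key": "user-created"}, {"key": "user-renamed"}])
    try:
        await store.append_many([{"key": "user-deleted"}, {"payload": {}}])
        rejected = False
    except ValueError:
        rejected = True
    return stored, rejected, len(store.log)


stored, rejected, log_size = asyncio.run(demo())
```

The second batch contains one valid and one invalid event, and neither is written.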

Performance Considerations

Command Processing

  • Synchronous: Commands are processed synchronously
  • Transactional: Each command is in a transaction
  • Event Generation: Events are generated synchronously

Query Processing

  • Optimized: Queries use optimized state store
  • Cached: Optional caching layer
  • Indexed: State store is indexed for fast queries
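The optional caching layer could take the form of a read-through cache in front of the state store. This is a sketch under that assumption: `CachedStateReader` and `slow_fetch` are hypothetical, and a production cache would also need expiry and size limits.

```python
import asyncio


class CachedStateReader:
    """Hypothetical read-through cache wrapping any async fetch function."""

    def __init__(self, fetch):
        self._fetch = fetch  # coroutine function: (resource, item_id) -> state
        self._cache = {}

    async def fetch(self, resource, item_id):
        key = (resource, item_id)
        if key not in self._cache:
            self._cache[key] = await self._fetch(resource, item_id)
        return self._cache[key]

    def invalidate(self, resource, item_id):
        # Call this after a command updates the state store
        self._cache.pop((resource, item_id), None)


backend_calls = []


async def slow_fetch(resource, item_id):
    backend_calls.append((resource, item_id))
    return {"id": item_id, "name": "John Doe"}


async def demo():
    reader = CachedStateReader(slow_fetch)
    await reader.fetch("user", "123")
    await reader.fetch("user", "123")   # served from the cache
    reader.invalidate("user", "123")
    await reader.fetch("user", "123")   # goes to the backend again
    return len(backend_calls)


calls_made = asyncio.run(demo())
```

Three reads result in two backend calls, because the second read is served from the cache and the third follows an invalidation.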

Event Storage

  • Append-Only: Events are appended (fast writes)
  • Batchable: Multiple events can be batched
  • Async: Event storage can be async

Monitoring

Request Metrics

Track request duration in middleware:

import time

@app.middleware("http")
async def metrics_middleware(request, call_next):
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    # The request path is available as request.url.path
    metrics.record_request(request.url.path, duration)
    return response

Command Metrics

Track command processing time and failures by overriding process_command:

class UserDomain(Domain):
    async def process_command(self, command):
        start_time = time.perf_counter()
        try:
            response = await super().process_command(command)
            # Record duration only for commands that succeed
            metrics.record_command(command.name, time.perf_counter() - start_time)
            return response
        except Exception as e:
            # Record the failure, then let the error propagate
            metrics.record_command_error(command.name, str(e))
            raise

Next Steps