Messaging & Events
Events are the foundation of event sourcing in the Fluvius Framework. Understanding how they are generated, stored, replayed, and handled helps you build event-driven applications.
Event Basics
What are Events?
Events represent things that have happened in your domain. They are:
- Immutable: Once created, they never change
- Append-Only: Added to the event log, never modified
- Ordered: Maintain chronological order
- Replayable: Can be replayed to reconstruct state
Event Structure
Event(
    key: str,            # Event identifier (e.g., 'user-created')
    payload: dict,       # Event data
    aggregate_id: str,   # Aggregate identifier
    timestamp: datetime, # When it happened
    version: int,        # Event version
    metadata: dict,      # Additional metadata
)
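To make the field list concrete, here is a minimal stand-in written as a frozen dataclass. It is not the Fluvius `Event` class: `SketchEvent` is a hypothetical name, and the field types and defaults are assumptions that simply mirror the listing above. Freezing the dataclass illustrates the immutability property, since assigning to a field after creation raises an error.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from datetime import datetime, timezone


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in mirroring the fields listed above (not the Fluvius class)."""
    key: str
    payload: dict
    aggregate_id: str
    timestamp: datetime
    version: int = 1                              # assumed default
    metadata: dict = field(default_factory=dict)  # assumed default


event = SketchEvent(
    key="user-created",
    payload={"name": "Ada", "email": "ada@example.com"},
    aggregate_id="user-1",
    timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
)

try:
    event.version = 2  # frozen dataclasses reject attribute assignment
except FrozenInstanceError:
    mutation_blocked = True
else:
    mutation_blocked = False
```

Note that freezing only protects the attributes themselves. The `payload` dict can still be mutated in place, so a stricter design would copy it or use an immutable mapping.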
Event Generation
Automatic Generation
Events are automatically generated from aggregate actions:
from fluvius.domain import Aggregate, action

class UserAggregate(Aggregate):
    @action(evt_key='user-created', resources=['user'])
    async def create_user(self, name: str, email: str):
        # Event 'user-created' is automatically generated
        self._state = UserState(name=name, email=email)
        return self._state
Event Key
The event key identifies the event type:
@action(evt_key='user-created')  # Event key
async def create_user(self, name: str, email: str):
    pass
Event Payload
The event payload contains the event data:
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Event payload will be: {'name': name, 'email': email}
    return {'name': name, 'email': email}
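One way to picture how a decorator can turn a method's return value into an event payload is the sketch below. It is an illustration under stated assumptions, not Fluvius's implementation: `sketch_action`, `SketchAggregate`, and the `pending_events` list are all hypothetical names, and events are represented as plain dicts.

```python
import asyncio
import functools


def sketch_action(evt_key):
    """Hypothetical decorator: records the wrapped method's return value as an event payload."""
    def decorator(method):
        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            payload = await method(self, *args, **kwargs)
            # Record the event as a plain dict; a real framework would build an event object
            self.pending_events.append(
                {"key": evt_key, "payload": payload, "aggregate_id": self.aggregate_id}
            )
            return payload
        return wrapper
    return decorator


class SketchAggregate:
    """Hypothetical aggregate that collects the events its actions produce."""

    def __init__(self, aggregate_id):
        self.aggregate_id = aggregate_id
        self.pending_events = []

    @sketch_action(evt_key="user-created")
    async def create_user(self, name, email):
        return {"name": name, "email": email}


aggregate = SketchAggregate("user-1")
asyncio.run(aggregate.create_user("Ada", "ada@example.com"))
```

After the call, `aggregate.pending_events` holds one `user-created` entry whose payload is the dict returned by `create_user`.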
Event Storage
Event Store
Events are stored in the event store:
# Events are automatically stored when commands are processed
response = await domain.process_command(command)
# Access event log
events = await domain.logstore.get_events('user', user_id)
Event Log
The event log is an append-only log:
# Get all events for an aggregate
events = await domain.logstore.get_events('user', user_id)
# Events are ordered chronologically
for event in events:
    print(f"{event.timestamp}: {event.key}")
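The append-only idea can be sketched with a small in-memory store. This is a simplified, synchronous stand-in rather than the Fluvius log store: `InMemoryLogStore` and its `append` method are hypothetical, and only the `get_events(resource, aggregate_id)` call shape is borrowed from the example above.

```python
from collections import defaultdict


class InMemoryLogStore:
    """Hypothetical append-only store keyed by (resource, aggregate_id)."""

    def __init__(self):
        self._streams = defaultdict(list)

    def append(self, resource, aggregate_id, event):
        # The only write operation: add to the end of the stream
        self._streams[(resource, aggregate_id)].append(event)

    def get_events(self, resource, aggregate_id):
        # Return a tuple so callers cannot reorder or remove stored entries
        return tuple(self._streams[(resource, aggregate_id)])


store = InMemoryLogStore()
store.append("user", "user-1", {"key": "user-created", "version": 1})
store.append("user", "user-1", {"key": "user-updated", "version": 2})
keys = [e["key"] for e in store.get_events("user", "user-1")]
```

Because the store exposes no update or delete method, the order of `keys` is always the order in which events were appended.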
Event Replay
Reconstructing State
Reconstruct state by replaying events:
# Get all events
events = await domain.logstore.get_events('user', user_id)
# Replay events to reconstruct state
state = None
for event in events:
    if event.key == 'user-created':
        state = UserState(**event.payload)
    elif event.key == 'user-updated':
        state = state.update(**event.payload)
    elif event.key == 'user-deactivated':
        state = state.update(active=False)
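The replay loop above can be made self-contained by treating each event as a pure function from the old state to the new one. In this sketch `UserState` is a hypothetical frozen dataclass (the real class may differ), events are plain dicts, and `apply_event` and `replay` are illustrative helper names.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class UserState:
    """Hypothetical state object; the real UserState may differ."""
    name: str
    email: str
    active: bool = True


def apply_event(state: Optional[UserState], event: dict) -> Optional[UserState]:
    """Return the state that results from applying one event."""
    key, payload = event["key"], event["payload"]
    if key == "user-created":
        return UserState(**payload)
    if state is None:
        raise ValueError(f"{key!r} arrived before 'user-created'")
    if key == "user-updated":
        return replace(state, **payload)
    if key == "user-deactivated":
        return replace(state, active=False)
    return state  # unknown event types leave the state unchanged


def replay(events) -> Optional[UserState]:
    """Apply events in order, starting from no state."""
    state = None
    for event in events:
        state = apply_event(state, event)
    return state


events = [
    {"key": "user-created", "payload": {"name": "Ada", "email": "ada@example.com"}},
    {"key": "user-updated", "payload": {"name": "Ada L."}},
    {"key": "user-deactivated", "payload": {}},
]
state = replay(events)
```

Keeping `apply_event` free of side effects means the same event list always produces the same state, which is what makes replay reliable.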
Time Travel
Query state at any point in time:
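from datetime import datetime

# Get events up to a specific time
cutoff_time = datetime(2024, 1, 1)
events = await domain.logstore.get_events_until('user', user_id, cutoff_time)
# Replay to that point (replay_events is a helper that wraps the replay loop from Reconstructing State)
state = replay_events(events)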
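The sketch below shows the time-travel idea end to end on in-memory data. Both helpers are hypothetical: `events_until` stands in for the store-side cutoff query and assumes the cutoff is inclusive, and `replay_events` folds payloads into a plain dict instead of a real state object.

```python
from datetime import datetime, timezone


def events_until(events, cutoff):
    """Keep events whose timestamp is at or before the cutoff (hypothetical helper)."""
    return [e for e in events if e["timestamp"] <= cutoff]


def replay_events(events):
    """Fold event payloads into a plain dict state, later events overriding earlier ones."""
    state = {}
    for event in events:
        state = {**state, **event["payload"]}
    return state


history = [
    {"timestamp": datetime(2023, 6, 1, tzinfo=timezone.utc), "payload": {"name": "Ada"}},
    {"timestamp": datetime(2023, 12, 1, tzinfo=timezone.utc), "payload": {"email": "ada@example.com"}},
    {"timestamp": datetime(2024, 3, 1, tzinfo=timezone.utc), "payload": {"name": "Ada L."}},
]

cutoff = datetime(2024, 1, 1, tzinfo=timezone.utc)
state_at_cutoff = replay_events(events_until(history, cutoff))
state_now = replay_events(history)
```

Here `state_at_cutoff` still carries the original name because the rename happened after the cutoff, while `state_now` reflects it.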
Event Handlers
Event Handlers
React to events with event handlers:
from fluvius.domain import event_handler

@event_handler('user-created')
async def send_welcome_email(event):
    # React to user-created event
    await email_service.send_welcome(event.payload['email'])

@event_handler('user-created')
async def create_user_profile(event):
    # Another handler for the same event
    await profile_service.create(event.aggregate_id)
Async Event Handlers
Event handlers are declared with async def, so they can await other services:
@event_handler('order-placed')
async def process_order(event):
    # Each step is awaited in turn
    await inventory_service.reserve(event.payload['items'])
    await payment_service.charge(event.payload['amount'])
    await shipping_service.schedule(event.aggregate_id)
Event-Driven Integration
External Systems
Events can trigger external system integrations:
@event_handler('payment-processed')
async def notify_accounting(event):
    # Send to accounting system
    await accounting_api.record_transaction(event.payload)

@event_handler('order-shipped')
async def notify_customer(event):
    # Send notification to customer
    await notification_service.send(
        event.payload['customer_id'],
        f"Order {event.aggregate_id} has been shipped"
    )
Event Streaming
Stream events to external systems:
# Stream events to Kafka
@event_handler('*')  # All events
async def stream_to_kafka(event):
    await kafka_producer.send('events', event.to_dict())
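Before an event can be sent to a broker it usually has to be serialized, and timestamps are a common stumbling block because `json.dumps` cannot encode `datetime` objects by default. The helper below is a hypothetical sketch that works on a plain dict (such as the result of a `to_dict()` call) and does not depend on any particular Kafka client.

```python
import json
from datetime import datetime, timezone


def event_to_message(event: dict) -> bytes:
    """Serialize an event dict to UTF-8 JSON bytes (hypothetical helper)."""
    def encode(value):
        # Called by json.dumps only for values it cannot encode itself
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Cannot serialize {type(value).__name__}")
    return json.dumps(event, default=encode, sort_keys=True).encode("utf-8")


message = event_to_message({
    "key": "order-shipped",
    "aggregate_id": "order-42",
    "timestamp": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    "payload": {"customer_id": "c-7"},
})
decoded = json.loads(message)
```

Using ISO 8601 strings with an explicit UTC offset keeps the timestamp unambiguous for consumers written in other languages.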
Event Versioning
Event Schema Evolution
Handle event schema changes:
# Version 1 event
@action(evt_key='user-created', version=1)
async def create_user_v1(self, name: str, email: str):
    return {'name': name, 'email': email}

# Version 2 event (backward compatible)
@action(evt_key='user-created', version=2)
async def create_user_v2(self, name: str, email: str, phone: str = None):
    payload = {'name': name, 'email': email}
    if phone:
        payload['phone'] = phone
    return payload
Event Migration
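Upgrade old events to the new schema when they are read, leaving the stored originals unchanged:
async def migrate_event(event):
    if event.version == 1:
        # Build a version 2 copy instead of mutating the stored event
        return Event(
            key=event.key,
            payload={**event.payload, 'phone': None},
            aggregate_id=event.aggregate_id,
            timestamp=event.timestamp,
            version=2,
            metadata=event.metadata,
        )
    return event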
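When several schema versions accumulate, migration steps can be chained so that each one only knows how to move an event forward by a single version. The sketch below uses plain dicts and a hypothetical `UPCASTERS` registry; none of these names come from Fluvius.

```python
def upcast_v1_to_v2(event: dict) -> dict:
    """Return a version 2 copy of a version 1 'user-created' event."""
    return {**event, "version": 2, "payload": {**event["payload"], "phone": None}}


# Hypothetical registry: (event key, source version) -> upcaster function
UPCASTERS = {("user-created", 1): upcast_v1_to_v2}


def upcast(event: dict) -> dict:
    """Apply upcasters until no newer version is registered for the event."""
    while (event["key"], event["version"]) in UPCASTERS:
        event = UPCASTERS[(event["key"], event["version"])](event)
    return event


stored = {
    "key": "user-created",
    "version": 1,
    "payload": {"name": "Ada", "email": "ada@example.com"},
}
current = upcast(stored)
```

Each upcaster returns a new dict, so `stored` keeps its original version and payload while `current` carries the upgraded shape.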
Event Best Practices
1. Meaningful Event Names
Use clear, descriptive event names:
# Good: Clear and descriptive
@action(evt_key='user-account-activated')
# Bad: Unclear
@action(evt_key='user-updated')
2. Include Relevant Data
Include all relevant data in events:
from datetime import datetime, timezone

@action(evt_key='order-placed')
async def place_order(self, items: list, customer_id: str, total: float):
    # Include all relevant data
    return {
        'items': items,
        'customer_id': customer_id,
        'total': total,
        'timestamp': datetime.now(timezone.utc).isoformat()  # timezone-aware UTC time
    }
3. Keep Events Immutable
Never modify events after creation:
# Good: Create new event for correction
@action(evt_key='user-email-corrected')
async def correct_email(self, new_email: str):
    # Original event remains unchanged
    pass

# Bad: Trying to modify existing event
# This is not possible in Fluvius
4. Handle Event Failures
Handle event handler failures gracefully:
@event_handler('user-created')
async def send_welcome_email(event):
    try:
        await email_service.send(event.payload['email'])
    except Exception as e:
        # Log error, don't fail the command
        logger.error(f"Failed to send welcome email: {e}")
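The same isolation can be applied at the dispatch level, so that one failing handler does not prevent the others from running. The sketch below uses `asyncio.gather` with `return_exceptions=True`. The `run_handlers` function and the two demo handlers are hypothetical, and whether Fluvius dispatches handlers this way is an assumption.

```python
import asyncio
import logging

logger = logging.getLogger("event_handlers")


async def run_handlers(event, handlers):
    """Run all handlers concurrently; collect failures instead of propagating them."""
    results = await asyncio.gather(
        *(handler(event) for handler in handlers),
        return_exceptions=True,
    )
    failures = []
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            logger.error("Handler %s failed: %r", handler.__name__, result)
            failures.append((handler.__name__, result))
    return failures


async def failing_handler(event):
    raise RuntimeError("mail server unavailable")


completed = []


async def working_handler(event):
    completed.append(event["key"])


failures = asyncio.run(
    run_handlers({"key": "user-created"}, [failing_handler, working_handler])
)
```

Returning the failures lets the caller decide whether to retry, alert, or ignore them, while the command that produced the event has already succeeded.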
Event Patterns
Event Sourcing
Store all changes as events:
# All state changes are events
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Event stored
    pass

@action(evt_key='user-updated')
async def update_user(self, name: str):
    # Event stored
    pass
Event-Driven Architecture
Use events for system integration:
# Domain A publishes event
@action(evt_key='order-placed')
async def place_order(self, items: list):
    # Event generated
    pass

# Domain B listens to event
@event_handler('order-placed')
async def process_inventory(event):
    # React to event
    pass
Next Steps
- Learn about Data Persistence
- Explore Auth & Security
- Check Main Abstractions