Messaging & Events

Events are the foundation of event sourcing in Fluvius Framework. Understanding how events work helps you build event-driven applications.

Event Basics

What are Events?

Events represent things that have happened in your domain. They are:

  • Immutable: Once created, they never change
  • Append-Only: Added to the event log, never modified
  • Ordered: Maintain chronological order
  • Replayable: Can be replayed to reconstruct state

Event Structure

Event(
    key: str,            # Event identifier (e.g., 'user-created')
    payload: dict,       # Event data
    aggregate_id: str,   # Aggregate identifier
    timestamp: datetime, # When it happened
    version: int,        # Event version
    metadata: dict,      # Additional metadata
)
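To make the field list concrete, here is a minimal sketch in plain Python of a record with the same fields. It is not the Fluvius `Event` class: the name `SketchEvent` and the default values are assumptions chosen for illustration. A frozen dataclass is used so that the "immutable" property is enforced at the attribute level.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from datetime import datetime, timezone


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an event record; not the Fluvius Event class."""
    key: str                 # event identifier, e.g. 'user-created'
    payload: dict            # event data
    aggregate_id: str        # aggregate the event belongs to
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    version: int = 1         # event schema version (default is an assumption)
    metadata: dict = field(default_factory=dict)


event = SketchEvent(
    key="user-created",
    payload={"name": "Ada", "email": "ada@example.com"},
    aggregate_id="user-1",
)

try:
    event.key = "user-deleted"   # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that `frozen=True` only blocks reassigning attributes; the `payload` dict itself can still be mutated, so a stricter design would copy it or use an immutable mapping.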

Event Generation

Automatic Generation

Events are automatically generated from aggregate actions:

from fluvius.domain import Aggregate, action

class UserAggregate(Aggregate):
    @action(evt_key='user-created', resources=['user'])
    async def create_user(self, name: str, email: str):
        # Event 'user-created' is automatically generated
        self._state = UserState(name=name, email=email)
        return self._state

Event Key

The event key identifies the event type:

@action(evt_key='user-created')  # Event key
async def create_user(self, name: str, email: str):
    pass

Event Payload

The event payload contains the event data:

@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Event payload will be: {'name': name, 'email': email}
    return {'name': name, 'email': email}

Event Storage

Event Store

Events are stored in the event store:

# Events are automatically stored when commands are processed
response = await domain.process_command(command)

# Access event log
events = await domain.logstore.get_events('user', user_id)

Event Log

The event log is an append-only log:

# Get all events for an aggregate
events = await domain.logstore.get_events('user', user_id)

# Events are ordered chronologically
for event in events:
    print(f"{event.timestamp}: {event.key}")
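The append-only behaviour can be illustrated with a small in-memory sketch. `InMemoryEventLog` and its `seq` field are hypothetical names, not part of Fluvius; the real `logstore` is presumably backed by persistent storage. The point is only that the log exposes an append operation and an ordered read, and nothing that edits or removes entries.

```python
from collections import defaultdict


class InMemoryEventLog:
    """Hypothetical append-only log keyed by (resource, aggregate_id)."""

    def __init__(self):
        self._streams = defaultdict(list)

    def append(self, resource, aggregate_id, key, payload):
        stream = self._streams[(resource, aggregate_id)]
        # The sequence number is the position in the stream, starting at 1.
        record = {"seq": len(stream) + 1, "key": key, "payload": dict(payload)}
        stream.append(record)
        return record["seq"]

    def get_events(self, resource, aggregate_id):
        # Return a tuple so callers cannot reorder or truncate the stored list.
        return tuple(self._streams.get((resource, aggregate_id), ()))


log = InMemoryEventLog()
log.append("user", "u1", "user-created", {"name": "Ada"})
log.append("user", "u1", "user-updated", {"name": "Ada L."})
keys = [e["key"] for e in log.get_events("user", "u1")]
```

Because there is deliberately no update or delete method, a correction has to be recorded as a further event.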

Event Replay

Reconstructing State

Reconstruct state by replaying events:

# Get all events
events = await domain.logstore.get_events('user', user_id)

# Replay events to reconstruct state
state = None
for event in events:
    if event.key == 'user-created':
        state = UserState(**event.payload)
    elif event.key == 'user-updated':
        state = state.update(**event.payload)
    elif event.key == 'user-deactivated':
        state = state.update(active=False)
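The same replay loop can be written as a fold over the event list with one pure function per step. The sketch below is self-contained and does not use Fluvius: `UserState` here is a hypothetical frozen dataclass, events are plain dicts, and `apply_event` / `replay_events` are illustrative names. It also guards the case the loop above leaves open, where an update arrives before any `user-created` event.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class UserState:
    """Hypothetical state object for this sketch."""
    name: str
    email: str
    active: bool = True


def apply_event(state: Optional[UserState], event: dict) -> Optional[UserState]:
    """Return the state that results from applying one event to `state`."""
    key, payload = event["key"], event["payload"]
    if key == "user-created":
        return UserState(**payload)
    if state is None:
        raise ValueError(f"{key!r} arrived before 'user-created'")
    if key == "user-updated":
        return replace(state, **payload)      # new object, old one untouched
    if key == "user-deactivated":
        return replace(state, active=False)
    return state                              # unknown keys are ignored


def replay_events(events) -> Optional[UserState]:
    """Fold the events, oldest first, into a final state."""
    state = None
    for event in events:
        state = apply_event(state, event)
    return state


events = [
    {"key": "user-created", "payload": {"name": "Ada", "email": "ada@example.com"}},
    {"key": "user-updated", "payload": {"name": "Ada L."}},
    {"key": "user-deactivated", "payload": {}},
]
final_state = replay_events(events)
```

Keeping `apply_event` free of side effects means replaying the same events always yields the same state, which is what makes replay safe to repeat.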

Time Travel

Query state at any point in time:

from datetime import datetime

# Get events up to a specific time
cutoff_time = datetime(2024, 1, 1)
events = await domain.logstore.get_events_until('user', user_id, cutoff_time)

# Replay to that point (replay_events wraps the replay loop from "Reconstructing State")
state = replay_events(events)
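As a rough illustration of what a time-bounded read followed by a replay amounts to, the sketch below filters an in-memory history by timestamp and then replays only the surviving events. The helpers `events_until` and `replay_name` are hypothetical, and treating the cutoff as inclusive is an assumption; how `get_events_until` handles the boundary is not stated here.

```python
from datetime import datetime, timezone


def events_until(events, cutoff):
    """Keep events whose timestamp is at or before the cutoff (inclusive by assumption)."""
    return [e for e in events if e["timestamp"] <= cutoff]


def replay_name(events):
    """Replay only the 'name' field, to keep the example small."""
    name = None
    for e in events:
        if e["key"] in ("user-created", "user-updated"):
            name = e["payload"].get("name", name)
    return name


history = [
    {"key": "user-created",
     "timestamp": datetime(2023, 12, 1, tzinfo=timezone.utc),
     "payload": {"name": "Ada"}},
    {"key": "user-updated",
     "timestamp": datetime(2024, 2, 1, tzinfo=timezone.utc),
     "payload": {"name": "Ada Lovelace"}},
]

cutoff = datetime(2024, 1, 1, tzinfo=timezone.utc)
name_then = replay_name(events_until(history, cutoff))  # state as of the cutoff
name_now = replay_name(history)                         # current state
```

Using timezone-aware datetimes on both sides avoids the `TypeError` Python raises when comparing naive and aware values.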

Event Handlers

Event Handlers

React to events with event handlers:

from fluvius.domain import event_handler

@event_handler('user-created')
async def send_welcome_email(event):
    # React to user-created event
    await email_service.send_welcome(event.payload['email'])

@event_handler('user-created')
async def create_user_profile(event):
    # Another handler for the same event
    await profile_service.create(event.aggregate_id)

Async Event Handlers

Event handlers are coroutines, so they can await other services while reacting to an event:

@event_handler('order-placed')
async def process_order(event):
    # Each step is awaited in turn before the next one starts
    await inventory_service.reserve(event.payload['items'])
    await payment_service.charge(event.payload['amount'])
    await shipping_service.schedule(event.aggregate_id)

Event-Driven Integration

External Systems

Events can trigger external system integrations:

@event_handler('payment-processed')
async def notify_accounting(event):
    # Send to accounting system
    await accounting_api.record_transaction(event.payload)

@event_handler('order-shipped')
async def notify_customer(event):
    # Send notification to customer
    await notification_service.send(
        event.payload['customer_id'],
        f"Order {event.aggregate_id} has been shipped"
    )

Event Streaming

Stream events to external systems:

# Stream events to Kafka
@event_handler('*')  # All events
async def stream_to_kafka(event):
    await kafka_producer.send('events', event.to_dict())
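Message brokers generally carry bytes, so the event dict has to be serialized before it leaves the process. What `event.to_dict()` returns is not shown here; assuming it may contain `datetime` values, the sketch below turns such a dict into UTF-8 JSON. `encode_event` is a hypothetical helper, and many producer clients accept a serializer callback where a function like this could be plugged in.

```python
import json
from datetime import datetime, timezone


def encode_event(event: dict) -> bytes:
    """Serialize an event dict to UTF-8 JSON bytes; datetimes become ISO 8601 strings."""
    def default(value):
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"cannot serialize {type(value).__name__}")
    return json.dumps(event, default=default, sort_keys=True).encode("utf-8")


message = encode_event({
    "key": "order-shipped",
    "aggregate_id": "order-42",
    "timestamp": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "payload": {"customer_id": "c-7"},
})
decoded = json.loads(message)   # what a consumer would see after decoding
```

Raising `TypeError` for unknown types keeps unexpected values from being silently written to the stream in a lossy form.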

Event Versioning

Event Schema Evolution

Handle event schema changes:

# Version 1 event
from typing import Optional

@action(evt_key='user-created', version=1)
async def create_user_v1(self, name: str, email: str):
    return {'name': name, 'email': email}

# Version 2 event (backward compatible: 'phone' is optional)
@action(evt_key='user-created', version=2)
async def create_user_v2(self, name: str, email: str, phone: Optional[str] = None):
    payload = {'name': name, 'email': email}
    if phone is not None:
        payload['phone'] = phone
    return payload

Event Migration

Migrate old events to new schema:

async def migrate_event(event):
    if event.version == 1:
        # Migrate to version 2
        event.payload['phone'] = None
        event.version = 2
    return event
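The snippet above updates the event object in place. Since stored events are meant to be immutable, one alternative is to upcast on read: leave the stored record alone and build a new, upgraded copy each time an old version is loaded. The sketch below shows that idea with hypothetical names (`StoredEvent`, `UPCASTERS`, `upcast`); it is not a Fluvius API.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class StoredEvent:
    """Hypothetical immutable event record for this sketch."""
    key: str
    version: int
    payload: dict


def upcast_user_created_v1(event: StoredEvent) -> StoredEvent:
    # Version 2 adds an optional 'phone' field; old events get None.
    return replace(event, version=2, payload={**event.payload, "phone": None})


# Maps (event key, version) to the function that lifts it one version up.
UPCASTERS = {
    ("user-created", 1): upcast_user_created_v1,
}


def upcast(event: StoredEvent) -> StoredEvent:
    """Apply upcasters until none is registered for the event's (key, version)."""
    while (step := UPCASTERS.get((event.key, event.version))) is not None:
        event = step(event)
    return event


old = StoredEvent("user-created", 1, {"name": "Ada", "email": "ada@example.com"})
new = upcast(old)
```

Each upcaster handles exactly one version step, so a later version 3 only needs one more entry in the table, and events already at the latest version pass through unchanged.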

Event Best Practices

1. Meaningful Event Names

Use clear, descriptive event names:

# Good: Clear and descriptive
@action(evt_key='user-account-activated')

# Bad: Unclear
@action(evt_key='user-updated')

2. Include Relevant Data

Include all relevant data in events:

@action(evt_key='order-placed')
async def place_order(self, items: list, customer_id: str, total: float):
    # Include all relevant data
    return {
        'items': items,
        'customer_id': customer_id,
        'total': total,
        'timestamp': datetime.now().isoformat()
    }

3. Keep Events Immutable

Never modify events after creation:

# Good: Create new event for correction
@action(evt_key='user-email-corrected')
async def correct_email(self, new_email: str):
    # Original event remains unchanged
    pass

# Bad: Trying to modify existing event
# This is not possible in Fluvius

4. Handle Event Failures

Handle event handler failures gracefully:

@event_handler('user-created')
async def send_welcome_email(event):
    try:
        await email_service.send(event.payload['email'])
    except Exception as e:
        # Log error, don't fail the command
        logger.error(f"Failed to send welcome email: {e}")
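Failure isolation can also live in the dispatcher rather than in every handler. How Fluvius dispatches handlers internally is not shown here; the sketch below is a standalone illustration in which `dispatch_isolated` is a hypothetical helper that runs all handlers with `asyncio.gather(..., return_exceptions=True)` so one failing handler does not prevent the others from completing.

```python
import asyncio
import logging

logger = logging.getLogger("events")


async def dispatch_isolated(event, handlers):
    """Run every handler; collect failures instead of letting one abort the rest."""
    results = await asyncio.gather(
        *(handler(event) for handler in handlers),
        return_exceptions=True,   # exceptions are returned as results, not raised
    )
    failures = []
    for handler, result in zip(handlers, results):
        if isinstance(result, Exception):
            logger.error("handler %s failed: %r", handler.__name__, result)
            failures.append((handler.__name__, result))
    return failures


async def send_welcome_email(event):
    raise ConnectionError("mail server unreachable")   # simulated outage


delivered = []


async def create_user_profile(event):
    delivered.append(event["aggregate_id"])


failures = asyncio.run(dispatch_isolated(
    {"key": "user-created", "aggregate_id": "u1"},
    [send_welcome_email, create_user_profile],
))
```

Returning the list of failures leaves the caller free to retry them or route them to a dead-letter store, instead of only logging and moving on.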

Event Patterns

Event Sourcing

Store all changes as events:

# All state changes are events
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Event stored
    pass

@action(evt_key='user-updated')
async def update_user(self, name: str):
    # Event stored
    pass

Event-Driven Architecture

Use events for system integration:

# Domain A publishes event
@action(evt_key='order-placed')
async def place_order(self, items: list):
    # Event generated
    pass

# Domain B listens to event
@event_handler('order-placed')
async def process_inventory(event):
    # React to event
    pass

Next Steps