User Domain Example
info
This example demonstrates a complete user management domain with create, update, and deactivate operations.
Overview
This example shows how to build a user management domain using Fluvius Framework. It includes:
- User aggregate with business logic
- Commands for user operations
- Events for state changes
- State management
Complete Example
Aggregate Definition
from fluvius.domain import Aggregate, action
from fluvius.data import DataModel, field, UUID_TYPE
class UserState(DataModel):
name: str = field()
email: str = field()
active: bool = field(initial=True)
created_at: datetime = field(factory=timestamp)
class UserAggregate(Aggregate):
def __init__(self, domain):
super().__init__(domain)
self._state = None
@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
"""Create a new user"""
if self._state is not None:
raise ValueError("User already exists")
self._state = UserState(
name=name,
email=email,
active=True
)
return self._state
@action(evt_key='user-updated', resources=['user'])
async def update_user(self, name: str = None, email: str = None):
"""Update user information"""
if self._state is None:
raise ValueError("User does not exist")
updates = {}
if name is not None:
updates['name'] = name
if email is not None:
updates['email'] = email
self._state = self._state.update(**updates)
return self._state
@action(evt_key='user-deactivated', resources=['user'])
async def deactivate_user(self):
"""Deactivate a user"""
if self._state is None:
raise ValueError("User does not exist")
self._state = self._state.update(active=False)
return self._state
Domain Definition
from fluvius.domain import Domain
from .aggregate import UserAggregate
class UserDomain(Domain):
__aggregate__ = UserAggregate
class Meta:
revision = 1
tags = ["user", "identity"]
title = "User Management Domain"
description = "Domain for managing user accounts"
Usage
import asyncio
from fluvius.domain.context import SanicContext
from fluvius.data import UUID_GENR
from user_domain import UserDomain
async def main():
# Create context
ctx = SanicContext.create(namespace='app-user')
# Create domain
domain = UserDomain(ctx)
# Set aggregate root
user_id = UUID_GENR()
domain.set_aggroot('user', user_id)
# Create and process commands
commands = [
domain.create_command('create-user', {
'name': 'John Doe',
'email': 'john@example.com'
}),
domain.create_command('update-user', {
'name': 'Jane Doe'
}),
domain.create_command('deactivate-user')
]
# Process commands
async for response in domain.command_processor.process(*commands):
print(f'Response: {response}')
# Query state
user = await domain.statemgr.fetch('user', user_id)
print(f'Final state: {user}')
if __name__ == '__main__':
asyncio.run(main())
Key Concepts
- Aggregate: Contains business logic and manages state
- Actions: Methods decorated with
@actionthat generate events - Commands: Created using
domain.create_command() - State: Managed through the aggregate and accessible via
statemgr - Events: Automatically generated from actions
Next Steps
- Check out the Banking Domain Example
- Learn about State Management
- Read the Domain API Reference