Core Components
Understanding the core components helps you build effective applications with Fluvius Framework.
Domain
The Domain is the main entry point for domain logic. It coordinates aggregates, commands, events, and state management.
Responsibilities
- Aggregate Management: Manages aggregate instances
- Command Processing: Routes commands to aggregates
- Event Handling: Manages event generation and storage
- State Management: Provides access to current state
- Context Management: Manages request context
Example
from fluvius.domain import Domain
from fluvius.domain.context import SanicContext
class UserDomain(Domain):
__aggregate__ = UserAggregate
class Meta:
revision = 1
tags = ["user", "identity"]
# Create domain instance
ctx = SanicContext.create(namespace='app-user')
domain = UserDomain(ctx)
Key Methods
create_command(): Create a commandprocess_command(): Process a commandset_aggroot(): Set aggregate root identifierstatemgr: Access state manager
Aggregate
Aggregates contain business logic and manage state. They enforce business invariants and generate events.
Responsibilities
- Business Logic: Implement business rules
- State Management: Manage aggregate state
- Event Generation: Generate domain events
- Invariant Enforcement: Enforce business rules
Example
from fluvius.domain import Aggregate, action
from fluvius.data import DataModel, field
class UserState(DataModel):
name: str = field()
email: str = field()
active: bool = field(initial=True)
class UserAggregate(Aggregate):
def __init__(self, domain):
super().__init__(domain)
self._state = None
@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
# Business logic
if not email or '@' not in email:
raise ValueError("Invalid email")
# State update
self._state = UserState(name=name, email=email, active=True)
return self._state
@action(evt_key='user-updated', resources=['user'])
async def update_user(self, name: str = None, email: str = None):
if self._state is None:
raise ValueError("User not created")
updates = {}
if name is not None:
updates['name'] = name
if email is not None:
if '@' not in email:
raise ValueError("Invalid email")
updates['email'] = email
self._state = self._state.update(**updates)
return self._state
Key Concepts
- Actions: Methods decorated with
@action - State: Aggregate state (DataModel instance)
- Events: Automatically generated from actions
- Resources: Resources affected by actions
Command
Commands represent intentions to change state. They are immutable and validated before processing.
Responsibilities
- Intent Expression: Express what should happen
- Validation: Validate input data
- Routing: Route to appropriate aggregate action
Example
# Create command
command = domain.create_command('create-user', {
'name': 'John Doe',
'email': 'john@example.com'
})
# Process command
response = await domain.process_command(command)
Command Structure
Command(
name: str, # Action name
payload: dict, # Command data
metadata: dict, # Additional metadata
aggregate_id: str, # Aggregate identifier
)
Event
Events represent things that have happened. They are immutable and stored in the event log.
Responsibilities
- Fact Recording: Record what happened
- Audit Trail: Provide complete history
- State Replay: Enable state reconstruction
- Integration: Enable event-driven integrations
Example
@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
# Event 'user-created' is automatically generated
# with payload: {'name': name, 'email': email}
pass
Event Structure
Event(
key: str, # Event key
payload: dict, # Event data
aggregate_id: str, # Aggregate identifier
timestamp: datetime, # When it happened
version: int, # Event version
metadata: dict, # Additional metadata
)
State Manager
The State Manager provides read access to current state. It queries the state store optimized for reads.
Responsibilities
- State Queries: Query current state
- Filtering: Filter entities by criteria
- Sorting: Sort query results
- Pagination: Paginate large result sets
Example
# Fetch single entity
user = await domain.statemgr.fetch('user', user_id)
# Find entities
users = await domain.statemgr.find('user', active=True)
# Query with filters
active_users = await domain.statemgr.find(
'user',
active=True,
limit=10,
offset=0
)
Key Methods
fetch(): Fetch single entityfind(): Find multiple entitiescount(): Count entitiesexists(): Check if entity exists
Command Processor
The Command Processor routes commands to aggregates and handles command processing.
Responsibilities
- Command Routing: Route commands to aggregates
- Action Invocation: Invoke aggregate actions
- Event Handling: Handle event generation
- Error Handling: Handle processing errors
Example
# Process single command
response = await domain.process_command(command)
# Process multiple commands
async for response in domain.command_processor.process(*commands):
print(response)
Event Store
The Event Store persists events in an append-only log.
Responsibilities
- Event Persistence: Store events permanently
- Event Retrieval: Retrieve events by criteria
- Event Replay: Replay events for state reconstruction
- Event Streaming: Stream events to consumers
Features
- Append-Only: Events are never modified
- Immutable: Events cannot be changed
- Ordered: Events maintain order
- Queryable: Query events by criteria
State Store
The State Store maintains current state optimized for reads.
Responsibilities
- State Persistence: Store current state
- State Updates: Update state from events
- Fast Queries: Optimize for read performance
- Indexing: Index state for fast queries
Features
- Read-Optimized: Optimized for queries
- Denormalized: Denormalized for performance
- Indexed: Indexed for fast lookups
- Cached: Optional caching layer
Context
Context provides request-scoped information and services.
Responsibilities
- Request Information: Provide request metadata
- Service Access: Access to services
- Transaction Management: Manage transactions
- Namespace Isolation: Isolate by namespace
Example
from fluvius.domain.context import SanicContext
# Create context
ctx = SanicContext.create(namespace='app-user')
# Access context in domain
domain = UserDomain(ctx)
Context Types
- SanicContext: For Sanic applications
- FastAPIContext: For FastAPI applications
- Custom Context: Extend for custom needs
Data Model
Data Models define the structure of aggregate state.
Responsibilities
- State Definition: Define state structure
- Validation: Validate state data
- Immutability: Support immutable updates
- Serialization: Serialize/deserialize state
Example
from fluvius.data import DataModel, field
class UserState(DataModel):
name: str = field()
email: str = field()
active: bool = field(initial=True)
created_at: datetime = field(default_factory=datetime.now)
Features
- Type Safety: Type hints for validation
- Immutability: Immutable updates
- Validation: Automatic validation
- Serialization: JSON serialization
Component Interactions
Domain
│
├──► Aggregate (Business Logic)
│ │
│ └──► Action (State Change)
│ │
│ └──► Event (Generated)
│
├──► Command Processor
│ │
│ └──► Route to Aggregate
│
├──► State Manager
│ │
│ └──► Query State Store
│
└──► Event Store
│
└──► Persist Events
Next Steps
- Learn about Module Structure
- Explore Tech Stack
- Check Design Patterns