
Main Abstractions

Understanding the main abstractions in Fluvius Framework helps you build effective domain-driven applications.

Domain

The Domain is the main entry point for domain logic. It coordinates aggregates, commands, events, and state management.

Key Responsibilities

  • Aggregate Management: Manages aggregate instances
  • Command Processing: Routes commands to aggregates
  • Event Handling: Manages event generation and storage
  • State Management: Provides access to current state

Example

from fluvius.domain import Domain
from fluvius.domain.context import SanicContext

class UserDomain(Domain):
    # UserAggregate is defined in the Aggregate section
    __aggregate__ = UserAggregate

    class Meta:
        revision = 1
        tags = ["user", "identity"]

# Create domain instance
ctx = SanicContext.create(namespace='app-user')
domain = UserDomain(ctx)

Aggregate

Aggregates contain business logic and manage state. They enforce business invariants and generate events.

Key Responsibilities

  • Business Logic: Implement business rules
  • State Management: Manage aggregate state
  • Event Generation: Generate domain events
  • Invariant Enforcement: Reject changes that would violate business invariants

Example

from fluvius.domain import Aggregate, action
from fluvius.data import DataModel, field

class UserState(DataModel):
    name: str = field()
    email: str = field()
    active: bool = field(initial=True)

class UserAggregate(Aggregate):
    def __init__(self, domain):
        super().__init__(domain)
        self._state = None

    @action(evt_key='user-created', resources=['user'])
    async def create_user(self, name: str, email: str):
        # Business logic
        if not email or '@' not in email:
            raise ValueError("Invalid email")

        # State update
        self._state = UserState(name=name, email=email, active=True)
        return self._state

Command

Commands represent intentions to change state. They are immutable and validated before processing.

Key Characteristics

  • Immutable: Cannot be changed after creation
  • Validated: Validated before processing
  • Routed: Routed to appropriate aggregate action
  • Idempotent: Can be safely retried

Example

# Create command
command = domain.create_command('create-user', {
    'name': 'John Doe',
    'email': 'john@example.com'
})

# Process command
response = await domain.process_command(command)
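
The "immutable" and "idempotent" characteristics can be illustrated without the framework. The following is a minimal sketch using only the Python standard library; `CreateUser`, `command_id`, and `handle_once` are hypothetical names chosen for illustration and are not part of the Fluvius API.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from uuid import uuid4


@dataclass(frozen=True)
class CreateUser:
    """Hypothetical command object; frozen=True makes it immutable."""
    name: str
    email: str
    command_id: str = field(default_factory=lambda: uuid4().hex)


def handle_once(command, handled_ids, results):
    """Apply a command at most once, keyed by its command_id."""
    if command.command_id in handled_ids:
        return False  # retry of an already processed command: no effect
    handled_ids.add(command.command_id)
    results.append((command.name, command.email))
    return True


cmd = CreateUser(name="John Doe", email="john@example.com")

try:
    cmd.email = "other@example.com"  # mutation is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False

handled_ids, results = set(), []
first = handle_once(cmd, handled_ids, results)
retry = handle_once(cmd, handled_ids, results)  # safe to retry
```

Deduplicating on a command identifier is one common way to make retries safe; whether Fluvius does this internally is not stated here.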

Event

Events represent things that have happened. They are immutable and stored in the event log.

Key Characteristics

  • Immutable: Cannot be changed
  • Append-Only: Added to event log
  • Ordered: Maintain chronological order
  • Replayable: Can be replayed to reconstruct state

Example

@action(evt_key='user-created', resources=['user'])
async def create_user(self, name: str, email: str):
    # Event 'user-created' is automatically generated
    # with payload: {'name': name, 'email': email}
    pass
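
To make "replayable" concrete, the sketch below folds an ordered list of events into a state dictionary. It is a framework-independent illustration: the `Event` class, the `apply` and `replay` functions, and the `user-deactivated` event key are assumptions introduced here, not Fluvius names.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Event:
    """Hypothetical event record: a key plus a payload dict."""
    key: str
    payload: dict


def apply(state, event):
    """Return the next state; the input state is never mutated."""
    if event.key == "user-created":
        return {**event.payload, "active": True}
    if event.key == "user-deactivated":
        return {**state, "active": False}
    return state  # unknown events leave the state unchanged


def replay(events):
    """Fold events in order, starting from an empty state."""
    return reduce(apply, events, {})


log = [
    Event("user-created", {"name": "John Doe", "email": "john@example.com"}),
    Event("user-deactivated", {}),
]
state = replay(log)
state_after_first = replay(log[:1])
```

Because `apply` is a pure function, replaying any prefix of the log yields the state as it was at that point in time.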

State

State represents the current state of an aggregate. It's derived from events and optimized for reads.

Key Characteristics

  • Derived: Computed from events
  • Read-Optimized: Optimized for queries
  • Denormalized: May be denormalized for performance
  • Cached: Optional caching layer

Example

# Fetch current state
user = await domain.statemgr.fetch('user', user_id)

# Query state
users = await domain.statemgr.find('user', active=True)

State Manager

The State Manager provides read access to current state. It queries the state store, which is optimized for reads.

Key Methods

  • fetch(): Fetch single entity
  • find(): Find multiple entities
  • count(): Count entities
  • exists(): Check if entity exists

Example

# Fetch single entity
user = await domain.statemgr.fetch('user', user_id)

# Find entities
users = await domain.statemgr.find('user', active=True)

# Query with filters
active_users = await domain.statemgr.find(
    'user',
    active=True,
    limit=10,
    offset=0
)
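
For experimenting with the four methods listed above outside the framework, an in-memory stand-in can be useful. The class below is a hypothetical sketch: its name, the `put` helper, and the exact parameter handling are assumptions modelled on the calls shown in this section, not the Fluvius implementation.

```python
import asyncio


class InMemoryStateManager:
    """Hypothetical stand-in, not the Fluvius state manager."""

    def __init__(self):
        self._rows = {}  # (resource, id) -> record dict

    def put(self, resource, ident, record):
        self._rows[(resource, ident)] = dict(record)

    def _match(self, resource, filters):
        return [
            rec for (res, _), rec in self._rows.items()
            if res == resource
            and all(rec.get(k) == v for k, v in filters.items())
        ]

    async def fetch(self, resource, ident):
        return self._rows[(resource, ident)]

    async def find(self, resource, limit=None, offset=0, **filters):
        rows = self._match(resource, filters)
        end = None if limit is None else offset + limit
        return rows[offset:end]

    async def count(self, resource, **filters):
        return len(self._match(resource, filters))

    async def exists(self, resource, ident):
        return (resource, ident) in self._rows


async def demo():
    mgr = InMemoryStateManager()
    mgr.put("user", "u1", {"name": "Ann", "active": True})
    mgr.put("user", "u2", {"name": "Bob", "active": False})
    mgr.put("user", "u3", {"name": "Cy", "active": True})
    page = await mgr.find("user", active=True, limit=1, offset=1)
    total = await mgr.count("user", active=True)
    return page, total, await mgr.exists("user", "u9")


page, active_count, has_u9 = asyncio.run(demo())
```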

Command Processor

The Command Processor routes commands to aggregates and handles command processing.

Key Responsibilities

  • Command Routing: Route commands to aggregates
  • Action Invocation: Invoke aggregate actions
  • Event Handling: Handle event generation
  • Error Handling: Handle processing errors

Example

# Process single command
response = await domain.process_command(command)

# Process multiple commands
async for response in domain.command_processor.process(*commands):
    print(response)
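
The routing and error-handling responsibilities can be sketched as a small dispatcher that yields one response per command. This is an illustration under stated assumptions: `CommandRouter`, its `register` method, and the `(status, result)` response shape are hypothetical and do not describe the Fluvius command processor's internals.

```python
import asyncio


class CommandRouter:
    """Hypothetical router: maps command names to async handlers."""

    def __init__(self):
        self._handlers = {}

    def register(self, name, handler):
        self._handlers[name] = handler

    async def process(self, *commands):
        """Yield one (status, result) pair per command, in order."""
        for name, payload in commands:
            handler = self._handlers.get(name)
            if handler is None:
                yield ("error", f"no handler for {name!r}")
                continue
            try:
                yield ("ok", await handler(**payload))
            except ValueError as exc:
                yield ("error", str(exc))


async def create_user(name, email):
    if "@" not in email:
        raise ValueError("Invalid email")
    return {"name": name, "email": email}


async def demo():
    router = CommandRouter()
    router.register("create-user", create_user)
    commands = [
        ("create-user", {"name": "John Doe", "email": "john@example.com"}),
        ("create-user", {"name": "Bad", "email": "nope"}),
        ("delete-user", {"name": "John Doe"}),
    ]
    return [resp async for resp in router.process(*commands)]


responses = asyncio.run(demo())
```

Reporting failures as responses rather than raising lets a batch continue after one command fails; that is a design choice of this sketch.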

Event Store

The Event Store persists events in an append-only log.

Key Features

  • Append-Only: Events are never modified
  • Immutable: Events cannot be changed
  • Ordered: Events maintain order
  • Queryable: Query events by criteria
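
The features listed above can be sketched with an in-memory log. The class below is a hypothetical illustration, not the Fluvius event store: `AppendOnlyEventStore`, the `seq` field, the `user-renamed` key, and the `query` parameters are assumptions. Read-only mapping views stand in for immutability.

```python
from types import MappingProxyType


class AppendOnlyEventStore:
    """Hypothetical in-memory store; a real one would persist its log."""

    def __init__(self):
        self._events = []

    def append(self, key, payload):
        """Add an event at the end of the log and return its sequence number."""
        seq = len(self._events) + 1
        record = MappingProxyType(
            {"seq": seq, "key": key, "payload": MappingProxyType(dict(payload))}
        )
        self._events.append(record)
        return seq

    def query(self, key=None, after_seq=0):
        """Return matching events in the order they were appended."""
        return [
            e for e in self._events
            if e["seq"] > after_seq and (key is None or e["key"] == key)
        ]


store = AppendOnlyEventStore()
store.append("user-created", {"name": "John Doe"})
store.append("user-renamed", {"name": "Jane Doe"})
store.append("user-created", {"name": "Ann"})

created = store.query(key="user-created")
recent = store.query(after_seq=1)

try:
    created[0]["key"] = "tampered"  # read-only views reject mutation
    tampered = True
except TypeError:
    tampered = False
```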

Context

Context provides request-scoped information and services.

Key Responsibilities

  • Request Information: Provide request metadata
  • Service Access: Access to services
  • Transaction Management: Manage transactions
  • Namespace Isolation: Isolate by namespace

Example

from fluvius.domain.context import SanicContext

# Create context
ctx = SanicContext.create(namespace='app-user')

# Access context in domain
domain = UserDomain(ctx)
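
One way to picture request scoping and namespace isolation is with Python's standard `contextvars` module. The sketch below is an analogy only: `RequestContext`, `run_in_context`, and the `app-billing` namespace are hypothetical, and nothing here describes how `SanicContext` is implemented.

```python
import contextvars
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass(frozen=True)
class RequestContext:
    """Hypothetical request-scoped metadata."""
    namespace: str
    request_id: str = field(default_factory=lambda: uuid4().hex)


_current = contextvars.ContextVar("current_context")


def run_in_context(ctx, func):
    """Run func with ctx as the current context, then restore the old value."""
    token = _current.set(ctx)
    try:
        return func()
    finally:
        _current.reset(token)


def current_namespace():
    return _current.get().namespace


ns_a = run_in_context(RequestContext("app-user"), current_namespace)
ns_b = run_in_context(RequestContext("app-billing"), current_namespace)
leaked = _current.get(None)  # nothing remains set once a request has ended
```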

Data Model

Data Models define the structure of aggregate state.

Key Features

  • Type Safety: Type hints for validation
  • Immutability: Immutable updates
  • Validation: Automatic validation
  • Serialization: JSON serialization

Example

from fluvius.data import DataModel, field
from datetime import datetime

class UserState(DataModel):
    name: str = field()
    email: str = field()
    active: bool = field(initial=True)
    created_at: datetime = field(default_factory=datetime.now)
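
The ideas of immutable updates, validation, and JSON serialization can be tried out with standard-library dataclasses. This is a sketch by analogy, assuming nothing about how `DataModel` works internally; `UserRecord` is a hypothetical name.

```python
import json
from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class UserRecord:
    """Hypothetical immutable record, validated on construction."""
    name: str
    email: str
    active: bool = True

    def __post_init__(self):
        if "@" not in self.email:
            raise ValueError("Invalid email")


original = UserRecord(name="John Doe", email="john@example.com")

# An "immutable update" builds a new object; the original is untouched.
deactivated = replace(original, active=False)

# Serialize to JSON with a stable key order.
encoded = json.dumps(asdict(deactivated), sort_keys=True)

try:
    replace(original, email="not-an-email")  # validation runs again
    accepted = True
except ValueError:
    accepted = False
```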

Abstractions Working Together

Client Request
    │
    ▼
Domain
    │
    ├──► Command Processor
    │         │
    │         └──► Aggregate
    │                   │
    │                   ├──► Business Logic
    │                   │
    │                   ├──► State Update
    │                   │
    │                   └──► Event Generation
    │                             │
    │                             └──► Event Store
    │
    └──► State Manager
              │
              └──► State Store (Queries)

Best Practices

1. Keep Aggregates Focused

Each aggregate should have a single responsibility:

# Good: Focused aggregate
class UserAggregate(Aggregate):
    # User-related logic only
    pass

# Bad: Too many responsibilities
class UserOrderPaymentAggregate(Aggregate):
    # Too many concerns
    pass

2. Use Actions for State Changes

Always use the @action decorator for state changes, so that every change produces an event:

# Good: Uses action decorator
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    pass

# Bad: Direct state modification
async def create_user(self, name: str, email: str):
    self._state = UserState(...)  # No event generated
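
To see why routing state changes through a decorator helps, consider the following framework-independent sketch. `recorded_action` and `UserAggregateSketch` are hypothetical stand-ins written for this illustration; they are not the Fluvius `@action` decorator and make no claim about its behavior beyond recording an event after a successful call.

```python
import asyncio
import functools


def recorded_action(evt_key):
    """Hypothetical decorator: record an event after the method succeeds."""
    def decorator(method):
        @functools.wraps(method)
        async def wrapper(self, **kwargs):
            result = await method(self, **kwargs)
            # Reached only if the method did not raise.
            self.events.append({"key": evt_key, "payload": kwargs})
            return result
        return wrapper
    return decorator


class UserAggregateSketch:
    def __init__(self):
        self.state = None
        self.events = []

    @recorded_action("user-created")
    async def create_user(self, name, email):
        if "@" not in email:
            raise ValueError("Invalid email")
        self.state = {"name": name, "email": email, "active": True}
        return self.state


agg = UserAggregateSketch()
asyncio.run(agg.create_user(name="John Doe", email="john@example.com"))

rejected = UserAggregateSketch()
try:
    asyncio.run(rejected.create_user(name="Bad", email="nope"))
except ValueError:
    pass  # failed validation leaves no event behind
```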

3. Validate in Aggregates

Enforce business rules in aggregates:

@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Validate business rules
    if not email or '@' not in email:
        raise ValueError("Invalid email")
    # Business logic
    pass

4. Use State Manager for Queries

Use the state manager for read operations:

# Good: Use state manager
user = await domain.statemgr.fetch('user', user_id)

# Bad: Direct database access
user = await db.query("SELECT * FROM users WHERE id = ?", user_id)

Next Steps