
Data Persistence


Fluvius Framework supports multiple database backends and provides flexible data persistence options.

Database Drivers

PostgreSQL Driver

PostgreSQL is the recommended production database:

from fluvius.data import PostgreSQLDriver

driver = PostgreSQLDriver(
    connection_string="postgresql://user:password@localhost:5432/mydb",
    pool_size=10,
    max_overflow=20
)

Features:

  • ACID compliance
  • JSON support
  • Full-text search
  • Transactions
  • Connection pooling

MongoDB Driver

Use MongoDB for document storage:

from fluvius.data import MongoDBDriver

driver = MongoDBDriver(
    connection_string="mongodb://localhost:27017/",
    database="mydb"
)

Features:

  • Document storage
  • Flexible schema
  • Horizontal scaling
  • JSON-like documents

SQLite Driver

Use SQLite for development and testing:

from fluvius.data import SQLiteDriver

driver = SQLiteDriver(
    database_path=":memory:",  # or "path/to/database.db"
    check_same_thread=False
)

Features:

  • Zero configuration
  • File-based
  • Fast for development
  • Easy testing
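To see what an in-memory SQLite database gives you during testing, the standard-library `sqlite3` module can be exercised directly (this sketch uses plain `sqlite3`, not the Fluvius driver):

```python
import sqlite3

# An in-memory database: nothing touches disk, ideal for unit tests.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT NOT NULL)")
conn.execute("INSERT INTO users VALUES (?, ?)", ("u1", "Alice"))
conn.commit()

row = conn.execute("SELECT name FROM users WHERE id = ?", ("u1",)).fetchone()
print(row[0])  # Alice
conn.close()
```

Because the database vanishes when the connection closes, each test can start from a clean slate with zero configuration.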

Event Store Persistence

Event Storage

Each event emitted by an action is appended to the event store:

# Events are automatically stored
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # The emitted event is persisted in the event store
    pass

Event Retrieval

Retrieve events from the event store:

# Get all events for an aggregate
events = await domain.logstore.get_events('user', user_id)

# Get events by type
events = await domain.logstore.get_events_by_type('user-created')

# Get events in time range
events = await domain.logstore.get_events_in_range(
    start_time, end_time
)
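Retrieved events are typically replayed in order to rebuild an aggregate's state. A minimal, framework-free sketch of that fold (the event shape here is hypothetical):

```python
# Hypothetical event dicts, as they might come back from a log store.
events = [
    {"type": "user-created", "data": {"name": "John", "email": "j@example.com"}},
    {"type": "user-renamed", "data": {"name": "Johnny"}},
]

def apply_event(state: dict, event: dict) -> dict:
    # Each event type maps to a pure state transition.
    if event["type"] == "user-created":
        return dict(event["data"])
    if event["type"] == "user-renamed":
        return {**state, "name": event["data"]["name"]}
    return state

state = {}
for event in events:
    state = apply_event(state, event)

print(state["name"])  # Johnny
```

Because each transition is a pure function of (state, event), replaying the same log always reconstructs the same state.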

State Store Persistence

State Storage

Current state is kept in the state store:

# State is automatically updated when events are stored
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # State is updated in the state store
    self._state = UserState(name=name, email=email)
    return self._state

State Queries

Query state from the state store:

# Fetch single entity
user = await domain.statemgr.fetch('user', user_id)

# Find entities
users = await domain.statemgr.find('user', active=True)

# Query with filters
users = await domain.statemgr.find(
    'user',
    active=True,
    limit=10,
    offset=0
)

Transactions

Command Transactions

Commands are processed in transactions:

# Command processing is transactional
async with domain.transaction():
    response = await domain.process_command(command)
    # If an error occurs, the transaction is rolled back
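The rollback-on-error semantics can be illustrated with a toy context manager over an in-memory store (a sketch of the behavior, not the Fluvius implementation):

```python
from contextlib import contextmanager

class ToyStore:
    def __init__(self):
        self.rows = {}

    @contextmanager
    def transaction(self):
        # Snapshot the current state; restore it if the block raises.
        snapshot = dict(self.rows)
        try:
            yield self
        except Exception:
            self.rows = snapshot
            raise

store = ToyStore()
try:
    with store.transaction():
        store.rows["u1"] = "Alice"
        raise RuntimeError("something failed mid-transaction")
except RuntimeError:
    pass

print(store.rows)  # {} -- the write was rolled back
```

A real database driver does the same thing with BEGIN/ROLLBACK rather than a snapshot, but the contract is identical: either the whole block commits or none of it does.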

Event Storage Transactions

Events are stored atomically:

# Event storage is atomic
# Either all events are stored or none
await event_store.append_many(events)
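All-or-nothing semantics for a batch append can be sketched by validating every event before writing any of them (illustrative only, not the Fluvius event store):

```python
class ToyEventStore:
    def __init__(self):
        self.log = []

    def append_many(self, events):
        # Validate the whole batch first; only then mutate the log,
        # so one bad event leaves the store untouched.
        for event in events:
            if "type" not in event:
                raise ValueError("event missing 'type'")
        self.log.extend(events)

store = ToyEventStore()
try:
    store.append_many([{"type": "a"}, {"bad": True}])
except ValueError:
    pass

print(len(store.log))  # 0 -- nothing from the failed batch was stored

store.append_many([{"type": "a"}, {"type": "b"}])
print(len(store.log))  # 2
```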

State Update Transactions

State updates are transactional:

# State updates are transactional
async with state_store.transaction():
    await state_store.update('user', user_id, new_state)

Migrations

Alembic Migrations

Use Alembic for schema migrations:

# Create migration
alembic revision --autogenerate -m "Add user table"

# Apply migration
alembic upgrade head

# Rollback migration
alembic downgrade -1

Migration Files

# alembic/versions/001_add_user_table.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.String(), primary_key=True),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('email', sa.String(), nullable=False),
    )

def downgrade():
    op.drop_table('users')

Connection Pooling

Pool Configuration

Configure connection pools:

from fluvius.data import PostgreSQLDriver

driver = PostgreSQLDriver(
    connection_string="postgresql://...",
    pool_size=10,       # Base pool size
    max_overflow=20,    # Maximum overflow
    pool_timeout=30,    # Timeout in seconds
    pool_recycle=3600   # Recycle connections after 1 hour
)
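With this configuration the pool can hand out at most `pool_size + max_overflow` connections at once; requests beyond that wait up to `pool_timeout` seconds. A toy counter makes the bound concrete:

```python
class ToyPool:
    """Models only the capacity bound of a connection pool."""

    def __init__(self, pool_size: int, max_overflow: int):
        self.capacity = pool_size + max_overflow
        self.in_use = 0

    def acquire(self):
        if self.in_use >= self.capacity:
            # A real pool would block up to pool_timeout before failing.
            raise TimeoutError("pool exhausted; caller would block")
        self.in_use += 1

    def release(self):
        self.in_use -= 1

pool = ToyPool(pool_size=10, max_overflow=20)
for _ in range(30):        # 10 base + 20 overflow connections succeed
    pool.acquire()

exhausted = False
try:
    pool.acquire()         # the 31st concurrent request cannot be served
except TimeoutError:
    exhausted = True

print(exhausted)  # True
```

Size the pool against your database server's connection limit: the sum across all application instances must stay below it.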

Caching

State Caching

Cache state for faster queries:

from fluvius.data import CacheManager

cache = CacheManager(redis_url="redis://localhost:6379")

# Cache state queries
@cache.cached(ttl=300)  # Cache for 5 minutes
async def get_user(user_id: str):
    return await domain.statemgr.fetch('user', user_id)
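The time-to-live pattern behind such a decorator can be implemented with nothing but the standard library; a synchronous sketch (not the `CacheManager` internals):

```python
import time

def ttl_cached(ttl: float):
    """Memoize a function's results for `ttl` seconds."""
    def decorator(fn):
        cache = {}  # key -> (expires_at, value)

        def wrapper(*args):
            now = time.monotonic()
            hit = cache.get(args)
            if hit and hit[0] > now:
                return hit[1]          # still fresh: serve from cache
            value = fn(*args)
            cache[args] = (now + ttl, value)
            return value
        return wrapper
    return decorator

calls = 0

@ttl_cached(ttl=300)
def get_user(user_id: str) -> dict:
    global calls
    calls += 1
    return {"id": user_id, "name": "John"}

get_user("u1")
get_user("u1")      # served from cache; the function body runs once
print(calls)  # 1
```

A shared cache like Redis adds cross-process reuse, but the freshness logic is the same: expired entries are recomputed, fresh ones are returned as-is.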

Backup and Recovery

Event Log Backup

Back up the event log:

# Export events
events = await domain.logstore.export_events()

# Import events
await domain.logstore.import_events(events)

State Backup

Back up the state store:

# Export state
state = await state_store.export_state()

# Import state
await state_store.import_state(state)
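Whatever the export format, a backup should round-trip losslessly. A sketch using JSON Lines for an event dump (the record shape is hypothetical):

```python
import io
import json

events = [
    {"type": "user-created", "data": {"name": "John"}},
    {"type": "user-renamed", "data": {"name": "Johnny"}},
]

# Export: one JSON document per line, easy to stream and to append to.
buffer = io.StringIO()
for event in events:
    buffer.write(json.dumps(event) + "\n")

# Import: parse each line back into an event dict.
buffer.seek(0)
restored = [json.loads(line) for line in buffer if line.strip()]

print(restored == events)  # True -- the round trip is lossless
```

Verifying the round trip after every backup run is cheap insurance; a backup that cannot be restored is not a backup.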

Performance Optimization

Indexing

Create indexes for faster queries:

# Create index on state store
await state_store.create_index('user', 'email')

# Create composite index
await state_store.create_index('user', ['active', 'created_at'])

Query Optimization

Optimize queries:

# Use specific fields
users = await domain.statemgr.find(
    'user',
    active=True,
    fields=['id', 'name', 'email']  # Only fetch needed fields
)

# Use pagination
users = await domain.statemgr.find(
    'user',
    limit=10,
    offset=0
)

Data Models

State Models

Define state models:

from datetime import datetime

from fluvius.data import DataModel, field

class UserState(DataModel):
    name: str = field()
    email: str = field()
    active: bool = field(initial=True)
    created_at: datetime = field(default_factory=datetime.now)

Model Validation

Models are automatically validated:

# Validation happens automatically
user = UserState(name="John", email="john@example.com")
# Invalid data raises ValidationError
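The same fail-fast, construction-time validation can be reproduced with a standard-library dataclass; a sketch of the pattern (not the Fluvius `DataModel` internals):

```python
from dataclasses import dataclass

@dataclass
class UserState:
    name: str
    email: str
    active: bool = True

    def __post_init__(self):
        # Reject obviously malformed data at construction time.
        if not self.name:
            raise ValueError("name must not be empty")
        if "@" not in self.email:
            raise ValueError(f"invalid email: {self.email!r}")

user = UserState(name="John", email="john@example.com")

rejected = False
try:
    UserState(name="John", email="not-an-email")
except ValueError:
    rejected = True

print(user.active, rejected)  # True True
```

Validating at the model boundary means no downstream code ever has to re-check that a `UserState` it received is well-formed.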

Best Practices

1. Use Transactions

Always use transactions for multi-step operations:

async with domain.transaction():
    # Multiple operations
    await domain.process_command(command1)
    await domain.process_command(command2)

2. Index Frequently Queried Fields

Create indexes on frequently queried fields:

await state_store.create_index('user', 'email')
await state_store.create_index('order', ['customer_id', 'status'])

3. Use Connection Pooling

Configure connection pools appropriately:

# Production: Larger pool
pool_size=20, max_overflow=40

# Development: Smaller pool
pool_size=5, max_overflow=10

4. Monitor Performance

Monitor database performance:

# Track query performance
import time
import logging

logger = logging.getLogger(__name__)

start = time.time()
user = await domain.statemgr.fetch('user', user_id)
duration = time.time() - start
logger.info(f"Query took {duration:.3f}s")
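Rather than timing each call site by hand, the measurement can be wrapped in a decorator; a sketch using `time.perf_counter` (a monotonic, high-resolution clock better suited to intervals than `time.time`):

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def timed(fn):
    """Log how long each call to `fn` takes."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            duration = time.perf_counter() - start
            logger.info("%s took %.4fs", fn.__name__, duration)
    return wrapper

@timed
def slow_square(x: int) -> int:
    time.sleep(0.01)   # stand-in for a database query
    return x * x

result = slow_square(7)
print(result)  # 49
```

An async variant is the same shape with `async def wrapper` and `await fn(...)`; in production you would typically feed the durations to a metrics system rather than the log.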

Next Steps