Data Persistence
Fluvius Framework supports multiple database backends (PostgreSQL, MongoDB, and SQLite) and persists both the event log and the current state.
Database Drivers
PostgreSQL Driver
PostgreSQL is the recommended production database:
from fluvius.data import PostgreSQLDriver
driver = PostgreSQLDriver(
    connection_string="postgresql://user:password@localhost:5432/mydb",
    pool_size=10,
    max_overflow=20
)
Features:
- ACID compliance
- JSON support
- Full-text search
- Transactions
- Connection pooling
MongoDB Driver
Use MongoDB for document storage:
from fluvius.data import MongoDBDriver
driver = MongoDBDriver(
    connection_string="mongodb://localhost:27017/",
    database="mydb"
)
Features:
- Document storage
- Flexible schema
- Horizontal scaling
- JSON-like documents
SQLite Driver
Use SQLite for development and testing:
from fluvius.data import SQLiteDriver
driver = SQLiteDriver(
    database_path=":memory:",  # or "path/to/database.db"
    check_same_thread=False
)
Features:
- Zero configuration
- File-based
- Fast for development
- Easy testing
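The zero-configuration, in-memory behaviour can be illustrated with Python's standard `sqlite3` module. This is a minimal sketch and not the `SQLiteDriver` implementation; the `users` table and its columns are assumptions made for illustration.

```python
import sqlite3

# ":memory:" creates a private database that disappears when the connection closes.
# check_same_thread=False lets other threads reuse this connection object;
# the caller is then responsible for serializing access to it.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE users (id TEXT PRIMARY KEY, name TEXT NOT NULL)")

with conn:  # commits on success, rolls back if the block raises
    conn.execute("INSERT INTO users (id, name) VALUES (?, ?)", ("u1", "Ada"))

rows = conn.execute("SELECT id, name FROM users").fetchall()
```

Because the database lives only in memory, each test can open a fresh connection and start from an empty schema.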
Event Store Persistence
Event Storage
Events are stored in the event store:
# Events are automatically stored
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # Event is stored in event store
    pass
Event Retrieval
Retrieve events from the event store:
# Get all events for an aggregate
events = await domain.logstore.get_events('user', user_id)
# Get events by type
events = await domain.logstore.get_events_by_type('user-created')
# Get events in time range
events = await domain.logstore.get_events_in_range(
    start_time, end_time
)
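The three lookups above (by aggregate, by event type, by time range) can be sketched against a plain in-memory list. The `Event` and `InMemoryEventStore` classes below are hypothetical and exist only for this illustration; they are not the Fluvius `logstore`, and the half-open time range is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


@dataclass(frozen=True)
class Event:
    aggregate: str        # e.g. "user"
    aggregate_id: str
    evt_key: str          # e.g. "user-created"
    timestamp: datetime
    data: Dict[str, Any] = field(default_factory=dict)


class InMemoryEventStore:
    """Hypothetical append-only store used only for this sketch."""

    def __init__(self) -> None:
        self._events: List[Event] = []

    def append(self, event: Event) -> None:
        self._events.append(event)

    def get_events(self, aggregate: str, aggregate_id: str) -> List[Event]:
        return [e for e in self._events
                if e.aggregate == aggregate and e.aggregate_id == aggregate_id]

    def get_events_by_type(self, evt_key: str) -> List[Event]:
        return [e for e in self._events if e.evt_key == evt_key]

    def get_events_in_range(self, start: datetime, end: datetime) -> List[Event]:
        # Half-open interval [start, end): an assumption of this sketch.
        return [e for e in self._events if start <= e.timestamp < end]


store = InMemoryEventStore()
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
store.append(Event("user", "u1", "user-created", t0, {"name": "Ada"}))
store.append(Event("user", "u1", "user-renamed", t0 + timedelta(days=1)))
store.append(Event("user", "u2", "user-created", t0 + timedelta(days=2)))
```

Events are only ever appended, so each query is a filter over the same ordered log.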
State Store Persistence
State Storage
Current state is stored in the state store:
# State is automatically updated when events are stored
@action(evt_key='user-created')
async def create_user(self, name: str, email: str):
    # State is updated in state store
    self._state = UserState(name=name, email=email)
    return self._state
State Queries
Query state from the state store:
# Fetch single entity
user = await domain.statemgr.fetch('user', user_id)
# Find entities
users = await domain.statemgr.find('user', active=True)
# Query with filters
users = await domain.statemgr.find(
    'user',
    active=True,
    limit=10,
    offset=0
)
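How equality filters combine with `limit` and `offset` can be shown with a small standalone function. The `find` helper below is a hypothetical stand-in that works on a list of dictionaries; it is a sketch of the semantics, not the `statemgr.find` implementation.

```python
from typing import Any, Dict, List, Optional


def find(records: List[Dict[str, Any]], *, limit: Optional[int] = None,
         offset: int = 0, **filters: Any) -> List[Dict[str, Any]]:
    """Keep records matching every filter, then apply offset and limit."""
    matches = [r for r in records
               if all(r.get(key) == value for key, value in filters.items())]
    end = None if limit is None else offset + limit
    return matches[offset:end]


users = [
    {"id": "u1", "active": True},
    {"id": "u2", "active": False},
    {"id": "u3", "active": True},
    {"id": "u4", "active": True},
]

# Skip the first active user, then return at most two.
page = find(users, active=True, limit=2, offset=1)
```

Offset-based paging only yields stable pages when the underlying order is deterministic, so a real query would normally sort on a fixed key as well.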
Transactions
Command Transactions
Commands are processed in transactions:
# Command processing is transactional
async with domain.transaction():
    response = await domain.process_command(command)
    # If an error occurs, the transaction is rolled back
Event Storage Transactions
Events are stored atomically:
# Event storage is atomic
# Either all events are stored or none
await event_store.append_many(events)
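The all-or-nothing behaviour can be demonstrated with the standard `sqlite3` module. This is a minimal sketch under the assumption that events live in a single table; the `append_many` function here is a local helper with the same name as the call above, not the Fluvius event store method.

```python
import sqlite3
from typing import List, Optional

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, evt_key TEXT NOT NULL)")


def append_many(conn: sqlite3.Connection, evt_keys: List[Optional[str]]) -> None:
    # "with conn" commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.executemany("INSERT INTO events (evt_key) VALUES (?)",
                         [(key,) for key in evt_keys])


append_many(conn, ["user-created", "user-renamed"])

try:
    # None violates NOT NULL, so the whole batch is rolled back,
    # including the valid "user-deleted" row inserted just before it.
    append_many(conn, ["user-deleted", None])
except sqlite3.IntegrityError:
    pass

count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
```

Only the first batch is visible afterwards; the failed batch leaves no partial rows behind.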
State Update Transactions
State updates are transactional:
# State updates are transactional
async with state_store.transaction():
    await state_store.update('user', user_id, new_state)
Migrations
Alembic Migrations
Use Alembic for schema migrations:
# Create migration
alembic revision --autogenerate -m "Add user table"
# Apply migration
alembic upgrade head
# Roll back the most recent migration
alembic downgrade -1
Migration Files
# alembic/versions/001_add_user_table.py
from alembic import op
import sqlalchemy as sa
def upgrade():
    op.create_table(
        'users',
        sa.Column('id', sa.String(), primary_key=True),
        sa.Column('name', sa.String(), nullable=False),
        sa.Column('email', sa.String(), nullable=False),
    )

def downgrade():
    op.drop_table('users')
Connection Pooling
Pool Configuration
Configure connection pools:
from fluvius.data import PostgreSQLDriver
driver = PostgreSQLDriver(
    connection_string="postgresql://...",
    pool_size=10,       # Base pool size
    max_overflow=20,    # Maximum overflow
    pool_timeout=30,    # Timeout in seconds
    pool_recycle=3600   # Recycle connections after 1 hour
)
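To make the pool parameters concrete, here is a toy pool that treats `pool_size` as the number of idle connections kept for reuse, `max_overflow` as extra connections allowed under load, and `pool_timeout` as the maximum wait once both are exhausted. This interpretation matches SQLAlchemy's `QueuePool`; whether `PostgreSQLDriver` behaves identically is an assumption. `TinyPool` is hypothetical and omits `pool_recycle`.

```python
import queue
import threading


class TinyPool:
    """Hypothetical pool illustrating pool_size / max_overflow / pool_timeout."""

    def __init__(self, factory, pool_size=2, max_overflow=1, pool_timeout=0.1):
        self._factory = factory
        self._idle = queue.Queue(maxsize=pool_size)
        self._limit = pool_size + max_overflow
        self._timeout = pool_timeout
        self._open = 0                       # connections currently in existence
        self._lock = threading.Lock()

    def acquire(self):
        try:
            return self._idle.get_nowait()   # reuse an idle connection if any
        except queue.Empty:
            pass
        with self._lock:
            if self._open < self._limit:     # room to open a new connection
                self._open += 1
                return self._factory()
        # At the hard limit: wait for a release; queue.Empty is raised on timeout.
        return self._idle.get(timeout=self._timeout)

    def release(self, conn):
        try:
            self._idle.put_nowait(conn)      # keep up to pool_size idle connections
        except queue.Full:
            with self._lock:
                self._open -= 1              # overflow connection is discarded


pool = TinyPool(factory=object, pool_size=2, max_overflow=1, pool_timeout=0.05)
held = [pool.acquire() for _ in range(3)]    # 2 base + 1 overflow

try:
    pool.acquire()                           # a 4th request has to wait
    exhausted = False
except queue.Empty:
    exhausted = True

for conn in held:
    pool.release(conn)
```

After all three connections are released, only `pool_size` of them remain open; the overflow connection is dropped.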
Caching
State Caching
Cache state for faster queries:
from fluvius.data import CacheManager
cache = CacheManager(redis_url="redis://localhost:6379")
# Cache state queries
@cache.cached(ttl=300)  # Cache for 5 minutes
async def get_user(user_id: str):
    return await domain.statemgr.fetch('user', user_id)
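What a TTL-based `cached` decorator does can be sketched with an in-process dictionary. The decorator below is a hypothetical stand-in for illustration only: it is not `CacheManager`, it does not use Redis, and it handles positional arguments only.

```python
import asyncio
import functools
import time


def cached(ttl: float):
    """Hypothetical in-process TTL cache for async functions."""
    def decorator(fn):
        store = {}  # args tuple -> (expires_at, value)

        @functools.wraps(fn)
        async def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]                 # still fresh: skip the query
            value = await fn(*args)
            store[args] = (now + ttl, value)
            return value
        return wrapper
    return decorator


calls = {"n": 0}  # counts how often the underlying query really runs


@cached(ttl=300)
async def get_user(user_id: str):
    calls["n"] += 1
    return {"id": user_id}  # stands in for a state store fetch


async def main() -> int:
    await get_user("u1")
    await get_user("u1")   # served from the cache
    await get_user("u2")   # different key, so a cache miss
    return calls["n"]
```

A cached entry can be up to `ttl` seconds stale, so writes that change a user's state should either invalidate the entry or tolerate that delay.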
Backup and Recovery
Event Log Backup
Back up the event log:
# Export events
events = await domain.logstore.export_events()
# Import events
await domain.logstore.import_events(events)
State Backup
Back up the state store:
# Export state
state = await state_store.export_state()
# Import state
await state_store.import_state(state)
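An export and import round trip can be sketched with JSON Lines, one event per line. The source does not specify the serialization format used by `export_events` or `export_state`, so the format, the helper functions, and the event fields below are all assumptions for illustration.

```python
import io
import json
from typing import Any, Dict, Iterable, List, TextIO


def export_events(events: Iterable[Dict[str, Any]], fp: TextIO) -> int:
    """Write one JSON object per line; return the number of events written."""
    written = 0
    for event in events:
        fp.write(json.dumps(event, sort_keys=True) + "\n")
        written += 1
    return written


def import_events(fp: TextIO) -> List[Dict[str, Any]]:
    """Read events back in their original order, skipping blank lines."""
    return [json.loads(line) for line in fp if line.strip()]


events = [
    {"evt_key": "user-created", "aggregate_id": "u1", "data": {"name": "Ada"}},
    {"evt_key": "user-renamed", "aggregate_id": "u1", "data": {"name": "Ada L."}},
]

buffer = io.StringIO()   # stands in for a backup file
written = export_events(events, buffer)
buffer.seek(0)
restored = import_events(buffer)
```

A line-oriented format keeps event order explicit and lets large logs be streamed rather than loaded in one piece.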
Performance Optimization
Indexing
Create indexes for faster queries:
# Create index on state store
await state_store.create_index('user', 'email')
# Create composite index
await state_store.create_index('user', ['active', 'created_at'])
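The effect of an index can be checked with SQLite's `EXPLAIN QUERY PLAN`. This sketch uses the standard `sqlite3` module rather than the state store API; the table layout and index names are assumptions chosen to mirror the single-field and composite indexes above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users ("
    "id TEXT PRIMARY KEY, name TEXT, email TEXT, active INTEGER, created_at TEXT)"
)
# Single-column index and composite index.
conn.execute("CREATE INDEX idx_users_email ON users (email)")
conn.execute("CREATE INDEX idx_users_active_created ON users (active, created_at)")

plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT name FROM users WHERE email = ?",
    ("ada@example.com",),
).fetchall()

# The last column of each plan row is a human-readable description.
detail = " ".join(row[3] for row in plan)
```

If `detail` names the index, the lookup is an index search; a plan that reports a scan of `users` would indicate the index is not being used.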
Query Optimization
Fetch only the fields you need and paginate large result sets:
# Use specific fields
users = await domain.statemgr.find(
    'user',
    active=True,
    fields=['id', 'name', 'email']  # Only fetch needed fields
)
# Use pagination
users = await domain.statemgr.find(
    'user',
    limit=10,
    offset=0
)
Data Models
State Models
Define state models:
from datetime import datetime
from fluvius.data import DataModel, field
class UserState(DataModel):
    name: str = field()
    email: str = field()
    active: bool = field(initial=True)
    created_at: datetime = field(default_factory=datetime.now)
Model Validation
Models are automatically validated:
# Validation happens automatically
user = UserState(name="John", email="john@example.com")
# Invalid data raises ValidationError
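Validation on construction can be sketched with a standard-library dataclass. The source does not describe the rules `DataModel` applies, so the checks below (non-empty name, an `@` in the email) and the local `ValidationError` class are assumptions for illustration, not the Fluvius behaviour.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


class ValidationError(ValueError):
    """Hypothetical error type defined for this sketch."""


@dataclass
class UserState:
    name: str
    email: str
    active: bool = True
    created_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

    def __post_init__(self) -> None:
        # Runs right after __init__, so invalid objects never escape construction.
        if not self.name.strip():
            raise ValidationError("name must not be empty")
        if "@" not in self.email:
            raise ValidationError(f"invalid email: {self.email!r}")


user = UserState(name="John", email="john@example.com")

try:
    UserState(name="John", email="not-an-email")
    rejected = False
except ValidationError:
    rejected = True
```

Validating in the constructor means every `UserState` instance that exists has already passed the checks.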
Best Practices
1. Use Transactions
Always use transactions for multi-step operations:
async with domain.transaction():
    # Multiple operations
    await domain.process_command(command1)
    await domain.process_command(command2)
2. Index Frequently Queried Fields
Create indexes on frequently queried fields:
await state_store.create_index('user', 'email')
await state_store.create_index('order', ['customer_id', 'status'])
3. Use Connection Pooling
Configure connection pools appropriately:
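# Production: larger pool
driver = PostgreSQLDriver(connection_string="postgresql://...", pool_size=20, max_overflow=40)
# Development: smaller pool
driver = PostgreSQLDriver(connection_string="postgresql://...", pool_size=5, max_overflow=10)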
4. Monitor Performance
Monitor database performance:
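# Track query performance
import logging
import time
logger = logging.getLogger(__name__)
start = time.perf_counter()  # monotonic clock, suited to measuring durations
user = await domain.statemgr.fetch('user', user_id)
duration = time.perf_counter() - start
logger.info("Query took %.3fs", duration)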
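The same measurement can be packaged as a reusable decorator so that timing code is not repeated around every query. This is a minimal sketch: the `timed` decorator, its `threshold_s` parameter, and the logger name are hypothetical, and `fetch_user` merely simulates a state store call.

```python
import asyncio
import functools
import logging
import time

logger = logging.getLogger("persistence.timing")


def timed(threshold_s: float = 0.0):
    """Log the duration of an async call when it meets or exceeds threshold_s."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await fn(*args, **kwargs)
            finally:
                # Runs on success and on failure, so slow errors are logged too.
                duration = time.perf_counter() - start
                if duration >= threshold_s:
                    logger.info("%s took %.3fs", fn.__name__, duration)
        return wrapper
    return decorator


@timed()
async def fetch_user(user_id: str):
    await asyncio.sleep(0.01)   # stands in for a state store query
    return {"id": user_id}
```

Setting a non-zero `threshold_s` limits log output to slow queries, which is usually what matters in production.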
Next Steps
- Learn about Auth & Security
- Explore Main Abstractions
- Check Usage Guide