Skip to content

DDD Service (ORM) — Bank Balance Alert Example

A complete illustrative example showing how to structure a banking feature with balance alerts using DDD hexagonal architecture and SQLAlchemy ORM.

See also: Core concepts · Usage examples · External API calls


Overview

This example demonstrates: - Multiple ports (repository + notification) - SQLAlchemy models with relationships - Domain entities with business meaning - Use-case orchestrating multiple collaborators - Infrastructure adapters for persistence and notifications

Not scaffolded — illustrative only.


Domain Layer

Entities

modules/banking/domain/entities.py

from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Account:
    id: str
    owner_email: str
    balance: float
    updated_at: datetime
    owner_name: Optional[str] = None


@dataclass
class AlertLog:
    """Record of sent alerts for auditing."""
    id: str
    account_id: str
    threshold: float
    balance_at_alert: float
    sent_at: datetime

Ports

modules/banking/domain/ports.py

from typing import Protocol, Optional
from datetime import datetime
from .entities import Account, AlertLog


class AccountRepository(Protocol):
    """Port for account persistence."""

    def get(self, account_id: str) -> Optional[Account]:
        """Retrieve an account by ID."""
        ...

    def save(self, account: Account) -> Account:
        """Persist an account."""
        ...

    def get_below_threshold(self, threshold: float) -> list[Account]:
        """Get all accounts with balance below threshold."""
        ...


class AlertLogRepository(Protocol):
    """Port for alert log persistence."""

    def save(self, alert: AlertLog) -> AlertLog:
        """Persist an alert log entry."""
        ...

    def get_recent_for_account(
        self, account_id: str, since: datetime
    ) -> list[AlertLog]:
        """Get recent alerts for an account."""
        ...


class NotificationPort(Protocol):
    """Port for sending notifications."""

    def send_balance_alert(
        self,
        to_email: str,
        owner_name: str,
        current_balance: float,
        threshold: float,
    ) -> bool:
        """Send a low-balance alert. Returns True if sent successfully."""
        ...


Infrastructure Layer — SQLAlchemy Models

modules/banking/infrastructure/models.py

from datetime import datetime
from sqlalchemy import String, Float, DateTime, ForeignKey, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from core.infrastructure.database import Base, generate_uuid


class AccountModel(Base):
    """SQLAlchemy model for bank accounts."""

    __tablename__ = "accounts"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid)
    owner_email: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    owner_name: Mapped[str] = mapped_column(String(255), nullable=True)
    balance: Mapped[float] = mapped_column(Float, nullable=False, default=0.0)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

    # Relationship to alert logs
    alerts: Mapped[list["AlertLogModel"]] = relationship(
        "AlertLogModel", back_populates="account", cascade="all, delete-orphan"
    )

    def to_entity(self) -> "Account":
        from ..domain.entities import Account
        return Account(
            id=self.id,
            owner_email=self.owner_email,
            owner_name=self.owner_name,
            balance=self.balance,
            updated_at=self.updated_at,
        )

    @classmethod
    def from_entity(cls, entity: "Account") -> "AccountModel":
        return cls(
            id=entity.id,
            owner_email=entity.owner_email,
            owner_name=entity.owner_name,
            balance=entity.balance,
            updated_at=entity.updated_at,
        )


class AlertLogModel(Base):
    """SQLAlchemy model for alert audit logs."""

    __tablename__ = "alert_logs"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid)
    account_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("accounts.id"), nullable=False, index=True
    )
    threshold: Mapped[float] = mapped_column(Float, nullable=False)
    balance_at_alert: Mapped[float] = mapped_column(Float, nullable=False)
    sent_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

    # Relationship back to account
    account: Mapped["AccountModel"] = relationship("AccountModel", back_populates="alerts")

    def to_entity(self) -> "AlertLog":
        from ..domain.entities import AlertLog
        return AlertLog(
            id=self.id,
            account_id=self.account_id,
            threshold=self.threshold,
            balance_at_alert=self.balance_at_alert,
            sent_at=self.sent_at,
        )


Infrastructure Layer — Repositories

modules/banking/infrastructure/repositories.py

from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Session
from sqlalchemy import select
from core.infrastructure.database import Repository, generate_uuid
from .models import AccountModel, AlertLogModel
from ..domain.entities import Account, AlertLog
from ..domain.ports import AccountRepository, AlertLogRepository


class SQLAlchemyAccountRepository(AccountRepository, Repository):
    """SQLAlchemy implementation of AccountRepository."""

    def __init__(self, session: Session):
        super().__init__(session)

    def get(self, account_id: str) -> Optional[Account]:
        model = self.session.get(AccountModel, account_id)
        return model.to_entity() if model else None

    def save(self, account: Account) -> Account:
        model = self.session.get(AccountModel, account.id)
        if model:
            # Update existing
            model.owner_email = account.owner_email
            model.owner_name = account.owner_name
            model.balance = account.balance
        else:
            # Create new
            model = AccountModel.from_entity(account)
            self.session.add(model)
        self.session.flush()
        return account

    def get_below_threshold(self, threshold: float) -> list[Account]:
        stmt = select(AccountModel).where(AccountModel.balance < threshold)
        models = self.session.execute(stmt).scalars().all()
        return [m.to_entity() for m in models]


class SQLAlchemyAlertLogRepository(AlertLogRepository, Repository):
    """SQLAlchemy implementation of AlertLogRepository."""

    def __init__(self, session: Session):
        super().__init__(session)

    def save(self, alert: AlertLog) -> AlertLog:
        model = AlertLogModel(
            id=alert.id or generate_uuid(),
            account_id=alert.account_id,
            threshold=alert.threshold,
            balance_at_alert=alert.balance_at_alert,
            sent_at=alert.sent_at,
        )
        self.session.add(model)
        self.session.flush()
        alert.id = model.id
        return alert

    def get_recent_for_account(
        self, account_id: str, since: datetime
    ) -> list[AlertLog]:
        stmt = (
            select(AlertLogModel)
            .where(AlertLogModel.account_id == account_id)
            .where(AlertLogModel.sent_at >= since)
            .order_by(AlertLogModel.sent_at.desc())
        )
        models = self.session.execute(stmt).scalars().all()
        return [m.to_entity() for m in models]


Infrastructure Layer — Notification Adapters

modules/banking/infrastructure/notifications.py

from ..domain.ports import NotificationPort


class EmailNotificationAdapter(NotificationPort):
    """Email notification adapter (console output for demo)."""

    def send_balance_alert(
        self,
        to_email: str,
        owner_name: str,
        current_balance: float,
        threshold: float,
    ) -> bool:
        name = owner_name or "Customer"
        print(
            f"📧 Email -> {to_email}: "
            f"Hi {name}, your balance (${current_balance:.2f}) is below ${threshold:.2f}"
        )
        return True


class SlackNotificationAdapter(NotificationPort):
    """Alternative: Slack notification adapter."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_balance_alert(
        self,
        to_email: str,
        owner_name: str,
        current_balance: float,
        threshold: float,
    ) -> bool:
        import requests
        response = requests.post(self.webhook_url, json={
            "text": f"⚠️ Low balance alert for {owner_name} ({to_email}): ${current_balance:.2f} < ${threshold:.2f}"
        })
        return response.status_code == 200


Application Layer

modules/banking/application/balance_alert.py

from datetime import datetime, timedelta
from ..domain.entities import AlertLog
from ..domain.ports import AccountRepository, AlertLogRepository, NotificationPort
from core.infrastructure.database import generate_uuid


class BalanceAlertService:
    """Use-case: check account balance and send alert if below threshold."""

    def __init__(
        self,
        accounts: AccountRepository,
        alert_logs: AlertLogRepository,
        notifier: NotificationPort,
        cooldown_hours: int = 24,
    ):
        self.accounts = accounts
        self.alert_logs = alert_logs
        self.notifier = notifier
        self.cooldown_hours = cooldown_hours

    def execute(self, account_id: str, threshold: float) -> bool:
        """
        Check if account balance is below threshold and send alert.

        Respects cooldown period to avoid spamming.

        Returns:
            True if alert was sent, False otherwise.
        """
        account = self.accounts.get(account_id)
        if account is None:
            return False

        if account.balance >= threshold:
            return False

        # Check cooldown
        since = datetime.utcnow() - timedelta(hours=self.cooldown_hours)
        recent_alerts = self.alert_logs.get_recent_for_account(account_id, since)
        if recent_alerts:
            return False  # Already alerted recently

        # Send notification
        success = self.notifier.send_balance_alert(
            to_email=account.owner_email,
            owner_name=account.owner_name or "Customer",
            current_balance=account.balance,
            threshold=threshold,
        )

        if success:
            # Log the alert
            alert_log = AlertLog(
                id=generate_uuid(),
                account_id=account_id,
                threshold=threshold,
                balance_at_alert=account.balance,
                sent_at=datetime.utcnow(),
            )
            self.alert_logs.save(alert_log)

        return success


class BulkBalanceAlertService:
    """Use-case: check all accounts and send alerts for those below threshold."""

    def __init__(
        self,
        accounts: AccountRepository,
        alert_logs: AlertLogRepository,
        notifier: NotificationPort,
        cooldown_hours: int = 24,
    ):
        self.single_alert = BalanceAlertService(
            accounts, alert_logs, notifier, cooldown_hours
        )
        self.accounts = accounts

    def execute(self, threshold: float) -> int:
        """
        Check all accounts below threshold and send alerts.

        Returns:
            Number of alerts sent.
        """
        low_balance_accounts = self.accounts.get_below_threshold(threshold)
        alerts_sent = 0

        for account in low_balance_accounts:
            if self.single_alert.execute(account.id, threshold):
                alerts_sent += 1

        return alerts_sent


Wiring & Running

modules/banking/main.py

from datetime import datetime
from core.application import build_database_session
from core.infrastructure.database import generate_uuid
from .application.balance_alert import BalanceAlertService, BulkBalanceAlertService
from .infrastructure.notifications import EmailNotificationAdapter
from .infrastructure.repositories import (
    SQLAlchemyAccountRepository,
    SQLAlchemyAlertLogRepository,
)
from .domain.entities import Account


def run_demo() -> None:
    # Setup database
    db = build_database_session()
    db.create_tables()

    with db.session() as session:
        # Wire dependencies
        accounts = SQLAlchemyAccountRepository(session)
        alert_logs = SQLAlchemyAlertLogRepository(session)
        notifier = EmailNotificationAdapter()

        # Create services
        single_alert = BalanceAlertService(accounts, alert_logs, notifier)
        bulk_alert = BulkBalanceAlertService(accounts, alert_logs, notifier)

        # Seed data
        accounts.save(Account(
            id=generate_uuid(),
            owner_email="alice@example.com",
            owner_name="Alice",
            balance=49.0,
            updated_at=datetime.utcnow(),
        ))
        accounts.save(Account(
            id=generate_uuid(),
            owner_email="bob@example.com",
            owner_name="Bob",
            balance=150.0,
            updated_at=datetime.utcnow(),
        ))
        accounts.save(Account(
            id=generate_uuid(),
            owner_email="carol@example.com",
            owner_name="Carol",
            balance=25.0,
            updated_at=datetime.utcnow(),
        ))
        session.commit()

        # Execute bulk alerts
        print("\n--- Running bulk alerts (threshold: $50) ---")
        alerts_sent = bulk_alert.execute(threshold=50.0)
        session.commit()
        print(f"\nTotal alerts sent: {alerts_sent}")


if __name__ == "__main__":
    run_demo()

Output:

--- Running bulk alerts (threshold: $50) ---
📧 Email -> alice@example.com: Hi Alice, your balance ($49.00) is below $50.00
📧 Email -> carol@example.com: Hi Carol, your balance ($25.00) is below $50.00

Total alerts sent: 2


Testing

import pytest
from unittest.mock import Mock
from datetime import datetime, timedelta
from core.infrastructure.database import DatabaseSession, generate_uuid
from modules.banking.infrastructure.repositories import (
    SQLAlchemyAccountRepository,
    SQLAlchemyAlertLogRepository,
)
from modules.banking.application.balance_alert import BalanceAlertService
from modules.banking.domain.entities import Account, AlertLog


@pytest.fixture
def db_session():
    """Create an in-memory SQLite database for testing."""
    db = DatabaseSession("sqlite:///:memory:")
    db.create_tables()
    with db.session() as session:
        yield session


class TestBalanceAlertService:
    def test_sends_alert_when_below_threshold(self, db_session):
        accounts = SQLAlchemyAccountRepository(db_session)
        alert_logs = SQLAlchemyAlertLogRepository(db_session)
        mock_notifier = Mock()
        mock_notifier.send_balance_alert.return_value = True

        # Create account
        account = Account(
            id=generate_uuid(),
            owner_email="test@example.com",
            owner_name="Test User",
            balance=40.0,
            updated_at=datetime.utcnow(),
        )
        accounts.save(account)
        db_session.commit()

        service = BalanceAlertService(accounts, alert_logs, mock_notifier)
        result = service.execute(account.id, threshold=50.0)
        db_session.commit()

        assert result is True
        mock_notifier.send_balance_alert.assert_called_once_with(
            to_email="test@example.com",
            owner_name="Test User",
            current_balance=40.0,
            threshold=50.0,
        )

    def test_no_alert_when_above_threshold(self, db_session):
        accounts = SQLAlchemyAccountRepository(db_session)
        alert_logs = SQLAlchemyAlertLogRepository(db_session)
        mock_notifier = Mock()

        account = Account(
            id=generate_uuid(),
            owner_email="test@example.com",
            owner_name="Test",
            balance=100.0,
            updated_at=datetime.utcnow(),
        )
        accounts.save(account)
        db_session.commit()

        service = BalanceAlertService(accounts, alert_logs, mock_notifier)
        result = service.execute(account.id, threshold=50.0)

        assert result is False
        mock_notifier.send_balance_alert.assert_not_called()

    def test_respects_cooldown_period(self, db_session):
        accounts = SQLAlchemyAccountRepository(db_session)
        alert_logs = SQLAlchemyAlertLogRepository(db_session)
        mock_notifier = Mock()
        mock_notifier.send_balance_alert.return_value = True

        account = Account(
            id=generate_uuid(),
            owner_email="test@example.com",
            owner_name="Test",
            balance=40.0,
            updated_at=datetime.utcnow(),
        )
        accounts.save(account)
        db_session.commit()

        service = BalanceAlertService(accounts, alert_logs, mock_notifier, cooldown_hours=24)

        # First alert should succeed
        result1 = service.execute(account.id, threshold=50.0)
        db_session.commit()
        assert result1 is True

        # Second alert should be blocked by cooldown
        result2 = service.execute(account.id, threshold=50.0)
        assert result2 is False

        # Notifier called only once
        assert mock_notifier.send_balance_alert.call_count == 1

    def test_no_alert_when_account_not_found(self, db_session):
        accounts = SQLAlchemyAccountRepository(db_session)
        alert_logs = SQLAlchemyAlertLogRepository(db_session)
        mock_notifier = Mock()

        service = BalanceAlertService(accounts, alert_logs, mock_notifier)
        result = service.execute("nonexistent-id", threshold=50.0)

        assert result is False
        mock_notifier.send_balance_alert.assert_not_called()

Key Takeaways

Aspect Implementation
ORM Models AccountModel and AlertLogModel with SQLAlchemy relationships
Multiple ports AccountRepository + AlertLogRepository + NotificationPort
Audit trail Alert logs persisted for compliance and debugging
Cooldown Prevents spamming users with repeated alerts
Bulk operations BulkBalanceAlertService for batch processing
Testability In-memory SQLite + mocked notifier for fast tests