Domain Models

Project Model

bani.domain.project

Project model — represents a parsed BDL migration project (Section 7).

These dataclasses hold the in-memory representation of a BDL document after parsing. The BDL parser (separate module) creates ProjectModel instances; the orchestrator consumes them.

SyncStrategy

Bases: Enum

Incremental sync strategy (Section 13.1).

Source code in src/bani/domain/project.py
class SyncStrategy(Enum):
    """Incremental sync strategy (Section 13.1)."""

    FULL = auto()
    TIMESTAMP = auto()
    ROWVERSION = auto()
    CHECKSUM = auto()

WriteStrategy

Bases: Enum

How data is written to the target.

Source code in src/bani/domain/project.py
class WriteStrategy(Enum):
    """How data is written to the target."""

    INSERT = auto()
    UPSERT = auto()
    TRUNCATE_INSERT = auto()

ErrorHandlingStrategy

Bases: Enum

How errors are handled during migration.

Source code in src/bani/domain/project.py
class ErrorHandlingStrategy(Enum):
    """How errors are handled during migration."""

    ABORT = "fail-fast"
    LOG_AND_CONTINUE = "log-and-continue"
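
Because ErrorHandlingStrategy assigns explicit string values, a member can be looked up directly from such a string. The sketch below assumes the value arrives as a plain string; whether the BDL parser converts it this way is not stated on this page.

```python
from enum import Enum


class ErrorHandlingStrategy(Enum):
    """Copy of the enum above so the snippet runs on its own."""

    ABORT = "fail-fast"
    LOG_AND_CONTINUE = "log-and-continue"


# Lookup by value maps the string to its enum member.
strategy = ErrorHandlingStrategy("fail-fast")
print(strategy is ErrorHandlingStrategy.ABORT)  # True

# An unknown string is rejected with ValueError instead of passing through.
try:
    ErrorHandlingStrategy("ignore")
except ValueError as exc:
    print(f"rejected: {exc}")
```

SyncStrategy and WriteStrategy use auto(), so their values are integers; for those, lookup by member name (for example SyncStrategy["FULL"]) is the string-based route.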

ConnectionConfig dataclass

Connection configuration for a source or target database.

Credential values are never stored directly — only ${env:VAR} references that are resolved at runtime.

Attributes:

Name Type Description
dialect str

Database dialect, e.g. "mysql", "postgresql".

host str

Database host.

port int

Database port.

database str

Database name.

username_env str

Environment variable name for the username.

password_env str

Environment variable name for the password.

extra tuple[tuple[str, str], ...]

Additional connector-specific configuration.

encrypt bool

Whether to use TLS (default False).

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class ConnectionConfig:
    """Connection configuration for a source or target database.

    Credential values are never stored directly — only ``${env:VAR}``
    references that are resolved at runtime.

    Attributes:
        dialect: Database dialect, e.g. ``"mysql"``, ``"postgresql"``.
        host: Database host.
        port: Database port.
        database: Database name.
        username_env: Environment variable name for the username.
        password_env: Environment variable name for the password.
        extra: Additional connector-specific configuration.
        encrypt: Whether to use TLS (default ``False``).
    """

    dialect: str
    host: str = ""
    port: int = 0
    database: str = ""
    username_env: str = ""
    password_env: str = ""
    extra: tuple[tuple[str, str], ...] = ()
    encrypt: bool = False
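
A minimal sketch of how a consumer might turn the environment variable names into actual credentials at runtime. The helper resolve_credentials, the variable names, and the values are hypothetical; the page does not show how Bani resolves them.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectionConfig:
    """Copy of the dataclass above so the snippet runs on its own."""

    dialect: str
    host: str = ""
    port: int = 0
    database: str = ""
    username_env: str = ""
    password_env: str = ""
    extra: tuple[tuple[str, str], ...] = ()
    encrypt: bool = False


def resolve_credentials(config: ConnectionConfig) -> tuple[str, str]:
    """Hypothetical helper: read username and password from the environment."""
    # A KeyError here means the named variable is not set.
    return os.environ[config.username_env], os.environ[config.password_env]


# Illustrative variable names and values only.
os.environ["SRC_DB_USER"] = "migrator"
os.environ["SRC_DB_PASS"] = "s3cret"

source = ConnectionConfig(
    dialect="mysql",
    host="db.internal",
    port=3306,
    database="sales",
    username_env="SRC_DB_USER",
    password_env="SRC_DB_PASS",
    extra=(("charset", "utf8mb4"),),
)

username, password = resolve_credentials(source)

# extra is a tuple of pairs so the dataclass stays hashable; dict() gives a
# regular mapping when a connector needs keyed access.
extra_options = dict(source.extra)
```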

ColumnMapping dataclass

Mapping for a single column in a table.

Attributes:

Name Type Description
source_name str

Source column name.

target_name str

Target column name.

target_type str | None

Optional type override for the target column.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class ColumnMapping:
    """Mapping for a single column in a table.

    Attributes:
        source_name: Source column name.
        target_name: Target column name.
        target_type: Optional type override for the target column.
    """

    source_name: str
    target_name: str
    target_type: str | None = None

TypeMappingOverride dataclass

Type mapping override from source to target type.

Attributes:

Name Type Description
source_type str

Source data type.

target_type str

Target data type to map to.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class TypeMappingOverride:
    """Type mapping override from source to target type.

    Attributes:
        source_type: Source data type.
        target_type: Target data type to map to.
    """

    source_type: str
    target_type: str

ProjectOptions dataclass

Project-level configuration options.

Attributes:

Name Type Description
batch_size int

Number of rows per batch.

parallel_workers int

Number of parallel workers.

memory_limit_mb int

Memory limit in MB.

on_error ErrorHandlingStrategy

Error handling strategy.

create_target_schema bool

Whether to create target schema.

drop_target_tables_first bool

Whether to drop target tables first.

transfer_indexes bool

Whether to transfer indexes.

transfer_foreign_keys bool

Whether to transfer foreign keys.

transfer_defaults bool

Whether to transfer default values.

transfer_check_constraints bool

Whether to transfer check constraints.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class ProjectOptions:
    """Project-level configuration options.

    Attributes:
        batch_size: Number of rows per batch.
        parallel_workers: Number of parallel workers.
        memory_limit_mb: Memory limit in MB.
        on_error: Error handling strategy.
        create_target_schema: Whether to create target schema.
        drop_target_tables_first: Whether to drop target tables first.
        transfer_indexes: Whether to transfer indexes.
        transfer_foreign_keys: Whether to transfer foreign keys.
        transfer_defaults: Whether to transfer default values.
        transfer_check_constraints: Whether to transfer check constraints.
    """

    batch_size: int = 100_000
    parallel_workers: int = 4
    memory_limit_mb: int = 2048
    on_error: ErrorHandlingStrategy = ErrorHandlingStrategy.LOG_AND_CONTINUE
    create_target_schema: bool = True
    drop_target_tables_first: bool = False
    transfer_indexes: bool = True
    transfer_foreign_keys: bool = True
    transfer_defaults: bool = True
    transfer_check_constraints: bool = True

ScheduleConfig dataclass

Configuration for scheduled migrations.

Attributes:

Name Type Description
enabled bool

Whether the schedule is enabled.

cron str | None

Cron expression for scheduling (optional).

timezone str

Timezone for cron evaluation.

max_retries int

Maximum number of retries on failure.

retry_delay_seconds int

Delay in seconds between retries.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class ScheduleConfig:
    """Configuration for scheduled migrations.

    Attributes:
        enabled: Whether the schedule is enabled.
        cron: Cron expression for scheduling (optional).
        timezone: Timezone for cron evaluation.
        max_retries: Maximum number of retries on failure.
        retry_delay_seconds: Delay in seconds between retries.
    """

    enabled: bool = False
    cron: str | None = None
    timezone: str = "UTC"
    max_retries: int = 0
    retry_delay_seconds: int = 0

SyncConfig dataclass

Configuration for incremental sync.

Attributes:

Name Type Description
enabled bool

Whether sync is enabled.

strategy SyncStrategy

Sync strategy to use.

tracking_columns tuple[tuple[str, str], ...]

Tuples of (table, column) for tracking changes.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class SyncConfig:
    """Configuration for incremental sync.

    Attributes:
        enabled: Whether sync is enabled.
        strategy: Sync strategy to use.
        tracking_columns: Tuples of (table, column) for tracking changes.
    """

    enabled: bool = False
    strategy: SyncStrategy = SyncStrategy.FULL
    tracking_columns: tuple[tuple[str, str], ...] = ()

TableMapping dataclass

Mapping configuration for a single table.

Attributes:

Name Type Description
source_schema str

Source schema name.

source_table str

Source table name.

target_schema str

Target schema name (may differ from source).

target_table str

Target table name (may differ from source).

column_mappings tuple[ColumnMapping, ...]

Tuple of column mappings (empty = all).

filter_sql str | None

Optional WHERE clause to filter source rows.

write_strategy WriteStrategy

How to write data to the target.

batch_size int | None

Number of rows per batch (None = use project default).

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class TableMapping:
    """Mapping configuration for a single table.

    Attributes:
        source_schema: Source schema name.
        source_table: Source table name.
        target_schema: Target schema name (may differ from source).
        target_table: Target table name (may differ from source).
        column_mappings: Tuple of column mappings (empty = all).
        filter_sql: Optional WHERE clause to filter source rows.
        write_strategy: How to write data to the target.
        batch_size: Number of rows per batch (``None`` = use project default).
    """

    source_schema: str
    source_table: str
    target_schema: str = ""
    target_table: str = ""
    column_mappings: tuple[ColumnMapping, ...] = ()
    filter_sql: str | None = None
    write_strategy: WriteStrategy = WriteStrategy.INSERT
    batch_size: int | None = None

HookConfig dataclass

Configuration for a pre- or post-migration hook.

Attributes:

Name Type Description
name str

Human-readable hook name.

event str

When the hook runs (e.g. "before-migration", "after-migration", "before-table", "after-table").

command str

Command text (SQL statement or shell command).

hook_type str

"sql" or "shell".

target str

For SQL hooks, which connection to run against: "source" or "target".

table_name str

For per-table hooks, the table this applies to.

timeout_seconds int

Maximum execution time before the hook is killed.

on_failure str

Action on failure: "abort" or "continue".

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class HookConfig:
    """Configuration for a pre- or post-migration hook.

    Attributes:
        name: Human-readable hook name.
        event: When the hook runs (e.g. ``"before-migration"``,
            ``"after-migration"``, ``"before-table"``, ``"after-table"``).
        command: Command text (SQL statement or shell command).
        hook_type: ``"sql"`` or ``"shell"``.
        target: For SQL hooks, which connection to run against:
            ``"source"`` or ``"target"``.
        table_name: For per-table hooks, the table this applies to.
        timeout_seconds: Maximum execution time before the hook is killed.
        on_failure: Action on failure: ``"abort"`` or ``"continue"``.
    """

    name: str
    event: str
    command: str
    hook_type: str = "shell"
    target: str = ""
    table_name: str = ""
    timeout_seconds: int = 300
    on_failure: str = "abort"
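
As an illustration of how the event and table_name fields could be used together, the sketch below selects the hooks that apply at a given point in a run. The helper hooks_for, the hook names, and the commands are hypothetical, and matching table_name by exact string is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HookConfig:
    """Copy of the dataclass above so the snippet runs on its own."""

    name: str
    event: str
    command: str
    hook_type: str = "shell"
    target: str = ""
    table_name: str = ""
    timeout_seconds: int = 300
    on_failure: str = "abort"


def hooks_for(
    hooks: tuple[HookConfig, ...], event: str, table_name: str = ""
) -> tuple[HookConfig, ...]:
    """Hypothetical helper: hooks registered for an event (and table, if any)."""
    return tuple(
        h for h in hooks if h.event == event and h.table_name == table_name
    )


hooks = (
    HookConfig(
        name="snapshot",
        event="before-migration",
        command="./scripts/snapshot.sh",
    ),
    HookConfig(
        name="analyze orders",
        event="after-table",
        command="ANALYZE orders",
        hook_type="sql",
        target="target",
        table_name="orders",
    ),
    HookConfig(
        name="notify",
        event="after-migration",
        command="echo done",
        on_failure="continue",
    ),
)

for hook in hooks_for(hooks, "after-table", "orders"):
    print(hook.name, hook.hook_type, hook.target)
```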

ProjectModel dataclass

In-memory representation of a parsed BDL migration project.

Attributes:

Name Type Description
name str

Project name.

source ConnectionConfig | None

Source database connection configuration.

target ConnectionConfig | None

Target database connection configuration.

description str

Optional project description.

author str

Project author.

created datetime | None

Project creation timestamp.

tags tuple[str, ...]

Tuple of project tags.

table_mappings tuple[TableMapping, ...]

Tuple of per-table mapping configurations.

type_overrides tuple[TypeMappingOverride, ...]

Optional user-supplied type mapping overrides.

options ProjectOptions | None

Project-level configuration options.

hooks tuple[HookConfig, ...]

Tuple of pre/post migration hooks.

schedule ScheduleConfig | None

Schedule configuration for migrations.

sync SyncConfig | None

Sync configuration for incremental migrations.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class ProjectModel:
    """In-memory representation of a parsed BDL migration project.

    Attributes:
        name: Project name.
        source: Source database connection configuration.
        target: Target database connection configuration.
        description: Optional project description.
        author: Project author.
        created: Project creation timestamp.
        tags: Tuple of project tags.
        table_mappings: Tuple of per-table mapping configurations.
        type_overrides: Optional user-supplied type mapping overrides.
        options: Project-level configuration options.
        hooks: Tuple of pre/post migration hooks.
        schedule: Schedule configuration for migrations.
        sync: Sync configuration for incremental migrations.
    """

    name: str
    source: ConnectionConfig | None = None
    target: ConnectionConfig | None = None
    description: str = ""
    author: str = ""
    created: datetime | None = None
    tags: tuple[str, ...] = ()
    table_mappings: tuple[TableMapping, ...] = ()
    type_overrides: tuple[TypeMappingOverride, ...] = ()
    options: ProjectOptions | None = None
    hooks: tuple[HookConfig, ...] = ()
    schedule: ScheduleConfig | None = None
    sync: SyncConfig | None = None
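
Since ProjectModel is frozen, a changed project has to be built as a new instance. The standard-library dataclasses.replace does this; the sketch uses a trimmed copy of the class and made-up field values.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class ProjectModel:
    """Trimmed copy: only a few of the fields documented above."""

    name: str
    description: str = ""
    tags: tuple[str, ...] = ()


project = ProjectModel(name="crm-migration", tags=("nightly",))

# Direct assignment is rejected on a frozen dataclass.
try:
    project.description = "CRM to warehouse"
except FrozenInstanceError:
    print("ProjectModel is read-only")

# replace() builds a new instance; the original is left untouched.
updated = replace(
    project,
    description="CRM to warehouse",
    tags=project.tags + ("crm",),
)
print(updated.tags)  # ('nightly', 'crm')
```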

MigrationPlan dataclass

A resolved plan ready for execution.

Created from a ProjectModel after schema introspection and dependency resolution. Contains the ordered list of tables and their resolved configurations.

Attributes:

Name Type Description
project ProjectModel

The source project model.

ordered_tables tuple[str, ...]

Tables in dependency-safe execution order.

deferred_fk_tables tuple[str, ...]

Tables whose FK constraints are deferred.

total_estimated_rows int | None

Sum of row count estimates across all tables.

Source code in src/bani/domain/project.py
@dataclass(frozen=True)
class MigrationPlan:
    """A resolved plan ready for execution.

    Created from a ``ProjectModel`` after schema introspection and dependency
    resolution. Contains the ordered list of tables and their resolved
    configurations.

    Attributes:
        project: The source project model.
        ordered_tables: Tables in dependency-safe execution order.
        deferred_fk_tables: Tables whose FK constraints are deferred.
        total_estimated_rows: Sum of row count estimates across all tables.
    """

    project: ProjectModel
    ordered_tables: tuple[str, ...]
    deferred_fk_tables: tuple[str, ...] = ()
    total_estimated_rows: int | None = None
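
One standard way to obtain a dependency-safe order is a topological sort over the foreign-key graph. The sketch below uses the standard-library graphlib module; it is not necessarily the algorithm Bani uses, and the table names are made up. How cycles relate to deferred_fk_tables is not shown on this page.

```python
from graphlib import CycleError, TopologicalSorter


def order_tables(dependencies: dict[str, set[str]]) -> tuple[str, ...]:
    """Return tables so that every referenced table precedes its referrers."""
    # TopologicalSorter takes {node: predecessors}; static_order() yields
    # predecessors before the nodes that depend on them.
    return tuple(TopologicalSorter(dependencies).static_order())


# Hypothetical FK chain: order_items -> orders -> customers.
dependencies = {
    "sales.customers": set(),
    "sales.orders": {"sales.customers"},
    "sales.order_items": {"sales.orders"},
}

ordered_tables = order_tables(dependencies)
print(ordered_tables)
# ('sales.customers', 'sales.orders', 'sales.order_items')

# A circular FK chain has no valid order; graphlib reports the cycle.
try:
    order_tables({"a": {"b"}, "b": {"a"}})
except CycleError as exc:
    cycle = exc.args[1]  # list of nodes forming the cycle
    print("cycle detected:", cycle)
```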

Schema Model

bani.domain.schema

Schema introspection models (Section 11.1).

All dataclasses are frozen and use tuples (not lists) for collection fields, ensuring immutability and thread-safety as required by Section 11.2.
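
A short illustration of why tuples matter here: a frozen dataclass derives its hash from its fields, so instances are hashable only when every field value is. The class is a trimmed copy of IndexDefinition (documented further down) and the index names are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IndexDefinition:
    """Trimmed copy: only a few of the documented fields."""

    name: str
    columns: tuple[str, ...]
    is_unique: bool = False


a = IndexDefinition("ix_orders_customer", ("customer_id", "created_at"))
b = IndexDefinition("ix_orders_customer", ("customer_id", "created_at"))

# Equal field values give equal instances with equal hashes, so they
# deduplicate in sets and work as dict keys.
unique_indexes = {a, b}
print(len(unique_indexes))  # 1

# Type hints are not enforced at runtime: a list still constructs, but the
# instance can no longer be hashed.
bad = IndexDefinition("ix_bad", ["customer_id"])
try:
    hash(bad)
except TypeError as exc:
    print(f"not hashable: {exc}")
```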

ConstraintType

Bases: Enum

Types of constraints that can be applied to a table.

Source code in src/bani/domain/schema.py
class ConstraintType(Enum):
    """Types of constraints that can be applied to a table."""

    PRIMARY_KEY = auto()
    UNIQUE = auto()
    CHECK = auto()
    FOREIGN_KEY = auto()

ColumnDefinition dataclass

A single column within a table.

Attributes:

Name Type Description
name str

Column name.

data_type str

Raw source type string, e.g. "VARCHAR(255)".

nullable bool

Whether the column allows NULL values.

default_value str | None

Default value expression, if any.

is_auto_increment bool

Whether the column auto-increments.

ordinal_position int

0-based position of the column in the table.

arrow_type_str str | None

Canonical Arrow type string (e.g. "int32", "timestamp[us]"). Populated during schema introspection and used by sink connectors to generate target-native DDL without needing NxN source→target translation tables. None when the column has not been through introspection (e.g. hand-built TableDefinition in tests).

Source code in src/bani/domain/schema.py
@dataclass(frozen=True)
class ColumnDefinition:
    """A single column within a table.

    Attributes:
        name: Column name.
        data_type: Raw source type string, e.g. ``"VARCHAR(255)"``.
        nullable: Whether the column allows NULL values.
        default_value: Default value expression, if any.
        is_auto_increment: Whether the column auto-increments.
        ordinal_position: 0-based position of the column in the table.
        arrow_type_str: Canonical Arrow type string (e.g. ``"int32"``,
            ``"timestamp[us]"``).  Populated during schema introspection
            and used by sink connectors to generate target-native DDL
            without needing NxN source→target translation tables.
            ``None`` when the column has not been through introspection
            (e.g. hand-built ``TableDefinition`` in tests).
    """

    name: str
    data_type: str
    nullable: bool = True
    default_value: str | None = None
    is_auto_increment: bool = False
    ordinal_position: int = 0
    arrow_type_str: str | None = None

IndexDefinition dataclass

An index on a table.

Attributes:

Name Type Description
name str

Index name.

columns tuple[str, ...]

Ordered tuple of column names in the index.

is_unique bool

Whether the index enforces uniqueness.

is_clustered bool

Whether the index is clustered.

filter_expression str | None

Optional partial-index filter expression.

Source code in src/bani/domain/schema.py
@dataclass(frozen=True)
class IndexDefinition:
    """An index on a table.

    Attributes:
        name: Index name.
        columns: Ordered tuple of column names in the index.
        is_unique: Whether the index enforces uniqueness.
        is_clustered: Whether the index is clustered.
        filter_expression: Optional partial-index filter expression.
    """

    name: str
    columns: tuple[str, ...]
    is_unique: bool = False
    is_clustered: bool = False
    filter_expression: str | None = None

ForeignKeyDefinition dataclass

A foreign key relationship between two tables.

Attributes:

Name Type Description
name str

Constraint name.

source_table str

Fully qualified name of the referencing table.

source_columns tuple[str, ...]

Columns in the referencing table.

referenced_table str

Fully qualified name of the referenced table.

referenced_columns tuple[str, ...]

Columns in the referenced table.

on_delete str

Referential action on delete.

on_update str

Referential action on update.

Source code in src/bani/domain/schema.py
@dataclass(frozen=True)
class ForeignKeyDefinition:
    """A foreign key relationship between two tables.

    Attributes:
        name: Constraint name.
        source_table: Fully qualified name of the referencing table.
        source_columns: Columns in the referencing table.
        referenced_table: Fully qualified name of the referenced table.
        referenced_columns: Columns in the referenced table.
        on_delete: Referential action on delete.
        on_update: Referential action on update.
    """

    name: str
    source_table: str
    source_columns: tuple[str, ...]
    referenced_table: str
    referenced_columns: tuple[str, ...]
    on_delete: str = "NO ACTION"
    on_update: str = "NO ACTION"

TableDefinition dataclass

A table within a database schema.

Attributes:

Name Type Description
schema_name str

Database schema (namespace) the table belongs to.

table_name str

Name of the table.

columns tuple[ColumnDefinition, ...]

Ordered tuple of column definitions.

primary_key tuple[str, ...]

Tuple of column names forming the primary key.

indexes tuple[IndexDefinition, ...]

Tuple of index definitions on this table.

foreign_keys tuple[ForeignKeyDefinition, ...]

Tuple of foreign key definitions on this table.

check_constraints tuple[str, ...]

Tuple of CHECK constraint expressions.

row_count_estimate int | None

Estimated row count from schema introspection, or None if unavailable.

Source code in src/bani/domain/schema.py
@dataclass(frozen=True)
class TableDefinition:
    """A table within a database schema.

    Attributes:
        schema_name: Database schema (namespace) the table belongs to.
        table_name: Name of the table.
        columns: Ordered tuple of column definitions.
        primary_key: Tuple of column names forming the primary key.
        indexes: Tuple of index definitions on this table.
        foreign_keys: Tuple of foreign key definitions on this table.
        check_constraints: Tuple of CHECK constraint expressions.
        row_count_estimate: Estimated row count from schema introspection,
            or ``None`` if unavailable.
    """

    schema_name: str
    table_name: str
    columns: tuple[ColumnDefinition, ...]
    primary_key: tuple[str, ...] = ()
    indexes: tuple[IndexDefinition, ...] = ()
    foreign_keys: tuple[ForeignKeyDefinition, ...] = ()
    check_constraints: tuple[str, ...] = ()
    row_count_estimate: int | None = None

    @property
    def fully_qualified_name(self) -> str:
        """Return ``schema_name.table_name``."""
        return f"{self.schema_name}.{self.table_name}"

fully_qualified_name property

Return schema_name.table_name.

DatabaseSchema dataclass

The full schema of a database as returned by introspection.

Attributes:

Name Type Description
tables tuple[TableDefinition, ...]

Tuple of table definitions.

source_dialect str

Dialect identifier, e.g. "postgresql", "mssql".

Source code in src/bani/domain/schema.py
@dataclass(frozen=True)
class DatabaseSchema:
    """The full schema of a database as returned by introspection.

    Attributes:
        tables: Tuple of table definitions.
        source_dialect: Dialect identifier, e.g. ``"postgresql"``, ``"mssql"``.
    """

    tables: tuple[TableDefinition, ...]
    source_dialect: str

    def get_table(self, schema_name: str, table_name: str) -> TableDefinition | None:
        """Look up a table by schema and name.

        Args:
            schema_name: The schema (namespace) to search in.
            table_name: The table name to find.

        Returns:
            The matching ``TableDefinition``, or ``None`` if not found.
        """
        for table in self.tables:
            if table.schema_name == schema_name and table.table_name == table_name:
                return table
        return None

get_table(schema_name, table_name)

Look up a table by schema and name.

Parameters:

Name Type Description Default
schema_name str

The schema (namespace) to search in.

required
table_name str

The table name to find.

required

Returns:

Type Description
TableDefinition | None

The matching TableDefinition, or None if not found.

Error Hierarchy

bani.domain.errors

Domain-specific exception hierarchy for Bani.

All exceptions inherit from BaniError. Each exception carries structured context so that callers can programmatically inspect the failure without parsing message strings.

Hierarchy (Section 12.1):

BaniError
├── ConfigurationError
│   ├── BDLValidationError
│   ├── ConnectionConfigError
│   └── TypeMappingError
├── BaniConnectionError
│   ├── SourceConnectionError
│   └── TargetConnectionError
├── SchemaError
│   ├── IntrospectionError
│   ├── SchemaTranslationError
│   └── DependencyResolutionError
├── DataTransferError
│   ├── ReadError
│   ├── WriteError
│   ├── BatchError
│   └── TransformError
├── HookExecutionError
└── SchedulerError

BaniError

Bases: Exception

Base exception for all Bani errors.

Source code in src/bani/domain/errors.py
class BaniError(Exception):
    """Base exception for all Bani errors."""

    def __init__(self, message: str, **context: object) -> None:
        super().__init__(message)
        self.context = context
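
A short sketch of the structured context in use: extra keyword arguments end up in the context dict, so a handler can read them without parsing the message. The message and the keys are made up.

```python
class BaniError(Exception):
    """Copy of the class above so the snippet runs on its own."""

    def __init__(self, message: str, **context: object) -> None:
        super().__init__(message)
        self.context = context


try:
    raise BaniError(
        "row count mismatch",
        table="sales.orders",
        expected=1200,
        actual=1187,
    )
except BaniError as exc:
    message = str(exc)     # only the message, without the context
    context = exc.context  # keyword arguments as a plain dict
    print(message, context["table"], context["expected"] - context["actual"])
```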

ConfigurationError

Bases: BaniError

Error in project or connection configuration.

Source code in src/bani/domain/errors.py
class ConfigurationError(BaniError):
    """Error in project or connection configuration."""

BDLValidationError

Bases: ConfigurationError

BDL document failed XSD or semantic validation.

Source code in src/bani/domain/errors.py
class BDLValidationError(ConfigurationError):
    """BDL document failed XSD or semantic validation."""

    def __init__(
        self,
        message: str,
        *,
        document_path: str | None = None,
        line_number: int | None = None,
        **context: object,
    ) -> None:
        super().__init__(
            message,
            document_path=document_path,
            line_number=line_number,
            **context,
        )
        self.document_path = document_path
        self.line_number = line_number

ConnectionConfigError

Bases: ConfigurationError

Invalid or missing connection configuration.

Source code in src/bani/domain/errors.py
class ConnectionConfigError(ConfigurationError):
    """Invalid or missing connection configuration."""

    def __init__(
        self,
        message: str,
        *,
        connection_name: str | None = None,
        **context: object,
    ) -> None:
        super().__init__(message, connection_name=connection_name, **context)
        self.connection_name = connection_name

TypeMappingError

Bases: ConfigurationError

A source type could not be mapped to a target type.

Source code in src/bani/domain/errors.py
class TypeMappingError(ConfigurationError):
    """A source type could not be mapped to a target type."""

    def __init__(
        self,
        message: str,
        *,
        source_type: str | None = None,
        target_dialect: str | None = None,
        **context: object,
    ) -> None:
        super().__init__(
            message,
            source_type=source_type,
            target_dialect=target_dialect,
            **context,
        )
        self.source_type = source_type
        self.target_dialect = target_dialect

BaniConnectionError

Bases: BaniError

Base for connection-related failures.

Named BaniConnectionError to avoid shadowing the built-in ConnectionError.
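
The naming matters because Python's built-in ConnectionError is an OSError subclass that has nothing to do with BaniError. A small check, using bare copies of the two classes:

```python
class BaniError(Exception):
    """Minimal stand-in for the base class documented above."""


class BaniConnectionError(BaniError):
    """Minimal stand-in for the class documented here."""


# The built-in lives in the OSError family, not in Bani's hierarchy.
print(issubclass(ConnectionError, OSError))              # True
print(issubclass(BaniConnectionError, ConnectionError))  # False

# So a handler for the built-in does not swallow Bani's error, and vice versa.
caught_by = None
try:
    try:
        raise BaniConnectionError("source unreachable")
    except ConnectionError:
        caught_by = "builtin"
except BaniError:
    caught_by = "bani"

print(caught_by)  # bani
```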

Source code in src/bani/domain/errors.py
class BaniConnectionError(BaniError):
    """Base for connection-related failures.

    Named ``BaniConnectionError`` to avoid shadowing the built-in
    ``ConnectionError``.
    """

SourceConnectionError

Bases: BaniConnectionError

Failed to connect to the source database.

Source code in src/bani/domain/errors.py
class SourceConnectionError(BaniConnectionError):
    """Failed to connect to the source database."""

TargetConnectionError

Bases: BaniConnectionError

Failed to connect to the target database.

Source code in src/bani/domain/errors.py
class TargetConnectionError(BaniConnectionError):
    """Failed to connect to the target database."""

SchemaError

Bases: BaniError

Error during schema introspection or translation.

Source code in src/bani/domain/errors.py
class SchemaError(BaniError):
    """Error during schema introspection or translation."""

IntrospectionError

Bases: SchemaError

Failed to introspect the source database schema.

Source code in src/bani/domain/errors.py
class IntrospectionError(SchemaError):
    """Failed to introspect the source database schema."""

SchemaTranslationError

Bases: SchemaError

Failed to translate schema between source and target dialects.

Source code in src/bani/domain/errors.py
class SchemaTranslationError(SchemaError):
    """Failed to translate schema between source and target dialects."""

DependencyResolutionError

Bases: SchemaError

Failed to resolve table dependencies (e.g. circular FK chains).

Source code in src/bani/domain/errors.py
class DependencyResolutionError(SchemaError):
    """Failed to resolve table dependencies (e.g. circular FK chains)."""

    def __init__(
        self,
        message: str,
        *,
        tables: tuple[str, ...] = (),
        **context: object,
    ) -> None:
        super().__init__(message, tables=tables, **context)
        self.tables = tables
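
Because of the hierarchy, a caller can catch at whatever level it cares about and still read the structured fields. A sketch with copies of the three classes involved; the plan function and the table names are hypothetical.

```python
class BaniError(Exception):
    def __init__(self, message: str, **context: object) -> None:
        super().__init__(message)
        self.context = context


class SchemaError(BaniError):
    """Error during schema introspection or translation."""


class DependencyResolutionError(SchemaError):
    def __init__(
        self,
        message: str,
        *,
        tables: tuple[str, ...] = (),
        **context: object,
    ) -> None:
        super().__init__(message, tables=tables, **context)
        self.tables = tables


def plan() -> None:
    """Hypothetical planning step that hits a circular FK chain."""
    raise DependencyResolutionError(
        "circular foreign keys",
        tables=("hr.employees", "hr.departments"),
    )


try:
    plan()
except SchemaError as exc:  # the parent class also catches the subclass
    caught = exc
    print(type(exc).__name__, exc.context["tables"])
```

The tables value is available both as the attribute and under the "tables" key of context, since the constructor forwards it to BaniError.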

DataTransferError

Bases: BaniError

Error during data transfer between source and target.

Source code in src/bani/domain/errors.py
class DataTransferError(BaniError):
    """Error during data transfer between source and target."""

ReadError

Bases: DataTransferError

Failed to read data from the source.

Source code in src/bani/domain/errors.py
class ReadError(DataTransferError):
    """Failed to read data from the source."""

WriteError

Bases: DataTransferError

Failed to write data to the target.

Source code in src/bani/domain/errors.py
class WriteError(DataTransferError):
    """Failed to write data to the target."""

BatchError

Bases: DataTransferError

A specific batch failed during transfer.

Carries the batch number and the offset of the first row in the batch so that the resumability protocol can pick up from the right place.

Source code in src/bani/domain/errors.py
class BatchError(DataTransferError):
    """A specific batch failed during transfer.

    Carries the batch number and the offset of the first row in the batch so
    that the resumability protocol can pick up from the right place.
    """

    def __init__(
        self,
        message: str,
        *,
        batch_number: int,
        first_row_offset: int,
        **context: object,
    ) -> None:
        super().__init__(
            message,
            batch_number=batch_number,
            first_row_offset=first_row_offset,
            **context,
        )
        self.batch_number = batch_number
        self.first_row_offset = first_row_offset
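
To show how first_row_offset can support resuming, here is a toy transfer loop that fails on one batch and is then restarted from the reported offset. The transfer function, the in-memory sink, and the 0-based batch numbering are assumptions for illustration; Bani's actual resumability protocol is not shown on this page.

```python
class BaniError(Exception):
    def __init__(self, message: str, **context: object) -> None:
        super().__init__(message)
        self.context = context


class BatchError(BaniError):
    """Simplified copy: derives from BaniError directly to keep this short."""

    def __init__(
        self,
        message: str,
        *,
        batch_number: int,
        first_row_offset: int,
        **context: object,
    ) -> None:
        super().__init__(
            message,
            batch_number=batch_number,
            first_row_offset=first_row_offset,
            **context,
        )
        self.batch_number = batch_number
        self.first_row_offset = first_row_offset


def transfer(rows, sink, batch_size, start_offset=0, fail_on_batch=None):
    """Hypothetical loop: copy rows to sink in batches, optionally failing once."""
    for offset in range(start_offset, len(rows), batch_size):
        batch_number = offset // batch_size  # assumed 0-based numbering
        if batch_number == fail_on_batch:
            raise BatchError(
                "write failed",
                batch_number=batch_number,
                first_row_offset=offset,
            )
        sink.extend(rows[offset:offset + batch_size])


rows = list(range(10))
sink: list[int] = []

try:
    transfer(rows, sink, batch_size=4, fail_on_batch=1)
except BatchError as exc:
    resume_from = exc.first_row_offset
    # Rows before the failed batch are already in the sink; continue from it.
    transfer(rows, sink, batch_size=4, start_offset=resume_from)

print(resume_from, sink == rows)  # 4 True
```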

TransformError

Bases: DataTransferError

A transform step failed during the pipeline.

Source code in src/bani/domain/errors.py
class TransformError(DataTransferError):
    """A transform step failed during the pipeline."""

HookExecutionError

Bases: BaniError

A pre- or post-migration hook failed.

Source code in src/bani/domain/errors.py
class HookExecutionError(BaniError):
    """A pre- or post-migration hook failed."""

SchedulerError

Bases: BaniError

Scheduler-related failure (cron integration, task scheduling).

Source code in src/bani/domain/errors.py
class SchedulerError(BaniError):
    """Scheduler-related failure (cron integration, task scheduling)."""