
Connector Base Classes

bani.connectors.base

Abstract base classes for database connectors (Section 6.1).

Defines the SourceConnector and SinkConnector abstract base classes that all database connectors must implement. Together they form the boundary between the domain layer and the connector implementations.

SourceConnector

Bases: ABC

Port: reads schema and data from a source database.

A source connector is responsible for introspecting a source database's schema and reading data from its tables. It abstracts away database-specific details like SQL dialects and driver APIs.

Source code in src/bani/connectors/base.py
class SourceConnector(ABC):
    """Port: reads schema and data from a source database.

    A source connector is responsible for introspecting a source database's
    schema and reading data from its tables. It abstracts away database-specific
    details like SQL dialects and driver APIs.
    """

    _pool: ConnectionPool[Any] | None = None

    @abstractmethod
    def connect(self, config: ConnectionConfig, pool_size: int = 1) -> None:
        """Establish a connection to the source database.

        Args:
            config: Connection configuration with resolved credentials.
            pool_size: Number of connections to create in the pool.

        Raises:
            Exception: If the connection fails.
        """
        ...

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection to the source database.

        Raises:
            Exception: If disconnection fails.
        """
        ...

    def reconnect(self) -> None:
        """Re-establish the connection using the stored config.

        Connectors store ``_config`` during ``connect()``.  This method
        closes the existing connection (ignoring errors) and opens a
        fresh one.  Useful for recovering from network timeouts.
        """
        config = getattr(self, "_config", None)
        pool_size = getattr(self, "_pool_size", 1)
        if config is None:
            raise RuntimeError("Cannot reconnect: no stored config")
        try:
            self.disconnect()
        except Exception:
            # Best-effort close: the old connection may already be dead.
            pass
        self.connect(config, pool_size=pool_size)

    @contextmanager
    def checkout(self) -> Iterator[Any]:
        """Check out a connection from the pool.

        Falls back to yielding ``self.connection`` when no pool is
        configured (backward compatibility).
        """
        if self._pool is not None:
            with self._pool.acquire() as conn:
                yield conn
        else:
            yield getattr(self, "connection", None)

    @abstractmethod
    def introspect_schema(self) -> DatabaseSchema:
        """Introspect the complete schema of the source database.

        Reads all tables, columns, constraints, indexes, and other schema
        metadata and returns it as a DatabaseSchema object.

        Returns:
            A DatabaseSchema containing all tables and their metadata.

        Raises:
            Exception: If introspection fails.
        """
        ...

    @abstractmethod
    def read_table(
        self,
        table_name: str,
        schema_name: str,
        columns: list[str] | None = None,
        filter_sql: str | None = None,
        batch_size: int = 100_000,
    ) -> Iterator[pa.RecordBatch]:
        """Read data from a table as batches of Arrow records.

        Yields batches of data from the specified table. Uses server-side
        cursors for memory efficiency on large tables.

        Args:
            table_name: Name of the table to read from.
            schema_name: Schema (namespace) containing the table.
            columns: Optional list of column names to read. If None, all
                columns are read.
            filter_sql: Optional WHERE clause (without "WHERE" keyword) to
                filter rows.
            batch_size: Number of rows per batch.

        Yields:
            pyarrow.RecordBatch instances.

        Raises:
            Exception: If reading fails.
        """
        ...

    @abstractmethod
    def estimate_row_count(self, table_name: str, schema_name: str) -> int:
        """Get an estimated row count for a table.

        Returns a fast estimate based on schema statistics if available,
        or executes a COUNT(*) query if not.

        Args:
            table_name: Name of the table.
            schema_name: Schema containing the table.

        Returns:
            Estimated or exact row count.

        Raises:
            Exception: If the query fails.
        """
        ...

connect(config, pool_size=1) abstractmethod

Establish a connection to the source database.

Parameters:

Name       Type              Description                                          Default
config     ConnectionConfig  Connection configuration with resolved credentials.  required
pool_size  int               Number of connections to create in the pool.         1

Raises:

Type       Description
Exception  If the connection fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def connect(self, config: ConnectionConfig, pool_size: int = 1) -> None:
    """Establish a connection to the source database.

    Args:
        config: Connection configuration with resolved credentials.
        pool_size: Number of connections to create in the pool.

    Raises:
        Exception: If the connection fails.
    """
    ...

disconnect() abstractmethod

Close the connection to the source database.

Raises:

Type       Description
Exception  If disconnection fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def disconnect(self) -> None:
    """Close the connection to the source database.

    Raises:
        Exception: If disconnection fails.
    """
    ...

reconnect()

Re-establish the connection using the stored config.

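Connectors store _config (and optionally _pool_size, which defaults to 1) during connect(). This method closes the existing connection, ignoring any error raised by disconnect(), and then opens a fresh one with the stored settings. It raises RuntimeError if no config has been stored, for example because connect() was never called. Useful for recovering from network timeouts.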

Source code in src/bani/connectors/base.py
def reconnect(self) -> None:
    """Re-establish the connection using the stored config.

    Connectors store ``_config`` during ``connect()``.  This method
    closes the existing connection (ignoring errors) and opens a
    fresh one.  Useful for recovering from network timeouts.
    """
    config = getattr(self, "_config", None)
    pool_size = getattr(self, "_pool_size", 1)
    if config is None:
        raise RuntimeError("Cannot reconnect: no stored config")
    try:
        self.disconnect()
    except Exception:
        # Best-effort close: the old connection may already be dead.
        pass
    self.connect(config, pool_size=pool_size)
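
The behaviour above can be exercised with a small stand-in class. This is a sketch under assumptions: FlakyConnector, its events list, and the dict-based config are hypothetical names used only for illustration, and the reconnect() body is copied from the listing above rather than imported from bani. It shows that a failing disconnect() does not prevent the new connection, and that the stored pool size is reused.

```python
class FlakyConnector:
    """Hypothetical stand-in; not part of bani."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def connect(self, config: dict, pool_size: int = 1) -> None:
        # Store the arguments so reconnect() can reuse them later.
        self._config = config
        self._pool_size = pool_size
        self.events.append(f"connect(pool_size={pool_size})")

    def disconnect(self) -> None:
        self.events.append("disconnect")
        raise ConnectionError("socket already closed")  # simulated timeout

    def reconnect(self) -> None:
        # Same logic as the base-class method shown above.
        config = getattr(self, "_config", None)
        pool_size = getattr(self, "_pool_size", 1)
        if config is None:
            raise RuntimeError("Cannot reconnect: no stored config")
        try:
            self.disconnect()
        except Exception:
            pass
        self.connect(config, pool_size=pool_size)


conn = FlakyConnector()
conn.connect({"host": "db.example"}, pool_size=4)
conn.reconnect()
print(conn.events)
```

Calling reconnect() on an instance that never went through connect() raises RuntimeError, because no _config attribute exists yet.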

checkout()

Check out a connection from the pool.

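Falls back to yielding self.connection when no pool is configured (backward compatibility). If the connector has no connection attribute either, None is yielded, so callers on the fallback path should check the value before using it.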

Source code in src/bani/connectors/base.py
@contextmanager
def checkout(self) -> Iterator[Any]:
    """Check out a connection from the pool.

    Falls back to yielding ``self.connection`` when no pool is
    configured (backward compatibility).
    """
    if self._pool is not None:
        with self._pool.acquire() as conn:
            yield conn
    else:
        yield getattr(self, "connection", None)
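
The three possible outcomes of checkout() can be sketched as follows. TinyPool and Demo are hypothetical stand-ins: the only thing assumed about the real ConnectionPool is that acquire() is a context manager, as the listing above implies. The checkout() body is copied from that listing.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional


class TinyPool:
    """Hypothetical pool exposing an acquire() context manager."""

    def __init__(self, connections: list[Any]) -> None:
        self._free = list(connections)

    @contextmanager
    def acquire(self) -> Iterator[Any]:
        conn = self._free.pop()
        try:
            yield conn
        finally:
            self._free.append(conn)  # hand the connection back on exit

    @property
    def free_count(self) -> int:
        return len(self._free)


class Demo:
    """Stand-in carrying the same checkout() logic as the base classes."""

    _pool: Optional[TinyPool] = None

    @contextmanager
    def checkout(self) -> Iterator[Any]:
        if self._pool is not None:
            with self._pool.acquire() as conn:
                yield conn
        else:
            yield getattr(self, "connection", None)


# 1. Pool configured: a pooled connection is borrowed and then returned.
pooled = Demo()
pooled._pool = TinyPool(["conn-a", "conn-b"])
with pooled.checkout() as c:
    in_use = (c, pooled._pool.free_count)

# 2. No pool, but a legacy `connection` attribute exists.
legacy = Demo()
legacy.connection = "single-conn"
with legacy.checkout() as c:
    legacy_conn = c

# 3. Neither pool nor attribute: the context manager yields None.
bare = Demo()
with bare.checkout() as c:
    bare_conn = c
```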

introspect_schema() abstractmethod

Introspect the complete schema of the source database.

Reads all tables, columns, constraints, indexes, and other schema metadata and returns it as a DatabaseSchema object.

Returns:

Type            Description
DatabaseSchema  A DatabaseSchema containing all tables and their metadata.

Raises:

Type       Description
Exception  If introspection fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def introspect_schema(self) -> DatabaseSchema:
    """Introspect the complete schema of the source database.

    Reads all tables, columns, constraints, indexes, and other schema
    metadata and returns it as a DatabaseSchema object.

    Returns:
        A DatabaseSchema containing all tables and their metadata.

    Raises:
        Exception: If introspection fails.
    """
    ...

read_table(table_name, schema_name, columns=None, filter_sql=None, batch_size=100000) abstractmethod

Read data from a table as batches of Arrow records.

Yields batches of data from the specified table. Uses server-side cursors for memory efficiency on large tables.

Parameters:

Name         Type              Description                                                            Default
table_name   str               Name of the table to read from.                                        required
schema_name  str               Schema (namespace) containing the table.                               required
columns      list[str] | None  Optional list of column names to read. If None, all columns are read.  None
filter_sql   str | None        Optional WHERE clause (without "WHERE" keyword) to filter rows.        None
batch_size   int               Number of rows per batch.                                              100000

Yields:

Type         Description
RecordBatch  pyarrow.RecordBatch instances.

Raises:

Type       Description
Exception  If reading fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def read_table(
    self,
    table_name: str,
    schema_name: str,
    columns: list[str] | None = None,
    filter_sql: str | None = None,
    batch_size: int = 100_000,
) -> Iterator[pa.RecordBatch]:
    """Read data from a table as batches of Arrow records.

    Yields batches of data from the specified table. Uses server-side
    cursors for memory efficiency on large tables.

    Args:
        table_name: Name of the table to read from.
        schema_name: Schema (namespace) containing the table.
        columns: Optional list of column names to read. If None, all
            columns are read.
        filter_sql: Optional WHERE clause (without "WHERE" keyword) to
            filter rows.
        batch_size: Number of rows per batch.

    Yields:
        pyarrow.RecordBatch instances.

    Raises:
        Exception: If reading fails.
    """
    ...
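
One way a connector might turn these arguments into a query and page through the result is sketched below. It is an illustration only: read_table_rows is a hypothetical helper, SQLite stands in for the source database (so schema_name is omitted and cursor.fetchmany replaces a true server-side cursor), and plain lists of row tuples are yielded where a real implementation would build pyarrow.RecordBatch objects.

```python
import sqlite3
from typing import Iterator, Optional


def read_table_rows(
    conn: sqlite3.Connection,
    table_name: str,
    columns: Optional[list[str]] = None,
    filter_sql: Optional[str] = None,
    batch_size: int = 100_000,
) -> Iterator[list[tuple]]:
    """Yield lists of row tuples, at most batch_size rows per list."""
    # Identifiers are quoted; filter_sql is treated as trusted SQL text.
    col_sql = ", ".join(f'"{c}"' for c in columns) if columns else "*"
    query = f'SELECT {col_sql} FROM "{table_name}"'
    if filter_sql:
        query += f" WHERE {filter_sql}"  # the caller omits the WHERE keyword
    cursor = conn.execute(query)
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(i, "open" if i % 2 else "closed") for i in range(1, 8)],
)
batches = list(
    read_table_rows(conn, "orders", ["id"], "status = 'open'", batch_size=3)
)
print([len(b) for b in batches])
```

Because filter_sql is spliced into the statement verbatim, it should only ever come from trusted project configuration, never from untrusted input.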

estimate_row_count(table_name, schema_name) abstractmethod

Get an estimated row count for a table.

Returns a fast estimate based on schema statistics if available, or executes a COUNT(*) query if not.

Parameters:

Name         Type  Description                   Default
table_name   str   Name of the table.            required
schema_name  str   Schema containing the table.  required

Returns:

Type  Description
int   Estimated or exact row count.

Raises:

Type       Description
Exception  If the query fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def estimate_row_count(self, table_name: str, schema_name: str) -> int:
    """Get an estimated row count for a table.

    Returns a fast estimate based on schema statistics if available,
    or executes a COUNT(*) query if not.

    Args:
        table_name: Name of the table.
        schema_name: Schema containing the table.

    Returns:
        Estimated or exact row count.

    Raises:
        Exception: If the query fails.
    """
    ...
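
The statistics-first, COUNT(*)-fallback strategy described above might look like this. The sketch makes two assumptions: the stats mapping is a hypothetical stand-in for whatever catalog statistics a given engine exposes, and SQLite is used only so the fallback query can actually run.

```python
import sqlite3
from typing import Mapping, Optional


def estimate_row_count(
    conn: sqlite3.Connection,
    table_name: str,
    stats: Mapping[str, int],
) -> int:
    """Prefer a cached statistic; fall back to an exact COUNT(*)."""
    cached: Optional[int] = stats.get(table_name)
    if cached is not None:
        return cached  # fast path: cheap, but possibly stale
    (count,) = conn.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(250)])

from_stats = estimate_row_count(conn, "events", {"events": 240})
from_count = estimate_row_count(conn, "events", {})
print(from_stats, from_count)
```

The two results differ on purpose: the statistic (240) is an estimate that may lag behind the table, while the fallback (250) is exact but requires a full scan.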

SinkConnector

Bases: ABC

Port: writes schema and data to a target database.

A sink connector is responsible for creating tables and writing data to a target database. It abstracts away database-specific DDL and DML details.

Subclasses may override recommended_batch_size and recommended_parallel_workers to declare optimal defaults for the target engine. The orchestrator applies these when the project uses global defaults (i.e. the user did not explicitly set them).
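
The precedence described above could be resolved along these lines. This is a sketch under assumptions: effective_batch_size, FakeSink, BulkLoadSink, and the GLOBAL_BATCH_SIZE value are hypothetical and are not taken from bani's orchestrator; only the two recommended_* attribute names come from the class itself.

```python
from typing import Optional

GLOBAL_BATCH_SIZE = 100_000  # assumed global default, for illustration only


class FakeSink:
    """Stand-in for SinkConnector: None means "use the global default"."""

    recommended_batch_size: Optional[int] = None
    recommended_parallel_workers: Optional[int] = None


class BulkLoadSink(FakeSink):
    """A hypothetical engine that prefers smaller batches."""

    recommended_batch_size = 50_000


def effective_batch_size(sink: FakeSink, user_value: Optional[int]) -> int:
    # 1. An explicit user setting always wins.
    if user_value is not None:
        return user_value
    # 2. Otherwise use the connector's recommendation, if it has one.
    if sink.recommended_batch_size is not None:
        return sink.recommended_batch_size
    # 3. Otherwise fall back to the global default.
    return GLOBAL_BATCH_SIZE


print(effective_batch_size(BulkLoadSink(), None))
print(effective_batch_size(BulkLoadSink(), 10_000))
print(effective_batch_size(FakeSink(), None))
```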

Source code in src/bani/connectors/base.py
class SinkConnector(ABC):
    """Port: writes schema and data to a target database.

    A sink connector is responsible for creating tables and writing data
    to a target database. It abstracts away database-specific DDL and
    DML details.

    Subclasses may override ``recommended_batch_size`` and
    ``recommended_parallel_workers`` to declare optimal defaults for
    the target engine.  The orchestrator applies these when the project
    uses global defaults (i.e. the user did not explicitly set them).
    """

    _pool: ConnectionPool[Any] | None = None

    # Connector-specific performance recommendations.
    # None means "use the global default".
    recommended_batch_size: int | None = None
    recommended_parallel_workers: int | None = None

    @property
    def default_schema(self) -> str:
        """Return the sink's default schema name.

        The orchestrator uses this to remap source schema names so
        tables are created in the correct target schema/database.
        Subclasses should override to return the connected database
        name (MySQL/MSSQL) or owner (Oracle) or 'public' (PG).
        """
        return ""

    @abstractmethod
    def connect(self, config: ConnectionConfig, pool_size: int = 1) -> None:
        """Establish a connection to the target database.

        Args:
            config: Connection configuration with resolved credentials.
            pool_size: Number of connections to create in the pool.

        Raises:
            Exception: If the connection fails.
        """
        ...

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection to the target database.

        Raises:
            Exception: If disconnection fails.
        """
        ...

    def reconnect(self) -> None:
        """Re-establish the connection using the stored config.

        Connectors store ``_config`` during ``connect()``.  This method
        closes the existing connection (ignoring errors) and opens a
        fresh one.  Useful for recovering from network timeouts.
        """
        config = getattr(self, "_config", None)
        pool_size = getattr(self, "_pool_size", 1)
        if config is None:
            raise RuntimeError("Cannot reconnect: no stored config")
        try:
            self.disconnect()
        except Exception:
            # Best-effort close: the old connection may already be dead.
            pass
        self.connect(config, pool_size=pool_size)

    @contextmanager
    def checkout(self) -> Iterator[Any]:
        """Check out a connection from the pool.

        Falls back to yielding ``self.connection`` when no pool is
        configured (backward compatibility).
        """
        if self._pool is not None:
            with self._pool.acquire() as conn:
                yield conn
        else:
            yield getattr(self, "connection", None)

    @abstractmethod
    def create_table(self, table_def: TableDefinition) -> None:
        """Create a table in the target database.

        Creates a table with all columns and constraints as specified in
        the table definition. Primary key and check constraints are
        included in the CREATE TABLE statement.

        Args:
            table_def: TableDefinition describing the table to create.

        Raises:
            Exception: If table creation fails.
        """
        ...

    @abstractmethod
    def write_batch(
        self, table_name: str, schema_name: str, batch: pa.RecordBatch
    ) -> int:
        """Write a batch of Arrow records to a table.

        Uses the most efficient method available (e.g., COPY for PostgreSQL).

        Args:
            table_name: Name of the target table.
            schema_name: Schema containing the table.
            batch: A pyarrow.RecordBatch to write.

        Returns:
            Number of rows written.

        Raises:
            Exception: If writing fails.
        """
        ...

    @abstractmethod
    def create_indexes(
        self, table_name: str, schema_name: str, indexes: tuple[IndexDefinition, ...]
    ) -> None:
        """Create indexes on a table.

        Args:
            table_name: Name of the table.
            schema_name: Schema containing the table.
            indexes: Tuple of index definitions to create.

        Raises:
            Exception: If index creation fails.
        """
        ...

    @abstractmethod
    def create_foreign_keys(self, fks: tuple[ForeignKeyDefinition, ...]) -> None:
        """Create foreign key constraints.

        Args:
            fks: Tuple of foreign key definitions to create.

        Raises:
            Exception: If creation fails.
        """
        ...

    @abstractmethod
    def execute_sql(self, sql: str) -> None:
        """Execute arbitrary SQL.

        Used for custom DDL or DML not covered by other methods.

        Args:
            sql: SQL statement to execute.

        Raises:
            Exception: If execution fails.
        """
        ...

default_schema property

Return the sink's default schema name.

The orchestrator uses this to remap source schema names so tables are created in the correct target schema/database. Subclasses should override it to return the connected database name (MySQL/MSSQL), the owner (Oracle), or 'public' (PostgreSQL). The base implementation returns an empty string.
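
A minimal sketch of such a remapping, assuming the sink's default schema simply replaces the source schema name and that an empty default leaves the name untouched. Both remap_schema and that empty-string rule are assumptions made for illustration; the actual orchestrator logic is not shown on this page.

```python
def remap_schema(source_schema: str, sink_default_schema: str) -> str:
    """Return the schema a table should be created in on the target side."""
    # An empty default (the base-class value) keeps the source name as-is.
    return sink_default_schema or source_schema


def qualified_name(schema: str, table: str) -> str:
    """Join schema and table, omitting the schema part when it is empty."""
    return f"{schema}.{table}" if schema else table


# A table read from MSSQL's "dbo" schema, written to a PostgreSQL sink.
pg_target = qualified_name(remap_schema("dbo", "public"), "customers")

# A sink that does not override default_schema.
plain_target = qualified_name(remap_schema("dbo", ""), "customers")

print(pg_target, plain_target)
```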

connect(config, pool_size=1) abstractmethod

Establish a connection to the target database.

Parameters:

Name       Type              Description                                          Default
config     ConnectionConfig  Connection configuration with resolved credentials.  required
pool_size  int               Number of connections to create in the pool.         1

Raises:

Type       Description
Exception  If the connection fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def connect(self, config: ConnectionConfig, pool_size: int = 1) -> None:
    """Establish a connection to the target database.

    Args:
        config: Connection configuration with resolved credentials.
        pool_size: Number of connections to create in the pool.

    Raises:
        Exception: If the connection fails.
    """
    ...

disconnect() abstractmethod

Close the connection to the target database.

Raises:

Type       Description
Exception  If disconnection fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def disconnect(self) -> None:
    """Close the connection to the target database.

    Raises:
        Exception: If disconnection fails.
    """
    ...

reconnect()

Re-establish the connection using the stored config.

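Connectors store _config (and optionally _pool_size, which defaults to 1) during connect(). This method closes the existing connection, ignoring any error raised by disconnect(), and then opens a fresh one with the stored settings. It raises RuntimeError if no config has been stored, for example because connect() was never called. Useful for recovering from network timeouts.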

Source code in src/bani/connectors/base.py
def reconnect(self) -> None:
    """Re-establish the connection using the stored config.

    Connectors store ``_config`` during ``connect()``.  This method
    closes the existing connection (ignoring errors) and opens a
    fresh one.  Useful for recovering from network timeouts.
    """
    config = getattr(self, "_config", None)
    pool_size = getattr(self, "_pool_size", 1)
    if config is None:
        raise RuntimeError("Cannot reconnect: no stored config")
    try:
        self.disconnect()
    except Exception:
        # Best-effort close: the old connection may already be dead.
        pass
    self.connect(config, pool_size=pool_size)

checkout()

Check out a connection from the pool.

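Falls back to yielding self.connection when no pool is configured (backward compatibility). If the connector has no connection attribute either, None is yielded, so callers on the fallback path should check the value before using it.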

Source code in src/bani/connectors/base.py
@contextmanager
def checkout(self) -> Iterator[Any]:
    """Check out a connection from the pool.

    Falls back to yielding ``self.connection`` when no pool is
    configured (backward compatibility).
    """
    if self._pool is not None:
        with self._pool.acquire() as conn:
            yield conn
    else:
        yield getattr(self, "connection", None)

create_table(table_def) abstractmethod

Create a table in the target database.

Creates a table with all columns and constraints as specified in the table definition. Primary key and check constraints are included in the CREATE TABLE statement.

Parameters:

Name       Type             Description                                      Default
table_def  TableDefinition  TableDefinition describing the table to create.  required

Raises:

Type       Description
Exception  If table creation fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def create_table(self, table_def: TableDefinition) -> None:
    """Create a table in the target database.

    Creates a table with all columns and constraints as specified in
    the table definition. Primary key and check constraints are
    included in the CREATE TABLE statement.

    Args:
        table_def: TableDefinition describing the table to create.

    Raises:
        Exception: If table creation fails.
    """
    ...
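
To illustrate what "primary key and check constraints are included in the CREATE TABLE statement" can mean in practice, here is a sketch that renders a single DDL statement and runs it against SQLite. ColumnSpec, TableSpec, and build_create_table are hypothetical stand-ins; the fields of bani's real TableDefinition are not shown on this page and may differ.

```python
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnSpec:  # hypothetical; not a bani class
    name: str
    sql_type: str
    nullable: bool = True


@dataclass(frozen=True)
class TableSpec:  # hypothetical; not bani's TableDefinition
    name: str
    columns: tuple[ColumnSpec, ...]
    primary_key: tuple[str, ...] = ()
    checks: tuple[str, ...] = ()


def build_create_table(table: TableSpec) -> str:
    """Render one CREATE TABLE statement with inline PK and CHECK clauses."""
    parts = [
        f'"{c.name}" {c.sql_type}' + ("" if c.nullable else " NOT NULL")
        for c in table.columns
    ]
    if table.primary_key:
        cols = ", ".join(f'"{c}"' for c in table.primary_key)
        parts.append(f"PRIMARY KEY ({cols})")
    parts.extend(f"CHECK ({expr})" for expr in table.checks)
    return f'CREATE TABLE "{table.name}" (' + ", ".join(parts) + ")"


spec = TableSpec(
    name="accounts",
    columns=(
        ColumnSpec("id", "INTEGER", nullable=False),
        ColumnSpec("balance", "REAL"),
    ),
    primary_key=("id",),
    checks=("balance >= 0",),
)
ddl = build_create_table(spec)

conn = sqlite3.connect(":memory:")
conn.execute(ddl)
conn.execute("INSERT INTO accounts VALUES (1, 10.0)")
try:
    conn.execute("INSERT INTO accounts VALUES (2, -5.0)")
    check_enforced = False
except sqlite3.IntegrityError:
    check_enforced = True  # the inline CHECK rejected the negative balance
```

Indexes and foreign keys are deliberately absent from the generated statement, since the interface exposes them through the separate create_indexes and create_foreign_keys methods.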

write_batch(table_name, schema_name, batch) abstractmethod

Write a batch of Arrow records to a table.

Uses the most efficient method available (e.g., COPY for PostgreSQL).

Parameters:

Name         Type         Description                      Default
table_name   str          Name of the target table.        required
schema_name  str          Schema containing the table.     required
batch        RecordBatch  A pyarrow.RecordBatch to write.  required

Returns:

Type  Description
int   Number of rows written.

Raises:

Type       Description
Exception  If writing fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def write_batch(
    self, table_name: str, schema_name: str, batch: pa.RecordBatch
) -> int:
    """Write a batch of Arrow records to a table.

    Uses the most efficient method available (e.g., COPY for PostgreSQL).

    Args:
        table_name: Name of the target table.
        schema_name: Schema containing the table.
        batch: A pyarrow.RecordBatch to write.

    Returns:
        Number of rows written.

    Raises:
        Exception: If writing fails.
    """
    ...
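
Where an engine has no bulk-load path such as COPY, a generic parameterised insert is one possible fallback. The sketch below assumes a column-oriented dict of lists as a stand-in for pyarrow.RecordBatch and uses SQLite as the target; write_columns is a hypothetical helper, not a bani function.

```python
import sqlite3


def write_columns(
    conn: sqlite3.Connection, table_name: str, columns: dict[str, list]
) -> int:
    """Insert a column-oriented batch and return the number of rows written."""
    names = list(columns)
    # Pivot the columnar data into row tuples for executemany().
    rows = list(zip(*(columns[n] for n in names)))
    if not rows:
        return 0
    col_sql = ", ".join(f'"{n}"' for n in names)
    placeholders = ", ".join("?" for _ in names)
    with conn:  # commit on success, roll back if the insert raises
        conn.executemany(
            f'INSERT INTO "{table_name}" ({col_sql}) VALUES ({placeholders})',
            rows,
        )
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
written = write_columns(
    conn, "users", {"id": [1, 2, 3], "name": ["a", "b", "c"]}
)
total = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(written, total)
```

Returning the row count, as the interface requires, lets a caller reconcile the rows written against the rows read from the source.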

create_indexes(table_name, schema_name, indexes) abstractmethod

Create indexes on a table.

Parameters:

Name         Type                         Description                            Default
table_name   str                          Name of the table.                     required
schema_name  str                          Schema containing the table.           required
indexes      tuple[IndexDefinition, ...]  Tuple of index definitions to create.  required

Raises:

Type       Description
Exception  If index creation fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def create_indexes(
    self, table_name: str, schema_name: str, indexes: tuple[IndexDefinition, ...]
) -> None:
    """Create indexes on a table.

    Args:
        table_name: Name of the table.
        schema_name: Schema containing the table.
        indexes: Tuple of index definitions to create.

    Raises:
        Exception: If index creation fails.
    """
    ...

create_foreign_keys(fks) abstractmethod

Create foreign key constraints.

Parameters:

Name  Type                              Description                                  Default
fks   tuple[ForeignKeyDefinition, ...]  Tuple of foreign key definitions to create.  required

Raises:

Type       Description
Exception  If creation fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def create_foreign_keys(self, fks: tuple[ForeignKeyDefinition, ...]) -> None:
    """Create foreign key constraints.

    Args:
        fks: Tuple of foreign key definitions to create.

    Raises:
        Exception: If creation fails.
    """
    ...

execute_sql(sql) abstractmethod

Execute arbitrary SQL.

Used for custom DDL or DML not covered by other methods.

Parameters:

Name  Type  Description                Default
sql   str   SQL statement to execute.  required

Raises:

Type       Description
Exception  If execution fails.

Source code in src/bani/connectors/base.py
@abstractmethod
def execute_sql(self, sql: str) -> None:
    """Execute arbitrary SQL.

    Used for custom DDL or DML not covered by other methods.

    Args:
        sql: SQL statement to execute.

    Raises:
        Exception: If execution fails.
    """
    ...