Skip to content

SDK Reference

bani.sdk.bani

Top-level Bani class for loading and running migrations.

BaniProject

Wrapper around a loaded ProjectModel with validation and execution methods.

Source code in src/bani/sdk/bani.py
class BaniProject:
    """Wrapper around a loaded ProjectModel with validation and execution methods."""

    def __init__(self, project: ProjectModel) -> None:
        """Initialize a BaniProject.

        Args:
            project: The ProjectModel to wrap.
        """
        self._project = project

    def validate(self) -> tuple[bool, list[str]]:
        """Validate the project configuration.

        Returns:
            A tuple of (is_valid, error_messages).
        """
        # For now, basic validation
        errors: list[str] = []

        if not self._project.name:
            errors.append("Project name is required")

        if self._project.source is None:
            errors.append("Source connection is required")
        elif not self._project.source.dialect:
            errors.append("Source dialect is required")

        if self._project.target is None:
            errors.append("Target connection is required")
        elif not self._project.target.dialect:
            errors.append("Target dialect is required")

        return (len(errors) == 0, errors)

    def run(
        self,
        on_progress: Callable[[Any], None] | None = None,
        resume: bool = False,
        cancel_event: Any | None = None,
        checkpoint: Any | None = None,
        projects_dir: str = "~/.bani/projects",
    ) -> MigrationResult:
        """Execute the migration.

        Args:
            on_progress: Optional callback for progress updates.
            resume: If ``True``, resume from the last checkpoint.
                Completed tables are skipped; in-progress/failed
                tables are dropped and re-transferred.
            cancel_event: Optional ``threading.Event`` that signals
                cancellation. The orchestrator checks it between
                batches and stops gracefully when set.
            checkpoint: Optional ``CheckpointManager`` instance. If
                ``None``, a default one is created.

        Returns:
            A MigrationResult with execution summary.

        Raises:
            ValueError: If the project is invalid.
            Exception: If migration execution fails.
        """
        is_valid, errors = self.validate()
        if not is_valid:
            msg = "Project validation failed: " + "; ".join(errors)
            raise ValueError(msg)

        # Get source and target connectors
        source_cfg = self._project.source
        target_cfg = self._project.target

        assert source_cfg is not None
        assert target_cfg is not None

        # Determine pool size from project options
        pool_size = (
            self._project.options.parallel_workers if self._project.options else 4
        )

        # Create source connector and connect
        source_connector_class = ConnectorRegistry.get(source_cfg.dialect)
        source = cast(type[SourceConnector], source_connector_class)()
        source.connect(source_cfg, pool_size=pool_size)

        # Create sink connector and connect
        sink_connector_class = ConnectorRegistry.get(target_cfg.dialect)
        sink = cast(type[SinkConnector], sink_connector_class)()
        sink.connect(target_cfg, pool_size=pool_size)

        try:
            # Create progress tracker if callback provided
            tracker = None
            if on_progress:
                tracker = ProgressTracker()
                tracker.add_listener(on_progress)

            kwargs: dict[str, Any] = {
                "tracker": tracker,
                "projects_dir": projects_dir,
            }
            if checkpoint is not None:
                kwargs["checkpoint"] = checkpoint
            orchestrator = MigrationOrchestrator(self._project, source, sink, **kwargs)
            if cancel_event is not None:
                orchestrator.set_cancel_event(cancel_event)
            return orchestrator.execute(resume=resume)
        finally:
            source.disconnect()
            sink.disconnect()

    def preview(
        self,
        tables: list[str] | None = None,
        sample_size: int = 10,
    ) -> PreviewResult:
        """Preview data from the source database.

        Delegates to :func:`bani.application.preview.preview_source` for
        the actual introspection and sampling logic.

        Args:
            tables: Optional list of table names to preview. If ``None``,
                all tables are previewed.
            sample_size: Number of rows to sample per table.

        Returns:
            A :class:`PreviewResult` with column metadata and sample rows.
        """
        source_cfg = self._project.source
        assert source_cfg is not None

        pool_size = (
            self._project.options.parallel_workers if self._project.options else 1
        )
        source_connector_class = ConnectorRegistry.get(source_cfg.dialect)
        source = cast(type[SourceConnector], source_connector_class)()
        source.connect(source_cfg, pool_size=pool_size)

        try:
            return preview_source(source, tables=tables, sample_size=sample_size)
        finally:
            source.disconnect()

__init__(project)

Initialize a BaniProject.

Parameters:

Name Type Description Default
project ProjectModel

The ProjectModel to wrap.

required
Source code in src/bani/sdk/bani.py
def __init__(self, project: ProjectModel) -> None:
    """Initialize a BaniProject.

    Args:
        project: The ProjectModel to wrap.
    """
    self._project = project

validate()

Validate the project configuration.

Returns:

Type Description
tuple[bool, list[str]]

A tuple of (is_valid, error_messages).

Source code in src/bani/sdk/bani.py
def validate(self) -> tuple[bool, list[str]]:
    """Validate the project configuration.

    Returns:
        A tuple of (is_valid, error_messages).
    """
    # For now, basic validation
    errors: list[str] = []

    if not self._project.name:
        errors.append("Project name is required")

    if self._project.source is None:
        errors.append("Source connection is required")
    elif not self._project.source.dialect:
        errors.append("Source dialect is required")

    if self._project.target is None:
        errors.append("Target connection is required")
    elif not self._project.target.dialect:
        errors.append("Target dialect is required")

    return (len(errors) == 0, errors)

run(on_progress=None, resume=False, cancel_event=None, checkpoint=None, projects_dir='~/.bani/projects')

Execute the migration.

Parameters:

Name Type Description Default
on_progress Callable[[Any], None] | None

Optional callback for progress updates.

None
resume bool

If True, resume from the last checkpoint. Completed tables are skipped; in-progress/failed tables are dropped and re-transferred.

False
cancel_event Any | None

Optional threading.Event that signals cancellation. The orchestrator checks it between batches and stops gracefully when set.

None
checkpoint Any | None

Optional CheckpointManager instance. If None, a default one is created.

None

Returns:

Type Description
MigrationResult

A MigrationResult with execution summary.

Raises:

Type Description
ValueError

If the project is invalid.

Exception

If migration execution fails.

Source code in src/bani/sdk/bani.py
def run(
    self,
    on_progress: Callable[[Any], None] | None = None,
    resume: bool = False,
    cancel_event: Any | None = None,
    checkpoint: Any | None = None,
    projects_dir: str = "~/.bani/projects",
) -> MigrationResult:
    """Execute the migration.

    Args:
        on_progress: Optional callback for progress updates.
        resume: If ``True``, resume from the last checkpoint.
            Completed tables are skipped; in-progress/failed
            tables are dropped and re-transferred.
        cancel_event: Optional ``threading.Event`` that signals
            cancellation. The orchestrator checks it between
            batches and stops gracefully when set.
        checkpoint: Optional ``CheckpointManager`` instance. If
            ``None``, a default one is created.

    Returns:
        A MigrationResult with execution summary.

    Raises:
        ValueError: If the project is invalid.
        Exception: If migration execution fails.
    """
    is_valid, errors = self.validate()
    if not is_valid:
        msg = "Project validation failed: " + "; ".join(errors)
        raise ValueError(msg)

    # Get source and target connectors
    source_cfg = self._project.source
    target_cfg = self._project.target

    assert source_cfg is not None
    assert target_cfg is not None

    # Determine pool size from project options
    pool_size = (
        self._project.options.parallel_workers if self._project.options else 4
    )

    # Create source connector and connect
    source_connector_class = ConnectorRegistry.get(source_cfg.dialect)
    source = cast(type[SourceConnector], source_connector_class)()
    source.connect(source_cfg, pool_size=pool_size)

    # Create sink connector and connect
    sink_connector_class = ConnectorRegistry.get(target_cfg.dialect)
    sink = cast(type[SinkConnector], sink_connector_class)()
    sink.connect(target_cfg, pool_size=pool_size)

    try:
        # Create progress tracker if callback provided
        tracker = None
        if on_progress:
            tracker = ProgressTracker()
            tracker.add_listener(on_progress)

        kwargs: dict[str, Any] = {
            "tracker": tracker,
            "projects_dir": projects_dir,
        }
        if checkpoint is not None:
            kwargs["checkpoint"] = checkpoint
        orchestrator = MigrationOrchestrator(self._project, source, sink, **kwargs)
        if cancel_event is not None:
            orchestrator.set_cancel_event(cancel_event)
        return orchestrator.execute(resume=resume)
    finally:
        source.disconnect()
        sink.disconnect()

preview(tables=None, sample_size=10)

Preview data from the source database.

Delegates to :func:bani.application.preview.preview_source for the actual introspection and sampling logic.

Parameters:

Name Type Description Default
tables list[str] | None

Optional list of table names to preview. If None, all tables are previewed.

None
sample_size int

Number of rows to sample per table.

10

Returns:

Name Type Description
A PreviewResult

class:PreviewResult with column metadata and sample rows.

Source code in src/bani/sdk/bani.py
def preview(
    self,
    tables: list[str] | None = None,
    sample_size: int = 10,
) -> PreviewResult:
    """Preview data from the source database.

    Delegates to :func:`bani.application.preview.preview_source` for
    the actual introspection and sampling logic.

    Args:
        tables: Optional list of table names to preview. If ``None``,
            all tables are previewed.
        sample_size: Number of rows to sample per table.

    Returns:
        A :class:`PreviewResult` with column metadata and sample rows.
    """
    source_cfg = self._project.source
    assert source_cfg is not None

    pool_size = (
        self._project.options.parallel_workers if self._project.options else 1
    )
    source_connector_class = ConnectorRegistry.get(source_cfg.dialect)
    source = cast(type[SourceConnector], source_connector_class)()
    source.connect(source_cfg, pool_size=pool_size)

    try:
        return preview_source(source, tables=tables, sample_size=sample_size)
    finally:
        source.disconnect()

Bani

Top-level entry point for Bani operations.

Source code in src/bani/sdk/bani.py
class Bani:
    """Top-level entry point for Bani operations."""

    @staticmethod
    def load(path: str | Path) -> BaniProject:
        """Load a BDL project file.

        Args:
            path: Path to the BDL file.

        Returns:
            A BaniProject ready for validation and execution.

        Raises:
            BDLValidationError: If parsing fails.
        """
        path_obj = Path(path) if isinstance(path, str) else path
        project = parse(path_obj)
        return BaniProject(project)

    @staticmethod
    def validate_file(path: str | Path) -> tuple[bool, list[str]]:
        """Validate a BDL file.

        Args:
            path: Path to the BDL file.

        Returns:
            A tuple of (is_valid, error_messages).
        """
        path_obj = Path(path) if isinstance(path, str) else path

        with open(path_obj) as f:
            content = f.read()

        if content.strip().startswith("<"):
            errors = validate_xml(content)
        else:
            errors = validate_json(content)

        return (len(errors) == 0, errors)

load(path) staticmethod

Load a BDL project file.

Parameters:

Name Type Description Default
path str | Path

Path to the BDL file.

required

Returns:

Type Description
BaniProject

A BaniProject ready for validation and execution.

Raises:

Type Description
BDLValidationError

If parsing fails.

Source code in src/bani/sdk/bani.py
@staticmethod
def load(path: str | Path) -> BaniProject:
    """Load a BDL project file.

    Args:
        path: Path to the BDL file.

    Returns:
        A BaniProject ready for validation and execution.

    Raises:
        BDLValidationError: If parsing fails.
    """
    path_obj = Path(path) if isinstance(path, str) else path
    project = parse(path_obj)
    return BaniProject(project)

validate_file(path) staticmethod

Validate a BDL file.

Parameters:

Name Type Description Default
path str | Path

Path to the BDL file.

required

Returns:

Type Description
tuple[bool, list[str]]

A tuple of (is_valid, error_messages).

Source code in src/bani/sdk/bani.py
@staticmethod
def validate_file(path: str | Path) -> tuple[bool, list[str]]:
    """Validate a BDL file.

    Args:
        path: Path to the BDL file.

    Returns:
        A tuple of (is_valid, error_messages).
    """
    path_obj = Path(path) if isinstance(path, str) else path

    with open(path_obj) as f:
        content = f.read()

    if content.strip().startswith("<"):
        errors = validate_xml(content)
    else:
        errors = validate_json(content)

    return (len(errors) == 0, errors)

bani.sdk.project_builder

Fluent builder for ProjectModel construction (Section 18.3).

ProjectBuilder

Fluent builder for constructing ProjectModel instances.

Source code in src/bani/sdk/project_builder.py
class ProjectBuilder:
    """Fluent builder for constructing ProjectModel instances."""

    def __init__(self, name: str) -> None:
        """Initialize a new project builder.

        Args:
            name: The project name.
        """
        self._name = name
        self._source_config: ConnectionConfig | None = None
        self._target_config: ConnectionConfig | None = None
        self._description = ""
        self._author = ""
        self._tags: list[str] = []
        self._include_tables: list[str] = []
        self._exclude_tables: list[str] = []
        self._type_overrides: list[TypeMappingOverride] = []
        self._batch_size = 100_000
        self._parallel_workers = 4
        self._memory_limit_mb = 2048
        self._created: datetime | None = None

    def source(
        self,
        dialect: str,
        host: str = "",
        port: int = 0,
        database: str = "",
        username_env: str = "",
        password_env: str = "",
        **extra: str,
    ) -> ProjectBuilder:
        """Configure the source database.

        Args:
            dialect: Database dialect (e.g. "postgresql", "mysql").
            host: Database host.
            port: Database port.
            database: Database name.
            username_env: Environment variable name for username.
            password_env: Environment variable name for password.
            **extra: Additional connector-specific configuration.

        Returns:
            This builder for method chaining.
        """
        extra_tuple = tuple(sorted(extra.items()))
        self._source_config = ConnectionConfig(
            dialect=dialect,
            host=host,
            port=port,
            database=database,
            username_env=username_env,
            password_env=password_env,
            extra=extra_tuple,
        )
        return self

    def target(
        self,
        dialect: str,
        host: str = "",
        port: int = 0,
        database: str = "",
        username_env: str = "",
        password_env: str = "",
        **extra: str,
    ) -> ProjectBuilder:
        """Configure the target database.

        Args:
            dialect: Database dialect (e.g. "postgresql", "mysql").
            host: Database host.
            port: Database port.
            database: Database name.
            username_env: Environment variable name for username.
            password_env: Environment variable name for password.
            **extra: Additional connector-specific configuration.

        Returns:
            This builder for method chaining.
        """
        extra_tuple = tuple(sorted(extra.items()))
        self._target_config = ConnectionConfig(
            dialect=dialect,
            host=host,
            port=port,
            database=database,
            username_env=username_env,
            password_env=password_env,
            extra=extra_tuple,
        )
        return self

    def include_tables(self, tables: list[str]) -> ProjectBuilder:
        """Include only these tables in the migration.

        Tables are specified in "schema.table" format.

        Args:
            tables: List of fully qualified table names to include.

        Returns:
            This builder for method chaining.
        """
        self._include_tables = tables
        return self

    def exclude_tables(self, tables: list[str]) -> ProjectBuilder:
        """Exclude these tables from the migration.

        Tables are specified in "schema.table" format.

        Args:
            tables: List of fully qualified table names to exclude.

        Returns:
            This builder for method chaining.
        """
        self._exclude_tables = tables
        return self

    def type_mapping(self, source_type: str, target_type: str) -> ProjectBuilder:
        """Add a type mapping override.

        Args:
            source_type: Source database type.
            target_type: Target database type.

        Returns:
            This builder for method chaining.
        """
        self._type_overrides.append(
            TypeMappingOverride(source_type=source_type, target_type=target_type)
        )
        return self

    def batch_size(self, size: int) -> ProjectBuilder:
        """Set the batch size for data transfer.

        Args:
            size: Number of rows per batch.

        Returns:
            This builder for method chaining.
        """
        self._batch_size = size
        return self

    def parallel_workers(self, workers: int) -> ProjectBuilder:
        """Set the number of parallel workers.

        Args:
            workers: Number of parallel workers.

        Returns:
            This builder for method chaining.
        """
        self._parallel_workers = workers
        return self

    def memory_limit(self, mb: int) -> ProjectBuilder:
        """Set the memory limit in MB.

        Args:
            mb: Memory limit in megabytes.

        Returns:
            This builder for method chaining.
        """
        self._memory_limit_mb = mb
        return self

    def description(self, desc: str) -> ProjectBuilder:
        """Set the project description.

        Args:
            desc: Project description.

        Returns:
            This builder for method chaining.
        """
        self._description = desc
        return self

    def author(self, name: str) -> ProjectBuilder:
        """Set the project author.

        Args:
            name: Author name.

        Returns:
            This builder for method chaining.
        """
        self._author = name
        return self

    def tags(self, tags: list[str]) -> ProjectBuilder:
        """Set project tags.

        Args:
            tags: List of tags.

        Returns:
            This builder for method chaining.
        """
        self._tags = tags
        return self

    def build(self) -> ProjectModel:
        """Build and return the ProjectModel.

        Returns:
            The constructed ProjectModel.
        """
        # Build table mappings from include/exclude lists
        table_mappings: list[TableMapping] = []

        if self._include_tables:
            for table_spec in self._include_tables:
                parts = table_spec.split(".", 1)
                if len(parts) == 2:
                    schema, table = parts
                    table_mappings.append(
                        TableMapping(
                            source_schema=schema,
                            source_table=table,
                            target_schema=schema,
                            target_table=table,
                        )
                    )

        # Build options
        options = ProjectOptions(
            batch_size=self._batch_size,
            parallel_workers=self._parallel_workers,
            memory_limit_mb=self._memory_limit_mb,
        )

        # If created timestamp not set, use current time
        created = self._created or datetime.now(timezone.utc)

        return ProjectModel(
            name=self._name,
            source=self._source_config,
            target=self._target_config,
            description=self._description,
            author=self._author,
            created=created,
            tags=tuple(self._tags),
            table_mappings=tuple(table_mappings),
            type_overrides=tuple(self._type_overrides),
            options=options,
        )

__init__(name)

Initialize a new project builder.

Parameters:

Name Type Description Default
name str

The project name.

required
Source code in src/bani/sdk/project_builder.py
def __init__(self, name: str) -> None:
    """Initialize a new project builder.

    Args:
        name: The project name.
    """
    self._name = name
    self._source_config: ConnectionConfig | None = None
    self._target_config: ConnectionConfig | None = None
    self._description = ""
    self._author = ""
    self._tags: list[str] = []
    self._include_tables: list[str] = []
    self._exclude_tables: list[str] = []
    self._type_overrides: list[TypeMappingOverride] = []
    self._batch_size = 100_000
    self._parallel_workers = 4
    self._memory_limit_mb = 2048
    self._created: datetime | None = None

source(dialect, host='', port=0, database='', username_env='', password_env='', **extra)

Configure the source database.

Parameters:

Name Type Description Default
dialect str

Database dialect (e.g. "postgresql", "mysql").

required
host str

Database host.

''
port int

Database port.

0
database str

Database name.

''
username_env str

Environment variable name for username.

''
password_env str

Environment variable name for password.

''
**extra str

Additional connector-specific configuration.

{}

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def source(
    self,
    dialect: str,
    host: str = "",
    port: int = 0,
    database: str = "",
    username_env: str = "",
    password_env: str = "",
    **extra: str,
) -> ProjectBuilder:
    """Configure the source database.

    Args:
        dialect: Database dialect (e.g. "postgresql", "mysql").
        host: Database host.
        port: Database port.
        database: Database name.
        username_env: Environment variable name for username.
        password_env: Environment variable name for password.
        **extra: Additional connector-specific configuration.

    Returns:
        This builder for method chaining.
    """
    extra_tuple = tuple(sorted(extra.items()))
    self._source_config = ConnectionConfig(
        dialect=dialect,
        host=host,
        port=port,
        database=database,
        username_env=username_env,
        password_env=password_env,
        extra=extra_tuple,
    )
    return self

target(dialect, host='', port=0, database='', username_env='', password_env='', **extra)

Configure the target database.

Parameters:

Name Type Description Default
dialect str

Database dialect (e.g. "postgresql", "mysql").

required
host str

Database host.

''
port int

Database port.

0
database str

Database name.

''
username_env str

Environment variable name for username.

''
password_env str

Environment variable name for password.

''
**extra str

Additional connector-specific configuration.

{}

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def target(
    self,
    dialect: str,
    host: str = "",
    port: int = 0,
    database: str = "",
    username_env: str = "",
    password_env: str = "",
    **extra: str,
) -> ProjectBuilder:
    """Configure the target database.

    Args:
        dialect: Database dialect (e.g. "postgresql", "mysql").
        host: Database host.
        port: Database port.
        database: Database name.
        username_env: Environment variable name for username.
        password_env: Environment variable name for password.
        **extra: Additional connector-specific configuration.

    Returns:
        This builder for method chaining.
    """
    extra_tuple = tuple(sorted(extra.items()))
    self._target_config = ConnectionConfig(
        dialect=dialect,
        host=host,
        port=port,
        database=database,
        username_env=username_env,
        password_env=password_env,
        extra=extra_tuple,
    )
    return self

include_tables(tables)

Include only these tables in the migration.

Tables are specified in "schema.table" format.

Parameters:

Name Type Description Default
tables list[str]

List of fully qualified table names to include.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def include_tables(self, tables: list[str]) -> ProjectBuilder:
    """Include only these tables in the migration.

    Tables are specified in "schema.table" format.

    Args:
        tables: List of fully qualified table names to include.

    Returns:
        This builder for method chaining.
    """
    self._include_tables = tables
    return self

exclude_tables(tables)

Exclude these tables from the migration.

Tables are specified in "schema.table" format.

Parameters:

Name Type Description Default
tables list[str]

List of fully qualified table names to exclude.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def exclude_tables(self, tables: list[str]) -> ProjectBuilder:
    """Exclude these tables from the migration.

    Tables are specified in "schema.table" format.

    Args:
        tables: List of fully qualified table names to exclude.

    Returns:
        This builder for method chaining.
    """
    self._exclude_tables = tables
    return self

type_mapping(source_type, target_type)

Add a type mapping override.

Parameters:

Name Type Description Default
source_type str

Source database type.

required
target_type str

Target database type.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def type_mapping(self, source_type: str, target_type: str) -> ProjectBuilder:
    """Add a type mapping override.

    Args:
        source_type: Source database type.
        target_type: Target database type.

    Returns:
        This builder for method chaining.
    """
    self._type_overrides.append(
        TypeMappingOverride(source_type=source_type, target_type=target_type)
    )
    return self

batch_size(size)

Set the batch size for data transfer.

Parameters:

Name Type Description Default
size int

Number of rows per batch.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def batch_size(self, size: int) -> ProjectBuilder:
    """Set the batch size for data transfer.

    Args:
        size: Number of rows per batch.

    Returns:
        This builder for method chaining.
    """
    self._batch_size = size
    return self

parallel_workers(workers)

Set the number of parallel workers.

Parameters:

Name Type Description Default
workers int

Number of parallel workers.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def parallel_workers(self, workers: int) -> ProjectBuilder:
    """Set the number of parallel workers.

    Args:
        workers: Number of parallel workers.

    Returns:
        This builder for method chaining.
    """
    self._parallel_workers = workers
    return self

memory_limit(mb)

Set the memory limit in MB.

Parameters:

Name Type Description Default
mb int

Memory limit in megabytes.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def memory_limit(self, mb: int) -> ProjectBuilder:
    """Set the memory limit in MB.

    Args:
        mb: Memory limit in megabytes.

    Returns:
        This builder for method chaining.
    """
    self._memory_limit_mb = mb
    return self

description(desc)

Set the project description.

Parameters:

Name Type Description Default
desc str

Project description.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def description(self, desc: str) -> ProjectBuilder:
    """Set the project description.

    Args:
        desc: Project description.

    Returns:
        This builder for method chaining.
    """
    self._description = desc
    return self

author(name)

Set the project author.

Parameters:

Name Type Description Default
name str

Author name.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def author(self, name: str) -> ProjectBuilder:
    """Set the project author.

    Args:
        name: Author name.

    Returns:
        This builder for method chaining.
    """
    self._author = name
    return self

tags(tags)

Set project tags.

Parameters:

Name Type Description Default
tags list[str]

List of tags.

required

Returns:

Type Description
ProjectBuilder

This builder for method chaining.

Source code in src/bani/sdk/project_builder.py
def tags(self, tags: list[str]) -> ProjectBuilder:
    """Set project tags.

    Args:
        tags: List of tags.

    Returns:
        This builder for method chaining.
    """
    self._tags = tags
    return self

build()

Build and return the ProjectModel.

Returns:

Type Description
ProjectModel

The constructed ProjectModel.

Source code in src/bani/sdk/project_builder.py
def build(self) -> ProjectModel:
    """Build and return the ProjectModel.

    Returns:
        The constructed ProjectModel.
    """
    # Build table mappings from include/exclude lists
    table_mappings: list[TableMapping] = []

    if self._include_tables:
        for table_spec in self._include_tables:
            parts = table_spec.split(".", 1)
            if len(parts) == 2:
                schema, table = parts
                table_mappings.append(
                    TableMapping(
                        source_schema=schema,
                        source_table=table,
                        target_schema=schema,
                        target_table=table,
                    )
                )

    # Build options
    options = ProjectOptions(
        batch_size=self._batch_size,
        parallel_workers=self._parallel_workers,
        memory_limit_mb=self._memory_limit_mb,
    )

    # If created timestamp not set, use current time
    created = self._created or datetime.now(timezone.utc)

    return ProjectModel(
        name=self._name,
        source=self._source_config,
        target=self._target_config,
        description=self._description,
        author=self._author,
        created=created,
        tags=tuple(self._tags),
        table_mappings=tuple(table_mappings),
        type_overrides=tuple(self._type_overrides),
        options=options,
    )

bani.sdk.schema_inspector

Schema introspection via connectors.

SchemaInspector

Inspects database schema via connector registry.

Source code in src/bani/sdk/schema_inspector.py
class SchemaInspector:
    """Inspects database schema via connector registry."""

    @staticmethod
    def inspect(
        dialect: str,
        host: str = "",
        port: int = 0,
        database: str = "",
        username_env: str = "",
        password_env: str = "",
        **kwargs: Any,
    ) -> DatabaseSchema:
        """Introspect a database schema.

        Args:
            dialect: Database dialect (e.g. "postgresql", "mysql").
            host: Database host.
            port: Database port.
            database: Database name.
            username_env: Environment variable name for username.
            password_env: Environment variable name for password.
            **kwargs: Additional connector-specific arguments.

        Returns:
            The introspected DatabaseSchema.

        Raises:
            KeyError: If no connector is registered for the dialect.
            Exception: If connection or introspection fails.
        """
        extra_config: tuple[tuple[str, str], ...] = tuple(
            (k, str(v)) for k, v in kwargs.items()
        )
        config = ConnectionConfig(
            dialect=dialect,
            host=host,
            port=port,
            database=database,
            username_env=username_env,
            password_env=password_env,
            extra=extra_config,
        )

        connector_class = ConnectorRegistry.get(dialect)
        source = cast(type[SourceConnector], connector_class)()
        try:
            source.connect(config)
            return source.introspect_schema()
        finally:
            source.disconnect()

inspect(dialect, host='', port=0, database='', username_env='', password_env='', **kwargs) staticmethod

Introspect a database schema.

Parameters:

Name Type Description Default
dialect str

Database dialect (e.g. "postgresql", "mysql").

required
host str

Database host.

''
port int

Database port.

0
database str

Database name.

''
username_env str

Environment variable name for username.

''
password_env str

Environment variable name for password.

''
**kwargs Any

Additional connector-specific arguments.

{}

Returns:

Type Description
DatabaseSchema

The introspected DatabaseSchema.

Raises:

Type Description
KeyError

If no connector is registered for the dialect.

Exception

If connection or introspection fails.

Source code in src/bani/sdk/schema_inspector.py
@staticmethod
def inspect(
    dialect: str,
    host: str = "",
    port: int = 0,
    database: str = "",
    username_env: str = "",
    password_env: str = "",
    **kwargs: Any,
) -> DatabaseSchema:
    """Introspect a database schema.

    Args:
        dialect: Database dialect (e.g. "postgresql", "mysql").
        host: Database host.
        port: Database port.
        database: Database name.
        username_env: Environment variable name for username.
        password_env: Environment variable name for password.
        **kwargs: Additional connector-specific arguments.

    Returns:
        The introspected DatabaseSchema.

    Raises:
        KeyError: If no connector is registered for the dialect.
        Exception: If connection or introspection fails.
    """
    extra_config: tuple[tuple[str, str], ...] = tuple(
        (k, str(v)) for k, v in kwargs.items()
    )
    config = ConnectionConfig(
        dialect=dialect,
        host=host,
        port=port,
        database=database,
        username_env=username_env,
        password_env=password_env,
        extra=extra_config,
    )

    connector_class = ConnectorRegistry.get(dialect)
    source = cast(type[SourceConnector], connector_class)()
    try:
        source.connect(config)
        return source.introspect_schema()
    finally:
        source.disconnect()