Connector Base Classes
bani.connectors.base
Abstract base classes for database connectors (Section 6.1).
Defines SourceConnector and SinkConnector abstract base classes that all database connectors must implement. These form the boundary between the domain layer and connector implementations.
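The port split can be illustrated with a minimal sketch: a copy loop that calls only abstract methods, plus in-memory fakes that satisfy them. `MiniSource`, `MiniSink`, `ListSource`, `ListSink`, and `migrate_table` are hypothetical names. The ABCs are trimmed to one method each, and batches are plain lists of tuples instead of `pyarrow.RecordBatch`. This is not bani's orchestrator.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Iterator


class MiniSource(ABC):
    """Hypothetical, trimmed-down stand-in for SourceConnector."""

    @abstractmethod
    def read_table(self, table_name: str, schema_name: str,
                   batch_size: int = 100_000) -> Iterator[list[tuple]]:
        """Yield batches of rows (plain tuples instead of Arrow batches)."""


class MiniSink(ABC):
    """Hypothetical, trimmed-down stand-in for SinkConnector."""

    @abstractmethod
    def write_batch(self, table_name: str, schema_name: str,
                    batch: list[tuple]) -> int:
        """Write one batch and return the number of rows written."""


def migrate_table(source: MiniSource, sink: MiniSink, table: str,
                  source_schema: str, sink_schema: str, batch_size: int) -> int:
    """Copy one table batch by batch using only the abstract port methods."""
    total = 0
    for batch in source.read_table(table, source_schema, batch_size=batch_size):
        total += sink.write_batch(table, sink_schema, batch)
    return total


class ListSource(MiniSource):
    """In-memory fake used only to exercise migrate_table."""

    def __init__(self, rows: list[tuple]) -> None:
        self.rows = rows

    def read_table(self, table_name, schema_name, batch_size=100_000):
        for start in range(0, len(self.rows), batch_size):
            yield self.rows[start:start + batch_size]


class ListSink(MiniSink):
    """In-memory fake that records every row it receives."""

    def __init__(self) -> None:
        self.written: list[tuple] = []

    def write_batch(self, table_name, schema_name, batch):
        self.written.extend(batch)
        return len(batch)


sink = ListSink()
copied = migrate_table(ListSource([(i,) for i in range(7)]), sink,
                       "users", "public", "public", batch_size=3)
print(copied)  # 7
```

Because `migrate_table` depends only on the abstract methods, supporting another engine means writing a new subclass rather than changing the loop.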
SourceConnector
Bases: ABC
Port: reads schema and data from a source database.
A source connector is responsible for introspecting a source database's schema and reading data from its tables. It abstracts away database-specific details like SQL dialects and driver APIs.
Source code in src/bani/connectors/base.py
connect(config, pool_size=1)
abstractmethod
Establish a connection to the source database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ConnectionConfig | Connection configuration with resolved credentials. | required |
| pool_size | int | Number of connections to create in the pool. | 1 |
Raises:
| Type | Description |
|---|---|
| Exception | If the connection fails. |
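A minimal sketch of a concrete connect()/disconnect() pair, assuming the standard-library `sqlite3` driver. `SqliteConfig` stands in for `ConnectionConfig`, the `_pool` attribute and queue-based pool are hypothetical, and building a pool only when `pool_size > 1` is an assumption chosen to match the checkout() fallback described below. The `_config` and `connection` attribute names come from the reconnect() and checkout() descriptions.

```python
from __future__ import annotations

import queue
import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class SqliteConfig:
    """Hypothetical stand-in for ConnectionConfig."""
    database: str


class SqliteSourceSketch:
    """Illustrative connector; not bani's implementation."""

    def __init__(self) -> None:
        self._config: SqliteConfig | None = None
        self.connection: sqlite3.Connection | None = None
        self._pool: queue.Queue | None = None  # hypothetical attribute

    def connect(self, config: SqliteConfig, pool_size: int = 1) -> None:
        if pool_size < 1:
            raise ValueError("pool_size must be at least 1")
        self._config = config  # kept so reconnect() can reuse it
        # sqlite3.connect raises sqlite3.OperationalError on failure.
        self.connection = sqlite3.connect(config.database, check_same_thread=False)
        if pool_size > 1:  # assumption: a pool is only built for pool_size > 1
            self._pool = queue.Queue(maxsize=pool_size)
            for _ in range(pool_size):
                self._pool.put(sqlite3.connect(config.database, check_same_thread=False))

    def disconnect(self) -> None:
        if self._pool is not None:
            while not self._pool.empty():
                self._pool.get_nowait().close()
            self._pool = None
        if self.connection is not None:
            self.connection.close()
            self.connection = None


src = SqliteSourceSketch()
src.connect(SqliteConfig(":memory:"), pool_size=3)
```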
disconnect()
abstractmethod
Close the connection to the source database.
Raises:
| Type | Description |
|---|---|
| Exception | If disconnection fails. |
reconnect()
Re-establish the connection using the stored config.
Connectors store _config during connect(). This method closes the existing connection (ignoring errors) and opens a fresh one. Useful for recovering from network timeouts.
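The documented reconnect() behaviour can be sketched as a concrete method on an abstract base. `ReconnectingBase` and `FlakyConnector` are hypothetical. Raising `RuntimeError` when no config has been stored is an assumption, and this sketch reconnects with the default `pool_size` because the description does not say whether the original pool size is preserved.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import suppress
from typing import Any


class ReconnectingBase(ABC):
    """Hypothetical base: reconnect() is concrete, connect/disconnect abstract."""

    _config: Any = None  # set by connect() in concrete subclasses

    @abstractmethod
    def connect(self, config: Any, pool_size: int = 1) -> None: ...

    @abstractmethod
    def disconnect(self) -> None: ...

    def reconnect(self) -> None:
        if self._config is None:  # assumption: fail fast before any connect()
            raise RuntimeError("connect() has not been called")
        with suppress(Exception):
            self.disconnect()  # the old connection may already be dead
        self.connect(self._config)  # note: uses the default pool_size


class FlakyConnector(ReconnectingBase):
    """Toy connector whose disconnect() fails, as with a dropped socket."""

    def __init__(self) -> None:
        self.connect_calls = 0

    def connect(self, config: Any, pool_size: int = 1) -> None:
        self._config = config
        self.connect_calls += 1

    def disconnect(self) -> None:
        raise ConnectionError("socket already closed")


conn = FlakyConnector()
conn.connect({"host": "db.example"})
conn.reconnect()  # the ConnectionError from disconnect() is swallowed
print(conn.connect_calls)  # 2
```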
checkout()
Check out a connection from the pool.
Falls back to yielding self.connection when no pool is configured (backward compatibility).
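Because checkout() yields a connection, it fits Python's context-manager protocol. This sketch assumes a `queue.Queue` pool in a hypothetical `_pool` attribute and hands each connection back to the pool on exit. Only the fallback to `self.connection` is taken from the description above.

```python
from __future__ import annotations

import queue
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class PooledSketch:
    """Illustrative connector with an optional queue-based pool."""

    def __init__(self, pool_size: int = 1) -> None:
        self.connection = sqlite3.connect(":memory:")
        self._pool: queue.Queue | None = None  # hypothetical attribute
        if pool_size > 1:
            self._pool = queue.Queue()
            for _ in range(pool_size):
                self._pool.put(sqlite3.connect(":memory:"))

    @contextmanager
    def checkout(self) -> Iterator[sqlite3.Connection]:
        if self._pool is None:
            # No pool configured: hand out the single shared connection.
            yield self.connection
            return
        conn = self._pool.get()  # blocks until a pooled connection is free
        try:
            yield conn
        finally:
            self._pool.put(conn)  # return it even if the caller's block raised


single = PooledSketch()
with single.checkout() as conn:
    print(conn is single.connection)  # True

pooled = PooledSketch(pool_size=2)
with pooled.checkout() as conn:
    print(pooled._pool.qsize())  # 1 while one connection is checked out
print(pooled._pool.qsize())  # 2 once it has been returned
```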
introspect_schema()
abstractmethod
Introspect the complete schema of the source database.
Reads all tables, columns, constraints, indexes, and other schema metadata and returns it as a DatabaseSchema object.
Returns:
| Type | Description |
|---|---|
| DatabaseSchema | A DatabaseSchema containing all tables and their metadata. |
Raises:
| Type | Description |
|---|---|
| Exception | If introspection fails. |
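To show what introspection involves for one engine, the sketch below reads table names, columns, and primary keys from SQLite's catalog via `sqlite_master` and `PRAGMA table_info`. `ColumnInfo`, `TableInfo`, and `introspect_sqlite` are hypothetical and far simpler than `DatabaseSchema`. Indexes, foreign keys, and check constraints are omitted.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass


@dataclass(frozen=True)
class ColumnInfo:  # hypothetical, much simpler than bani's schema model
    name: str
    data_type: str
    nullable: bool


@dataclass(frozen=True)
class TableInfo:  # hypothetical
    name: str
    columns: tuple[ColumnInfo, ...]
    primary_key: tuple[str, ...]


def introspect_sqlite(conn: sqlite3.Connection) -> tuple[TableInfo, ...]:
    tables = []
    # Skip SQLite's internal tables such as sqlite_sequence.
    names = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name")]
    for name in names:
        # Each PRAGMA table_info row is (cid, name, type, notnull, dflt_value, pk).
        info = conn.execute(f'PRAGMA table_info("{name}")').fetchall()
        columns = tuple(ColumnInfo(r[1], r[2], not r[3]) for r in info)
        # pk is the column's 1-based position in the primary key, or 0.
        pk = tuple(r[1] for r in sorted(info, key=lambda r: r[5]) if r[5] > 0)
        tables.append(TableInfo(name, columns, pk))
    return tuple(tables)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
schema = introspect_sqlite(conn)
print(schema[0].primary_key)  # ('id',)
```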
read_table(table_name, schema_name, columns=None, filter_sql=None, batch_size=100000)
abstractmethod
Read data from a table as batches of Arrow records.
Yields batches of data from the specified table. Uses server-side cursors for memory efficiency on large tables.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Name of the table to read from. | required |
| schema_name | str | Schema (namespace) containing the table. | required |
| columns | list[str] \| None | Optional list of column names to read. If None, all columns are read. | None |
| filter_sql | str \| None | Optional WHERE clause (without "WHERE" keyword) to filter rows. | None |
| batch_size | int | Number of rows per batch. | 100000 |
Yields:
| Type | Description |
|---|---|
| RecordBatch | pyarrow.RecordBatch instances. |
Raises:
| Type | Description |
|---|---|
| Exception | If reading fails. |
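The parameters map onto a single SELECT that is drained in `batch_size` chunks. This sketch uses `sqlite3` with `cursor.fetchmany()` and yields column-oriented dicts. SQLite has no server-side cursors, so it only approximates the streaming a production driver would provide. In a real connector each dict could be passed to `pyarrow.RecordBatch.from_pydict`. The function name is hypothetical, and `filter_sql` is appended verbatim, so it must come from trusted configuration.

```python
from __future__ import annotations

import sqlite3
from typing import Iterator


def _quote(identifier: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes."""
    return '"' + identifier.replace('"', '""') + '"'


def read_table_batches(conn: sqlite3.Connection, table_name: str, schema_name: str,
                       columns: list[str] | None = None,
                       filter_sql: str | None = None,
                       batch_size: int = 100_000) -> Iterator[dict[str, list]]:
    select_list = ", ".join(_quote(c) for c in columns) if columns else "*"
    sql = f"SELECT {select_list} FROM {_quote(schema_name)}.{_quote(table_name)}"
    if filter_sql:
        sql += f" WHERE {filter_sql}"  # trusted input only: appended verbatim
    cursor = conn.execute(sql)
    names = [d[0] for d in cursor.description]
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        # Transpose rows into columns, the layout an Arrow batch uses.
        yield {name: [row[i] for row in rows] for i, name in enumerate(names)}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER, qty INTEGER)")
conn.executemany("INSERT INTO items VALUES (?, ?)", [(i, i * 10) for i in range(5)])
batches = list(read_table_batches(conn, "items", "main", columns=["id"],
                                  filter_sql="qty >= 20", batch_size=2))
print([len(b["id"]) for b in batches])  # [2, 1]
```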
estimate_row_count(table_name, schema_name)
abstractmethod
Get an estimated row count for a table.
Returns a fast estimate based on schema statistics if available, or executes a COUNT(*) query if not.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Name of the table. | required |
| schema_name | str | Schema containing the table. | required |
Returns:
| Type | Description |
|---|---|
| int | Estimated or exact row count. |
Raises:
| Type | Description |
|---|---|
| Exception | If the query fails. |
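The estimate-then-count strategy can be sketched against SQLite, whose `sqlite_stat1` statistics table exists only after `ANALYZE` has run. PostgreSQL exposes a comparable estimate in `pg_class.reltuples`. The helper below is hypothetical and reads the first integer of the `stat` column as the approximate row count.

```python
from __future__ import annotations

import sqlite3


def _quote(identifier: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes."""
    return '"' + identifier.replace('"', '""') + '"'


def estimate_row_count(conn: sqlite3.Connection, table_name: str,
                       schema_name: str = "main") -> int:
    try:
        row = conn.execute(
            f"SELECT stat FROM {_quote(schema_name)}.sqlite_stat1 "
            "WHERE tbl = ? LIMIT 1",
            (table_name,),
        ).fetchone()
    except sqlite3.OperationalError:
        row = None  # sqlite_stat1 does not exist until ANALYZE has been run
    if row is not None:
        # The first integer in `stat` approximates the number of rows.
        return int(str(row[0]).split()[0])
    # No statistics for this table: fall back to an exact COUNT(*).
    (count,) = conn.execute(
        f"SELECT COUNT(*) FROM {_quote(schema_name)}.{_quote(table_name)}"
    ).fetchone()
    return count


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
conn.executemany("INSERT INTO events VALUES (?)", [(i,) for i in range(5)])
print(estimate_row_count(conn, "events"))  # 5, via COUNT(*) since no ANALYZE ran
```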
SinkConnector
Bases: ABC
Port: writes schema and data to a target database.
A sink connector is responsible for creating tables and writing data to a target database. It abstracts away database-specific DDL and DML details.
Subclasses may override recommended_batch_size and recommended_parallel_workers to declare optimal defaults for the target engine. The orchestrator applies these when the project uses global defaults (i.e. the user did not explicitly set them).
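The "apply only when the user kept the global defaults" rule can be sketched as a three-level lookup. `ProjectSettings`, using `None` to mean "not set explicitly", the global default values, and the `BulkLoadSinkSketch` recommendations are all hypothetical. The real orchestrator may track explicit settings differently.

```python
from __future__ import annotations

from dataclasses import dataclass

GLOBAL_DEFAULT_BATCH_SIZE = 100_000  # illustrative values, not bani's
GLOBAL_DEFAULT_WORKERS = 4


@dataclass
class ProjectSettings:
    # None means "the user did not set this explicitly" (assumption).
    batch_size: int | None = None
    parallel_workers: int | None = None


class SinkSketch:
    @property
    def recommended_batch_size(self) -> int | None:
        return None  # no engine-specific opinion

    @property
    def recommended_parallel_workers(self) -> int | None:
        return None


class BulkLoadSinkSketch(SinkSketch):
    @property
    def recommended_batch_size(self) -> int | None:
        return 50_000

    @property
    def recommended_parallel_workers(self) -> int | None:
        return 8


def _pick(explicit: int | None, recommended: int | None, fallback: int) -> int:
    if explicit is not None:
        return explicit  # an explicit user setting always wins
    if recommended is not None:
        return recommended  # then the sink's engine-specific recommendation
    return fallback  # finally the global default


def effective_settings(project: ProjectSettings, sink: SinkSketch) -> tuple[int, int]:
    return (
        _pick(project.batch_size, sink.recommended_batch_size, GLOBAL_DEFAULT_BATCH_SIZE),
        _pick(project.parallel_workers, sink.recommended_parallel_workers,
              GLOBAL_DEFAULT_WORKERS),
    )


print(effective_settings(ProjectSettings(), BulkLoadSinkSketch()))  # (50000, 8)
print(effective_settings(ProjectSettings(batch_size=1_000), BulkLoadSinkSketch()))  # (1000, 8)
print(effective_settings(ProjectSettings(), SinkSketch()))  # (100000, 4)
```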
default_schema
property
Return the sink's default schema name.
The orchestrator uses this to remap source schema names so that tables are created in the correct target schema or database. Subclasses should override it to return the connected database name (MySQL, MSSQL), the owner (Oracle), or 'public' (PostgreSQL).
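A sketch of overriding `default_schema` and of the remapping it enables. `DefaultSchemaSink`, `MySQLSinkSketch`, the `_database` attribute, and `remap_tables` are hypothetical, and returning 'public' from the base class is an assumption. The real orchestrator's remapping may handle more cases.

```python
from __future__ import annotations


class DefaultSchemaSink:
    @property
    def default_schema(self) -> str:
        return "public"  # assumption for the sketch (PostgreSQL-style)


class MySQLSinkSketch(DefaultSchemaSink):
    def __init__(self, database: str) -> None:
        self._database = database  # hypothetical: recorded by connect()

    @property
    def default_schema(self) -> str:
        # In MySQL a schema is a database, so target the connected one.
        return self._database


def remap_tables(tables: list[tuple[str, str]],
                 sink: DefaultSchemaSink) -> list[tuple[str, str]]:
    """Replace the schema of each (schema, table) pair with the sink's default."""
    target = sink.default_schema
    return [(target, table) for _source_schema, table in tables]


source_tables = [("dbo", "orders"), ("dbo", "users")]
print(remap_tables(source_tables, MySQLSinkSketch("shop")))
# [('shop', 'orders'), ('shop', 'users')]
print(remap_tables(source_tables, DefaultSchemaSink()))
# [('public', 'orders'), ('public', 'users')]
```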
connect(config, pool_size=1)
abstractmethod
Establish a connection to the target database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ConnectionConfig | Connection configuration with resolved credentials. | required |
| pool_size | int | Number of connections to create in the pool. | 1 |
Raises:
| Type | Description |
|---|---|
| Exception | If the connection fails. |
disconnect()
abstractmethod
Close the connection to the target database.
Raises:
| Type | Description |
|---|---|
| Exception | If disconnection fails. |
reconnect()
Re-establish the connection using the stored config.
Connectors store _config during connect(). This method closes the existing connection (ignoring errors) and opens a fresh one. Useful for recovering from network timeouts.
checkout()
Check out a connection from the pool.
Falls back to yielding self.connection when no pool is configured (backward compatibility).
create_table(table_def)
abstractmethod
Create a table in the target database.
Creates a table with all columns and constraints as specified in the table definition. Primary key and check constraints are included in the CREATE TABLE statement.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_def | TableDefinition | TableDefinition describing the table to create. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If table creation fails. |
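To make the "primary key and check constraints inline" rule concrete, the sketch below renders a simplified definition into one CREATE TABLE statement and runs it on SQLite. `ColumnDef`, `TableDef`, and `render_create_table` are hypothetical stand-ins for `TableDefinition` and a connector's DDL generator. Type names are passed through unchanged rather than mapped between dialects.

```python
from __future__ import annotations

import sqlite3
from dataclasses import dataclass


def _quote(identifier: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes."""
    return '"' + identifier.replace('"', '""') + '"'


@dataclass(frozen=True)
class ColumnDef:  # hypothetical
    name: str
    data_type: str  # passed through as-is; no cross-dialect type mapping
    nullable: bool = True


@dataclass(frozen=True)
class TableDef:  # hypothetical stand-in for TableDefinition
    schema_name: str
    name: str
    columns: tuple[ColumnDef, ...]
    primary_key: tuple[str, ...] = ()
    check_constraints: tuple[str, ...] = ()


def render_create_table(t: TableDef) -> str:
    parts = [
        f"{_quote(c.name)} {c.data_type}" + ("" if c.nullable else " NOT NULL")
        for c in t.columns
    ]
    if t.primary_key:
        parts.append("PRIMARY KEY (" + ", ".join(_quote(c) for c in t.primary_key) + ")")
    # CHECK expressions are copied verbatim, so they must suit the target dialect.
    parts.extend(f"CHECK ({expr})" for expr in t.check_constraints)
    body = ",\n    ".join(parts)
    return f"CREATE TABLE {_quote(t.schema_name)}.{_quote(t.name)} (\n    {body}\n)"


orders = TableDef(
    "main", "orders",
    columns=(ColumnDef("id", "INTEGER", nullable=False), ColumnDef("qty", "INTEGER")),
    primary_key=("id",),
    check_constraints=("qty >= 0",),
)
conn = sqlite3.connect(":memory:")
conn.execute(render_create_table(orders))
print(render_create_table(orders))
```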
write_batch(table_name, schema_name, batch)
abstractmethod
Write a batch of Arrow records to a table.
Uses the most efficient method available (e.g., COPY for PostgreSQL).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Name of the target table. | required |
| schema_name | str | Schema containing the table. | required |
| batch | RecordBatch | A pyarrow.RecordBatch to write. | required |
Returns:
| Type | Description |
|---|---|
| int | Number of rows written. |
Raises:
| Type | Description |
|---|---|
| Exception | If writing fails. |
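Where no bulk path such as COPY exists, a connector can fall back to a parameterized multi-row insert. This sketch uses `sqlite3`'s `executemany` and accepts the batch as a column-oriented dict, the shape that `pyarrow.RecordBatch.to_pydict()` returns. The dict input and the free-function form with an explicit `conn` argument are illustrative assumptions, not the documented signature.

```python
from __future__ import annotations

import sqlite3


def _quote(identifier: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes."""
    return '"' + identifier.replace('"', '""') + '"'


def write_batch(conn: sqlite3.Connection, table_name: str, schema_name: str,
                batch: dict[str, list]) -> int:
    columns = list(batch)
    if not columns:
        return 0
    if len({len(values) for values in batch.values()}) > 1:
        raise ValueError("all columns in a batch must have the same length")
    col_list = ", ".join(_quote(c) for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    sql = (f"INSERT INTO {_quote(schema_name)}.{_quote(table_name)} "
           f"({col_list}) VALUES ({placeholders})")
    # Transpose the columns back into row tuples for the DB-API driver.
    rows = list(zip(*(batch[c] for c in columns)))
    with conn:  # commits on success, rolls back the whole batch on error
        conn.executemany(sql, rows)
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
written = write_batch(conn, "users", "main", {"id": [1, 2, 3], "name": ["ann", "bo", "cy"]})
print(written)  # 3
```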
create_indexes(table_name, schema_name, indexes)
abstractmethod
Create indexes on a table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | Name of the table. | required |
| schema_name | str | Schema containing the table. | required |
| indexes | tuple[IndexDefinition, ...] | Tuple of index definitions to create. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If index creation fails. |
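A sketch of turning index definitions into PostgreSQL-flavoured DDL strings. `IndexDef` and `render_create_indexes` are hypothetical stand-ins for `IndexDefinition` and connector code. Other engines place the schema differently (SQLite, for example, qualifies the index name rather than the table), so a real connector would adapt the template per dialect.

```python
from __future__ import annotations

from dataclasses import dataclass


def _quote(identifier: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes."""
    return '"' + identifier.replace('"', '""') + '"'


@dataclass(frozen=True)
class IndexDef:  # hypothetical stand-in for IndexDefinition
    name: str
    columns: tuple[str, ...]
    unique: bool = False


def render_create_indexes(table_name: str, schema_name: str,
                          indexes: tuple[IndexDef, ...]) -> list[str]:
    statements = []
    for idx in indexes:
        unique = "UNIQUE " if idx.unique else ""
        cols = ", ".join(_quote(c) for c in idx.columns)
        statements.append(
            f"CREATE {unique}INDEX {_quote(idx.name)} "
            f"ON {_quote(schema_name)}.{_quote(table_name)} ({cols})"
        )
    return statements


stmts = render_create_indexes("orders", "public", (
    IndexDef("ix_orders_customer", ("customer_id",)),
    IndexDef("ux_orders_ref", ("ref",), unique=True),
))
for s in stmts:
    print(s)
# CREATE INDEX "ix_orders_customer" ON "public"."orders" ("customer_id")
# CREATE UNIQUE INDEX "ux_orders_ref" ON "public"."orders" ("ref")
```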
create_foreign_keys(fks)
abstractmethod
Create foreign key constraints.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fks | tuple[ForeignKeyDefinition, ...] | Tuple of foreign key definitions to create. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If creation fails. |
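Unlike create_indexes, this method receives only definitions, presumably because each foreign key names both its own table and the referenced one. The sketch below renders standard `ALTER TABLE ... ADD CONSTRAINT ... FOREIGN KEY ... REFERENCES` statements. `FKDef` and `render_foreign_keys` are hypothetical stand-ins for `ForeignKeyDefinition` and connector code.

```python
from __future__ import annotations

from dataclasses import dataclass


def _quote(identifier: str) -> str:
    """Double-quote an SQL identifier, escaping embedded quotes."""
    return '"' + identifier.replace('"', '""') + '"'


def _qualified(schema: str, table: str) -> str:
    return f"{_quote(schema)}.{_quote(table)}"


@dataclass(frozen=True)
class FKDef:  # hypothetical stand-in for ForeignKeyDefinition
    name: str
    schema_name: str
    table_name: str
    columns: tuple[str, ...]
    ref_schema: str
    ref_table: str
    ref_columns: tuple[str, ...]


def render_foreign_keys(fks: tuple[FKDef, ...]) -> list[str]:
    statements = []
    for fk in fks:
        if len(fk.columns) != len(fk.ref_columns):
            raise ValueError(f"{fk.name}: column count mismatch")
        cols = ", ".join(_quote(c) for c in fk.columns)
        ref_cols = ", ".join(_quote(c) for c in fk.ref_columns)
        statements.append(
            f"ALTER TABLE {_qualified(fk.schema_name, fk.table_name)} "
            f"ADD CONSTRAINT {_quote(fk.name)} FOREIGN KEY ({cols}) "
            f"REFERENCES {_qualified(fk.ref_schema, fk.ref_table)} ({ref_cols})"
        )
    return statements


fk = FKDef("fk_orders_customer", "public", "orders", ("customer_id",),
           "public", "customers", ("id",))
print(render_foreign_keys((fk,))[0])
# ALTER TABLE "public"."orders" ADD CONSTRAINT "fk_orders_customer" FOREIGN KEY ("customer_id") REFERENCES "public"."customers" ("id")
```

Each statement references another table, so it can only succeed once the referenced table and its key already exist in the target.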
execute_sql(sql)
abstractmethod
Execute arbitrary SQL.
Used for custom DDL or DML not covered by other methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| sql | str | SQL statement to execute. | required |
Raises:
| Type | Description |
|---|---|
| Exception | If execution fails. |