-
Notifications
You must be signed in to change notification settings - Fork 92
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #480 from Gexar/gexar/update_pgsql_plugin_478
Update Postgres Plugin for dbt-duckdb to Use ATTACH Syntax and Improve Compatibility Gexar/update pgsql plugin #478
- Loading branch information
Showing
5 changed files
with
496 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,3 +80,5 @@ target/ | |
.idea/ | ||
.vscode/ | ||
.env | ||
|
||
.venv |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,132 @@ | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
from typing import Tuple | ||
|
||
from duckdb import DuckDBPyConnection | ||
|
||
from . import BasePlugin | ||
from dbt.adapters.events.logging import AdapterLogger | ||
|
||
PG_EXT = "postgres" | ||
|
||
|
||
class Plugin(BasePlugin): | ||
logger = AdapterLogger("DuckDB_PostgresPlugin") | ||
|
||
def __init__(self, name: str, plugin_config: Dict[str, Any]): | ||
""" | ||
Initialize the Plugin with a name and configuration. | ||
""" | ||
super().__init__(name, plugin_config) | ||
self.logger.debug( | ||
"Plugin __init__ called with name: %s and config: %s", name, plugin_config | ||
) | ||
self.initialize(plugin_config) | ||
|
||
def initialize(self, config: Dict[str, Any]): | ||
self._dsn = config.get("dsn") | ||
if self._dsn is None: | ||
raise Exception("'dsn' is a required argument for the postgres plugin!") | ||
self._source_schema = config.get("source_schema", "public") | ||
self._sink_schema = config.get("sink_schema", "main") | ||
self._overwrite = config.get("overwrite", False) | ||
self._filter_pushdown = config.get("filter_pushdown", False) | ||
""" | ||
Initialize the plugin with the provided configuration. | ||
""" | ||
self.logger.debug("Initializing PostgreSQL plugin with config: %s", config) | ||
|
||
self._dsn: str = config["dsn"] | ||
if not self._dsn: | ||
self.logger.error( | ||
"Initialization failed: 'dsn' is a required argument for the postgres plugin!" | ||
) | ||
raise ValueError("'dsn' is a required argument for the postgres plugin!") | ||
|
||
self._pg_schema: Optional[str] = config.get("pg_schema") # Can be None | ||
self._duckdb_alias: str = config.get("duckdb_alias", "postgres_db") | ||
self._read_only: bool = config.get("read_only", False) | ||
self._secret: Optional[str] = config.get("secret") | ||
self._attach_options: Dict[str, Any] = config.get( | ||
"attach_options", {} | ||
) # Additional ATTACH options | ||
self._settings: Dict[str, Any] = config.get( | ||
"settings", {} | ||
) # Extension settings via SET commands | ||
|
||
self.logger.info( | ||
"PostgreSQL plugin initialized with dsn='%s', pg_schema='%s', " | ||
"duckdb_alias='%s', read_only=%s, secret='%s'", | ||
self._dsn, | ||
self._pg_schema, | ||
self._duckdb_alias, | ||
self._read_only, | ||
self._secret, | ||
) | ||
|
||
def configure_connection(self, conn: DuckDBPyConnection): | ||
conn.install_extension("postgres") | ||
conn.load_extension("postgres") | ||
|
||
if self._sink_schema: | ||
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {self._sink_schema}") | ||
|
||
attach_args = [ | ||
("source_schema", f"'{self._source_schema}'"), | ||
("sink_schema", f"'{self._sink_schema}'"), | ||
("overwrite", str(self._overwrite).lower()), | ||
("filter_pushdown", str(self._filter_pushdown).lower()), | ||
] | ||
attach_stmt = ( | ||
f"CALL postgres_attach('{self._dsn}', {', '.join(f'{k}={v}' for k, v in attach_args)})" | ||
""" | ||
Configure the DuckDB connection to attach the PostgreSQL database. | ||
""" | ||
self.logger.debug("Configuring DuckDB connection for PostgreSQL plugin.") | ||
|
||
conn.install_extension(PG_EXT) | ||
conn.load_extension(PG_EXT) | ||
self.logger.info("PostgreSQL extension installed and loaded.") | ||
|
||
# Set any extension settings provided | ||
self._set_extension_settings(conn) | ||
|
||
# Build and execute the ATTACH command | ||
attach_stmt = self._build_attach_statement() | ||
self.logger.debug("Executing ATTACH statement: %s", attach_stmt) | ||
try: | ||
conn.execute(attach_stmt) | ||
self.logger.info("Successfully attached PostgreSQL database with DSN: %s", self._dsn) | ||
except Exception as e: | ||
self.logger.error("Failed to attach PostgreSQL database: %s", e) | ||
raise | ||
|
||
def _set_extension_settings(self, conn: DuckDBPyConnection): | ||
""" | ||
Set extension settings via SET commands. | ||
""" | ||
for setting, value in self._settings.items(): | ||
# Quote string values | ||
if isinstance(value, str): | ||
value = f"'{value}'" | ||
elif isinstance(value, bool): | ||
value = "true" if value else "false" | ||
set_stmt = f"SET {setting} = {value};" | ||
self.logger.debug("Setting extension option: %s", set_stmt) | ||
try: | ||
conn.execute(set_stmt) | ||
except Exception as e: | ||
self.logger.error("Failed to set option %s: %s", setting, e) | ||
raise | ||
|
||
def _build_attach_statement(self) -> str: | ||
""" | ||
Build the ATTACH statement for connecting to the PostgreSQL database. | ||
""" | ||
attach_options: List[Tuple[str, Optional[str]]] = [("TYPE", "POSTGRES")] | ||
|
||
if self._pg_schema: | ||
attach_options.append(("SCHEMA", f"'{self._pg_schema}'")) | ||
|
||
if self._secret: | ||
attach_options.append(("SECRET", f"'{self._secret}'")) | ||
|
||
# Additional attach options | ||
for k, v in self._attach_options.items(): | ||
if isinstance(v, bool): | ||
v = "true" if v else "false" | ||
elif isinstance(v, str): | ||
v = f"'{v}'" | ||
attach_options.append((k.upper(), v)) | ||
|
||
if self._read_only: | ||
attach_options.append(("READ_ONLY", None)) # No value assigned | ||
|
||
# Convert options to string | ||
attach_options_str = ", ".join( | ||
f"{k} {v}" if v is not None else k for k, v in attach_options | ||
) | ||
conn.execute(attach_stmt) | ||
|
||
attach_stmt = f"ATTACH '{self._dsn}' AS {self._duckdb_alias} ({attach_options_str});" | ||
return attach_stmt |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.