diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 9fec53142..0cf775c3a 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -13,7 +13,7 @@ jobs:
       DBT_DATABRICKS_CLIENT_ID: ${{ secrets.TEST_PECO_SP_ID }}
       DBT_DATABRICKS_CLIENT_SECRET: ${{ secrets.TEST_PECO_SP_SECRET }}
       DBT_DATABRICKS_UC_INITIAL_CATALOG: peco
-      DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}
+      DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}test
       TEST_PECO_UC_CLUSTER_ID: ${{ secrets.TEST_PECO_UC_CLUSTER_ID }}
     steps:
       - name: Check out repository
@@ -41,7 +41,7 @@ jobs:
       DBT_DATABRICKS_CLIENT_SECRET: ${{ secrets.TEST_PECO_SP_SECRET }}
       DBT_DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
       DBT_DATABRICKS_UC_INITIAL_CATALOG: peco
-      DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}
+      DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}test
       TEST_PECO_UC_CLUSTER_ID: ${{ secrets.TEST_PECO_UC_CLUSTER_ID }}
     steps:
       - name: Check out repository
@@ -67,7 +67,7 @@ jobs:
       DBT_DATABRICKS_HOST_NAME: ${{ secrets.DATABRICKS_HOST }}
       DBT_DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
       TEST_PECO_CLUSTER_ID: ${{ secrets.TEST_PECO_CLUSTER_ID }}
-      DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}
+      DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}test
     steps:
       - name: Check out repository
         uses: actions/checkout@v3
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65ac37c39..a8717dec4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,18 +1,19 @@
-## dbt-databricks 1.7.0 (TBD)
+## dbt-databricks 1.7.0 (November 9, 2023)
 
 ### Features
 
-- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486))
+- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486)), also (with generous help from @mikealfare) ([499](https://github.com/databricks/dbt-databricks/pull/499))
 - Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))
 
 ### Fixes
 
-- Node info now gets added to SQLQuery event ([494](https://github.com/databricks/dbt-databricks/pull/494))
+- Node info now gets added to SQLQuery event (thanks @davidharting!) ([494](https://github.com/databricks/dbt-databricks/pull/494))
+- Compatibility with dbt-spark and dbt-core 1.7.1 ([499](https://github.com/databricks/dbt-databricks/pull/499))
 
 ### Under the Hood
 
 - Added required adapter tests to ensure compatibility with 1.7.0 ([487](https://github.com/databricks/dbt-databricks/pull/487))
-- Improved large seed performance by not casting every value (thanks @nrichards17!) ([493](https://github.com/databricks/dbt-databricks/pull/493))
+- Improved large seed performance by not casting every value (thanks @nrichards17!) ([493](https://github.com/databricks/dbt-databricks/pull/493)). Note: for `file_format="parquet"` we still need to cast.
 
 ## dbt-databricks 1.7.0rc1 (October 13, 2023)
diff --git a/dbt/adapters/databricks/__version__.py b/dbt/adapters/databricks/__version__.py
index 6c0e68f7b..c97ea9909 100644
--- a/dbt/adapters/databricks/__version__.py
+++ b/dbt/adapters/databricks/__version__.py
@@ -1 +1 @@
-version: str = "1.7.0rc1"
+version: str = "1.7.0"
diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py
index b41c8395b..4d8990c44 100644
--- a/dbt/adapters/databricks/impl.py
+++ b/dbt/adapters/databricks/impl.py
@@ -234,10 +234,9 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
         kwargs = {"relation": relation}
 
         new_rows: List[Tuple[Optional[str], str, str, str]]
-        if relation.database is not None:
-            assert relation.schema is not None
+        if all([relation.database, relation.schema]):
             tables = self.connections.list_tables(
-                database=relation.database, schema=relation.schema
+                database=relation.database, schema=relation.schema  # type: ignore[arg-type]
             )
 
             new_rows = []
@@ -256,39 +255,23 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
 
         # if there are any table types to be resolved
         if any(not row[3] for row in new_rows):
-            # Get view names and create a dictionay of view name to materialization
+            # Get view names and create a dictionary of view name to materialization
+            relation_all_tables = self.Relation.create(
+                database=relation.database, schema=relation.schema, identifier="*"
+            )
+
             with self._catalog(relation.database):
                 views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs)
-
+                tables = self.execute_macro(
+                    SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs={"schema_relation": relation_all_tables}
+                )
             view_names: Dict[str, bool] = {
                 view["viewName"]: view.get("isMaterialized", False) for view in views
             }
-
-            # a function to resolve an unknown table type
-            def typeFromNames(
-                database: Optional[str], schema: str, name: str
-            ) -> DatabricksRelationType:
-                if name in view_names:
-                    # it is either a view or a materialized view
-                    return (
-                        DatabricksRelationType.MaterializedView
-                        if view_names[name]
-                        else DatabricksRelationType.View
-                    )
-                elif is_hive_metastore(database):
-                    return DatabricksRelationType.Table
-                else:
-                    # not a view so it might be a streaming table
-                    # get extended information to determine
-                    rel = self.Relation.create(database, schema, name)
-                    rel = self._set_relation_information(rel)
-                    if (
-                        rel.metadata is not None
-                        and rel.metadata.get("Type", "table") == "STREAMING_TABLE"
-                    ):
-                        return DatabricksRelationType.StreamingTable
-                    else:
-                        return DatabricksRelationType.Table
+            table_names: Dict[str, bool] = {
+                table["tableName"]: (self._parse_type(table["information"]) == "STREAMING_TABLE")
+                for table in tables
+            }
 
             # create a new collection of rows with the correct table types
             new_rows = [
@@ -296,7 +279,11 @@ def typeFromNames(
                     row[0],
                     row[1],
                     row[2],
-                    str(row[3] if row[3] else typeFromNames(row[0], row[1], row[2])),
+                    str(
+                        row[3]
+                        if row[3]
+                        else self._type_from_names(row[0], row[2], view_names, table_names)
+                    ),
                 )
                 for row in new_rows
             ]
@@ -307,6 +294,40 @@ def typeFromNames(
             column_types=[Text(), Text(), Text(), Text()],
         )
 
+    def _parse_type(self, information: str) -> str:
+        type_entry = [
+            entry.strip() for entry in information.split("\n") if entry.split(":")[0] == "Type"
+        ]
+        return type_entry[0] if type_entry else ""
+
+    def _type_from_names(
+        self,
+        database: Optional[str],
+        name: str,
+        view_names: Dict[str, bool],
+        table_names: Dict[str, bool],
+    ) -> DatabricksRelationType:
+        if name in view_names:
+            # it is either a view or a materialized view
+            return (
+                DatabricksRelationType.MaterializedView
+                if view_names[name]
+                else DatabricksRelationType.View
+            )
+        elif is_hive_metastore(database):
+            return DatabricksRelationType.Table
+        elif name in table_names:
+            # it is either a table or a streaming table
+            return (
+                DatabricksRelationType.StreamingTable
+                if table_names[name]
+                else DatabricksRelationType.Table
+            )
+        else:
+            raise dbt.exceptions.DbtRuntimeError(
+                f"Unexpected relation type discovered: Database:{database}, Relation:{name}"
+            )
+
     def get_relation(
         self,
         database: Optional[str],
@@ -429,18 +450,28 @@ def parse_columns_from_information(  # type: ignore[override]
         return columns
 
     def get_catalog(
-        self, manifest: Manifest, selected_nodes: Optional[Set] = None
+        self, manifest: Manifest, selected_nodes: Optional[Set[Any]] = None
+    ) -> Tuple[Table, List[Exception]]:
+        if selected_nodes:
+            relations: Set[BaseRelation] = {
+                self.Relation.create_from(self.config, n) for n in selected_nodes
+            }
+        else:
+            relations = set(self._get_catalog_relations(manifest))
+        return self.get_catalog_by_relations(manifest, relations)
+
+    def get_catalog_by_relations(
+        self, manifest: Manifest, relations: Set[BaseRelation]
     ) -> Tuple[Table, List[Exception]]:
         with executor(self.config) as tpe:
-            catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
-            relations_by_catalog = self._get_catalog_relations_by_info_schema(catalog_relations)
+            relations_by_catalog = self._get_catalog_relations_by_info_schema(relations)
             futures: List[Future[Table]] = []
 
-            for info_schema, relations in relations_by_catalog.items():
+            for info_schema, catalog_relations in relations_by_catalog.items():
                 if is_hive_metastore(info_schema.database):
                     schema_map = defaultdict(list)
-                    for relation in relations:
+                    for relation in catalog_relations:
                         schema_map[relation.schema].append(relation)
 
                     for schema, schema_relations in schema_map.items():
@@ -460,7 +491,7 @@ def get_catalog(
                             name,
                             self._get_one_catalog_by_relations,
                             info_schema,
-                            relations,
+                            catalog_relations,
                             manifest,
                         )
                         futures.append(fut)
@@ -471,7 +502,7 @@ def _get_hive_catalog(
         self,
         schema: str,
-        relations: List[BaseRelation],
+        relations: Set[BaseRelation],
     ) -> Table:
         table_names = extract_identifiers(relations)
         columns: List[Dict[str, Any]] = []
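For reference, a minimal standalone sketch of the lookup order the new `_type_from_names` helper applies (plain strings and a generic exception stand in for the adapter's `DatabricksRelationType` enum and `DbtRuntimeError`; the sample names are illustrative only):

```python
from typing import Dict, Optional


def resolve_relation_type(
    database: Optional[str],
    name: str,
    view_names: Dict[str, bool],   # view name -> is it a materialized view
    table_names: Dict[str, bool],  # table name -> is it a streaming table
) -> str:
    # Views are checked first: SHOW VIEWS distinguishes materialized views
    # from plain views via the isMaterialized flag.
    if name in view_names:
        return "materialized_view" if view_names[name] else "view"
    # Hive metastore relations that are not views can only be plain tables.
    if database is None or database.lower() == "hive_metastore":
        return "table"
    # Unity Catalog: SHOW TABLE EXTENDED output tells streaming tables
    # apart from regular tables.
    if name in table_names:
        return "streaming_table" if table_names[name] else "table"
    raise RuntimeError(f"Unexpected relation type discovered: Database:{database}, Relation:{name}")


if __name__ == "__main__":
    views = {"my_view": False, "my_mv": True}
    tables = {"my_table": False, "my_st": True}
    for rel in ("my_view", "my_mv", "my_table", "my_st"):
        print(rel, "->", resolve_relation_type("main", rel, views, tables))
```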
diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py
index 63309e951..29dd60634 100644
--- a/dbt/adapters/databricks/relation.py
+++ b/dbt/adapters/databricks/relation.py
@@ -1,5 +1,5 @@
 from dataclasses import dataclass, field
-from typing import Any, Dict, List, Optional, Set, Type
+from typing import Any, Dict, Optional, Set, Type
 from dbt.contracts.relation import (
     ComponentName,
 )
@@ -141,5 +141,5 @@ def is_hive_metastore(database: Optional[str]) -> bool:
     return database is None or database.lower() == "hive_metastore"
 
 
-def extract_identifiers(relations: List[BaseRelation]) -> Set[str]:
+def extract_identifiers(relations: Set[BaseRelation]) -> Set[str]:
     return {r.identifier for r in relations if r.identifier is not None}
diff --git a/dbt/include/databricks/macros/materializations/seeds/helpers.sql b/dbt/include/databricks/macros/materializations/seeds/helpers.sql
index 30483018f..67e0f4227 100644
--- a/dbt/include/databricks/macros/materializations/seeds/helpers.sql
+++ b/dbt/include/databricks/macros/materializations/seeds/helpers.sql
@@ -6,6 +6,7 @@
     {% set batch_size = get_batch_size() %}
     {% set column_override = model['config'].get('column_types', {}) %}
+    {% set must_cast = model['config'].get("file_format", "delta") == "parquet" %}
 
     {% set statements = [] %}
 
@@ -20,7 +21,13 @@
         insert {% if loop.index0 == 0 -%} overwrite {% else -%} into {% endif -%} {{ this.render() }} values
         {% for row in chunk -%}
             ({%- for col_name in agate_table.column_names -%}
-                {{ get_binding_char() }}
+                {%- if must_cast -%}
+                    {%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
+                    {%- set type = column_override.get(col_name, inferred_type) -%}
+                    cast({{ get_binding_char() }} as {{type}})
+                {%- else -%}
+                    {{ get_binding_char() }}
+                {%- endif -%}
                 {%- if not loop.last%},{%- endif %}
            {%- endfor -%})
            {%- if not loop.last%},{%- endif %}
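As an illustration of what the `must_cast` branch above changes, here is a rough Python rendering of the placeholder logic (the binding character, inferred types, and seed columns below are assumed sample values, not the macro's actual runtime inputs):

```python
from typing import Dict, List


def render_value_placeholders(
    column_names: List[str],
    inferred_types: Dict[str, str],
    column_override: Dict[str, str],
    file_format: str = "delta",
    binding_char: str = "?",
) -> str:
    # Mirrors the macro: only parquet seeds wrap each bound value in an
    # explicit cast; delta seeds keep the bare binding character.
    must_cast = file_format == "parquet"
    rendered = []
    for col_name in column_names:
        if must_cast:
            col_type = column_override.get(col_name, inferred_types[col_name])
            rendered.append(f"cast({binding_char} as {col_type})")
        else:
            rendered.append(binding_char)
    return "(" + ", ".join(rendered) + ")"


if __name__ == "__main__":
    cols = ["id", "name", "signup_date"]
    inferred = {"id": "bigint", "name": "string", "signup_date": "date"}
    override = {"id": "int"}
    print(render_value_placeholders(cols, inferred, override, file_format="delta"))
    # (?, ?, ?)
    print(render_value_placeholders(cols, inferred, override, file_format="parquet"))
    # (cast(? as int), cast(? as string), cast(? as date))
```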
diff --git a/dev-requirements.txt b/dev-requirements.txt
index ad1461b83..22b46a03b 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -23,8 +23,8 @@ tox>=3.2.0
 types-requests
 types-mock
 
-dbt-core==1.7.0rc1
-dbt-tests-adapter==1.7.0rc1
+dbt-core==1.7.1
+dbt-tests-adapter==1.7.1
 # git+https://github.com/dbt-labs/dbt-spark.git@1.5.latest#egg=dbt-spark
 # git+https://github.com/dbt-labs/dbt-core.git@1.5.latest#egg=dbt-core&subdirectory=core
 # git+https://github.com/dbt-labs/dbt-core.git@1.5.latest#egg=dbt-tests-adapter&subdirectory=tests/adapter
diff --git a/requirements.txt b/requirements.txt
index f9602a9df..8cc656c9b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 databricks-sql-connector>=2.9.3, <3.0.0
-dbt-spark==1.7.0rc1
-databricks-sdk==0.9.0
+dbt-spark==1.7.1
+databricks-sdk>=0.9.0
 keyring>=23.13.0
diff --git a/setup.py b/setup.py
index 8987a0347..7e7bfde10 100644
--- a/setup.py
+++ b/setup.py
@@ -54,7 +54,7 @@ def _get_plugin_version():
     packages=find_namespace_packages(include=["dbt", "dbt.*"]),
     include_package_data=True,
     install_requires=[
-        "dbt-spark==1.7.0rc1",
+        "dbt-spark==1.7.1",
         "databricks-sql-connector>=2.9.3, <3.0.0",
         "databricks-sdk>=0.9.0",
         "keyring>=23.13.0",
diff --git a/tests/functional/adapter/test_seeds.py b/tests/functional/adapter/test_seeds.py
index dd5254b79..79161baf5 100644
--- a/tests/functional/adapter/test_seeds.py
+++ b/tests/functional/adapter/test_seeds.py
@@ -1,4 +1,4 @@
-from dbt.tests.adapter.simple_seed.test_seed import SeedUniqueDelimiterTestBase
+from dbt.tests.adapter.simple_seed.test_seed import SeedUniqueDelimiterTestBase, BaseTestEmptySeed
 from dbt.tests.util import run_dbt
 import pytest
 
@@ -557,3 +557,7 @@ def test_seed_with_empty_delimiter(self, project):
         """Testing failure of running dbt seed with an empty configured delimiter value"""
         seed_result = run_dbt(["seed"], expect_pass=False)
         assert "compilation error" in seed_result.results[0].message.lower()
+
+
+class TestDatabricksEmptySeed(BaseTestEmptySeed):
+    pass