1.7.0 Compat (#499)
benc-db authored Nov 9, 2023
2 parents 55dca91 + 9164b46 commit c5a4c45
Showing 10 changed files with 99 additions and 56 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/integration.yml
@@ -13,7 +13,7 @@ jobs:
DBT_DATABRICKS_CLIENT_ID: ${{ secrets.TEST_PECO_SP_ID }}
DBT_DATABRICKS_CLIENT_SECRET: ${{ secrets.TEST_PECO_SP_SECRET }}
DBT_DATABRICKS_UC_INITIAL_CATALOG: peco
DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}
DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}test
TEST_PECO_UC_CLUSTER_ID: ${{ secrets.TEST_PECO_UC_CLUSTER_ID }}
steps:
- name: Check out repository
@@ -41,7 +41,7 @@ jobs:
DBT_DATABRICKS_CLIENT_SECRET: ${{ secrets.TEST_PECO_SP_SECRET }}
DBT_DATABRICKS_HTTP_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
DBT_DATABRICKS_UC_INITIAL_CATALOG: peco
DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}
DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}test
TEST_PECO_UC_CLUSTER_ID: ${{ secrets.TEST_PECO_UC_CLUSTER_ID }}
steps:
- name: Check out repository
@@ -67,7 +67,7 @@ jobs:
DBT_DATABRICKS_HOST_NAME: ${{ secrets.DATABRICKS_HOST }}
DBT_DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
TEST_PECO_CLUSTER_ID: ${{ secrets.TEST_PECO_CLUSTER_ID }}
DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}
DBT_DATABRICKS_LOCATION_ROOT: ${{ secrets.TEST_PECO_EXTERNAL_LOCATION }}test
steps:
- name: Check out repository
uses: actions/checkout@v3
9 changes: 5 additions & 4 deletions CHANGELOG.md
@@ -1,18 +1,19 @@
## dbt-databricks 1.7.0 (TBD)
## dbt-databricks 1.7.0 (November 9, 2023)

### Features

- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486))
- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486)), also (with generous help from @mikealfare) ([499](https://github.com/databricks/dbt-databricks/pull/499))
- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))

### Fixes

- Node info now gets added to SQLQuery event ([494](https://github.com/databricks/dbt-databricks/pull/494))
- Node info now gets added to SQLQuery event (thanks @davidharting!) ([494](https://github.com/databricks/dbt-databricks/pull/494))
- Compatibility with dbt-spark and dbt-core 1.7.1 ([499](https://github.com/databricks/dbt-databricks/pull/499))

### Under the Hood

- Added required adapter tests to ensure compatibility with 1.7.0 ([487](https://github.com/databricks/dbt-databricks/pull/487))
- Improved large seed performance by not casting every value (thanks @nrichards17!) ([493](https://github.com/databricks/dbt-databricks/pull/493))
- Improved large seed performance by not casting every value (thanks @nrichards17!) ([493](https://github.com/databricks/dbt-databricks/pull/493)). Note: for `file_format="parquet"` we still need to cast.

## dbt-databricks 1.7.0rc1 (October 13, 2023)

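To illustrate the "info only on specified relations" entry above: the idea is to group just the selected relations by catalog and schema and fetch metadata for those, rather than scanning every schema in the project. The sketch below is a simplified, hypothetical rendering of that grouping step in plain Python; the tuple-based relations and the function name are illustrative and not the adapter's actual API.

from collections import defaultdict
from typing import Dict, List, Set, Tuple

# Simplified illustration only: group the selected relations by (catalog, schema)
# so metadata is fetched per schema that actually contains selected relations.
# The real adapter works with dbt Relation/InformationSchema objects, not tuples.
def group_selected_relations(
    relations: Set[Tuple[str, str, str]]
) -> Dict[Tuple[str, str], List[str]]:
    grouped: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for catalog, schema, identifier in relations:
        grouped[(catalog, schema)].append(identifier)
    return dict(grouped)

selected = {("main", "analytics", "orders"), ("main", "analytics", "customers")}
for (catalog, schema), identifiers in group_selected_relations(selected).items():
    print(f"fetch metadata for {catalog}.{schema}: {sorted(identifiers)}")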
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__version__.py
@@ -1 +1 @@
version: str = "1.7.0rc1"
version: str = "1.7.0"
109 changes: 70 additions & 39 deletions dbt/adapters/databricks/impl.py
@@ -234,10 +234,9 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:
kwargs = {"relation": relation}

new_rows: List[Tuple[Optional[str], str, str, str]]
if relation.database is not None:
assert relation.schema is not None
if all([relation.database, relation.schema]):
tables = self.connections.list_tables(
database=relation.database, schema=relation.schema
database=relation.database, schema=relation.schema # type: ignore[arg-type]
)

new_rows = []
@@ -256,47 +255,35 @@ def get_relations_without_caching(self, relation: DatabricksRelation) -> Table:

# if there are any table types to be resolved
if any(not row[3] for row in new_rows):
# Get view names and create a dictionay of view name to materialization
# Get view names and create a dictionary of view name to materialization
relation_all_tables = self.Relation.create(
database=relation.database, schema=relation.schema, identifier="*"
)

with self._catalog(relation.database):
views = self.execute_macro(SHOW_VIEWS_MACRO_NAME, kwargs=kwargs)

tables = self.execute_macro(
SHOW_TABLE_EXTENDED_MACRO_NAME, kwargs={"schema_relation": relation_all_tables}
)
view_names: Dict[str, bool] = {
view["viewName"]: view.get("isMaterialized", False) for view in views
}

# a function to resolve an unknown table type
def typeFromNames(
database: Optional[str], schema: str, name: str
) -> DatabricksRelationType:
if name in view_names:
# it is either a view or a materialized view
return (
DatabricksRelationType.MaterializedView
if view_names[name]
else DatabricksRelationType.View
)
elif is_hive_metastore(database):
return DatabricksRelationType.Table
else:
# not a view so it might be a streaming table
# get extended information to determine
rel = self.Relation.create(database, schema, name)
rel = self._set_relation_information(rel)
if (
rel.metadata is not None
and rel.metadata.get("Type", "table") == "STREAMING_TABLE"
):
return DatabricksRelationType.StreamingTable
else:
return DatabricksRelationType.Table
table_names: Dict[str, bool] = {
table["tableName"]: (self._parse_type(table["information"]) == "STREAMING_TABLE")
for table in tables
}

# create a new collection of rows with the correct table types
new_rows = [
(
row[0],
row[1],
row[2],
str(row[3] if row[3] else typeFromNames(row[0], row[1], row[2])),
str(
row[3]
if row[3]
else self._type_from_names(row[0], row[2], view_names, table_names)
),
)
for row in new_rows
]
@@ -307,6 +294,40 @@ def typeFromNames(
column_types=[Text(), Text(), Text(), Text()],
)

def _parse_type(self, information: str) -> str:
type_entry = [
entry.strip() for entry in information.split("\n") if entry.split(":")[0] == "Type"
]
return type_entry[0] if type_entry else ""

def _type_from_names(
self,
database: Optional[str],
name: str,
view_names: Dict[str, bool],
table_names: Dict[str, bool],
) -> DatabricksRelationType:
if name in view_names:
# it is either a view or a materialized view
return (
DatabricksRelationType.MaterializedView
if view_names[name]
else DatabricksRelationType.View
)
elif is_hive_metastore(database):
return DatabricksRelationType.Table
elif name in table_names:
# it is either a table or a streaming table
return (
DatabricksRelationType.StreamingTable
if table_names[name]
else DatabricksRelationType.Table
)
else:
raise dbt.exceptions.DbtRuntimeError(
f"Unexpected relation type discovered: Database:{database}, Relation:{name}"
)

def get_relation(
self,
database: Optional[str],
@@ -429,18 +450,28 @@ def parse_columns_from_information(  # type: ignore[override]
return columns

def get_catalog(
self, manifest: Manifest, selected_nodes: Optional[Set] = None
self, manifest: Manifest, selected_nodes: Optional[Set[Any]] = None
) -> Tuple[Table, List[Exception]]:
if selected_nodes:
relations: Set[BaseRelation] = {
self.Relation.create_from(self.config, n) for n in selected_nodes
}
else:
relations = set(self._get_catalog_relations(manifest))
return self.get_catalog_by_relations(manifest, relations)

def get_catalog_by_relations(
self, manifest: Manifest, relations: Set[BaseRelation]
) -> Tuple[Table, List[Exception]]:
with executor(self.config) as tpe:
catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
relations_by_catalog = self._get_catalog_relations_by_info_schema(catalog_relations)
relations_by_catalog = self._get_catalog_relations_by_info_schema(relations)

futures: List[Future[Table]] = []

for info_schema, relations in relations_by_catalog.items():
for info_schema, catalog_relations in relations_by_catalog.items():
if is_hive_metastore(info_schema.database):
schema_map = defaultdict(list)
for relation in relations:
for relation in catalog_relations:
schema_map[relation.schema].append(relation)

for schema, schema_relations in schema_map.items():
@@ -460,7 +491,7 @@ def get_catalog(
name,
self._get_one_catalog_by_relations,
info_schema,
relations,
catalog_relations,
manifest,
)
futures.append(fut)
@@ -471,7 +502,7 @@ def _get_hive_catalog(
def _get_hive_catalog(
self,
schema: str,
relations: List[BaseRelation],
relations: Set[BaseRelation],
) -> Table:
table_names = extract_identifiers(relations)
columns: List[Dict[str, Any]] = []
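As a reading aid for the _type_from_names helper added above, here is a minimal standalone sketch of its lookup order, with plain strings standing in for DatabricksRelationType and made-up relation names. It illustrates the precedence (views first, then the Hive-metastore shortcut, then streaming vs. regular tables); it is not the adapter code itself.

from typing import Dict, Optional

# Standalone sketch of the lookup order in _type_from_names. Plain strings stand in
# for DatabricksRelationType; the relation names below are made up for illustration.
def type_from_names(
    database: Optional[str],
    name: str,
    view_names: Dict[str, bool],   # view name -> is it materialized?
    table_names: Dict[str, bool],  # table name -> is it a streaming table?
) -> str:
    if name in view_names:
        # either a view or a materialized view
        return "materialized_view" if view_names[name] else "view"
    elif database is None or database.lower() == "hive_metastore":
        # Hive metastore relations that are not views are plain tables
        return "table"
    elif name in table_names:
        # either a table or a streaming table
        return "streaming_table" if table_names[name] else "table"
    else:
        raise ValueError(f"Unexpected relation type discovered: {database}.{name}")

print(type_from_names("main", "daily_orders", {"daily_orders": True}, {}))  # materialized_view
print(type_from_names("hive_metastore", "legacy_events", {}, {}))           # table
print(type_from_names("main", "clickstream", {}, {"clickstream": True}))    # streaming_table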
4 changes: 2 additions & 2 deletions dbt/adapters/databricks/relation.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Type
from typing import Any, Dict, Optional, Set, Type
from dbt.contracts.relation import (
ComponentName,
)
@@ -141,5 +141,5 @@ def is_hive_metastore(database: Optional[str]) -> bool:
return database is None or database.lower() == "hive_metastore"


def extract_identifiers(relations: List[BaseRelation]) -> Set[str]:
def extract_identifiers(relations: Set[BaseRelation]) -> Set[str]:
return {r.identifier for r in relations if r.identifier is not None}
@@ -6,6 +6,7 @@

{% set batch_size = get_batch_size() %}
{% set column_override = model['config'].get('column_types', {}) %}
{% set must_cast = model['config'].get("file_format", "delta") == "parquet" %}

{% set statements = [] %}

@@ -20,7 +21,13 @@
insert {% if loop.index0 == 0 -%} overwrite {% else -%} into {% endif -%} {{ this.render() }} values
{% for row in chunk -%}
({%- for col_name in agate_table.column_names -%}
{{ get_binding_char() }}
{%- if must_cast -%}
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
{%- set type = column_override.get(col_name, inferred_type) -%}
cast({{ get_binding_char() }} as {{type}})
{%- else -%}
{{ get_binding_char() }}
{%- endif -%}
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
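To make the new must_cast branch in the seed macro above easier to follow, here is a rough Python rendering of the per-column decision: parquet seeds wrap each bound parameter in an explicit cast (honoring any column_types override), while other file formats bind the raw value. The "%s" binding character, column names, and types are assumptions for illustration; the real logic is the Jinja above.

from typing import Dict, List

# Rough Python rendering of the per-column branch in the seed macro above.
# "%s" stands in for get_binding_char(); names and types are illustrative.
def render_values_row(
    columns: List[str],
    inferred_types: Dict[str, str],
    column_override: Dict[str, str],
    file_format: str = "delta",
) -> str:
    must_cast = file_format == "parquet"
    rendered = []
    for col in columns:
        if must_cast:
            col_type = column_override.get(col, inferred_types[col])
            rendered.append(f"cast(%s as {col_type})")
        else:
            rendered.append("%s")
    return "(" + ", ".join(rendered) + ")"

cols = ["id", "amount"]
types = {"id": "bigint", "amount": "double"}
print(render_values_row(cols, types, {}, file_format="delta"))    # (%s, %s)
print(render_values_row(cols, types, {}, file_format="parquet"))  # (cast(%s as bigint), cast(%s as double))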
4 changes: 2 additions & 2 deletions dev-requirements.txt
@@ -23,8 +23,8 @@ tox>=3.2.0
types-requests
types-mock

dbt-core==1.7.0rc1
dbt-tests-adapter==1.7.0rc1
dbt-core==1.7.1
dbt-tests-adapter==1.7.1
# git+https://github.com/dbt-labs/[email protected]#egg=dbt-spark
# git+https://github.com/dbt-labs/[email protected]#egg=dbt-core&subdirectory=core
# git+https://github.com/dbt-labs/[email protected]#egg=dbt-tests-adapter&subdirectory=tests/adapter
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,4 +1,4 @@
databricks-sql-connector>=2.9.3, <3.0.0
dbt-spark==1.7.0rc1
databricks-sdk==0.9.0
dbt-spark==1.7.1
databricks-sdk>=0.9.0
keyring>=23.13.0
2 changes: 1 addition & 1 deletion setup.py
@@ -54,7 +54,7 @@ def _get_plugin_version():
packages=find_namespace_packages(include=["dbt", "dbt.*"]),
include_package_data=True,
install_requires=[
"dbt-spark==1.7.0rc1",
"dbt-spark==1.7.1",
"databricks-sql-connector>=2.9.3, <3.0.0",
"databricks-sdk>=0.9.0",
"keyring>=23.13.0",
6 changes: 5 additions & 1 deletion tests/functional/adapter/test_seeds.py
@@ -1,4 +1,4 @@
from dbt.tests.adapter.simple_seed.test_seed import SeedUniqueDelimiterTestBase
from dbt.tests.adapter.simple_seed.test_seed import SeedUniqueDelimiterTestBase, BaseTestEmptySeed
from dbt.tests.util import run_dbt
import pytest

@@ -557,3 +557,7 @@ def test_seed_with_empty_delimiter(self, project):
"""Testing failure of running dbt seed with an empty configured delimiter value"""
seed_result = run_dbt(["seed"], expect_pass=False)
assert "compilation error" in seed_result.results[0].message.lower()


class TestDatabricksEmptySeed(BaseTestEmptySeed):
pass
