From 9712ab2427b0937b59c6c68372827b9c94aaca69 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Fri, 13 Oct 2023 16:15:47 -0700 Subject: [PATCH 1/5] first blush at implementing this --- dbt/adapters/databricks/impl.py | 7 ++- dbt/adapters/databricks/relation.py | 22 ++++++- dbt/include/databricks/macros/metadata.sql | 32 ++++++++++ .../adapter/test_source_freshness.py | 59 +++++++++++++++++++ 4 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 dbt/include/databricks/macros/metadata.sql create mode 100644 tests/functional/adapter/test_source_freshness.py diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index e1d6e1973..8e469f6da 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -20,9 +20,10 @@ from agate import Row, Table, Text from dbt.adapters.base import AdapterConfig, PythonJobHelper -from dbt.adapters.base.impl import catch_as_completed +from dbt.adapters.base.impl import catch_as_completed, FreshnessResponse from dbt.adapters.base.meta import available from dbt.adapters.base.relation import BaseRelation, InformationSchema +from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability from dbt.adapters.spark.impl import ( SparkAdapter, GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, @@ -107,6 +108,10 @@ class DatabricksAdapter(SparkAdapter): AdapterSpecificConfigs = DatabricksConfig + _capabilities = CapabilityDict( + {Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full)} + ) + @available.parse(lambda *a, **k: 0) def compare_dbr_version(self, major: int, minor: int) -> int: """ diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index f26f43499..e84fed528 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -3,7 +3,7 @@ from dbt.contracts.relation import ( ComponentName, ) -from dbt.adapters.base.relation import BaseRelation, Policy +from dbt.adapters.base.relation import BaseRelation, Policy, InformationSchema from dbt.adapters.spark.impl import KEY_TABLE_OWNER, KEY_TABLE_STATISTICS from dbt.dataclass_schema import StrEnum @@ -37,6 +37,16 @@ class DatabricksRelationType(StrEnum): StreamingTable = "streamingtable" +@dataclass(frozen=True, eq=False, repr=False) +class DatabricksInformationSchema(InformationSchema): + quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy()) + include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy()) + quote_character: str = "`" + + def is_hive_metastore(self): + return self.database is None or self.database == "hive_metastore" + + @dataclass(frozen=True, eq=False, repr=False) class DatabricksRelation(BaseRelation): type: Optional[DatabricksRelationType] = None # type: ignore @@ -115,3 +125,13 @@ def matches( @classproperty def get_relation_type(cls) -> Type[DatabricksRelationType]: return DatabricksRelationType + + def information_schema(self, view_name=None) -> InformationSchema: + # some of our data comes from jinja, where things can be `Undefined`. + if not isinstance(view_name, str): + view_name = None + + # Kick the user-supplied schema out of the information schema relation + # Instead address this as .information_schema by default + info_schema = DatabricksInformationSchema.from_relation(self, view_name) + return info_schema.incorporate(path={"schema": None}) diff --git a/dbt/include/databricks/macros/metadata.sql b/dbt/include/databricks/macros/metadata.sql new file mode 100644 index 000000000..d30e046a6 --- /dev/null +++ b/dbt/include/databricks/macros/metadata.sql @@ -0,0 +1,32 @@ +{% macro databricks__get_relation_last_modified(information_schema, relations) -%} + + {%- call statement('last_modified', fetch_result=True) -%} + {% if information_schema.is_hive_metastore %} + {%- for relation in relations -%} + select '{{ relation.schema }}' as schema, + '{{ relation.identifier }}' as identifier, + max(timestamp) as last_modified, + {{ current_timestamp() }} as snapshotted_at + from (describe history {{ relation.schema }}.{{ relation.identifier }}) + {% if not loop.last %} + union all + {% endif %} + {%- endfor -%} + {% else %} + select table_schema as schema, + table_name as identifier, + last_altered as last_modified, + {{ current_timestamp() }} as snapshotted_at + from {{ information_schema }}.tables + where ( + {%- for relation in relations -%} + (upper(table_schema) = upper('{{ relation.schema }}') and + upper(table_name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + {% endif %} + {%- endcall -%} + + {{ return(load_result('last_modified')) }} + +{% endmacro %} \ No newline at end of file diff --git a/tests/functional/adapter/test_source_freshness.py b/tests/functional/adapter/test_source_freshness.py new file mode 100644 index 000000000..76051b635 --- /dev/null +++ b/tests/functional/adapter/test_source_freshness.py @@ -0,0 +1,59 @@ +import os +import pytest + +from dbt.cli.main import dbtRunner + +freshness_via_metadata_schema_yml = """version: 2 +sources: + - name: test_source + freshness: + warn_after: {count: 10, period: hour} + error_after: {count: 1, period: day} + schema: "{{ env_var('DBT_GET_RELATION_TEST_SCHEMA') }}" + tables: + - name: test_table +""" + + +class TestGetRelationLastModified: + @pytest.fixture(scope="class", autouse=True) + def set_env_vars(self, project): + os.environ["DBT_GET_RELATION_TEST_SCHEMA"] = project.test_schema + yield + del os.environ["DBT_GET_RELATION_TEST_SCHEMA"] + + @pytest.fixture(scope="class") + def models(self): + return {"schema.yml": freshness_via_metadata_schema_yml} + + @pytest.fixture(scope="class") + def custom_schema(self, project, set_env_vars): + with project.adapter.connection_named("__test"): + relation = project.adapter.Relation.create( + database=project.database, schema=os.environ["DBT_GET_RELATION_TEST_SCHEMA"] + ) + project.adapter.drop_schema(relation) + project.adapter.create_schema(relation) + + yield relation.schema + + with project.adapter.connection_named("__test"): + project.adapter.drop_schema(relation) + + def test_get_relation_last_modified(self, project, set_env_vars, custom_schema): + project.run_sql( + f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);" + ) + + warning_or_error = False + + def probe(e): + nonlocal warning_or_error + if e.info.level in ["warning", "error"]: + warning_or_error = True + + runner = dbtRunner(callbacks=[probe]) + runner.invoke(["source", "freshness"]) + + # The 'source freshness' command should succeed without warnings or errors. + assert not warning_or_error From 5f9d50788b69189d2495bf2ed82e570df94633bb Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Tue, 17 Oct 2023 10:09:54 -0700 Subject: [PATCH 2/5] pass, rather than not error --- tests/functional/adapter/test_source_freshness.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/tests/functional/adapter/test_source_freshness.py b/tests/functional/adapter/test_source_freshness.py index 76051b635..6d8c26636 100644 --- a/tests/functional/adapter/test_source_freshness.py +++ b/tests/functional/adapter/test_source_freshness.py @@ -1,7 +1,7 @@ import os import pytest -from dbt.cli.main import dbtRunner +from dbt.tests.util import get_artifact, run_dbt freshness_via_metadata_schema_yml = """version: 2 sources: @@ -45,15 +45,8 @@ def test_get_relation_last_modified(self, project, set_env_vars, custom_schema): f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);" ) - warning_or_error = False + run_dbt(["source", "freshness"]) - def probe(e): - nonlocal warning_or_error - if e.info.level in ["warning", "error"]: - warning_or_error = True + sources = get_artifact("target/sources.json") - runner = dbtRunner(callbacks=[probe]) - runner.invoke(["source", "freshness"]) - - # The 'source freshness' command should succeed without warnings or errors. - assert not warning_or_error + assert sources["results"][0]["status"] == "pass" From 92373ecf7ab6e4581c22d055a7c6341415292d08 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Tue, 17 Oct 2023 10:24:09 -0700 Subject: [PATCH 3/5] remove unused fixture reference --- tests/functional/adapter/test_source_freshness.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/adapter/test_source_freshness.py b/tests/functional/adapter/test_source_freshness.py index 6d8c26636..fe0b0318a 100644 --- a/tests/functional/adapter/test_source_freshness.py +++ b/tests/functional/adapter/test_source_freshness.py @@ -40,7 +40,7 @@ def custom_schema(self, project, set_env_vars): with project.adapter.connection_named("__test"): project.adapter.drop_schema(relation) - def test_get_relation_last_modified(self, project, set_env_vars, custom_schema): + def test_get_relation_last_modified(self, project, custom_schema): project.run_sql( f"create table {custom_schema}.test_table (id integer, name varchar(100) not null);" ) From 823397fa5addefc3c2224389d916d53b29c97878 Mon Sep 17 00:00:00 2001 From: Ben Cassell <98852248+benc-db@users.noreply.github.com> Date: Mon, 23 Oct 2023 10:11:32 -0700 Subject: [PATCH 4/5] Update tests/functional/adapter/test_source_freshness.py Co-authored-by: Jesse --- tests/functional/adapter/test_source_freshness.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/functional/adapter/test_source_freshness.py b/tests/functional/adapter/test_source_freshness.py index fe0b0318a..ea8091571 100644 --- a/tests/functional/adapter/test_source_freshness.py +++ b/tests/functional/adapter/test_source_freshness.py @@ -3,7 +3,8 @@ from dbt.tests.util import get_artifact, run_dbt -freshness_via_metadata_schema_yml = """version: 2 +freshness_via_metadata_schema_yml = """ +version: 2 sources: - name: test_source freshness: From 8204482be8f8719733c9cf170211d37ccf622524 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Mon, 23 Oct 2023 10:17:52 -0700 Subject: [PATCH 5/5] addressing comment, passing lint --- CHANGELOG.md | 4 ++++ dbt/adapters/databricks/impl.py | 2 +- dbt/adapters/databricks/relation.py | 4 ++-- dbt/include/databricks/macros/metadata.sql | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2ec4f82f..d9dc0441d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## dbt-databricks 1.7.x (TBD) +### Features + +- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481)) + ## dbt-databricks 1.7.0rc1 (October 13, 2023) ### Fixes diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 8e469f6da..88869fa1b 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -20,7 +20,7 @@ from agate import Row, Table, Text from dbt.adapters.base import AdapterConfig, PythonJobHelper -from dbt.adapters.base.impl import catch_as_completed, FreshnessResponse +from dbt.adapters.base.impl import catch_as_completed from dbt.adapters.base.meta import available from dbt.adapters.base.relation import BaseRelation, InformationSchema from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index e84fed528..00d6720a4 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -43,7 +43,7 @@ class DatabricksInformationSchema(InformationSchema): include_policy: Policy = field(default_factory=lambda: DatabricksIncludePolicy()) quote_character: str = "`" - def is_hive_metastore(self): + def is_hive_metastore(self) -> bool: return self.database is None or self.database == "hive_metastore" @@ -126,7 +126,7 @@ def matches( def get_relation_type(cls) -> Type[DatabricksRelationType]: return DatabricksRelationType - def information_schema(self, view_name=None) -> InformationSchema: + def information_schema(self, view_name: Optional[str] = None) -> InformationSchema: # some of our data comes from jinja, where things can be `Undefined`. if not isinstance(view_name, str): view_name = None diff --git a/dbt/include/databricks/macros/metadata.sql b/dbt/include/databricks/macros/metadata.sql index d30e046a6..42ee73033 100644 --- a/dbt/include/databricks/macros/metadata.sql +++ b/dbt/include/databricks/macros/metadata.sql @@ -20,8 +20,8 @@ from {{ information_schema }}.tables where ( {%- for relation in relations -%} - (upper(table_schema) = upper('{{ relation.schema }}') and - upper(table_name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%} + (table_schema = '{{ relation.schema }}' and + table_name = '{{ relation.identifier }}'){%- if not loop.last %} or {% endif -%} {%- endfor -%} ) {% endif %}