1.7.0 Catalog fetch improvement #486

Merged · 6 commits · Oct 26, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features

- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486))
- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))

## dbt-databricks 1.7.0rc1 (October 13, 2023)
2 changes: 2 additions & 0 deletions dbt/adapters/databricks/connections.py
@@ -156,6 +156,8 @@ def __post_init__(self) -> None:
f"Invalid catalog name : `{self.database}`."
)
self.database = database
else:
self.database = "hive_metastore"

connection_parameters = self.connection_parameters or {}
for key in (
98 changes: 49 additions & 49 deletions dbt/adapters/databricks/impl.py
@@ -1,6 +1,6 @@
from collections import defaultdict
from concurrent.futures import Future
from contextlib import contextmanager
from itertools import chain
from dataclasses import dataclass
import os
import re
@@ -22,7 +22,7 @@
from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.spark.impl import (
SparkAdapter,
@@ -36,7 +36,6 @@
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.relation import RelationType
import dbt.exceptions
from dbt.events import AdapterLogger
@@ -48,7 +47,11 @@
DbtDatabricksAllPurposeClusterPythonJobHelper,
DbtDatabricksJobClusterPythonJobHelper,
)
from dbt.adapters.databricks.relation import DatabricksRelation, DatabricksRelationType
from dbt.adapters.databricks.relation import is_hive_metastore, extract_identifiers
from dbt.adapters.databricks.relation import (
DatabricksRelation,
DatabricksRelationType,
)
from dbt.adapters.databricks.utils import redact_credentials, undefined_proof


@@ -109,7 +112,10 @@ class DatabricksAdapter(SparkAdapter):
AdapterSpecificConfigs = DatabricksConfig

_capabilities = CapabilityDict(
{Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full)}
{
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
}
)

@available.parse(lambda *a, **k: 0)
@@ -269,7 +275,7 @@ def typeFromNames(
if view_names[name]
else DatabricksRelationType.View
)
elif database is None or database == "hive_metastore":
elif is_hive_metastore(database):
return DatabricksRelationType.Table
else:
# not a view so it might be a streaming table
@@ -342,7 +348,7 @@ def parse_describe_extended( # type: ignore[override]
table_owner=str(metadata.get(KEY_TABLE_OWNER)),
table_stats=table_stats,
column=column["col_name"],
column_index=(idx + 1),
column_index=idx,
dtype=column["data_type"],
)
for idx, column in enumerate(rows)
@@ -413,7 +419,7 @@ def parse_columns_from_information( # type: ignore[override]
table_schema=relation.schema,
table_name=relation.table,
table_type=relation.type,
column_index=(match_num + 1),
column_index=match_num,
table_owner=owner,
column=column_name,
dtype=DatabricksColumn.translate_type(column_type),
@@ -425,60 +431,54 @@
def get_catalog(
self, manifest: Manifest, selected_nodes: Optional[Set] = None

Noting for my own sake that in the previous implementation of this method, selected_nodes was optional but was never enforced.

I believe selected_nodes was added recently for the get_catalog performance improvements. We had a miss where we called it with the additional argument before checking whether we could, and @benc-db had to update this signature as a result. So the kwarg did nothing besides allowing get_catalog to be called with an additional argument.
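
For context, a minimal sketch of how a caller might use the new capability-driven path; `fetch_catalog`, the `selected_nodes` argument, and the `supports()` check are illustrative assumptions rather than code from this PR or from dbt-core:

```python
from typing import Optional, Set

from dbt.adapters.capability import Capability


def fetch_catalog(adapter, manifest, selected_nodes: Optional[Set[str]] = None):
    # Illustrative only: if the adapter declares SchemaMetadataByRelations
    # (as dbt-databricks now does above), the caller can hand over just the
    # selected nodes and let the adapter fetch metadata for those relations.
    if selected_nodes and adapter.supports(Capability.SchemaMetadataByRelations):
        return adapter.get_catalog(manifest, selected_nodes)
    # Otherwise fall back to gathering metadata for every schema in the manifest.
    return adapter.get_catalog(manifest)
```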

) -> Tuple[Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
relations_by_catalog = self._get_catalog_relations_by_info_schema(catalog_relations)

futures: List[Future[Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(
tpe.submit_connected(
self,
schema,
self._get_one_catalog,
info,
[schema],
manifest,

for info_schema, relations in relations_by_catalog.items():
if is_hive_metastore(info_schema.database):
schema_map = defaultdict(list)
for relation in relations:
schema_map[relation.schema].append(relation)

for schema, schema_relations in schema_map.items():
futures.append(
tpe.submit_connected(
self,
"hive_metastore",
self._get_hive_catalog,
schema,
schema_relations,
)
)
else:
name = ".".join([str(info_schema.database), "information_schema"])
fut = tpe.submit_connected(
self,
name,
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
)
futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
def _get_hive_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
schema: str,
relations: List[BaseRelation],
) -> Table:
if len(schemas) != 1:
raise dbt.exceptions.CompilationError(
f"Expected only one schema in spark _get_one_catalog, found " f"{schemas}"
)

database = information_schema.database
schema = list(schemas)[0]

nodes: Iterator[ResultNode] = chain(
(
node
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model)
),
manifest.sources.values(),
)

table_names: Set[str] = set()
for node in nodes:
if node.database == database and node.schema == schema:
relation = self.Relation.create_from(self.config, node)
if relation.identifier:
table_names.add(relation.identifier)

table_names = extract_identifiers(relations)
columns: List[Dict[str, Any]] = []

if len(table_names) > 0:
schema_relation = self.Relation.create(
database=database,
database="hive_metastore",
schema=schema,
identifier=get_identifier_list_string(table_names),
quote_policy=self.config.quoting,
12 changes: 10 additions & 2 deletions dbt/adapters/databricks/relation.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Type
from typing import Any, Dict, List, Optional, Set, Type
from dbt.contracts.relation import (
ComponentName,
)
@@ -44,7 +44,7 @@ class DatabricksInformationSchema(InformationSchema):
quote_character: str = "`"

def is_hive_metastore(self) -> bool:
return self.database is None or self.database == "hive_metastore"
return is_hive_metastore(self.database)


@dataclass(frozen=True, eq=False, repr=False)
@@ -135,3 +135,11 @@ def information_schema(self, view_name: Optional[str] = None) -> InformationSche
# Instead address this as <database>.information_schema by default
info_schema = DatabricksInformationSchema.from_relation(self, view_name)
return info_schema.incorporate(path={"schema": None})


def is_hive_metastore(database: Optional[str]) -> bool:
return database is None or database.lower() == "hive_metastore"


def extract_identifiers(relations: List[BaseRelation]) -> Set[str]:
return {r.identifier for r in relations if r.identifier is not None}
81 changes: 81 additions & 0 deletions dbt/include/databricks/macros/catalog.sql
@@ -18,3 +18,84 @@
use catalog {{ adapter.quote(catalog) }}
{% endcall %}
{% endmacro %}

{% macro databricks__get_catalog_relations(information_schema, relations) -%}

{% set query %}
with tables as (
{{ databricks__get_catalog_tables_sql(information_schema) }}
{{ databricks__get_catalog_relations_where_clause_sql(relations) }}
),
columns as (
{{ databricks__get_catalog_columns_sql(information_schema) }}
{{ databricks__get_catalog_relations_where_clause_sql(relations) }}
)
{{ databricks__get_catalog_results_sql() }}
{%- endset -%}

{{ return(run_query(query)) }}
{%- endmacro %}

{% macro databricks__get_catalog_tables_sql(information_schema) -%}
select
table_catalog as table_database,
table_schema,
table_name,
lower(if(table_type in ('MANAGED', 'EXTERNAL'), 'table', table_type)) as table_type,
comment as table_comment,
table_owner,
'Last Modified' as `stats:last_modified:label`,
last_altered as `stats:last_modified:value`,
'The timestamp for last update/change' as `stats:last_modified:description`,
(last_altered is not null and table_type not ilike '%VIEW%') as `stats:last_modified:include`
from {{ information_schema }}.tables
{%- endmacro %}

{% macro databricks__get_catalog_columns_sql(information_schema) -%}
select
table_catalog as table_database,
table_schema,
table_name,
column_name,
ordinal_position as column_index,
lower(data_type) as column_type,
comment as column_comment
from {{ information_schema }}.columns
{%- endmacro %}

{% macro databricks__get_catalog_results_sql() -%}
select *
from tables
join columns using (table_database, table_schema, table_name)
order by column_index
{%- endmacro %}

{% macro databricks__get_catalog_schemas_where_clause_sql(schemas) -%}
where ({%- for schema in schemas -%}
upper(table_schema) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%})
{%- endmacro %}


{% macro databricks__get_catalog_relations_where_clause_sql(relations) -%}
where (
{%- for relation in relations -%}
{% if relation.schema and relation.identifier %}
(
upper(table_schema) = upper('{{ relation.schema }}')
and upper(table_name) = upper('{{ relation.identifier }}')
)
{% elif relation.schema %}
(
upper(table_schema) = upper('{{ relation.schema }}')
)
{% else %}
{% do exceptions.raise_compiler_error(
'`get_catalog_relations` requires a list of relations, each with a schema'
) %}
{% endif %}

{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endmacro %}