1.7.0 Catalog fetch improvement (#486)
benc-db authored Oct 26, 2023
2 parents 9127720 + 04b17c2 commit 5d53855
Showing 9 changed files with 330 additions and 75 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features

- Added support for getting info only on specified relations to improve performance of gathering metadata ([486](https://github.com/databricks/dbt-databricks/pull/486))
- Added support for getting freshness from metadata ([481](https://github.com/databricks/dbt-databricks/pull/481))

## dbt-databricks 1.7.0rc1 (October 13, 2023)
2 changes: 2 additions & 0 deletions dbt/adapters/databricks/connections.py
@@ -156,6 +156,8 @@ def __post_init__(self) -> None:
f"Invalid catalog name : `{self.database}`."
)
self.database = database
else:
self.database = "hive_metastore"

connection_parameters = self.connection_parameters or {}
for key in (
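The practical effect of the connections.py change: credentials with no `catalog`/`database` entry now resolve to the legacy metastore instead of `None`. A minimal sketch of the fallback logic, mirroring the `__post_init__` branch above (a standalone function for illustration, not the adapter's actual API):

    from typing import Optional

    def resolve_database(database: Optional[str]) -> str:
        # Fall back to the legacy Hive metastore when no catalog is given,
        # so downstream code can always rely on a concrete value.
        return database if database is not None else "hive_metastore"

    assert resolve_database(None) == "hive_metastore"
    assert resolve_database("main") == "main"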
98 changes: 49 additions & 49 deletions dbt/adapters/databricks/impl.py
@@ -1,6 +1,6 @@
from collections import defaultdict
from concurrent.futures import Future
from contextlib import contextmanager
from itertools import chain
from dataclasses import dataclass
import os
import re
@@ -22,7 +22,7 @@
from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation, InformationSchema
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability
from dbt.adapters.spark.impl import (
SparkAdapter,
@@ -36,7 +36,6 @@
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.relation import RelationType
import dbt.exceptions
from dbt.events import AdapterLogger
@@ -48,7 +47,11 @@
DbtDatabricksAllPurposeClusterPythonJobHelper,
DbtDatabricksJobClusterPythonJobHelper,
)
from dbt.adapters.databricks.relation import DatabricksRelation, DatabricksRelationType
from dbt.adapters.databricks.relation import is_hive_metastore, extract_identifiers
from dbt.adapters.databricks.relation import (
DatabricksRelation,
DatabricksRelationType,
)
from dbt.adapters.databricks.utils import redact_credentials, undefined_proof


@@ -109,7 +112,10 @@ class DatabricksAdapter(SparkAdapter):
AdapterSpecificConfigs = DatabricksConfig

_capabilities = CapabilityDict(
{Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full)}
{
Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full),
Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full),
}
)

@available.parse(lambda *a, **k: 0)
@@ -269,7 +275,7 @@ def typeFromNames(
if view_names[name]
else DatabricksRelationType.View
)
elif database is None or database == "hive_metastore":
elif is_hive_metastore(database):
return DatabricksRelationType.Table
else:
# not a view so it might be a streaming table
@@ -342,7 +348,7 @@ def parse_describe_extended(  # type: ignore[override]
table_owner=str(metadata.get(KEY_TABLE_OWNER)),
table_stats=table_stats,
column=column["col_name"],
column_index=(idx + 1),
column_index=idx,
dtype=column["data_type"],
)
for idx, column in enumerate(rows)
@@ -413,7 +419,7 @@ def parse_columns_from_information(  # type: ignore[override]
table_schema=relation.schema,
table_name=relation.table,
table_type=relation.type,
column_index=(match_num + 1),
column_index=match_num,
table_owner=owner,
column=column_name,
dtype=DatabricksColumn.translate_type(column_type),
@@ -425,60 +431,54 @@
def get_catalog(
self, manifest: Manifest, selected_nodes: Optional[Set] = None
) -> Tuple[Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
catalog_relations = self._get_catalog_relations(manifest, selected_nodes)
relations_by_catalog = self._get_catalog_relations_by_info_schema(catalog_relations)

futures: List[Future[Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(
tpe.submit_connected(
self,
schema,
self._get_one_catalog,
info,
[schema],
manifest,

for info_schema, relations in relations_by_catalog.items():
if is_hive_metastore(info_schema.database):
schema_map = defaultdict(list)
for relation in relations:
schema_map[relation.schema].append(relation)

for schema, schema_relations in schema_map.items():
futures.append(
tpe.submit_connected(
self,
"hive_metastore",
self._get_hive_catalog,
schema,
schema_relations,
)
)
else:
name = ".".join([str(info_schema.database), "information_schema"])
fut = tpe.submit_connected(
self,
name,
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
)
futures.append(fut)

catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions

def _get_one_catalog(
def _get_hive_catalog(
self,
information_schema: InformationSchema,
schemas: Set[str],
manifest: Manifest,
schema: str,
relations: List[BaseRelation],
) -> Table:
if len(schemas) != 1:
raise dbt.exceptions.CompilationError(
f"Expected only one schema in spark _get_one_catalog, found " f"{schemas}"
)

database = information_schema.database
schema = list(schemas)[0]

nodes: Iterator[ResultNode] = chain(
(
node
for node in manifest.nodes.values()
if (node.is_relational and not node.is_ephemeral_model)
),
manifest.sources.values(),
)

table_names: Set[str] = set()
for node in nodes:
if node.database == database and node.schema == schema:
relation = self.Relation.create_from(self.config, node)
if relation.identifier:
table_names.add(relation.identifier)

table_names = extract_identifiers(relations)
columns: List[Dict[str, Any]] = []

if len(table_names) > 0:
schema_relation = self.Relation.create(
database=database,
database="hive_metastore",
schema=schema,
identifier=get_identifier_list_string(table_names),
quote_policy=self.config.quoting,
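The restructured `get_catalog` fans work out per catalog rather than per schema: Unity Catalog relations are batched into a single `information_schema` query per catalog, while `hive_metastore` relations (which have no `information_schema`) are still grouped and fetched schema by schema. A simplified sketch of that dispatch, with relations reduced to plain dicts (the adapter's real code uses relation objects and thread-pool futures):

    from collections import defaultdict
    from typing import Dict, List, Optional

    def is_hive_metastore(database: Optional[str]) -> bool:
        return database is None or database.lower() == "hive_metastore"

    def plan_catalog_queries(relations: List[dict]) -> Dict[str, List[dict]]:
        # One work item per <catalog>.information_schema, or one per
        # hive_metastore schema, matching the branching in get_catalog above.
        work: Dict[str, List[dict]] = defaultdict(list)
        for rel in relations:
            if is_hive_metastore(rel["database"]):
                work["hive_metastore." + rel["schema"]].append(rel)
            else:
                work[rel["database"] + ".information_schema"].append(rel)
        return work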
12 changes: 10 additions & 2 deletions dbt/adapters/databricks/relation.py
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Type
from typing import Any, Dict, List, Optional, Set, Type
from dbt.contracts.relation import (
ComponentName,
)
@@ -44,7 +44,7 @@ class DatabricksInformationSchema(InformationSchema):
quote_character: str = "`"

def is_hive_metastore(self) -> bool:
return self.database is None or self.database == "hive_metastore"
return is_hive_metastore(self.database)


@dataclass(frozen=True, eq=False, repr=False)
@@ -135,3 +135,11 @@ def information_schema(self, view_name: Optional[str] = None) -> InformationSchema:
# Instead address this as <database>.information_schema by default
info_schema = DatabricksInformationSchema.from_relation(self, view_name)
return info_schema.incorporate(path={"schema": None})


def is_hive_metastore(database: Optional[str]) -> bool:
return database is None or database.lower() == "hive_metastore"


def extract_identifiers(relations: List[BaseRelation]) -> Set[str]:
return {r.identifier for r in relations if r.identifier is not None}
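Both new module-level helpers are pure functions, so their behavior is easy to pin down. An illustrative usage sketch, assuming relations are built with the standard `create` factory (the keyword arguments shown are the usual dbt relation fields):

    from dbt.adapters.databricks.relation import (
        DatabricksRelation,
        extract_identifiers,
        is_hive_metastore,
    )

    assert is_hive_metastore(None)
    assert is_hive_metastore("HIVE_METASTORE")  # comparison is case-insensitive
    assert not is_hive_metastore("main")

    rels = [
        DatabricksRelation.create(database="main", schema="silver", identifier="orders"),
        DatabricksRelation.create(database="main", schema="silver", identifier="orders"),
        DatabricksRelation.create(database="main", schema="silver"),  # no identifier
    ]
    assert extract_identifiers(rels) == {"orders"}  # de-duplicated, Nones dropped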
81 changes: 81 additions & 0 deletions dbt/include/databricks/macros/catalog.sql
@@ -18,3 +18,84 @@
use catalog {{ adapter.quote(catalog) }}
{% endcall %}
{% endmacro %}

{% macro databricks__get_catalog_relations(information_schema, relations) -%}

{% set query %}
with tables as (
{{ databricks__get_catalog_tables_sql(information_schema) }}
{{ databricks__get_catalog_relations_where_clause_sql(relations) }}
),
columns as (
{{ databricks__get_catalog_columns_sql(information_schema) }}
{{ databricks__get_catalog_relations_where_clause_sql(relations) }}
)
{{ databricks__get_catalog_results_sql() }}
{%- endset -%}

{{ return(run_query(query)) }}
{%- endmacro %}

{% macro databricks__get_catalog_tables_sql(information_schema) -%}
select
table_catalog as table_database,
table_schema,
table_name,
lower(if(table_type in ('MANAGED', 'EXTERNAL'), 'table', table_type)) as table_type,
comment as table_comment,
table_owner,
'Last Modified' as `stats:last_modified:label`,
last_altered as `stats:last_modified:value`,
'The timestamp for last update/change' as `stats:last_modified:description`,
(last_altered is not null and table_type not ilike '%VIEW%') as `stats:last_modified:include`
from {{ information_schema }}.tables
{%- endmacro %}

{% macro databricks__get_catalog_columns_sql(information_schema) -%}
select
table_catalog as table_database,
table_schema,
table_name,
column_name,
ordinal_position as column_index,
lower(data_type) as column_type,
comment as column_comment
from {{ information_schema }}.columns
{%- endmacro %}

{% macro databricks__get_catalog_results_sql() -%}
select *
from tables
join columns using (table_database, table_schema, table_name)
order by column_index
{%- endmacro %}

{% macro databricks__get_catalog_schemas_where_clause_sql(schemas) -%}
where ({%- for schema in schemas -%}
upper(table_schema) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%})
{%- endmacro %}


{% macro databricks__get_catalog_relations_where_clause_sql(relations) -%}
where (
{%- for relation in relations -%}
{% if relation.schema and relation.identifier %}
(
upper(table_schema) = upper('{{ relation.schema }}')
and upper(table_name) = upper('{{ relation.identifier }}')
)
{% elif relation.schema %}
(
upper(table_schema) = upper('{{ relation.schema }}')
)
{% else %}
{% do exceptions.raise_compiler_error(
'`get_catalog_relations` requires a list of relations, each with a schema'
) %}
{% endif %}

{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endmacro %}
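For illustration, given two relations main.silver.orders and main.silver.customers, the relations where-clause macro above would render along these lines (whitespace condensed):

    where (
      (upper(table_schema) = upper('silver') and upper(table_name) = upper('orders'))
      or
      (upper(table_schema) = upper('silver') and upper(table_name) = upper('customers'))
    )

Because matching is wrapped in upper() on both sides, the filter is case-insensitive, consistent with how the schemas where-clause macro compares schema names.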