diff --git a/.changes/unreleased/Features-20241031-201031.yaml b/.changes/unreleased/Features-20241031-201031.yaml new file mode 100644 index 00000000000..7d5f5500f0f --- /dev/null +++ b/.changes/unreleased/Features-20241031-201031.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Return agate_table in dbt run-operation result +time: 2024-10-31T20:10:31.10956+08:00 +custom: + Author: acjh + Issue: "10956" diff --git a/core/dbt/artifacts/schemas/run/v5/run.py b/core/dbt/artifacts/schemas/run/v5/run.py index f354443d9e4..3dd2ec95944 100644 --- a/core/dbt/artifacts/schemas/run/v5/run.py +++ b/core/dbt/artifacts/schemas/run/v5/run.py @@ -72,6 +72,9 @@ class RunResultOutput(BaseResult): compiled: Optional[bool] compiled_code: Optional[str] relation_name: Optional[str] + agate_table: Optional["agate.Table"] = field( + default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} + ) batch_results: Optional[BatchResults] = None @@ -88,6 +91,7 @@ def process_run_result(result: RunResult) -> RunResultOutput: message=result.message, adapter_response=result.adapter_response, failures=result.failures, + agate_table=result.agate_table, batch_results=result.batch_results, compiled=result.node.compiled if compiled else None, # type:ignore compiled_code=result.node.compiled_code if compiled else None, # type:ignore @@ -128,7 +132,7 @@ def from_execution_results( args: Dict, ): processed_results = [ - process_run_result(result) for result in results if isinstance(result, RunResult) + cls._process_run_result(result) for result in results if isinstance(result, RunResult) ] meta = RunResultsMetadata( dbt_schema_version=str(cls.dbt_schema_version), @@ -178,5 +182,9 @@ def upgrade_schema_version(cls, data): result["relation_name"] = "" return cls.from_dict(data) + @classmethod + def _process_run_result(cls, result: RunResult) -> RunResultOutput: + return process_run_result(result) + def write(self, path: str): write_json(path, self.to_dict(omit_none=False)) diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index f87ac63e04e..5fd139a53c0 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -49,6 +49,9 @@ def _run_unsafe(self, package_name, macro_name) -> "agate.Table": macro_name, project=package_name, kwargs=macro_kwargs, macro_resolver=self.manifest ) + if isinstance(res, str): + return None + return res def run(self) -> RunResultsArtifact: @@ -61,10 +64,11 @@ def run(self) -> RunResultsArtifact: success = True package_name, macro_name = self._get_macro_parts() + execute_macro_result = None with collect_timing_info("execute", timing.append): try: - self._run_unsafe(package_name, macro_name) + execute_macro_result = self._run_unsafe(package_name, macro_name) except dbt_common.exceptions.DbtBaseException as exc: fire_event(RunningOperationCaughtError(exc=str(exc))) fire_event(LogDebugStackTrace(exc_info=traceback.format_exc())) @@ -113,6 +117,7 @@ def run(self) -> RunResultsArtifact: ), thread_id=threading.current_thread().name, timing=timing, + agate_table=execute_macro_result, batch_results=None, ) diff --git a/tests/functional/adapter/dbt_run_operations/__init__.py b/tests/functional/adapter/dbt_run_operations/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/functional/adapter/dbt_run_operations/fixtures.py b/tests/functional/adapter/dbt_run_operations/fixtures.py new file mode 100644 index 00000000000..672a9e20a7d --- /dev/null +++ b/tests/functional/adapter/dbt_run_operations/fixtures.py @@ -0,0 +1,16 @@ +happy_macros_sql = """ +{% macro select_something(name) %} + {% set query %} + select 'hello, {{ name }}' as name + {% endset %} + {% set table = run_query(query) %} +{% endmacro %} + +{% macro select_something_with_return(name) %} + {% set query %} + select 'hello, {{ name }}' as name + {% endset %} + {% set table = run_query(query) %} + {% do return(table) %} +{% endmacro %} +""" diff --git a/tests/functional/adapter/dbt_run_operations/test_dbt_run_operations.py b/tests/functional/adapter/dbt_run_operations/test_dbt_run_operations.py new file mode 100644 index 00000000000..7b8a1e4960e --- /dev/null +++ b/tests/functional/adapter/dbt_run_operations/test_dbt_run_operations.py @@ -0,0 +1,33 @@ +import pytest +import yaml + +from dbt.tests.util import run_dbt +from tests.functional.adapter.dbt_run_operations.fixtures import happy_macros_sql + + +# -- Below we define base classes for tests you import based on if your adapter supports dbt run-operation or not -- +class BaseRunOperationResult: + @pytest.fixture(scope="class") + def macros(self): + return {"happy_macros.sql": happy_macros_sql} + + def run_operation(self, macro, expect_pass=True, extra_args=None, **kwargs): + args = ["run-operation", macro] + if kwargs: + args.extend(("--args", yaml.safe_dump(kwargs))) + if extra_args: + args.extend(extra_args) + return run_dbt(args, expect_pass=expect_pass) + + def test_result_without_return(self, project): + results = self.run_operation("select_something", name="world") + assert results.results[0].agate_table is None + + def test_result_with_return(self, project): + results = self.run_operation("select_something_with_return", name="world") + assert len(results.results[0].agate_table) == 1 + assert results.results[0].agate_table.rows[0]["name"] == "hello, world" + + +class TestPostgresRunOperationResult(BaseRunOperationResult): + pass