From 289dc17a14a67f6f9a40f51fa5fd2a061f4fc92a Mon Sep 17 00:00:00 2001 From: sydneynotthecity Date: Mon, 11 Nov 2024 12:47:02 -0600 Subject: [PATCH] Revert operator default and pass on caller --- dags/dbt_enriched_base_tables_dag.py | 4 ++-- dags/dbt_stellar_marts_dag.py | 24 +++++++++++----------- dags/dbt_stellar_marts_mgi_dag.py | 2 +- dags/stellar_etl_airflow/build_dbt_task.py | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/dags/dbt_enriched_base_tables_dag.py b/dags/dbt_enriched_base_tables_dag.py index 1b4a07c6..c44caaf5 100644 --- a/dags/dbt_enriched_base_tables_dag.py +++ b/dags/dbt_enriched_base_tables_dag.py @@ -35,9 +35,9 @@ # DBT models to run enriched_history_operations_task = dbt_task( - dag, tag="enriched_history_operations", excluded="singular_test" + dag, tag="enriched_history_operations", operator="+", excluded="singular_test" ) -current_state_task = dbt_task(dag, tag="current_state") +current_state_task = dbt_task(dag, tag="current_state", operator="+") # DAG task graph wait_on_history_table >> enriched_history_operations_task diff --git a/dags/dbt_stellar_marts_dag.py b/dags/dbt_stellar_marts_dag.py index 9ea08bf4..489c8195 100644 --- a/dags/dbt_stellar_marts_dag.py +++ b/dags/dbt_stellar_marts_dag.py @@ -33,19 +33,19 @@ ) # DBT models to run -ohlc_task = dbt_task(dag, tag="ohlc") -liquidity_pool_trade_volume_task = dbt_task(dag, tag="liquidity_pool_trade_volume") +ohlc_task = dbt_task(dag, tag="ohlc", operator="+") +liquidity_pool_trade_volume_task = dbt_task(dag, tag="liquidity_pool_trade_volume", operator="+") -liquidity_providers_task = dbt_task(dag, tag="liquidity_providers") -liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value") -liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history") -trade_agg_task = dbt_task(dag, tag="trade_agg") -fee_stats_agg_task = dbt_task(dag, tag="fee_stats") -asset_stats_agg_task = dbt_task(dag, tag="asset_stats") -network_stats_agg_task = dbt_task(dag, tag="network_stats") -partnership_assets_task = dbt_task(dag, tag="partnership_assets") -history_assets = dbt_task(dag, tag="history_assets") -soroban = dbt_task(dag, tag="soroban") +liquidity_providers_task = dbt_task(dag, tag="liquidity_providers", operator="+") +liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value", operator="+") +liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history", operator="+") +trade_agg_task = dbt_task(dag, tag="trade_agg", operator="+") +fee_stats_agg_task = dbt_task(dag, tag="fee_stats", operator="+") +asset_stats_agg_task = dbt_task(dag, tag="asset_stats", operator="+") +network_stats_agg_task = dbt_task(dag, tag="network_stats", operator="+") +partnership_assets_task = dbt_task(dag, tag="partnership_assets", operator="+") +history_assets = dbt_task(dag, tag="history_assets", operator="+") +soroban = dbt_task(dag, tag="soroban", operator="+") # Disable snapshot state tables because they're broken # snapshot_state = dbt_task(dag, tag="snapshot_state") # Disable releveant_asset_trades due to bugs in SCD tables diff --git a/dags/dbt_stellar_marts_mgi_dag.py b/dags/dbt_stellar_marts_mgi_dag.py index d2fbb994..c5b83181 100644 --- a/dags/dbt_stellar_marts_mgi_dag.py +++ b/dags/dbt_stellar_marts_mgi_dag.py @@ -34,7 +34,7 @@ # DBT models to run -mgi_task = dbt_task(dag, tag="mgi") +mgi_task = dbt_task(dag, tag="mgi", operator="+") wait_on_dbt_enriched_base_tables >> mgi_task wait_on_partner_pipeline_dag >> mgi_task diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 7fc4bc8d..53c7f085 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -63,7 +63,7 @@ def dbt_task( model_name=None, tag=None, flag="select", - operator="+", + operator="", command_type="build", excluded=None, resource_cfg="default",