From 6f737a089fc847ca88702ed6c039351384bfd99a Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Fri, 22 Mar 2024 19:04:05 -0300 Subject: [PATCH 1/9] changing alagamento query and data_final --- pipelines/rj_cor/comando/eventos/constants.py | 8 +++++--- pipelines/rj_cor/comando/eventos/tasks.py | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index 152d698cf..f5410e1d4 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -36,9 +36,11 @@ class constants(Enum): # pylint: disable=c0103 -- FROM `rj-cor.adm_cor_comando_staging.ocorrencias` FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api` WHERE id_pop IN ("5", "6", "31", "32", "33") - AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS STRING) - AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) - -- AND data_fim IS NULL + -- AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS date) + AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour), day) AS date) + AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 HOUR) + -- AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) + AND data_fim IS NULL AND status = "Aberto" ), diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index fb1c6e5c1..dde0df896 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -125,6 +125,7 @@ def treat_data_ocorrencias( "id": "id_evento", "pop_id": "id_pop", "inicio": "data_inicio", + "fim": "data_fim", "pop": "pop_titulo", "titulo": "pop_especificacao", } From 8c856791e5f2946e401087269623ed0b91568b86 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Mon, 25 Mar 2024 10:06:10 -0300 Subject: [PATCH 2/9] changing 120 query --- pipelines/rj_cor/comando/eventos/constants.py | 8 +++++--- pipelines/rj_escritorio/rain_dashboard/tasks.py | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index f5410e1d4..c5cbb9ea3 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -115,10 +115,12 @@ class constants(Enum): # pylint: disable=c0103 -- FROM `rj-cor.adm_cor_comando_staging.ocorrencias` FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api` WHERE id_pop IN ("5", "6", "31", "32", "33") - AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE), day) AS STRING) - AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE) + AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour), day) AS date) + AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 HOUR) + -- AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour), day) AS STRING) + -- AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour) -- OR data_fim IS NULL - ) + -- ) AND status = "Aberto" ), diff --git a/pipelines/rj_escritorio/rain_dashboard/tasks.py b/pipelines/rj_escritorio/rain_dashboard/tasks.py index cd02a73f1..c78e039dd 100644 --- a/pipelines/rj_escritorio/rain_dashboard/tasks.py +++ b/pipelines/rj_escritorio/rain_dashboard/tasks.py @@ -76,3 +76,4 @@ def set_redis_key( redis_client = get_redis_client(host=host, port=port, db=db) redis_client.set(key, value) log("Redis key set successfully.") + log(f"key: {key} and value: {value}") From dea2081627dde9ca6665385433d79cbec588df4e Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Mon, 25 Mar 2024 16:27:19 -0300 Subject: [PATCH 3/9] query dashboard --- pipelines/rj_cor/comando/eventos/constants.py | 49 +++++++++++++------ 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index c5cbb9ea3..7ecd605e0 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -32,16 +32,17 @@ class constants(Enum): # pylint: disable=c0103 WHEN id_pop = "33" THEN 1 -- "Lâmina d'água" END AS tipo, ST_GEOGPOINT(CAST(longitude AS FLOAT64), - CAST(latitude AS FLOAT64)) AS geometry + CAST(latitude AS FLOAT64)) AS geometry, + status AS status_ocorrencia, + data_inicio, + data_fim, + row_number() OVER (PARTITION BY id_evento ORDER BY created_at DESC) AS last_update -- FROM `rj-cor.adm_cor_comando_staging.ocorrencias` FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api` WHERE id_pop IN ("5", "6", "31", "32", "33") - -- AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS date) - AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour), day) AS date) - AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 HOUR) - -- AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) - AND data_fim IS NULL - AND status = "Aberto" + AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS date) + AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) + -- AND data_fim IS NULL # data_fim não está confiável, temos status fechados sem esse campo ), intersected_areas AS ( @@ -72,6 +73,10 @@ class constants(Enum): # pylint: disable=c0103 FROM intersected_areas LEFT JOIN alagamentos ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry) + AND alagamentos.last_update = 1 + AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) -- seleciona ocorrencias que iniciaram nos últimos 15min + -- AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) + -- OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram nos últimos 15min ou ainda não finalizaram WHERE intersected_areas.row_num = 1 GROUP BY id_h3, bairro ) @@ -79,7 +84,13 @@ class constants(Enum): # pylint: disable=c0103 SELECT id_h3, bairro, - tipo AS qnt_alagamentos, + -- tipo AS qnt_alagamentos, + CASE + WHEN tipo = 3 THEN "Alagamento" + WHEN tipo = 2 THEN "Bolsão d'água" + WHEN tipo = 1 THEN "Lâmina d'água" + ELSE "sem alagamento" + END AS qnt_alagamentos, CASE WHEN tipo = 3 THEN "extremamente crítico" --"Alagamento" WHEN tipo = 2 THEN "crítico" -- "Bolsão d'água" @@ -93,6 +104,7 @@ class constants(Enum): # pylint: disable=c0103 ELSE '#ffffff' END AS color FROM final_table + -- order by qnt_alagamentos """, "query_update": """ SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update @@ -111,17 +123,15 @@ class constants(Enum): # pylint: disable=c0103 WHEN id_pop = "33" THEN 1 -- "Lâmina d'água" END AS tipo, ST_GEOGPOINT(CAST(longitude AS FLOAT64), - CAST(latitude AS FLOAT64)) AS geometry + CAST(latitude AS FLOAT64)) AS geometry, + status AS status_ocorrencia, + data_inicio, + data_fim, + row_number() OVER (PARTITION BY id_evento ORDER BY created_at DESC) AS last_update -- FROM `rj-cor.adm_cor_comando_staging.ocorrencias` FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api` WHERE id_pop IN ("5", "6", "31", "32", "33") - AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour), day) AS date) - AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 HOUR) - -- AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour), day) AS STRING) - -- AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour) - -- OR data_fim IS NULL - -- ) - AND status = "Aberto" + AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR), day) AS date) ), intersected_areas AS ( @@ -152,6 +162,12 @@ class constants(Enum): # pylint: disable=c0103 FROM intersected_areas LEFT JOIN alagamentos ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry) + AND alagamentos.last_update = 1 + AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR) + OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram nas últimas 2h ou ainda não finalizaram + -- AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour) + -- OR data_fim IS NULL # data_fim não está confiável, temos status fechados sem esse campo + -- ) WHERE intersected_areas.row_num = 1 GROUP BY id_h3, bairro ) @@ -178,6 +194,7 @@ class constants(Enum): # pylint: disable=c0103 ELSE '#ffffff' END AS color FROM final_table + # order by qnt_alagamentos """, "query_update": """ SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update From 820953f321e25ee625abba6360ab05d0ebd639b9 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Mon, 25 Mar 2024 16:35:17 -0300 Subject: [PATCH 4/9] query dashboard --- pipelines/rj_cor/comando/eventos/constants.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index 7ecd605e0..c729ae0eb 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -41,7 +41,6 @@ class constants(Enum): # pylint: disable=c0103 FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api` WHERE id_pop IN ("5", "6", "31", "32", "33") AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS date) - AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) -- AND data_fim IS NULL # data_fim não está confiável, temos status fechados sem esse campo ), @@ -74,9 +73,9 @@ class constants(Enum): # pylint: disable=c0103 LEFT JOIN alagamentos ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry) AND alagamentos.last_update = 1 - AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) -- seleciona ocorrencias que iniciaram nos últimos 15min - -- AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) - -- OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram nos últimos 15min ou ainda não finalizaram + -- AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) -- seleciona ocorrencias que iniciaram nos últimos 15min + AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) + OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram nos últimos 15min ou ainda não finalizaram WHERE intersected_areas.row_num = 1 GROUP BY id_h3, bairro ) @@ -102,7 +101,7 @@ class constants(Enum): # pylint: disable=c0103 WHEN tipo = 2 THEN '#A9CBE8'--'#BFA230' WHEN tipo = 3 THEN '#125999'--'#E0701F' ELSE '#ffffff' - END AS color + END AS color, FROM final_table -- order by qnt_alagamentos """, From aa5074b1ff2bdd752e03b94f18fc855374c4c870 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Mon, 25 Mar 2024 17:08:44 -0300 Subject: [PATCH 5/9] query dashboard --- pipelines/rj_cor/comando/eventos/constants.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index c729ae0eb..7aff4d51d 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -75,7 +75,8 @@ class constants(Enum): # pylint: disable=c0103 AND alagamentos.last_update = 1 -- AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) -- seleciona ocorrencias que iniciaram nos últimos 15min AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) - OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram nos últimos 15min ou ainda não finalizaram + OR CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) + OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram ou finalizaram nos últimos 15min ou ainda não finalizaram WHERE intersected_areas.row_num = 1 GROUP BY id_h3, bairro ) @@ -163,7 +164,8 @@ class constants(Enum): # pylint: disable=c0103 ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry) AND alagamentos.last_update = 1 AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR) - OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram nas últimas 2h ou ainda não finalizaram + OR CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR) + OR status_ocorrencia = "Aberto") -- seleciona ocorrencias que iniciaram ou finalizaram nas últimas 2h ou ainda não finalizaram -- AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour) -- OR data_fim IS NULL # data_fim não está confiável, temos status fechados sem esse campo -- ) From 526529f954035141ebf33a034c9d4b8ffe35b637 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 26 Mar 2024 10:31:26 -0300 Subject: [PATCH 6/9] overwrite csv when saving on gcp --- pipelines/rj_cor/comando/eventos/tasks.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index dde0df896..999159225 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -319,14 +319,12 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: partition_column = "data_inicio" dataframe, partitions = parse_date_columns(dataframe, partition_column) - current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") to_partitions( data=dataframe, partition_columns=partitions, savepath=prepath, data_type="csv", - suffix=current_time, ) log(f"[DEBUG] Files saved on {prepath}") return prepath From d27c332e17a03af41fa07678978b379062be1e17 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 26 Mar 2024 15:48:44 -0300 Subject: [PATCH 7/9] run flow only if it has new data --- pipelines/rj_cor/comando/eventos/constants.py | 2 +- pipelines/rj_cor/comando/eventos/flows.py | 35 ++++---- pipelines/rj_cor/comando/eventos/tasks.py | 85 ++++++++++++++----- pipelines/rj_cor/utils.py | 59 +++++++++++++ 4 files changed, 141 insertions(+), 40 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index 7aff4d51d..e1b8a0635 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -15,7 +15,7 @@ class constants(Enum): # pylint: disable=c0103 PATH_BASE_ENDERECOS = "/tmp/base_enderecos.csv" DATASET_ID = "adm_cor_comando" TABLE_ID_EVENTOS = "ocorrencias_nova_api" - REDIS_NAME = "cor_api_last_days" + REDIS_NAME = "last_update" TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis_nova_api" # TABLE_ID_POPS = "procedimento_operacional_padrao" # TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis" diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index 0f2b70f73..9a5b4e15b 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -22,7 +22,9 @@ download_data_atividades, get_date_interval, get_redis_df, + get_redis_max_date, save_data, + save_redis_max_date, treat_data_ocorrencias, treat_data_atividades, ) @@ -84,35 +86,18 @@ dfr = download_data_ocorrencias(first_date, last_date) - dfr_redis = get_redis_df( + redis_max_date = get_redis_max_date( dataset_id=dataset_id, table_id=table_id, name=redis_name, mode=redis_mode, ) - dfr_treated, dfr_redis = treat_data_ocorrencias( + dfr_treated, redis_max_date = treat_data_ocorrencias( dfr, - dfr_redis=dfr_redis, - columns=["id_evento", "data_inicio", "status"], + redis_max_date, ) - # dfr = compare_actual_df_with_redis_df( - # dfr, - # dfr_redis=dfr_redis, - # columns=columns, - - # ) - - # save_redis_df( - # dfr_redis, - # dataset_id, - # table_id, - # redis_name, - # keep_n_days=1, - # mode = mode, - # ) - path = save_data(dfr_treated) task_upload = create_table_and_upload_to_gcs( data_path=path, @@ -123,6 +108,16 @@ wait=path, ) + save_redis_max_date( + dataset_id=dataset_id, + table_id=table_id, + name=redis_name, + mode=redis_mode, + redis_max_date=redis_max_date, + ) + + save_redis_max_date.set_upstream(task_upload) + # Warning: this task won't execute if we provide a date interval # on parameters. The reason this happens is for if we want to # perform backfills, it won't mess with the Redis interval. diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 999159225..b6bd1260b 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -1,6 +1,11 @@ # -*- coding: utf-8 -*- # pylint: disable=R0914,W0613,W0102,W0613,R0912,R0915,E1136,E1137,W0702 # flake8: noqa: E722 +# TODO: colocar id_pops novos +# TODO: gerar alerta quando tiver id_pop novo +# TODO: apagar histórico da nova api para ter o id_pop novo +# TODO: criar tabela dim do id_pop novo +# TODO: salvar no redis o máximo entre as colunas de data_inicio e data_fim, seguir flow só se tiver novidades em alguma dessas colunas """ Tasks for comando """ @@ -21,18 +26,20 @@ from prefect.engine.state import Skipped # from prefect.triggers import all_successful - +# url_eventos = "http://aplicativo.cocr.com.br/comando/ocorrencias_api_nova" +from pipelines.rj_cor.utils import compare_actual_df_with_redis_df from pipelines.rj_cor.comando.eventos.utils import ( - build_redis_key, - compare_actual_df_with_redis_df, - get_redis_output, # TODO: atualizar o do utils.utils + # build_redis_key, format_date, treat_wrong_id_pop, ) from pipelines.utils.utils import ( + build_redis_key, + get_redis_output, get_vault_secret, log, parse_date_columns, + save_str_on_redis, to_partitions, ) @@ -49,7 +56,7 @@ def get_date_interval(first_date, last_date) -> Tuple[dict, str]: first_date, last_date = format_date(first_date, last_date) else: last_date = pendulum.today(tz="America/Sao_Paulo").date() - first_date = last_date.subtract(days=1) # atenção mudar para 3 + first_date = last_date.subtract(days=3) first_date, last_date = format_date( first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d") ) @@ -70,7 +77,8 @@ def get_redis_df( redis_key = build_redis_key(dataset_id, table_id, name, mode) log(f"Acessing redis_key: {redis_key}") - dfr_redis = get_redis_output(redis_key, is_df=True) + dfr_redis = get_redis_output(redis_key) + # dfr_redis = get_redis_output(redis_key, is_df=True) log(f"Redis output: {dfr_redis}") # if len(dfr_redis) == 0: @@ -90,6 +98,48 @@ def get_redis_df( return dfr_redis +@task +def get_redis_max_date( + dataset_id: str, + table_id: str, + name: str = None, + mode: str = "prod", +) -> str: + """ + Acess redis to get the last saved date and compare to actual df. + """ + redis_key = build_redis_key(dataset_id, table_id, name, mode) + log(f"Acessing redis_key: {redis_key}") + + redis_max_date = get_redis_output(redis_key) + + try: + redis_max_date = redis_max_date["max_date"] + except KeyError: + redis_max_date = "1990-01-01" + log("Creating a fake date because this key doesn't exist.") + + log(f"Redis output: {redis_max_date}") + return redis_max_date + + +@task +def save_redis_max_date( + dataset_id: str, + table_id: str, + name: str = None, + mode: str = "prod", + redis_max_date: str = None, +) -> str: + """ + Acess redis to save last date. + """ + redis_key = build_redis_key(dataset_id, table_id, name, mode) + log(f"Acessing redis_key: {redis_key}") + + save_str_on_redis(redis_key, "max_date", redis_max_date) + + @task( nout=1, max_retries=3, @@ -104,6 +154,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: url_secret = get_vault_secret("comando")["data"] url_eventos = url_secret["endpoint_eventos"] + log(f"\n\nDownloading data from {first_date} to {last_date} (not included)") dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}") return dfr @@ -112,8 +163,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: @task(nout=2) def treat_data_ocorrencias( dfr: pd.DataFrame, - dfr_redis: pd.DataFrame, - columns: list, + redis_max_date: str, ) -> Tuple[pd.DataFrame, pd.DataFrame]: """ Rename cols and normalize data. @@ -133,21 +183,18 @@ def treat_data_ocorrencias( dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str) - log(f"Dataframe before comparing with last data saved on redis \n{dfr.head()}") - columns = ["id_evento", "data_inicio", "status"] - dfr, dfr_redis = compare_actual_df_with_redis_df( - dfr, - dfr_redis, - columns, - ) - log(f"Dataframe after comparing with last data saved on redis {dfr.head()}") + max_date = dfr[["data_inicio", "data_fim"]].max().max() - # If df is empty stop flow - if dfr.shape[0] == 0: + log(f"Last API data was {max_date} and last redis uptade was {redis_max_date}") + + if max_date <= redis_max_date: skip_text = "No new data available on API" print(skip_text) raise ENDRUN(state=Skipped(skip_text)) + # Get new max_date to save on redis + redis_max_date = max_date + dfr["tipo"] = dfr["tipo"].replace( { "Primária": "Primario", @@ -208,7 +255,7 @@ def treat_data_ocorrencias( "%Y-%m-%d %H:%M:%S" ) - return dfr.drop_duplicates(), dfr_redis + return dfr.drop_duplicates(), redis_max_date @task( diff --git a/pipelines/rj_cor/utils.py b/pipelines/rj_cor/utils.py index 98ff29d3c..aaaa2bcfe 100644 --- a/pipelines/rj_cor/utils.py +++ b/pipelines/rj_cor/utils.py @@ -2,6 +2,13 @@ """ Utils for rj-cor """ +import json +import pandas as pd +from pipelines.utils.utils import ( + get_redis_client, + log, + treat_redis_output, +) def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod"): @@ -10,3 +17,55 @@ def build_redis_key(dataset_id: str, table_id: str, name: str, mode: str = "prod """ key = mode + "." + dataset_id + "." + table_id + "." + name return key + + +def get_redis_output(redis_key, is_df: bool = False): + """ + Get Redis output. Use get to obtain a df from redis or hgetall if is a key value pair. + """ + redis_client = get_redis_client() # (host="127.0.0.1") + + if is_df: + json_data = redis_client.get(redis_key) + log(f"[DEGUB] json_data {json_data}") + if json_data: + # If data is found, parse the JSON string back to a Python object (dictionary) + data_dict = json.loads(json_data) + # Convert the dictionary back to a DataFrame + return pd.DataFrame(data_dict) + + return pd.DataFrame() + + output = redis_client.hgetall(redis_key) + if len(output) > 0: + output = treat_redis_output(output) + return output + + +def compare_actual_df_with_redis_df( + dfr: pd.DataFrame, + dfr_redis: pd.DataFrame, + columns: list, +) -> pd.DataFrame: + """ + Compare df from redis to actual df and return only the rows from actual df + that are not already saved on redis. + """ + for col in columns: + if col not in dfr_redis.columns: + dfr_redis[col] = None + dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes) + log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}") + + dfr_diff = ( + pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True) + .query('_merge == "left_only"') + .drop("_merge", axis=1) + ) + log( + f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}" + ) + + updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]]) + + return dfr_diff, updated_dfr_redis From 876494b8b45ab6aa9caac6d838ab4b2559b8c2fa Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 27 Mar 2024 14:59:23 -0300 Subject: [PATCH 8/9] bugfix max date --- pipelines/rj_cor/comando/eventos/tasks.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index b6bd1260b..d6d2c56e6 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -181,10 +181,14 @@ def treat_data_ocorrencias( } ) + log(f"First row: \n{dfr.iloc[0]}") + dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str) + for col in ["data_inicio", "data_fim"]: + dfr[col] = pd.to_datetime(dfr[col], errors="coerce") max_date = dfr[["data_inicio", "data_fim"]].max().max() - + max_date = max_date.strftime("%Y-%m-%d %H:%M:%S") log(f"Last API data was {max_date} and last redis uptade was {redis_max_date}") if max_date <= redis_max_date: @@ -241,9 +245,6 @@ def treat_data_ocorrencias( # Treat id_pop col dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int) - for col in ["data_inicio", "data_fim"]: - dfr[col] = pd.to_datetime(dfr[col], errors="coerce") - for col in ["data_inicio", "data_fim"]: dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S") From e455dff4d128a947b7f5ffcd8f2596ac5582a77d Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 27 Mar 2024 15:54:16 -0300 Subject: [PATCH 9/9] changing save on redis to see if bug disappears --- pipelines/rj_cor/comando/eventos/flows.py | 3 ++- pipelines/rj_cor/comando/eventos/tasks.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index 9a5b4e15b..c50fffeee 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -114,9 +114,10 @@ name=redis_name, mode=redis_mode, redis_max_date=redis_max_date, + wait=task_upload, ) - save_redis_max_date.set_upstream(task_upload) + # save_redis_max_date.set_upstream(task_upload) # Warning: this task won't execute if we provide a date interval # on parameters. The reason this happens is for if we want to diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index d6d2c56e6..6efd18fec 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -124,13 +124,14 @@ def get_redis_max_date( @task -def save_redis_max_date( +def save_redis_max_date( # pylint: disable=too-many-arguments dataset_id: str, table_id: str, name: str = None, mode: str = "prod", redis_max_date: str = None, -) -> str: + wait=None, # pylint: disable=unused-argument +): """ Acess redis to save last date. """