From 6e3457a676042476ab3176937d6ee5f9b4e62bd8 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 2 Apr 2024 09:51:53 -0300 Subject: [PATCH 01/14] changing atividades and pop --- pipelines/rj_cor/comando/eventos/constants.py | 2 +- pipelines/rj_cor/comando/eventos/flows.py | 338 +++++++----------- pipelines/rj_cor/comando/eventos/tasks.py | 79 ++-- 3 files changed, 192 insertions(+), 227 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/constants.py b/pipelines/rj_cor/comando/eventos/constants.py index e1b8a0635..6fcf8d9f0 100644 --- a/pipelines/rj_cor/comando/eventos/constants.py +++ b/pipelines/rj_cor/comando/eventos/constants.py @@ -17,7 +17,7 @@ class constants(Enum): # pylint: disable=c0103 TABLE_ID_EVENTOS = "ocorrencias_nova_api" REDIS_NAME = "last_update" TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis_nova_api" - # TABLE_ID_POPS = "procedimento_operacional_padrao" + TABLE_ID_POPS = "procedimento_operacional_padrao_nova_api" # TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis" RAIN_DASHBOARD_FLOW_SCHEDULE_PARAMETERS = { "redis_data_key": "data_alagamento_recente_comando", diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index c50fffeee..37951d855 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -16,14 +16,16 @@ from pipelines.rj_cor.comando.eventos.constants import ( constants as comando_constants, ) -from pipelines.rj_cor.comando.eventos.schedules import every_hour # , every_month +from pipelines.rj_cor.comando.eventos.schedules import every_hour, every_month from pipelines.rj_cor.comando.eventos.tasks import ( download_data_ocorrencias, download_data_atividades, + download_data_pops, get_date_interval, - get_redis_df, + # get_redis_df, get_redis_max_date, save_data, + save_no_partition, save_redis_max_date, treat_data_ocorrencias, treat_data_atividades, @@ -293,19 +295,33 @@ dfr = download_data_atividades(first_date, last_date) - dfr_redis = get_redis_df( + redis_max_date = get_redis_max_date( dataset_id=dataset_id, table_id=table_id, name=redis_name, mode=redis_mode, ) - dfr_treated, dfr_redis = treat_data_atividades( + dfr_treated, redis_max_date = treat_data_atividades( dfr, - dfr_redis=dfr_redis, - columns=["id_evento", "data_inicio", "sigla", "descricao", "status"], + redis_max_date, ) + path = save_data(dfr_treated) + + # dfr_redis = get_redis_df( + # dataset_id=dataset_id, + # table_id=table_id, + # name=redis_name, + # mode=redis_mode, + # ) + + # dfr_treated, dfr_redis = treat_data_atividades( + # dfr, + # dfr_redis=dfr_redis, + # columns=["id_evento", "data_inicio", "sigla", "descricao", "status"], + # ) + # dfr = compare_actual_df_with_redis_df( # dfr, # dfr_redis=dfr_redis, @@ -357,12 +373,12 @@ project_name=constants.PREFECT_DEFAULT_PROJECT.value, parameters={ "dataset_id": dataset_id, - "table_id": "ocorrencias_orgaos_responsaveis", # change to table_id + "table_id": table_id, "mode": materialization_mode, "materialize_to_datario": materialize_to_datario, }, labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.ocorrencias_orgaos_responsaveis", + run_name=f"Materialize {dataset_id}.{table_id}", ) materialization_flow.set_upstream(task_upload) @@ -416,198 +432,114 @@ rj_cor_comando_atividades_evento_flow.schedule = every_hour -# with Flow( -# "COR: Comando - POPs e Atividades dos POPs", -# code_owners=[ -# "paty", -# ], -# ) as rj_cor_comando_pops_flow: -# # Dump mode -# dump_mode = 
Parameter("dump_mode", default="overwrite", required=False) - -# # Materialization parameters -# materialize_after_dump = Parameter( -# "materialize_after_dump", default=False, required=False -# ) -# materialization_mode = Parameter( -# "materialization_mode", default="prod", required=False -# ) -# materialize_to_datario = Parameter( -# "materialize_to_datario", default=False, required=False -# ) - -# # Dump to GCS after? Should only dump to GCS if materializing to datario -# dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) -# maximum_bytes_processed = Parameter( -# "maximum_bytes_processed", -# required=False, -# default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, -# ) - -# dataset_id = Parameter( -# "dataset_id", default=comando_constants.DATASET_ID.value, required=False -# ) -# table_id_pops = Parameter( -# "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False -# ) -# table_id_atividades_pops = Parameter( -# "table_id_atividades_pops", -# default=comando_constants.TABLE_ID_ATIVIDADES_POPS.value, -# required=False, -# ) - -# pops = get_pops() -# redis_pops = get_on_redis(dataset_id, table_id_atividades_pops, mode="dev") -# atividades_pops, update_pops_redis = get_atividades_pops( -# pops=pops, redis_pops=redis_pops -# ) -# has_update = not_none(update_pops_redis) - -# path_pops = save_no_partition(dataframe=pops) - -# task_upload_pops = create_table_and_upload_to_gcs( -# data_path=path_pops, -# dataset_id=dataset_id, -# table_id=table_id_pops, -# biglake_table=False, -# dump_mode=dump_mode, -# ) - -# with case(has_update, True): -# path_atividades_pops = save_no_partition( -# dataframe=atividades_pops, append=False -# ) - -# task_upload_atividades_pops = create_table_and_upload_to_gcs( -# data_path=path_atividades_pops, -# dataset_id=dataset_id, -# table_id=table_id_atividades_pops, -# biglake_table=False, -# dump_mode="overwrite", -# ) - -# save_on_redis( -# dataset_id, -# table_id_atividades_pops, -# "dev", -# update_pops_redis, -# wait=task_upload_atividades_pops, -# ) - -# with case(materialize_after_dump, True): -# # Trigger DBT flow run -# current_flow_labels = get_current_flow_labels() - -# materialization_pops_flow = create_flow_run( -# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "dataset_id": dataset_id, -# "table_id": table_id_pops, -# "mode": materialization_mode, -# "materialize_to_datario": materialize_to_datario, -# }, -# labels=current_flow_labels, -# run_name=f"Materialize {dataset_id}.{table_id_pops}", -# ) -# materialization_pops_flow.set_upstream(task_upload_pops) - -# materialization_atividades_pops_flow = create_flow_run( -# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "dataset_id": dataset_id, -# "table_id": table_id_atividades_pops, -# "mode": materialization_mode, -# "materialize_to_datario": materialize_to_datario, -# }, -# labels=current_flow_labels, -# run_name=f"Materialize {dataset_id}.{table_id_atividades_pops}", -# ) -# materialization_atividades_pops_flow.set_upstream(task_upload_atividades_pops) - -# wait_for_pops_materialization = wait_for_flow_run( -# materialization_pops_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) -# wait_for_pops_materialization.max_retries = ( -# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value -# ) -# 
wait_for_pops_materialization.retry_delay = timedelta( -# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value -# ) - -# wait_for_atividades_pops_materialization = wait_for_flow_run( -# materialization_atividades_pops_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) -# wait_for_atividades_pops_materialization.max_retries = ( -# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value -# ) -# wait_for_atividades_pops_materialization.retry_delay = timedelta( -# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value -# ) - -# with case(dump_to_gcs, True): -# # Trigger Dump to GCS flow run with project id as datario -# dump_pops_to_gcs_flow = create_flow_run( -# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "project_id": "datario", -# "dataset_id": dataset_id, -# "table_id": table_id_pops, -# "maximum_bytes_processed": maximum_bytes_processed, -# }, -# labels=[ -# "datario", -# ], -# run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", -# ) -# dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) - -# dump_atividades_pops_to_gcs_flow = create_flow_run( -# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "project_id": "datario", -# "dataset_id": dataset_id, -# "table_id": table_id_atividades_pops, -# "maximum_bytes_processed": maximum_bytes_processed, -# }, -# labels=[ -# "datario", -# ], -# run_name=f"Dump to GCS {dataset_id}.{table_id_atividades_pops}", -# ) -# dump_atividades_pops_to_gcs_flow.set_upstream( -# wait_for_atividades_pops_materialization -# ) - -# wait_for_dump_pops_to_gcs = wait_for_flow_run( -# dump_pops_to_gcs_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# wait_for_dump_atividades_pops_to_gcs = wait_for_flow_run( -# dump_atividades_pops_to_gcs_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# rj_cor_comando_pops_flow.run_config = KubernetesRun( -# image=constants.DOCKER_IMAGE.value, -# labels=[ -# constants.RJ_COR_AGENT_LABEL.value, -# ], -# ) - -# rj_cor_comando_pops_flow.schedule = every_month +with Flow( + "COR: Comando - POPs e Atividades dos POPs", + code_owners=[ + "paty", + ], +) as rj_cor_comando_pops_flow: + # Dump mode + dump_mode = Parameter("dump_mode", default="overwrite", required=False) + + # Materialization parameters + materialize_after_dump = Parameter( + "materialize_after_dump", default=False, required=False + ) + materialization_mode = Parameter( + "materialization_mode", default="prod", required=False + ) + materialize_to_datario = Parameter( + "materialize_to_datario", default=False, required=False + ) + + # Dump to GCS after? 
Should only dump to GCS if materializing to datario + dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) + maximum_bytes_processed = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + dataset_id = Parameter( + "dataset_id", default=comando_constants.DATASET_ID.value, required=False + ) + table_id_pops = Parameter( + "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False + ) + + pops = download_data_pops() + path_pops = save_no_partition(dataframe=pops) + + task_upload_pops = create_table_and_upload_to_gcs( + data_path=path_pops, + dataset_id=dataset_id, + table_id=table_id_pops, + biglake_table=False, + dump_mode=dump_mode, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + + materialization_pops_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id_pops, + "mode": materialization_mode, + "materialize_to_datario": materialize_to_datario, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id_pops}", + ) + materialization_pops_flow.set_upstream(task_upload_pops) + + wait_for_pops_materialization = wait_for_flow_run( + materialization_pops_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_pops_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_pops_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(dump_to_gcs, True): + # Trigger Dump to GCS flow run with project id as datario + dump_pops_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": dataset_id, + "table_id": table_id_pops, + "maximum_bytes_processed": maximum_bytes_processed, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", + ) + dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) + + wait_for_dump_pops_to_gcs = wait_for_flow_run( + dump_pops_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +rj_cor_comando_pops_flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], +) + +rj_cor_comando_pops_flow.schedule = every_month diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 6efd18fec..bf98a58a8 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -5,7 +5,7 @@ # TODO: gerar alerta quando tiver id_pop novo # TODO: apagar histórico da nova api para ter o id_pop novo # TODO: criar tabela dim do id_pop novo -# TODO: salvar no redis o máximo entre as colunas de data_inicio e data_fim, seguir flow só se tiver novidades em alguma dessas colunas +# TODO: pipe atividades dos eventos """ Tasks for comando """ @@ -27,7 +27,7 @@ # from prefect.triggers import all_successful # url_eventos = "http://aplicativo.cocr.com.br/comando/ocorrencias_api_nova" -from pipelines.rj_cor.utils import 
compare_actual_df_with_redis_df +# from pipelines.rj_cor.utils import compare_actual_df_with_redis_df from pipelines.rj_cor.comando.eventos.utils import ( # build_redis_key, format_date, @@ -165,7 +165,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: def treat_data_ocorrencias( dfr: pd.DataFrame, redis_max_date: str, -) -> Tuple[pd.DataFrame, pd.DataFrame]: +) -> Tuple[pd.DataFrame, str]: """ Rename cols and normalize data. """ @@ -280,34 +280,55 @@ def download_data_atividades(first_date, last_date, wait=None) -> pd.DataFrame: return dfr +# @task(nout=2) +# def treat_data_atividades( +# dfr: pd.DataFrame, +# dfr_redis: pd.DataFrame, +# columns: list, +# ) -> Tuple[pd.DataFrame, pd.DataFrame]: @task(nout=2) def treat_data_atividades( dfr: pd.DataFrame, - dfr_redis: pd.DataFrame, - columns: list, -) -> Tuple[pd.DataFrame, pd.DataFrame]: + redis_max_date: str, +) -> Tuple[pd.DataFrame, str]: """ Normalize data to be similiar to old API. """ print("Start treating data") + dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str) dfr.orgao = dfr.orgao.replace(["\r", "\n"], ["", ""], regex=True) print(f"Dataframe before comparing with last data saved on redis {dfr.head()}") + for col in ["data_inicio", "data_fim", "data_chegada"]: + dfr[col] = pd.to_datetime(dfr[col], errors="coerce") - dfr, dfr_redis = compare_actual_df_with_redis_df( - dfr, - dfr_redis, - columns, - ) - print(f"Dataframe after comparing with last data saved on redis {dfr.head()}") + max_date = dfr[["data_inicio", "data_fim", "data_chegada"]].max().max() + max_date = max_date.strftime("%Y-%m-%d %H:%M:%S") + + log(f"Last API data was {max_date} and last redis uptade was {redis_max_date}") - # If df is empty stop flow - if dfr.shape[0] == 0: + if max_date <= redis_max_date: skip_text = "No new data available on API" print(skip_text) raise ENDRUN(state=Skipped(skip_text)) + # Get new max_date to save on redis + redis_max_date = max_date + + # dfr, dfr_redis = compare_actual_df_with_redis_df( + # dfr, + # dfr_redis, + # columns, + # ) + # print(f"Dataframe after comparing with last data saved on redis {dfr.head()}") + + # # If df is empty stop flow + # if dfr.shape[0] == 0: + # skip_text = "No new data available on API" + # print(skip_text) + # raise ENDRUN(state=Skipped(skip_text)) + mandatory_cols = [ "id_evento", "sigla", @@ -334,14 +355,7 @@ def treat_data_atividades( print("\n\nDEBUG", dfr[categorical_cols]) for i in categorical_cols: dfr[i] = dfr[i].str.capitalize() - # dfr[i] = dfr[i].apply(unidecode) - - for col in ["data_inicio", "data_fim", "data_chegada"]: - dfr[col] = pd.to_datetime(dfr[col], errors="coerce") - - # TODO: Essa conversão é temporária - for col in ["data_inicio", "data_fim", "data_chegada"]: - dfr[col] = dfr[col].dt.tz_convert("America/Sao_Paulo") + dfr[i] = dfr[i].apply(unidecode) for col in ["data_inicio", "data_fim", "data_chegada"]: dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S") @@ -354,7 +368,7 @@ def treat_data_atividades( "%Y-%m-%d %H:%M:%S" ) - return dfr.drop_duplicates(), dfr_redis + return dfr.drop_duplicates(), redis_max_date @task @@ -379,6 +393,25 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: return prepath +@task( + nout=1, + max_retries=3, + retry_delay=timedelta(seconds=60), +) +def download_data_pops() -> pd.DataFrame: + """ + Download data from POP's API + """ + + url_secret = get_vault_secret("comando")["data"] + url = url_secret["endpoint_pops"] + + log("\n\nDownloading POP's data") + dfr = 
pd.read_json(f"{url}") + + return dfr + + @task def save_no_partition(dataframe: pd.DataFrame, append: bool = False) -> str: """ From 1f21a0ebe71904340e735da2c9f87f623b189d4c Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 2 Apr 2024 11:20:39 -0300 Subject: [PATCH 02/14] try to find bug --- pipelines/rj_cor/comando/eventos/flows.py | 9 +++++++++ pipelines/rj_cor/comando/eventos/tasks.py | 1 + 2 files changed, 10 insertions(+) diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index 37951d855..fc14dc2e6 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -349,6 +349,15 @@ wait=path, ) + save_redis_max_date( + dataset_id=dataset_id, + table_id=table_id, + name=redis_name, + mode=redis_mode, + redis_max_date=redis_max_date, + wait=task_upload, + ) + # Warning: this task won't execute if we provide a date interval # on parameters. The reason this happens is for if we want to # perform backfills, it won't mess with the Redis interval. diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index bf98a58a8..9a540cb70 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -377,6 +377,7 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: Save data on a csv file to be uploaded to GCP """ + log(f"Data that will be saved {dataframe.iloc[0]}") prepath = Path("/tmp/data/") prepath.mkdir(parents=True, exist_ok=True) From 9be6c23ebf3ed3f4ea37ebae83a69062b8864e3e Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 2 Apr 2024 11:45:14 -0300 Subject: [PATCH 03/14] removing duplicated save_data --- pipelines/rj_cor/comando/eventos/flows.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index fc14dc2e6..490bcc502 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -338,8 +338,6 @@ # mode = mode, # ) - path = save_data(dfr_treated) - task_upload = create_table_and_upload_to_gcs( data_path=path, dataset_id=dataset_id, From a07e0f15cae15089259468568f73227b31f7f7b2 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 2 Apr 2024 14:04:14 -0300 Subject: [PATCH 04/14] replacing partition_column by the min date of date columns --- pipelines/rj_cor/comando/eventos/tasks.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 9a540cb70..976fdda85 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -375,13 +375,26 @@ def treat_data_atividades( def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: """ Save data on a csv file to be uploaded to GCP + PS: It's not mandatory to start an activity of an event. As a result we have some activities + without any start date, but with an end date. The main problem is that the partition column + is based on data_inicio that is null. To try to solve this, the partition_column will be + created based on the min date between data_inicio, data_fim and created_at. 
""" log(f"Data that will be saved {dataframe.iloc[0]}") prepath = Path("/tmp/data/") prepath.mkdir(parents=True, exist_ok=True) - partition_column = "data_inicio" + dataframe[["data_inicio", "data_fim", "created_at"]] = dataframe[ + ["data_inicio", "data_fim", "created_at"] + ].apply(pd.to_datetime) + dataframe["min_date"] = ( + dataframe[["data_inicio", "data_fim", "created_at"]] + .min(axis=1) + .dt.strftime("%Y-%m-%d %H:%M:%S") + ) + + partition_column = "min_date" dataframe, partitions = parse_date_columns(dataframe, partition_column) to_partitions( From dfb12cc18d73b94afa56bb84b404c948e6d45f10 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 2 Apr 2024 15:33:48 -0300 Subject: [PATCH 05/14] drop columns min_date --- pipelines/rj_cor/comando/eventos/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 976fdda85..c2a488a40 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -398,7 +398,7 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: dataframe, partitions = parse_date_columns(dataframe, partition_column) to_partitions( - data=dataframe, + data=dataframe.drop(columns="min_date"), partition_columns=partitions, savepath=prepath, data_type="csv", From e0fd2c8269142d0f96f3601cd70ef76db13deb1d Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 3 Apr 2024 11:25:52 -0300 Subject: [PATCH 06/14] creating a new column that will be used to generate partitions --- pipelines/rj_cor/comando/eventos/tasks.py | 50 +++++++++++------------ 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index c2a488a40..7c47e73cb 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -30,7 +30,6 @@ # from pipelines.rj_cor.utils import compare_actual_df_with_redis_df from pipelines.rj_cor.comando.eventos.utils import ( # build_redis_key, - format_date, treat_wrong_id_pop, ) from pipelines.utils.utils import ( @@ -47,19 +46,17 @@ @task def get_date_interval(first_date, last_date) -> Tuple[dict, str]: """ - If `first_date` and `last_date` are provided, format it to DD/MM/YYYY. Else, - get data from last 3 days. + If `first_date` and `last_date` are provided, convert it to pendulum + and add one day to `last_date`. Else, get data from last 3 days. first_date: str YYYY-MM-DD last_date: str YYYY-MM-DD """ if first_date and last_date: - first_date, last_date = format_date(first_date, last_date) + first_date = pendulum.from_format(first_date, "YYYY-MM-DD") + last_date = pendulum.from_format(last_date, "YYYY-MM-DD").add(days=1) else: - last_date = pendulum.today(tz="America/Sao_Paulo").date() - first_date = last_date.subtract(days=3) - first_date, last_date = format_date( - first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d") - ) + last_date = pendulum.today(tz="America/Sao_Paulo").date().add(days=1) + first_date = last_date.subtract(days=4) return first_date, last_date @@ -148,16 +145,26 @@ def save_redis_max_date( # pylint: disable=too-many-arguments ) def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: """ - Download data from API + Download data from API adding one day at a time so we can save + the date in a new column `data_particao` that will be used to + create the partitions when saving data. 
""" # auth_token = get_token() url_secret = get_vault_secret("comando")["data"] url_eventos = url_secret["endpoint_eventos"] - log(f"\n\nDownloading data from {first_date} to {last_date} (not included)") - dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}") + dfr = pd.DataFrame() + temp_date = first_date.add(days=1) + while temp_date <= last_date: + log(f"\n\nDownloading data from {first_date} to {temp_date} (not included)") + dfr_temp = pd.read_json( + f"{url_eventos}/?data_i={first_date.strftime('%d/%m/%Y')}&data_f={temp_date.strftime('%d/%m/%Y')}" + ) + dfr_temp["create_partition"] = first_date.strftime("%Y-%m-%d") + dfr = pd.concat([dfr, dfr_temp]) + first_date, temp_date = first_date.add(days=1), temp_date.add(days=1) return dfr @@ -376,29 +383,20 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]: """ Save data on a csv file to be uploaded to GCP PS: It's not mandatory to start an activity of an event. As a result we have some activities - without any start date, but with an end date. The main problem is that the partition column - is based on data_inicio that is null. To try to solve this, the partition_column will be - created based on the min date between data_inicio, data_fim and created_at. + without any start date, but with an end date. The main problem is that we can not create the + partition column from data_inicio, that's why we created the column create_partition when + requesting the API. """ log(f"Data that will be saved {dataframe.iloc[0]}") prepath = Path("/tmp/data/") prepath.mkdir(parents=True, exist_ok=True) - dataframe[["data_inicio", "data_fim", "created_at"]] = dataframe[ - ["data_inicio", "data_fim", "created_at"] - ].apply(pd.to_datetime) - dataframe["min_date"] = ( - dataframe[["data_inicio", "data_fim", "created_at"]] - .min(axis=1) - .dt.strftime("%Y-%m-%d %H:%M:%S") - ) - - partition_column = "min_date" + partition_column = "create_partition" dataframe, partitions = parse_date_columns(dataframe, partition_column) to_partitions( - data=dataframe.drop(columns="min_date"), + data=dataframe.drop(columns="create_partition"), partition_columns=partitions, savepath=prepath, data_type="csv", From 0bebfed271eba1728f4e3d74c4378b745721473b Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 3 Apr 2024 14:09:53 -0300 Subject: [PATCH 07/14] addin try/except when accessing api --- pipelines/rj_cor/comando/eventos/tasks.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 7c47e73cb..6ed25eb81 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -5,7 +5,6 @@ # TODO: gerar alerta quando tiver id_pop novo # TODO: apagar histórico da nova api para ter o id_pop novo # TODO: criar tabela dim do id_pop novo -# TODO: pipe atividades dos eventos """ Tasks for comando """ @@ -16,6 +15,7 @@ from pathlib import Path from typing import Any, Union, Tuple from uuid import uuid4 +from urllib.error import HTTPError from unidecode import unidecode import pandas as pd @@ -159,11 +159,14 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: while temp_date <= last_date: log(f"\n\nDownloading data from {first_date} to {temp_date} (not included)") - dfr_temp = pd.read_json( - f"{url_eventos}/?data_i={first_date.strftime('%d/%m/%Y')}&data_f={temp_date.strftime('%d/%m/%Y')}" - ) - dfr_temp["create_partition"] = first_date.strftime("%Y-%m-%d") - dfr = 
pd.concat([dfr, dfr_temp]) + try: + dfr_temp = pd.read_json( + f"{url_eventos}/?data_i={first_date.strftime('%d/%m/%Y')}&data_f={temp_date.strftime('%d/%m/%Y')}" + ) + dfr_temp["create_partition"] = first_date.strftime("%Y-%m-%d") + dfr = pd.concat([dfr, dfr_temp]) + except HTTPError as error: + print(f"Error downloading this data: {error}") first_date, temp_date = first_date.add(days=1), temp_date.add(days=1) return dfr From 1b8d1fc1a7c1fe85645d84fc62f9e9088dfe3c14 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 3 Apr 2024 15:12:02 -0300 Subject: [PATCH 08/14] bugfix --- pipelines/rj_cor/comando/eventos/tasks.py | 30 +++------ pipelines/rj_cor/comando/eventos/utils.py | 81 +++++++---------------- 2 files changed, 33 insertions(+), 78 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 6ed25eb81..46ea27db5 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -15,7 +15,6 @@ from pathlib import Path from typing import Any, Union, Tuple from uuid import uuid4 -from urllib.error import HTTPError from unidecode import unidecode import pandas as pd @@ -30,6 +29,7 @@ # from pipelines.rj_cor.utils import compare_actual_df_with_redis_df from pipelines.rj_cor.comando.eventos.utils import ( # build_redis_key, + download_data, treat_wrong_id_pop, ) from pipelines.utils.utils import ( @@ -152,22 +152,9 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: # auth_token = get_token() url_secret = get_vault_secret("comando")["data"] - url_eventos = url_secret["endpoint_eventos"] - - dfr = pd.DataFrame() - temp_date = first_date.add(days=1) - - while temp_date <= last_date: - log(f"\n\nDownloading data from {first_date} to {temp_date} (not included)") - try: - dfr_temp = pd.read_json( - f"{url_eventos}/?data_i={first_date.strftime('%d/%m/%Y')}&data_f={temp_date.strftime('%d/%m/%Y')}" - ) - dfr_temp["create_partition"] = first_date.strftime("%Y-%m-%d") - dfr = pd.concat([dfr, dfr_temp]) - except HTTPError as error: - print(f"Error downloading this data: {error}") - first_date, temp_date = first_date.add(days=1), temp_date.add(days=1) + url = url_secret["endpoint_eventos"] + + dfr = download_data(first_date, last_date, url) return dfr @@ -231,6 +218,7 @@ def treat_data_ocorrencias( "longitude", "status", "tipo", + "create_partition", ] # Create cols if they don exist on new API for col in mandatory_cols: @@ -281,12 +269,9 @@ def download_data_atividades(first_date, last_date, wait=None) -> pd.DataFrame: """ url_secret = get_vault_secret("comando")["data"] - url_atividades_evento = url_secret["endpoint_atividades_evento"] - - dfr = pd.read_json( - f"{url_atividades_evento}/?data_i={first_date}&data_f={last_date}" - ) + url = url_secret["endpoint_atividades_evento"] + dfr = download_data(first_date, last_date, url) return dfr @@ -348,6 +333,7 @@ def treat_data_atividades( "data_fim", "descricao", "status", + "create_partition", ] # Create cols if they don exist on new API diff --git a/pipelines/rj_cor/comando/eventos/utils.py b/pipelines/rj_cor/comando/eventos/utils.py index 7bf879893..7e8d4b0db 100644 --- a/pipelines/rj_cor/comando/eventos/utils.py +++ b/pipelines/rj_cor/comando/eventos/utils.py @@ -3,17 +3,15 @@ General purpose functions for the comando project """ # pylint: disable=W0611 -import json +from urllib.error import HTTPError import requests from requests.adapters import HTTPAdapter, Retry -import pendulum import pandas as pd +import 
pendulum from pipelines.utils.utils import ( - get_redis_client, get_vault_secret, log, - treat_redis_output, ) @@ -29,58 +27,6 @@ def format_date(first_date, last_date): return first_date, last_date -def get_redis_output(redis_key, is_df: bool = False): - """ - Get Redis output. Use get to obtain a df from redis or hgetall if is a key value pair. - """ - redis_client = get_redis_client() # (host="127.0.0.1") - - if is_df: - json_data = redis_client.get(redis_key) - log(f"[DEGUB] json_data {json_data}") - if json_data: - # If data is found, parse the JSON string back to a Python object (dictionary) - data_dict = json.loads(json_data) - # Convert the dictionary back to a DataFrame - return pd.DataFrame(data_dict) - - return pd.DataFrame() - - output = redis_client.hgetall(redis_key) - if len(output) > 0: - output = treat_redis_output(output) - return output - - -def compare_actual_df_with_redis_df( - dfr: pd.DataFrame, - dfr_redis: pd.DataFrame, - columns: list, -) -> pd.DataFrame: - """ - Compare df from redis to actual df and return only the rows from actual df - that are not already saved on redis. - """ - for col in columns: - if col not in dfr_redis.columns: - dfr_redis[col] = None - dfr_redis[col] = dfr_redis[col].astype(dfr[col].dtypes) - log(f"\nEnded conversion types from dfr to dfr_redis: \n{dfr_redis.dtypes}") - - dfr_diff = ( - pd.merge(dfr, dfr_redis, how="left", on=columns, indicator=True) - .query('_merge == "left_only"') - .drop("_merge", axis=1) - ) - log( - f"\nDf resulted from the difference between dft_redis and dfr: \n{dfr_diff.head()}" - ) - - updated_dfr_redis = pd.concat([dfr_redis, dfr_diff[columns]]) - - return dfr_diff, updated_dfr_redis - - def treat_wrong_id_pop(dfr): """ Create id_pop based on pop_titulo column @@ -175,3 +121,26 @@ def get_url(url, parameters: dict = None, token: str = None): # pylint: disable log(f"This resulted in the following error: {exc}") response = {"response": None} return response + + +def download_data(first_date, last_date, url) -> pd.DataFrame: + """ + Download data from API adding one day at a time so we can save + the date in a new column `data_particao` that will be used to + create the partitions when saving data. 
+ """ + dfr = pd.DataFrame() + temp_date = first_date.add(days=1) + fmt = "%d/%m/%Y" + while temp_date <= last_date: + log(f"\n\nDownloading data from {first_date} to {temp_date} (not included)") + try: + dfr_temp = pd.read_json( + f"{url}/?data_i={first_date.strftime(fmt)}&data_f={temp_date.strftime(fmt)}" + ) + dfr_temp["create_partition"] = first_date.strftime("%Y-%m-%d") + dfr = pd.concat([dfr, dfr_temp]) + except HTTPError as error: + print(f"Error downloading this data: {error}") + first_date, temp_date = first_date.add(days=1), temp_date.add(days=1) + return dfr From 129487034b7f49283fb497dcab97e734d41474b1 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 3 Apr 2024 16:53:17 -0300 Subject: [PATCH 09/14] removing id_pop from table --- pipelines/rj_cor/comando/eventos/flows.py | 257 ++++++++++------------ pipelines/rj_cor/comando/eventos/tasks.py | 28 +-- 2 files changed, 118 insertions(+), 167 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index 490bcc502..5b30e3898 100644 --- a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -16,16 +16,16 @@ from pipelines.rj_cor.comando.eventos.constants import ( constants as comando_constants, ) -from pipelines.rj_cor.comando.eventos.schedules import every_hour, every_month +from pipelines.rj_cor.comando.eventos.schedules import every_hour # , every_month from pipelines.rj_cor.comando.eventos.tasks import ( download_data_ocorrencias, download_data_atividades, - download_data_pops, + # download_data_pops, get_date_interval, # get_redis_df, get_redis_max_date, save_data, - save_no_partition, + # save_no_partition, save_redis_max_date, treat_data_ocorrencias, treat_data_atividades, @@ -309,35 +309,6 @@ path = save_data(dfr_treated) - # dfr_redis = get_redis_df( - # dataset_id=dataset_id, - # table_id=table_id, - # name=redis_name, - # mode=redis_mode, - # ) - - # dfr_treated, dfr_redis = treat_data_atividades( - # dfr, - # dfr_redis=dfr_redis, - # columns=["id_evento", "data_inicio", "sigla", "descricao", "status"], - # ) - - # dfr = compare_actual_df_with_redis_df( - # dfr, - # dfr_redis=dfr_redis, - # columns=columns, - - # ) - - # save_redis_df( - # dfr_redis, - # dataset_id, - # table_id, - # redis_name, - # keep_n_days=1, - # mode = mode, - # ) - task_upload = create_table_and_upload_to_gcs( data_path=path, dataset_id=dataset_id, @@ -439,114 +410,114 @@ rj_cor_comando_atividades_evento_flow.schedule = every_hour -with Flow( - "COR: Comando - POPs e Atividades dos POPs", - code_owners=[ - "paty", - ], -) as rj_cor_comando_pops_flow: - # Dump mode - dump_mode = Parameter("dump_mode", default="overwrite", required=False) - - # Materialization parameters - materialize_after_dump = Parameter( - "materialize_after_dump", default=False, required=False - ) - materialization_mode = Parameter( - "materialization_mode", default="prod", required=False - ) - materialize_to_datario = Parameter( - "materialize_to_datario", default=False, required=False - ) - - # Dump to GCS after? 
Should only dump to GCS if materializing to datario - dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) - maximum_bytes_processed = Parameter( - "maximum_bytes_processed", - required=False, - default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, - ) - - dataset_id = Parameter( - "dataset_id", default=comando_constants.DATASET_ID.value, required=False - ) - table_id_pops = Parameter( - "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False - ) - - pops = download_data_pops() - path_pops = save_no_partition(dataframe=pops) - - task_upload_pops = create_table_and_upload_to_gcs( - data_path=path_pops, - dataset_id=dataset_id, - table_id=table_id_pops, - biglake_table=False, - dump_mode=dump_mode, - ) - - with case(materialize_after_dump, True): - # Trigger DBT flow run - current_flow_labels = get_current_flow_labels() - - materialization_pops_flow = create_flow_run( - flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "dataset_id": dataset_id, - "table_id": table_id_pops, - "mode": materialization_mode, - "materialize_to_datario": materialize_to_datario, - }, - labels=current_flow_labels, - run_name=f"Materialize {dataset_id}.{table_id_pops}", - ) - materialization_pops_flow.set_upstream(task_upload_pops) - - wait_for_pops_materialization = wait_for_flow_run( - materialization_pops_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - wait_for_pops_materialization.max_retries = ( - dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value - ) - wait_for_pops_materialization.retry_delay = timedelta( - seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value - ) - - with case(dump_to_gcs, True): - # Trigger Dump to GCS flow run with project id as datario - dump_pops_to_gcs_flow = create_flow_run( - flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, - project_name=constants.PREFECT_DEFAULT_PROJECT.value, - parameters={ - "project_id": "datario", - "dataset_id": dataset_id, - "table_id": table_id_pops, - "maximum_bytes_processed": maximum_bytes_processed, - }, - labels=[ - "datario", - ], - run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", - ) - dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) - - wait_for_dump_pops_to_gcs = wait_for_flow_run( - dump_pops_to_gcs_flow, - stream_states=True, - stream_logs=True, - raise_final_state=True, - ) - -rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -rj_cor_comando_pops_flow.run_config = KubernetesRun( - image=constants.DOCKER_IMAGE.value, - labels=[ - constants.RJ_COR_AGENT_LABEL.value, - ], -) - -rj_cor_comando_pops_flow.schedule = every_month +# with Flow( +# "COR: Comando - POPs e Atividades dos POPs", +# code_owners=[ +# "paty", +# ], +# ) as rj_cor_comando_pops_flow: +# # Dump mode +# dump_mode = Parameter("dump_mode", default="overwrite", required=False) + +# # Materialization parameters +# materialize_after_dump = Parameter( +# "materialize_after_dump", default=False, required=False +# ) +# materialization_mode = Parameter( +# "materialization_mode", default="prod", required=False +# ) +# materialize_to_datario = Parameter( +# "materialize_to_datario", default=False, required=False +# ) + +# # Dump to GCS after? 
Should only dump to GCS if materializing to datario +# dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) +# maximum_bytes_processed = Parameter( +# "maximum_bytes_processed", +# required=False, +# default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, +# ) + +# dataset_id = Parameter( +# "dataset_id", default=comando_constants.DATASET_ID.value, required=False +# ) +# table_id_pops = Parameter( +# "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False +# ) + +# pops = download_data_pops() +# path_pops = save_no_partition(dataframe=pops) + +# task_upload_pops = create_table_and_upload_to_gcs( +# data_path=path_pops, +# dataset_id=dataset_id, +# table_id=table_id_pops, +# biglake_table=False, +# dump_mode=dump_mode, +# ) + +# with case(materialize_after_dump, True): +# # Trigger DBT flow run +# current_flow_labels = get_current_flow_labels() + +# materialization_pops_flow = create_flow_run( +# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, +# project_name=constants.PREFECT_DEFAULT_PROJECT.value, +# parameters={ +# "dataset_id": dataset_id, +# "table_id": table_id_pops, +# "mode": materialization_mode, +# "materialize_to_datario": materialize_to_datario, +# }, +# labels=current_flow_labels, +# run_name=f"Materialize {dataset_id}.{table_id_pops}", +# ) +# materialization_pops_flow.set_upstream(task_upload_pops) + +# wait_for_pops_materialization = wait_for_flow_run( +# materialization_pops_flow, +# stream_states=True, +# stream_logs=True, +# raise_final_state=True, +# ) +# wait_for_pops_materialization.max_retries = ( +# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value +# ) +# wait_for_pops_materialization.retry_delay = timedelta( +# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value +# ) + +# with case(dump_to_gcs, True): +# # Trigger Dump to GCS flow run with project id as datario +# dump_pops_to_gcs_flow = create_flow_run( +# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, +# project_name=constants.PREFECT_DEFAULT_PROJECT.value, +# parameters={ +# "project_id": "datario", +# "dataset_id": dataset_id, +# "table_id": table_id_pops, +# "maximum_bytes_processed": maximum_bytes_processed, +# }, +# labels=[ +# "datario", +# ], +# run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", +# ) +# dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) + +# wait_for_dump_pops_to_gcs = wait_for_flow_run( +# dump_pops_to_gcs_flow, +# stream_states=True, +# stream_logs=True, +# raise_final_state=True, +# ) + +# rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +# rj_cor_comando_pops_flow.run_config = KubernetesRun( +# image=constants.DOCKER_IMAGE.value, +# labels=[ +# constants.RJ_COR_AGENT_LABEL.value, +# ], +# ) + +# rj_cor_comando_pops_flow.schedule = every_month diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 46ea27db5..ba1c8a1b3 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # pylint: disable=R0914,W0613,W0102,W0613,R0912,R0915,E1136,E1137,W0702 # flake8: noqa: E722 -# TODO: colocar id_pops novos +# TODO: colocar id_pops novos no redis toda vez que atualizar e buscar nos flows # TODO: gerar alerta quando tiver id_pop novo # TODO: apagar histórico da nova api para ter o id_pop novo # TODO: criar tabela dim do id_pop novo @@ -30,7 +30,6 @@ from pipelines.rj_cor.comando.eventos.utils import ( # build_redis_key, 
download_data, - treat_wrong_id_pop, ) from pipelines.utils.utils import ( build_redis_key, @@ -174,7 +173,6 @@ def treat_data_ocorrencias( "pop_id": "id_pop", "inicio": "data_inicio", "fim": "data_fim", - "pop": "pop_titulo", "titulo": "pop_especificacao", } ) @@ -203,11 +201,11 @@ def treat_data_ocorrencias( "Secundária": "Secundario", } ) - dfr["descricao"] = dfr["descricao"].apply(unidecode) + dfr[["pop", "descricao"]] = dfr[["pop", "descricao"]].apply(unidecode) mandatory_cols = [ - "id_pop", "id_evento", + "pop", "bairro", "data_inicio", "data_fim", @@ -231,30 +229,17 @@ def treat_data_ocorrencias( "gravidade", "status", "tipo", - "pop_titulo", + "pop", ] for i in categorical_cols: dfr[i] = dfr[i].str.capitalize() - # This treatment is temporary. Now the id_pop from API is comming with the same value as id_evento - dfr = treat_wrong_id_pop(dfr) - log(f"This id_pop are missing {dfr[dfr.id_pop.isna()]} they were replaced by 99") - dfr["id_pop"] = dfr["id_pop"].fillna(99) - - # Treat id_pop col - dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int) - for col in ["data_inicio", "data_fim"]: dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S") # Set the order to match the original table dfr = dfr[mandatory_cols] - # Create a column with time of row creation to keep last event on dbt - dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( - "%Y-%m-%d %H:%M:%S" - ) - return dfr.drop_duplicates(), redis_max_date @@ -359,11 +344,6 @@ def treat_data_atividades( # Set the order to match the original table dfr = dfr[mandatory_cols] - # Create a column with time of row creation to keep last event on dbt - dfr["created_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( - "%Y-%m-%d %H:%M:%S" - ) - return dfr.drop_duplicates(), redis_max_date From 5eb0f1474331050eb73c2a64ad2a25426fba2bc2 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 3 Apr 2024 17:12:26 -0300 Subject: [PATCH 10/14] bugfix --- pipelines/rj_cor/comando/eventos/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index ba1c8a1b3..9bacbce54 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -201,7 +201,8 @@ def treat_data_ocorrencias( "Secundária": "Secundario", } ) - dfr[["pop", "descricao"]] = dfr[["pop", "descricao"]].apply(unidecode) + dfr["pop"] = dfr["pop"].apply(unidecode) + dfr["descricao"] = dfr["descricao"].apply(unidecode) mandatory_cols = [ "id_evento", From 29668240d949f135a3e74b109c37dc9f88aebdd3 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Wed, 3 Apr 2024 19:49:58 -0300 Subject: [PATCH 11/14] unidecode all categorical columns --- pipelines/rj_cor/comando/eventos/tasks.py | 25 ++++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 9bacbce54..a4fd9fd37 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -1,10 +1,6 @@ # -*- coding: utf-8 -*- -# pylint: disable=R0914,W0613,W0102,W0613,R0912,R0915,E1136,E1137,W0702 # flake8: noqa: E722 -# TODO: colocar id_pops novos no redis toda vez que atualizar e buscar nos flows -# TODO: gerar alerta quando tiver id_pop novo # TODO: apagar histórico da nova api para ter o id_pop novo -# TODO: criar tabela dim do id_pop novo """ Tasks for comando """ @@ -25,7 +21,6 @@ from prefect.engine.state import Skipped # from 
prefect.triggers import all_successful -# url_eventos = "http://aplicativo.cocr.com.br/comando/ocorrencias_api_nova" # from pipelines.rj_cor.utils import compare_actual_df_with_redis_df from pipelines.rj_cor.comando.eventos.utils import ( # build_redis_key, @@ -43,7 +38,7 @@ @task -def get_date_interval(first_date, last_date) -> Tuple[dict, str]: +def get_date_interval(first_date: str = None, last_date: str = None): """ If `first_date` and `last_date` are provided, convert it to pendulum and add one day to `last_date`. Else, get data from last 3 days. @@ -142,7 +137,11 @@ def save_redis_max_date( # pylint: disable=too-many-arguments max_retries=3, retry_delay=timedelta(seconds=60), ) -def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame: +def download_data_ocorrencias( + first_date: pendulum, + last_date: pendulum, + wait=None, # pylint: disable=unused-argument +) -> pd.DataFrame: """ Download data from API adding one day at a time so we can save the date in a new column `data_particao` that will be used to @@ -201,8 +200,10 @@ def treat_data_ocorrencias( "Secundária": "Secundario", } ) - dfr["pop"] = dfr["pop"].apply(unidecode) - dfr["descricao"] = dfr["descricao"].apply(unidecode) + categorical_cols = ["pop", "descricao", "bairro", "gravidade", "status"] + for i in categorical_cols: + dfr[i] = dfr[i].str.capitalize() + dfr[i] = dfr[i].apply(unidecode) mandatory_cols = [ "id_evento", @@ -249,7 +250,11 @@ def treat_data_ocorrencias( max_retries=3, retry_delay=timedelta(seconds=60), ) -def download_data_atividades(first_date, last_date, wait=None) -> pd.DataFrame: +def download_data_atividades( + first_date, + last_date, + wait=None, # pylint: disable=unused-argument +) -> pd.DataFrame: """ Download data from API """ From ca0ac39a189cdb2da216577c4077bf6db4c1135b Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 4 Apr 2024 09:31:48 -0300 Subject: [PATCH 12/14] returning last update column --- pipelines/rj_cor/comando/eventos/tasks.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index a4fd9fd37..16ab55c52 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- # flake8: noqa: E722 -# TODO: apagar histórico da nova api para ter o id_pop novo """ Tasks for comando """ @@ -242,6 +241,11 @@ def treat_data_ocorrencias( # Set the order to match the original table dfr = dfr[mandatory_cols] + # Create a column with time of row update to keep last event on dbt + dfr["last_updated_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( + "%Y-%m-%d %H:%M:%S" + ) + return dfr.drop_duplicates(), redis_max_date @@ -350,6 +354,11 @@ def treat_data_atividades( # Set the order to match the original table dfr = dfr[mandatory_cols] + # Create a column with time of row update to keep last event on dbt + dfr["last_updated_at"] = pendulum.now(tz="America/Sao_Paulo").strftime( + "%Y-%m-%d %H:%M:%S" + ) + return dfr.drop_duplicates(), redis_max_date From 7486e71c48a6d3530ee889674f03d52606fe2e62 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 4 Apr 2024 09:36:40 -0300 Subject: [PATCH 13/14] returning pop flow --- pipelines/rj_cor/comando/eventos/flows.py | 228 +++++++++++----------- 1 file changed, 114 insertions(+), 114 deletions(-) diff --git a/pipelines/rj_cor/comando/eventos/flows.py b/pipelines/rj_cor/comando/eventos/flows.py index 5b30e3898..166ae474d 100644 --- 
a/pipelines/rj_cor/comando/eventos/flows.py +++ b/pipelines/rj_cor/comando/eventos/flows.py @@ -16,16 +16,16 @@ from pipelines.rj_cor.comando.eventos.constants import ( constants as comando_constants, ) -from pipelines.rj_cor.comando.eventos.schedules import every_hour # , every_month +from pipelines.rj_cor.comando.eventos.schedules import every_hour, every_month from pipelines.rj_cor.comando.eventos.tasks import ( download_data_ocorrencias, download_data_atividades, - # download_data_pops, + download_data_pops, get_date_interval, # get_redis_df, get_redis_max_date, save_data, - # save_no_partition, + save_no_partition, save_redis_max_date, treat_data_ocorrencias, treat_data_atividades, @@ -410,114 +410,114 @@ rj_cor_comando_atividades_evento_flow.schedule = every_hour -# with Flow( -# "COR: Comando - POPs e Atividades dos POPs", -# code_owners=[ -# "paty", -# ], -# ) as rj_cor_comando_pops_flow: -# # Dump mode -# dump_mode = Parameter("dump_mode", default="overwrite", required=False) - -# # Materialization parameters -# materialize_after_dump = Parameter( -# "materialize_after_dump", default=False, required=False -# ) -# materialization_mode = Parameter( -# "materialization_mode", default="prod", required=False -# ) -# materialize_to_datario = Parameter( -# "materialize_to_datario", default=False, required=False -# ) - -# # Dump to GCS after? Should only dump to GCS if materializing to datario -# dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) -# maximum_bytes_processed = Parameter( -# "maximum_bytes_processed", -# required=False, -# default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, -# ) - -# dataset_id = Parameter( -# "dataset_id", default=comando_constants.DATASET_ID.value, required=False -# ) -# table_id_pops = Parameter( -# "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False -# ) - -# pops = download_data_pops() -# path_pops = save_no_partition(dataframe=pops) - -# task_upload_pops = create_table_and_upload_to_gcs( -# data_path=path_pops, -# dataset_id=dataset_id, -# table_id=table_id_pops, -# biglake_table=False, -# dump_mode=dump_mode, -# ) - -# with case(materialize_after_dump, True): -# # Trigger DBT flow run -# current_flow_labels = get_current_flow_labels() - -# materialization_pops_flow = create_flow_run( -# flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "dataset_id": dataset_id, -# "table_id": table_id_pops, -# "mode": materialization_mode, -# "materialize_to_datario": materialize_to_datario, -# }, -# labels=current_flow_labels, -# run_name=f"Materialize {dataset_id}.{table_id_pops}", -# ) -# materialization_pops_flow.set_upstream(task_upload_pops) - -# wait_for_pops_materialization = wait_for_flow_run( -# materialization_pops_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) -# wait_for_pops_materialization.max_retries = ( -# dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value -# ) -# wait_for_pops_materialization.retry_delay = timedelta( -# seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value -# ) - -# with case(dump_to_gcs, True): -# # Trigger Dump to GCS flow run with project id as datario -# dump_pops_to_gcs_flow = create_flow_run( -# flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, -# project_name=constants.PREFECT_DEFAULT_PROJECT.value, -# parameters={ -# "project_id": "datario", -# "dataset_id": dataset_id, -# "table_id": table_id_pops, -# 
"maximum_bytes_processed": maximum_bytes_processed, -# }, -# labels=[ -# "datario", -# ], -# run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", -# ) -# dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) - -# wait_for_dump_pops_to_gcs = wait_for_flow_run( -# dump_pops_to_gcs_flow, -# stream_states=True, -# stream_logs=True, -# raise_final_state=True, -# ) - -# rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) -# rj_cor_comando_pops_flow.run_config = KubernetesRun( -# image=constants.DOCKER_IMAGE.value, -# labels=[ -# constants.RJ_COR_AGENT_LABEL.value, -# ], -# ) - -# rj_cor_comando_pops_flow.schedule = every_month +with Flow( + "COR: Comando - POPs e Atividades dos POPs", + code_owners=[ + "paty", + ], +) as rj_cor_comando_pops_flow: + # Dump mode + dump_mode = Parameter("dump_mode", default="overwrite", required=False) + + # Materialization parameters + materialize_after_dump = Parameter( + "materialize_after_dump", default=False, required=False + ) + materialization_mode = Parameter( + "materialization_mode", default="prod", required=False + ) + materialize_to_datario = Parameter( + "materialize_to_datario", default=False, required=False + ) + + # Dump to GCS after? Should only dump to GCS if materializing to datario + dump_to_gcs = Parameter("dump_to_gcs", default=False, required=False) + maximum_bytes_processed = Parameter( + "maximum_bytes_processed", + required=False, + default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value, + ) + + dataset_id = Parameter( + "dataset_id", default=comando_constants.DATASET_ID.value, required=False + ) + table_id_pops = Parameter( + "table_id_pops", default=comando_constants.TABLE_ID_POPS.value, required=False + ) + + pops = download_data_pops() + path_pops = save_no_partition(dataframe=pops) + + task_upload_pops = create_table_and_upload_to_gcs( + data_path=path_pops, + dataset_id=dataset_id, + table_id=table_id_pops, + biglake_table=False, + dump_mode=dump_mode, + ) + + with case(materialize_after_dump, True): + # Trigger DBT flow run + current_flow_labels = get_current_flow_labels() + + materialization_pops_flow = create_flow_run( + flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "dataset_id": dataset_id, + "table_id": table_id_pops, + "mode": materialization_mode, + "materialize_to_datario": materialize_to_datario, + }, + labels=current_flow_labels, + run_name=f"Materialize {dataset_id}.{table_id_pops}", + ) + materialization_pops_flow.set_upstream(task_upload_pops) + + wait_for_pops_materialization = wait_for_flow_run( + materialization_pops_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + wait_for_pops_materialization.max_retries = ( + dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value + ) + wait_for_pops_materialization.retry_delay = timedelta( + seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value + ) + + with case(dump_to_gcs, True): + # Trigger Dump to GCS flow run with project id as datario + dump_pops_to_gcs_flow = create_flow_run( + flow_name=utils_constants.FLOW_DUMP_TO_GCS_NAME.value, + project_name=constants.PREFECT_DEFAULT_PROJECT.value, + parameters={ + "project_id": "datario", + "dataset_id": dataset_id, + "table_id": table_id_pops, + "maximum_bytes_processed": maximum_bytes_processed, + }, + labels=[ + "datario", + ], + run_name=f"Dump to GCS {dataset_id}.{table_id_pops}", + ) + 
dump_pops_to_gcs_flow.set_upstream(wait_for_pops_materialization) + + wait_for_dump_pops_to_gcs = wait_for_flow_run( + dump_pops_to_gcs_flow, + stream_states=True, + stream_logs=True, + raise_final_state=True, + ) + +rj_cor_comando_pops_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +rj_cor_comando_pops_flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], +) + +rj_cor_comando_pops_flow.schedule = every_month From bad50b5b349f385addc6a57e65b757f9d9296741 Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Thu, 4 Apr 2024 11:22:32 -0300 Subject: [PATCH 14/14] adding nout on funcion --- pipelines/rj_cor/comando/eventos/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_cor/comando/eventos/tasks.py b/pipelines/rj_cor/comando/eventos/tasks.py index 16ab55c52..6ba6c177d 100644 --- a/pipelines/rj_cor/comando/eventos/tasks.py +++ b/pipelines/rj_cor/comando/eventos/tasks.py @@ -36,7 +36,7 @@ ) -@task +@task(nout=2) def get_date_interval(first_date: str = None, last_date: str = None): """ If `first_date` and `last_date` are provided, convert it to pendulum
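
Editor's note: the sketch below is an illustrative, self-contained rendition of the pattern the patches above converge on — download the API one day at a time, tag each day's rows with a `create_partition` column used later to build partitions, and skip the run when the newest timestamp in the payload is not newer than the value cached in Redis. The names `download_daily`, `has_new_data`, the injected `fetch` callable and the `fake_fetch` stub are assumptions made for illustration only; the real tasks read the endpoint from Vault via `pd.read_json`, catch `urllib.error.HTTPError`, and raise Prefect's `Skipped` state instead of returning a flag.

# Illustrative sketch only -- not part of the patch series.
from typing import Callable, List

import pandas as pd
import pendulum


def download_daily(
    fetch: Callable[[pendulum.DateTime, pendulum.DateTime], pd.DataFrame],
    first_date: pendulum.DateTime,
    last_date: pendulum.DateTime,
) -> pd.DataFrame:
    """Fetch [first_date, last_date) one day at a time, tagging each day's rows."""
    frames = []
    day = first_date
    while day < last_date:
        try:
            dfr = fetch(day, day.add(days=1))
            # Partition column is set from the request date, so rows without
            # data_inicio still land in a deterministic partition.
            dfr["create_partition"] = day.strftime("%Y-%m-%d")
            frames.append(dfr)
        except Exception as error:  # the real task narrows this to HTTPError
            print(f"Error downloading data for {day.to_date_string()}: {error}")
        day = day.add(days=1)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


def has_new_data(dfr: pd.DataFrame, redis_max_date: str, date_cols: List[str]) -> bool:
    """True when the newest date in `date_cols` is newer than the cached value
    (both compared as zero-padded 'YYYY-MM-DD HH:MM:SS' strings)."""
    max_date = dfr[date_cols].apply(pd.to_datetime, errors="coerce").max().max()
    return max_date.strftime("%Y-%m-%d %H:%M:%S") > redis_max_date


if __name__ == "__main__":
    def fake_fetch(start, end):
        # Stand-in for pd.read_json(f"{url}/?data_i=...&data_f=...")
        return pd.DataFrame(
            {"id_evento": [1], "data_inicio": [start.strftime("%Y-%m-%d 10:00:00")]}
        )

    today = pendulum.today(tz="America/Sao_Paulo")
    dfr = download_daily(fake_fetch, today.subtract(days=3), today.add(days=1))
    print(dfr)
    print(has_new_data(dfr, "2000-01-01 00:00:00", ["data_inicio"]))

Requesting one day per call is what makes the `create_partition` workaround possible: the partition no longer depends on a nullable `data_inicio`, and a failed day can be skipped without losing the rest of the interval.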