From e6312f8c43511db253fc469ec194a87235097f2c Mon Sep 17 00:00:00 2001 From: patriciacatandi Date: Tue, 17 Oct 2023 16:41:14 -0300 Subject: [PATCH] adding backfill flow --- .../precipitacao_alertario/flows.py | 28 ++++ .../precipitacao_alertario/schedules.py | 19 +++ .../precipitacao_alertario/tasks.py | 122 ++++++++++++++++++ 3 files changed, 169 insertions(+) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/flows.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/flows.py index 1d3f45166..3c367066a 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/flows.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/flows.py @@ -18,12 +18,15 @@ ) from pipelines.rj_cor.meteorologia.precipitacao_alertario.tasks import ( check_to_run_dbt, + dump_from_ssh, tratar_dados, + treat_dumped_data, salvar_dados, save_last_dbt_update, ) from pipelines.rj_cor.meteorologia.precipitacao_alertario.schedules import ( minute_schedule, + day_schedule, ) from pipelines.rj_escritorio.rain_dashboard.constants import ( constants as rain_dashboard_constants, @@ -210,3 +213,28 @@ labels=[constants.RJ_COR_AGENT_LABEL.value], ) cor_meteorologia_precipitacao_alertario.schedule = minute_schedule + +with Flow( + name="COR: Meteorologia - Precipitacao ALERTARIO Backfill", + code_owners=[ + "paty", + ], + # skip_if_running=True, +) as cor_meteorologia_precipitacao_alertario_backfill: + mode = Parameter("mode", default="prod", required=False) + start_date = Parameter("start_date", default=None, required=False) + end_date = Parameter("end_date", default=None, required=False) + + filepath = dump_from_ssh() + treat_dumped_data(filepath) + # dfr = treat_dumped_data(filepath, start_date, end_date) + +# para rodar na cloud +cor_meteorologia_precipitacao_alertario_backfill.storage = GCS( + constants.GCS_FLOWS_BUCKET.value +) +cor_meteorologia_precipitacao_alertario_backfill.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_COR_AGENT_LABEL.value], +) +cor_meteorologia_precipitacao_alertario_backfill.schedule = day_schedule diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py index c296d3922..e7274b965 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py @@ -27,3 +27,22 @@ ), ] ) + +day_schedule = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2021, 1, 1, 5, 0, 30), + labels=[ + constants.RJ_COR_AGENT_LABEL.value, + ], + parameter_defaults={ + # "trigger_rain_dashboard_update": True, + "materialize_after_dump": True, + "mode": "prod", + "materialize_to_datario": True, + "dump_to_gcs": False, + }, + ), + ] +) diff --git a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py index 002d7cd33..393a05d7c 100644 --- a/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py +++ b/pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py @@ -5,6 +5,7 @@ """ from datetime import timedelta from pathlib import Path +import subprocess from typing import Union, Tuple import numpy as np @@ -13,6 +14,8 @@ from prefect import task import pandas_read_xml as pdx +from paramiko import SSHClient +from scp import SCPClient # from prefect import context @@ -24,6 +27,7 @@ from pipelines.utils.utils import ( build_redis_key, compare_dates_between_tables_redis, + get_vault_secret, log, to_partitions, save_str_on_redis, @@ -221,3 +225,121 @@ def check_to_run_dbt( ) log(f">>>> debug data alertario > data dbt: {run_dbt}") return run_dbt + + +@task( + max_retries=2, + retry_delay=timedelta(seconds=30), +) +def dump_from_ssh() -> str: + """ + Fetch files from INEA server + + Args: + remote_file (str): Remote file to be fetched + radar (str): Radar name. Must be `gua` or `mac` + output_directory (str): Directory where the files will be saved + """ + dicionario = get_vault_secret("websempre_ssh") + hostname = dicionario["data"]["hostname"] + username = dicionario["data"]["username"] + password = dicionario["data"]["password"] + port = dicionario["data"]["port"] + + # Open SSH client + ssh_client = SSHClient() + ssh_client.load_system_host_keys() + ssh_client.connect( + hostname=hostname, + username=username, + password=password, + port=port, + timeout=300, + auth_timeout=300, + banner_timeout=300, + ) + + log("SSH conection estabilished") + command = "find /mnt/backup -name 'alertadb' | sort -k 6 -M -r" + _, stdout, _ = ssh_client.exec_command(command) + remote_files = stdout.read().decode("utf-8").splitlines() + log(f"Remote files found: {remote_files}") + + # Keep first file on the list since it is the last file modified on folder + remote_file = remote_files[0] + log(f"Last modified file: {remote_file}") + + # Open SCP client + scp = SCPClient(ssh_client.get_transport(), sanitize=lambda x: x) + + # Fetch VOL file + output_directory = Path("/temp/") + output_directory.mkdir(parents=True, exist_ok=True) + scp.get(remote_file, local_path=str(output_directory)) + + # Close connection + scp.close() + + # Return local file path + return Path(output_directory) / remote_file.split("/")[-1] + + +@task() +def treat_dumped_data( + filepath: str, + # start_date: str = None, + # end_date: str = None, +) -> pd.DataFrame: + """ + Treat file from Alertario server + + Args: + filepath (str): + start_date (str): Start date of the files to be fetched (e.g. 2022-01-25) + end_date (str): End fate of the files to be fetched (e.g. 2022-01-25) + """ + + file = filepath.split("/")[-1] + log("Starting restoring dump") + command = [ + "pg_restore", + "-a", + "-n", + "public", + "-t", + "estacoes_leiturachuva", + "-f", + "output_data.txt", + file, + ] + subprocess.run(command, check=True) + log("Finished restoring dump") + + dfr = pd.read_csv("output_data.txt", skiprows=30, nrows=20, header=None, sep="\t") + dfr.columns = [ + "id", + "leitura_id", + "m15", + "h01", + "h04", + "h24", + "h96", + "mes", + "h02", + "h03", + "m30", + "h06", + "h12", + "h36", + "h48", + "h72", + "h168", + "m05", + "m10", + ] + log(f"DEBUG dfr: \n{dfr.head()}") + + # if not start_date: + # start_date = current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M") + + return dfr