Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Alertario] adding backfill flow from SSH #531

Open
wants to merge 46 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e6312f8
adding backfill flow
patriciacatandi Oct 17, 2023
fd7c3a8
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 19, 2023
af8dfd2
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 20, 2023
505edc9
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 20, 2023
558f76d
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 23, 2023
00dcaf7
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 23, 2023
4839f0b
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 23, 2023
c63a0e6
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 24, 2023
93e9726
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 24, 2023
ff5c297
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 24, 2023
5a37381
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 26, 2023
80269d4
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 27, 2023
907a9b9
Merge branch 'master' into staging/dump_alertario
mergify[bot] Oct 27, 2023
bb082df
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 13, 2023
459162b
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 13, 2023
a4147b8
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 13, 2023
f0a7e21
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 13, 2023
bf13c79
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 13, 2023
fc9151b
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 14, 2023
2afdb40
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 14, 2023
f55c1fe
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 15, 2023
850d170
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 15, 2023
db4e3e2
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 15, 2023
4f2e738
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 19, 2023
f0d401c
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 19, 2023
3edb48b
Merge branch 'master' into staging/dump_alertario
mergify[bot] Nov 24, 2023
376a4bf
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 11, 2023
cad3606
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 12, 2023
18b8b1e
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 12, 2023
e9a047b
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 12, 2023
fcd2482
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 13, 2023
3416c48
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 14, 2023
b7121aa
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 14, 2023
165a07a
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 18, 2023
bc543a7
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 18, 2023
2c45853
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 18, 2023
a3cf41a
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 18, 2023
dd60cbf
Merge branch 'master' into staging/dump_alertario
mergify[bot] Dec 21, 2023
5f5c5f2
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 2, 2024
8e337e6
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 8, 2024
3c85a53
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 14, 2024
714216f
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 15, 2024
1f36510
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 15, 2024
1ae4df1
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 16, 2024
d78dbb6
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 16, 2024
3581cbd
Merge branch 'master' into staging/dump_alertario
mergify[bot] Jan 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pipelines/rj_cor/meteorologia/precipitacao_alertario/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
)
from pipelines.rj_cor.meteorologia.precipitacao_alertario.tasks import (
check_to_run_dbt,
dump_from_ssh,
tratar_dados,
treat_dumped_data,
salvar_dados,
save_last_dbt_update,
)
from pipelines.rj_cor.meteorologia.precipitacao_alertario.schedules import (
minute_schedule,
day_schedule,
)
from pipelines.rj_escritorio.rain_dashboard.constants import (
constants as rain_dashboard_constants,
Expand Down Expand Up @@ -212,3 +215,28 @@
)
cor_meteorologia_precipitacao_alertario.executor = LocalDaskExecutor(num_workers=10)
cor_meteorologia_precipitacao_alertario.schedule = minute_schedule

with Flow(
name="COR: Meteorologia - Precipitacao ALERTARIO Backfill",
code_owners=[
"paty",
],
# skip_if_running=True,
) as cor_meteorologia_precipitacao_alertario_backfill:
mode = Parameter("mode", default="prod", required=False)
start_date = Parameter("start_date", default=None, required=False)
end_date = Parameter("end_date", default=None, required=False)

filepath = dump_from_ssh()
treat_dumped_data(filepath)
# dfr = treat_dumped_data(filepath, start_date, end_date)

# para rodar na cloud
cor_meteorologia_precipitacao_alertario_backfill.storage = GCS(
constants.GCS_FLOWS_BUCKET.value
)
cor_meteorologia_precipitacao_alertario_backfill.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_COR_AGENT_LABEL.value],
)
cor_meteorologia_precipitacao_alertario_backfill.schedule = day_schedule
19 changes: 19 additions & 0 deletions pipelines/rj_cor/meteorologia/precipitacao_alertario/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,22 @@
),
]
)

day_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 1, 1, 5, 0, 30),
labels=[
constants.RJ_COR_AGENT_LABEL.value,
],
parameter_defaults={
# "trigger_rain_dashboard_update": True,
"materialize_after_dump": True,
"mode": "prod",
"materialize_to_datario": True,
"dump_to_gcs": False,
},
),
]
)
122 changes: 122 additions & 0 deletions pipelines/rj_cor/meteorologia/precipitacao_alertario/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
from datetime import timedelta
from pathlib import Path
import subprocess
from typing import Union, Tuple

import numpy as np
Expand All @@ -13,6 +14,8 @@
from prefect import task

import pandas_read_xml as pdx
from paramiko import SSHClient
from scp import SCPClient

# from prefect import context

Expand All @@ -24,6 +27,7 @@
from pipelines.utils.utils import (
build_redis_key,
compare_dates_between_tables_redis,
get_vault_secret,
log,
to_partitions,
save_str_on_redis,
Expand Down Expand Up @@ -231,3 +235,121 @@ def check_to_run_dbt(
)
log(f">>>> debug data alertario > data dbt: {run_dbt}")
return run_dbt


@task(
max_retries=2,
retry_delay=timedelta(seconds=30),
)
def dump_from_ssh() -> str:
"""
Fetch files from INEA server

Args:
remote_file (str): Remote file to be fetched
radar (str): Radar name. Must be `gua` or `mac`
output_directory (str): Directory where the files will be saved
"""
dicionario = get_vault_secret("websempre_ssh")
hostname = dicionario["data"]["hostname"]
username = dicionario["data"]["username"]
password = dicionario["data"]["password"]
port = dicionario["data"]["port"]

# Open SSH client
ssh_client = SSHClient()
ssh_client.load_system_host_keys()
ssh_client.connect(
hostname=hostname,
username=username,
password=password,
port=port,
timeout=300,
auth_timeout=300,
banner_timeout=300,
)

log("SSH conection estabilished")
command = "find /mnt/backup -name 'alertadb' | sort -k 6 -M -r"
_, stdout, _ = ssh_client.exec_command(command)
remote_files = stdout.read().decode("utf-8").splitlines()
log(f"Remote files found: {remote_files}")

# Keep first file on the list since it is the last file modified on folder
remote_file = remote_files[0]
log(f"Last modified file: {remote_file}")

# Open SCP client
scp = SCPClient(ssh_client.get_transport(), sanitize=lambda x: x)

# Fetch VOL file
output_directory = Path("/temp/")
output_directory.mkdir(parents=True, exist_ok=True)
scp.get(remote_file, local_path=str(output_directory))

# Close connection
scp.close()

# Return local file path
return Path(output_directory) / remote_file.split("/")[-1]


@task()
def treat_dumped_data(
filepath: str,
# start_date: str = None,
# end_date: str = None,
) -> pd.DataFrame:
"""
Treat file from Alertario server

Args:
filepath (str):
start_date (str): Start date of the files to be fetched (e.g. 2022-01-25)
end_date (str): End fate of the files to be fetched (e.g. 2022-01-25)
"""

file = filepath.split("/")[-1]
log("Starting restoring dump")
command = [
"pg_restore",
"-a",
"-n",
"public",
"-t",
"estacoes_leiturachuva",
"-f",
"output_data.txt",
file,
]
subprocess.run(command, check=True)
log("Finished restoring dump")

dfr = pd.read_csv("output_data.txt", skiprows=30, nrows=20, header=None, sep="\t")
dfr.columns = [
"id",
"leitura_id",
"m15",
"h01",
"h04",
"h24",
"h96",
"mes",
"h02",
"h03",
"m30",
"h06",
"h12",
"h36",
"h48",
"h72",
"h168",
"m05",
"m10",
]
log(f"DEBUG dfr: \n{dfr.head()}")

# if not start_date:
# start_date = current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

return dfr
Loading