Commit 5f7c282

Merge pull request #646 from prefeitura-rio/staging/cor-precipitacao-alertario

[dashboard alagamento] changing alagamento query and data_final
patriciacatandi authored Apr 2, 2024
2 parents b515023 + 1a6b061 commit 5f7c282
Showing 5 changed files with 184 additions and 58 deletions.
50 changes: 36 additions & 14 deletions pipelines/rj_cor/comando/eventos/constants.py
@@ -15,7 +15,7 @@ class constants(Enum): # pylint: disable=c0103
PATH_BASE_ENDERECOS = "/tmp/base_enderecos.csv"
DATASET_ID = "adm_cor_comando"
TABLE_ID_EVENTOS = "ocorrencias_nova_api"
REDIS_NAME = "cor_api_last_days"
REDIS_NAME = "last_update"
TABLE_ID_ATIVIDADES_EVENTOS = "ocorrencias_orgaos_responsaveis_nova_api"
# TABLE_ID_POPS = "procedimento_operacional_padrao"
# TABLE_ID_ATIVIDADES_POPS = "procedimento_operacional_padrao_orgaos_responsaveis"
@@ -32,14 +32,16 @@ class constants(Enum): # pylint: disable=c0103
WHEN id_pop = "33" THEN 1 -- "Lâmina d'água"
END AS tipo,
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
CAST(latitude AS FLOAT64)) AS geometry
CAST(latitude AS FLOAT64)) AS geometry,
status AS status_ocorrencia,
data_inicio,
data_fim,
row_number() OVER (PARTITION BY id_evento ORDER BY created_at DESC) AS last_update
-- FROM `rj-cor.adm_cor_comando_staging.ocorrencias`
FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api`
WHERE id_pop IN ("5", "6", "31", "32", "33")
AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS STRING)
AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE)
-- AND data_fim IS NULL
AND status = "Aberto"
AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE), day) AS date)
-- AND data_fim IS NULL # data_fim is not reliable; there are closed statuses without this field
),
intersected_areas AS (
@@ -70,14 +72,25 @@ class constants(Enum): # pylint: disable=c0103
FROM intersected_areas
LEFT JOIN alagamentos
ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry)
AND alagamentos.last_update = 1
-- AND CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE) -- selects occurrences that started in the last 15 min
AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE)
OR CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 15 MINUTE)
OR status_ocorrencia = "Aberto") -- selects occurrences that started or ended in the last 15 min or have not yet ended
WHERE intersected_areas.row_num = 1
GROUP BY id_h3, bairro
)
SELECT
id_h3,
bairro,
tipo AS qnt_alagamentos,
-- tipo AS qnt_alagamentos,
CASE
WHEN tipo = 3 THEN "Alagamento"
WHEN tipo = 2 THEN "Bolsão d'água"
WHEN tipo = 1 THEN "Lâmina d'água"
ELSE "sem alagamento"
END AS qnt_alagamentos,
CASE
WHEN tipo = 3 THEN "extremamente crítico" --"Alagamento"
WHEN tipo = 2 THEN "crítico" -- "Bolsão d'água"
Expand All @@ -89,8 +102,9 @@ class constants(Enum): # pylint: disable=c0103
WHEN tipo = 2 THEN '#A9CBE8'--'#BFA230'
WHEN tipo = 3 THEN '#125999'--'#E0701F'
ELSE '#ffffff'
END AS color
END AS color,
FROM final_table
-- order by qnt_alagamentos
""",
"query_update": """
SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update
@@ -109,15 +123,15 @@ class constants(Enum): # pylint: disable=c0103
WHEN id_pop = "33" THEN 1 -- "Lâmina d'água"
END AS tipo,
ST_GEOGPOINT(CAST(longitude AS FLOAT64),
CAST(latitude AS FLOAT64)) AS geometry
CAST(latitude AS FLOAT64)) AS geometry,
status AS status_ocorrencia,
data_inicio,
data_fim,
row_number() OVER (PARTITION BY id_evento ORDER BY created_at DESC) AS last_update
-- FROM `rj-cor.adm_cor_comando_staging.ocorrencias`
FROM `rj-cor.adm_cor_comando_staging.ocorrencias_nova_api`
WHERE id_pop IN ("5", "6", "31", "32", "33")
AND data_particao >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE), day) AS STRING)
AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 120 MINUTE)
-- OR data_fim IS NULL
)
AND status = "Aberto"
AND CAST(data_particao AS DATE) >= CAST(DATE_TRUNC(TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR), day) AS date)
),
intersected_areas AS (
@@ -148,6 +162,13 @@ class constants(Enum): # pylint: disable=c0103
FROM intersected_areas
LEFT JOIN alagamentos
ON ST_CONTAINS(intersected_areas.geometry, alagamentos.geometry)
AND alagamentos.last_update = 1
AND (CAST(data_inicio AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR)
OR CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 2 HOUR)
OR status_ocorrencia = "Aberto") -- selects occurrences that started or ended in the last 2 h or have not yet ended
-- AND (CAST(data_fim AS DATETIME) >= TIMESTAMP_SUB(CURRENT_DATETIME("America/Sao_Paulo"), INTERVAL 24 hour)
-- OR data_fim IS NULL # data_fim is not reliable; there are closed statuses without this field
-- )
WHERE intersected_areas.row_num = 1
GROUP BY id_h3, bairro
)
@@ -174,6 +195,7 @@ class constants(Enum): # pylint: disable=c0103
ELSE '#ffffff'
END AS color
FROM final_table
# order by qnt_alagamentos
""",
"query_update": """
SELECT date_trunc(current_datetime("America/Sao_Paulo"), minute) AS last_update
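
For reference, both dashboard queries now tag the most recent record per id_evento with row_number() OVER (PARTITION BY id_evento ORDER BY created_at DESC) and keep only last_update = 1 at the join, so re-sent occurrences are not double counted. A minimal pandas sketch of the same deduplication idea, on a hypothetical toy table:

```python
import pandas as pd

# Toy stand-in for the staging occurrences table; values are hypothetical.
occurrences = pd.DataFrame(
    {
        "id_evento": ["1", "1", "2"],
        "created_at": pd.to_datetime(
            ["2024-04-02 10:00", "2024-04-02 10:05", "2024-04-02 10:01"]
        ),
        "status": ["Aberto", "Fechado", "Aberto"],
    }
)

# Equivalent of row_number() OVER (PARTITION BY id_evento ORDER BY created_at DESC) = 1:
# keep only the most recent record of each occurrence.
latest = occurrences.sort_values("created_at", ascending=False).drop_duplicates(
    "id_evento", keep="first"
)
print(latest)  # id_evento 1 keeps only its latest ("Fechado") record
```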
36 changes: 16 additions & 20 deletions pipelines/rj_cor/comando/eventos/flows.py
@@ -22,7 +22,9 @@
download_data_atividades,
get_date_interval,
get_redis_df,
get_redis_max_date,
save_data,
save_redis_max_date,
treat_data_ocorrencias,
treat_data_atividades,
)
@@ -84,35 +86,18 @@

dfr = download_data_ocorrencias(first_date, last_date)

dfr_redis = get_redis_df(
redis_max_date = get_redis_max_date(
dataset_id=dataset_id,
table_id=table_id,
name=redis_name,
mode=redis_mode,
)

dfr_treated, dfr_redis = treat_data_ocorrencias(
dfr_treated, redis_max_date = treat_data_ocorrencias(
dfr,
dfr_redis=dfr_redis,
columns=["id_evento", "data_inicio", "status"],
redis_max_date,
)

# dfr = compare_actual_df_with_redis_df(
# dfr,
# dfr_redis=dfr_redis,
# columns=columns,

# )

# save_redis_df(
# dfr_redis,
# dataset_id,
# table_id,
# redis_name,
# keep_n_days=1,
# mode = mode,
# )

path = save_data(dfr_treated)
task_upload = create_table_and_upload_to_gcs(
data_path=path,
@@ -123,6 +108,17 @@
wait=path,
)

save_redis_max_date(
dataset_id=dataset_id,
table_id=table_id,
name=redis_name,
mode=redis_mode,
redis_max_date=redis_max_date,
wait=task_upload,
)

# save_redis_max_date.set_upstream(task_upload)

# Warning: this task won't execute if we provide a date interval
# in the parameters. This is intentional: when we perform backfills,
# it won't mess with the Redis interval.
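
The Redis handoff now moves a scalar date instead of a dataframe: read the stored max date, hand it to the treatment task, and persist the new max only after the GCS upload succeeds. A simplified, self-contained sketch of that ordering in Prefect 1.x style — placeholder bodies and values, not the repo's actual tasks:

```python
from prefect import Flow, task


@task
def get_redis_max_date():
    return "2024-04-01 00:00:00"  # placeholder for the value read from Redis


@task(nout=2)
def treat_data_ocorrencias(dfr, redis_max_date):
    return dfr, "2024-04-02 12:00:00"  # treated data and the new max date


@task
def upload(dfr):
    return "uploaded"  # stand-in for create_table_and_upload_to_gcs


@task
def save_redis_max_date(redis_max_date, wait=None):  # `wait` only orders the task
    print(f"saving {redis_max_date}")


with Flow("example") as flow:
    redis_max_date = get_redis_max_date()
    dfr_treated, new_max_date = treat_data_ocorrencias([], redis_max_date)
    task_upload = upload(dfr_treated)
    # The checkpoint is written only after the upload task succeeds:
    save_redis_max_date(new_max_date, wait=task_upload)
```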
96 changes: 72 additions & 24 deletions pipelines/rj_cor/comando/eventos/tasks.py
@@ -1,6 +1,11 @@
# -*- coding: utf-8 -*-
# pylint: disable=R0914,W0613,W0102,W0613,R0912,R0915,E1136,E1137,W0702
# flake8: noqa: E722
# TODO: add the new id_pops
# TODO: raise an alert whenever a new id_pop appears
# TODO: clear the new API's history so it carries the new id_pop
# TODO: create a dim table for the new id_pop
# TODO: save to Redis the max of the data_inicio and data_fim columns; continue the flow only if either column has new data
"""
Tasks for comando
"""
@@ -21,18 +26,20 @@
from prefect.engine.state import Skipped

# from prefect.triggers import all_successful

# url_eventos = "http://aplicativo.cocr.com.br/comando/ocorrencias_api_nova"
from pipelines.rj_cor.utils import compare_actual_df_with_redis_df
from pipelines.rj_cor.comando.eventos.utils import (
build_redis_key,
compare_actual_df_with_redis_df,
get_redis_output, # TODO: update the one in utils.utils
# build_redis_key,
format_date,
treat_wrong_id_pop,
)
from pipelines.utils.utils import (
build_redis_key,
get_redis_output,
get_vault_secret,
log,
parse_date_columns,
save_str_on_redis,
to_partitions,
)

Expand All @@ -49,7 +56,7 @@ def get_date_interval(first_date, last_date) -> Tuple[dict, str]:
first_date, last_date = format_date(first_date, last_date)
else:
last_date = pendulum.today(tz="America/Sao_Paulo").date()
first_date = last_date.subtract(days=1) # attention: change to 3
first_date = last_date.subtract(days=3)
first_date, last_date = format_date(
first_date.strftime("%Y-%m-%d"), last_date.strftime("%Y-%m-%d")
)
@@ -70,7 +77,8 @@ def get_redis_df(
redis_key = build_redis_key(dataset_id, table_id, name, mode)
log(f"Acessing redis_key: {redis_key}")

dfr_redis = get_redis_output(redis_key, is_df=True)
dfr_redis = get_redis_output(redis_key)
# dfr_redis = get_redis_output(redis_key, is_df=True)
log(f"Redis output: {dfr_redis}")

# if len(dfr_redis) == 0:
@@ -90,6 +98,49 @@
return dfr_redis


@task
def get_redis_max_date(
dataset_id: str,
table_id: str,
name: str = None,
mode: str = "prod",
) -> str:
"""
Access Redis to get the last saved date and compare it to the current df.
"""
redis_key = build_redis_key(dataset_id, table_id, name, mode)
log(f"Acessing redis_key: {redis_key}")

redis_max_date = get_redis_output(redis_key)

try:
redis_max_date = redis_max_date["max_date"]
except KeyError:
redis_max_date = "1990-01-01"
log("Creating a fake date because this key doesn't exist.")

log(f"Redis output: {redis_max_date}")
return redis_max_date


@task
def save_redis_max_date( # pylint: disable=too-many-arguments
dataset_id: str,
table_id: str,
name: str = None,
mode: str = "prod",
redis_max_date: str = None,
wait=None, # pylint: disable=unused-argument
):
"""
Access Redis to save the last date.
"""
redis_key = build_redis_key(dataset_id, table_id, name, mode)
log(f"Acessing redis_key: {redis_key}")

save_str_on_redis(redis_key, "max_date", redis_max_date)
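
For context, build_redis_key, get_redis_output, and save_str_on_redis are repo helpers. Assuming the key resolves to a Redis hash holding a single max_date field — an assumption about their implementation, not a confirmed one — the round-trip performed by the two tasks above looks roughly like this in plain redis-py:

```python
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Assumed key layout: "{mode}.{dataset_id}.{table_id}.{name}"
redis_key = "prod.adm_cor_comando.ocorrencias_nova_api.last_update"

# get_redis_max_date: read the checkpoint, falling back to a date far in the past.
redis_max_date = r.hget(redis_key, "max_date") or "1990-01-01"

# save_redis_max_date: persist the new checkpoint after a successful upload.
r.hset(redis_key, "max_date", "2024-04-02 12:00:00")
```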


@task(
nout=1,
max_retries=3,
@@ -104,6 +155,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame:
url_secret = get_vault_secret("comando")["data"]
url_eventos = url_secret["endpoint_eventos"]

log(f"\n\nDownloading data from {first_date} to {last_date} (not included)")
dfr = pd.read_json(f"{url_eventos}/?data_i={first_date}&data_f={last_date}")

return dfr
@@ -112,8 +164,7 @@ def download_data_ocorrencias(first_date, last_date, wait=None) -> pd.DataFrame:
@task(nout=2)
def treat_data_ocorrencias(
dfr: pd.DataFrame,
dfr_redis: pd.DataFrame,
columns: list,
redis_max_date: str,
) -> Tuple[pd.DataFrame, str]:
"""
Rename cols and normalize data.
@@ -125,28 +176,30 @@ def treat_data_ocorrencias(
"id": "id_evento",
"pop_id": "id_pop",
"inicio": "data_inicio",
"fim": "data_fim",
"pop": "pop_titulo",
"titulo": "pop_especificacao",
}
)

log(f"First row: \n{dfr.iloc[0]}")

dfr["id_evento"] = dfr["id_evento"].astype(float).astype(int).astype(str)

log(f"Dataframe before comparing with last data saved on redis \n{dfr.head()}")
columns = ["id_evento", "data_inicio", "status"]
dfr, dfr_redis = compare_actual_df_with_redis_df(
dfr,
dfr_redis,
columns,
)
log(f"Dataframe after comparing with last data saved on redis {dfr.head()}")
for col in ["data_inicio", "data_fim"]:
dfr[col] = pd.to_datetime(dfr[col], errors="coerce")
max_date = dfr[["data_inicio", "data_fim"]].max().max()
max_date = max_date.strftime("%Y-%m-%d %H:%M:%S")
log(f"Last API data was {max_date} and last redis uptade was {redis_max_date}")

# If there is no new data, stop the flow
if dfr.shape[0] == 0:
if max_date <= redis_max_date:
skip_text = "No new data available on API"
print(skip_text)
raise ENDRUN(state=Skipped(skip_text))

# Get new max_date to save on redis
redis_max_date = max_date

dfr["tipo"] = dfr["tipo"].replace(
{
"Primária": "Primario",
@@ -193,9 +246,6 @@ def treat_data_ocorrencias(
# Treat id_pop col
dfr["id_pop"] = dfr["id_pop"].astype(float).astype(int)

for col in ["data_inicio", "data_fim"]:
dfr[col] = pd.to_datetime(dfr[col], errors="coerce")

for col in ["data_inicio", "data_fim"]:
dfr[col] = dfr[col].dt.strftime("%Y-%m-%d %H:%M:%S")

Expand All @@ -207,7 +257,7 @@ def treat_data_ocorrencias(
"%Y-%m-%d %H:%M:%S"
)

return dfr.drop_duplicates(), dfr_redis
return dfr.drop_duplicates(), redis_max_date
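
The skip decision relies on zero-padded "%Y-%m-%d %H:%M:%S" strings, which order correctly under plain string comparison, and on .max() ignoring the NaT values produced by errors="coerce". A small self-contained check of that pattern:

```python
import pandas as pd

dfr = pd.DataFrame(
    {
        "data_inicio": ["2024-04-02 10:00:00", "2024-04-02 11:30:00"],
        "data_fim": [None, "2024-04-02 12:15:00"],
    }
)

for col in ["data_inicio", "data_fim"]:
    dfr[col] = pd.to_datetime(dfr[col], errors="coerce")

# Max over both columns; NaT entries are skipped.
max_date = dfr[["data_inicio", "data_fim"]].max().max().strftime("%Y-%m-%d %H:%M:%S")

redis_max_date = "2024-04-02 12:15:00"
print(max_date <= redis_max_date)  # True -> the flow would be skipped
```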


@task(
@@ -318,14 +368,12 @@ def save_data(dataframe: pd.DataFrame) -> Union[str, Path]:

partition_column = "data_inicio"
dataframe, partitions = parse_date_columns(dataframe, partition_column)
current_time = pendulum.now("America/Sao_Paulo").strftime("%Y%m%d%H%M")

to_partitions(
data=dataframe,
partition_columns=partitions,
savepath=prepath,
data_type="csv",
suffix=current_time,
)
log(f"[DEBUG] Files saved on {prepath}")
return prepath
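
parse_date_columns and to_partitions are repo utilities. Assuming they derive Hive-style partition folders from data_inicio — data_particao matches the queries above, while the ano/mes column names here are guesses — the layout would be roughly the following; note that with the suffix timestamp dropped from the diff, a re-run overwrites the same file per partition:

```python
from pathlib import Path

import pandas as pd

dfr = pd.DataFrame(
    {
        "id_evento": ["1", "2"],
        "data_inicio": ["2024-04-01 09:00:00", "2024-04-02 10:00:00"],
    }
)

# Assumed behaviour of parse_date_columns: derive partition columns from data_inicio.
dates = pd.to_datetime(dfr["data_inicio"])
dfr["ano_particao"] = dates.dt.year
dfr["mes_particao"] = dates.dt.month
dfr["data_particao"] = dates.dt.date

prepath = Path("/tmp/ocorrencias")
partition_cols = ["ano_particao", "mes_particao", "data_particao"]
for (ano, mes, dia), group in dfr.groupby(partition_cols):
    partition_dir = (
        prepath / f"ano_particao={ano}" / f"mes_particao={mes}" / f"data_particao={dia}"
    )
    partition_dir.mkdir(parents=True, exist_ok=True)
    # One CSV per partition; no timestamp suffix, so re-runs overwrite it.
    group.drop(columns=partition_cols).to_csv(partition_dir / "data.csv", index=False)
```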