Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] satelite backfill #342

Open
wants to merge 47 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
016bbd6
criando commit de backfill
patriciacatandi Apr 20, 2022
2331881
merge com a master
patriciacatandi May 17, 2022
ffcf419
mudanças no backfill
patriciacatandi May 17, 2022
bf8f643
atualizando backfill!
patriciacatandi May 24, 2022
d029a04
atualizando com merge
patriciacatandi May 30, 2022
cdf8277
gerar baixáveis da tabela de meteorologia do inmet
patriciacatandi May 31, 2022
0acf161
modificações para poder rodar o backtest em paralelo
patriciacatandi May 31, 2022
fe2bbe9
consertando o problema na hora de deletar os arquivos do pc
patriciacatandi May 31, 2022
1b81097
alterações pra rodar os dados missing
patriciacatandi Jun 2, 2022
46752a0
merge master
patriciacatandi Nov 21, 2022
6bfb060
backfill satélite
patriciacatandi Nov 21, 2022
105c970
alterando pipe para rodar backfill
patriciacatandi Nov 22, 2022
8e3cdc5
alteração arquivos para backfill7
patriciacatandi Dec 5, 2022
c95a258
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 12, 2023
14e33ed
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Jan 12, 2023
68f0503
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Jan 12, 2023
c7c58b6
arrumando lint
patriciacatandi Jan 16, 2023
ac5e006
apagando log
patriciacatandi Jan 16, 2023
56e19fc
merge master
patriciacatandi Jan 16, 2023
d232c77
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Jan 16, 2023
4e6cca1
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Jan 16, 2023
63dbceb
bugfix
patriciacatandi Jan 17, 2023
3431ecb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 17, 2023
045bc2e
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Jan 27, 2023
5311a40
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Jan 31, 2023
91c46ba
voltando o build_redis_key
patriciacatandi Feb 1, 2023
2d1d828
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 1, 2023
570ae22
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 9, 2023
d2765c8
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 9, 2023
2e24e9d
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 13, 2023
cdefc64
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 13, 2023
be16868
Merge branch 'master' of github.com:prefeitura-rio/pipelines into sta…
patriciacatandi Feb 13, 2023
b95d465
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 13, 2023
a5d64d7
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
ea28611
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
e76766c
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
c3489a6
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
b736cf4
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
2c75002
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
477de8f
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
2cdadba
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 16, 2023
e1ecaa2
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Feb 23, 2023
288f05e
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Mar 2, 2023
ddb2a95
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Mar 2, 2023
4db0854
Merge branch 'master' into staging/cor_satelite_backfill
mergify[bot] Mar 3, 2023
32e4ce4
mudanças backfill
patriciacatandi Mar 29, 2023
04b3fe8
Merge branch 'staging/cor_satelite_backfill' of github.com:prefeitura…
patriciacatandi Mar 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,233 changes: 1,233 additions & 0 deletions pipelines/rj_cor/meteorologia/backfill_satelite_via_storage.ipynb

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pipelines/rj_cor/meteorologia/satelite/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
table_id_rr,
"prod",
redis_files_rr_updated,
keep_last=200,
wait=path_rr,
)

Expand Down Expand Up @@ -130,6 +131,7 @@
table_id_tpw,
"prod",
redis_files_tpw_updated,
keep_last=200,
wait=path_tpw,
)

Expand Down Expand Up @@ -169,6 +171,7 @@
table_id_cmip,
"prod",
redis_files_cmip_updated,
keep_last=200,
wait=path_cmip,
)

Expand Down
7 changes: 3 additions & 4 deletions pipelines/rj_cor/meteorologia/satelite/remap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""
Converte coordenada X,Y para latlon
"""
import time as t

import netCDF4 as nc
import numpy as np
Expand Down Expand Up @@ -106,9 +105,9 @@ def remap(

# Perform the projection/resampling

print("Remapping", path)
# print("Remapping", path)

start = t.time()
# start = t.time()

gdal.ReprojectImage(
raw,
Expand All @@ -119,7 +118,7 @@ def remap(
options=["NUM_THREADS=ALL_CPUS"],
)

print("- finished! Time:", t.time() - start, "seconds")
# print("- finished! Time:", t.time() - start, "seconds")

# Close file
raw = None
Expand Down
47 changes: 34 additions & 13 deletions pipelines/rj_cor/meteorologia/satelite/satellite_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# pylint: disable=too-many-locals
# pylint: disable=too-many-locals,R0913
# flake8: noqa
"""
Funções úteis no tratamento de dados de satélite
"""
Expand Down Expand Up @@ -64,6 +65,7 @@
import datetime
import os
from pathlib import Path
import re
from typing import Tuple, Union

from google.cloud import storage
Expand Down Expand Up @@ -111,6 +113,7 @@ def download_blob(

credentials = get_credentials_from_env(mode=mode)
storage_client = storage.Client(credentials=credentials)
# storage_client = storage.Client()

bucket = storage_client.bucket(bucket_name)

Expand All @@ -128,10 +131,11 @@ def converte_timezone(datetime_save: str) -> str:
Recebe o formato de data hora em 'YYYYMMDD HHmm' no UTC e
retorna no mesmo formato no horário São Paulo
"""
log(f">>>>>>> {datetime_save}")
datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmm")
log(f">>>>>>> datetime_save {datetime_save}")
datahora = pendulum.from_format(datetime_save, "YYYYMMDD HHmmss")
log(f">>>>>>> datahora {datahora}")
datahora = datahora.in_tz("America/Sao_Paulo")
return datahora.format("YYYYMMDD HHmm")
return datahora.format("YYYYMMDD HHmmss")


def extract_julian_day_and_hour_from_filename(filename: str):
Expand All @@ -154,7 +158,7 @@ def extract_julian_day_and_hour_from_filename(filename: str):
julian_day = int(start[4:7])

# Time (UTC) as string
hour_utc = start[7:11]
hour_utc = start[7:13]

# Time of the start of the Scan
# time = start[7:9] + ":" + start[9:11] + ":" + start[11:13] + " UTC"
Expand Down Expand Up @@ -207,7 +211,7 @@ def get_info(path: str) -> Tuple[dict, str]:
if procura_m == -1:
procura_m = path.find("-M4")
product = path[path.find("L2-") + 3 : procura_m]
print(product)
# print(product)

# Nem todos os produtos foram adicionados no dicionário de características
# dos produtos. Olhar arquivo original caso o produto não estaja aqui
Expand Down Expand Up @@ -367,8 +371,10 @@ def get_info(path: str) -> Tuple[dict, str]:

if variable == "CMI":
# Search for the GOES-16 channel in the file name
regex = "-M\\dC\\d" # noqa: W605
find_expression = re.findall(regex, path)[0]
product_caracteristics["band"] = int(
(path[path.find("M6C") + 3 : path.find("_G16")])
(path[path.find(find_expression) + 4 : path.find("_G16")])
)
else:
product_caracteristics["band"] = np.nan
Expand Down Expand Up @@ -402,6 +408,7 @@ def remap_g16(
resolution: int,
variable: str,
datetime_save: str,
mode_redis: str = "prod",
):
"""
the GOES-16 image is reprojected to the rectangular projection in the extent region
Expand Down Expand Up @@ -436,7 +443,7 @@ def remap_g16(
)

tif_path = os.path.join(
os.getcwd(), "data", "satelite", variable, "temp", partitions
os.getcwd(), mode_redis, "data", "satelite", variable, "temp", partitions
)

if not os.path.exists(tif_path):
Expand Down Expand Up @@ -519,7 +526,7 @@ def treat_data(


def save_data_in_file(
variable: str, datetime_save: str, file_path: str
variable: str, datetime_save: str, file_path: str, mode_redis: str = "prod"
) -> Union[str, Path]:
"""
Save data in parquet
Expand All @@ -539,7 +546,14 @@ def save_data_in_file(
)

tif_data = os.path.join(
os.getcwd(), "data", "satelite", variable, "temp", partitions, "dados.tif"
os.getcwd(),
mode_redis,
"data",
"satelite",
variable,
"temp",
partitions,
"dados.tif",
)

data = xr.open_dataset(tif_data, engine="rasterio")
Expand All @@ -561,14 +575,20 @@ def save_data_in_file(
)

# cria pasta de partições se elas não existem
output_path = os.path.join(os.getcwd(), "data", "satelite", variable, "output")
output_path = os.path.join(
os.getcwd(), mode_redis, "data", "satelite", variable, "output"
)
parquet_path = os.path.join(output_path, partitions)

if not os.path.exists(parquet_path):
os.makedirs(parquet_path)

data["horario"] = pendulum.from_format(
datetime_save, "YYYYMMDD HHmmss"
).to_time_string()
# log(f">>>>> data head {data.head()}")
# Fixa ordem das colunas
data = data[["longitude", "latitude", variable.lower()]]
data = data[["longitude", "latitude", "horario", variable.lower()]]

# salva em csv
filename = file_path.split("/")[-1].replace(".nc", "")
Expand All @@ -579,7 +599,7 @@ def save_data_in_file(
return output_path


def main(path: Union[str, Path]):
def main(path: Union[str, Path], mode_redis: str = "prod"):
"""
Função principal para converter dados x,y em lon,lat
"""
Expand Down Expand Up @@ -620,6 +640,7 @@ def main(path: Union[str, Path]):
resolution,
product_caracteristics["variable"],
datetime_save,
mode_redis,
)

info = {
Expand Down
20 changes: 12 additions & 8 deletions pipelines/rj_cor/meteorologia/satelite/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def download(
ref_filename: str = None,
redis_files: list = [],
wait=None,
mode_redis: str = "prod",
) -> Union[str, Path]:
"""
Acessa o S3 e faz o download do primeiro arquivo da data-hora especificada
Expand Down Expand Up @@ -121,14 +122,16 @@ def download(
skip = Skipped("No available files on API")
raise ENDRUN(state=skip)

base_path = os.path.join(os.getcwd(), "data", "satelite", variavel[:-1], "input")
base_path = os.path.join(
os.getcwd(), mode_redis, "data", "satelite", variavel[:-1], "input"
)

if not os.path.exists(base_path):
os.makedirs(base_path)

# Seleciona primeiro arquivo que não tem o nome salvo no redis
log(f"\n\n[DEBUG]: available files on API: {path_files}")
log(f"\n\n[DEBUG]: filenames already saved on redis_files: {redis_files}")
# log(f"\n\n[DEBUG]: available files on API: {path_files}")
# log(f"\n\n[DEBUG]: filenames already saved on redis_files: {redis_files}")
log(f"\n\n[DEBUG]: ref_filename: {ref_filename}")

# keep only ref_filename if it exists
Expand All @@ -145,12 +148,13 @@ def download(
download_file = None
for path_file in path_files:
filename = path_file.split("/")[-1]
log(f"\n\n[DEBUG]: {filename} check if is in redis")
if filename not in redis_files:
log(f"\n\n[DEBUG]: {filename} not in redis")
redis_files.append(filename)
path_filename = os.path.join(base_path, filename)
download_file = path_file
log(f"[DEBUG]: filename to be append on redis_files: {redis_files}")
# log(f"[DEBUG]: filename to be append on redis_files: {redis_files}")
break

# Skip task if there is no new file
Expand All @@ -174,24 +178,24 @@ def download(


@task
def tratar_dados(filename: str) -> dict:
def tratar_dados(filename: str, mode_redis: str = "prod") -> dict:
"""
Converte coordenadas X, Y para latlon e recorta área
"""
log(f"\n>>>> Started treating file: {filename}")
grid, goes16_extent, info = main(filename)
grid, goes16_extent, info = main(filename, mode_redis)
del grid, goes16_extent
return info


@task
def save_data(info: dict, file_path: str) -> Union[str, Path]:
def save_data(info: dict, file_path: str, mode_redis: str = "prod") -> Union[str, Path]:
"""
Convert tif data to csv
"""

variable = info["variable"]
datetime_save = info["datetime_save"]
print(f"Saving {variable} in parquet")
output_path = save_data_in_file(variable, datetime_save, file_path)
output_path = save_data_in_file(variable, datetime_save, file_path, mode_redis)
return output_path
8 changes: 8 additions & 0 deletions pipelines/rj_cor/meteorologia/satelite_backfill/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# -*- coding: utf-8 -*-
"""
Prefect flows for workshop project
"""
###############################################################################
# Automatically managed, please do not touch
###############################################################################
from .flows import *
Loading