[Goes 16] Save Storage files as parquet #579

Open · wants to merge 8 commits into base: master
1 change: 1 addition & 0 deletions pipelines/rj_cor/meteorologia/satelite/flows.py
@@ -99,6 +99,7 @@
dataset_id=dataset_id,
table_id=table_id,
dump_mode=dump_mode,
data_type="parquet",
wait=path,
)

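For context, the keyword arguments in this hunk belong to the upload task whose signature gains `data_type` further down in pipelines/utils/tasks.py. A sketch of how the full call presumably reads after this change (the task name and the `data_path` argument are inferred, not shown in the hunk):

```python
# Sketch only: arguments other than those visible in the hunk are assumptions.
upload_table = create_table_and_upload_to_gcs(
    data_path=path,        # assumed: the output folder returned by the save task
    dataset_id=dataset_id,
    table_id=table_id,
    dump_mode=dump_mode,
    data_type="parquet",   # new argument introduced by this PR
    wait=path,
)
```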
53 changes: 27 additions & 26 deletions pipelines/rj_cor/meteorologia/satelite/satellite_utils.py
@@ -61,7 +61,13 @@
import xarray as xr

from pipelines.rj_cor.meteorologia.satelite.remap import remap
from pipelines.utils.utils import get_credentials_from_env, list_blobs_with_prefix, log
from pipelines.utils.utils import (
get_credentials_from_env,
list_blobs_with_prefix,
log,
parse_date_columns,
to_partitions,
)


def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = "prod") -> str:
@@ -432,27 +438,10 @@ def save_data_in_file(
"""
Read all nc or tif files and save them in a single file inside a partition
"""
date_save = datetime_save[:8]
time_save = str(int(datetime_save[9:11]))

year = date_save[:4]
month = str(int(date_save[4:6]))
day = str(int(date_save[6:8]))
date = year + "-" + month.zfill(2) + "-" + day.zfill(2)
partitions = os.path.join(
f"ano_particao={year}",
f"mes_particao={month}",
f"data_particao={date}",
f"hora_particao={time_save}",
)

folder_path = f"{os.getcwd()}/temp/treated/{product}/"
# create the partition folders if they don't exist
output_path = os.path.join(os.getcwd(), "temp", "output", mode_redis, product)
partitions_path = os.path.join(output_path, partitions)

if not os.path.exists(partitions_path):
os.makedirs(partitions_path)

# Loop through all NetCDF files in the folder
data = pd.DataFrame()
@@ -469,17 +458,29 @@ def save_data_in_file(
data["horario"] = pendulum.from_format(
datetime_save, "YYYYMMDD HHmmss"
).to_time_string()
data["data_medicao"] = pendulum.from_format(
datetime_save, "YYYYMMDD HHmmss"
).to_date_string()

print(f"Final df: {data.head()}")
# Fix column order
data = data[["longitude", "latitude", "horario"] + [i.lower() for i in variable]]
print("cols", data.columns)
data = data[
["longitude", "latitude", "data_medicao", "horario"]
+ [i.lower() for i in variable]
]
print(f"Final df: {data.head()}")

file_name = files[0].split("_variable-")[0]
print(f"\n\n[DEGUB]: Saving {file_name} on {output_path}\n\n")
print(f"Data_save: {date_save}, time_save: {time_save}")
# log(f"\n\n[DEGUB]: Saving {file_name} on {parquet_path}\n\n")
# log(f"Data_save: {date_save}, time_save: {time_save}")
file_path = os.path.join(partitions_path, f"{file_name}.csv")
data.to_csv(file_path, index=False)

partition_column = "data_medicao"
data, partitions = parse_date_columns(data, partition_column)
print(f"\n\n[DEGUB]: Partitions {partitions}\n\n")
print(f"Final df: {data.head()}")

to_partitions(
data=data,
partition_columns=partitions,
savepath=output_path,
data_type="parquet",
)
return output_path
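For readers unfamiliar with the two helpers this hunk delegates to, here is a minimal sketch of the behaviour the call site implies. This is not the repository's implementation of `parse_date_columns` / `to_partitions` (those live in pipelines/utils/utils.py and are not shown in this diff); the partition column names are an assumption based on the Hive-style layout the removed code used to build by hand.

```python
# Hedged sketch of the partition helpers, inferred from the call site above.
import os
import pandas as pd


def parse_date_columns_sketch(dataframe: pd.DataFrame, date_column: str):
    """Derive Hive-style partition columns from a date column (assumed names)."""
    dates = pd.to_datetime(dataframe[date_column])
    dataframe["ano_particao"] = dates.dt.year
    dataframe["mes_particao"] = dates.dt.month
    dataframe["data_particao"] = dates.dt.date.astype(str)
    return dataframe, ["ano_particao", "mes_particao", "data_particao"]


def to_partitions_sketch(data: pd.DataFrame, partition_columns, savepath, data_type="parquet"):
    """Write one file per unique combination of the partition columns."""
    for keys, group in data.groupby(partition_columns):
        keys = keys if isinstance(keys, tuple) else (keys,)
        subdir = os.path.join(
            savepath,
            *(f"{col}={val}" for col, val in zip(partition_columns, keys)),
        )
        os.makedirs(subdir, exist_ok=True)
        chunk = group.drop(columns=partition_columns)
        if data_type == "parquet":
            chunk.to_parquet(os.path.join(subdir, "data.parquet"), index=False)
        else:
            chunk.to_csv(os.path.join(subdir, "data.csv"), index=False)
```

The net effect of the hunk is to replace the hand-rolled `ano_particao/mes_particao/data_particao/hora_particao` path building and `to_csv` call with these shared helpers writing parquet.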
2 changes: 1 addition & 1 deletion pipelines/rj_cor/meteorologia/satelite/tasks.py
@@ -237,7 +237,7 @@ def save_data(info: dict, mode_redis: str = "prod") -> Union[str, Path]:
Concat all netcdf data and save partitioned by date on a csv
"""

log("Start saving product on a csv")
log("Start saving product on a file")
output_path = save_data_in_file(
product=info["product"],
variable=info["variable"],
5 changes: 3 additions & 2 deletions pipelines/utils/tasks.py
@@ -152,6 +152,7 @@ def create_table_and_upload_to_gcs(
dataset_id: str,
table_id: str,
dump_mode: str,
data_type: str = "csv",
biglake_table: bool = True,
wait=None, # pylint: disable=unused-argument
) -> None:
@@ -190,15 +191,15 @@
else:
# the header is needed to create a table when it doesn't exist
log("MODE APPEND: Table DOESN'T EXIST\nStart to CREATE HEADER file")
header_path = dump_header_to_file(data_path=data_path)
header_path = dump_header_to_file(data_path=data_path, data_type=data_type)
log("MODE APPEND: Created HEADER file:\n" f"{header_path}")

tb.create(
path=header_path,
if_storage_data_exists="replace",
if_table_exists="replace",
biglake_table=biglake_table,
dataset_is_public=dataset_is_public,
source_format=data_type,
)

log(
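This hunk passes the new `data_type` through to `dump_header_to_file` and to `tb.create(source_format=...)`, but the header helper itself is not part of the diff. A hedged sketch of what a format-aware header dump could look like (the function name, temporary-directory layout, and return value are assumptions, not the repository's actual helper):

```python
# Hypothetical sketch: build an empty, schema-only header file in the same
# format as the data, which tb.create can then use to register the table schema.
import tempfile
from pathlib import Path

import pandas as pd


def dump_header_to_file_sketch(data_path: str, data_type: str = "csv") -> str:
    # pick any data file of the requested type under data_path
    sample = next(Path(data_path).rglob(f"*.{data_type}"))
    if data_type == "parquet":
        header = pd.read_parquet(sample).head(0)
    else:
        header = pd.read_csv(sample, nrows=0)

    save_dir = Path(tempfile.mkdtemp()) / "header"
    save_dir.mkdir(parents=True, exist_ok=True)
    header_path = save_dir / f"header.{data_type}"
    if data_type == "parquet":
        header.to_parquet(header_path, index=False)
    else:
        header.to_csv(header_path, index=False)
    return str(header_path)
```

With `source_format=data_type`, the same task can register either csv or parquet uploads; the default of `"csv"` keeps existing flows unchanged.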