From 93a2dc50c6503fc57a5759d5fe0c5ede140eec7f Mon Sep 17 00:00:00 2001
From: patriciacatandi
Date: Wed, 29 Nov 2023 14:58:00 -0300
Subject: [PATCH] trying to save files using parquet

---
 .../rj_cor/meteorologia/satelite/flows.py     |  1 +
 .../meteorologia/satelite/satellite_utils.py  | 53 ++++++++++---------
 .../rj_cor/meteorologia/satelite/tasks.py     |  2 +-
 pipelines/utils/tasks.py                      |  5 +-
 4 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/pipelines/rj_cor/meteorologia/satelite/flows.py b/pipelines/rj_cor/meteorologia/satelite/flows.py
index 57e1a1840..59a9665ff 100644
--- a/pipelines/rj_cor/meteorologia/satelite/flows.py
+++ b/pipelines/rj_cor/meteorologia/satelite/flows.py
@@ -99,6 +99,7 @@
             dataset_id=dataset_id,
             table_id=table_id,
             dump_mode=dump_mode,
+            data_type="parquet",
             wait=path,
         )
 
diff --git a/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py b/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py
index 097b9a4e7..1645eef36 100644
--- a/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py
+++ b/pipelines/rj_cor/meteorologia/satelite/satellite_utils.py
@@ -61,7 +61,13 @@
 import xarray as xr
 
 from pipelines.rj_cor.meteorologia.satelite.remap import remap
-from pipelines.utils.utils import get_credentials_from_env, list_blobs_with_prefix, log
+from pipelines.utils.utils import (
+    get_credentials_from_env,
+    list_blobs_with_prefix,
+    log,
+    parse_date_columns,
+    to_partitions,
+)
 
 
 def get_blob_with_prefix(bucket_name: str, prefix: str, mode: str = "prod") -> str:
@@ -432,27 +438,10 @@ def save_data_in_file(
     """
     Read all nc or tif files and save them in a unique file inside a partition
     """
-    date_save = datetime_save[:8]
-    time_save = str(int(datetime_save[9:11]))
-
-    year = date_save[:4]
-    month = str(int(date_save[4:6]))
-    day = str(int(date_save[6:8]))
-    date = year + "-" + month.zfill(2) + "-" + day.zfill(2)
-    partitions = os.path.join(
-        f"ano_particao={year}",
-        f"mes_particao={month}",
-        f"data_particao={date}",
-        f"hora_particao={time_save}",
-    )
 
     folder_path = f"{os.getcwd()}/temp/treated/{product}/"
     # cria pasta de partições se elas não existem
     output_path = os.path.join(os.getcwd(), "temp", "output", mode_redis, product)
-    partitions_path = os.path.join(output_path, partitions)
-
-    if not os.path.exists(partitions_path):
-        os.makedirs(partitions_path)
 
     # Loop through all NetCDF files in the folder
     data = pd.DataFrame()
@@ -469,17 +458,29 @@
     data["horario"] = pendulum.from_format(
         datetime_save, "YYYYMMDD HHmmss"
     ).to_time_string()
+    data["data_medicao"] = pendulum.from_format(
+        datetime_save, "YYYYMMDD HHmmss"
+    ).to_date_string()
 
-    print(f"Final df: {data.head()}")
     # Fixa ordem das colunas
-    data = data[["longitude", "latitude", "horario"] + [i.lower() for i in variable]]
-    print("cols", data.columns)
+    data = data[
+        ["longitude", "latitude", "data_medicao", "horario"]
+        + [i.lower() for i in variable]
+    ]
+    print(f"Final df: {data.head()}")
 
     file_name = files[0].split("_variable-")[0]
     print(f"\n\n[DEGUB]: Saving {file_name} on {output_path}\n\n")
-    print(f"Data_save: {date_save}, time_save: {time_save}")
-    # log(f"\n\n[DEGUB]: Saving {file_name} on {parquet_path}\n\n")
-    # log(f"Data_save: {date_save}, time_save: {time_save}")
-    file_path = os.path.join(partitions_path, f"{file_name}.csv")
-    data.to_csv(file_path, index=False)
+
+    partition_column = "data_medicao"
+    data, partitions = parse_date_columns(data, partition_column)
+    print(f"\n\n[DEBUG]: Partitions {partitions}\n\n")
+    print(f"Final df: {data.head()}")
+
+    to_partitions(
+        data=data,
+        partition_columns=partitions,
+        savepath=output_path,
+        data_type="parquet",
+    )
     return output_path
diff --git a/pipelines/rj_cor/meteorologia/satelite/tasks.py b/pipelines/rj_cor/meteorologia/satelite/tasks.py
index 4eff0ff97..7d25e9be1 100644
--- a/pipelines/rj_cor/meteorologia/satelite/tasks.py
+++ b/pipelines/rj_cor/meteorologia/satelite/tasks.py
@@ -237,7 +237,7 @@ def save_data(info: dict, mode_redis: str = "prod") -> Union[str, Path]:
     """
     Concat all netcdf data and save partitioned by date on a csv
     """
-    log("Start saving product on a csv")
+    log("Start saving product on a file")
     output_path = save_data_in_file(
         product=info["product"],
         variable=info["variable"],
diff --git a/pipelines/utils/tasks.py b/pipelines/utils/tasks.py
index 15ea61f87..71568cad2 100644
--- a/pipelines/utils/tasks.py
+++ b/pipelines/utils/tasks.py
@@ -152,6 +152,7 @@ def create_table_and_upload_to_gcs(
     dataset_id: str,
     table_id: str,
     dump_mode: str,
+    data_type: str = "csv",
     biglake_table: bool = True,
     wait=None,  # pylint: disable=unused-argument
 ) -> None:
@@ -190,15 +191,15 @@
     else:
         # the header is needed to create a table when dosen't exist
         log("MODE APPEND: Table DOSEN'T EXISTS\nStart to CREATE HEADER file")
-        header_path = dump_header_to_file(data_path=data_path)
+        header_path = dump_header_to_file(data_path=data_path, data_type=data_type)
         log("MODE APPEND: Created HEADER file:\n" f"{header_path}")
 
         tb.create(
             path=header_path,
             if_storage_data_exists="replace",
             if_table_exists="replace",
-            biglake_table=biglake_table,
             dataset_is_public=dataset_is_public,
+            source_format=data_type,
         )
 
         log(