From 9759d9bb9ef9fbbebb09a6bbb785023a74a926ab Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Mon, 13 Nov 2023 14:19:44 -0300 Subject: [PATCH 01/17] chore: add pipeline structure --- pipelines/rj_escritorio/__init__.py | 1 + .../flooding_detection/__init__.py | 0 .../rj_escritorio/flooding_detection/flows.py | 83 +++++++++ .../flooding_detection/schedules.py | 25 +++ .../rj_escritorio/flooding_detection/tasks.py | 169 ++++++++++++++++++ 5 files changed, 278 insertions(+) create mode 100644 pipelines/rj_escritorio/flooding_detection/__init__.py create mode 100644 pipelines/rj_escritorio/flooding_detection/flows.py create mode 100644 pipelines/rj_escritorio/flooding_detection/schedules.py create mode 100644 pipelines/rj_escritorio/flooding_detection/tasks.py diff --git a/pipelines/rj_escritorio/__init__.py b/pipelines/rj_escritorio/__init__.py index a5c864245..0b8ffcfd6 100644 --- a/pipelines/rj_escritorio/__init__.py +++ b/pipelines/rj_escritorio/__init__.py @@ -9,6 +9,7 @@ from pipelines.rj_escritorio.dados_mestres_dump_datario.flows import * from pipelines.rj_escritorio.data_catalog.flows import * from pipelines.rj_escritorio.dummy_predict.flows import * +from pipelines.rj_escritorio.flooding_detection.flows import * from pipelines.rj_escritorio.notify_flooding.flows import * from pipelines.rj_escritorio.template_pipeline.flows import * from pipelines.rj_escritorio.tweets_flamengo.flows import * diff --git a/pipelines/rj_escritorio/flooding_detection/__init__.py b/pipelines/rj_escritorio/flooding_detection/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py new file mode 100644 index 000000000..5f5a048e9 --- /dev/null +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +""" +Flow definition for generating a data catalog from BigQuery. 
+""" +from prefect import Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.utilities.edges import unmapped + +from pipelines.constants import constants +from pipelines.rj_escritorio.flooding_detection.schedules import ( + update_flooding_data_schedule, +) +from pipelines.rj_escritorio.flooding_detection.tasks import ( + get_last_update, + get_openai_api_key, + get_prediction, + get_raining_hexagons, + get_snapshot, + pick_cameras, + update_flooding_api_data, +) +from pipelines.utils.decorators import Flow + +with Flow( + name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API", + code_owners=[ + "gabriel", + "diego", + ], +) as rj_escritorio__flooding_detection__flow: + # Parameters + openai_flooding_detection_prompt = Parameter( + "openai_flooding_detection_prompt", required=True + ) + rain_api_data_url = Parameter( + "rain_api_url", + default="https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/", + ) + rain_api_update_url = Parameter( + "rain_api_update_url", + default="https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/", + ) + redis_key_predictions_buffer = Parameter( + "redis_key_predictions_buffer", default="flooding_detection_predictions_buffer" + ) + redis_key_flooding_detection_data = Parameter( + "redis_key_flooding_detection_data", default="flooding_detection_data" + ) + redis_key_flooding_detection_last_update = Parameter( + "redis_key_flooding_detection_last_update", + default="flooding_detection_last_update", + ) + + # Flow + hexagons = get_raining_hexagons(rain_api_data_url=rain_api_data_url) + last_update = get_last_update(rain_api_update_url=rain_api_update_url) + cameras = pick_cameras(hexagons=hexagons, last_update=last_update) + openai_api_key = get_openai_api_key() + images = get_snapshot.map( + camera=cameras, + ) + predictions = get_prediction.map( + image=images, + flooding_prompt=unmapped(openai_flooding_detection_prompt), + openai_api_key=unmapped(openai_api_key), + predictions_buffer_key=unmapped(redis_key_predictions_buffer), + ) + update_flooding_api_data( + predictions=predictions, + cameras=cameras, + images=images, + data_key=redis_key_flooding_detection_data, + last_update_key=redis_key_flooding_detection_last_update, + ) + + +rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +rj_escritorio__flooding_detection__flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value], +) +rj_escritorio__flooding_detection__flow.schedule = update_flooding_data_schedule diff --git a/pipelines/rj_escritorio/flooding_detection/schedules.py b/pipelines/rj_escritorio/flooding_detection/schedules.py new file mode 100644 index 000000000..c974eaac7 --- /dev/null +++ b/pipelines/rj_escritorio/flooding_detection/schedules.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +""" +Schedules for the data catalog pipeline. 
+""" + +from datetime import timedelta, datetime + +from prefect.schedules import Schedule +from prefect.schedules.clocks import IntervalClock +import pytz + +from pipelines.constants import constants + +update_flooding_data_schedule = Schedule( + clocks=[ + IntervalClock( + interval=timedelta(minutes=3), + start_date=datetime(2023, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[ + constants.RJ_ESCRITORIO_AGENT_LABEL.value, + ], + parameter_defaults={}, # TODO: Add parameters + ), + ] +) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py new file mode 100644 index 000000000..a5ef9e310 --- /dev/null +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- +from datetime import datetime +from typing import Any, Dict, List, Union + +from prefect import task + + +@task +def get_last_update( + rain_api_update_url: str, +) -> datetime: + """ + Gets the last update datetime from the rain API. + + Args: + rain_api_update_url: The rain API update url. + + Returns: + The last update datetime. + """ + # TODO: Implement + raise NotImplementedError() + + +@task +def get_openai_api_key() -> str: + """ + Gets the OpenAI API key. + + Returns: + The OpenAI API key. + """ + # TODO: Implement + raise NotImplementedError() + + +@task +def get_prediction( + image: str, + flooding_prompt: str, + openai_api_key: str, + predictions_buffer_key: str, +) -> Dict[str, Union[str, float, bool]]: + """ + Gets the flooding detection prediction from OpenAI API. + + Args: + image: The image in base64 format. + flooding_prompt: The flooding prompt. + openai_api_key: The OpenAI API key. + predictions_buffer_key: The Redis key for the predictions buffer. + + Returns: + The prediction in the following format: + { + "object": "alagamento", + "label": True, + "confidence": 0.7, + } + """ + # TODO: Implement + raise NotImplementedError() + + +@task +def get_raining_hexagons( + rain_api_data_url: str, +) -> List[str]: + """ + Gets the raining hexagons from the rain API. + + Args: + rain_api_data_url: The rain API data url. + + Returns: + The raining hexagons. + """ + # TODO: Implement + raise NotImplementedError() + + +@task +def get_snapshot( + camera: Dict[str, Union[str, float]], +) -> str: + """ + Gets a snapshot from a camera. + + Args: + camera: The camera in the following format: + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + } + + Returns: + The snapshot in base64 format. + """ + # TODO: Implement + raise NotImplementedError() + + +@task +def pick_cameras( + hexagons: List[str], + last_update: datetime, +) -> List[Dict[str, Union[str, float]]]: + """ + Picks cameras based on the raining hexagons and last update. + + Args: + hexagons: The H3 hexagons that are raining. + last_update: The last update datetime. + + Returns: + A list of cameras in the following format: + [ + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + }, + ... + ] + """ + # TODO: Implement + raise NotImplementedError() + + +@task +def update_flooding_api_data( + prediction: List[Dict[str, Union[str, float, bool]]], + cameras: List[Dict[str, Union[str, float]]], + images: List[str], + data_key: str, + last_update_key: str, +) -> None: + """ + Updates Redis keys with flooding detection data and last update datetime (now). 
+ + Args: + prediction: The AI predictions in the following format: + [ + { + "object": "alagamento", + "label": True, + "confidence": 0.7, + }, + ... + ] + cameras: A list of cameras in the following format: + [ + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + }, + ... + ] + images: A list of images in base64 format. + data_key: The Redis key for the flooding detection data. + last_update_key: The Redis key for the last update datetime. + """ + # TODO: Implement + raise NotImplementedError() From b51f06f5803e0121b6846c05fd80df5bfdfaacc4 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Mon, 13 Nov 2023 14:25:31 -0300 Subject: [PATCH 02/17] chore: add parameter for new business rule --- pipelines/rj_escritorio/flooding_detection/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index a5ef9e310..e95ce3b91 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -106,6 +106,7 @@ def get_snapshot( def pick_cameras( hexagons: List[str], last_update: datetime, + predictions_buffer_key: str, ) -> List[Dict[str, Union[str, float]]]: """ Picks cameras based on the raining hexagons and last update. @@ -113,6 +114,7 @@ def pick_cameras( Args: hexagons: The H3 hexagons that are raining. last_update: The last update datetime. + predictions_buffer_key: The Redis key for the predictions buffer. Returns: A list of cameras in the following format: From 002caeed2c7387b410488efcd2516e6e04d246fd Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Mon, 13 Nov 2023 18:27:02 -0300 Subject: [PATCH 03/17] feat: implement few tasks --- .../rj_escritorio/flooding_detection/flows.py | 9 +++++++-- .../rj_escritorio/flooding_detection/tasks.py | 18 ++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index 5f5a048e9..94b0a4e10 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -30,6 +30,7 @@ ], ) as rj_escritorio__flooding_detection__flow: # Parameters + openai_api_key_secret_path = Parameter("openai_api_key_secret_path", required=True) openai_flooding_detection_prompt = Parameter( "openai_flooding_detection_prompt", required=True ) @@ -55,8 +56,12 @@ # Flow hexagons = get_raining_hexagons(rain_api_data_url=rain_api_data_url) last_update = get_last_update(rain_api_update_url=rain_api_update_url) - cameras = pick_cameras(hexagons=hexagons, last_update=last_update) - openai_api_key = get_openai_api_key() + cameras = pick_cameras( + hexagons=hexagons, + last_update=last_update, + predictions_buffer_key=redis_key_predictions_buffer, + ) + openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path) images = get_snapshot.map( camera=cameras, ) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index e95ce3b91..412132e36 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -1,8 +1,11 @@ # -*- coding: utf-8 -*- from datetime import datetime -from typing import Any, Dict, List, Union +from typing import Dict, List, Union from prefect import task +import requests + +from pipelines.utils.utils import get_vault_secret 
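+
+# Illustrative note on the assumed contracts of the two external inputs used
+# below (an assumption, inferred from how they are consumed; not guaranteed):
+#   - `rain_api_update_url` returns a timestamp string such as
+#     "13/11/2023 14:19:44", parsed with the "%d/%m/%Y %H:%M:%S" format;
+#   - `get_vault_secret(secret_path)` returns {"data": {"api_key": "..."}},
+#     which is the shape `get_openai_api_key` relies on.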
@task @@ -18,20 +21,23 @@ def get_last_update( Returns: The last update datetime. """ - # TODO: Implement - raise NotImplementedError() + data = requests.get(rain_api_update_url).text + return datetime.strptime(data, "%d/%m/%Y %H:%M:%S") @task -def get_openai_api_key() -> str: +def get_openai_api_key(secret_path: str) -> str: """ Gets the OpenAI API key. + Args: + secret_path: The secret path. + Returns: The OpenAI API key. """ - # TODO: Implement - raise NotImplementedError() + secret = get_vault_secret(secret_path)["data"] + return secret["api_key"] @task From 8a6d005ba6aad642207e267bae12e349ec36a9bf Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 12:32:03 -0300 Subject: [PATCH 04/17] feat: implement few tasks --- .../rj_escritorio/flooding_detection/flows.py | 9 +- .../rj_escritorio/flooding_detection/tasks.py | 93 ++++++++++++++----- .../rj_escritorio/flooding_detection/utils.py | 39 ++++++++ poetry.lock | 54 ++++++++++- pyproject.toml | 1 + 5 files changed, 168 insertions(+), 28 deletions(-) create mode 100644 pipelines/rj_escritorio/flooding_detection/utils.py diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index 94b0a4e10..d353964fe 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -15,7 +15,6 @@ get_last_update, get_openai_api_key, get_prediction, - get_raining_hexagons, get_snapshot, pick_cameras, update_flooding_api_data, @@ -30,6 +29,10 @@ ], ) as rj_escritorio__flooding_detection__flow: # Parameters + cameras_geodf_url = Parameter( + "cameras_geodf_url", + required=True, + ) openai_api_key_secret_path = Parameter("openai_api_key_secret_path", required=True) openai_flooding_detection_prompt = Parameter( "openai_flooding_detection_prompt", required=True @@ -54,10 +57,10 @@ ) # Flow - hexagons = get_raining_hexagons(rain_api_data_url=rain_api_data_url) last_update = get_last_update(rain_api_update_url=rain_api_update_url) cameras = pick_cameras( - hexagons=hexagons, + rain_api_data_url=rain_api_data_url, + cameras_data_url=cameras_geodf_url, last_update=last_update, predictions_buffer_key=redis_key_predictions_buffer, ) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 412132e36..29cc935fc 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -1,11 +1,19 @@ # -*- coding: utf-8 -*- from datetime import datetime +from pathlib import Path from typing import Dict, List, Union +import geopandas as gpd +import numpy as np +import pandas as pd from prefect import task import requests -from pipelines.utils.utils import get_vault_secret +from pipelines.rj_escritorio.flooding_detection.utils import ( + download_file, + h3_id_to_polygon, +) +from pipelines.utils.utils import get_vault_secret, log @task @@ -22,6 +30,8 @@ def get_last_update( The last update datetime. """ data = requests.get(rain_api_update_url).text + data = data.strip('"') + log(f"Last update: {data}") return datetime.strptime(data, "%d/%m/%Y %H:%M:%S") @@ -68,23 +78,6 @@ def get_prediction( raise NotImplementedError() -@task -def get_raining_hexagons( - rain_api_data_url: str, -) -> List[str]: - """ - Gets the raining hexagons from the rain API. - - Args: - rain_api_data_url: The rain API data url. - - Returns: - The raining hexagons. 
- """ - # TODO: Implement - raise NotImplementedError() - - @task def get_snapshot( camera: Dict[str, Union[str, float]], @@ -110,7 +103,8 @@ def get_snapshot( @task def pick_cameras( - hexagons: List[str], + rain_api_data_url: str, + cameras_data_url: str, last_update: datetime, predictions_buffer_key: str, ) -> List[Dict[str, Union[str, float]]]: @@ -118,7 +112,7 @@ def pick_cameras( Picks cameras based on the raining hexagons and last update. Args: - hexagons: The H3 hexagons that are raining. + rain_api_data_url: The rain API data url. last_update: The last update datetime. predictions_buffer_key: The Redis key for the predictions buffer. @@ -134,13 +128,64 @@ def pick_cameras( ... ] """ - # TODO: Implement - raise NotImplementedError() + # TODO: + # - Must always pick cameras whose buffer contains flooding predictions + # Download the cameras data + cameras_data_path = Path("/tmp") / "cameras_geo_min.csv" + if not download_file(url=cameras_data_url, output_path=cameras_data_path): + raise RuntimeError("Failed to download the cameras data.") + df_cameras = gpd.read_file( + cameras_data_path, GEOM_POSSIBLE_NAMES="geometry", KEEP_GEOM_COLUMNS="NO" + ) + log("Successfully downloaded cameras data.") + log(f"Cameras shape: {df_cameras.shape}") + + # Get rain data + rain_data = requests.get(rain_api_data_url).json() + df_rain = pd.DataFrame(rain_data) + df_rain["last_update"] = last_update + df_rain = df_rain.rename(columns={"status": "status_chuva"}) + geometry = df_rain["id_h3"].apply(lambda h3_id: h3_id_to_polygon(h3_id)) + df_rain_geo = gpd.GeoDataFrame(df_rain, geometry=geometry) + df_rain_geo.crs = {"init": "epsg:4326"} + log("Successfully downloaded rain data.") + log(f"Rain data shape: {df_rain.shape}") + + # Join the dataframes + df_cameras_h3: gpd.GeoDataFrame = gpd.sjoin( + df_cameras, df_rain_geo, how="left", op="within" + ) + df_cameras_h3 = df_cameras_h3.drop(columns=["index_right"]) + df_cameras_h3 = df_cameras_h3[df_cameras_h3["id_h3"].notnull()] + log("Successfully joined the dataframes.") + log(f"Cameras H3 shape: {df_cameras_h3.shape}") + + # Pick cameras + mask = np.logical_not( + df_cameras_h3["status_chuva"].isin(["sem chuva", "chuva fraca"]) + ) + df_cameras_h3 = df_cameras_h3[mask] + log("Successfully picked cameras.") + log(f"Picked cameras shape: {df_cameras_h3.shape}") + + # Set output + output = [] + for _, row in df_cameras_h3.iterrows(): + output.append( + { + "id_camera": row["codigo"], + "url_camera": row["nome_da_camera"], + "latitude": row["geometry"].y, + "longitude": row["geometry"].x, + } + ) + log(f"Picked cameras: {output}") + return output @task def update_flooding_api_data( - prediction: List[Dict[str, Union[str, float, bool]]], + predictions: List[Dict[str, Union[str, float, bool]]], cameras: List[Dict[str, Union[str, float]]], images: List[str], data_key: str, @@ -150,7 +195,7 @@ def update_flooding_api_data( Updates Redis keys with flooding detection data and last update datetime (now). 
Args: - prediction: The AI predictions in the following format: + predictions: The AI predictions in the following format: [ { "object": "alagamento", diff --git a/pipelines/rj_escritorio/flooding_detection/utils.py b/pipelines/rj_escritorio/flooding_detection/utils.py new file mode 100644 index 000000000..c42e85882 --- /dev/null +++ b/pipelines/rj_escritorio/flooding_detection/utils.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +from pathlib import Path +from typing import Union + +import h3 +import requests +from shapely.geometry import Polygon + + +def download_file(url: str, output_path: Union[str, Path]) -> bool: + """ + Downloads a file from a URL. + + Args: + url: The URL. + output_path: The output path. + + Returns: + Whether the file was downloaded successfully. + """ + response = requests.get(url) + if response.status_code == 200: + with open(output_path, "wb") as f: + f.write(response.content) + return True + return False + + +def h3_id_to_polygon(h3_id: str): + """ + Converts an H3 ID to a Polygon. + + Args: + h3_id: The H3 ID. + + Returns: + The Polygon. + """ + return Polygon(h3.h3_to_geo_boundary(h3_id, geo_json=True)) diff --git a/poetry.lock b/poetry.lock index 1881c1107..c271b32bb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2708,6 +2708,58 @@ files = [ {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, ] +[[package]] +name = "h3" +version = "3.7.6" +description = "Hierarchical hexagonal geospatial indexing system" +optional = false +python-versions = "*" +files = [ + {file = "h3-3.7.6-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:cd4a5103a86a7d98cffa3be77eb82080aa2e9d676bbd1661f3db9ecad7a4ef2b"}, + {file = "h3-3.7.6-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:231959dceb4cc4ae86fe4fe2c385b176ed81712549e787b889dfa66f583676df"}, + {file = "h3-3.7.6-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:b9de9da755c90bbc90d6c041396a20c91816cd86a0bafa3b8899681cfdc2c4c6"}, + {file = "h3-3.7.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cda9a427b0de0d4069115ec765118888f180d0db915b5bc0dba52f5ae957b789"}, + {file = "h3-3.7.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8bf1e080b9a47774754834e7f10155f3d2e3542bf895488a0519b2ae7d5b15db"}, + {file = "h3-3.7.6-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1d3b93e3f38eb6c8fc5051d1b289b74614fb5f2415d272fea18085dea260d6b0"}, + {file = "h3-3.7.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:783b2ca4448360c5a184fd43b84fc5554e3a8fd02738ff31349506189c5b4b49"}, + {file = "h3-3.7.6-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:3bae8b95f21f20f04141a35f15c8caa74f2046eb01ef49e35fc45e6a8edfc8df"}, + {file = "h3-3.7.6-cp310-cp310-win_amd64.whl", hash = "sha256:6ca9dd410e250d37f24a87c4ecb0615bb6d44a3f90eb5dbbf1b5e3d4489b8703"}, + {file = "h3-3.7.6-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:991ee991f2ae41f629feb1cd32fa677b8512c72696eb0ad94fcf359d61184b2e"}, + {file = "h3-3.7.6-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fcbfff87d223279f8e38bbee3ebf52b1b96ae280e9e7de24674d3c284373d946"}, + {file = "h3-3.7.6-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eddf10d1d2139b3ea3ad1618c2074e1c47d3d36bddb5359e4955f5fd0b089d93"}, + {file = "h3-3.7.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:76abc02f14a8df42fb5d80e6045023fb756c49d3cb08d69a8ceb9362b95d4bec"}, + {file = "h3-3.7.6-cp311-cp311-musllinux_1_1_x86_64.whl", hash = 
"sha256:fc8030968586a7810aa192397ad9a4f7d7a963f57c9b3e210fc38de0aa5c2533"}, + {file = "h3-3.7.6-cp311-cp311-win_amd64.whl", hash = "sha256:1bdc790d91138e781973dcaade5231db7fe8a876330939e0903f602acc4fb64c"}, + {file = "h3-3.7.6-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:198718ab20a06ebe52f0aaafc02469e4a51964e5ba7990c3cc1d2fc32e7a54d9"}, + {file = "h3-3.7.6-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:02faa911f2d8425c641a1f7c08f3dc9c10a5a3d81408832afa40748534b999c8"}, + {file = "h3-3.7.6-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1b4db92ceaeb9a51cc875c302cdc5e1aa27eed776d95943ee55c941bc2f219a3"}, + {file = "h3-3.7.6-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:4810ddb3d91411a6cbf87a28108fe31712f618ef223c349e1f6675602af2c473"}, + {file = "h3-3.7.6-cp36-cp36m-win_amd64.whl", hash = "sha256:211ef3317dcf7863e2d01a97ab6da319b8451d78cd1633dd28faaf69e66bc321"}, + {file = "h3-3.7.6-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:99b81620608378fc9910a5c630b0f17eb939156fa13e1adc55229d31f8c3f5ca"}, + {file = "h3-3.7.6-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26f3175bd3ea3ee528dbf49308e7215a58351ce425e1c3a9838ae22526663311"}, + {file = "h3-3.7.6-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7d7b69015f5bab2525914fad370b96dc386437e19a14cfed3eb13868589263db"}, + {file = "h3-3.7.6-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:d0c2890fa10fa8695c020569c8d55da79e2c552a39533de4ae6991c7acb122e1"}, + {file = "h3-3.7.6-cp37-cp37m-win_amd64.whl", hash = "sha256:1cd4f07c49721023c5fef401a4de03c47000705dfd116579dc6b61cad821305d"}, + {file = "h3-3.7.6-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:f8d1db3dcd8f6ce7f54e061e6c9fbecbb5c3978b9e54e44af05a53787c4f99b3"}, + {file = "h3-3.7.6-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:495e37b1dee0ec46ccd88b278e571234b0d0d30648f161799d65a8d7f390b3f2"}, + {file = "h3-3.7.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b2e2c4808b7691b176c89ebf23c173b3b23dd4ce42f8f494b32c6e31ceee49af"}, + {file = "h3-3.7.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b58b1298bf1068704c6d9426749c8ae6021b53d982d5153cc4161c7042ecd810"}, + {file = "h3-3.7.6-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:a2872f695168c4700c73edd6eab9c6181387d7ea177de13b130ae61e613ff7de"}, + {file = "h3-3.7.6-cp38-cp38-win_amd64.whl", hash = "sha256:98c3951c3b67ca3de06ef70aa74a9752640a3eca9b4d68e0d5d8e4fc6fa72337"}, + {file = "h3-3.7.6-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0724d4645c1da59e02b3305050c52b93ce1d8971d1d139433d464fcc103249a6"}, + {file = "h3-3.7.6-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e1f6ec0f2246381ce3a7f72da1ce825a5474eb7c8fb25a2ea1f16c6606ce34a7"}, + {file = "h3-3.7.6-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1471ff4d3875b25b521eeec5c2b72abe27b8e6af10ab99b7da5c0de545b0e832"}, + {file = "h3-3.7.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb96f2caae519d0ed17acde625af528476dca121b0336d3eb776429d40284ef6"}, + {file = "h3-3.7.6-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:b1b1bce0dee05175f8d422e50ffa1afacb9a7e78ae0963059aebfbef50e10175"}, + {file = "h3-3.7.6-cp39-cp39-win_amd64.whl", hash = "sha256:36ea935833c37fdfd7ffbfc000d7cd20addcdf67f30b26a6b9bccb9210b03704"}, + {file = "h3-3.7.6.tar.gz", hash = "sha256:9bbd3dbac99532fa521d7d2e288ff55877bea3223b070f659ed7b5f8f1f213eb"}, +] + 
+[package.extras] +all = ["flake8", "numpy", "pylint", "pytest", "pytest-cov"] +numpy = ["numpy"] +test = ["flake8", "pylint", "pytest", "pytest-cov"] + [[package]] name = "h5py" version = "3.9.0" @@ -7213,4 +7265,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.11" -content-hash = "26bc9f8eea3e12980a49451806dbc243c4ac6bc940bef5dd5f3e1f758f8c0e31" +content-hash = "b788af3d35ff07ac9dcbda64936ef602c4196038307f0c602a47f5ce1c3d363d" diff --git a/pyproject.toml b/pyproject.toml index c861bcc93..f08195f90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ psycopg2-binary = "^2.9.9" azure-storage-blob = "^12.17.0" icecream = "^2.1.3" pyodbc = "^5.0.1" +h3 = "^3.7.6" [tool.poetry.dev-dependencies] pylint = "^2.12.2" From bdd39dc0924875df33a4e730215ce310a6ff20f0 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 12:47:58 -0300 Subject: [PATCH 05/17] feat: implement few tasks --- .../rj_escritorio/flooding_detection/flows.py | 9 +++ .../rj_escritorio/flooding_detection/tasks.py | 63 +++++++++++++++++-- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index d353964fe..01b56c197 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -33,6 +33,12 @@ "cameras_geodf_url", required=True, ) + openai_api_max_tokens = Parameter("openai_api_max_tokens", default=300) + openai_api_model = Parameter("openai_api_model", default="gpt-4-vision-preview") + openai_api_url = Parameter( + "openai_api_url", + default="https://api.openai.com/v1/chat/completions", + ) openai_api_key_secret_path = Parameter("openai_api_key_secret_path", required=True) openai_flooding_detection_prompt = Parameter( "openai_flooding_detection_prompt", required=True @@ -72,7 +78,10 @@ image=images, flooding_prompt=unmapped(openai_flooding_detection_prompt), openai_api_key=unmapped(openai_api_key), + openai_api_model=unmapped(openai_api_model), predictions_buffer_key=unmapped(redis_key_predictions_buffer), + openai_api_max_tokens=unmapped(openai_api_max_tokens), + openai_api_url=unmapped(openai_api_url), ) update_flooding_api_data( predictions=predictions, diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 29cc935fc..034135141 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -1,11 +1,16 @@ # -*- coding: utf-8 -*- +import base64 from datetime import datetime +import io +import json from pathlib import Path from typing import Dict, List, Union +import cv2 import geopandas as gpd import numpy as np import pandas as pd +from PIL import Image from prefect import task import requests @@ -55,7 +60,10 @@ def get_prediction( image: str, flooding_prompt: str, openai_api_key: str, + openai_api_model: str, predictions_buffer_key: str, + openai_api_max_tokens: int = 300, + openai_api_url: str = "https://api.openai.com/v1/chat/completions", ) -> Dict[str, Union[str, float, bool]]: """ Gets the flooding detection prediction from OpenAI API. 
@@ -74,8 +82,45 @@ def get_prediction( "confidence": 0.7, } """ - # TODO: Implement - raise NotImplementedError() + # TODO: + # - Add confidence value + # Setup the request + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {openai_api_key}", + } + payload = { + "model": openai_api_model, + "messages": [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": flooding_prompt, + }, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{image}"}, + }, + ], + } + ], + "max_tokens": openai_api_max_tokens, + } + response = requests.post(openai_api_url, headers=headers, json=payload) + data: dict = response.json() + if data.get("error"): + raise RuntimeError(f"Failed to get prediction: {data['error']}") + content: str = data["choices"][0]["message"]["content"] + json_string = content.replace("```json\n", "").replace("\n```", "") + json_object = json.loads(json_string) + flooding_detected = json_object["flooding_detected"] + return { + "object": "alagamento", + "label": flooding_detected, + "confidence": 0.7, + } @task @@ -97,8 +142,18 @@ def get_snapshot( Returns: The snapshot in base64 format. """ - # TODO: Implement - raise NotImplementedError() + rtsp_url = camera["url_camera"] + cap = cv2.VideoCapture(rtsp_url) + ret, frame = cap.read() + if not ret: + raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.") + cap.release() + img = Image.fromarray(frame) + buffer = io.BytesIO() + img.save(buffer, format="JPEG") + img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8") + log(f"Successfully got snapshot from URL {rtsp_url}.") + return img_b64 @task From eabd2aeb81ae9cda7ef2b945414948fe4edfeecf Mon Sep 17 00:00:00 2001 From: d116626 Date: Tue, 14 Nov 2023 13:05:30 -0300 Subject: [PATCH 06/17] feat: add cleanup cameras code to utils --- .../rj_escritorio/flooding_detection/utils.py | 179 +++++++++++++++++- 1 file changed, 177 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/utils.py b/pipelines/rj_escritorio/flooding_detection/utils.py index c42e85882..91ff741e0 100644 --- a/pipelines/rj_escritorio/flooding_detection/utils.py +++ b/pipelines/rj_escritorio/flooding_detection/utils.py @@ -1,10 +1,15 @@ # -*- coding: utf-8 -*- from pathlib import Path -from typing import Union +from typing import Any, Dict, Optional, Union +import geopandas as gpd import h3 +import pandas as pd import requests -from shapely.geometry import Polygon +from shapely.geometry import Point, Polygon + + +from utils.utils import remove_columns_accents def download_file(url: str, output_path: Union[str, Path]) -> bool: @@ -37,3 +42,173 @@ def h3_id_to_polygon(h3_id: str): The Polygon. """ return Polygon(h3.h3_to_geo_boundary(h3_id, geo_json=True)) + + +def extract_data(row: Dict[str, Any]) -> pd.Series: + """ + Extracts username, password, and path from a given row with camera data. + + Parameters: + - row (Dict[str, Any]): A dictionary representing a row of camera data. + Expected keys are 'rtsp' and 'ip'. + + Returns: + - pd.Series: A pandas Series containing extracted 'username', 'password', + and 'path' information. 
+ """ + + try: + rtsp = row["rtsp"] + # Remove protocol + rtsp = rtsp.replace("rtsp://", "").replace("rtsp:/", "") + # If we have an "@" in the URL, we have username and password + if "@" in rtsp: + # Extract username and password + username_password = rtsp.split("@")[0].split(":") + if len(username_password) == 2: + username = username_password[0] + password = username_password[1] + else: + print(username_password) + raise Exception("Why???") + # Remove username and password from rtsp + rtsp = rtsp.split("@")[1] + else: + username = None + password = None + # Extract path + path = "/".join(rtsp.split("/")[1:]) + # Return the data + return pd.Series( + { + "username": username, + "password": password, + "path": path, + } + ) + except Exception as exc: + print(row["rtsp"]) + raise exc + + +def build_rtsp(row: pd.Series) -> str: + """ + Builds a complete RTSP URL from the given row data. + + Parameters: + - row (pd.Series): A pandas Series containing 'username', 'password', 'path', and 'ip'. + + Returns: + - str: The complete RTSP URL. + """ + username = row["username"] + password = row["password"] + path = row["path"] + ip = row["ip"] + # If we have username and password, add them to the URL + if username and password: + return f"rtsp://{username}:{password}@{ip}/{path}" + else: + return f"rtsp://{ip}/{path}" + + +def get_rain_dataframe() -> pd.DataFrame: + """ + Fetches and returns rainfall data from a specified API. + + Returns: + - pd.DataFrame: A pandas DataFrame containing the rainfall data. + """ + api_url = "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/" + data = requests.get(api_url).json() + df_rain = pd.DataFrame(data) + + last_update_url = "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/" # noqa + last_update = requests.get(last_update_url).json() + df_rain["last_update"] = last_update + df_rain["last_update"] = pd.to_datetime(df_rain["last_update"]) + + return df_rain + + +def get_cameras_h3(df: pd.DataFrame) -> gpd.GeoDataFrame: + """ + Enhances camera data with geographical information and joins it with rainfall data. + + Parameters: + - df (pd.DataFrame): A DataFrame containing camera data. + + Returns: + - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera and rainfall data. + """ + cameras = df.copy() + geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])] + cameras_geo = gpd.GeoDataFrame(cameras, geometry=geometry) + cameras_geo.crs = {"init": "epsg:4326"} + + pluviometro = get_rain_dataframe() + pluviometro = pluviometro.rename(columns={"status": "status_chuva"}) + geometry = pluviometro["id_h3"].apply(lambda h3_id: h3_id_to_polygon(h3_id)) + pluviometro_geo = gpd.GeoDataFrame(pluviometro, geometry=geometry) + pluviometro_geo.crs = {"init": "epsg:4326"} + print("pluviometro_geo:", pluviometro_geo.shape) + + cameras_h3 = gpd.sjoin(cameras_geo, pluviometro_geo, how="left", op="within") + cameras_h3 = cameras_h3.drop(columns=["index_right"]) + cameras_h3 = cameras_h3[cameras_h3["id_h3"].notnull()] + + return cameras_h3 + + +def clean_and_padronize_cameras() -> gpd.GeoDataFrame: + """ + Cleans and standardizes camera data from a CSV file, then merges it with geographical data. + + Returns: + - gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically + enriched camera data. 
+ """ + df = pd.read_csv( + "./data/Cameras_em_2023-11-13.csv", delimiter=";", encoding="latin1" + ) + df.columns = remove_columns_accents(df) + df["codigo"] = df["codigo"].str.replace("'", "") + df = df[df["status"] == "Online"] + df = df[df["rtsp"].str.startswith("rtsp")] + + df["ip_in_rtsp"] = df.apply(lambda row: row["ip"] in row["rtsp"], axis=1) + df[~df["ip_in_rtsp"]] + + df["ip"] = df["ip"].replace("10.151.48.04", "10.151.48.4") + df["ip_in_rtsp"] = df.apply(lambda row: row["ip"] in row["rtsp"], axis=1) + df[~df["ip_in_rtsp"]] + + df[["username", "password", "path"]] = df.apply(extract_data, axis=1) + df["rtsp"] = df.apply(build_rtsp, axis=1) + # Filter out by subnet + df = df[ + df["ip"].str.startswith("10.10") + | df["ip"].str.startswith("10.151") + | df["ip"].str.startswith("10.152") + | df["ip"].str.startswith("10.153") + | df["ip"].str.startswith("10.50") + | df["ip"].str.startswith("10.52") + ] + + cameras_h3 = get_cameras_h3(df) + cols = [ + "codigo", + "nome_da_camera", + "rtsp", + "latitude", + "longitude", + "geometry", + "id_h3", + ] + cameras_h3 = cameras_h3[cols] + + cameras_h3 = cameras_h3.rename( + columns={"codigo": "id_camera", "nome_da_camera": "nome"} + ) + + return cameras_h3.reset_index(drop=True) From 8f24a6c13e0b18ca50f301c806ea26dc0e229be5 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 13:08:58 -0300 Subject: [PATCH 07/17] merge --- .../rj_escritorio/flooding_detection/flows.py | 1 + .../rj_escritorio/flooding_detection/tasks.py | 2 ++ .../rj_escritorio/flooding_detection/utils.py | 16 +++++++++++++++- 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index 01b56c197..2ac6390e3 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -89,6 +89,7 @@ images=images, data_key=redis_key_flooding_detection_data, last_update_key=redis_key_flooding_detection_last_update, + predictions_buffer_key=redis_key_predictions_buffer, ) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 034135141..8133a5e95 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -245,6 +245,7 @@ def update_flooding_api_data( images: List[str], data_key: str, last_update_key: str, + predictions_buffer_key: str, ) -> None: """ Updates Redis keys with flooding detection data and last update datetime (now). @@ -272,6 +273,7 @@ def update_flooding_api_data( images: A list of images in base64 format. data_key: The Redis key for the flooding detection data. last_update_key: The Redis key for the last update datetime. + predictions_buffer_key: The Redis key for the predictions buffer. 
""" # TODO: Implement raise NotImplementedError() diff --git a/pipelines/rj_escritorio/flooding_detection/utils.py b/pipelines/rj_escritorio/flooding_detection/utils.py index 91ff741e0..fa4bb7e11 100644 --- a/pipelines/rj_escritorio/flooding_detection/utils.py +++ b/pipelines/rj_escritorio/flooding_detection/utils.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- from pathlib import Path -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Optional, Union import geopandas as gpd import h3 import pandas as pd +from redis_pal import RedisPal import requests from shapely.geometry import Point, Polygon @@ -212,3 +213,16 @@ def clean_and_padronize_cameras() -> gpd.GeoDataFrame: ) return cameras_h3.reset_index(drop=True) + + +def redis_get_prediction_buffer(key: str, len: int = 3) -> List[bool]: + """ + Gets the prediction buffer from Redis. + + Args: + key: The Redis key. + len: The length of the buffer. + + Returns: + The prediction buffer. + """ From 7fd27a2373ff11cc8c71f361a3c9e31cba252833 Mon Sep 17 00:00:00 2001 From: d116626 Date: Tue, 14 Nov 2023 13:19:55 -0300 Subject: [PATCH 08/17] chore: refactor pick_cametas for the cleaned file --- .../rj_escritorio/flooding_detection/tasks.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 034135141..9d0c1fb60 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -13,10 +13,10 @@ from PIL import Image from prefect import task import requests +from shapely.geometry import Point from pipelines.rj_escritorio.flooding_detection.utils import ( download_file, - h3_id_to_polygon, ) from pipelines.utils.utils import get_vault_secret, log @@ -189,9 +189,11 @@ def pick_cameras( cameras_data_path = Path("/tmp") / "cameras_geo_min.csv" if not download_file(url=cameras_data_url, output_path=cameras_data_path): raise RuntimeError("Failed to download the cameras data.") - df_cameras = gpd.read_file( - cameras_data_path, GEOM_POSSIBLE_NAMES="geometry", KEEP_GEOM_COLUMNS="NO" - ) + cameras = pd.read_csv("cameras.csv") + cameras = cameras.drop(columns=["geometry"]) + geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])] + df_cameras = gpd.GeoDataFrame(cameras, geometry=geometry) + df_cameras.crs = {"init": "epsg:4326"} log("Successfully downloaded cameras data.") log(f"Cameras shape: {df_cameras.shape}") @@ -199,19 +201,11 @@ def pick_cameras( rain_data = requests.get(rain_api_data_url).json() df_rain = pd.DataFrame(rain_data) df_rain["last_update"] = last_update - df_rain = df_rain.rename(columns={"status": "status_chuva"}) - geometry = df_rain["id_h3"].apply(lambda h3_id: h3_id_to_polygon(h3_id)) - df_rain_geo = gpd.GeoDataFrame(df_rain, geometry=geometry) - df_rain_geo.crs = {"init": "epsg:4326"} log("Successfully downloaded rain data.") log(f"Rain data shape: {df_rain.shape}") # Join the dataframes - df_cameras_h3: gpd.GeoDataFrame = gpd.sjoin( - df_cameras, df_rain_geo, how="left", op="within" - ) - df_cameras_h3 = df_cameras_h3.drop(columns=["index_right"]) - df_cameras_h3 = df_cameras_h3[df_cameras_h3["id_h3"].notnull()] + df_cameras_h3 = pd.merge(df_cameras, df_rain, how="left", on="id_h3") log("Successfully joined the dataframes.") log(f"Cameras H3 shape: {df_cameras_h3.shape}") From 69e9cb152dcf46247e0013fffa39e4a58e622813 Mon Sep 17 00:00:00 2001 From: d116626 Date: Tue, 14 Nov 2023 
13:22:53 -0300 Subject: [PATCH 09/17] fix: column name --- pipelines/rj_escritorio/flooding_detection/tasks.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 6d1052b59..e7f460f1c 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -210,9 +210,7 @@ def pick_cameras( log(f"Cameras H3 shape: {df_cameras_h3.shape}") # Pick cameras - mask = np.logical_not( - df_cameras_h3["status_chuva"].isin(["sem chuva", "chuva fraca"]) - ) + mask = np.logical_not(df_cameras_h3["status"].isin(["sem chuva", "chuva fraca"])) df_cameras_h3 = df_cameras_h3[mask] log("Successfully picked cameras.") log(f"Picked cameras shape: {df_cameras_h3.shape}") From 175fee87fad7b6fd2d43aa5a9bc1c5e1c4bbb528 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 13:32:24 -0300 Subject: [PATCH 10/17] feat: add task for updating api data --- .../rj_escritorio/flooding_detection/tasks.py | 51 +++++++++++++++++-- .../rj_escritorio/flooding_detection/utils.py | 34 +++++++++++-- 2 files changed, 77 insertions(+), 8 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index e7f460f1c..338561588 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -10,6 +10,7 @@ import geopandas as gpd import numpy as np import pandas as pd +import pendulum from PIL import Image from prefect import task import requests @@ -17,8 +18,9 @@ from pipelines.rj_escritorio.flooding_detection.utils import ( download_file, + redis_add_to_prediction_buffer, ) -from pipelines.utils.utils import get_vault_secret, log +from pipelines.utils.utils import get_redis_client, get_vault_secret, log @task @@ -189,7 +191,7 @@ def pick_cameras( cameras_data_path = Path("/tmp") / "cameras_geo_min.csv" if not download_file(url=cameras_data_url, output_path=cameras_data_path): raise RuntimeError("Failed to download the cameras data.") - cameras = pd.read_csv("cameras.csv") + cameras = pd.read_csv(cameras_data_path) cameras = cameras.drop(columns=["geometry"]) geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])] df_cameras = gpd.GeoDataFrame(cameras, geometry=geometry) @@ -267,5 +269,46 @@ def update_flooding_api_data( last_update_key: The Redis key for the last update datetime. predictions_buffer_key: The Redis key for the predictions buffer. 
""" - # TODO: Implement - raise NotImplementedError() + # Build API data + last_update = pendulum.now(tz="America/Sao_Paulo") + api_data = [] + for prediction, camera, image in zip(predictions, cameras, images): + # Get AI classifications + ai_classification = [] + current_prediction = prediction["label"] + predictions_buffer_camera_key = ( + f"{predictions_buffer_key}_{camera['id_camera']}" + ) + predictions_buffer = redis_add_to_prediction_buffer( + predictions_buffer_camera_key, current_prediction + ) + # Get most common prediction + most_common_prediction = max( + set(predictions_buffer), key=predictions_buffer.count + ) + # Add classifications + if most_common_prediction: + ai_classification.append( + { + "object": "alagamento", + "label": True, + "confidence": 0.7, + } + ) + api_data.append( + { + "datetime": last_update.to_datetime_string(), + "id_camera": camera["id_camera"], + "url_camera": camera["url_camera"], + "latitude": camera["latitude"], + "longitude": camera["longitude"], + "image_base64": image, + "ai_classification": ai_classification, + } + ) + + # Update API data + redis_client = get_redis_client() + redis_client.set(data_key, json.dumps(api_data)) + redis_client.set(last_update_key, last_update.to_datetime_string()) + log("Successfully updated flooding detection data.") diff --git a/pipelines/rj_escritorio/flooding_detection/utils.py b/pipelines/rj_escritorio/flooding_detection/utils.py index fa4bb7e11..b6e570e65 100644 --- a/pipelines/rj_escritorio/flooding_detection/utils.py +++ b/pipelines/rj_escritorio/flooding_detection/utils.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Union import geopandas as gpd import h3 @@ -9,8 +9,7 @@ import requests from shapely.geometry import Point, Polygon - -from utils.utils import remove_columns_accents +from pipelines.utils.utils import get_redis_client, remove_columns_accents def download_file(url: str, output_path: Union[str, Path]) -> bool: @@ -215,7 +214,24 @@ def clean_and_padronize_cameras() -> gpd.GeoDataFrame: return cameras_h3.reset_index(drop=True) -def redis_get_prediction_buffer(key: str, len: int = 3) -> List[bool]: +def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) -> List[bool]: + """ + Adds a value to the prediction buffer in Redis. + + Args: + key: The Redis key. + value: The value to be added. + len: The length of the buffer. + """ + prediction_buffer = redis_get_prediction_buffer(key, len_) + prediction_buffer.append(value) + prediction_buffer = prediction_buffer[-len_:] + redis_client: RedisPal = get_redis_client() + redis_client.set(key, prediction_buffer) + return prediction_buffer + + +def redis_get_prediction_buffer(key: str, len_: int = 3) -> List[bool]: """ Gets the prediction buffer from Redis. @@ -226,3 +242,13 @@ def redis_get_prediction_buffer(key: str, len: int = 3) -> List[bool]: Returns: The prediction buffer. 
""" + redis_client: RedisPal = get_redis_client() + prediction_buffer = redis_client.get(key) + if prediction_buffer is None: + return [False] * len_ + elif not isinstance(prediction_buffer, list): + return [False] * len_ + elif len(prediction_buffer) < len_: + diff = len_ - len(prediction_buffer) + return [False] * diff + prediction_buffer + return prediction_buffer From f50b0f704755c535ab3e437e443bd5f25324469f Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 13:59:26 -0300 Subject: [PATCH 11/17] feat: mock a few cameras for testing --- .../rj_escritorio/flooding_detection/flows.py | 6 +++- .../rj_escritorio/flooding_detection/tasks.py | 32 ++++++++++++++++--- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index 2ac6390e3..0ea1f63e0 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -33,6 +33,10 @@ "cameras_geodf_url", required=True, ) + mocked_cameras_number = Parameter( + "mocked_cameras_number", + default=0, + ) openai_api_max_tokens = Parameter("openai_api_max_tokens", default=300) openai_api_model = Parameter("openai_api_model", default="gpt-4-vision-preview") openai_api_url = Parameter( @@ -69,6 +73,7 @@ cameras_data_url=cameras_geodf_url, last_update=last_update, predictions_buffer_key=redis_key_predictions_buffer, + number_mock_rain_cameras=mocked_cameras_number, ) openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path) images = get_snapshot.map( @@ -79,7 +84,6 @@ flooding_prompt=unmapped(openai_flooding_detection_prompt), openai_api_key=unmapped(openai_api_key), openai_api_model=unmapped(openai_api_model), - predictions_buffer_key=unmapped(redis_key_predictions_buffer), openai_api_max_tokens=unmapped(openai_api_max_tokens), openai_api_url=unmapped(openai_api_url), ) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 338561588..19852418d 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -4,6 +4,7 @@ import io import json from pathlib import Path +import random from typing import Dict, List, Union import cv2 @@ -19,6 +20,7 @@ from pipelines.rj_escritorio.flooding_detection.utils import ( download_file, redis_add_to_prediction_buffer, + redis_get_prediction_buffer, ) from pipelines.utils.utils import get_redis_client, get_vault_secret, log @@ -63,7 +65,6 @@ def get_prediction( flooding_prompt: str, openai_api_key: str, openai_api_model: str, - predictions_buffer_key: str, openai_api_max_tokens: int = 300, openai_api_url: str = "https://api.openai.com/v1/chat/completions", ) -> Dict[str, Union[str, float, bool]]: @@ -74,7 +75,9 @@ def get_prediction( image: The image in base64 format. flooding_prompt: The flooding prompt. openai_api_key: The OpenAI API key. - predictions_buffer_key: The Redis key for the predictions buffer. + openai_api_model: The OpenAI API model. + openai_api_max_tokens: The OpenAI API max tokens. + openai_api_url: The OpenAI API URL. Returns: The prediction in the following format: @@ -164,6 +167,7 @@ def pick_cameras( cameras_data_url: str, last_update: datetime, predictions_buffer_key: str, + number_mock_rain_cameras: int = 0, ) -> List[Dict[str, Union[str, float]]]: """ Picks cameras based on the raining hexagons and last update. @@ -185,8 +189,6 @@ def pick_cameras( ... 
] """ - # TODO: - # - Must always pick cameras whose buffer contains flooding predictions # Download the cameras data cameras_data_path = Path("/tmp") / "cameras_geo_min.csv" if not download_file(url=cameras_data_url, output_path=cameras_data_path): @@ -211,6 +213,26 @@ def pick_cameras( log("Successfully joined the dataframes.") log(f"Cameras H3 shape: {df_cameras_h3.shape}") + # Modify status based on buffers + for _, row in df_cameras_h3.iterrows(): + predictions_buffer_camera_key = f"{predictions_buffer_key}_{row['id_camera']}" + predictions_buffer = redis_get_prediction_buffer(predictions_buffer_camera_key) + # Get most common prediction + most_common_prediction = max( + set(predictions_buffer), key=predictions_buffer.count + ) + # Add classifications + if most_common_prediction or predictions_buffer[-1]: + row["status"] = "chuva moderada" + + # Mock a few cameras when argument is set + if number_mock_rain_cameras > 0: + df_len = len(df_cameras_h3) + for _ in range(number_mock_rain_cameras): + mocked_index = random.randint(0, df_len) + df_cameras_h3.loc[mocked_index, "status"] = "chuva moderada" + log(f'Mocked camera ID: {df_cameras_h3.loc[mocked_index]["id_camera"]}') + # Pick cameras mask = np.logical_not(df_cameras_h3["status"].isin(["sem chuva", "chuva fraca"])) df_cameras_h3 = df_cameras_h3[mask] @@ -222,7 +244,7 @@ def pick_cameras( for _, row in df_cameras_h3.iterrows(): output.append( { - "id_camera": row["codigo"], + "id_camera": row["id_camera"], "url_camera": row["nome_da_camera"], "latitude": row["geometry"].y, "longitude": row["geometry"].x, From 72ac9a37421abe85d49d97f5a4b2ea7f5e9626be Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 14:15:48 -0300 Subject: [PATCH 12/17] fix: add missing libs to Docker --- Dockerfile | 2 +- pipelines/rj_escritorio/flooding_detection/flows.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index bf37c31f2..1620c1264 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,7 +35,7 @@ RUN apt-get update && \ curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \ echo "deb [arch=amd64,arm64,armhf] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list && \ apt-get update && \ - ACCEPT_EULA=Y apt-get install --no-install-recommends -y msodbcsql17 openssl unixodbc-dev && \ + ACCEPT_EULA=Y apt-get install --no-install-recommends -y ffmpeg libsm6 libxext6 msodbcsql17 openssl unixodbc-dev && \ rm -rf /var/lib/apt/lists/* && \ sh -c "echo /opt/oracle/instantclient_21_5 > /etc/ld.so.conf.d/oracle-instantclient.conf" && \ ldconfig diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py index 0ea1f63e0..f79cdbd4e 100644 --- a/pipelines/rj_escritorio/flooding_detection/flows.py +++ b/pipelines/rj_escritorio/flooding_detection/flows.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -Flow definition for generating a data catalog from BigQuery. +Flow definition for flooding detection using AI. 
""" from prefect import Parameter from prefect.run_configs import KubernetesRun From e5fdc98f05c20f614f774e4f12f4c06eacaeec89 Mon Sep 17 00:00:00 2001 From: d116626 Date: Tue, 14 Nov 2023 14:38:06 -0300 Subject: [PATCH 13/17] fix: pick_cameras output --- pipelines/rj_escritorio/flooding_detection/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 19852418d..32fde4570 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -245,7 +245,8 @@ def pick_cameras( output.append( { "id_camera": row["id_camera"], - "url_camera": row["nome_da_camera"], + "nome_camera": row["nome"], + "url_camera": row["rtsp"], "latitude": row["geometry"].y, "longitude": row["geometry"].x, } From 92c92fef005e39cf26700f2bbd96287e7c4ed622 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 15:14:38 -0300 Subject: [PATCH 14/17] fix: use db 1 for api --- .../rj_escritorio/flooding_detection/tasks.py | 24 +++++++++---------- .../rj_escritorio/flooding_detection/utils.py | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 19852418d..58a2f85d8 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -317,20 +317,20 @@ def update_flooding_api_data( "confidence": 0.7, } ) - api_data.append( - { - "datetime": last_update.to_datetime_string(), - "id_camera": camera["id_camera"], - "url_camera": camera["url_camera"], - "latitude": camera["latitude"], - "longitude": camera["longitude"], - "image_base64": image, - "ai_classification": ai_classification, - } - ) + api_data.append( + { + "datetime": last_update.to_datetime_string(), + "id_camera": camera["id_camera"], + "url_camera": camera["url_camera"], + "latitude": camera["latitude"], + "longitude": camera["longitude"], + "image_base64": image, + "ai_classification": ai_classification, + } + ) # Update API data - redis_client = get_redis_client() + redis_client = get_redis_client(db=1) redis_client.set(data_key, json.dumps(api_data)) redis_client.set(last_update_key, last_update.to_datetime_string()) log("Successfully updated flooding detection data.") diff --git a/pipelines/rj_escritorio/flooding_detection/utils.py b/pipelines/rj_escritorio/flooding_detection/utils.py index b6e570e65..078552a2a 100644 --- a/pipelines/rj_escritorio/flooding_detection/utils.py +++ b/pipelines/rj_escritorio/flooding_detection/utils.py @@ -226,7 +226,7 @@ def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) -> List prediction_buffer = redis_get_prediction_buffer(key, len_) prediction_buffer.append(value) prediction_buffer = prediction_buffer[-len_:] - redis_client: RedisPal = get_redis_client() + redis_client: RedisPal = get_redis_client(db=1) redis_client.set(key, prediction_buffer) return prediction_buffer @@ -242,7 +242,7 @@ def redis_get_prediction_buffer(key: str, len_: int = 3) -> List[bool]: Returns: The prediction buffer. 
""" - redis_client: RedisPal = get_redis_client() + redis_client: RedisPal = get_redis_client(db=1) prediction_buffer = redis_client.get(key) if prediction_buffer is None: return [False] * len_ From a427436831bb562cf7a98349a52b46a1ab16fb7d Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 15:37:53 -0300 Subject: [PATCH 15/17] chore: add retry for `get_snapshot` --- pipelines/rj_escritorio/flooding_detection/tasks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 23e0d1aa1..467fc61c9 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- +# TODO: Make it resilient to camera failures import base64 -from datetime import datetime +from datetime import datetime, timedelta import io import json from pathlib import Path @@ -128,7 +129,10 @@ def get_prediction( } -@task +@task( + max_retries=3, + retry_delay=timedelta(seconds=5), +) def get_snapshot( camera: Dict[str, Union[str, float]], ) -> str: From 916b0cec71a192c3b926fbcc2b696fac3a51b971 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 15:50:21 -0300 Subject: [PATCH 16/17] fix: python types are supported --- pipelines/rj_escritorio/flooding_detection/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py index 467fc61c9..24eae6879 100644 --- a/pipelines/rj_escritorio/flooding_detection/tasks.py +++ b/pipelines/rj_escritorio/flooding_detection/tasks.py @@ -336,6 +336,6 @@ def update_flooding_api_data( # Update API data redis_client = get_redis_client(db=1) - redis_client.set(data_key, json.dumps(api_data)) + redis_client.set(data_key, api_data) redis_client.set(last_update_key, last_update.to_datetime_string()) log("Successfully updated flooding detection data.") From f7fe0800843bbaabb343ce7aa873448a0543d357 Mon Sep 17 00:00:00 2001 From: Gabriel Gazola Milan Date: Tue, 14 Nov 2023 16:07:46 -0300 Subject: [PATCH 17/17] feat: add parameters to schedule --- .../flooding_detection/schedules.py | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_escritorio/flooding_detection/schedules.py b/pipelines/rj_escritorio/flooding_detection/schedules.py index c974eaac7..7d9a135f2 100644 --- a/pipelines/rj_escritorio/flooding_detection/schedules.py +++ b/pipelines/rj_escritorio/flooding_detection/schedules.py @@ -19,7 +19,27 @@ labels=[ constants.RJ_ESCRITORIO_AGENT_LABEL.value, ], - parameter_defaults={}, # TODO: Add parameters + parameter_defaults={ + "cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min.csv", + "mocked_cameras_number": 0, + "openai_api_key_secret_path": "openai-api-key-flooding-detection", + "openai_api_max_tokens": 300, + "openai_api_model": "gpt-4-vision-preview", + "openai_api_url": "https://api.openai.com/v1/chat/completions", + "openai_flooding_detection_prompt": """You are an expert flooding detector. You are + given a image. You must detect if there is flooding in the image. The output MUST + be a JSON object with a boolean value for the key "flooding_detected". If you don't + know what to anwser, you can set the key "flooding_detect" as false. 
Example: + { + "flooding_detected": true + } + """, + "rain_api_update_url": "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/", # noqa + "rain_api_url": "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/", + "redis_key_flooding_detection_data": "flooding_detection_data", + "redis_key_flooding_detection_last_update": "flooding_detection_last_update", + "redis_key_predictions_buffer": "flooding_detection_predictions_buffer", + }, ), ] )
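
With PATCH 17 the schedule supplies a default for every parameter the flow has declared since PATCH 01. The business rule that is easy to miss across PATCH 10 and PATCH 11 is the per-camera prediction buffer: the last three labels are kept in Redis under `{redis_key_predictions_buffer}_{id_camera}`, and a camera is treated as flooded when the majority of its buffer is positive (in `pick_cameras`, a positive most-recent label also counts). Below is a minimal, self-contained sketch of that rule; it uses an in-memory dict in place of Redis and a made-up camera id, so it runs without Prefect or any service:

```python
from typing import Dict, List

BUFFER_LEN = 3
_store: Dict[str, List[bool]] = {}  # stands in for the Redis instance (db=1)


def add_to_prediction_buffer(key: str, value: bool, len_: int = BUFFER_LEN) -> List[bool]:
    # Mirrors redis_add_to_prediction_buffer: a missing buffer is padded with
    # False, the new label is appended, and only the last `len_` entries survive.
    buffer = _store.get(key, [False] * len_)
    buffer = (buffer + [value])[-len_:]
    _store[key] = buffer
    return buffer


def most_common(buffer: List[bool]) -> bool:
    # The same majority rule used in pick_cameras and update_flooding_api_data.
    return max(set(buffer), key=buffer.count)


# Three consecutive flow runs for one hypothetical camera: a miss, then two hits.
key = "flooding_detection_predictions_buffer_0001"
for label in (False, True, True):
    buffer = add_to_prediction_buffer(key, label)

print(buffer)               # [False, True, True]
print(most_common(buffer))  # True -> the camera is reported as flooded
```

Under this rule a single spurious answer from the vision model is smoothed out, and a camera whose buffer still holds flooding labels keeps being re-checked even after the rain gauges report "sem chuva".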