diff --git a/Dockerfile b/Dockerfile
index bf37c31f2..1620c1264 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -35,7 +35,7 @@ RUN apt-get update && \
     curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
     echo "deb [arch=amd64,arm64,armhf] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list && \
     apt-get update && \
-    ACCEPT_EULA=Y apt-get install --no-install-recommends -y msodbcsql17 openssl unixodbc-dev && \
+    ACCEPT_EULA=Y apt-get install --no-install-recommends -y ffmpeg libsm6 libxext6 msodbcsql17 openssl unixodbc-dev && \
     rm -rf /var/lib/apt/lists/* && \
     sh -c "echo /opt/oracle/instantclient_21_5 > /etc/ld.so.conf.d/oracle-instantclient.conf" && \
     ldconfig
diff --git a/pipelines/rj_escritorio/__init__.py b/pipelines/rj_escritorio/__init__.py
index a5c864245..0b8ffcfd6 100644
--- a/pipelines/rj_escritorio/__init__.py
+++ b/pipelines/rj_escritorio/__init__.py
@@ -9,6 +9,7 @@
 from pipelines.rj_escritorio.dados_mestres_dump_datario.flows import *
 from pipelines.rj_escritorio.data_catalog.flows import *
 from pipelines.rj_escritorio.dummy_predict.flows import *
+from pipelines.rj_escritorio.flooding_detection.flows import *
 from pipelines.rj_escritorio.notify_flooding.flows import *
 from pipelines.rj_escritorio.template_pipeline.flows import *
 from pipelines.rj_escritorio.tweets_flamengo.flows import *
diff --git a/pipelines/rj_escritorio/flooding_detection/__init__.py b/pipelines/rj_escritorio/flooding_detection/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pipelines/rj_escritorio/flooding_detection/flows.py b/pipelines/rj_escritorio/flooding_detection/flows.py
new file mode 100644
index 000000000..f79cdbd4e
--- /dev/null
+++ b/pipelines/rj_escritorio/flooding_detection/flows.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+"""
+Flow definition for flooding detection using AI.
+""" +from prefect import Parameter +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefect.utilities.edges import unmapped + +from pipelines.constants import constants +from pipelines.rj_escritorio.flooding_detection.schedules import ( + update_flooding_data_schedule, +) +from pipelines.rj_escritorio.flooding_detection.tasks import ( + get_last_update, + get_openai_api_key, + get_prediction, + get_snapshot, + pick_cameras, + update_flooding_api_data, +) +from pipelines.utils.decorators import Flow + +with Flow( + name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API", + code_owners=[ + "gabriel", + "diego", + ], +) as rj_escritorio__flooding_detection__flow: + # Parameters + cameras_geodf_url = Parameter( + "cameras_geodf_url", + required=True, + ) + mocked_cameras_number = Parameter( + "mocked_cameras_number", + default=0, + ) + openai_api_max_tokens = Parameter("openai_api_max_tokens", default=300) + openai_api_model = Parameter("openai_api_model", default="gpt-4-vision-preview") + openai_api_url = Parameter( + "openai_api_url", + default="https://api.openai.com/v1/chat/completions", + ) + openai_api_key_secret_path = Parameter("openai_api_key_secret_path", required=True) + openai_flooding_detection_prompt = Parameter( + "openai_flooding_detection_prompt", required=True + ) + rain_api_data_url = Parameter( + "rain_api_url", + default="https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/", + ) + rain_api_update_url = Parameter( + "rain_api_update_url", + default="https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/", + ) + redis_key_predictions_buffer = Parameter( + "redis_key_predictions_buffer", default="flooding_detection_predictions_buffer" + ) + redis_key_flooding_detection_data = Parameter( + "redis_key_flooding_detection_data", default="flooding_detection_data" + ) + redis_key_flooding_detection_last_update = Parameter( + "redis_key_flooding_detection_last_update", + default="flooding_detection_last_update", + ) + + # Flow + last_update = get_last_update(rain_api_update_url=rain_api_update_url) + cameras = pick_cameras( + rain_api_data_url=rain_api_data_url, + cameras_data_url=cameras_geodf_url, + last_update=last_update, + predictions_buffer_key=redis_key_predictions_buffer, + number_mock_rain_cameras=mocked_cameras_number, + ) + openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path) + images = get_snapshot.map( + camera=cameras, + ) + predictions = get_prediction.map( + image=images, + flooding_prompt=unmapped(openai_flooding_detection_prompt), + openai_api_key=unmapped(openai_api_key), + openai_api_model=unmapped(openai_api_model), + openai_api_max_tokens=unmapped(openai_api_max_tokens), + openai_api_url=unmapped(openai_api_url), + ) + update_flooding_api_data( + predictions=predictions, + cameras=cameras, + images=images, + data_key=redis_key_flooding_detection_data, + last_update_key=redis_key_flooding_detection_last_update, + predictions_buffer_key=redis_key_predictions_buffer, + ) + + +rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +rj_escritorio__flooding_detection__flow.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, + labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value], +) +rj_escritorio__flooding_detection__flow.schedule = update_flooding_data_schedule diff --git a/pipelines/rj_escritorio/flooding_detection/schedules.py b/pipelines/rj_escritorio/flooding_detection/schedules.py new file mode 
index 000000000..7d9a135f2
--- /dev/null
+++ b/pipelines/rj_escritorio/flooding_detection/schedules.py
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+"""
+Schedules for the flooding detection pipeline.
+"""
+
+from datetime import timedelta, datetime
+
+from prefect.schedules import Schedule
+from prefect.schedules.clocks import IntervalClock
+import pytz
+
+from pipelines.constants import constants
+
+update_flooding_data_schedule = Schedule(
+    clocks=[
+        IntervalClock(
+            interval=timedelta(minutes=3),
+            # pytz timezones must be attached via localize(); passing them as
+            # tzinfo directly yields the historical LMT offset.
+            start_date=pytz.timezone("America/Sao_Paulo").localize(
+                datetime(2023, 1, 1)
+            ),
+            labels=[
+                constants.RJ_ESCRITORIO_AGENT_LABEL.value,
+            ],
+            parameter_defaults={
+                "cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min.csv",
+                "mocked_cameras_number": 0,
+                "openai_api_key_secret_path": "openai-api-key-flooding-detection",
+                "openai_api_max_tokens": 300,
+                "openai_api_model": "gpt-4-vision-preview",
+                "openai_api_url": "https://api.openai.com/v1/chat/completions",
+                "openai_flooding_detection_prompt": """You are an expert flooding detector. You are
+                given an image. You must detect if there is flooding in the image. The output MUST
+                be a JSON object with a boolean value for the key "flooding_detected". If you don't
+                know what to answer, you can set the key "flooding_detected" as false. Example:
+                {
+                    "flooding_detected": true
+                }
+                """,
+                "rain_api_update_url": "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/",  # noqa
+                "rain_api_url": "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
+                "redis_key_flooding_detection_data": "flooding_detection_data",
+                "redis_key_flooding_detection_last_update": "flooding_detection_last_update",
+                "redis_key_predictions_buffer": "flooding_detection_predictions_buffer",
+            },
+        ),
+    ]
+)
diff --git a/pipelines/rj_escritorio/flooding_detection/tasks.py b/pipelines/rj_escritorio/flooding_detection/tasks.py
new file mode 100644
index 000000000..24eae6879
--- /dev/null
+++ b/pipelines/rj_escritorio/flooding_detection/tasks.py
@@ -0,0 +1,341 @@
+# -*- coding: utf-8 -*-
+# TODO: Make it resilient to camera failures
+import base64
+from datetime import datetime, timedelta
+import io
+import json
+from pathlib import Path
+import random
+from typing import Dict, List, Union
+
+import cv2
+import geopandas as gpd
+import numpy as np
+import pandas as pd
+import pendulum
+from PIL import Image
+from prefect import task
+import requests
+from shapely.geometry import Point
+
+from pipelines.rj_escritorio.flooding_detection.utils import (
+    download_file,
+    redis_add_to_prediction_buffer,
+    redis_get_prediction_buffer,
+)
+from pipelines.utils.utils import get_redis_client, get_vault_secret, log
+
+
+@task
+def get_last_update(
+    rain_api_update_url: str,
+) -> datetime:
+    """
+    Gets the last update datetime from the rain API.
+
+    Args:
+        rain_api_update_url: The rain API update url.
+
+    Returns:
+        The last update datetime.
+    """
+    data = requests.get(rain_api_update_url).text
+    data = data.strip('"')
+    log(f"Last update: {data}")
+    return datetime.strptime(data, "%d/%m/%Y %H:%M:%S")
+
+
+@task
+def get_openai_api_key(secret_path: str) -> str:
+    """
+    Gets the OpenAI API key.
+
+    Args:
+        secret_path: The secret path.
+
+    Returns:
+        The OpenAI API key.
+ """ + secret = get_vault_secret(secret_path)["data"] + return secret["api_key"] + + +@task +def get_prediction( + image: str, + flooding_prompt: str, + openai_api_key: str, + openai_api_model: str, + openai_api_max_tokens: int = 300, + openai_api_url: str = "https://api.openai.com/v1/chat/completions", +) -> Dict[str, Union[str, float, bool]]: + """ + Gets the flooding detection prediction from OpenAI API. + + Args: + image: The image in base64 format. + flooding_prompt: The flooding prompt. + openai_api_key: The OpenAI API key. + openai_api_model: The OpenAI API model. + openai_api_max_tokens: The OpenAI API max tokens. + openai_api_url: The OpenAI API URL. + + Returns: + The prediction in the following format: + { + "object": "alagamento", + "label": True, + "confidence": 0.7, + } + """ + # TODO: + # - Add confidence value + # Setup the request + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {openai_api_key}", + } + payload = { + "model": openai_api_model, + "messages": [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": flooding_prompt, + }, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{image}"}, + }, + ], + } + ], + "max_tokens": openai_api_max_tokens, + } + response = requests.post(openai_api_url, headers=headers, json=payload) + data: dict = response.json() + if data.get("error"): + raise RuntimeError(f"Failed to get prediction: {data['error']}") + content: str = data["choices"][0]["message"]["content"] + json_string = content.replace("```json\n", "").replace("\n```", "") + json_object = json.loads(json_string) + flooding_detected = json_object["flooding_detected"] + return { + "object": "alagamento", + "label": flooding_detected, + "confidence": 0.7, + } + + +@task( + max_retries=3, + retry_delay=timedelta(seconds=5), +) +def get_snapshot( + camera: Dict[str, Union[str, float]], +) -> str: + """ + Gets a snapshot from a camera. + + Args: + camera: The camera in the following format: + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + } + + Returns: + The snapshot in base64 format. + """ + rtsp_url = camera["url_camera"] + cap = cv2.VideoCapture(rtsp_url) + ret, frame = cap.read() + if not ret: + raise RuntimeError(f"Failed to get snapshot from URL {rtsp_url}.") + cap.release() + img = Image.fromarray(frame) + buffer = io.BytesIO() + img.save(buffer, format="JPEG") + img_b64 = base64.b64encode(buffer.getvalue()).decode("utf-8") + log(f"Successfully got snapshot from URL {rtsp_url}.") + return img_b64 + + +@task +def pick_cameras( + rain_api_data_url: str, + cameras_data_url: str, + last_update: datetime, + predictions_buffer_key: str, + number_mock_rain_cameras: int = 0, +) -> List[Dict[str, Union[str, float]]]: + """ + Picks cameras based on the raining hexagons and last update. + + Args: + rain_api_data_url: The rain API data url. + last_update: The last update datetime. + predictions_buffer_key: The Redis key for the predictions buffer. + + Returns: + A list of cameras in the following format: + [ + { + "id_camera": "1", + "url_camera": "rtsp://...", + "latitude": -22.912, + "longitude": -43.230, + }, + ... 
+            ]
+    """
+    # Download the cameras data
+    cameras_data_path = Path("/tmp") / "cameras_geo_min.csv"
+    if not download_file(url=cameras_data_url, output_path=cameras_data_path):
+        raise RuntimeError("Failed to download the cameras data.")
+    cameras = pd.read_csv(cameras_data_path)
+    cameras = cameras.drop(columns=["geometry"])
+    geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])]
+    df_cameras = gpd.GeoDataFrame(cameras, geometry=geometry)
+    df_cameras.crs = {"init": "epsg:4326"}
+    log("Successfully downloaded cameras data.")
+    log(f"Cameras shape: {df_cameras.shape}")
+
+    # Get rain data
+    rain_data = requests.get(rain_api_data_url).json()
+    df_rain = pd.DataFrame(rain_data)
+    df_rain["last_update"] = last_update
+    log("Successfully downloaded rain data.")
+    log(f"Rain data shape: {df_rain.shape}")
+
+    # Join the dataframes
+    df_cameras_h3 = pd.merge(df_cameras, df_rain, how="left", on="id_h3")
+    log("Successfully joined the dataframes.")
+    log(f"Cameras H3 shape: {df_cameras_h3.shape}")
+
+    # Modify status based on buffers
+    for idx, row in df_cameras_h3.iterrows():
+        predictions_buffer_camera_key = f"{predictions_buffer_key}_{row['id_camera']}"
+        predictions_buffer = redis_get_prediction_buffer(predictions_buffer_camera_key)
+        # Get most common prediction
+        most_common_prediction = max(
+            set(predictions_buffer), key=predictions_buffer.count
+        )
+        # Add classifications. Assign through the frame: writing to the row
+        # yielded by iterrows() would not persist.
+        if most_common_prediction or predictions_buffer[-1]:
+            df_cameras_h3.at[idx, "status"] = "chuva moderada"
+
+    # Mock a few cameras when argument is set
+    if number_mock_rain_cameras > 0:
+        df_len = len(df_cameras_h3)
+        for _ in range(number_mock_rain_cameras):
+            mocked_index = random.randint(0, df_len - 1)  # randint is inclusive
+            df_cameras_h3.loc[mocked_index, "status"] = "chuva moderada"
+            log(f'Mocked camera ID: {df_cameras_h3.loc[mocked_index]["id_camera"]}')
+
+    # Pick cameras
+    mask = np.logical_not(df_cameras_h3["status"].isin(["sem chuva", "chuva fraca"]))
+    df_cameras_h3 = df_cameras_h3[mask]
+    log("Successfully picked cameras.")
+    log(f"Picked cameras shape: {df_cameras_h3.shape}")
+
+    # Set output
+    output = []
+    for _, row in df_cameras_h3.iterrows():
+        output.append(
+            {
+                "id_camera": row["id_camera"],
+                "nome_camera": row["nome"],
+                "url_camera": row["rtsp"],
+                "latitude": row["geometry"].y,
+                "longitude": row["geometry"].x,
+            }
+        )
+    log(f"Picked cameras: {output}")
+    return output
+
+
+@task
+def update_flooding_api_data(
+    predictions: List[Dict[str, Union[str, float, bool]]],
+    cameras: List[Dict[str, Union[str, float]]],
+    images: List[str],
+    data_key: str,
+    last_update_key: str,
+    predictions_buffer_key: str,
+) -> None:
+    """
+    Updates Redis keys with flooding detection data and last update datetime (now).
+
+    Args:
+        predictions: The AI predictions in the following format:
+            [
+                {
+                    "object": "alagamento",
+                    "label": True,
+                    "confidence": 0.7,
+                },
+                ...
+            ]
+        cameras: A list of cameras in the following format:
+            [
+                {
+                    "id_camera": "1",
+                    "url_camera": "rtsp://...",
+                    "latitude": -22.912,
+                    "longitude": -43.230,
+                },
+                ...
+            ]
+        images: A list of images in base64 format.
+        data_key: The Redis key for the flooding detection data.
+        last_update_key: The Redis key for the last update datetime.
+        predictions_buffer_key: The Redis key for the predictions buffer.
+ """ + # Build API data + last_update = pendulum.now(tz="America/Sao_Paulo") + api_data = [] + for prediction, camera, image in zip(predictions, cameras, images): + # Get AI classifications + ai_classification = [] + current_prediction = prediction["label"] + predictions_buffer_camera_key = ( + f"{predictions_buffer_key}_{camera['id_camera']}" + ) + predictions_buffer = redis_add_to_prediction_buffer( + predictions_buffer_camera_key, current_prediction + ) + # Get most common prediction + most_common_prediction = max( + set(predictions_buffer), key=predictions_buffer.count + ) + # Add classifications + if most_common_prediction: + ai_classification.append( + { + "object": "alagamento", + "label": True, + "confidence": 0.7, + } + ) + api_data.append( + { + "datetime": last_update.to_datetime_string(), + "id_camera": camera["id_camera"], + "url_camera": camera["url_camera"], + "latitude": camera["latitude"], + "longitude": camera["longitude"], + "image_base64": image, + "ai_classification": ai_classification, + } + ) + + # Update API data + redis_client = get_redis_client(db=1) + redis_client.set(data_key, api_data) + redis_client.set(last_update_key, last_update.to_datetime_string()) + log("Successfully updated flooding detection data.") diff --git a/pipelines/rj_escritorio/flooding_detection/utils.py b/pipelines/rj_escritorio/flooding_detection/utils.py new file mode 100644 index 000000000..078552a2a --- /dev/null +++ b/pipelines/rj_escritorio/flooding_detection/utils.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +from pathlib import Path +from typing import Any, Dict, List, Union + +import geopandas as gpd +import h3 +import pandas as pd +from redis_pal import RedisPal +import requests +from shapely.geometry import Point, Polygon + +from pipelines.utils.utils import get_redis_client, remove_columns_accents + + +def download_file(url: str, output_path: Union[str, Path]) -> bool: + """ + Downloads a file from a URL. + + Args: + url: The URL. + output_path: The output path. + + Returns: + Whether the file was downloaded successfully. + """ + response = requests.get(url) + if response.status_code == 200: + with open(output_path, "wb") as f: + f.write(response.content) + return True + return False + + +def h3_id_to_polygon(h3_id: str): + """ + Converts an H3 ID to a Polygon. + + Args: + h3_id: The H3 ID. + + Returns: + The Polygon. + """ + return Polygon(h3.h3_to_geo_boundary(h3_id, geo_json=True)) + + +def extract_data(row: Dict[str, Any]) -> pd.Series: + """ + Extracts username, password, and path from a given row with camera data. + + Parameters: + - row (Dict[str, Any]): A dictionary representing a row of camera data. + Expected keys are 'rtsp' and 'ip'. + + Returns: + - pd.Series: A pandas Series containing extracted 'username', 'password', + and 'path' information. 
+ """ + + try: + rtsp = row["rtsp"] + # Remove protocol + rtsp = rtsp.replace("rtsp://", "").replace("rtsp:/", "") + # If we have an "@" in the URL, we have username and password + if "@" in rtsp: + # Extract username and password + username_password = rtsp.split("@")[0].split(":") + if len(username_password) == 2: + username = username_password[0] + password = username_password[1] + else: + print(username_password) + raise Exception("Why???") + # Remove username and password from rtsp + rtsp = rtsp.split("@")[1] + else: + username = None + password = None + # Extract path + path = "/".join(rtsp.split("/")[1:]) + # Return the data + return pd.Series( + { + "username": username, + "password": password, + "path": path, + } + ) + except Exception as exc: + print(row["rtsp"]) + raise exc + + +def build_rtsp(row: pd.Series) -> str: + """ + Builds a complete RTSP URL from the given row data. + + Parameters: + - row (pd.Series): A pandas Series containing 'username', 'password', 'path', and 'ip'. + + Returns: + - str: The complete RTSP URL. + """ + username = row["username"] + password = row["password"] + path = row["path"] + ip = row["ip"] + # If we have username and password, add them to the URL + if username and password: + return f"rtsp://{username}:{password}@{ip}/{path}" + else: + return f"rtsp://{ip}/{path}" + + +def get_rain_dataframe() -> pd.DataFrame: + """ + Fetches and returns rainfall data from a specified API. + + Returns: + - pd.DataFrame: A pandas DataFrame containing the rainfall data. + """ + api_url = "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/" + data = requests.get(api_url).json() + df_rain = pd.DataFrame(data) + + last_update_url = "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/" # noqa + last_update = requests.get(last_update_url).json() + df_rain["last_update"] = last_update + df_rain["last_update"] = pd.to_datetime(df_rain["last_update"]) + + return df_rain + + +def get_cameras_h3(df: pd.DataFrame) -> gpd.GeoDataFrame: + """ + Enhances camera data with geographical information and joins it with rainfall data. + + Parameters: + - df (pd.DataFrame): A DataFrame containing camera data. + + Returns: + - gpd.GeoDataFrame: A GeoDataFrame containing the joined camera and rainfall data. + """ + cameras = df.copy() + geometry = [Point(xy) for xy in zip(cameras["longitude"], cameras["latitude"])] + cameras_geo = gpd.GeoDataFrame(cameras, geometry=geometry) + cameras_geo.crs = {"init": "epsg:4326"} + + pluviometro = get_rain_dataframe() + pluviometro = pluviometro.rename(columns={"status": "status_chuva"}) + geometry = pluviometro["id_h3"].apply(lambda h3_id: h3_id_to_polygon(h3_id)) + pluviometro_geo = gpd.GeoDataFrame(pluviometro, geometry=geometry) + pluviometro_geo.crs = {"init": "epsg:4326"} + print("pluviometro_geo:", pluviometro_geo.shape) + + cameras_h3 = gpd.sjoin(cameras_geo, pluviometro_geo, how="left", op="within") + cameras_h3 = cameras_h3.drop(columns=["index_right"]) + cameras_h3 = cameras_h3[cameras_h3["id_h3"].notnull()] + + return cameras_h3 + + +def clean_and_padronize_cameras() -> gpd.GeoDataFrame: + """ + Cleans and standardizes camera data from a CSV file, then merges it with geographical data. + + Returns: + - gpd.GeoDataFrame: A GeoDataFrame containing the cleaned, standardized, and geographically + enriched camera data. 
+ """ + df = pd.read_csv( + "./data/Cameras_em_2023-11-13.csv", delimiter=";", encoding="latin1" + ) + df.columns = remove_columns_accents(df) + df["codigo"] = df["codigo"].str.replace("'", "") + df = df[df["status"] == "Online"] + df = df[df["rtsp"].str.startswith("rtsp")] + + df["ip_in_rtsp"] = df.apply(lambda row: row["ip"] in row["rtsp"], axis=1) + df[~df["ip_in_rtsp"]] + + df["ip"] = df["ip"].replace("10.151.48.04", "10.151.48.4") + df["ip_in_rtsp"] = df.apply(lambda row: row["ip"] in row["rtsp"], axis=1) + df[~df["ip_in_rtsp"]] + + df[["username", "password", "path"]] = df.apply(extract_data, axis=1) + df["rtsp"] = df.apply(build_rtsp, axis=1) + # Filter out by subnet + df = df[ + df["ip"].str.startswith("10.10") + | df["ip"].str.startswith("10.151") + | df["ip"].str.startswith("10.152") + | df["ip"].str.startswith("10.153") + | df["ip"].str.startswith("10.50") + | df["ip"].str.startswith("10.52") + ] + + cameras_h3 = get_cameras_h3(df) + cols = [ + "codigo", + "nome_da_camera", + "rtsp", + "latitude", + "longitude", + "geometry", + "id_h3", + ] + cameras_h3 = cameras_h3[cols] + + cameras_h3 = cameras_h3.rename( + columns={"codigo": "id_camera", "nome_da_camera": "nome"} + ) + + return cameras_h3.reset_index(drop=True) + + +def redis_add_to_prediction_buffer(key: str, value: bool, len_: int = 3) -> List[bool]: + """ + Adds a value to the prediction buffer in Redis. + + Args: + key: The Redis key. + value: The value to be added. + len: The length of the buffer. + """ + prediction_buffer = redis_get_prediction_buffer(key, len_) + prediction_buffer.append(value) + prediction_buffer = prediction_buffer[-len_:] + redis_client: RedisPal = get_redis_client(db=1) + redis_client.set(key, prediction_buffer) + return prediction_buffer + + +def redis_get_prediction_buffer(key: str, len_: int = 3) -> List[bool]: + """ + Gets the prediction buffer from Redis. + + Args: + key: The Redis key. + len: The length of the buffer. + + Returns: + The prediction buffer. 
+ """ + redis_client: RedisPal = get_redis_client(db=1) + prediction_buffer = redis_client.get(key) + if prediction_buffer is None: + return [False] * len_ + elif not isinstance(prediction_buffer, list): + return [False] * len_ + elif len(prediction_buffer) < len_: + diff = len_ - len(prediction_buffer) + return [False] * diff + prediction_buffer + return prediction_buffer diff --git a/poetry.lock b/poetry.lock index cd825252c..0c93f6c58 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2708,6 +2708,58 @@ files = [ {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, ] +[[package]] +name = "h3" +version = "3.7.6" +description = "Hierarchical hexagonal geospatial indexing system" +optional = false +python-versions = "*" +files = [ + {file = "h3-3.7.6-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:cd4a5103a86a7d98cffa3be77eb82080aa2e9d676bbd1661f3db9ecad7a4ef2b"}, + {file = "h3-3.7.6-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:231959dceb4cc4ae86fe4fe2c385b176ed81712549e787b889dfa66f583676df"}, + {file = "h3-3.7.6-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:b9de9da755c90bbc90d6c041396a20c91816cd86a0bafa3b8899681cfdc2c4c6"}, + {file = "h3-3.7.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cda9a427b0de0d4069115ec765118888f180d0db915b5bc0dba52f5ae957b789"}, + {file = "h3-3.7.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8bf1e080b9a47774754834e7f10155f3d2e3542bf895488a0519b2ae7d5b15db"}, + {file = "h3-3.7.6-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1d3b93e3f38eb6c8fc5051d1b289b74614fb5f2415d272fea18085dea260d6b0"}, + {file = "h3-3.7.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:783b2ca4448360c5a184fd43b84fc5554e3a8fd02738ff31349506189c5b4b49"}, + {file = "h3-3.7.6-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:3bae8b95f21f20f04141a35f15c8caa74f2046eb01ef49e35fc45e6a8edfc8df"}, + {file = "h3-3.7.6-cp310-cp310-win_amd64.whl", hash = "sha256:6ca9dd410e250d37f24a87c4ecb0615bb6d44a3f90eb5dbbf1b5e3d4489b8703"}, + {file = "h3-3.7.6-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:991ee991f2ae41f629feb1cd32fa677b8512c72696eb0ad94fcf359d61184b2e"}, + {file = "h3-3.7.6-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:fcbfff87d223279f8e38bbee3ebf52b1b96ae280e9e7de24674d3c284373d946"}, + {file = "h3-3.7.6-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eddf10d1d2139b3ea3ad1618c2074e1c47d3d36bddb5359e4955f5fd0b089d93"}, + {file = "h3-3.7.6-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:76abc02f14a8df42fb5d80e6045023fb756c49d3cb08d69a8ceb9362b95d4bec"}, + {file = "h3-3.7.6-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:fc8030968586a7810aa192397ad9a4f7d7a963f57c9b3e210fc38de0aa5c2533"}, + {file = "h3-3.7.6-cp311-cp311-win_amd64.whl", hash = "sha256:1bdc790d91138e781973dcaade5231db7fe8a876330939e0903f602acc4fb64c"}, + {file = "h3-3.7.6-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:198718ab20a06ebe52f0aaafc02469e4a51964e5ba7990c3cc1d2fc32e7a54d9"}, + {file = "h3-3.7.6-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:02faa911f2d8425c641a1f7c08f3dc9c10a5a3d81408832afa40748534b999c8"}, + {file = "h3-3.7.6-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1b4db92ceaeb9a51cc875c302cdc5e1aa27eed776d95943ee55c941bc2f219a3"}, + {file = "h3-3.7.6-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = 
"sha256:4810ddb3d91411a6cbf87a28108fe31712f618ef223c349e1f6675602af2c473"}, + {file = "h3-3.7.6-cp36-cp36m-win_amd64.whl", hash = "sha256:211ef3317dcf7863e2d01a97ab6da319b8451d78cd1633dd28faaf69e66bc321"}, + {file = "h3-3.7.6-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:99b81620608378fc9910a5c630b0f17eb939156fa13e1adc55229d31f8c3f5ca"}, + {file = "h3-3.7.6-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:26f3175bd3ea3ee528dbf49308e7215a58351ce425e1c3a9838ae22526663311"}, + {file = "h3-3.7.6-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7d7b69015f5bab2525914fad370b96dc386437e19a14cfed3eb13868589263db"}, + {file = "h3-3.7.6-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:d0c2890fa10fa8695c020569c8d55da79e2c552a39533de4ae6991c7acb122e1"}, + {file = "h3-3.7.6-cp37-cp37m-win_amd64.whl", hash = "sha256:1cd4f07c49721023c5fef401a4de03c47000705dfd116579dc6b61cad821305d"}, + {file = "h3-3.7.6-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:f8d1db3dcd8f6ce7f54e061e6c9fbecbb5c3978b9e54e44af05a53787c4f99b3"}, + {file = "h3-3.7.6-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:495e37b1dee0ec46ccd88b278e571234b0d0d30648f161799d65a8d7f390b3f2"}, + {file = "h3-3.7.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b2e2c4808b7691b176c89ebf23c173b3b23dd4ce42f8f494b32c6e31ceee49af"}, + {file = "h3-3.7.6-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b58b1298bf1068704c6d9426749c8ae6021b53d982d5153cc4161c7042ecd810"}, + {file = "h3-3.7.6-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:a2872f695168c4700c73edd6eab9c6181387d7ea177de13b130ae61e613ff7de"}, + {file = "h3-3.7.6-cp38-cp38-win_amd64.whl", hash = "sha256:98c3951c3b67ca3de06ef70aa74a9752640a3eca9b4d68e0d5d8e4fc6fa72337"}, + {file = "h3-3.7.6-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0724d4645c1da59e02b3305050c52b93ce1d8971d1d139433d464fcc103249a6"}, + {file = "h3-3.7.6-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e1f6ec0f2246381ce3a7f72da1ce825a5474eb7c8fb25a2ea1f16c6606ce34a7"}, + {file = "h3-3.7.6-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1471ff4d3875b25b521eeec5c2b72abe27b8e6af10ab99b7da5c0de545b0e832"}, + {file = "h3-3.7.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb96f2caae519d0ed17acde625af528476dca121b0336d3eb776429d40284ef6"}, + {file = "h3-3.7.6-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:b1b1bce0dee05175f8d422e50ffa1afacb9a7e78ae0963059aebfbef50e10175"}, + {file = "h3-3.7.6-cp39-cp39-win_amd64.whl", hash = "sha256:36ea935833c37fdfd7ffbfc000d7cd20addcdf67f30b26a6b9bccb9210b03704"}, + {file = "h3-3.7.6.tar.gz", hash = "sha256:9bbd3dbac99532fa521d7d2e288ff55877bea3223b070f659ed7b5f8f1f213eb"}, +] + +[package.extras] +all = ["flake8", "numpy", "pylint", "pytest", "pytest-cov"] +numpy = ["numpy"] +test = ["flake8", "pylint", "pytest", "pytest-cov"] + [[package]] name = "h5py" version = "3.9.0" @@ -7135,4 +7187,4 @@ testing = ["coverage (>=5.0.3)", "zope.event", "zope.testing"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.11" -content-hash = "26bc9f8eea3e12980a49451806dbc243c4ac6bc940bef5dd5f3e1f758f8c0e31" +content-hash = "b788af3d35ff07ac9dcbda64936ef602c4196038307f0c602a47f5ce1c3d363d" diff --git a/pyproject.toml b/pyproject.toml index c861bcc93..f08195f90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,6 +63,7 @@ psycopg2-binary = "^2.9.9" azure-storage-blob = "^12.17.0" icecream = "^2.1.3" pyodbc = "^5.0.1" +h3 
= "^3.7.6" [tool.poetry.dev-dependencies] pylint = "^2.12.2"