Skip to content

Commit

Permalink
Merge pull request #556 from prefeitura-rio/feat/deteccao-alagamento
Browse files Browse the repository at this point in the history
[FEATURE] Adiciona pipeline de detecção de alagamentos
  • Loading branch information
gabriel-milan authored Nov 14, 2023
2 parents 588b7ee + f7fe080 commit fc92660
Show file tree
Hide file tree
Showing 9 changed files with 801 additions and 2 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ RUN apt-get update && \
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
echo "deb [arch=amd64,arm64,armhf] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list && \
apt-get update && \
ACCEPT_EULA=Y apt-get install --no-install-recommends -y msodbcsql17 openssl unixodbc-dev && \
ACCEPT_EULA=Y apt-get install --no-install-recommends -y ffmpeg libsm6 libxext6 msodbcsql17 openssl unixodbc-dev && \
rm -rf /var/lib/apt/lists/* && \
sh -c "echo /opt/oracle/instantclient_21_5 > /etc/ld.so.conf.d/oracle-instantclient.conf" && \
ldconfig
Expand Down
1 change: 1 addition & 0 deletions pipelines/rj_escritorio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pipelines.rj_escritorio.dados_mestres_dump_datario.flows import *
from pipelines.rj_escritorio.data_catalog.flows import *
from pipelines.rj_escritorio.dummy_predict.flows import *
from pipelines.rj_escritorio.flooding_detection.flows import *
from pipelines.rj_escritorio.notify_flooding.flows import *
from pipelines.rj_escritorio.template_pipeline.flows import *
from pipelines.rj_escritorio.tweets_flamengo.flows import *
Expand Down
Empty file.
105 changes: 105 additions & 0 deletions pipelines/rj_escritorio/flooding_detection/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""
Flow definition for flooding detection using AI.
"""
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.utilities.edges import unmapped

from pipelines.constants import constants
from pipelines.rj_escritorio.flooding_detection.schedules import (
update_flooding_data_schedule,
)
from pipelines.rj_escritorio.flooding_detection.tasks import (
get_last_update,
get_openai_api_key,
get_prediction,
get_snapshot,
pick_cameras,
update_flooding_api_data,
)
from pipelines.utils.decorators import Flow

with Flow(
name="EMD: flooding_detection - Atualizar detecção de alagamento (IA) na API",
code_owners=[
"gabriel",
"diego",
],
) as rj_escritorio__flooding_detection__flow:
# Parameters
cameras_geodf_url = Parameter(
"cameras_geodf_url",
required=True,
)
mocked_cameras_number = Parameter(
"mocked_cameras_number",
default=0,
)
openai_api_max_tokens = Parameter("openai_api_max_tokens", default=300)
openai_api_model = Parameter("openai_api_model", default="gpt-4-vision-preview")
openai_api_url = Parameter(
"openai_api_url",
default="https://api.openai.com/v1/chat/completions",
)
openai_api_key_secret_path = Parameter("openai_api_key_secret_path", required=True)
openai_flooding_detection_prompt = Parameter(
"openai_flooding_detection_prompt", required=True
)
rain_api_data_url = Parameter(
"rain_api_url",
default="https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
)
rain_api_update_url = Parameter(
"rain_api_update_url",
default="https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/",
)
redis_key_predictions_buffer = Parameter(
"redis_key_predictions_buffer", default="flooding_detection_predictions_buffer"
)
redis_key_flooding_detection_data = Parameter(
"redis_key_flooding_detection_data", default="flooding_detection_data"
)
redis_key_flooding_detection_last_update = Parameter(
"redis_key_flooding_detection_last_update",
default="flooding_detection_last_update",
)

# Flow
last_update = get_last_update(rain_api_update_url=rain_api_update_url)
cameras = pick_cameras(
rain_api_data_url=rain_api_data_url,
cameras_data_url=cameras_geodf_url,
last_update=last_update,
predictions_buffer_key=redis_key_predictions_buffer,
number_mock_rain_cameras=mocked_cameras_number,
)
openai_api_key = get_openai_api_key(secret_path=openai_api_key_secret_path)
images = get_snapshot.map(
camera=cameras,
)
predictions = get_prediction.map(
image=images,
flooding_prompt=unmapped(openai_flooding_detection_prompt),
openai_api_key=unmapped(openai_api_key),
openai_api_model=unmapped(openai_api_model),
openai_api_max_tokens=unmapped(openai_api_max_tokens),
openai_api_url=unmapped(openai_api_url),
)
update_flooding_api_data(
predictions=predictions,
cameras=cameras,
images=images,
data_key=redis_key_flooding_detection_data,
last_update_key=redis_key_flooding_detection_last_update,
predictions_buffer_key=redis_key_predictions_buffer,
)


rj_escritorio__flooding_detection__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
rj_escritorio__flooding_detection__flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[constants.RJ_ESCRITORIO_AGENT_LABEL.value],
)
rj_escritorio__flooding_detection__flow.schedule = update_flooding_data_schedule
45 changes: 45 additions & 0 deletions pipelines/rj_escritorio/flooding_detection/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
"""
Schedules for the data catalog pipeline.
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
import pytz

from pipelines.constants import constants

update_flooding_data_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(minutes=3),
start_date=datetime(2023, 1, 1, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_ESCRITORIO_AGENT_LABEL.value,
],
parameter_defaults={
"cameras_geodf_url": "https://prefeitura-rio.github.io/storage/cameras_geo_min.csv",
"mocked_cameras_number": 0,
"openai_api_key_secret_path": "openai-api-key-flooding-detection",
"openai_api_max_tokens": 300,
"openai_api_model": "gpt-4-vision-preview",
"openai_api_url": "https://api.openai.com/v1/chat/completions",
"openai_flooding_detection_prompt": """You are an expert flooding detector. You are
given a image. You must detect if there is flooding in the image. The output MUST
be a JSON object with a boolean value for the key "flooding_detected". If you don't
know what to anwser, you can set the key "flooding_detect" as false. Example:
{
"flooding_detected": true
}
""",
"rain_api_update_url": "https://api.dados.rio/v2/clima_pluviometro/ultima_atualizacao_precipitacao_15min/", # noqa
"rain_api_url": "https://api.dados.rio/v2/clima_pluviometro/precipitacao_15min/",
"redis_key_flooding_detection_data": "flooding_detection_data",
"redis_key_flooding_detection_last_update": "flooding_detection_last_update",
"redis_key_predictions_buffer": "flooding_detection_predictions_buffer",
},
),
]
)
Loading

0 comments on commit fc92660

Please sign in to comment.