Skip to content

Commit

Permalink
Merge branch 'master' into staging/setur-seeketing-add-tables
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 24, 2023
2 parents 9ba358a + ce08585 commit 8232c45
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
12 changes: 8 additions & 4 deletions pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Flows for gtfs
"""
from copy import deepcopy
from datetime import timedelta

# Imports #

Expand All @@ -26,9 +27,7 @@

# SMTR Imports #
from pipelines.rj_smtr.constants import constants
from pipelines.rj_smtr.tasks import (
get_current_timestamp,
)
from pipelines.rj_smtr.tasks import get_current_timestamp, get_scheduled_start_times

from pipelines.rj_smtr.flows import default_capture_flow, default_materialization_flow

Expand Down Expand Up @@ -71,7 +70,7 @@
timestamp = get_current_timestamp()

rename_flow_run = rename_current_flow_run_now_time(
prefix=gtfs_captura_tratamento.name + " ",
prefix=gtfs_captura_tratamento.name + " " + data_versao_gtfs + " ",
now_time=timestamp,
)

Expand All @@ -88,6 +87,11 @@
project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value),
parameters=gtfs_capture_parameters,
labels=unmapped(LABELS),
scheduled_start_time=get_scheduled_start_times(
timestamp=timestamp,
parameters=gtfs_capture_parameters,
intervals={"agency": timedelta(minutes=11)},
),
)

wait_captura_true = wait_for_flow_run.map(
Expand Down
16 changes: 8 additions & 8 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,10 @@ class constants(Enum): # pylint: disable=c0103
}

GTFS_TABLE_CAPTURE_PARAMS = [
{
"table_id": "shapes",
"primary_key": ["shape_id", "shape_pt_sequence"],
},
{
"table_id": "agency",
"primary_key": ["agency_id"],
Expand All @@ -489,18 +493,10 @@ class constants(Enum): # pylint: disable=c0103
"table_id": "routes",
"primary_key": ["route_id"],
},
{
"table_id": "shapes",
"primary_key": ["shape_id", "shape_pt_sequence"],
},
{
"table_id": "stops",
"primary_key": ["stop_id"],
},
{
"table_id": "stop_times",
"primary_key": ["trip_id", "stop_sequence"],
},
{
"table_id": "trips",
"primary_key": ["trip_id"],
Expand All @@ -518,6 +514,10 @@ class constants(Enum): # pylint: disable=c0103
"primary_key": ["servico"],
"extract_params": {"filename": "ordem_servico"},
},
{
"table_id": "stop_times",
"primary_key": ["trip_id", "stop_sequence"],
},
]

GTFS_MATERIALIZACAO_PARAMS = {
Expand Down
37 changes: 37 additions & 0 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,3 +1379,40 @@ def check_mapped_query_logs_output(query_logs_output: list[tuple]) -> bool:

recapture_list = [i[0] for i in query_logs_output]
return any(recapture_list)


@task
def get_scheduled_start_times(
timestamp: datetime, parameters: list, intervals: Union[None, dict] = None
):
"""
Task to get start times to schedule flows
Args:
timestamp (datetime): initial flow run timestamp
parameters (list): parameters for the flow
intervals (Union[None, dict], optional): intervals between each flow run. Defaults to None.
Optionally, you can pass specific intervals for some table_ids.
Suggests to pass intervals based on previous table observed execution times.
Defaults to dict(default=timedelta(minutes=2)).
Returns:
list[datetime]: list of scheduled start times
"""

if intervals is None:
intervals = dict()

if "default" not in intervals.keys():
intervals["default"] = timedelta(minutes=2)

timestamps = [None]
last_schedule = timestamp

for param in parameters[1:]:
last_schedule += intervals.get(
param.get("table_id", "default"), intervals["default"]
)
timestamps.append(last_schedule)

return timestamps

0 comments on commit 8232c45

Please sign in to comment.