diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py index 97a3280b2..6246d1bd2 100644 --- a/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py +++ b/pipelines/rj_smtr/br_rj_riodejaneiro_gtfs/flows.py @@ -3,6 +3,7 @@ Flows for gtfs """ from copy import deepcopy +from datetime import timedelta # Imports # @@ -26,9 +27,7 @@ # SMTR Imports # from pipelines.rj_smtr.constants import constants -from pipelines.rj_smtr.tasks import ( - get_current_timestamp, -) +from pipelines.rj_smtr.tasks import get_current_timestamp, get_scheduled_start_times from pipelines.rj_smtr.flows import default_capture_flow, default_materialization_flow @@ -71,7 +70,7 @@ timestamp = get_current_timestamp() rename_flow_run = rename_current_flow_run_now_time( - prefix=gtfs_captura_tratamento.name + " ", + prefix=gtfs_captura_tratamento.name + " " + data_versao_gtfs + " ", now_time=timestamp, ) @@ -88,6 +87,11 @@ project_name=unmapped(emd_constants.PREFECT_DEFAULT_PROJECT.value), parameters=gtfs_capture_parameters, labels=unmapped(LABELS), + scheduled_start_time=get_scheduled_start_times( + timestamp=timestamp, + parameters=gtfs_capture_parameters, + intervals={"agency": timedelta(minutes=11)}, + ), ) wait_captura_true = wait_for_flow_run.map( diff --git a/pipelines/rj_smtr/constants.py b/pipelines/rj_smtr/constants.py index 5080bbed2..a5eaad0ef 100644 --- a/pipelines/rj_smtr/constants.py +++ b/pipelines/rj_smtr/constants.py @@ -465,6 +465,10 @@ class constants(Enum): # pylint: disable=c0103 } GTFS_TABLE_CAPTURE_PARAMS = [ + { + "table_id": "shapes", + "primary_key": ["shape_id", "shape_pt_sequence"], + }, { "table_id": "agency", "primary_key": ["agency_id"], @@ -489,18 +493,10 @@ class constants(Enum): # pylint: disable=c0103 "table_id": "routes", "primary_key": ["route_id"], }, - { - "table_id": "shapes", - "primary_key": ["shape_id", "shape_pt_sequence"], - }, { "table_id": "stops", "primary_key": ["stop_id"], }, - { - "table_id": 
@task
def get_scheduled_start_times(
    timestamp: datetime, parameters: list, intervals: Union[None, dict] = None
):
    """
    Task to compute staggered start times for a list of flow runs.

    One entry is produced per item in ``parameters``. The first entry is
    always ``None`` (no explicit start time is computed for the first run);
    every following entry is the previous start time plus the interval
    configured for that item's ``table_id``.

    Args:
        timestamp (datetime): initial flow run timestamp, used as the base
            for the first computed start time.
        parameters (list): list of parameter dicts, one per flow run. The
            ``table_id`` key of each dict (when present) selects the interval.
        intervals (Union[None, dict], optional): mapping of ``table_id`` to
            the ``timedelta`` to wait BEFORE starting that table's run,
            counted from the previous run's start time. The special key
            ``"default"`` is used for every table_id without its own entry
            and falls back to ``timedelta(minutes=2)`` when not provided.
            It is suggested to choose intervals based on the observed
            execution time of the preceding table. Defaults to None.
            The dict passed by the caller is never modified.

    Returns:
        list: ``[None, datetime, datetime, ...]`` with the same length as
            ``parameters`` (an empty list when ``parameters`` is empty).
    """

    # Nothing to schedule: keep the output aligned with the (empty) input
    # instead of returning a dangling [None].
    if not parameters:
        return []

    # Work on a private copy so that filling in the default interval does not
    # leak into the dict owned by the caller.
    intervals = dict(intervals) if intervals is not None else {}
    intervals.setdefault("default", timedelta(minutes=2))
    default_interval = intervals["default"]

    timestamps = [None]
    last_schedule = timestamp

    # The first parameter set keeps the None entry; each subsequent one is
    # offset from the previous start by its own (or the default) interval.
    for param in parameters[1:]:
        last_schedule = last_schedule + intervals.get(
            param.get("table_id", "default"), default_interval
        )
        timestamps.append(last_schedule)

    return timestamps