From c0bb021322516e70d646af72bdfd132e06015192 Mon Sep 17 00:00:00 2001 From: hellcassius Date: Wed, 31 Jan 2024 16:24:47 -0300 Subject: [PATCH 1/9] re-commit janitor flow --- pipelines/rj_smtr/__init__.py | 1 + pipelines/rj_smtr/janitor/__init__.py | 0 pipelines/rj_smtr/janitor/flows.py | 29 ++++++ pipelines/rj_smtr/janitor/tasks.py | 140 ++++++++++++++++++++++++++ 4 files changed, 170 insertions(+) create mode 100644 pipelines/rj_smtr/janitor/__init__.py create mode 100644 pipelines/rj_smtr/janitor/flows.py create mode 100644 pipelines/rj_smtr/janitor/tasks.py diff --git a/pipelines/rj_smtr/__init__.py b/pipelines/rj_smtr/__init__.py index b9532af8d..2205e6c54 100644 --- a/pipelines/rj_smtr/__init__.py +++ b/pipelines/rj_smtr/__init__.py @@ -21,3 +21,4 @@ from pipelines.rj_smtr.br_rj_riodejaneiro_gtfs.flows import * from pipelines.rj_smtr.br_rj_riodejaneiro_stu.flows import * from pipelines.rj_smtr.br_rj_riodejaneiro_diretorios.flows import * +from pipelines.rj_smtr.janitor.flows import * diff --git a/pipelines/rj_smtr/janitor/__init__.py b/pipelines/rj_smtr/janitor/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pipelines/rj_smtr/janitor/flows.py b/pipelines/rj_smtr/janitor/flows.py new file mode 100644 index 000000000..74b420dbc --- /dev/null +++ b/pipelines/rj_smtr/janitor/flows.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +"Flows for janitor" +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from pipelines.constants import constants as emd_constants +from pipelines.utils.decorators import Flow + +# from pipelines.rj_escritorio.cleanup.tasks import cancel_flow_run + +from pipelines.rj_smtr.schedules import every_10_minutes +from pipelines.rj_smtr.janitor.tasks import ( + get_active_flow_names, + query_archived_scheduled_runs, + cancel_flow_runs, +) + +with Flow( + "SMTR: Desagendamento de runs arquivadas", code_owners=["caio"] +) as janitor_flow: + flow_names = get_active_flow_names() + archived_flow_runs = query_archived_scheduled_runs.map(flow_name=flow_names) + cancel_flow_runs.map(flow_runs=archived_flow_runs) + +janitor_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) +janitor_flow.run_config = KubernetesRun( + image=emd_constants.DOCKER_IMAGE.value, + labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], +) +janitor_flow.schedule = every_10_minutes diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py new file mode 100644 index 000000000..b5d404876 --- /dev/null +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -0,0 +1,140 @@ +# -*- coding: utf-8 -*- +from typing import Dict, List + +import pendulum +from prefect import task +from prefect.client import Client + +from pipelines.utils.utils import log, get_redis_client + + +def query_active_flow_names(prefix, prefect_client=None): + query = """ +query ($prefix: String, $offset: Int){ + flow( + where: { + name: {_like: $prefix}, + archived: {_eq: false}, + project: {name:{_eq:"main"}} + } + offset: $offset + ){ + name + version + } +} +""" + if not prefect_client: + prefect_client = Client() + variables = {"prefix": prefix, "offset": 0} + flow_names = [] + response = prefect_client.graphql(query=query, variables=variables)["data"] + for flow in response["flow"]: + flow_names.append(flow["name"]) + flow_names = list(set(flow_names)) + return flow_names + + +@task +def get_prefect_client(): + return Client() + + +@task +def get_active_flow_names(prefix="%SMTR%"): + flow_names = query_active_flow_names(prefix=prefix) + log(f"Got 
flow_names\n{flow_names[:10]}\n...\n{flow_names[-10:-1]}") + return flow_names + + +@task +def query_archived_scheduled_runs(flow_name, prefect_client=None): + """ + Queries the graphql API for scheduled flow_runs of + archived versions of + + Args: + flow_name (str): flow name + """ + query = """ +query($flow_name: String, $offset: Int){ + flow( + where:{ + name: {_eq:$flow_name}, + archived: {_eq:true}, + project: {name:{_eq:"main"}} + } + offset: $offset + order_by: {version:desc} + ){ + name + version + flow_runs( + where:{ + state: {_eq: "Scheduled"} + } + order_by: {version:desc} + ){ + id + scheduled_start_time + } + } +} +""" + if not prefect_client: + prefect_client = Client() + + variables = {"flow_name": flow_name, "offset": 0} + archived_flow_runs = [] + response = prefect_client.graphql(query=query, variables=variables)["data"] + + for flow in response["flow"]: + for flow_run in flow["flow_runs"]: + if flow["flow_runs"]: + archived_flow_runs.append(flow_run) + log( + f"Got flow_run {flow_run['id']}, scheduled: {flow_run['scheduled_start_time']}" + ) + # while len(response): + # if len(response["flow"]["flow_runs"]): + + # variables["offset"] += len(response) + # response = prefect_client.graphql(query=query, variables=variables) + if archived_flow_runs: + log(f"O Flow {flow_name} possui runs a serem canceladas") + return archived_flow_runs + + +@task +def cancel_flow_runs(flow_runs: List[Dict[str, str]], client: Client = None) -> None: + """ + Cancels a flow run from the API. + """ + if not flow_runs: + log("No flow runs to cancel") + return + flow_run_ids = [flow_run["id"] for flow_run in flow_runs] + log(f">>>>>>>>>> Cancelling flow runs\n{flow_run_ids}") + if not client: + client = Client() + + query = """ + mutation($flow_run_id: UUID!) 
{ + cancel_flow_run ( + input: { + flow_run_id: $flow_run_id + } + ) { + state + } + } + """ + for flow_run_id in flow_run_ids: + try: + response = client.graphql( + query=query, variables=dict(flow_run_id=flow_run_id) + ) + state: str = response["data"]["cancel_flow_run"]["state"] + log(f">>>>>>>>>> Flow run {flow_run_id} is now {state}") + except Exception: + log(f"Flow_run {flow_run_id} could not be cancelled") From bd7ca8f0c64d6623a208956f9f9bf2298d12cacf Mon Sep 17 00:00:00 2001 From: hellcassius Date: Wed, 31 Jan 2024 17:31:42 -0300 Subject: [PATCH 2/9] add notification to discord --- pipelines/rj_smtr/janitor/flows.py | 4 ++-- pipelines/rj_smtr/janitor/tasks.py | 31 +++++++++++++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/pipelines/rj_smtr/janitor/flows.py b/pipelines/rj_smtr/janitor/flows.py index 74b420dbc..ef3c8193f 100644 --- a/pipelines/rj_smtr/janitor/flows.py +++ b/pipelines/rj_smtr/janitor/flows.py @@ -7,7 +7,7 @@ # from pipelines.rj_escritorio.cleanup.tasks import cancel_flow_run -from pipelines.rj_smtr.schedules import every_10_minutes +from pipelines.rj_smtr.schedules import every_5_minutes from pipelines.rj_smtr.janitor.tasks import ( get_active_flow_names, query_archived_scheduled_runs, @@ -26,4 +26,4 @@ image=emd_constants.DOCKER_IMAGE.value, labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value], ) -janitor_flow.schedule = every_10_minutes +janitor_flow.schedule = every_5_minutes diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index b5d404876..6a4ebb0e9 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -5,7 +5,7 @@ from prefect import task from prefect.client import Client -from pipelines.utils.utils import log, get_redis_client +from pipelines.utils.utils import log, send_discord_message, get_vault_secret def query_active_flow_names(prefix, prefect_client=None): @@ -35,6 +35,16 @@ def query_active_flow_names(prefix, prefect_client=None): return flow_names +def send_cancelled_run_on_discord(cancelled_runs, flow_name, webhook_url): + message = f""" +O Flow {flow_name} teve {len(cancelled_runs)} canceladas. +Link para as runs:\n +""" + for run_id in cancelled_runs: + message.append(f"https://prefect.dados.rio/{run_id}") + send_discord_message(message=message, webhook_url=webhook_url) + + @task def get_prefect_client(): return Client() @@ -102,7 +112,7 @@ def query_archived_scheduled_runs(flow_name, prefect_client=None): # response = prefect_client.graphql(query=query, variables=variables) if archived_flow_runs: log(f"O Flow {flow_name} possui runs a serem canceladas") - return archived_flow_runs + return {"flow_name": flow_name, "flow_runs": archived_flow_runs} @task @@ -110,10 +120,11 @@ def cancel_flow_runs(flow_runs: List[Dict[str, str]], client: Client = None) -> """ Cancels a flow run from the API. 
""" - if not flow_runs: - log("No flow runs to cancel") + if not flow_runs["flow_runs"]: + log(f"O flow {flow_runs['flow_name']} não possui runs para cancelar") return - flow_run_ids = [flow_run["id"] for flow_run in flow_runs] + flow_run_ids = [flow_run["id"] for flow_run in flow_runs["flow_runs"]] + cancelled_runs = [] log(f">>>>>>>>>> Cancelling flow runs\n{flow_run_ids}") if not client: client = Client() @@ -136,5 +147,15 @@ def cancel_flow_runs(flow_runs: List[Dict[str, str]], client: Client = None) -> ) state: str = response["data"]["cancel_flow_run"]["state"] log(f">>>>>>>>>> Flow run {flow_run_id} is now {state}") + cancelled_runs.append(flow_run_id) except Exception: log(f"Flow_run {flow_run_id} could not be cancelled") + + # Notify cancellation + try: + url = get_vault_secret("cancelled_runs_webhook") + send_cancelled_run_on_discord( + cancelled_runs, flow_runs["flow_name"], webhook_url=url + ) + except Exception: + log("Could not get a webhook to send messages to") From 7686cd1382ac36b2a81170c5c427cec1e9143284 Mon Sep 17 00:00:00 2001 From: hellcassius Date: Thu, 1 Feb 2024 08:22:07 -0300 Subject: [PATCH 3/9] fix run link --- pipelines/rj_smtr/janitor/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 6a4ebb0e9..16a745ca1 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -41,7 +41,7 @@ def send_cancelled_run_on_discord(cancelled_runs, flow_name, webhook_url): Link para as runs:\n """ for run_id in cancelled_runs: - message.append(f"https://prefect.dados.rio/{run_id}") + message.append(f"https://prefect.dados.rio/default/flow-run/{run_id}") send_discord_message(message=message, webhook_url=webhook_url) @@ -153,7 +153,7 @@ def cancel_flow_runs(flow_runs: List[Dict[str, str]], client: Client = None) -> # Notify cancellation try: - url = get_vault_secret("cancelled_runs_webhook") + url = get_vault_secret("cancelled_runs_webhook")["url"] send_cancelled_run_on_discord( cancelled_runs, flow_runs["flow_name"], webhook_url=url ) From 2367e470e1e34232f05cf6c39451f0407bd7910d Mon Sep 17 00:00:00 2001 From: hellcassius Date: Thu, 1 Feb 2024 08:24:09 -0300 Subject: [PATCH 4/9] remove unused code --- pipelines/rj_smtr/janitor/tasks.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 16a745ca1..47e8db834 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -1,7 +1,6 @@ # -*- coding: utf-8 -*- from typing import Dict, List -import pendulum from prefect import task from prefect.client import Client @@ -105,11 +104,7 @@ def query_archived_scheduled_runs(flow_name, prefect_client=None): log( f"Got flow_run {flow_run['id']}, scheduled: {flow_run['scheduled_start_time']}" ) - # while len(response): - # if len(response["flow"]["flow_runs"]): - # variables["offset"] += len(response) - # response = prefect_client.graphql(query=query, variables=variables) if archived_flow_runs: log(f"O Flow {flow_name} possui runs a serem canceladas") return {"flow_name": flow_name, "flow_runs": archived_flow_runs} From 0e41d4146eb5f460a2584fe9d3fe97d7e4aff942 Mon Sep 17 00:00:00 2001 From: Boris Marinho Date: Tue, 6 Feb 2024 18:38:08 -0300 Subject: [PATCH 5/9] modifica a forma de fazer a limpeza das runs antigas --- pipelines/rj_smtr/janitor/flows.py | 18 ++-- pipelines/rj_smtr/janitor/tasks.py | 135 ++++++++++++++++++++++------- 2 files changed, 113 
insertions(+), 40 deletions(-) diff --git a/pipelines/rj_smtr/janitor/flows.py b/pipelines/rj_smtr/janitor/flows.py index ef3c8193f..01b6e33fb 100644 --- a/pipelines/rj_smtr/janitor/flows.py +++ b/pipelines/rj_smtr/janitor/flows.py @@ -4,22 +4,22 @@ from prefect.storage import GCS from pipelines.constants import constants as emd_constants from pipelines.utils.decorators import Flow - -# from pipelines.rj_escritorio.cleanup.tasks import cancel_flow_run - +from prefect.utilities.edges import unmapped from pipelines.rj_smtr.schedules import every_5_minutes from pipelines.rj_smtr.janitor.tasks import ( - get_active_flow_names, - query_archived_scheduled_runs, - cancel_flow_runs, + query_active_flow_names, + query_not_active_flows, + cancel_flows, + get_prefect_client ) with Flow( "SMTR: Desagendamento de runs arquivadas", code_owners=["caio"] ) as janitor_flow: - flow_names = get_active_flow_names() - archived_flow_runs = query_archived_scheduled_runs.map(flow_name=flow_names) - cancel_flow_runs.map(flow_runs=archived_flow_runs) + client = get_prefect_client() + flows = query_active_flow_names(prefect_client=client) + archived_flow_runs = query_not_active_flows.map(flows=flows, prefect_client=unmapped(client)) + cancel_flows.map(flows=archived_flow_runs, prefect_client=unmapped(client)) janitor_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) janitor_flow.run_config = KubernetesRun( diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 47e8db834..917a1f8d6 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -1,13 +1,16 @@ # -*- coding: utf-8 -*- from typing import Dict, List +from datetime import datetime from prefect import task from prefect.client import Client -from pipelines.utils.utils import log, send_discord_message, get_vault_secret +from pipelines.utils.utils import log, get_vault_secret +import requests -def query_active_flow_names(prefix, prefect_client=None): +@task +def query_active_flow_names(prefix='%SMTR%', prefect_client=None): query = """ query ($prefix: String, $offset: Int){ flow( @@ -26,22 +29,89 @@ def query_active_flow_names(prefix, prefect_client=None): if not prefect_client: prefect_client = Client() variables = {"prefix": prefix, "offset": 0} - flow_names = [] + # flow_names = [] response = prefect_client.graphql(query=query, variables=variables)["data"] + active_flows = [] for flow in response["flow"]: - flow_names.append(flow["name"]) - flow_names = list(set(flow_names)) - return flow_names + active_flows.append((flow["name"], flow["version"])) + # flow_names.append(flow["name"]) + # flow_names = list(set(flow_names)) + active_flows = list(set(active_flows)) + return active_flows + +@task +def query_not_active_flows(flows, prefect_client=None): + """ + Queries the graphql API for scheduled flow_runs of + archived versions of + + Args: + flow_name (str): flow name + """ + flow_name, last_version = flows + now = datetime.now().isoformat() + query = """ +query($flow_name: String, $last_version: Int, $now: timestamptz!, $offset: Int){ + flow( + where:{ + name: {_eq:$flow_name}, + version: {_lt:$last_version} + project: {name:{_eq:"main"}} + } + offset: $offset + order_by: {version:desc} + ){ + id + name + version + flow_runs( + where:{ + scheduled_start_time: {_gte: $now}, + state: {_neq: "Cancelled"} + } + order_by: {version:desc} + ){ + id + scheduled_start_time + } + } +} +""" + if not prefect_client: + prefect_client = Client() + + variables = {"flow_name": flow_name, 
"last_version": last_version, "now": now, "offset": 0} + archived_flows = [] + response = prefect_client.graphql(query=query, variables=variables)["data"] + # log(response) + for flow in response["flow"]: + if flow["flow_runs"]: + try: + archived_flows.append({'id': flow['id'], 'name': flow['name'], 'version': flow['version'], 'count': len(flow['flow_runs'])}) + # log( + # f"Insurgent flow {flow['name']}, version: {flow['version']}, count: {len(flow['flow_runs'])}" + # ) + except: + log(flow) + + return archived_flows +def send_cancelled_run_on_discord(flows, webhook_url): -def send_cancelled_run_on_discord(cancelled_runs, flow_name, webhook_url): message = f""" -O Flow {flow_name} teve {len(cancelled_runs)} canceladas. -Link para as runs:\n +Os Flows de nome {flows[0]['name']} tiveram as seguintes versões arquivadas: +Link para as versões:\n """ - for run_id in cancelled_runs: - message.append(f"https://prefect.dados.rio/default/flow-run/{run_id}") - send_discord_message(message=message, webhook_url=webhook_url) + for flow in flows: + message.append(f"Versão {flow['version']}: https://prefect.dados.rio/default/flow-run/{flow['id']}") + + r = requests.post( + webhook_url, + data={"content": message}, + ) + + log(r.status_code) + log(r.text) @task @@ -111,46 +181,49 @@ def query_archived_scheduled_runs(flow_name, prefect_client=None): @task -def cancel_flow_runs(flow_runs: List[Dict[str, str]], client: Client = None) -> None: +def cancel_flows(flows, prefect_client: Client = None) -> None: """ Cancels a flow run from the API. """ - if not flow_runs["flow_runs"]: - log(f"O flow {flow_runs['flow_name']} não possui runs para cancelar") + if not flows: + # log(f"O flow {flow_runs['flow_name']} não possui runs para cancelar") return - flow_run_ids = [flow_run["id"] for flow_run in flow_runs["flow_runs"]] - cancelled_runs = [] - log(f">>>>>>>>>> Cancelling flow runs\n{flow_run_ids}") - if not client: - client = Client() + log(f">>>>>>>>>> Cancelling flows") + + if not prefect_client: + prefect_client = Client() query = """ - mutation($flow_run_id: UUID!) { - cancel_flow_run ( + mutation($flow_id: UUID!) 
{ + archive_flow ( input: { - flow_run_id: $flow_run_id + flow_id: $flow_id } ) { - state + archived } } """ - for flow_run_id in flow_run_ids: + cancelled_flows = [] + import traceback + for flow in flows: try: - response = client.graphql( - query=query, variables=dict(flow_run_id=flow_run_id) + response = prefect_client.graphql( + query=query, variables=dict(flow_id=flow['id']) ) state: str = response["data"]["cancel_flow_run"]["state"] - log(f">>>>>>>>>> Flow run {flow_run_id} is now {state}") - cancelled_runs.append(flow_run_id) + log(f">>>>>>>>>> Flow run {flow['id']} arquivada") + cancelled_flows.append(flow) except Exception: - log(f"Flow_run {flow_run_id} could not be cancelled") + log(traceback.format_exc()) + log(f"Flow {flow['id']} could not be cancelled") # Notify cancellation + try: url = get_vault_secret("cancelled_runs_webhook")["url"] send_cancelled_run_on_discord( - cancelled_runs, flow_runs["flow_name"], webhook_url=url + cancelled_flows, flows, webhook_url=url ) except Exception: log("Could not get a webhook to send messages to") From a49f73d7060620c0bfcc5078c3bd0797fec5b0fe Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 6 Feb 2024 21:39:19 +0000 Subject: [PATCH 6/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/rj_smtr/janitor/flows.py | 6 +++-- pipelines/rj_smtr/janitor/tasks.py | 35 +++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/pipelines/rj_smtr/janitor/flows.py b/pipelines/rj_smtr/janitor/flows.py index 01b6e33fb..6c0e4f10e 100644 --- a/pipelines/rj_smtr/janitor/flows.py +++ b/pipelines/rj_smtr/janitor/flows.py @@ -10,7 +10,7 @@ query_active_flow_names, query_not_active_flows, cancel_flows, - get_prefect_client + get_prefect_client, ) with Flow( @@ -18,7 +18,9 @@ ) as janitor_flow: client = get_prefect_client() flows = query_active_flow_names(prefect_client=client) - archived_flow_runs = query_not_active_flows.map(flows=flows, prefect_client=unmapped(client)) + archived_flow_runs = query_not_active_flows.map( + flows=flows, prefect_client=unmapped(client) + ) cancel_flows.map(flows=archived_flow_runs, prefect_client=unmapped(client)) janitor_flow.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value) diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 917a1f8d6..793d7897f 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -9,8 +9,9 @@ import requests + @task -def query_active_flow_names(prefix='%SMTR%', prefect_client=None): +def query_active_flow_names(prefix="%SMTR%", prefect_client=None): query = """ query ($prefix: String, $offset: Int){ flow( @@ -39,6 +40,7 @@ def query_active_flow_names(prefix='%SMTR%', prefect_client=None): active_flows = list(set(active_flows)) return active_flows + @task def query_not_active_flows(flows, prefect_client=None): """ @@ -80,14 +82,26 @@ def query_not_active_flows(flows, prefect_client=None): if not prefect_client: prefect_client = Client() - variables = {"flow_name": flow_name, "last_version": last_version, "now": now, "offset": 0} + variables = { + "flow_name": flow_name, + "last_version": last_version, + "now": now, + "offset": 0, + } archived_flows = [] response = prefect_client.graphql(query=query, variables=variables)["data"] # log(response) for flow in response["flow"]: if flow["flow_runs"]: try: - archived_flows.append({'id': flow['id'], 'name': flow['name'], 
'version': flow['version'], 'count': len(flow['flow_runs'])}) + archived_flows.append( + { + "id": flow["id"], + "name": flow["name"], + "version": flow["version"], + "count": len(flow["flow_runs"]), + } + ) # log( # f"Insurgent flow {flow['name']}, version: {flow['version']}, count: {len(flow['flow_runs'])}" # ) @@ -96,14 +110,16 @@ def query_not_active_flows(flows, prefect_client=None): return archived_flows -def send_cancelled_run_on_discord(flows, webhook_url): +def send_cancelled_run_on_discord(flows, webhook_url): message = f""" Os Flows de nome {flows[0]['name']} tiveram as seguintes versões arquivadas: Link para as versões:\n """ for flow in flows: - message.append(f"Versão {flow['version']}: https://prefect.dados.rio/default/flow-run/{flow['id']}") + message.append( + f"Versão {flow['version']}: https://prefect.dados.rio/default/flow-run/{flow['id']}" + ) r = requests.post( webhook_url, @@ -206,10 +222,11 @@ def cancel_flows(flows, prefect_client: Client = None) -> None: """ cancelled_flows = [] import traceback + for flow in flows: try: response = prefect_client.graphql( - query=query, variables=dict(flow_id=flow['id']) + query=query, variables=dict(flow_id=flow["id"]) ) state: str = response["data"]["cancel_flow_run"]["state"] log(f">>>>>>>>>> Flow run {flow['id']} arquivada") @@ -219,11 +236,9 @@ def cancel_flows(flows, prefect_client: Client = None) -> None: log(f"Flow {flow['id']} could not be cancelled") # Notify cancellation - + try: url = get_vault_secret("cancelled_runs_webhook")["url"] - send_cancelled_run_on_discord( - cancelled_flows, flows, webhook_url=url - ) + send_cancelled_run_on_discord(cancelled_flows, flows, webhook_url=url) except Exception: log("Could not get a webhook to send messages to") From 55523fcbcc08a950c21dd09a44e4616837de9c1c Mon Sep 17 00:00:00 2001 From: hellcassius Date: Wed, 7 Feb 2024 15:58:13 -0300 Subject: [PATCH 7/9] add condition to query --- pipelines/rj_smtr/janitor/tasks.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 793d7897f..797ed8d3b 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- from typing import Dict, List +import traceback from datetime import datetime from prefect import task @@ -69,7 +70,7 @@ def query_not_active_flows(flows, prefect_client=None): flow_runs( where:{ scheduled_start_time: {_gte: $now}, - state: {_neq: "Cancelled"} + state: {_nin: ["Cancelled", "Running"]} } order_by: {version:desc} ){ @@ -102,10 +103,7 @@ def query_not_active_flows(flows, prefect_client=None): "count": len(flow["flow_runs"]), } ) - # log( - # f"Insurgent flow {flow['name']}, version: {flow['version']}, count: {len(flow['flow_runs'])}" - # ) - except: + except Exception: log(flow) return archived_flows @@ -204,7 +202,7 @@ def cancel_flows(flows, prefect_client: Client = None) -> None: if not flows: # log(f"O flow {flow_runs['flow_name']} não possui runs para cancelar") return - log(f">>>>>>>>>> Cancelling flows") + log(">>>>>>>>>> Cancelling flows") if not prefect_client: prefect_client = Client() @@ -221,14 +219,14 @@ def cancel_flows(flows, prefect_client: Client = None) -> None: } """ cancelled_flows = [] - import traceback for flow in flows: try: response = prefect_client.graphql( query=query, variables=dict(flow_id=flow["id"]) ) - state: str = response["data"]["cancel_flow_run"]["state"] + # state: str = response["data"]["cancel_flow_run"]["state"] 
+ log(response) log(f">>>>>>>>>> Flow run {flow['id']} arquivada") cancelled_flows.append(flow) except Exception: From b447952b7c95897c94c2e327a4704e8cf63dc8f5 Mon Sep 17 00:00:00 2001 From: hellcassius Date: Thu, 8 Feb 2024 09:52:00 -0300 Subject: [PATCH 8/9] fix mutation --- pipelines/rj_smtr/janitor/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 797ed8d3b..3c6d571bf 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -214,7 +214,7 @@ def cancel_flows(flows, prefect_client: Client = None) -> None: flow_id: $flow_id } ) { - archived + success } } """ @@ -239,4 +239,5 @@ def cancel_flows(flows, prefect_client: Client = None) -> None: url = get_vault_secret("cancelled_runs_webhook")["url"] send_cancelled_run_on_discord(cancelled_flows, flows, webhook_url=url) except Exception: + log(traceback.format_exc()) log("Could not get a webhook to send messages to") From ad596599fbe7bc59691ac077189179a38fe2ea72 Mon Sep 17 00:00:00 2001 From: hellcassius Date: Thu, 8 Feb 2024 10:15:56 -0300 Subject: [PATCH 9/9] fix query_not_active_flows --- pipelines/rj_smtr/janitor/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/rj_smtr/janitor/tasks.py b/pipelines/rj_smtr/janitor/tasks.py index 3c6d571bf..c8d309cf1 100644 --- a/pipelines/rj_smtr/janitor/tasks.py +++ b/pipelines/rj_smtr/janitor/tasks.py @@ -70,7 +70,7 @@ def query_not_active_flows(flows, prefect_client=None): flow_runs( where:{ scheduled_start_time: {_gte: $now}, - state: {_nin: ["Cancelled", "Running"]} + state: {_nin: ["Cancelled"]} } order_by: {version:desc} ){
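
Note on the notification helper (introduced in PATCH 2/9 and reworked in PATCH 5/9 and 6/9): as committed, send_cancelled_run_on_discord builds `message` as an f-string and then calls `message.append(...)`, but str has no append method, and the call site in cancel_flows passes three arguments (cancelled_flows, flows, webhook_url=url) to a two-parameter function, so the notification call raises and is swallowed by the surrounding `except Exception`. Below is a minimal, self-contained sketch of how the helper could look; it keeps the payload shape ({"content": message}) and the prefect.dados.rio link format used in the series, while the empty-list guard and the raise_for_status check are added assumptions, not part of the patches.

# -*- coding: utf-8 -*-
import requests


def send_cancelled_run_on_discord(flows, webhook_url):
    """Posts a summary of archived flow versions to a Discord webhook.

    `flows` is the list of dicts built by query_not_active_flows
    ({"id", "name", "version", "count"}); `webhook_url` comes from Vault.
    """
    if not flows:
        # Nothing was archived, so there is nothing to report.
        return None
    # Build the message by string concatenation (str objects have no .append).
    message = (
        f"Os Flows de nome {flows[0]['name']} tiveram as seguintes versões arquivadas:\n"
        "Link para as versões:\n"
    )
    for flow in flows:
        # Link format copied from tasks.py; whether this route resolves to a
        # flow (rather than a flow run) in the Prefect UI is not verified here.
        message += (
            f"Versão {flow['version']}: "
            f"https://prefect.dados.rio/default/flow-run/{flow['id']}\n"
        )
    response = requests.post(webhook_url, data={"content": message})
    response.raise_for_status()
    return response

The call in cancel_flows would then pass only the versions that were actually archived, e.g. send_cancelled_run_on_discord(cancelled_flows, webhook_url=url). Likewise, once PATCH 8/9 changes the archive_flow selection set to `success`, the mutation result can be read back instead of logging the raw response (a possible follow-up inside the cancel_flows loop, not part of this series):

    response = prefect_client.graphql(query=query, variables=dict(flow_id=flow["id"]))
    success: bool = response["data"]["archive_flow"]["success"]
    log(f">>>>>>>>>> Flow {flow['id']} arquivado: {success}")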