Skip to content

Commit

Permalink
Removes the orchestration rule and background task to timeout pending…
Browse files Browse the repository at this point in the history
… task runs (#14772)
  • Loading branch information
chrisguidry authored Jul 26, 2024
1 parent 4233e56 commit 530521d
Show file tree
Hide file tree
Showing 7 changed files with 0 additions and 260 deletions.
5 changes: 0 additions & 5 deletions docs/3.0rc/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22339,11 +22339,6 @@
"title": "Prefect Worker Webserver Port",
"default": 8080
},
"PREFECT_API_SERVICES_TASK_SCHEDULING_ENABLED": {
"type": "boolean",
"title": "Prefect Api Services Task Scheduling Enabled",
"default": true
},
"PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK": {
"anyOf": [
{
Expand Down
3 changes: 0 additions & 3 deletions docs/mkdocs/prefect/server/services/task_scheduling.md

This file was deleted.

3 changes: 0 additions & 3 deletions src/prefect/server/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,6 @@ async def start_services():
if prefect.settings.PREFECT_API_SERVICES_FOREMAN_ENABLED.value():
service_instances.append(services.foreman.Foreman())

if prefect.settings.PREFECT_API_SERVICES_TASK_SCHEDULING_ENABLED.value():
service_instances.append(services.task_scheduling.TaskSchedulingTimeouts())

if prefect.settings.PREFECT_API_SERVICES_TRIGGERS_ENABLED.value():
service_instances.append(ReactiveTriggers())
service_instances.append(ProactiveTriggers())
Expand Down
1 change: 0 additions & 1 deletion src/prefect/server/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@
import prefect.server.services.pause_expirations
import prefect.server.services.scheduler
import prefect.server.services.telemetry
import prefect.server.services.task_scheduling
128 changes: 0 additions & 128 deletions src/prefect/server/services/task_scheduling.py

This file was deleted.

5 changes: 0 additions & 5 deletions src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,11 +1383,6 @@ def default_cloud_ui_url(settings, value):
The port the worker's webserver should bind to.
"""

PREFECT_API_SERVICES_TASK_SCHEDULING_ENABLED = Setting(bool, default=True)
"""
Whether or not to start the task scheduling service in the server application.
"""

PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK = Setting(Optional[str], default=None)
"""The `block-type/block-document` slug of a block to use as the default storage
for autonomous tasks."""
Expand Down
115 changes: 0 additions & 115 deletions tests/test_background_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,20 @@
from unittest import mock

import pytest
from exceptiongroup import ExceptionGroup, catch

import prefect.results
from prefect import Task, task, unmapped
from prefect.blocks.core import Block
from prefect.client.orchestration import get_client
from prefect.client.schemas import TaskRun
from prefect.client.schemas.objects import StateType
from prefect.filesystems import LocalFileSystem
from prefect.results import ResultFactory
from prefect.server.api.task_runs import TaskQueue
from prefect.server.schemas.core import TaskRun as ServerTaskRun
from prefect.server.services.task_scheduling import TaskSchedulingTimeouts
from prefect.settings import (
PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK,
PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT,
temporary_settings,
)
from prefect.task_worker import TaskWorker

if TYPE_CHECKING:
from prefect.client.orchestration import PrefectClient
Expand Down Expand Up @@ -229,116 +224,6 @@ async def prefect_client() -> AsyncGenerator["PrefectClient", None]:
yield client


@pytest.fixture
def enabled_task_scheduling_pending_task_timeout():
with temporary_settings({PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT: 30}):
yield


async def test_scheduled_tasks_are_restored_at_server_startup(
foo_task_with_result_storage: Task,
prefect_client: "PrefectClient",
enabled_task_scheduling_pending_task_timeout: None,
):
# run one iteration of the timeouts service
service = TaskSchedulingTimeouts()
await service.start(loops=1)

# schedule a task
task_run_future = foo_task_with_result_storage.apply_async((42,))
task_run = await prefect_client.read_task_run(task_run_future.task_run_id)
assert task_run.state.is_scheduled()

# pull the task from the queue to make sure it's cleared; this simulates when a task
# server pulls a task, then the prefect server dies
enqueued: TaskRun = await TaskQueue.for_key(task_run.task_key).get()
assert enqueued.id == task_run.id

# verify there is no other task in queue
with pytest.raises(asyncio.QueueEmpty):
await TaskQueue.for_key(task_run.task_key).get_nowait()

# Run another loop to show that the task is NOT re-enqueued after the first run
await service.start(loops=1)

with pytest.raises(asyncio.QueueEmpty):
await TaskQueue.for_key(task_run.task_key).get_nowait()

# now emulate that we've restarted the Prefect server by resetting the
# TaskSchedulingTimeouts service
service = TaskSchedulingTimeouts()
await service.start(loops=1)

# the task will still be SCHEDULED
task_run = await prefect_client.read_task_run(task_run.id)
assert task_run.state.type == StateType.SCHEDULED

# ...and it should be re-enqueued
enqueued: TaskRun = await TaskQueue.for_key(task_run.task_key).get()
assert enqueued.id == task_run.id


async def test_stuck_pending_tasks_are_reenqueued(
foo_task_with_result_storage: Task,
prefect_client: "PrefectClient",
enabled_task_scheduling_pending_task_timeout: None,
):
task_run_future = foo_task_with_result_storage.apply_async((42,))
task_run = await prefect_client.read_task_run(task_run_future.task_run_id)
assert task_run.state.is_scheduled()

# now we simulate a stuck task by having the TaskWorker try to run it but fail
server = TaskWorker(foo_task_with_result_storage)

def assert_exception(exc_group: ExceptionGroup):
assert len(exc_group.exceptions) == 1
assert isinstance(exc_group.exceptions[0], ValueError)
assert "woops" in str(exc_group.exceptions[0])

with catch({ValueError: assert_exception}):
with mock.patch(
"prefect.task_worker.run_task_sync",
side_effect=ValueError("woops"),
):
await server.execute_task_run(task_run)

# now the task will be in a stuck pending state
task_run = await prefect_client.read_task_run(task_run.id)
assert task_run.state.type == StateType.PENDING

# first, run an iteration of the TaskSchedulingTimeouts loop service with the
# setting disabled to demonstrate that it will not re-schedule the task
with temporary_settings({PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT: 0}):
await TaskSchedulingTimeouts().start(loops=1)

# the task will still be PENDING and not re-enqueued
task_run = await prefect_client.read_task_run(task_run.id)
assert task_run.state.type == StateType.PENDING

# now run an iteration of the TaskSchedulingTimeouts loop service with an absurdly
# long timeout so that it will never happen
with temporary_settings({PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT: 1000000}):
await TaskSchedulingTimeouts().start(loops=1)

# the task will still be PENDING
task_run = await prefect_client.read_task_run(task_run.id)
assert task_run.state.type == StateType.PENDING

# now run an iteration of the TaskSchedulingTimeouts loop service with a short
# timeout so we can sleep past it and ensure that this task run will get picked up
with temporary_settings({PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT: 0.1}):
await asyncio.sleep(0.2)
await TaskSchedulingTimeouts().start(loops=1)

# now the task will now be SCHEDULED
task_run = await prefect_client.read_task_run(task_run.id)
assert task_run.state.type == StateType.SCHEDULED

# ...and it should be re-enqueued
enqueued: TaskRun = await TaskQueue.for_key(task_run.task_key).get()
assert enqueued.id == task_run.id


class TestCall:
async def test_call(self, async_foo_task):
result = await async_foo_task(42)
Expand Down

0 comments on commit 530521d

Please sign in to comment.