Skip to content

Commit

Permalink
Thread pool task runner max worker configurable via environment varia…
Browse files Browse the repository at this point in the history
…ble (#15719)

Co-authored-by: nate nowack <[email protected]>
  • Loading branch information
soamicharan and zzstoatzz authored Oct 17, 2024
1 parent 0eecd59 commit b7b79a4
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,12 @@ class Settings(PrefectBaseSettings):
description="Which cache implementation to use for the events system. Should point to a module that exports a Cache class.",
)

task_runner_thread_pool_max_workers: Optional[int] = Field(
default=None,
gt=0,
description="The maximum number of workers for ThreadPoolTaskRunner.",
)

###########################################################################
# allow deprecated access to PREFECT_SOME_SETTING_NAME

Expand Down
7 changes: 6 additions & 1 deletion src/prefect/task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
PrefectFutureList,
)
from prefect.logging.loggers import get_logger, get_run_logger
from prefect.settings import PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS
from prefect.utilities.annotations import allow_failure, quote, unmapped
from prefect.utilities.callables import (
collapse_variadic_parameters,
Expand Down Expand Up @@ -220,7 +221,11 @@ class ThreadPoolTaskRunner(TaskRunner[PrefectConcurrentFuture]):
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
self._executor: Optional[ThreadPoolExecutor] = None
self._max_workers = sys.maxsize if max_workers is None else max_workers
self._max_workers = (
(PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS.value() or sys.maxsize)
if max_workers is None
else max_workers
)
self._cancel_events: Dict[uuid.UUID, threading.Event] = {}

def duplicate(self) -> "ThreadPoolTaskRunner":
Expand Down
1 change: 1 addition & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@
"PREFECT_WORKER_QUERY_SECONDS": {"test_value": 10.0},
"PREFECT_WORKER_WEBSERVER_HOST": {"test_value": "host"},
"PREFECT_WORKER_WEBSERVER_PORT": {"test_value": 8080},
"PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS": {"test_value": 5},
}


Expand Down
6 changes: 6 additions & 0 deletions tests/test_task_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from prefect.results import _default_storages
from prefect.settings import (
PREFECT_DEFAULT_RESULT_STORAGE_BLOCK,
PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS,
PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK,
temporary_settings,
)
Expand Down Expand Up @@ -94,6 +95,11 @@ def test_set_max_workers(self):
with ThreadPoolTaskRunner(max_workers=2) as runner:
assert runner._executor._max_workers == 2

def test_set_max_workers_through_settings(self):
with temporary_settings({PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS: 5}):
with ThreadPoolTaskRunner() as runner:
assert runner._executor._max_workers == 5

def test_submit_sync_task(self):
with ThreadPoolTaskRunner() as runner:
parameters = {"param1": 1, "param2": 2}
Expand Down

0 comments on commit b7b79a4

Please sign in to comment.