Skip to content

Commit

Permalink
fix map typing (#15757)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Oct 20, 2024
1 parent cd4994c commit 1cd1a9e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 26 deletions.
6 changes: 5 additions & 1 deletion flows/deploy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import shutil
import subprocess
import sys
from pathlib import Path

import anyio

Expand Down Expand Up @@ -49,11 +51,13 @@ def main():

finally:
subprocess.check_call(
["prefect", "work-pool", "delete", "test-pool"],
["prefect", "--no-prompt", "work-pool", "delete", "test-pool"],
stdout=sys.stdout,
stderr=sys.stderr,
)

shutil.rmtree(Path(__file__).parent.parent / "prefect-recipes")


if __name__ == "__main__":
main()
4 changes: 2 additions & 2 deletions src/prefect/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,10 @@ def wait(self, timeout: Optional[float] = None) -> None:
wait(self, timeout=timeout)

def result(
self,
self: "PrefectFutureList[R]",
timeout: Optional[float] = None,
raise_on_failure: bool = True,
) -> List:
) -> List[R]:
"""
Get the results of all task runs associated with the futures in the list.
Expand Down
72 changes: 49 additions & 23 deletions src/prefect/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,8 @@ def submit(
task runner. This call only blocks execution while the task is being submitted,
once it is submitted, the flow function will continue executing.
This method is always synchronous, even if the underlying user function is asynchronous.
Args:
*args: Arguments to run the task with
return_state: Return the result of the flow run wrapped in a
Expand Down Expand Up @@ -1112,7 +1114,7 @@ def submit(
>>>
>>> @flow
>>> async def my_flow():
>>> await my_async_task.submit()
>>> my_async_task.submit()
Run a sync task in an async flow
Expand Down Expand Up @@ -1170,51 +1172,73 @@ def submit(

@overload
def map(
self: "Task[P, NoReturn]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[NoReturn]]:
self: "Task[P, R]",
*args: Any,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[T]]:
self: "Task[P, R]",
*args: Any,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
...

@overload
def map(
self: "Task[P, T]",
*args: P.args,
**kwargs: P.kwargs,
) -> PrefectFutureList[PrefectFuture[T]]:
self: "Task[P, R]",
*args: Any,
return_state: Literal[True],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, T]]",
*args: P.args,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> PrefectFutureList[State[T]]:
self: "Task[P, R]",
*args: Any,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
...

@overload
def map(
self: "Task[P, T]",
*args: P.args,
self: "Task[P, Coroutine[Any, Any, R]]",
*args: Any,
return_state: Literal[True],
**kwargs: P.kwargs,
) -> PrefectFutureList[State[T]]:
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> List[State[R]]:
...

@overload
def map(
self: "Task[P, Coroutine[Any, Any, R]]",
*args: Any,
return_state: Literal[False],
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ...,
deferred: bool = ...,
**kwargs: Any,
) -> PrefectFutureList[R]:
...

def map(
self,
*args: Any,
return_state: bool = False,
wait_for: Optional[Iterable[PrefectFuture]] = None,
wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = None,
deferred: bool = False,
**kwargs: Any,
):
Expand All @@ -1235,6 +1259,8 @@ def map(
also blocks while the tasks are being submitted, once they are
submitted, the flow function will continue executing.
This method is always synchronous, even if the underlying user function is asynchronous.
Args:
*args: Iterable and static arguments to run the tasks with
return_state: Return a list of Prefect States that wrap the results
Expand Down

0 comments on commit 1cd1a9e

Please sign in to comment.