diff --git a/flows/deploy.py b/flows/deploy.py index 86cfdec9ddc2..e6865374b36b 100644 --- a/flows/deploy.py +++ b/flows/deploy.py @@ -1,5 +1,7 @@ +import shutil import subprocess import sys +from pathlib import Path import anyio @@ -49,11 +51,13 @@ def main(): finally: subprocess.check_call( - ["prefect", "work-pool", "delete", "test-pool"], + ["prefect", "--no-prompt", "work-pool", "delete", "test-pool"], stdout=sys.stdout, stderr=sys.stderr, ) + shutil.rmtree(Path(__file__).parent.parent / "prefect-recipes") + if __name__ == "__main__": main() diff --git a/src/prefect/futures.py b/src/prefect/futures.py index c90442135ec1..8fa027878bcd 100644 --- a/src/prefect/futures.py +++ b/src/prefect/futures.py @@ -298,10 +298,10 @@ def wait(self, timeout: Optional[float] = None) -> None: wait(self, timeout=timeout) def result( - self, + self: "PrefectFutureList[R]", timeout: Optional[float] = None, raise_on_failure: bool = True, - ) -> List: + ) -> List[R]: """ Get the results of all task runs associated with the futures in the list. diff --git a/src/prefect/tasks.py b/src/prefect/tasks.py index cdc95ae55661..a6e27f05637e 100644 --- a/src/prefect/tasks.py +++ b/src/prefect/tasks.py @@ -1060,6 +1060,8 @@ def submit( task runner. This call only blocks execution while the task is being submitted, once it is submitted, the flow function will continue executing. + This method is always synchronous, even if the underlying user function is asynchronous. + Args: *args: Arguments to run the task with return_state: Return the result of the flow run wrapped in a @@ -1112,7 +1114,7 @@ def submit( >>> >>> @flow >>> async def my_flow(): - >>> await my_async_task.submit() + >>> my_async_task.submit() Run a sync task in an async flow @@ -1170,51 +1172,73 @@ def submit( @overload def map( - self: "Task[P, NoReturn]", - *args: P.args, - **kwargs: P.kwargs, - ) -> PrefectFutureList[PrefectFuture[NoReturn]]: + self: "Task[P, R]", + *args: Any, + return_state: Literal[True], + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ..., + deferred: bool = ..., + **kwargs: Any, + ) -> List[State[R]]: ... @overload def map( - self: "Task[P, Coroutine[Any, Any, T]]", - *args: P.args, - **kwargs: P.kwargs, - ) -> PrefectFutureList[PrefectFuture[T]]: + self: "Task[P, R]", + *args: Any, + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ..., + deferred: bool = ..., + **kwargs: Any, + ) -> PrefectFutureList[R]: ... @overload def map( - self: "Task[P, T]", - *args: P.args, - **kwargs: P.kwargs, - ) -> PrefectFutureList[PrefectFuture[T]]: + self: "Task[P, R]", + *args: Any, + return_state: Literal[True], + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ..., + deferred: bool = ..., + **kwargs: Any, + ) -> List[State[R]]: ... @overload def map( - self: "Task[P, Coroutine[Any, Any, T]]", - *args: P.args, - return_state: Literal[True], - **kwargs: P.kwargs, - ) -> PrefectFutureList[State[T]]: + self: "Task[P, R]", + *args: Any, + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ..., + deferred: bool = ..., + **kwargs: Any, + ) -> PrefectFutureList[R]: ... @overload def map( - self: "Task[P, T]", - *args: P.args, + self: "Task[P, Coroutine[Any, Any, R]]", + *args: Any, return_state: Literal[True], - **kwargs: P.kwargs, - ) -> PrefectFutureList[State[T]]: + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ..., + deferred: bool = ..., + **kwargs: Any, + ) -> List[State[R]]: + ... + + @overload + def map( + self: "Task[P, Coroutine[Any, Any, R]]", + *args: Any, + return_state: Literal[False], + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = ..., + deferred: bool = ..., + **kwargs: Any, + ) -> PrefectFutureList[R]: ... def map( self, *args: Any, return_state: bool = False, - wait_for: Optional[Iterable[PrefectFuture]] = None, + wait_for: Optional[Iterable[Union[PrefectFuture[T], T]]] = None, deferred: bool = False, **kwargs: Any, ): @@ -1235,6 +1259,8 @@ def map( also blocks while the tasks are being submitted, once they are submitted, the flow function will continue executing. + This method is always synchronous, even if the underlying user function is asynchronous. + Args: *args: Iterable and static arguments to run the tasks with return_state: Return a list of Prefect States that wrap the results