Skip to content

Commit

Permalink
bpo-32309: Implement asyncio.to_thread() (pythonGH-20143)
Browse files Browse the repository at this point in the history
Implements `asyncio.to_thread`, a coroutine for asynchronously running IO-bound functions in a separate thread without blocking the event loop. See the discussion starting from [here](python#18410 (comment)) in pythonGH-18410 for context.

Automerge-Triggered-By: @aeros
  • Loading branch information
aeros authored and arturoescaip committed May 24, 2020
1 parent 5254f41 commit 5946f7b
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Doc/library/asyncio-api-index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ await on multiple things with timeouts.
* - :class:`Task`
- Task object.

* - :func:`to_thread`
- Asychronously run a function in a separate OS thread.

* - :func:`run_coroutine_threadsafe`
- Schedule a coroutine from another OS thread.

Expand Down
56 changes: 56 additions & 0 deletions Doc/library/asyncio-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,62 @@ Waiting Primitives
# ...


Running in Threads
==================

.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs)

Asynchronously run function *func* in a separate thread.

Any \*args and \*\*kwargs supplied for this function are directly passed
to *func*.

Return an :class:`asyncio.Future` which represents the eventual result of
*func*.

This coroutine function is primarily intended to be used for executing
IO-bound functions/methods that would otherwise block the event loop if
they were ran in the main thread. For example::

def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
# Note that time.sleep() can be replaced with any blocking
# IO-bound operation, such as file operations.
time.sleep(1)
print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
print(f"started main at {time.strftime('%X')}")

await asyncio.gather(
asyncio.to_thread(blocking_io),
asyncio.sleep(1))

print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

Directly calling `blocking_io()` in any coroutine would block the event loop
for its duration, resulting in an additional 1 second of run time. Instead,
by using `asyncio.to_thread()`, we can run it in a separate thread without
blocking the event loop.

.. note::

Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used
to make IO-bound functions non-blocking. However, for extension modules
that release the GIL or alternative Python implementations that don't
have one, `asyncio.to_thread()` can also be used for CPU-bound functions.


Scheduling From Other Threads
=============================

Expand Down
6 changes: 6 additions & 0 deletions Doc/whatsnew/3.9.rst
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
implementation that polls process file descriptors. (:issue:`38692`)

Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.
(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.)

compileall
----------

Expand Down
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .streams import *
from .subprocess import *
from .tasks import *
from .threads import *
from .transports import *

# Exposed for _asynciomodule.c to implement now deprecated
Expand All @@ -35,6 +36,7 @@
streams.__all__ +
subprocess.__all__ +
tasks.__all__ +
threads.__all__ +
transports.__all__)

if sys.platform == 'win32': # pragma: no cover
Expand Down
21 changes: 21 additions & 0 deletions Lib/asyncio/threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""High-level support for working with threads in asyncio"""

import functools

from . import events


__all__ = "to_thread",


async def to_thread(func, /, *args, **kwargs):
"""Asynchronously run function *func* in a separate thread.
Any *args and **kwargs supplied for this function are directly passed
to *func*.
Return an asyncio.Future which represents the eventual result of *func*.
"""
loop = events.get_running_loop()
func_call = functools.partial(func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)
79 changes: 79 additions & 0 deletions Lib/test/test_asyncio/test_threads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Tests for asyncio/threads.py"""

import asyncio
import unittest

from unittest import mock
from test.test_asyncio import utils as test_utils


def tearDownModule():
asyncio.set_event_loop_policy(None)


class ToThreadTests(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

def tearDown(self):
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
self.loop.close()
asyncio.set_event_loop(None)
self.loop = None
super().tearDown()

def test_to_thread(self):
async def main():
return await asyncio.to_thread(sum, [40, 2])

result = self.loop.run_until_complete(main())
self.assertEqual(result, 42)

def test_to_thread_exception(self):
def raise_runtime():
raise RuntimeError("test")

async def main():
await asyncio.to_thread(raise_runtime)

with self.assertRaisesRegex(RuntimeError, "test"):
self.loop.run_until_complete(main())

def test_to_thread_once(self):
func = mock.Mock()

async def main():
await asyncio.to_thread(func)

self.loop.run_until_complete(main())
func.assert_called_once()

def test_to_thread_concurrent(self):
func = mock.Mock()

async def main():
futs = []
for _ in range(10):
fut = asyncio.to_thread(func)
futs.append(fut)
await asyncio.gather(*futs)

self.loop.run_until_complete(main())
self.assertEqual(func.call_count, 10)

def test_to_thread_args_kwargs(self):
# Unlike run_in_executor(), to_thread() should directly accept kwargs.
func = mock.Mock()

async def main():
await asyncio.to_thread(func, 'test', something=True)

self.loop.run_until_complete(main())
func.assert_called_once_with('test', something=True)


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for
running IO-bound functions in a separate thread to avoid blocking the event
loop, and essentially works as a high-level version of
:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments.

0 comments on commit 5946f7b

Please sign in to comment.