Skip to content

prefect.utilities.asyncutils

Utilities for interoperability with async functions and workers from various contexts.

GatherIncomplete

Bases: RuntimeError

Used to indicate retrieving gather results before completion

Source code in prefect/utilities/asyncutils.py
310
311
class GatherIncomplete(RuntimeError):
    """Used to indicate retrieving gather results before completion"""

GatherTaskGroup

Bases: anyio.abc.TaskGroup

A task group that gathers results.

AnyIO does not include support gather. This class extends the TaskGroup interface to allow simple gathering.

See https://github.com/agronholm/anyio/issues/100

This class should be instantiated with create_gather_task_group.

Source code in prefect/utilities/asyncutils.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class GatherTaskGroup(anyio.abc.TaskGroup):
    """
    A task group that gathers results.

    AnyIO does not include support `gather`. This class extends the `TaskGroup`
    interface to allow simple gathering.

    See https://github.com/agronholm/anyio/issues/100

    This class should be instantiated with `create_gather_task_group`.
    """

    def __init__(self, task_group: anyio.abc.TaskGroup):
        self._results: Dict[UUID, Any] = {}
        # The concrete task group implementation to use
        self._task_group: anyio.abc.TaskGroup = task_group

    async def _run_and_store(self, key, fn, args):
        self._results[key] = await fn(*args)

    def start_soon(self, fn, *args) -> UUID:
        key = uuid4()
        # Put a placeholder in-case the result is retrieved earlier
        self._results[key] = GatherIncomplete
        self._task_group.start_soon(self._run_and_store, key, fn, args)
        return key

    async def start(self, fn, *args):
        """
        Since `start` returns the result of `task_status.started()` but here we must
        return the key instead, we just won't support this method for now.
        """
        raise RuntimeError("`GatherTaskGroup` does not support `start`.")

    def get_result(self, key: UUID) -> Any:
        result = self._results[key]
        if result is GatherIncomplete:
            raise GatherIncomplete(
                "Task is not complete. "
                "Results should not be retrieved until the task group exits."
            )
        return result

    async def __aenter__(self):
        await self._task_group.__aenter__()
        return self

    async def __aexit__(self, *tb):
        try:
            retval = await self._task_group.__aexit__(*tb)
            return retval
        finally:
            del self._task_group

start async

Since start returns the result of task_status.started() but here we must return the key instead, we just won't support this method for now.

Source code in prefect/utilities/asyncutils.py
341
342
343
344
345
346
async def start(self, fn, *args):
    """
    Since `start` returns the result of `task_status.started()` but here we must
    return the key instead, we just won't support this method for now.
    """
    raise RuntimeError("`GatherTaskGroup` does not support `start`.")

add_event_loop_shutdown_callback async

Adds a callback to the given callable on event loop closure. The callable must be a coroutine function. It will be awaited when the current event loop is shutting down.

Requires use of asyncio.run() which waits for async generator shutdown by default or explicit call of asyncio.shutdown_asyncgens(). If the application is entered with asyncio.run_until_complete() and the user calls asyncio.close() without the generator shutdown call, this will not trigger callbacks.

asyncio does not provided any other way to clean up a resource when the event loop is about to close.

Source code in prefect/utilities/asyncutils.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
async def add_event_loop_shutdown_callback(coroutine_fn: Callable[[], Awaitable]):
    """
    Adds a callback to the given callable on event loop closure. The callable must be
    a coroutine function. It will be awaited when the current event loop is shutting
    down.

    Requires use of `asyncio.run()` which waits for async generator shutdown by
    default or explicit call of `asyncio.shutdown_asyncgens()`. If the application
    is entered with `asyncio.run_until_complete()` and the user calls
    `asyncio.close()` without the generator shutdown call, this will not trigger
    callbacks.

    asyncio does not provided _any_ other way to clean up a resource when the event
    loop is about to close.
    """

    async def on_shutdown(key):
        try:
            yield
        except GeneratorExit:
            await coroutine_fn()
            # Remove self from the garbage collection set
            EVENT_LOOP_GC_REFS.pop(key)

    # Create the iterator and store it in a global variable so it is not garbage
    # collected. If the iterator is garbage collected before the event loop closes, the
    # callback will not run. Since this function does not know the scope of the event
    # loop that is calling it, a reference with global scope is necessary to ensure
    # garbage collection does not occur until after event loop closure.
    key = id(on_shutdown)
    EVENT_LOOP_GC_REFS[key] = on_shutdown(key)

    # Begin iterating so it will be cleaned up as an incomplete generator
    await EVENT_LOOP_GC_REFS[key].__anext__()

create_gather_task_group

Create a new task group that gathers results

Source code in prefect/utilities/asyncutils.py
369
370
371
372
373
374
def create_gather_task_group() -> GatherTaskGroup:
    """Create a new task group that gathers results"""
    # This function matches the AnyIO API which uses callables since the concrete
    # task group class depends on the async library being used and cannot be
    # determined until runtime
    return GatherTaskGroup(anyio.create_task_group())

gather async

Run calls concurrently and gather their results.

Unlike asyncio.gather this expects to receieve callables not coroutines. This matches anyio semantics.

Source code in prefect/utilities/asyncutils.py
377
378
379
380
381
382
383
384
385
386
387
388
async def gather(*calls: Callable[[], Coroutine[Any, Any, T]]) -> List[T]:
    """
    Run calls concurrently and gather their results.

    Unlike `asyncio.gather` this expects to receieve _callables_ not _coroutines_.
    This matches `anyio` semantics.
    """
    keys = []
    async with create_gather_task_group() as tg:
        for call in calls:
            keys.append(tg.start_soon(call))
    return [tg.get_result(key) for key in keys]

is_async_fn

Returns True if a function returns a coroutine.

See https://github.com/microsoft/pyright/issues/2142 for an example use

Source code in prefect/utilities/asyncutils.py
53
54
55
56
57
58
59
60
61
62
63
64
def is_async_fn(
    func: Union[Callable[P, R], Callable[P, Awaitable[R]]]
) -> TypeGuard[Callable[P, Awaitable[R]]]:
    """
    Returns `True` if a function returns a coroutine.

    See https://github.com/microsoft/pyright/issues/2142 for an example use
    """
    while hasattr(func, "__wrapped__"):
        func = func.__wrapped__

    return inspect.iscoroutinefunction(func)

is_async_gen_fn

Returns True if a function is an async generator.

Source code in prefect/utilities/asyncutils.py
67
68
69
70
71
72
73
74
def is_async_gen_fn(func):
    """
    Returns `True` if a function is an async generator.
    """
    while hasattr(func, "__wrapped__"):
        func = func.__wrapped__

    return inspect.isasyncgenfunction(func)

raise_async_exception_in_thread

Raise an exception in a thread asynchronously.

This will not interrupt long-running system calls like sleep or wait.

Source code in prefect/utilities/asyncutils.py
 96
 97
 98
 99
100
101
102
103
104
105
106
def raise_async_exception_in_thread(thread: Thread, exc_type: Type[BaseException]):
    """
    Raise an exception in a thread asynchronously.

    This will not interrupt long-running system calls like `sleep` or `wait`.
    """
    ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_long(thread.ident), ctypes.py_object(exc_type)
    )
    if ret == 0:
        raise ValueError("Thread not found.")

run_async_from_worker_thread

Runs an async function in the main thread's event loop, blocking the worker thread until completion

Source code in prefect/utilities/asyncutils.py
169
170
171
172
173
174
175
176
177
def run_async_from_worker_thread(
    __fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any
) -> T:
    """
    Runs an async function in the main thread's event loop, blocking the worker
    thread until completion
    """
    call = partial(__fn, *args, **kwargs)
    return anyio.from_thread.run(call)

run_sync_in_interruptible_worker_thread async

Runs a sync function in a new interruptible worker thread so that the main thread's event loop is not blocked

Unlike the anyio function, this performs best-effort cancellation of the thread using the C API. Cancellation will not interrupt system calls like sleep.

Source code in prefect/utilities/asyncutils.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
async def run_sync_in_interruptible_worker_thread(
    __fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:
    """
    Runs a sync function in a new interruptible worker thread so that the main
    thread's event loop is not blocked

    Unlike the anyio function, this performs best-effort cancellation of the
    thread using the C API. Cancellation will not interrupt system calls like
    `sleep`.
    """

    class NotSet:
        pass

    thread: Thread = None
    result = NotSet
    event = asyncio.Event()
    loop = asyncio.get_running_loop()

    def capture_worker_thread_and_result():
        # Captures the worker thread that AnyIO is using to execute the function so
        # the main thread can perform actions on it
        nonlocal thread, result
        try:
            thread = threading.current_thread()
            result = __fn(*args, **kwargs)
        except BaseException as exc:
            result = exc
            raise
        finally:
            loop.call_soon_threadsafe(event.set)

    async def send_interrupt_to_thread():
        # This task waits until the result is returned from the thread, if cancellation
        # occurs during that time, we will raise the exception in the thread as well
        try:
            await event.wait()
        except anyio.get_cancelled_exc_class():
            # NOTE: We could send a SIGINT here which allow us to interrupt system
            # calls but the interrupt bubbles from the child thread into the main thread
            # and there is not a clear way to prevent it.
            raise_async_exception_in_thread(thread, anyio.get_cancelled_exc_class())
            raise

    async with anyio.create_task_group() as tg:
        tg.start_soon(send_interrupt_to_thread)
        tg.start_soon(
            partial(
                anyio.to_thread.run_sync,
                capture_worker_thread_and_result,
                cancellable=True,
                limiter=get_thread_limiter(),
            )
        )

    assert result is not NotSet
    return result

run_sync_in_worker_thread async

Runs a sync function in a new worker thread so that the main thread's event loop is not blocked

Unlike the anyio function, this defaults to a cancellable thread and does not allow passing arguments to the anyio function so users can pass kwargs to their function.

Note that cancellation of threads will not result in interrupted computation, the thread may continue running — the outcome will just be ignored.

Source code in prefect/utilities/asyncutils.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
async def run_sync_in_worker_thread(
    __fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:
    """
    Runs a sync function in a new worker thread so that the main thread's event loop
    is not blocked

    Unlike the anyio function, this defaults to a cancellable thread and does not allow
    passing arguments to the anyio function so users can pass kwargs to their function.

    Note that cancellation of threads will not result in interrupted computation, the
    thread may continue running — the outcome will just be ignored.
    """
    call = partial(__fn, *args, **kwargs)
    return await anyio.to_thread.run_sync(
        call, cancellable=True, limiter=get_thread_limiter()
    )

sync

Call an async function from a synchronous context. Block until completion.

If in an asynchronous context, we will run the code in a separate loop instead of failing but a warning will be displayed since this is not recommended.

Source code in prefect/utilities/asyncutils.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def sync(__async_fn: Callable[P, Awaitable[T]], *args: P.args, **kwargs: P.kwargs) -> T:
    """
    Call an async function from a synchronous context. Block until completion.

    If in an asynchronous context, we will run the code in a separate loop instead of
    failing but a warning will be displayed since this is not recommended.
    """
    if in_async_main_thread():
        warnings.warn(
            "`sync` called from an asynchronous context; "
            "you should `await` the async function directly instead."
        )
        with anyio.start_blocking_portal() as portal:
            return portal.call(partial(__async_fn, *args, **kwargs))
    elif in_async_worker_thread():
        # In a sync context but we can access the event loop thread; send the async
        # call to the parent
        return run_async_from_worker_thread(__async_fn, *args, **kwargs)
    else:
        # In a sync context and there is no event loop; just create an event loop
        # to run the async code then tear it down
        return run_async_in_new_loop(__async_fn, *args, **kwargs)

sync_compatible

Converts an async function into a dual async and sync function.

When the returned function is called, we will attempt to determine the best way to enter the async function.

  • If in a thread with a running event loop, we will return the coroutine for the caller to await. This is normal async behavior.
  • If in a blocking worker thread with access to an event loop in another thread, we will submit the async method to the event loop.
  • If we cannot find an event loop, we will create a new one and run the async method then tear down the loop.
Source code in prefect/utilities/asyncutils.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
def sync_compatible(async_fn: T) -> T:
    """
    Converts an async function into a dual async and sync function.

    When the returned function is called, we will attempt to determine the best way
    to enter the async function.

    - If in a thread with a running event loop, we will return the coroutine for the
        caller to await. This is normal async behavior.
    - If in a blocking worker thread with access to an event loop in another thread, we
        will submit the async method to the event loop.
    - If we cannot find an event loop, we will create a new one and run the async method
        then tear down the loop.
    """

    @wraps(async_fn)
    def coroutine_wrapper(*args, **kwargs):
        if in_async_main_thread():
            # In the main async context; return the coro for them to await
            return async_fn(*args, **kwargs)
        elif in_async_worker_thread():
            # In a sync context but we can access the event loop thread; send the async
            # call to the parent
            return run_async_from_worker_thread(async_fn, *args, **kwargs)
        else:
            # In a sync context and there is no event loop; just create an event loop
            # to run the async code then tear it down
            return run_async_in_new_loop(async_fn, *args, **kwargs)

    # TODO: This is breaking type hints on the callable... mypy is behind the curve
    #       on argument annotations. We can still fix this for editors though.
    if is_async_fn(async_fn):
        wrapper = coroutine_wrapper
    elif is_async_gen_fn(async_fn):
        raise ValueError("Async generators cannot yet be marked as `sync_compatible`")
    else:
        raise TypeError("The decorated function must be async.")

    wrapper.aio = async_fn
    return wrapper