diff --git a/kr8s/__init__.py b/kr8s/__init__.py index 7ac5cf7c..f965c0d1 100644 --- a/kr8s/__init__.py +++ b/kr8s/__init__.py @@ -6,6 +6,8 @@ from ._api import ALL # noqa from ._api import Api as _AsyncApi +from ._async_utils import run_sync as _run_sync +from ._async_utils import sync as _sync # noqa from ._exceptions import ( APITimeoutError, # noqa ConnectionClosedError, # noqa @@ -13,8 +15,6 @@ NotFoundError, # noqa ServerError, # noqa ) -from ._io import run_sync as _run_sync -from ._io import sync as _sync # noqa from .asyncio import ( api as _api, ) diff --git a/kr8s/_io.py b/kr8s/_async_utils.py similarity index 57% rename from kr8s/_io.py rename to kr8s/_async_utils.py index f3f50f96..e620f61d 100644 --- a/kr8s/_io.py +++ b/kr8s/_async_utils.py @@ -1,9 +1,17 @@ # SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list) # SPDX-License-Identifier: BSD 3-Clause License # +# Utilities for running async code in a sync context. This is how kr8s is able to provide a sync API. +# +# The sync API in kr8s needs to wrap all async functions in a way that allows them to be run in a sync context. +# However, the sync API may be used within an async context, so we need to be able to run async code nested in +# a sync context that is itself nested in an async context. This is a bit of a tricky problem, but it can be +# solved by running the nested async code in a separate thread. +# # This file was originally based on universalasync (commit d397911) and jupyter-core (commit 98b9a1a). # Both projects attempt to solve the same problem: how to run nested async tasks. -# Neither solution quite fit in here, so we forked them and combined them. +# Neither solution quite fit in here, so we forked them and combined them. Things have evolved a lot +# since then, but the licenses and links are included here for historical reasons. # # universalasync License: https://github.com/bitcartcc/universalasync/blob/d397911/LICENSE # jupyter-core License: https://github.com/jupyter/jupyter_core/blob/98b9a1a/COPYING.md @@ -42,6 +50,20 @@ class Portal: + """A class that manages a thread running an anyio loop. + + This class is a singleton that manages a thread running an anyio loop and provides + an anyio portal to communicate with the loop from a sync context. + + See https://anyio.readthedocs.io/en/stable/api.html#anyio.from_thread.start_blocking_portal for more info. + + It's important to start the loop in a separate thread because the sync code may be + running in a context where an event loop is already running, and we can't run two + event loops in the same thread. This commonly happens when running in IPython, a Jupyter + notebook or when running async tests with pytest. + + """ + _instance = None _portal = None @@ -60,30 +82,27 @@ async def _run(self): self._portal = portal await portal.sleep_until_stopped() - def call(self, func: Callable[P, T], *args, **kwargs) -> T: + def call(self, func: Callable[P, Awaitable[T]], *args, **kwargs) -> T: + """Call a coroutine in the runner loop and return the result.""" + # On first call the thread has to start the loop, so we need to wait for it while not self._portal: pass return self._portal.call(func, *args, **kwargs) def run_sync(coro: Callable[P, Awaitable[T]]) -> Callable[P, T]: - """Wraps coroutine in a function that blocks until it has executed. + """Wraps a coroutine in a function that blocks until it has executed. - Parameters - ---------- - coro : coroutine-function - The coroutine-function to be executed. + Args: + coro (Awaitable): A coroutine. - Returns - ------- - result : - Whatever the coroutine-function returns. + Returns: + Callable: A sync function that executes the coroutine via the :class`Portal`. """ @wraps(coro) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + def run_sync_inner(*args: P.args, **kwargs: P.kwargs) -> T: wrapped = partial(coro, *args, **kwargs) - wrapped.__doc__ = coro.__doc__ if inspect.isasyncgenfunction(coro): return iter_over_async(wrapped) if inspect.iscoroutinefunction(coro): @@ -91,10 +110,18 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: return portal.call(wrapped) raise TypeError(f"Expected coroutine function, got {coro.__class__.__name__}") - return wrapper + return run_sync_inner def iter_over_async(agen: AsyncGenerator) -> Generator: + """Convert an async generator to a sync generator. + + Args: + agen (AsyncGenerator): async generator to convert + + Yields: + Any: object from async generator + """ ait = agen().__aiter__() async def get_next() -> Tuple[bool, Any]: @@ -115,13 +142,42 @@ async def get_next() -> Tuple[bool, Any]: def sync(source: C) -> C: """Convert all public async methods/properties of an object to universal methods. - See :func:`run_sync` for more info + Private methods or methods starting with "async_" are ignored. + See :func:`run_sync` for more info on how the conversion works. Args: - source (object): object to convert + source (C): object with coroutines to convert Returns: - object: converted object. Note that parameter passed is being modified anyway + C: converted object with sync methods + + Examples: + It's common to implement a coroutine and name it with async_ and then wrap that + in another corotine that calls it. That way the method can be called from both + sync and async contexts as function allows you to convert the outer coroutine + to a sync method. But other async methods or other async objects can call the + inner coroutine directly. + + >>> class Foo: + ... async def async_bar(self): + ... return 42 + ... async def bar(self): + ... return await self.async_bar() + ... async def baz(self): + ... # If you want to calll self.bar() from another async method + ... # you can't when it gets wrapped with sync, so you can call + ... # self.async_bar() directly instead. + ... return (await self.async_bar()) + 1 + ... + >>> SyncFoo = sync(Foo) + >>> inst = SyncFoo() + >>> inst.bar() + 42 + >>> inst.async_bar() + + >>> inst.baz() + 43 + """ setattr(source, "_asyncio", False) for name in dir(source): diff --git a/kr8s/_auth.py b/kr8s/_auth.py index 970d883b..1a2bb8db 100644 --- a/kr8s/_auth.py +++ b/kr8s/_auth.py @@ -9,8 +9,8 @@ import anyio +from ._async_utils import NamedTemporaryFile, check_output from ._config import KubeConfigSet -from ._io import NamedTemporaryFile, check_output class KubeAuth: diff --git a/kr8s/_objects.py b/kr8s/_objects.py index 5eb1ff4f..811ae692 100644 --- a/kr8s/_objects.py +++ b/kr8s/_objects.py @@ -29,6 +29,7 @@ import kr8s import kr8s.asyncio from kr8s._api import Api +from kr8s._async_utils import sync from kr8s._data_utils import ( dict_to_selector, dot_to_nested_dict, @@ -37,7 +38,6 @@ ) from kr8s._exceptions import NotFoundError, ServerError from kr8s._exec import Exec -from kr8s._io import sync from kr8s.asyncio.portforward import PortForward as AsyncPortForward from kr8s.portforward import PortForward as SyncPortForward diff --git a/kr8s/objects.py b/kr8s/objects.py index b26f86a9..3600e533 100644 --- a/kr8s/objects.py +++ b/kr8s/objects.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: BSD 3-Clause License from functools import partial -from ._io import run_sync, sync +from ._async_utils import run_sync, sync from ._objects import ( APIObject as _APIObject, ) diff --git a/kr8s/portforward.py b/kr8s/portforward.py index 85c75f15..d9f7fcb2 100644 --- a/kr8s/portforward.py +++ b/kr8s/portforward.py @@ -3,7 +3,7 @@ import threading import time -from ._io import sync +from ._async_utils import sync from ._portforward import PortForward as _PortForward diff --git a/kr8s/tests/test_io.py b/kr8s/tests/test_io.py index 7e4edb02..5b4ffbba 100644 --- a/kr8s/tests/test_io.py +++ b/kr8s/tests/test_io.py @@ -5,7 +5,7 @@ import trio import kr8s -from kr8s._io import NamedTemporaryFile +from kr8s._async_utils import NamedTemporaryFile from kr8s.asyncio.objects import Pod @@ -62,8 +62,8 @@ async def test_tempfiles(): async def test_check_output(): - output = await kr8s._io.check_output("echo", "hello") + output = await kr8s._async_utils.check_output("echo", "hello") assert output == "hello\n" with pytest.raises(RuntimeError, match="non-zero code"): - await kr8s._io.check_output("false") + await kr8s._async_utils.check_output("false")