Skip to content

Commit

Permalink
Rename async utilities and add more docstrings (#388)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson authored May 23, 2024
1 parent 2884d74 commit 374968d
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 26 deletions.
4 changes: 2 additions & 2 deletions kr8s/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@

from ._api import ALL # noqa
from ._api import Api as _AsyncApi
from ._async_utils import run_sync as _run_sync
from ._async_utils import sync as _sync # noqa
from ._exceptions import (
APITimeoutError, # noqa
ConnectionClosedError, # noqa
ExecError, # noqa
NotFoundError, # noqa
ServerError, # noqa
)
from ._io import run_sync as _run_sync
from ._io import sync as _sync # noqa
from .asyncio import (
api as _api,
)
Expand Down
90 changes: 73 additions & 17 deletions kr8s/_io.py → kr8s/_async_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, Kr8s Developers (See LICENSE for list)
# SPDX-License-Identifier: BSD 3-Clause License
#
# Utilities for running async code in a sync context. This is how kr8s is able to provide a sync API.
#
# The sync API in kr8s needs to wrap all async functions in a way that allows them to be run in a sync context.
# However, the sync API may be used within an async context, so we need to be able to run async code nested in
# a sync context that is itself nested in an async context. This is a bit of a tricky problem, but it can be
# solved by running the nested async code in a separate thread.
#
# This file was originally based on universalasync (commit d397911) and jupyter-core (commit 98b9a1a).
# Both projects attempt to solve the same problem: how to run nested async tasks.
# Neither solution quite fit in here, so we forked them and combined them.
# Neither solution quite fit in here, so we forked them and combined them. Things have evolved a lot
# since then, but the licenses and links are included here for historical reasons.
#
# universalasync License: https://github.com/bitcartcc/universalasync/blob/d397911/LICENSE
# jupyter-core License: https://github.com/jupyter/jupyter_core/blob/98b9a1a/COPYING.md
Expand Down Expand Up @@ -42,6 +50,20 @@


class Portal:
"""A class that manages a thread running an anyio loop.
This class is a singleton that manages a thread running an anyio loop and provides
an anyio portal to communicate with the loop from a sync context.
See https://anyio.readthedocs.io/en/stable/api.html#anyio.from_thread.start_blocking_portal for more info.
It's important to start the loop in a separate thread because the sync code may be
running in a context where an event loop is already running, and we can't run two
event loops in the same thread. This commonly happens when running in IPython, a Jupyter
notebook or when running async tests with pytest.
"""

_instance = None
_portal = None

Expand All @@ -60,41 +82,46 @@ async def _run(self):
self._portal = portal
await portal.sleep_until_stopped()

def call(self, func: Callable[P, T], *args, **kwargs) -> T:
def call(self, func: Callable[P, Awaitable[T]], *args, **kwargs) -> T:
"""Call a coroutine in the runner loop and return the result."""
# On first call the thread has to start the loop, so we need to wait for it
while not self._portal:
pass
return self._portal.call(func, *args, **kwargs)


def run_sync(coro: Callable[P, Awaitable[T]]) -> Callable[P, T]:
"""Wraps coroutine in a function that blocks until it has executed.
"""Wraps a coroutine in a function that blocks until it has executed.
Parameters
----------
coro : coroutine-function
The coroutine-function to be executed.
Args:
coro (Awaitable): A coroutine.
Returns
-------
result :
Whatever the coroutine-function returns.
Returns:
Callable: A sync function that executes the coroutine via the :class`Portal`.
"""

@wraps(coro)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
def run_sync_inner(*args: P.args, **kwargs: P.kwargs) -> T:
wrapped = partial(coro, *args, **kwargs)
wrapped.__doc__ = coro.__doc__
if inspect.isasyncgenfunction(coro):
return iter_over_async(wrapped)
if inspect.iscoroutinefunction(coro):
portal = Portal()
return portal.call(wrapped)
raise TypeError(f"Expected coroutine function, got {coro.__class__.__name__}")

return wrapper
return run_sync_inner


def iter_over_async(agen: AsyncGenerator) -> Generator:
"""Convert an async generator to a sync generator.
Args:
agen (AsyncGenerator): async generator to convert
Yields:
Any: object from async generator
"""
ait = agen().__aiter__()

async def get_next() -> Tuple[bool, Any]:
Expand All @@ -115,13 +142,42 @@ async def get_next() -> Tuple[bool, Any]:
def sync(source: C) -> C:
"""Convert all public async methods/properties of an object to universal methods.
See :func:`run_sync` for more info
Private methods or methods starting with "async_" are ignored.
See :func:`run_sync` for more info on how the conversion works.
Args:
source (object): object to convert
source (C): object with coroutines to convert
Returns:
object: converted object. Note that parameter passed is being modified anyway
C: converted object with sync methods
Examples:
It's common to implement a coroutine and name it with async_ and then wrap that
in another corotine that calls it. That way the method can be called from both
sync and async contexts as function allows you to convert the outer coroutine
to a sync method. But other async methods or other async objects can call the
inner coroutine directly.
>>> class Foo:
... async def async_bar(self):
... return 42
... async def bar(self):
... return await self.async_bar()
... async def baz(self):
... # If you want to calll self.bar() from another async method
... # you can't when it gets wrapped with sync, so you can call
... # self.async_bar() directly instead.
... return (await self.async_bar()) + 1
...
>>> SyncFoo = sync(Foo)
>>> inst = SyncFoo()
>>> inst.bar()
42
>>> inst.async_bar()
<coroutine object Foo.async_bar at 0x7fbe0442b940>
>>> inst.baz()
43
"""
setattr(source, "_asyncio", False)
for name in dir(source):
Expand Down
2 changes: 1 addition & 1 deletion kr8s/_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import anyio

from ._async_utils import NamedTemporaryFile, check_output
from ._config import KubeConfigSet
from ._io import NamedTemporaryFile, check_output


class KubeAuth:
Expand Down
2 changes: 1 addition & 1 deletion kr8s/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import kr8s
import kr8s.asyncio
from kr8s._api import Api
from kr8s._async_utils import sync
from kr8s._data_utils import (
dict_to_selector,
dot_to_nested_dict,
Expand All @@ -37,7 +38,6 @@
)
from kr8s._exceptions import NotFoundError, ServerError
from kr8s._exec import Exec
from kr8s._io import sync
from kr8s.asyncio.portforward import PortForward as AsyncPortForward
from kr8s.portforward import PortForward as SyncPortForward

Expand Down
2 changes: 1 addition & 1 deletion kr8s/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# SPDX-License-Identifier: BSD 3-Clause License
from functools import partial

from ._io import run_sync, sync
from ._async_utils import run_sync, sync
from ._objects import (
APIObject as _APIObject,
)
Expand Down
2 changes: 1 addition & 1 deletion kr8s/portforward.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import threading
import time

from ._io import sync
from ._async_utils import sync
from ._portforward import PortForward as _PortForward


Expand Down
6 changes: 3 additions & 3 deletions kr8s/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import trio

import kr8s
from kr8s._io import NamedTemporaryFile
from kr8s._async_utils import NamedTemporaryFile
from kr8s.asyncio.objects import Pod


Expand Down Expand Up @@ -62,8 +62,8 @@ async def test_tempfiles():


async def test_check_output():
output = await kr8s._io.check_output("echo", "hello")
output = await kr8s._async_utils.check_output("echo", "hello")
assert output == "hello\n"

with pytest.raises(RuntimeError, match="non-zero code"):
await kr8s._io.check_output("false")
await kr8s._async_utils.check_output("false")

0 comments on commit 374968d

Please sign in to comment.