Skip to content

Commit

Permalink
Fix bug in streaming associated with additive chunks (#158)
Browse files Browse the repository at this point in the history
- Fix issue in stream/astream endpoint associated with addable types

- Have not been able to figure how to run sync unit tests yet with
pytest:
- fastapi app is async so there's an event loop that's created somewhere
  - within fastapi sse_starlette also deals with an event loop for the
    streaming endpoint
- This results in a failure when running all unit tests together
(sysid/sse-starlette#68)
  • Loading branch information
eyurtsev authored Nov 3, 2023
1 parent 876201e commit ce17cc4
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 21 deletions.
58 changes: 49 additions & 9 deletions langserve/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
get_async_callback_manager_for_config,
get_callback_manager_for_config,
)
from langchain.schema.runnable.utils import Input, Output
from langchain.schema.runnable.utils import AddableDict, Input, Output

from langserve.callbacks import CallbackEventDict, ahandle_callbacks, handle_callbacks
from langserve.serialization import (
Expand Down Expand Up @@ -453,6 +453,7 @@ def stream(
callback_manager = get_callback_manager_for_config(config)

final_output: Optional[Output] = None
final_output_supported = True

run_manager = callback_manager.on_chain_start(
dumpd(self),
Expand Down Expand Up @@ -481,12 +482,31 @@ def stream(
for sse in event_source.iter_sse():
if sse.event == "data":
chunk = self._lc_serializer.loads(sse.data)
if isinstance(chunk, dict):
# Any dict returned from streaming end point
# is assumed to follow additive semantics
# and will be converted to an AddableDict
# automatically
chunk = AddableDict(chunk)
yield chunk

if final_output:
final_output += chunk
else:
final_output = chunk
if final_output_supported:
# here we attempt to aggregate the final output
# from the stream.
# the final output is used for the final callback
# event (`on_chain_end`)
# Aggregating the final output is only supported
# if the output is additive (e.g., string or
# AddableDict, etc.)
# We attempt to aggregate it on best effort basis.
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk
except TypeError:
final_output = None
final_output_supported = False
elif sse.event == "error":
# This can only be a server side error
_raise_exception_from_data(
Expand Down Expand Up @@ -516,6 +536,7 @@ async def astream(
callback_manager = get_async_callback_manager_for_config(config)

final_output: Optional[Output] = None
final_output_supported = True

run_manager = await callback_manager.on_chain_start(
dumpd(self),
Expand All @@ -541,12 +562,31 @@ async def astream(
async for sse in event_source.aiter_sse():
if sse.event == "data":
chunk = self._lc_serializer.loads(sse.data)
if isinstance(chunk, dict):
# Any dict returned from streaming end point
# is assumed to follow additive semantics
# and will be converted to an AddableDict
# automatically
chunk = AddableDict(chunk)
yield chunk

if final_output:
final_output += chunk
else:
final_output = chunk
if final_output_supported:
# here we attempt to aggregate the final output
# from the stream.
# the final output is used for the final callback
# event (`on_chain_end`)
# Aggregating the final output is only supported
# if the output is additive (e.g., string or
# AddableDict, etc.)
# We attempt to aggregate it on best effort basis.
if final_output is None:
final_output = chunk
else:
try:
final_output = final_output + chunk
except TypeError:
final_output = None
final_output_supported = False

elif sse.event == "error":
# This can only be a server side error
Expand Down
68 changes: 56 additions & 12 deletions tests/unit_tests/test_server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import uuid
from asyncio import AbstractEventLoop
from contextlib import asynccontextmanager, contextmanager
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, Dict, Iterable, Iterator, List, Optional, Union

import httpx
import pytest
Expand Down Expand Up @@ -1263,8 +1263,14 @@ async def passthrough_dict(d: Any) -> Any:
}


class ErroringRunnable(Runnable):
"""A custom runnable for testing errors are raised server side."""
class StreamingRunnable(Runnable):
"""A custom runnable used for testing purposes"""

iterable: Iterable[Any]

def __init__(self, iterable: Iterable[Any]) -> None:
"""Initialize the runnable."""
self.iterable = iterable

def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
"""Invoke the runnable."""
Expand All @@ -1276,27 +1282,64 @@ def stream(
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> Iterator[Output]:
yield 1
yield 2
raise ValueError("An exception occurred")
raise NotImplementedError()

async def astream(
self,
input: Iterator[Input],
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> Iterator[Output]:
yield 1
yield 2
raise ValueError("An exception occurred")
for element in self.iterable:
if isinstance(element, BaseException):
raise element
yield element


# Have not figured out how to test sync stream yet
# def test_streaming_dict_sync() -> None:
# """Test streaming different types of items."""
# app = FastAPI()
#
# stream_dict = StreamingRunnable(iterable=[{"a": "1"}, {"a": "2"}])
#
# add_routes(app, stream_dict)
#
# # Invoke request
# with get_sync_remote_runnable(app) as runnable:
# chunks = []
# for chunk in runnable.stream("input ignored"):
# chunks.append(chunk)
#
# assert chunks == [{"a": "1"}, {"a": "2"}]


@pytest.mark.asyncio
async def test_streaming_dict_async() -> None:
"""Test streaming different types of items."""
app = FastAPI()

stream_dict = StreamingRunnable(iterable=[{"a": "1"}, {"a": "2"}])

add_routes(app, stream_dict)

# Invoke request
async with get_async_remote_runnable(app, raise_app_exceptions=False) as runnable:
chunks = []
async for chunk in runnable.astream("input ignored"):
chunks.append(chunk)

assert chunks == [{"a": "1"}, {"a": "2"}]


@pytest.mark.asyncio
async def test_server_side_error() -> None:
"""Test server side error handling."""

app = FastAPI()
add_routes(app, ErroringRunnable())

erroring_stream = StreamingRunnable(iterable=[1, 2, ValueError("An error")])
add_routes(app, erroring_stream)

# Invoke request
async with get_async_remote_runnable(app, raise_app_exceptions=False) as runnable:
Expand Down Expand Up @@ -1346,11 +1389,12 @@ async def test_server_side_error() -> None:
# assert e.response.text == "Internal Server Error"


def test_server_side_error_sync() -> None:
def test_server_side_error_sync(event_loop: AbstractEventLoop) -> None:
"""Test server side error handling."""

app = FastAPI()
add_routes(app, ErroringRunnable())
erroring_stream = StreamingRunnable(iterable=[1, 2, ValueError("An error")])
add_routes(app, erroring_stream)

# Invoke request
with get_sync_remote_runnable(app, raise_server_exceptions=False) as runnable:
Expand Down

0 comments on commit ce17cc4

Please sign in to comment.