Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
baskaryan committed Jan 11, 2025
1 parent 414ef69 commit 1446452
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 43 deletions.
77 changes: 35 additions & 42 deletions python/langsmith/testing/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ def _end_tests(
"revision_id": ls_env.get_langchain_env_var_metadata().get("revision_id"),
},
)
test_suite.wait()
test_suite.shutdown()


VT = TypeVar("VT", bound=Optional[dict])
Expand Down Expand Up @@ -448,7 +448,7 @@ def __init__(
self._dataset = dataset
self._version: Optional[datetime.datetime] = None
self._executor = ls_utils.ContextThreadPoolExecutor(max_workers=1)
self._example_futures: list[Future] = []
self._example_futures: dict[ID_TYPE, list[Future]] = defaultdict(list)
atexit.register(_end_tests, self)

@property
Expand Down Expand Up @@ -537,7 +537,8 @@ def sync_example(
outputs,
metadata.copy() if metadata else metadata,
)
self._example_futures.append(future)
with self._lock:
self._example_futures[example_id].append(future)

def _sync_example(
self,
Expand Down Expand Up @@ -585,17 +586,16 @@ def _submit_feedback(
)

def _create_feedback(self, run_id: ID_TYPE, feedback: dict, **kwargs: Any) -> None:
run = self.client.read_run(run_id)
trace_id = run.trace_id
trace_id = self.client.read_run(run_id).trace_id
self.client.create_feedback(trace_id, **feedback, **kwargs)

def wait(self):
def shutdown(self):
self._executor.shutdown(wait=True)

def wait_example_updates(self):
def wait_example_updates(self, example_id: ID_TYPE):
"""Wait for all example updates to complete."""
while self._example_futures:
self._example_futures.pop().result()
while self._example_futures[example_id]:
self._example_futures[example_id].pop().result()

def end_run(self, run_tree, example_id, outputs) -> Future:
return self._executor.submit(
Expand All @@ -604,12 +604,13 @@ def end_run(self, run_tree, example_id, outputs) -> Future:

def _end_run(self, run_tree, example_id, outputs) -> None:
# Ensure example is fully updated
self.wait_example_updates()
self.wait_example_updates(example_id)
# Ensure that run end time is after example modified at.
end_time = cast(
datetime.datetime, self.client.read_example(example_id).modified_at
) + datetime.timedelta(seconds=0.01)
run_tree.end(outputs=outputs, end_time=end_time)
run_tree.patch()


class _TestCase:
Expand Down Expand Up @@ -681,7 +682,6 @@ def _test():
inspect.signature(func), *test_args, **test_kwargs
)
# Make sure example is created before creating a run that references it.
test_suite.wait_example_updates()
with rh.trace(
name=getattr(func, "__name__", "Test"),
run_id=run_id,
Expand All @@ -692,27 +692,25 @@ def _test():
) as run_tree:
try:
result = func(*test_args, **test_kwargs)
outputs = (
result
if result is None or isinstance(result, dict)
else {"output": result}
)
run_future = test_suite.end_run(run_tree, example_id, outputs)
except SkipException as e:
test_suite.submit_result(run_id, error=repr(e), skipped=True)
outputs = {"skipped_reason": repr(e)}
test_suite.end_run(run_tree, example_id, outputs).result()
test_suite.end_run(run_tree, example_id, outputs)
raise e
except BaseException as e:
test_suite.submit_result(run_id, error=repr(e))
raise e
try:
test_suite.submit_result(run_id, error=None)
except BaseException as e:
logger.warning(f"Failed to create feedback for run_id {run_id}: {e}")

# Ensure run is updated before exiting tracing context.
run_future.result()
else:
outputs = (
result
if result is None or isinstance(result, dict)
else {"output": result}
)
test_suite.end_run(run_tree, example_id, outputs)
try:
test_suite.submit_result(run_id, error=None)
except BaseException as e:
logger.warning(f"Failed to create feedback for run_id {run_id}: {e}")

cache_path = (
Path(langtest_extra["cache"]) / f"{test_suite.id}.yaml"
Expand Down Expand Up @@ -748,8 +746,6 @@ async def _test():
func_inputs = rh._get_inputs_safe(
inspect.signature(func), *test_args, **test_kwargs
)
# Make sure example is created before creating a run that references it.
test_suite.wait_example_updates()
with rh.trace(
name=getattr(func, "__name__", "Test"),
run_id=run_id,
Expand All @@ -760,28 +756,25 @@ async def _test():
) as run_tree:
try:
result = await func(*test_args, **test_kwargs)
outputs = (
result
if result is None or isinstance(result, dict)
else {"output": result}
)
run_future = test_suite.end_run(run_tree, example_id, outputs)
except SkipException as e:
test_suite.submit_result(run_id, error=repr(e), skipped=True)
outputs = {"skipped_reason": repr(e)}
test_suite.end_run(run_tree, example_id, outputs).result()
test_suite.end_run(run_tree, example_id, outputs)
raise e
except BaseException as e:
test_suite.submit_result(run_id, error=repr(e))
raise e

try:
test_suite.submit_result(run_id, error=None)
except BaseException as e:
logger.warning(f"Failed to create feedback for run_id {run_id}: {e}")

# Ensure run is updated before exiting tracing context.
run_future.result()
else:
outputs = (
result
if result is None or isinstance(result, dict)
else {"output": result}
)
test_suite.end_run(run_tree, example_id, outputs)
try:
test_suite.submit_result(run_id, error=None)
except BaseException as e:
logger.warning(f"Failed to create feedback for run_id {run_id}: {e}")

cache_path = (
Path(langtest_extra["cache"]) / f"{test_suite.id}.yaml"
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langsmith"
version = "0.2.11rc4"
version = "0.2.11rc5"
description = "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform."
authors = ["LangChain <[email protected]>"]
license = "MIT"
Expand Down

0 comments on commit 1446452

Please sign in to comment.