From fe014d3d231997998bbfb4830b5b7fe0c0098162 Mon Sep 17 00:00:00 2001
From: SN <6432132+samnoyes@users.noreply.github.com>
Date: Thu, 16 Jan 2025 18:59:31 -0800
Subject: [PATCH 1/2] fix: send error instead of extra

---
 python/langsmith/client.py               | 10 +++++-----
 python/langsmith/evaluation/_arunner.py  |  3 +--
 python/langsmith/evaluation/_runner.py   |  3 +--
 python/langsmith/evaluation/evaluator.py |  4 ++--
 python/langsmith/schemas.py              |  4 ++--
 5 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index c7bb8299c..bd55e8965 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -5047,8 +5047,8 @@ def _submit_feedback(**kwargs):
                 ),
                 feedback_source_type=ls_schemas.FeedbackSourceType.MODEL,
                 project_id=project_id,
-                extra=res.extra,
                 trace_id=run.trace_id if run else None,
+                error=res.error,
             )
     return results
 
@@ -5116,7 +5116,7 @@ def create_feedback(
         project_id: Optional[ID_TYPE] = None,
        comparative_experiment_id: Optional[ID_TYPE] = None,
         feedback_group_id: Optional[ID_TYPE] = None,
-        extra: Optional[Dict] = None,
+        error: Optional[bool] = None,
         trace_id: Optional[ID_TYPE] = None,
         **kwargs: Any,
     ) -> ls_schemas.Feedback:
@@ -5162,8 +5162,8 @@ def create_feedback(
             feedback_group_id (Optional[Union[UUID, str]]):
                 When logging preferences, ranking runs, or other comparative feedback,
                 this is used to group feedback together.
-            extra (Optional[Dict]):
-                Metadata for the feedback.
+            error (Optional[bool]):
+                Whether the evaluator run errored.
             trace_id (Optional[Union[UUID, str]]):
                 The trace ID of the run to provide feedback for. Enables batch ingestion.
             **kwargs (Any):
@@ -5234,7 +5234,7 @@ def create_feedback(
                 comparative_experiment_id, accept_null=True
             ),
             feedback_group_id=_ensure_uuid(feedback_group_id, accept_null=True),
-            extra=extra,
+            error=error,
         )
 
         use_multipart = (self.info.batch_ingest_config or {}).get(
diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index 7b29241e6..745c7384e 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -817,9 +817,8 @@ async def _arun_evaluators(
                         results=[
                             EvaluationResult(
                                 key=key,
-                                source_run_id=run.id,
                                 comment=repr(e),
-                                extra={"error": True},
+                                error=True,
                             )
                             for key in feedback_keys
                         ]
diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py
index b680973b5..4b3fbd286 100644
--- a/python/langsmith/evaluation/_runner.py
+++ b/python/langsmith/evaluation/_runner.py
@@ -1591,9 +1591,8 @@ def _run_evaluators(
                         results=[
                             EvaluationResult(
                                 key=key,
-                                source_run_id=run.id,
                                 comment=repr(e),
-                                extra={"error": True},
+                                error=True,
                             )
                             for key in feedback_keys
                         ]
diff --git a/python/langsmith/evaluation/evaluator.py b/python/langsmith/evaluation/evaluator.py
index a1505699a..7d65c231e 100644
--- a/python/langsmith/evaluation/evaluator.py
+++ b/python/langsmith/evaluation/evaluator.py
@@ -94,8 +94,8 @@ class EvaluationResult(BaseModel):
 
     If none provided, the evaluation feedback is applied to the
     root trace being."""
-    extra: Optional[Dict] = None
-    """Metadata for the evaluator run."""
+    error: Optional[bool] = None
+    """Whether the evaluator run errored."""
 
     class Config:
         """Pydantic model configuration."""
diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py
index 58515df77..0337086bb 100644
--- a/python/langsmith/schemas.py
+++ b/python/langsmith/schemas.py
@@ -583,8 +583,8 @@ class FeedbackBase(BaseModel):
 
     """For preference scoring, this group ID
     is shared across feedbacks for each run in the group that was being compared."""
-    extra: Optional[Dict] = None
-    """The metadata of the feedback."""
+    error: Optional[bool] = None
+    """Whether the evaluator run errored."""
 
     class Config:
         """Configuration class for the schema."""

From 1e7130427b7cfc52fc925f7ed9386c9c8cfe015c Mon Sep 17 00:00:00 2001
From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com>
Date: Fri, 17 Jan 2025 13:42:19 -0800
Subject: [PATCH 2/2] An attempt at collecting runs

---
 python/langsmith/evaluation/_arunner.py | 35 +++++++------
 python/langsmith/evaluation/_runner.py  | 31 ++++++++----
 python/langsmith/run_helpers.py         | 68 +++++++++++++++++++++----
 3 files changed, 102 insertions(+), 32 deletions(-)

diff --git a/python/langsmith/evaluation/_arunner.py b/python/langsmith/evaluation/_arunner.py
index 745c7384e..59e217c20 100644
--- a/python/langsmith/evaluation/_arunner.py
+++ b/python/langsmith/evaluation/_arunner.py
@@ -8,6 +8,7 @@
 import logging
 import pathlib
 import uuid
+from contextlib import ExitStack
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -784,15 +785,21 @@ async def _arun_evaluators(
             **(current_context["metadata"] or {}),
             **{"experiment": self.experiment_name},
         }
-        with rh.tracing_context(
-            **{
-                **current_context,
-                "project_name": "evaluators",
-                "metadata": metadata,
-                "enabled": "local" if not self._upload_results else True,
-                "client": self.client,
-            }
-        ):
+        stack = ExitStack()
+
+        stack.enter_context(
+            rh.tracing_context(
+                **{
+                    **current_context,
+                    "project_name": "evaluators",
+                    "metadata": metadata,
+                    "enabled": "local" if not self._upload_results else True,
+                    "client": self.client,
+                }
+            )
+        )
+        run_collector = stack.enter_context(rh._on_run_set())
+        with stack:
             run = current_results["run"]
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
@@ -812,12 +819,16 @@
             except Exception as e:
                 try:
                     feedback_keys = _extract_feedback_keys(evaluator)
+                    source_run_id = (
+                        run_collector.runs[-1].id if run_collector.runs else None
+                    )
                     error_response = EvaluationResults(
                         results=[
                             EvaluationResult(
                                 key=key,
                                 comment=repr(e),
+                                source_run_id=source_run_id,
                                 error=True,
                             )
                             for key in feedback_keys
                         ]
@@ -838,11 +849,7 @@ async def _arun_evaluators(
                         f" run {run.id}: {repr(e)}",
                         exc_info=True,
                     )
-            logger.error(
-                f"Error running evaluator {repr(evaluator)} on"
-                f" run {run.id}: {repr(e)}",
-                exc_info=True,
-            )
+            run_collector.runs.clear()
         if example.attachments is not None:
             for attachment in example.attachments:
                 reader = example.attachments[attachment]["reader"]
diff --git a/python/langsmith/evaluation/_runner.py b/python/langsmith/evaluation/_runner.py
index 4b3fbd286..1dffcaa52 100644
--- a/python/langsmith/evaluation/_runner.py
+++ b/python/langsmith/evaluation/_runner.py
@@ -17,6 +17,7 @@
 import textwrap
 import threading
 import uuid
+from contextlib import ExitStack
 from contextvars import copy_context
 from typing import (
     TYPE_CHECKING,
@@ -1556,15 +1557,21 @@ def _run_evaluators(
                 "reference_run_id": current_results["run"].id,
             },
         }
-        with rh.tracing_context(
-            **{
-                **current_context,
-                "project_name": "evaluators",
-                "metadata": metadata,
-                "enabled": "local" if not self._upload_results else True,
-                "client": self.client,
-            }
-        ):
+        stack = ExitStack()
+        stack.enter_context(
+            rh.tracing_context(
+                **{
+                    **current_context,
+                    "project_name": "evaluators",
+                    "metadata": metadata,
+                    "enabled": "local" if not self._upload_results else True,
+                    "client": self.client,
+                }
+            )
+        )
+        run_collector = stack.enter_context(rh._on_run_set())
+
+        with stack:
             run = current_results["run"]
             example = current_results["example"]
             eval_results = current_results["evaluation_results"]
@@ -1586,12 +1593,16 @@
             except Exception as e:
                 try:
                     feedback_keys = _extract_feedback_keys(evaluator)
+                    source_run_id = (
+                        run_collector.runs[-1].id if run_collector.runs else None
+                    )
                     error_response = EvaluationResults(
                         results=[
                             EvaluationResult(
                                 key=key,
                                 comment=repr(e),
+                                source_run_id=source_run_id,
                                 error=True,
                             )
                             for key in feedback_keys
                         ]
@@ -1613,6 +1624,8 @@ def _run_evaluators(
                         f" run {run.id if run else ''}: {repr(e)}",
                         exc_info=True,
                     )
+            run_collector.runs.clear()
+
         if example.attachments is not None:
             for attachment in example.attachments:
                 reader = example.attachments[attachment]["reader"]
diff --git a/python/langsmith/run_helpers.py b/python/langsmith/run_helpers.py
index b9942a174..68c2f64eb 100644
--- a/python/langsmith/run_helpers.py
+++ b/python/langsmith/run_helpers.py
@@ -59,8 +59,6 @@
 _PROJECT_NAME = contextvars.ContextVar[Optional[str]]("_PROJECT_NAME", default=None)
 _TAGS = contextvars.ContextVar[Optional[List[str]]]("_TAGS", default=None)
 _METADATA = contextvars.ContextVar[Optional[Dict[str, Any]]]("_METADATA", default=None)
-
-
 _TRACING_ENABLED = contextvars.ContextVar[Optional[Union[bool, Literal["local"]]]](
     "_TRACING_ENABLED", default=None
 )
@@ -74,6 +72,10 @@
     "client": _CLIENT,
 }
 
+_ON_RUN_SET = contextvars.ContextVar[Optional["_Collector"]](
+    "_ON_RUN_SET", default=None
+)
+
 
 def get_current_run_tree() -> Optional[run_trees.RunTree]:
     """Get the current run tree."""
@@ -945,11 +947,15 @@ def _setup(self) -> run_trees.RunTree:
         if enabled is True:
             self.new_run.post()
         if enabled:
-            _TAGS.set(tags_)
-            _METADATA.set(metadata)
-            _PARENT_RUN_TREE.set(self.new_run)
-            _PROJECT_NAME.set(project_name_)
-            _CLIENT.set(client_)
+            _set_tracing_context(
+                {
+                    "tags": tags_,
+                    "metadata": metadata,
+                    "parent": self.new_run,
+                    "project_name": project_name_,
+                    "client": client_,
+                }
+            )
 
         return self.new_run
 
@@ -1434,8 +1440,13 @@ def _setup_run(
         on_end=langsmith_extra.get("on_end"),
         context=context,
     )
-    context.run(_PROJECT_NAME.set, response_container["project_name"])
-    context.run(_PARENT_RUN_TREE.set, response_container["new_run"])
+    context.run(
+        _set_tracing_context,
+        {
+            "project_name": response_container["project_name"],
+            "parent": response_container["new_run"],
+        },
+    )
     return response_container
 
 
@@ -1537,6 +1548,10 @@ def _get_inputs_and_attachments_safe(
 
 def _set_tracing_context(context: Dict[str, Any]):
     """Set the tracing context."""
     for k, v in context.items():
+        if k == "parent":
+            cb = _ON_RUN_SET.get()
+            if cb is not None:
+                cb(v)
         var = _CONTEXT_KEYS[k]
         var.set(v)
@@ -1774,3 +1789,38 @@ def _get_function_result(results: list, reduce_fn: Callable) -> Any:
         return results
     else:
         return results
+
+
+class _Collector:
+    """Collect runs set in context."""
+
+    __slots__ = ("parent_run_id", "runs")
+
+    def __init__(self, parent_run_id: Optional[uuid.UUID]):
+        """Construct callback."""
+        self.parent_run_id = parent_run_id
+        self.runs = []
+
+    def __call__(self, run: Optional[schemas.Run]):
+        """Add a run."""
+        try:
+            if run is None:
+                return
+            if run.parent_run_id == self.parent_run_id:
+                self.runs.append(run)
+        except Exception:
+            pass
+
+
+@contextlib.contextmanager
+def _on_run_set() -> Generator[_Collector, None, None]:
+    """Install a run collector for the duration of the context."""
+    parent_run_tree = _PARENT_RUN_TREE.get()
+    parent_run_id = parent_run_tree.id if parent_run_tree else None
+    collector = _Collector(parent_run_id)
+    prev = _ON_RUN_SET.get()
+    _ON_RUN_SET.set(collector)
+    try:
+        yield collector
+    finally:
+        _ON_RUN_SET.set(prev)
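
-- 
How the pieces above fit together, as a rough usage sketch (illustrative
only: _on_run_set and _Collector are private helpers introduced by this
patch, and the calling code below is hypothetical):

    from langsmith import run_helpers as rh

    # Entering the context installs a _Collector in the _ON_RUN_SET
    # contextvar. Each time _set_tracing_context() sets a new "parent"
    # run, the collector is called back and records runs whose
    # parent_run_id matches the ambient parent captured at entry.
    with rh._on_run_set() as run_collector:
        ...  # invoke an evaluator under rh.tracing_context(...)

        # If the evaluator raises, its own trace is the most recently
        # collected run, so it can still be attributed as the feedback's
        # source run:
        source_run_id = (
            run_collector.runs[-1].id if run_collector.runs else None
        )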