From 33dc4e1d698aae6e302332c05a1520ee417960a9 Mon Sep 17 00:00:00 2001
From: Ankush Gola
Date: Mon, 9 Dec 2024 11:27:05 -0800
Subject: [PATCH 1/6] add repro script

---
 python/bench/tracing_script.py | 33 +++++++++++++++++++++++++++++++++
 python/langsmith/client.py     |  1 +
 2 files changed, 34 insertions(+)
 create mode 100644 python/bench/tracing_script.py

diff --git a/python/bench/tracing_script.py b/python/bench/tracing_script.py
new file mode 100644
index 000000000..8ef45a54f
--- /dev/null
+++ b/python/bench/tracing_script.py
@@ -0,0 +1,33 @@
+from langsmith import traceable, wrappers
+from openai import Client
+
+import os
+os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
+os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"
+
+openai = wrappers.wrap_openai(Client())
+
+import openai
+from langsmith import traceable
+from langsmith.wrappers import wrap_openai
+
+client = wrap_openai(openai.Client())
+
+@traceable(run_type="tool", name="Retrieve Context")
+def my_tool(question: str) -> str:
+    return "During this morning's meeting, we solved all world conflict."
+
+@traceable(name="Chat Pipeline")
+def chat_pipeline(question: str):
+    context = my_tool(question)
+    messages = [
+        { "role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context." },
+        { "role": "user", "content": f"Question: {question}\nContext: {context}"}
+    ]
+    chat_completion = client.chat.completions.create(
+        model="gpt-4o-mini", messages=messages
+    )
+    return chat_completion.choices[0].message.content
+
+if __name__ == "__main__":
+    chat_pipeline("Can you summarize this morning's meetings?")
\ No newline at end of file
diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 1d27e7c99..a6377e23e 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -1288,6 +1288,7 @@ def create_run(
             and run_create.get("dotted_order") is not None
         ):
             if self._pyo3_client is not None:
+                print("RUN_CREATE", run_create)
                 self._pyo3_client.create_run(run_create)
             elif self.tracing_queue is not None:
                 serialized_op = serialize_run_dict("post", run_create)
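
Review note (patch 1): the repro script traces a tool call and a chat
completion through a wrapped OpenAI client, and the client.py change adds a
temporary debug print on the PyO3 create-run path. As committed, the script
wraps the client twice: the module-level "openai = wrappers.wrap_openai(Client())"
is immediately shadowed by the later "import openai", so only the second
"client = wrap_openai(openai.Client())" is ever used (patch 3 deletes the dead
block). A trimmed setup doing the same thing, as a sketch assuming
OPENAI_API_KEY and LangSmith credentials are present in the environment:

    import os
    os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
    os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"

    import openai
    from langsmith.wrappers import wrap_openai

    # Wrap exactly once; the wrapper records chat.completions calls
    # as LLM runs in LangSmith.
    client = wrap_openai(openai.Client())
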
From f376e33a0b1296916a1dc84994d0cd1a309bac55 Mon Sep 17 00:00:00 2001
From: Ankush Gola
Date: Mon, 9 Dec 2024 13:19:21 -0800
Subject: [PATCH 2/6] compression changes

---
 python/langsmith/_internal/_operations.py | 21 +++++++++++++++++++++
 python/langsmith/client.py                |  9 ++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/python/langsmith/_internal/_operations.py b/python/langsmith/_internal/_operations.py
index 66decff0f..89d5f12fc 100644
--- a/python/langsmith/_internal/_operations.py
+++ b/python/langsmith/_internal/_operations.py
@@ -141,6 +141,27 @@ def serialize_run_dict(
         attachments=attachments if attachments is not None else None,
     )
 
+def serialize_run_dict_for_compressed_ingest(
+    operation: Literal["post", "patch"], payload: dict
+):
+    inputs = payload.pop("inputs", None)
+    outputs = payload.pop("outputs", None)
+    events = payload.pop("events", None)
+    attachments = payload.pop("attachments", None)
+    serialized = ...
+    extra = ...
+    return SerializedRunOperation(
+        operation=operation,
+        id=payload["id"],
+        trace_id=payload["trace_id"],
+        _none=_dumps_json(payload),
+        inputs=_dumps_json(inputs) if inputs is not None else None,
+        outputs=_dumps_json(outputs) if outputs is not None else None,
+        events=_dumps_json(events) if events is not None else None,
+        attachments=attachments if attachments is not None else None,
+    )
+
+
 def combine_serialized_queue_operations(
     ops: list[Union[SerializedRunOperation, SerializedFeedbackOperation]],
diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index a6377e23e..63c93fc03 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -494,10 +494,13 @@ def __init__(
             if info is None or isinstance(info, ls_schemas.LangSmithInfo)
             else ls_schemas.LangSmithInfo(**info)
         )
+        self.compressed_multipart_buffer = ...
         weakref.finalize(self, close_session, self.session)
         atexit.register(close_session, session_)
         # Initialize auto batching
-        if auto_batch_tracing:
+        if auto_batch_tracing and _compression_enabled:
+            ...
+        elif auto_batch_tracing:
             self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()
 
             threading.Thread(
@@ -1295,6 +1298,10 @@ def create_run(
             self.tracing_queue.put(
                 TracingQueueItem(run_create["dotted_order"], serialized_op)
             )
+        elif os.environ["COMP"]:
+            # Do something different
+            # Use existing serialized_run_dict
+
         else:
             # Neither Rust nor Python batch ingestion is configured,
             # fall back to the non-batch approach.

From 579e258239cd454e2bcc982d05d925ce0b140e27 Mon Sep 17 00:00:00 2001
From: Ankush Gola
Date: Tue, 10 Dec 2024 12:32:11 -0800
Subject: [PATCH 3/6] point at local

---
 python/bench/tracing_script.py | 6 +-----
 python/langsmith/client.py     | 2 ++
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/python/bench/tracing_script.py b/python/bench/tracing_script.py
index 8ef45a54f..ab6ef81ce 100644
--- a/python/bench/tracing_script.py
+++ b/python/bench/tracing_script.py
@@ -1,11 +1,7 @@
-from langsmith import traceable, wrappers
-from openai import Client
-
 import os
 os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
 os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"
-
-openai = wrappers.wrap_openai(Client())
+os.environ["LANGSMITH_ENDPOINT"] = "http://localhost:1984"
 
 import openai
 from langsmith import traceable
diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 54dbb4237..6bede3a15 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -1754,6 +1754,8 @@ def update_run(
         if data["extra"]:
             self._insert_runtime_env([data])
 
+        print("UPDATE_RUN", data)
+
         if self._pyo3_client is not None:
             self._pyo3_client.update_run(data)
         elif use_multipart and self.tracing_queue is not None:
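
Review note (patch 2): serialize_run_dict_for_compressed_ingest is committed
with unresolved placeholders (serialized = ... and extra = ..., neither of
which is used afterwards), _compression_enabled is referenced in __init__ but
never defined in this patch, and the new elif os.environ["COMP"]: branch
contains only comments, which is a syntax error in Python. Patch 5 backs all
of this out. For reference, a plausible completed serializer, assuming the
two dangling locals were simply meant to be dropped, mirroring
serialize_run_dict directly above it, and reusing the module's existing
imports and helpers:

    def serialize_run_dict_for_compressed_ingest(
        operation: Literal["post", "patch"], payload: dict
    ) -> SerializedRunOperation:
        # Split the large fields out of the payload so that each one can
        # become its own individually compressed multipart section.
        inputs = payload.pop("inputs", None)
        outputs = payload.pop("outputs", None)
        events = payload.pop("events", None)
        attachments = payload.pop("attachments", None)
        return SerializedRunOperation(
            operation=operation,
            id=payload["id"],
            trace_id=payload["trace_id"],
            _none=_dumps_json(payload),
            inputs=_dumps_json(inputs) if inputs is not None else None,
            outputs=_dumps_json(outputs) if outputs is not None else None,
            events=_dumps_json(events) if events is not None else None,
            attachments=attachments if attachments is not None else None,
        )
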
From a22f99c960a1b3a3fa4770206d10539b4b40eb53 Mon Sep 17 00:00:00 2001
From: Ankush Gola
Date: Thu, 12 Dec 2024 00:00:33 -0800
Subject: [PATCH 4/6] comment out update path

---
 python/bench/tracing_script.py | 1 -
 python/langsmith/client.py     | 6 +++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/python/bench/tracing_script.py b/python/bench/tracing_script.py
index ab6ef81ce..b574c6b8d 100644
--- a/python/bench/tracing_script.py
+++ b/python/bench/tracing_script.py
@@ -1,7 +1,6 @@
 import os
 os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
 os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"
-os.environ["LANGSMITH_ENDPOINT"] = "http://localhost:1984"
 
 import openai
 from langsmith import traceable
diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 6bede3a15..e67c106ff 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -1756,9 +1756,9 @@ def update_run(
 
         print("UPDATE_RUN", data)
 
-        if self._pyo3_client is not None:
-            self._pyo3_client.update_run(data)
-        elif use_multipart and self.tracing_queue is not None:
+        # if self._pyo3_client is not None:
+        #     self._pyo3_client.update_run(data)
+        if use_multipart and self.tracing_queue is not None:
             # not collecting attachments currently, use empty dict
             serialized_op = serialize_run_dict(operation="patch", payload=data)
             self.tracing_queue.put(

From 0593cc1376eda0dcb878297c1eb636271eaabdfc Mon Sep 17 00:00:00 2001
From: Ankush Gola
Date: Thu, 12 Dec 2024 00:04:20 -0800
Subject: [PATCH 5/6] fix

---
 python/langsmith/client.py | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index d03153f84..e67c106ff 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -494,13 +494,10 @@ def __init__(
             if info is None or isinstance(info, ls_schemas.LangSmithInfo)
             else ls_schemas.LangSmithInfo(**info)
         )
-        self.compressed_multipart_buffer = ...
         weakref.finalize(self, close_session, self.session)
         atexit.register(close_session, session_)
         # Initialize auto batching
-        if auto_batch_tracing and _compression_enabled:
-            ...
-        elif auto_batch_tracing:
+        if auto_batch_tracing:
             self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()
 
             threading.Thread(
@@ -1298,10 +1295,6 @@ def create_run(
             self.tracing_queue.put(
                 TracingQueueItem(run_create["dotted_order"], serialized_op)
             )
-        elif os.environ["COMP"]:
-            # Do something different
-            # Use existing serialized_run_dict
-
         else:
             # Neither Rust nor Python batch ingestion is configured,
             # fall back to the non-batch approach.
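
Review note (patches 4 and 5): patch 4 disables the PyO3 update path by
commenting it out while leaving the UPDATE_RUN debug print in place, and
patch 5 reverts the unfinished compression scaffolding from patch 2. One
detail from the reverted code worth keeping in mind: os.environ["COMP"]
raises KeyError whenever the variable is unset, so an environment flag is
normally read with a default. A minimal sketch (COMP is just the placeholder
name from the reverted diff):

    import os

    # .get() returns the default instead of raising when the flag is absent.
    compression_enabled = os.environ.get("COMP", "false").lower() == "true"
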
From c8d606b4279608c39a20c57dd8c8d554816dd072 Mon Sep 17 00:00:00 2001
From: Ankush Gola
Date: Thu, 12 Dec 2024 08:41:18 -0800
Subject: [PATCH 6/6] new script

---
 python/bench/tracing_script.py | 52 ++++++++++++++++++++++------------
 python/langsmith/client.py     |  8 +++---
 2 files changed, 38 insertions(+), 22 deletions(-)

diff --git a/python/bench/tracing_script.py b/python/bench/tracing_script.py
index b574c6b8d..30084db9f 100644
--- a/python/bench/tracing_script.py
+++ b/python/bench/tracing_script.py
@@ -2,27 +2,43 @@
 os.environ["LANGCHAIN_PROJECT"] = "llm_messages_test_py"
 os.environ["LANGSMITH_USE_PYO3_CLIENT"] = "true"
 
-import openai
 from langsmith import traceable
-from langsmith.wrappers import wrap_openai
 
-client = wrap_openai(openai.Client())
+@traceable
+def format_prompt(subject):
+    return [
+        {
+            "role": "system",
+            "content": "You are a helpful assistant.",
+        },
+        {
+            "role": "user",
+            "content": f"What's a good name for a store that sells {subject}?"
+        }
+    ]
 
-@traceable(run_type="tool", name="Retrieve Context")
-def my_tool(question: str) -> str:
-    return "During this morning's meeting, we solved all world conflict."
+@traceable(run_type="llm")
+def invoke_llm(messages):
+    return {
+        "choices": [
+            {
+                "message": {
+                    "role": "assistant",
+                    "content": "Sure, how about 'Rainbow Socks'?"
+                }
+            }
+        ]
+}
 
-@traceable(name="Chat Pipeline")
-def chat_pipeline(question: str):
-    context = my_tool(question)
-    messages = [
-        { "role": "system", "content": "You are a helpful assistant. Please respond to the user's request only based on the given context." },
-        { "role": "user", "content": f"Question: {question}\nContext: {context}"}
-    ]
-    chat_completion = client.chat.completions.create(
-        model="gpt-4o-mini", messages=messages
-    )
-    return chat_completion.choices[0].message.content
+@traceable
+def parse_output(response):
+    return response["choices"][0]["message"]["content"]
+
+@traceable
+def run_pipeline():
+    messages = format_prompt("colorful socks")
+    response = invoke_llm(messages)
+    return parse_output(response)
 
 if __name__ == "__main__":
-    chat_pipeline("Can you summarize this morning's meetings?")
\ No newline at end of file
+    run_pipeline()
\ No newline at end of file
diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index e67c106ff..c47438cbe 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -1287,10 +1287,10 @@ def create_run(
             run_create.get("trace_id") is not None
             and run_create.get("dotted_order") is not None
         ):
-            if self._pyo3_client is not None:
-                print("RUN_CREATE", run_create)
-                self._pyo3_client.create_run(run_create)
-            elif self.tracing_queue is not None:
+            # if self._pyo3_client is not None:
+            #     print("RUN_CREATE", run_create)
+            #     self._pyo3_client.create_run(run_create)
+            if self.tracing_queue is not None:
                 serialized_op = serialize_run_dict("post", run_create)
                 self.tracing_queue.put(
                     TracingQueueItem(run_create["dotted_order"], serialized_op)
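
Review note (patch 6): the benchmark script no longer needs an OpenAI key;
format_prompt, invoke_llm, and parse_output stub the pipeline out with canned
data, and run_type="llm" together with OpenAI-style message dicts lets
LangSmith render invoke_llm as an LLM run. Two leftovers would still need
cleanup before merge: the closing brace of invoke_llm's return dict is
dedented to column 0 (legal Python, but inconsistent with the surrounding
style), and create_run's PyO3 branch is now commented out with the RUN_CREATE
debug print still in the history. Restoring the dispatch that patch 1 started
from, once debugging is finished, would look like:

    if self._pyo3_client is not None:
        self._pyo3_client.create_run(run_create)
    elif self.tracing_queue is not None:
        serialized_op = serialize_run_dict("post", run_create)
        self.tracing_queue.put(
            TracingQueueItem(run_create["dotted_order"], serialized_op)
        )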