diff --git a/docs/03-callback-config.ipynb b/docs/03-callback-config.ipynb
new file mode 100644
index 0000000..34bac9f
--- /dev/null
+++ b/docs/03-callback-config.ipynb
@@ -0,0 +1,499 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Callback Config"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The `callback` is an essential utility in `graphai` that allows us to easily stream output from various nodes in our graph. Through the `Callback` class we can modify what our returned tokens look like _and_ control what we are and are _not_ returning."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Default Callback"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's start with the default callback, which we initialize like so:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "/Users/jamesbriggs/Documents/aurelio/graphai/.venv/lib/python3.12/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n",
+      "  from .autonotebook import tqdm as notebook_tqdm\n"
+     ]
+    }
+   ],
+   "source": [
+    "from graphai.callback import Callback\n",
+    "\n",
+    "cb = Callback()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can set up a simple `stream` function to test this out:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "async def stream(cb: Callback, text: str):\n",
+    "    tokens = text.split(\" \")\n",
+    "    for token in tokens:\n",
+    "        await cb.acall(token)\n",
+    "    await cb.close()\n",
+    "    return"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "To stream we use `asyncio.create_task` to _create the task_ and then we can use `async for` to stream the tokens from our initialized callback object (`cb`):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['Hello', 'over', 'there', '<graphai:END:>']"
+      ]
+     },
+     "execution_count": 3,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "import asyncio\n",
+    "\n",
+    "response = asyncio.create_task(stream(cb, \"Hello over there\"))\n",
+    "\n",
+    "test = []\n",
+    "\n",
+    "async for token in cb.aiter():\n",
+    "    test.append(token)\n",
+    "\n",
+    "test"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "When streaming we will see each token/word is output, with a final token to identify the end of the stream. By default the final token is `\"<graphai:END:>\"`. Each part of this token is configurable and is split into three parts:\n",
+    "\n",
+    "- `identifier`: the identifier of the callback, in this case `graphai`\n",
+    "- `token`: the token to be returned, in this case `END`\n",
+    "- `params`: the parameters to be returned, in this case an empty string\n",
+    "\n",
+    "The default callback format here is:\n",
+    "\n",
+    "```\n",
+    "<{identifier}:{token}:{params}>\n",
+    "```\n",
+    "\n",
+    "We can modify the structure and the `identifier` as preferred. For example, we can change the `identifier` to `demo` and the structure to `[{identifier}:{token}:{params}]` like so:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cb = Callback(\n",
+    "    identifier=\"demo\",\n",
+    "    special_token_format=\"[{identifier}:{token}:{params}]\"\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Rerun the stream:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['Hello', 'over', 'there', '[demo:END:]']"
+      ]
+     },
+     "execution_count": 5,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "response = asyncio.create_task(stream(cb, \"Hello over there\"))\n",
+    "\n",
+    "test = []\n",
+    "\n",
+    "async for token in cb.aiter():\n",
+    "    test.append(token)\n",
+    "\n",
+    "test"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can also drop one or more of the components. If we'd like to keep only the `token`, for example, we can:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cb = Callback(\n",
+    "    identifier=\"demo\",\n",
+    "    special_token_format=\"<<{token}>>\"\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['Hello', 'over', 'there', '<<END>>']"
+      ]
+     },
+     "execution_count": 7,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "response = asyncio.create_task(stream(cb, \"Hello over there\"))\n",
+    "\n",
+    "test = []\n",
+    "\n",
+    "async for token in cb.aiter():\n",
+    "    test.append(token)\n",
+    "\n",
+    "test"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Callback Across Nodes"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Our callback inherits information about which node it is being executed from. This is useful when we may want to treat streamed output differently depending on its source."
+   ]
+  },
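+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As a rough sketch of how a consumer might separate special tokens from regular streamed tokens, we can match chunks against the token format. Note that `parse_special` below is our own hypothetical helper, not part of `graphai`, and it assumes the default `<{identifier}:{token}:{params}>` format with the default `graphai` identifier:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import re\n",
+    "\n",
+    "# hypothetical helper, not part of graphai: assumes the default\n",
+    "# \"<{identifier}:{token}:{params}>\" special token format with the\n",
+    "# default \"graphai\" identifier\n",
+    "special_pattern = re.compile(r\"^<graphai:(.+)>$\")\n",
+    "\n",
+    "def parse_special(chunk: str) -> str | None:\n",
+    "    \"\"\"Return the inner 'token:params' body if chunk is a special token, else None.\"\"\"\n",
+    "    match = special_pattern.match(chunk)\n",
+    "    return match.group(1) if match else None\n",
+    "\n",
+    "print(parse_special(\"<graphai:END:>\"))  # END:\n",
+    "print(parse_special(\"Hello\"))  # None"
+   ]
+  },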
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's set up a simple graph with a few nodes to see the node-level callback in action:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Adding edge from node_start to node_a\n",
+      "Adding edge from node_a to node_b\n",
+      "Adding edge from node_b to node_c\n",
+      "Adding edge from node_c to node_d\n",
+      "Adding edge from node_d to node_end\n"
+     ]
+    }
+   ],
+   "source": [
+    "from graphai import Graph, node\n",
+    "import asyncio\n",
+    "\n",
+    "@node(start=True)\n",
+    "async def node_start(input: str):\n",
+    "    await asyncio.sleep(0.1)\n",
+    "    print(\">> node_start\")\n",
+    "    # no stream added here\n",
+    "    return {\"input\": input}\n",
+    "\n",
+    "@node(stream=True)\n",
+    "async def node_a(input: str, callback: Callback):\n",
+    "    await asyncio.sleep(0.1)\n",
+    "    print(\">> node_a\")\n",
+    "    tokens = [\"Hello\", \"World\", \"!\"]\n",
+    "    for token in tokens:\n",
+    "        await callback.acall(token)\n",
+    "    return {\"input\": input}\n",
+    "\n",
+    "@node(stream=True)\n",
+    "async def node_b(input: str, callback: Callback):\n",
+    "    await asyncio.sleep(0.1)\n",
+    "    print(\">> node_b\")\n",
+    "    tokens = [\"Here\", \"is\", \"node\", \"B\", \"!\"]\n",
+    "    for token in tokens:\n",
+    "        await callback.acall(token)\n",
+    "    return {\"input\": input}\n",
+    "\n",
+    "@node\n",
+    "async def node_c(input: str):\n",
+    "    await asyncio.sleep(0.1)\n",
+    "    print(\">> node_c\")\n",
+    "    # no stream added here\n",
+    "    return {\"input\": input}\n",
+    "\n",
+    "@node(stream=True)\n",
+    "async def node_d(input: str, callback: Callback):\n",
+    "    await asyncio.sleep(0.1)\n",
+    "    print(\">> node_d\")\n",
+    "    tokens = [\"Here\", \"is\", \"node\", \"D\", \"!\"]\n",
+    "    for token in tokens:\n",
+    "        await callback.acall(token)\n",
+    "    return {\"input\": input}\n",
+    "\n",
+    "@node(end=True)\n",
+    "async def node_end(input: str):\n",
+    "    await asyncio.sleep(0.1)\n",
+    "    print(\">> node_end\")\n",
+    "    return {\"input\": input}\n",
+    "\n",
+    "graph = Graph()\n",
+    "\n",
+    "nodes = [node_start, node_a, node_b, node_c, node_d, node_end]\n",
+    "\n",
+    "for i, node_fn in enumerate(nodes):\n",
+    "    graph.add_node(node_fn)\n",
+    "    if i > 0:\n",
+    "        print(f\"Adding edge from {nodes[i-1].__name__} to {node_fn.__name__}\")\n",
+    "        graph.add_edge(nodes[i-1], node_fn)\n",
+    "\n",
+    "graph.compile()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Let's see this in action. Note that we've added a `print` statement to each node so we can see the order in which the nodes are executed, which we expect to align with the `{token}` field output by each node's `start` and `end` special tokens."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      ">> node_start\n",
+      "<graphai:node_a:start:>\n",
+      "<graphai:node_a:>\n",
+      ">> node_a\n",
+      "Hello\n",
+      "World\n",
+      "!\n",
+      "<graphai:node_a:end:>\n",
+      "<graphai:node_b:start:>\n",
+      "<graphai:node_b:>\n",
+      ">> node_b\n",
+      "Here\n",
+      "is\n",
+      "node\n",
+      "B\n",
+      "!\n",
+      "<graphai:node_b:end:>\n",
+      ">> node_c\n",
+      "<graphai:node_d:start:>\n",
+      "<graphai:node_d:>\n",
+      ">> node_d\n",
+      "Here\n",
+      "is\n",
+      "node\n",
+      "D\n",
+      "!\n",
+      "<graphai:node_d:end:>\n",
+      ">> node_end\n",
+      "<graphai:END:>\n"
+     ]
+    }
+   ],
+   "source": [
+    "cb = graph.get_callback()\n",
+    "\n",
+    "asyncio.create_task(graph.execute(\n",
+    "    input={\"input\": \"Hello\"}\n",
+    "))\n",
+    "\n",
+    "async for token in cb.aiter():\n",
+    "    print(token)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can see everything being output here. With each node we automatically output a `start` and `end` special token, and between these special tokens we know that every chunk of information being streamed back to us is being produced from inside that node.\n",
+    "\n",
+    "This behaviour can be particularly useful when we want to treat output from various nodes differently. For example, we may want to render that a specific tool is being used inside a research agent application, before rendering the research agent's final response as we would typically render streamed tokens."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Custom Callback"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We can also modify the callback tokens as we did before. For example, we can change the `identifier` to `demo` and the structure to `[{identifier}:{token}:{params}]` like so:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "cb = Callback(\n",
+    "    identifier=\"demo\",\n",
+    "    special_token_format=\"[{identifier}:{token}:{params}]\"\n",
+    ")\n",
+    "\n",
+    "# set the custom callback on our graph\n",
+    "graph.callback = cb"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "We now execute the graph:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      ">> node_start\n",
+      "[demo:node_a:start:]\n",
+      "[demo:node_a:]\n",
+      ">> node_a\n",
+      "Hello\n",
+      "World\n",
+      "!\n",
+      "[demo:node_a:end:]\n",
+      "[demo:node_b:start:]\n",
+      "[demo:node_b:]\n",
+      ">> node_b\n",
+      "Here\n",
+      "is\n",
+      "node\n",
+      "B\n",
+      "!\n",
+      "[demo:node_b:end:]\n",
+      ">> node_c\n",
+      "[demo:node_d:start:]\n",
+      "[demo:node_d:]\n",
+      ">> node_d\n",
+      "Here\n",
+      "is\n",
+      "node\n",
+      "D\n",
+      "!\n",
+      "[demo:node_d:end:]\n",
+      ">> node_end\n",
+      "[demo:END:]\n"
+     ]
+    }
+   ],
+   "source": [
+    "asyncio.create_task(graph.execute(\n",
+    "    input={\"input\": \"Hello\"}\n",
+    "))\n",
+    "\n",
+    "async for token in cb.aiter():\n",
+    "    print(token)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "With that, we have our custom callback being used across our graph.\n",
+    "\n",
+    "---"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.7"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}