Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unable to Set Interrupts for Subgraph Nodes in LangGraph GUI #232

Open
johnbenac opened this issue Jan 21, 2025 · 0 comments
Open

Unable to Set Interrupts for Subgraph Nodes in LangGraph GUI #232

johnbenac opened this issue Jan 21, 2025 · 0 comments

Comments

@johnbenac
Copy link

johnbenac commented Jan 21, 2025

Description:
In the LangGraph GUI, subgraphs are treated as single nodes when it comes to interrupt configuration. While it is possible to add interrupts to the entire subgraph (e.g., joke_builder), the individual nodes within the subgraph (e.g., make_setup and make_punchline) cannot be configured independently for interrupts. This limitation reduces flexibility when debugging or customizing the behavior of workflows.

Expected Behavior:
The GUI should allow for granular interrupt configuration, enabling users to add interrupts to individual nodes within subgraphs in addition to the entire subgraph.

Actual Behavior:
The entire subgraph is treated as a single node that can have interrupts. However, the nodes within the subgraph (make_setup and make_punchline) are not accessible for independent interrupt configuration.

Image

Steps to Reproduce:

Open the LangGraph GUI.
Navigate to the workflow where joke_builder contains the subgraph nodes make_setup and make_punchline.
Attempt to add interrupts to the nodes within the subgraph.
Observe that interrupt options are only available for the entire joke_builder subgraph and not for its internal nodes.

Environment Details:

  • Operating System: macOS Sonoma (Intel Mac)
  • Running on: Docker Desktop
  • Python Version: 3.11
  • LangGraph Version: Referenced in project dependencies.

Relevant Code and Configuration:
Here are the key details from the project structure:

  • Subgraph nodes are defined in joke_generation_graph (File: ./src/agent/graphs/joke_generation_graph.py):
    subgraph_builder.add_node("make_setup", make_setup)
    subgraph_builder.add_node("make_punchline", make_punchline)
  • The subgraph is invoked in joke_builder (File: ./src/agent/agents/joke_builder_agent.py):
    result = await subgraph.ainvoke(subgraph_state)

Suggested Fix:
Extend the GUI to allow interrupt configuration at the node level within subgraphs. This could involve:

Expanding subgraphs in the GUI to expose internal nodes for interrupt configuration.
Ensuring interrupt functionality applies to individual nodes and not just the subgraph as a whole.

Priority:
High, as the current limitation hinders debugging and fine-tuning of workflows.

here is the code:



File: ./langgraph.json
------------------------------------
{
  "dockerfile_lines": [],
  "graphs": {
    "agent": "./src/agent/graphs/main_graph.py:graph"
  },
  "env": ".env",
  "python_version": "3.11",
  "dependencies": [
    "."
  ]
}


File: ./tests/unit_tests/test_configuration.py
------------------------------------
from agent.configuration import Configuration


def test_configuration_empty() -> None:
    Configuration.from_runnable_config({})


File: ./tests/unit_tests/__init__.py
------------------------------------
"""Define any unit tests you may want in this directory."""


File: ./tests/integration_tests/__init__.py
------------------------------------
"""Define any integration tests you want in this directory."""


File: ./tests/integration_tests/test_graph.py
------------------------------------
import pytest
from langsmith import unit

from agent import graph


@pytest.mark.asyncio
@unit
async def test_agent_simple_passthrough() -> None:
    res = await graph.ainvoke({"changeme": "some_val"})
    assert res is not None


File: ./src/agent/configuration.py
------------------------------------
"""This module contains configuration classes and utilities for the agent."""

from __future__ import annotations

from dataclasses import dataclass, fields
import logging
from typing import Optional, Literal

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI

# Set up logging for the entire application
logger = logging.getLogger("agent")
logger.setLevel(logging.INFO)

# ModelName = Literal[
#     "gpt-4",
#     "gpt-4-turbo-preview",
#     "gpt-3.5-turbo",
#     "gpt-3.5-turbo-16k"
# ]

@dataclass(kw_only=True)
class Configuration:
    """The configuration for the agent.
    
    This class defines both runtime configurations and creates shared resources
    like the LLM that can be used across all agents.
    """
    # LLM Configuration
    model_name: ModelName = "gpt-4o"
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    
    # Other configurable parameters
    my_configurable_param: str = "changeme"

    @classmethod
    def from_runnable_config(
        cls, config: Optional[RunnableConfig] = None
    ) -> Configuration:
        """Create a Configuration instance from a RunnableConfig object."""
        configurable = (config.get("configurable") or {}) if config else {}
        _fields = {f.name for f in fields(cls) if f.init}
        return cls(**{k: v for k, v in configurable.items() if k in _fields})
    
    @property
    def llm(self) -> ChatOpenAI:
        """Get a configured LLM instance."""
        return ChatOpenAI(
            model=self.model_name,
            temperature=self.temperature,
            max_tokens=self.max_tokens
        )

# Create a default configuration instance
default_config = Configuration()


File: ./src/agent/tools/response_evaluation.py
------------------------------------
"""Tool for evaluating user responses to jokes."""
from typing import Annotated
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import InjectedToolArg

from .models import JokeResponse

async def evaluate_response(
    response: str,
    *,
    config: Annotated[RunnableConfig, InjectedToolArg]
) -> JokeResponse:
    """Evaluate if the user wants to hear another joke."""
    # Default to simple text analysis as fallback
    basic_indicators = {
        "yes": True, "sure": True, "another": True, "more": True,
        "no": False, "nope": False, "enough": False, "stop": False
    }
    response_lower = response.lower()
    
    for word, indicates_more in basic_indicators.items():
        if word in response_lower:
            reason = f"Response contained keyword '{word}'"
            return JokeResponse(wants_another=indicates_more, reason=reason)
    
    # If no clear indicators, assume they don't want more
    return JokeResponse(
        wants_another=False,
        reason="No clear indication of wanting more jokes"
    )

__all__ = ["evaluate_response"]


File: ./src/agent/tools/models.py
------------------------------------
"""Models used by the tools."""
from pydantic import BaseModel

class JokeResponse(BaseModel):
    """Response from the user about a joke."""
    wants_another: bool
    reason: str

__all__ = ["JokeResponse"]


File: ./src/agent/tools/__init__.py
------------------------------------
"""Tools module for the agent.

This module provides various tools and utilities that can be used by the agents
in the system, including:
- Response evaluation tools for understanding user feedback
- Web search capabilities for retrieving information
"""

from .models import JokeResponse
from .response_evaluation import evaluate_response
from .web_search import tavily_search

__all__ = [
    "JokeResponse",
    "evaluate_response",
    "tavily_search",
]


File: ./src/agent/tools/web_search.py
------------------------------------
"""Tool for performing web searches using Tavily."""
import os
from typing import Annotated
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import InjectedToolArg
from tavily import TavilyClient

async def tavily_search(
    query: str,
    *,
    config: Annotated[RunnableConfig, InjectedToolArg],
) -> str:
    """Search the internet using Tavily's search API.
    
    Args:
        query: The search query to send to Tavily.
            Include specific keywords and be as precise as possible.
        config: Configuration for the tool (injected).
    
    Returns:
        str: A formatted string containing search results and their summaries.
    """
    # Get Tavily API key from environment
    tavily_api_key = os.getenv("TAVILY_API_KEY")
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY not found in environment variables")
    
    # Initialize Tavily client
    client = TavilyClient(api_key=tavily_api_key)
    
    try:
        # Perform the search
        response = client.search(
            query=query,
            search_depth="moderate",
            max_results=5
        )
        
        # Format the results into a readable summary
        formatted_results = []
        for idx, result in enumerate(response['results'], 1):
            formatted_results.append(
                f"{idx}. {result['title']}
"
                f"Source: {result['url']}
"
                f"Summary: {result['content'][:200]}...
"
            )
        
        return "

".join(formatted_results)
        
    except Exception as e:
        return f"Error performing search: {str(e)}"

__all__ = ["tavily_search"]


File: ./src/agent/__init__.py
------------------------------------
"""Agent-based LangGraph system.

This module defines a hierarchical system of agents organized into graphs
for generating jokes in different comedian styles.
"""

from .graphs.main_graph import graph
from .tools import evaluate_response, JokeResponse

__all__ = ["graph", "evaluate_response", "JokeResponse"]


File: ./src/agent/agents/setup_maker_agent.py
------------------------------------
import logging
from typing import TypedDict
from agent.state import MainState
from agent.configuration import default_config

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Initialize LLM using configuration
llm = default_config.llm

class SetupState(TypedDict):
    setup: str
    comedian_style: str
    joke_state: dict

async def make_setup(state: SetupState) -> SetupState:
    """Setup maker agent that generates the setup for a joke."""
    logger.debug(f"[MAKE_SETUP] Making setup with state: {state}")
    
    prompt = """You are writing a joke in the style of {comedian_style}.
    Create the setup for the joke.
    
    What's the setup?"""
    
    response = await llm.ainvoke(prompt.format(comedian_style=state["comedian_style"]))
    setup = response.content.strip()
    logger.debug(f"[MAKE_SETUP] Generated setup: {setup}")
    
    return {"setup": setup, "joke_state": {"setup": setup}}


File: ./src/agent/agents/__init__.py
------------------------------------
from .style_chooser_agent import style_chooser
from .joke_builder_agent import joke_builder
from .joke_approver_agent import joke_approver
from .setup_maker_agent import make_setup
from .punchline_maker_agent import make_punchline

__all__ = [
    "style_chooser",
    "joke_builder",
    "joke_approver",
    "make_setup",
    "make_punchline"
]


File: ./src/agent/agents/style_chooser_agent.py
------------------------------------
from langchain_core.messages import AIMessage
import logging
from agent.state import MainState
from agent.configuration import default_config

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Initialize LLM using configuration 
llm = default_config.llm

async def style_chooser(state: MainState) -> MainState:
    """Style chooser agent that selects a comedian's style."""
    logger.debug(f"[STYLE_CHOOSER] Received state: {state}")
    
    chat_messages = state.get("messages", [])
    
    prompt = """Based on the user's comedy preferences, choose a specific comedian whose style matches.
    Respond with just the comedian's name and a brief note about their style.
    """
    
    response = await llm.ainvoke([{"role": "system", "content": prompt}, *chat_messages])
    logger.debug(f"[STYLE_CHOOSER] Selected style: {response.content}")
    
    return {
        "plan": response.content,
        "messages": chat_messages + [AIMessage(content=f"I'll use this style: {response.content}")],
        "next_step": "joke_builder"
    }


File: ./src/agent/agents/joke_builder_agent.py
------------------------------------
from langchain_core.messages import AIMessage
import logging
from typing import Any
from agent.state import MainState, JokeState

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

async def joke_builder(state: MainState, subgraph: Any = None) -> MainState:
    """Joke builder agent that coordinates joke generation using a proper subgraph."""
    logger.debug(f"[JOKE_BUILDER] Received state: {state}")
    
    subgraph_state = {
        "setup": "",
        "punchline": "",
        "comedian_style": state["plan"],  # This is our shared interface key
        "joke_state": {}
    }
    
    # The subgraph will be injected by the main graph
    if subgraph is None:
        # Lazy import to avoid circular dependency
        from agent.graphs.joke_generation_graph import joke_generation_graph
        subgraph = joke_generation_graph
        
    result = await subgraph.ainvoke(subgraph_state)
    logger.debug(f"[JOKE_BUILDER] Subgraph result: {result}")

    complete_joke = f"{result['joke_state']['setup']}

{result['joke_state']['punchline']}"
    
    return {
        "joke": result,
        "complete_joke": complete_joke,
        "messages": state.get("messages", []) + [
            AIMessage(content=f"Here's a joke in that style:

{complete_joke}

Do you want to hear another one?")
        ],
        "next_step": "joke_approver"
    }


File: ./src/agent/agents/punchline_maker_agent.py
------------------------------------
import logging
from typing import TypedDict
from agent.state import MainState
from agent.configuration import default_config

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Initialize LLM using configuration
llm = default_config.llm

class PunchlineState(TypedDict):
    setup: str
    punchline: str
    comedian_style: str
    joke_state: dict

async def make_punchline(state: PunchlineState) -> PunchlineState:
    """Punchline maker agent that generates the punchline for a joke."""
    logger.debug(f"[MAKE_PUNCHLINE] Making punchline with state: {state}")
    
    prompt = """You are writing a joke in the style of {comedian_style}.
    Given this setup: {setup}
    
    What's the punchline?"""
    
    response = await llm.ainvoke(prompt.format(
        comedian_style=state["comedian_style"],
        setup=state["setup"]
    ))
    punchline = response.content.strip()
    logger.debug(f"[MAKE_PUNCHLINE] Generated punchline: {punchline}")
    
    return {"punchline": punchline,
            "joke_state": {**state.get("joke_state", {}), "punchline": punchline}}


File: ./src/agent/agents/joke_approver_agent.py
------------------------------------
import logging
from langchain_core.messages import AIMessage, HumanMessage
from agent.state import MainState
from agent.tools import evaluate_response
from agent.configuration import default_config
from langgraph.graph import END

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Initialize LLM using configuration
llm = default_config.llm

async def joke_approver(state: MainState) -> MainState:
    """Joke approver agent that evaluates user responses."""
    logger.debug(f"[JOKE_APPROVER] Received state: {state}")
    
    messages = state.get("messages", [])
    last_msg = messages[-1] if messages else None
    
    if isinstance(last_msg, HumanMessage):
        response = await llm.bind_tools([evaluate_response]).ainvoke(
            [
                {
                    "role": "system", 
                    "content": """Evaluate if the user wants to hear another joke.
                    Call the evaluate_response tool with their response to analyze it.
                    """
                },
                {
                    "role": "user",
                    "content": last_msg.content
                }
            ]
        )

        if response.tool_calls and response.tool_calls[0]["name"] == "evaluate_response":
            result = await evaluate_response(
                response.tool_calls[0]["args"]["response"],
                config={"configurable": {}}
            )
            
            if result.wants_another:
                return {
                    "messages": messages + [
                        AIMessage(content=f"I'll think of another one! ({result.reason})")
                    ],
                    "next_step": "style_chooser",
                    "joke": state.get("joke", {})
                }
            else:
                return {
                    "messages": messages + [
                        AIMessage(content=f"Thanks for listening to my jokes! ({result.reason})")
                    ],
                    "complete_joke": state["complete_joke"],
                    "accepted": True,
                    "next_step": END,
                    "joke": state.get("joke", {})
                }
    
    if not isinstance(last_msg, HumanMessage) and not "Do you want to hear another one?" in last_msg.content:
        return {
            "messages": messages + [
                AIMessage(content="Please let me know if you'd like to hear another joke!")
            ],
            "next_step": END,
            "joke": state.get("joke", {})
        }
    
    return {
        **state,
        "next_step": END,
        "joke": state.get("joke", {})
    }


File: ./src/agent/graphs/__init__.py
------------------------------------
from .joke_generation_graph import joke_generation_graph
from .main_graph import graph

__all__ = ["joke_generation_graph", "graph"]


File: ./src/agent/graphs/main_graph.py
------------------------------------
# main_graph.py
from langgraph.graph import StateGraph, END, START
from typing_extensions import TypedDict, Union
from typing import Literal
import logging
from agent.state import MainState
from agent.agents import style_chooser, joke_builder, joke_approver
from agent.graphs.joke_generation_graph import joke_generation_graph

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

class OutputState(TypedDict):
    joke: str
    accepted: bool

def create_main_graph() -> StateGraph:
    """Create and return the main graph with a properly integrated subgraph."""
    # Define and build the main graph
    workflow = StateGraph(MainState, output=OutputState)

    # Add nodes
    workflow.add_node("style_chooser", style_chooser)
    
    # Create an async wrapper for joke_builder with injected subgraph
    async def joke_builder_with_subgraph(state: MainState):
        return await joke_builder(state, joke_generation_graph)
    
    workflow.add_node("joke_builder", joke_builder_with_subgraph)
    workflow.add_node("joke_approver", joke_approver)

    # Set up routing
    def route_next(state: MainState) -> Union[Literal["style_chooser", "joke_builder", "joke_approver"], END]:
        """Route to the next node based on state."""
        next_step = state.get("next_step", "style_chooser")
        logger.debug(f"[ROUTE] Routing to: {next_step}")
        return next_step

    workflow.add_edge(START, "style_chooser")
    workflow.add_conditional_edges(
        "style_chooser",
        route_next,
        {
            "joke_builder": "joke_builder",
            END: END
        }
    )

    workflow.add_conditional_edges(
        "joke_builder",
        route_next,
        {
            "joke_approver": "joke_approver",
            END: END
        }
    )

    workflow.add_conditional_edges(
        "joke_approver",
        route_next,
        {
            "style_chooser": "style_chooser",
            END: END
        }
    )

    return workflow.compile()

# Create a single instance of the main graph
graph = create_main_graph()


File: ./src/agent/graphs/joke_generation_graph.py
------------------------------------
from langgraph.graph import StateGraph, END, START
import logging
from agent.agents import make_setup, make_punchline
from agent.state import JokeState

# Set up logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

def create_joke_generation_graph() -> StateGraph:
    """Create and return the joke generation subgraph."""
    # Define and build the joke generation graph
    subgraph_builder = StateGraph(JokeState)
    
    # Add nodes
    subgraph_builder.add_node("make_setup", make_setup)
    subgraph_builder.add_node("make_punchline", make_punchline)

    # Add edges
    subgraph_builder.add_edge(START, "make_setup")
    subgraph_builder.add_edge("make_setup", "make_punchline")
    subgraph_builder.add_edge("make_punchline", END)

    # Compile and return the subgraph
    return subgraph_builder.compile()

# Create a single instance of the joke generation graph
joke_generation_graph = create_joke_generation_graph()


File: ./src/agent/state.py
------------------------------------
from __future__ import annotations

from dataclasses import dataclass
from typing import TypedDict, Optional, Dict, Any, List
from langchain_core.messages import BaseMessage


class MessagesState(TypedDict):
    """Base state type that includes messages."""
    messages: List[BaseMessage]


class MainState(MessagesState):
    """Defines the state for the main workflow."""
    plan: Optional[str]
    joke: Optional[Dict[str, Any]]
    complete_joke: Optional[str]
    accepted: Optional[bool]
    next_step: Optional[str]


class JokeState(TypedDict):
    """State definition for the joke generation subgraph."""
    setup: str
    punchline: str
    comedian_style: str  # This is our shared interface with the parent graph
    joke_state: dict    # This holds our internal state


@dataclass
class State:
    """Defines the input state for the agent."""
    changeme: str = "example"


__all__ = ["State", "MainState", "MessagesState", "JokeState"]


File: ./.github/workflows/unit-tests.yml
------------------------------------
# This workflow will run unit tests for the current project

name: CI

on:
  push:
    branches: ["main"]
  pull_request:
  workflow_dispatch: # Allows triggering the workflow manually in GitHub UI

# If another push to the same PR or branch happens while this workflow is still running,
# cancel the earlier run in favor of the next run.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  unit-tests:
    name: Unit Tests
    strategy:
      matrix:
        os: [ubuntu-latest]
        python-version: ["3.11", "3.12"]
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          uv venv
          uv pip install -r pyproject.toml
      - name: Lint with ruff
        run: |
          uv pip install ruff
          uv run ruff check .
      - name: Lint with mypy
        run: |
          uv pip install mypy
          uv run mypy --strict src/
      - name: Check README spelling
        uses: codespell-project/actions-codespell@v2
        with:
          ignore_words_file: .codespellignore
          path: README.md
      - name: Check code spelling
        uses: codespell-project/actions-codespell@v2
        with:
          ignore_words_file: .codespellignore
          path: src/
      - name: Run tests with pytest
        run: |
          uv pip install pytest
          uv run pytest tests/unit_tests


File: ./.github/workflows/integration-tests.yml
------------------------------------
# This workflow will run integration tests for the current project once per day

name: Integration Tests

on:
  schedule:
    - cron: "37 14 * * *" # Run at 7:37 AM Pacific Time (14:37 UTC) every day
  workflow_dispatch: # Allows triggering the workflow manually in GitHub UI

# If another scheduled run starts while this workflow is still running,
# cancel the earlier run in favor of the next run.
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  integration-tests:
    name: Integration Tests
    strategy:
      matrix:
        os: [ubuntu-latest]
        python-version: ["3.11", "3.12"]
    runs-on: ${{ matrix.os }}
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          curl -LsSf https://astral.sh/uv/install.sh | sh
          uv venv
          uv pip install -r pyproject.toml
          uv pip install -U pytest-asyncio
      - name: Run integration tests
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          LANGSMITH_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
          LANGSMITH_TRACING: true
        run: |
          uv run pytest tests/integration_tests


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant