Skip to content

Commit

Permalink
fix(langgraph): Add test for subgraph interrupts & resume with Command (
Browse files Browse the repository at this point in the history
#698)

Co-authored-by: Nuno Campos <[email protected]>
  • Loading branch information
bracesproul and nfcampos authored Dec 3, 2024
1 parent f2e6ebc commit 4e57fca
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 2 deletions.
8 changes: 6 additions & 2 deletions libs/langgraph/src/pregel/algo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,9 @@ export function _prepareSingleTask<
...configurable[CONFIG_KEY_CHECKPOINT_MAP],
[parentNamespace]: checkpoint.id,
},
[CONFIG_KEY_RESUME_VALUE]: resume ? resume[2] : MISSING,
[CONFIG_KEY_RESUME_VALUE]: resume
? resume[2]
: configurable[CONFIG_KEY_RESUME_VALUE] ?? MISSING,
checkpoint_id: undefined,
checkpoint_ns: taskCheckpointNamespace,
},
Expand Down Expand Up @@ -744,7 +746,9 @@ export function _prepareSingleTask<
...configurable[CONFIG_KEY_CHECKPOINT_MAP],
[parentNamespace]: checkpoint.id,
},
[CONFIG_KEY_RESUME_VALUE]: resume ? resume[2] : MISSING,
[CONFIG_KEY_RESUME_VALUE]: resume
? resume[2]
: configurable[CONFIG_KEY_RESUME_VALUE] ?? MISSING,
checkpoint_id: undefined,
checkpoint_ns: taskCheckpointNamespace,
},
Expand Down
66 changes: 66 additions & 0 deletions libs/langgraph/src/tests/pregel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8808,6 +8808,72 @@ export function runPregelTests(
GraphRecursionError
);
});

it("should interrupt and resume with Command inside a subgraph", async () => {
const subgraph = new StateGraph(MessagesAnnotation)
.addNode("one", (_) => {
const interruptValue = interrupt("<INTERRUPTED>");
if (interruptValue !== "<RESUMED>") {
throw new Error("Expected interrupt to return <RESUMED>");
}
return {
messages: [
{
role: "user",
content: "success",
},
],
};
})
.addEdge(START, "one")
.compile();

const graph = new StateGraph(MessagesAnnotation)
.addNode("one", () => {
// No-op
return {};
})
.addNode("subgraph", subgraph)
.addNode("two", (state) => {
if (state.messages.length !== 1) {
throw new Error(`Expected 1 message, got ${state.messages.length}`);
}
return {};
})
.addEdge(START, "one")
.addEdge("one", "subgraph")
.addEdge("subgraph", "two")
.addEdge("two", END)
.compile({ checkpointer: await createCheckpointer() });

const config = {
configurable: { thread_id: "test_subgraph_interrupt_resume" },
};

await graph.invoke(
{
messages: [],
},
config
);

const currTasks = (await graph.getState(config)).tasks;
expect(currTasks[0].interrupts).toHaveLength(1);

// Resume with `Command`
const result = await graph.invoke(
new Command({
resume: "<RESUMED>",
}),
config
);

const currTasksAfterCmd = (await graph.getState(config)).tasks;
expect(currTasksAfterCmd).toHaveLength(0);

expect(result.messages).toBeDefined();
expect(result.messages).toHaveLength(1);
});
}

runPregelTests(() => new MemorySaverAssertImmutable());

0 comments on commit 4e57fca

Please sign in to comment.