Tutorial 04: Human-in-the-Loop
This tutorial shows you how to pause agent execution for human review and approval before the agent takes sensitive actions.
What You'll Learn
- Interrupts: Pausing graph execution at specific points
- interrupt_before: Static breakpoints defined at compile time
- interrupt(): Dynamic breakpoints at runtime
- Command: Resuming execution with human input
- Approval workflows: Review and approve agent actions
Prerequisites
- Completed Tutorial 03: Memory & Persistence
- Understanding of checkpointers (required for interrupts)
Why Human-in-the-Loop?
Agents are powerful but need oversight. Before an agent:
- Sends an email or message
- Makes a purchase or payment
- Deletes or modifies data
- Calls external APIs with side effects
You want a human to review and approve the action.
Common Patterns
According to the LangGraph documentation, there are four typical patterns:
- Approve/Reject: Pause before a critical step, review, and approve or reject
- Edit State: Pause to review and modify the graph state
- Review Tool Calls: Inspect and edit tool calls before execution
- Provide Input: Ask the human for additional information
Core Concepts
1. Interrupts and Checkpointers
Interrupts use LangGraph's persistence layer. When you call an interrupt:
- Graph execution pauses
- Current state is saved to the checkpointer
- The thread is marked as "interrupted"
- You can inspect the state and decide what to do
- Resume with invoke(None, config) or invoke(Command(resume=value), config)
Important: Interrupts require a checkpointer. Without one, there's no way to save and resume state.
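Put together, the whole cycle is only a few lines. Here is a condensed preview (graph, config, and the input message are placeholders; each step is detailed in the sections below):
# Condensed interrupt lifecycle: run, pause, inspect, resume
config = {"configurable": {"thread_id": "preview-1"}}
result = graph.invoke({"messages": [("user", "Hi")]}, config)  # runs until a breakpoint
state = graph.get_state(config)   # the checkpointer saved the paused state
if state.next:                    # a pending next node means we're interrupted
    result = graph.invoke(None, config)  # resume from the checkpoint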
2. interrupt_before and interrupt_after
These are the simplest ways to add interrupts; you specify them at compile time:
graph = workflow.compile(
    checkpointer=memory,
    interrupt_before=["tools"],  # Pause BEFORE tools node
    interrupt_after=["agent"],   # Pause AFTER agent node
)
3. Checking Interrupt Status
After invoking a graph, check if it's paused:
state = graph.get_state(config)
if state.next:  # If there's a next node, we're paused
    print(f"Paused before: {state.next}")
else:
    print("Execution complete")
To continue after approval:
# Simple resume (continue as-is)
result = graph.invoke(None, config=config)

# Resume with input (using Command): the resume value is delivered to a
# pending interrupt() call (see Advanced Patterns below)
from langgraph.types import Command
result = graph.invoke(Command(resume="approved"), config=config)
Building an Approval Workflow
Step 1: Define Sensitive Tools
from langchain_core.tools import tool

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email (sensitive action!)."""
    return f"Email sent to {to}"

@tool
def get_weather(location: str) -> str:
    """Get weather (safe action)."""
    return f"Weather in {location}: Sunny, 72°F"

# Mark which tools are sensitive
SENSITIVE_TOOLS = {"send_email"}
Step 2: Build the Graph
The agent_node, tool_node, and should_continue helpers used here are defined in the Complete Code listing below.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

workflow = StateGraph(State)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")

# Compile with interrupt
memory = MemorySaver()
graph = workflow.compile(
    checkpointer=memory,
    interrupt_before=["tools"]  # Pause before ANY tool execution
)
Step 3: Run with Approval
config = {"configurable": {"thread_id": "approval-1"}}
# Start execution
result = graph.invoke(
{"messages": [("user", "Send email to alice@example.com")]},
config=config
)
# Check if paused
state = graph.get_state(config)
if state.next:
# Show pending action
last_msg = state.values["messages"][-1]
for tc in last_msg.tool_calls:
print(f"Pending: {tc['name']}({tc['args']})")
# Get human approval
if input("Approve? (y/n): ").lower() == 'y':
result = graph.invoke(None, config=config)
else:
print("Rejected!")Selective Interrupts
Not all tools need approval. Route sensitive tool calls to a dedicated node and interrupt only on that node:
def should_continue(state: State) -> str:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        # Check if any tool is sensitive
        for tc in last_msg.tool_calls:
            if tc["name"] in SENSITIVE_TOOLS:
                return "sensitive_tools"
        return "safe_tools"
    return "end"

# Build graph with separate paths
workflow.add_node("sensitive_tools", tool_node)
workflow.add_node("safe_tools", tool_node)
workflow.add_conditional_edges(
    "agent", should_continue,
    {"sensitive_tools": "sensitive_tools", "safe_tools": "safe_tools", "end": END}
)
workflow.add_edge("sensitive_tools", "agent")
workflow.add_edge("safe_tools", "agent")

# Only interrupt for sensitive tools
graph = workflow.compile(
    checkpointer=memory,
    interrupt_before=["sensitive_tools"]  # NOT safe_tools
)
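Assuming the model actually routes to the expected tools, a quick sanity check of both paths might look like this (real output depends on the model's tool choices):
# Safe tool: should run to completion without pausing
cfg_safe = {"configurable": {"thread_id": "selective-safe"}}
graph.invoke({"messages": [("user", "What's the weather in Paris?")]}, cfg_safe)
print(graph.get_state(cfg_safe).next)  # expected: (), i.e. not paused

# Sensitive tool: should pause before execution
cfg_email = {"configurable": {"thread_id": "selective-email"}}
graph.invoke({"messages": [("user", "Email bob@example.com a hello note")]}, cfg_email)
print(graph.get_state(cfg_email).next)  # expected: ('sensitive_tools',)
Complete Code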
import json
from typing import Annotated

from typing_extensions import TypedDict
from langchain_core.tools import tool
from langchain_core.messages import ToolMessage
from langchain_ollama import ChatOllama
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver

from langgraph_ollama_local import LocalAgentConfig

# === Tools ===
@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email."""
    return f"Email sent to {to} with subject: {subject}"

tools = [send_email]
tools_by_name = {t.name: t for t in tools}

# === LLM ===
config = LocalAgentConfig()
llm = ChatOllama(
    model=config.ollama.model,
    base_url=config.ollama.base_url,
    temperature=0,
).bind_tools(tools)

# === State ===
class State(TypedDict):
    messages: Annotated[list, add_messages]

# === Nodes ===
def agent_node(state: State) -> dict:
    return {"messages": [llm.invoke(state["messages"])]}

def tool_node(state: State) -> dict:
    outputs = []
    for tc in state["messages"][-1].tool_calls:
        result = tools_by_name[tc["name"]].invoke(tc["args"])
        outputs.append(ToolMessage(
            content=json.dumps(result),
            name=tc["name"],
            tool_call_id=tc["id"],
        ))
    return {"messages": outputs}

def should_continue(state: State) -> str:
    last = state["messages"][-1]
    if hasattr(last, "tool_calls") and last.tool_calls:
        return "tools"
    return "end"

# === Graph ===
workflow = StateGraph(State)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")

# === Compile with Interrupt ===
memory = MemorySaver()
graph = workflow.compile(
    checkpointer=memory,
    interrupt_before=["tools"]
)

# === Approval Helper ===
def run_with_approval(user_input: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    result = graph.invoke({"messages": [("user", user_input)]}, config=config)
    while True:
        state = graph.get_state(config)
        if not state.next:
            break
        # Show pending action
        last_msg = state.values["messages"][-1]
        print(f"Pending: {last_msg.tool_calls}")
        # In production: get approval from UI
        result = graph.invoke(None, config=config)  # Auto-approve for demo
    return result["messages"][-1].content

# === Use ===
response = run_with_approval("Send email to test@example.com", "demo-1")
print(response)
Advanced Patterns
1. Dynamic Interrupts with interrupt()
Instead of compile-time interrupt_before, use runtime interrupt():
from langgraph.types import interrupt

def tool_node(state: State) -> dict:
    outputs = []
    for tc in state["messages"][-1].tool_calls:
        # Dynamic interrupt for sensitive tools only
        if tc["name"] in SENSITIVE_TOOLS:
            approval = interrupt({
                "action": tc["name"],
                "args": tc["args"],
                "message": f"Approve {tc['name']} with args {tc['args']}?"
            })
            if not approval.get("approved"):
                outputs.append(ToolMessage(
                    content="Action rejected by user",
                    name=tc["name"],
                    tool_call_id=tc["id"],
                ))
                continue
        result = tools_by_name[tc["name"]].invoke(tc["args"])
        outputs.append(ToolMessage(
            content=json.dumps(result),
            name=tc["name"],
            tool_call_id=tc["id"],
        ))
    return {"messages": outputs}
Modify tool calls before execution:
# Get paused state
state = graph.get_state(config)
last_msg = state.values["messages"][-1]

# Review tool calls
for tc in last_msg.tool_calls:
    print(f"Tool: {tc['name']}, Args: {tc['args']}")

# Modify if needed
edited_args = {"to": "safe@example.com", "subject": "Modified", "body": "Safe content"}
last_msg.tool_calls[0]["args"] = edited_args

# Update state with modifications
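# (add_messages matches on message ID, so this replaces the original message)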
graph.update_state(config, {"messages": [last_msg]})

# Resume with edited state
result = graph.invoke(None, config=config)
Common Pitfalls
1. No Checkpointer
# WRONG - interrupts without checkpointer
graph = workflow.compile(interrupt_before=["tools"])
# Raises an error: interrupts require a checkpointer

# CORRECT - always include checkpointer
memory = MemorySaver()
graph = workflow.compile(
    checkpointer=memory,
    interrupt_before=["tools"]
)
2. Resuming Without Interrupt
# WRONG - trying to resume when not paused
result = graph.invoke({"messages": [("user", "Hi")]}, config=config)
result = graph.invoke(None, config=config)  # Nothing to resume!

# CORRECT - check if paused first
state = graph.get_state(config)
if state.next:
    result = graph.invoke(None, config=config)
3. Wrong Config After Resume
# WRONG - different thread_id loses state
result = graph.invoke(input, config={"configurable": {"thread_id": "a"}})
result = graph.invoke(None, config={"configurable": {"thread_id": "b"}})
# Error: no state to resume on thread "b"

# CORRECT - same thread_id
config = {"configurable": {"thread_id": "consistent-id"}}
result = graph.invoke(input, config=config)
# ... later ...
result = graph.invoke(None, config=config)  # Same config
Quiz
Test your understanding of human-in-the-loop patterns:
1. What two things are required to use interrupts in LangGraph?
2. How do you resume execution after an interrupt?
3. What does state.next indicate when inspecting a paused graph?
4. True or false: interrupts can work without a checkpointer if you handle state manually.
5. Fill in the blank: what compile-time parameter pauses execution BEFORE a specific node runs?
What's Next?
Tutorial 05: Reflection - Learn how to build self-critiquing agents that iteratively improve their outputs.