LangGraph
Add human review nodes to LangGraph workflows.
LangGraph lets you build stateful, multi-step agent workflows as graphs. This guide shows how to add human review nodes using the Datashift SDK.
Developer-Enforced Review
This guide shows how to add review nodes in your graph. You decide which paths require human approval. For agent-initiated review where the agent decides when to request human input, see the MCP integration guide.
Prerequisites
- A Datashift account with an API key
- An OpenAI API key (or other LLM provider)
- Python 3.9+
Installation
pip install langgraph langchain-openai datashift
Set your environment variables:
export OPENAI_API_KEY=sk-...
export DATASHIFT_API_KEY=ds_xxxxxxxxxxxxxxxx
Full Example
Here's a workflow that generates an email, submits it for review, and routes based on the decision:
import os
import time
from typing import Annotated, Optional, TypedDict

from datashift import Datashift
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph
# Initialize clients
llm = ChatOpenAI(model="gpt-4")
datashift = Datashift(api_key=os.environ["DATASHIFT_API_KEY"])
# Define state
class AgentState(TypedDict):
    """State carried between the nodes of the email review graph.

    Optional[...] is used instead of ``X | None`` because the annotations in
    a class body are evaluated when the class is created, and ``dict | None``
    raises TypeError on Python 3.9, which this guide lists as supported.
    """

    # Conversation so far, as a list of {"role": ..., "content": ...} dicts.
    messages: list
    # Draft written by generate_email; None until that node has run.
    email_draft: Optional[dict]
    # "approved" or "rejected" once submit_for_review has run; None before.
    review_result: Optional[str]
# Node: Generate email draft
def generate_email(state: AgentState) -> AgentState:
    """Ask the LLM for an email draft and store it under ``email_draft``.

    Args:
        state: Current graph state; ``state["messages"]`` is sent to the LLM
            followed by a system message requesting JSON output.

    Returns:
        A copy of ``state`` with ``email_draft`` set to the parsed draft.
    """
    format_instruction = {
        "role": "system",
        "content": "Generate an email draft as JSON with 'to', 'subject', 'body' fields.",
    }
    response = llm.invoke(state["messages"] + [format_instruction])

    # Parse the email draft from response.
    # NOTE(review): parse_email_from_response is not defined in this guide;
    # presumably it turns the JSON text into a dict with 'to', 'subject' and
    # 'body' keys (the review node reads those keys) -- supply your own.
    email_draft = parse_email_from_response(response.content)
    return {**state, "email_draft": email_draft}
# Node: Submit for human review
def submit_for_review(state: AgentState) -> AgentState:
    """Submit the drafted email to Datashift and block until it is reviewed.

    The task is polled every 5 seconds for as long as its state is
    "queued" or "in_review".

    Args:
        state: Current graph state; ``state["email_draft"]`` must contain
            the 'to', 'subject' and 'body' keys.

    Returns:
        A copy of ``state`` with ``review_result`` set to "approved" or
        "rejected".
    """
    email = state["email_draft"]
    task = datashift.submit_task(
        queue_key="outbound-communications",
        data={
            "to": email["to"],
            "subject": email["subject"],
            "body": email["body"],
        },
        summary=f"Review email to {email['to']}: {email['subject']}",
    )

    # Wait for human decision
    result = datashift.get_task(task.id)
    while result.state in ("queued", "in_review"):
        time.sleep(5)
        result = datashift.get_task(task.id)

    # Fail closed: if the task left the polled states without any review
    # attached, treat it as rejected instead of raising IndexError.
    # NOTE(review): which terminal states can have no reviews is not shown
    # here -- confirm against the Datashift SDK.
    reviews = result.reviews
    approved = bool(reviews) and "approved" in reviews[0].result
    review_result = "approved" if approved else "rejected"
    return {**state, "review_result": review_result}
# Node: Send the email
def send_email(state: AgentState) -> AgentState:
    """Append an assistant message confirming the email was sent.

    The delivery call itself is left as a commented placeholder; plug in
    your own sender there.

    Args:
        state: Current graph state; ``state["email_draft"]["to"]`` is used
            in the confirmation text.

    Returns:
        A copy of ``state`` whose ``messages`` is a new list with the
        confirmation appended (the input list is not mutated).
    """
    email = state["email_draft"]

    # Actually send the email
    # send_actual_email(email["to"], email["subject"], email["body"])

    confirmation = {
        "role": "assistant",
        "content": f"Email sent to {email['to']}",
    }
    return {**state, "messages": state["messages"] + [confirmation]}
# Node: Handle rejection
def handle_rejection(state: AgentState) -> AgentState:
    """Append an assistant message saying the reviewer rejected the email.

    Args:
        state: Current graph state.

    Returns:
        A copy of ``state`` whose ``messages`` is a new list with the
        rejection notice appended (the input list is not mutated).
    """
    notice = {
        "role": "assistant",
        "content": "The email was rejected by a reviewer. Would you like me to revise it?",
    }
    return {**state, "messages": state["messages"] + [notice]}
# Conditional edge: route based on review result
def route_after_review(state: AgentState) -> str:
    """Return the name of the node to run after the review node.

    Args:
        state: Current graph state; only ``review_result`` is read.

    Returns:
        "send_email" when ``review_result`` is exactly "approved",
        otherwise "handle_rejection" (including when it is None).
    """
    if state["review_result"] == "approved":
        return "send_email"
    return "handle_rejection"
# Build the graph
workflow = StateGraph(AgentState)

# Register the nodes. The names "send_email" and "handle_rejection" must
# match the strings returned by route_after_review.
workflow.add_node("generate_email", generate_email)
workflow.add_node("submit_for_review", submit_for_review)
workflow.add_node("send_email", send_email)
workflow.add_node("handle_rejection", handle_rejection)

# Wire the edges: draft -> review -> (send | rejection) -> END.
workflow.set_entry_point("generate_email")
workflow.add_edge("generate_email", "submit_for_review")
workflow.add_conditional_edges("submit_for_review", route_after_review)
workflow.add_edge("send_email", END)
workflow.add_edge("handle_rejection", END)

# Compile and run
app = workflow.compile()
result = app.invoke(
    {
        "messages": [
            {
                "role": "user",
                "content": "Send a thank you email to john@example.com for their purchase",
            }
        ],
        "email_draft": None,
        "review_result": None,
    }
)
print(result["messages"][-1]["content"])
How It Works
Generate content
The first node generates the email draft using the LLM.
Submit for review
The review node submits the draft to Datashift and waits for a decision.
Conditional routing
A conditional edge routes to either “send” or “handle rejection” based on the review result.
Execute or revise
The workflow either executes the action or handles the rejection gracefully.
Async with Checkpoints
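For long-running reviews, use LangGraph's checkpointing to pause and resume. Compile the graph with the checkpointer (for example `workflow.compile(checkpointer=checkpointer)`) so that each run's state is saved under its `thread_id` and can be resumed later: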
from langgraph.checkpoint.memory import MemorySaver
# Create a checkpoint saver for persistence
checkpointer = MemorySaver()
# Node that submits for review and returns immediately
def submit_for_review_async(state: AgentState) -> AgentState:
    """Submit the draft for review and return without waiting for a decision.

    Args:
        state: Current graph state; ``state["email_draft"]`` is sent as the
            task data and its 'to' field is used in the summary.

    Returns:
        A copy of ``state`` with ``pending_task_id`` set to the new task's ID.
    """
    email = state["email_draft"]
    task = datashift.submit_task(
        queue_key="outbound-communications",
        data=email,
        summary=f"Review email to {email['to']}",
    )

    # Store the task ID in state so the pending review can be identified later.
    # NOTE(review): "pending_task_id" is not among the AgentState fields shown
    # in this guide -- add it to the state schema.
    # NOTE(review): returning from this node does not pause the graph by
    # itself; presumably the graph is compiled with an interrupt after this
    # node (e.g. interrupt_after) -- confirm.
    return {**state, "pending_task_id": task.id}
# Later, resume with the result
def resume_with_result(thread_id: str, task_result: dict):
"""Called by webhook when review completes."""
config = {"configurable": {"thread_id": thread_id}}
# Get current state
state = app.get_state(config)
# Update with review result
review_result = "approved" if "approved" in task_result["result"] else "rejected"
app.update_state(config, {"review_result": review_result})
# Resume execution
for event in app.stream(None, config):
print(event)Use webhooks to trigger the resume when the review completes.
Reusable Review Subgraph
Create a reusable review subgraph for different queues:
# Define a reusable review subgraph
def create_review_subgraph(queue_key: str):
    """Build and compile a one-node graph that sends an action for review.

    Args:
        queue_key: Datashift queue the review task is submitted to.

    Returns:
        The compiled graph. Its single node reads ``action_data`` and
        ``summary`` from the state and writes a boolean ``approved``.
    """
    # NOTE(review): ReviewState is not defined in this guide; presumably it
    # declares "action_data", "summary" and "approved" -- confirm.
    review_graph = StateGraph(ReviewState)

    def submit(state):
        """Submit the action, poll every 5 seconds, and report approval."""
        task = datashift.submit_task(
            queue_key=queue_key,
            data=state["action_data"],
            summary=state["summary"],
        )
        result = datashift.get_task(task.id)
        while result.state in ("queued", "in_review"):
            time.sleep(5)
            result = datashift.get_task(task.id)

        # Fail closed: a finished task with no reviews counts as not
        # approved instead of raising IndexError.
        reviews = result.reviews
        return {"approved": bool(reviews) and "approved" in reviews[0].result}

    review_graph.add_node("submit", submit)
    review_graph.set_entry_point("submit")
    review_graph.add_edge("submit", END)
    return review_graph.compile()
# Use in your main graph
email_review = create_review_subgraph("outbound-communications")
payment_review = create_review_subgraph("payment-approvals")
Best Practices
Use conditional edges
Route the workflow based on review outcomes: approved, rejected, or needs revision.
Store context in state
Keep relevant context in the graph state so reviewers have full information.
Use checkpointing for production
Don't block on review; checkpoint the state and resume via webhook.
Create reusable subgraphs
Encapsulate review logic in subgraphs for different queues and action types.