Guides

LangGraph

Add human review nodes to LangGraph workflows.

LangGraph lets you build stateful, multi-step agent workflows as graphs. This guide shows how to add human review nodes using the Datashift SDK.

Developer-Enforced Review

This guide shows how to add review nodes in your graph. You decide which paths require human approval. For agent-initiated review where the agent decides when to request human input, see the MCP integration guide.

Prerequisites

  • A Datashift account with an API key
  • An OpenAI API key (or other LLM provider)
  • Python 3.9+

Installation

pip install langgraph langchain-openai datashift

Set your environment variables:

bash
export OPENAI_API_KEY=sk-...
export DATASHIFT_API_KEY=ds_xxxxxxxxxxxxxxxx

Full Example

Here's a workflow that generates an email, submits it for review, and routes based on the decision:

python
import os
import time
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from datashift import Datashift

# Initialize clients
llm = ChatOpenAI(model="gpt-4")
datashift = Datashift(api_key=os.environ["DATASHIFT_API_KEY"])

# Define state
class AgentState(TypedDict):
    messages: list
    email_draft: dict | None
    review_result: str | None

# Node: Generate email draft
def generate_email(state: AgentState) -> AgentState:
    messages = state["messages"]
    response = llm.invoke(messages + [{
        "role": "system",
        "content": "Generate an email draft as JSON with 'to', 'subject', 'body' fields."
    }])

    # Parse the email draft from response
    email_draft = parse_email_from_response(response.content)

    return {**state, "email_draft": email_draft}

# Node: Submit for human review
def submit_for_review(state: AgentState) -> AgentState:
    email = state["email_draft"]

    task = datashift.submit_task(
        queue_key="outbound-communications",
        data={
            "to": email["to"],
            "subject": email["subject"],
            "body": email["body"],
        },
        summary=f"Review email to {email['to']}: {email['subject']}",
    )

    # Wait for human decision
    result = datashift.get_task(task.id)
    while result.state in ["queued", "in_review"]:
        time.sleep(5)
        result = datashift.get_task(task.id)

    review_result = "approved" if "approved" in result.reviews[0].result else "rejected"

    return {**state, "review_result": review_result}

# Node: Send the email
def send_email(state: AgentState) -> AgentState:
    email = state["email_draft"]
    # Actually send the email
    # send_actual_email(email["to"], email["subject"], email["body"])
    return {
        **state,
        "messages": state["messages"] + [{
            "role": "assistant",
            "content": f"Email sent to {email['to']}"
        }]
    }

# Node: Handle rejection
def handle_rejection(state: AgentState) -> AgentState:
    return {
        **state,
        "messages": state["messages"] + [{
            "role": "assistant",
            "content": "The email was rejected by a reviewer. Would you like me to revise it?"
        }]
    }

# Conditional edge: route based on review result
def route_after_review(state: AgentState) -> str:
    if state["review_result"] == "approved":
        return "send_email"
    else:
        return "handle_rejection"

# Build the graph
workflow = StateGraph(AgentState)

workflow.add_node("generate_email", generate_email)
workflow.add_node("submit_for_review", submit_for_review)
workflow.add_node("send_email", send_email)
workflow.add_node("handle_rejection", handle_rejection)

workflow.set_entry_point("generate_email")
workflow.add_edge("generate_email", "submit_for_review")
workflow.add_conditional_edges("submit_for_review", route_after_review)
workflow.add_edge("send_email", END)
workflow.add_edge("handle_rejection", END)

# Compile and run
app = workflow.compile()

result = app.invoke({
    "messages": [{
        "role": "user",
        "content": "Send a thank you email to john@example.com for their purchase"
    }],
    "email_draft": None,
    "review_result": None,
})
print(result["messages"][-1]["content"])

How It Works

1

Generate content

The first node generates the email draft using the LLM.

2

Submit for review

The review node submits the draft to Datashift and waits for a decision.

3

Conditional routing

A conditional edge routes to either “send” or “handle rejection” based on the review result.

4

Execute or revise

The workflow either executes the action or handles the rejection gracefully.

Async with Checkpoints

For long-running reviews, use LangGraph's checkpointing to pause and resume:

python
from langgraph.checkpoint.memory import MemorySaver

# Create a checkpoint saver for persistence
checkpointer = MemorySaver()

# Node that submits for review and returns immediately
def submit_for_review_async(state: AgentState) -> AgentState:
    email = state["email_draft"]

    task = datashift.submit_task(
        queue_key="outbound-communications",
        data=email,
        summary=f"Review email to {email['to']}",
    )

    # Store task ID in state and interrupt
    return {**state, "pending_task_id": task.id}

# Later, resume with the result
def resume_with_result(thread_id: str, task_result: dict):
    """Called by webhook when review completes."""
    config = {"configurable": {"thread_id": thread_id}}

    # Get current state
    state = app.get_state(config)

    # Update with review result
    review_result = "approved" if "approved" in task_result["result"] else "rejected"
    app.update_state(config, {"review_result": review_result})

    # Resume execution
    for event in app.stream(None, config):
        print(event)

Use webhooks to trigger the resume when the review completes.

Reusable Review Subgraph

Create a reusable review subgraph for different queues:

python
# Define a reusable review subgraph
def create_review_subgraph(queue_key: str):
    review_graph = StateGraph(ReviewState)

    def submit(state):
        task = datashift.submit_task(
            queue_key=queue_key,
            data=state["action_data"],
            summary=state["summary"],
        )
        result = datashift.get_task(task.id)
        while result.state in ["queued", "in_review"]:
            time.sleep(5)
            result = datashift.get_task(task.id)
        return {"approved": "approved" in result.reviews[0].result}

    review_graph.add_node("submit", submit)
    review_graph.set_entry_point("submit")
    review_graph.add_edge("submit", END)

    return review_graph.compile()

# Use in your main graph
email_review = create_review_subgraph("outbound-communications")
payment_review = create_review_subgraph("payment-approvals")

Best Practices

Use conditional edges

Route the workflow based on review outcomes - approved, rejected, or needs revision.

Store context in state

Keep relevant context in the graph state so reviewers have full information.

Use checkpointing for production

Don't block on review - checkpoint the state and resume via webhook.

Create reusable subgraphs

Encapsulate review logic in subgraphs for different queues and action types.