Skip to main content

LangGraph Integration

Integrate Watchlight authorization with LangGraph workflows.

Installation

pip install wl-apdp[langgraph]

Quick Start

from langgraph.graph import StateGraph, END
from wl_apdp.langgraph import (
AuthorizedNode,
WatchlightCheckpoint,
authorization_gate
)

# Define state
class AgentState(TypedDict):
messages: list
current_user: str
authorization_context: dict

# Create authorized node
@AuthorizedNode(
principal_key="current_user",
action="execute",
resource="Tool::\"web-search\""
)
async def search_node(state: AgentState):
# This only runs if authorized
result = await web_search(state["messages"][-1])
return {"messages": state["messages"] + [result]}

# Build graph
graph = StateGraph(AgentState)
graph.add_node("search", search_node)
graph.set_entry_point("search")
graph.add_edge("search", END)

# Compile with Watchlight checkpoint
app = graph.compile(
checkpointer=WatchlightCheckpoint(
watchlight_url="http://localhost:8081"
)
)

# Run
result = app.invoke({
"messages": ["Search for AI news"],
"current_user": "User::\"analyst\"",
"authorization_context": {
"intent": "search_news",
"goal": "research"
}
})

AuthorizedNode

Decorator for nodes requiring authorization:

from wl_apdp.langgraph import AuthorizedNode

@AuthorizedNode(
# Extract principal from state
principal_key="agent_id",

# Or use a callable
principal_fn=lambda state: f"Agent::\"{state['agent_name']}\"",

# Action to authorize
action="execute",

# Resource to access
resource="Tool::\"database-query\"",

# Or dynamic resource
resource_fn=lambda state: f"Database::\"{state['target_db']}\"",

# Include context from state
context_keys=["intent", "goal", "delegation_chain"],

# Behavior on denial
on_denied="skip", # "skip", "raise", "redirect"
redirect_to="fallback_node"
)
async def query_node(state):
return await execute_query(state)

Authorization Gates

Add authorization checks at graph edges:

from wl_apdp.langgraph import authorization_gate

# Conditional edge based on authorization
def should_access_sensitive(state):
return authorization_gate(
state,
principal=state["user_id"],
action="access",
resource="SensitiveData::\"pii\""
)

graph.add_conditional_edges(
"router",
should_access_sensitive,
{
True: "sensitive_handler",
False: "public_handler"
}
)

WatchlightCheckpoint

Track authorization across graph execution:

from wl_apdp.langgraph import WatchlightCheckpoint

checkpointer = WatchlightCheckpoint(
watchlight_url="http://localhost:8081",

# Track all authorization decisions
track_decisions=True,

# Persist delegation chains
persist_delegations=True,

# Log to Watchlight for audit
audit_logging=True
)

app = graph.compile(checkpointer=checkpointer)

Retrieving Authorization History

# Get authorization decisions for a thread
history = checkpointer.get_authorization_history(thread_id)
for decision in history:
print(f"{decision.timestamp}: {decision.action} on {decision.resource} -> {decision.result}")

Delegation in Workflows

Automatic Delegation Chains

from wl_apdp.langgraph import DelegationAwareState

class WorkflowState(DelegationAwareState):
messages: list
current_agent: str

# Delegation chain is automatically maintained
@AuthorizedNode(
principal_key="current_agent",
use_delegation_chain=True # Include chain from state
)
async def worker_node(state):
# Authorization includes full delegation chain
return await do_work(state)

Manual Delegation

from wl_apdp.langgraph import add_delegation

async def coordinator_node(state):
# Coordinator delegates to worker
new_state = add_delegation(
state,
from_principal=state["coordinator_id"],
to_principal=state["worker_id"],
scope=["read", "execute"]
)
return new_state

Tool Authorization

Authorized Tool Nodes

from langchain_core.tools import tool
from wl_apdp.langgraph import authorized_tool_node

@tool
def search_web(query: str) -> str:
"""Search the web."""
return do_search(query)

# Create node with authorization
search_node = authorized_tool_node(
tools=[search_web],
tool_resources={
"search_web": "Tool::\"web-search\""
},
principal_key="agent_id"
)

graph.add_node("tools", search_node)

Dynamic Tool Authorization

from wl_apdp.langgraph import AuthorizedToolExecutor

executor = AuthorizedToolExecutor(
watchlight_url="http://localhost:8081",
# Map tool names to resources
tool_mapping={
"search": "Tool::\"web-search\"",
"calculator": "Tool::\"calculator\"",
"database": "Tool::\"database-query\""
},
# Risk levels
risk_levels={
"search": 1,
"calculator": 1,
"database": 3
}
)

async def tool_node(state):
tool_call = state["pending_tool_call"]
result = await executor.execute(
tool_name=tool_call.name,
args=tool_call.args,
principal=state["agent_id"],
context=state["authorization_context"]
)
return {"tool_results": [result]}

Subgraph Authorization

Authorized Subgraphs

from wl_apdp.langgraph import AuthorizedSubgraph

# Subgraph with its own authorization scope
@AuthorizedSubgraph(
principal_key="agent_id",
required_scope=["execute_subgraph"],
resource="Subgraph::\"data-analysis\""
)
def analysis_subgraph():
subgraph = StateGraph(AnalysisState)
# ... define subgraph nodes
return subgraph.compile()

# Add to main graph
main_graph.add_node("analysis", analysis_subgraph())

Human-in-the-Loop

from wl_apdp.langgraph import human_approval_required

@human_approval_required(
actions=["delete", "modify"],
resources=["*::\"production-*\""]
)
async def dangerous_operation(state):
# This waits for human approval
return await perform_operation(state)

# In graph
graph.add_node("dangerous_op", dangerous_operation)

Full Example

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

from wl_apdp.langgraph import (
AuthorizedNode,
WatchlightCheckpoint,
authorization_gate,
DelegationAwareState
)

# State definition
class ResearchState(DelegationAwareState):
messages: list
user_id: str
agent_id: str
intent: str
goal: str
research_results: list
final_report: str

# Authorized nodes
@AuthorizedNode(
principal_key="agent_id",
action="execute",
resource="Tool::\"web-search\"",
context_keys=["intent", "goal"]
)
async def research_node(state: ResearchState):
llm = ChatOpenAI(model="gpt-4")
query = state["messages"][-1].content

# Perform authorized research
results = await search_web(query)

return {
"research_results": state["research_results"] + results,
"messages": state["messages"] + [AIMessage(content=f"Found {len(results)} results")]
}

@AuthorizedNode(
principal_key="agent_id",
action="execute",
resource="Tool::\"report-generator\"",
context_keys=["intent", "goal"]
)
async def report_node(state: ResearchState):
llm = ChatOpenAI(model="gpt-4")

# Generate report from research
report = await llm.ainvoke([
HumanMessage(content=f"Summarize: {state['research_results']}")
])

return {
"final_report": report.content,
"messages": state["messages"] + [report]
}

# Router based on authorization
def route_to_report(state):
# Check if user is authorized to generate reports
if authorization_gate(
state,
principal=state["user_id"],
action="generate",
resource="Report::\"research\""
):
return "report"
return "end_without_report"

# Build graph
workflow = StateGraph(ResearchState)

workflow.add_node("research", research_node)
workflow.add_node("report", report_node)
workflow.add_node("end_without_report", lambda s: s)

workflow.set_entry_point("research")
workflow.add_conditional_edges(
"research",
route_to_report,
{
"report": "report",
"end_without_report": "end_without_report"
}
)
workflow.add_edge("report", END)
workflow.add_edge("end_without_report", END)

# Compile with Watchlight
app = workflow.compile(
checkpointer=WatchlightCheckpoint(
watchlight_url="http://localhost:8081",
track_decisions=True,
audit_logging=True
)
)

# Run
async def main():
result = await app.ainvoke({
"messages": [HumanMessage(content="Research AI agent frameworks")],
"user_id": "User::\"researcher\"",
"agent_id": "Agent::\"research-bot\"",
"intent": "search_frameworks",
"goal": "research",
"research_results": [],
"final_report": "",
"delegation_chain": [
{
"from": "User::\"researcher\"",
"to": "Agent::\"research-bot\"",
"scope": ["read", "execute"]
}
]
})
print(result["final_report"])

if __name__ == "__main__":
import asyncio
asyncio.run(main())

Configuration

# langgraph_config.yaml
watchlight:
url: http://localhost:8081
api_key: ${WATCHLIGHT_API_KEY}

checkpoint:
track_decisions: true
persist_delegations: true
audit_logging: true

defaults:
on_denied: redirect
redirect_node: unauthorized_handler

Next Steps