LangGraph Integration
Integrate Watchlight authorization with LangGraph workflows.
Installation
pip install "wl-apdp[langgraph]"
Quick Start
from typing import TypedDict

from langgraph.graph import StateGraph, END
from wl_apdp.langgraph import (
    AuthorizedNode,
    WatchlightCheckpoint,
    authorization_gate
)
# Define state
class AgentState(TypedDict):
    """State schema handed to StateGraph for the Quick Start graph."""
    # Conversation messages; the search node reads the last one and appends its result.
    messages: list
    # Principal identifier (e.g. 'User::"analyst"'); read via principal_key="current_user".
    current_user: str
    # Extra authorization context supplied at invoke time (e.g. intent, goal).
    authorization_context: dict
# Create authorized node
@AuthorizedNode(
    principal_key="current_user",
    action="execute",
    resource="Tool::\"web-search\"",
)
async def search_node(state: AgentState):
    """Search the web for the latest message and append the result.

    Per the decorator above, the body only runs if the principal found under
    ``current_user`` is authorized to ``execute`` the web-search tool.

    Returns a partial state update containing the extended ``messages`` list.
    """
    latest_message = state["messages"][-1]
    found = await web_search(latest_message)
    updated_messages = state["messages"] + [found]
    return {"messages": updated_messages}
# Build graph: a single authorized "search" step from entry point to END
graph = StateGraph(AgentState)
graph.add_node("search", search_node)
graph.set_entry_point("search")
graph.add_edge("search", END)

# Compile with Watchlight checkpoint
app = graph.compile(
    checkpointer=WatchlightCheckpoint(
        watchlight_url="http://localhost:8081"
    )
)


# Run
async def main():
    """Invoke the compiled graph once and return the final state.

    ``search_node`` is an ``async def`` node, so the graph is driven through
    the async API (``ainvoke``) rather than the synchronous ``invoke``.
    """
    return await app.ainvoke(
        {
            "messages": ["Search for AI news"],
            "current_user": "User::\"analyst\"",
            "authorization_context": {
                "intent": "search_news",
                "goal": "research"
            }
        },
        # LangGraph checkpointers key persisted state by thread_id; this assumes
        # WatchlightCheckpoint follows the standard checkpointer contract.
        config={"configurable": {"thread_id": "quickstart-1"}},
    )


if __name__ == "__main__":
    import asyncio

    result = asyncio.run(main())
AuthorizedNode
Decorator for nodes requiring authorization:
from wl_apdp.langgraph import AuthorizedNode
@AuthorizedNode(
    # Principal: read it from this state key ...
    principal_key="agent_id",
    # ... or, alternatively, derive it from the state with a callable
    principal_fn=lambda state: f"Agent::\"{state['agent_name']}\"",
    # Action being authorized
    action="execute",
    # Resource: a fixed identifier ...
    resource="Tool::\"database-query\"",
    # ... or, alternatively, one computed from the state
    resource_fn=lambda state: f"Database::\"{state['target_db']}\"",
    # State keys included as authorization context
    context_keys=["intent", "goal", "delegation_chain"],
    # Behavior on denial: "skip", "raise", or "redirect"
    on_denied="skip",
    # Node to redirect to; presumably only consulted when on_denied="redirect" (confirm)
    redirect_to="fallback_node",
)
async def query_node(state):
    """Execute the query described by ``state`` and return its outcome."""
    outcome = await execute_query(state)
    return outcome
Authorization Gates
Add authorization checks at graph edges:
from wl_apdp.langgraph import authorization_gate
# Conditional edge based on authorization
def should_access_sensitive(state):
    """Ask the authorization gate whether ``state["user_id"]`` may access PII.

    The gate's result is returned unchanged and serves as the routing key for
    the conditional edges registered below.
    """
    decision = authorization_gate(
        state,
        principal=state["user_id"],
        action="access",
        resource="SensitiveData::\"pii\"",
    )
    return decision


# From "router", go to the sensitive handler when the gate returns True,
# otherwise to the public handler.
sensitive_routes = {
    True: "sensitive_handler",
    False: "public_handler",
}
graph.add_conditional_edges("router", should_access_sensitive, sensitive_routes)
WatchlightCheckpoint
Track authorization across graph execution:
from wl_apdp.langgraph import WatchlightCheckpoint
# Checkpointer configured with the Watchlight URL and tracking options
checkpointer = WatchlightCheckpoint(
    watchlight_url="http://localhost:8081",
    track_decisions=True,  # track all authorization decisions
    persist_delegations=True,  # persist delegation chains
    audit_logging=True,  # log to Watchlight for audit
)

# Compile the graph with the checkpointer attached
app = graph.compile(checkpointer=checkpointer)
Retrieving Authorization History
# Fetch the authorization decisions for a thread and print one line per decision
history = checkpointer.get_authorization_history(thread_id)
for decision in history:
    summary = f"{decision.timestamp}: {decision.action} on {decision.resource} -> {decision.result}"
    print(summary)
Delegation in Workflows
Automatic Delegation Chains
from wl_apdp.langgraph import DelegationAwareState
class WorkflowState(DelegationAwareState):
    """Workflow state whose delegation chain is maintained automatically."""

    messages: list
    current_agent: str


@AuthorizedNode(
    principal_key="current_agent",
    # Include the delegation chain from state in the authorization check
    use_delegation_chain=True,
)
async def worker_node(state):
    """Do the work for ``state``; authorization includes the full delegation chain."""
    outcome = await do_work(state)
    return outcome
Manual Delegation
from wl_apdp.langgraph import add_delegation
async def coordinator_node(state):
    """Return the state produced by delegating from the coordinator to the worker.

    The delegation grants the ``read`` and ``execute`` scopes from
    ``state["coordinator_id"]`` to ``state["worker_id"]``.
    """
    delegator = state["coordinator_id"]
    delegate = state["worker_id"]
    # Coordinator delegates to worker
    return add_delegation(
        state,
        from_principal=delegator,
        to_principal=delegate,
        scope=["read", "execute"],
    )
Tool Authorization
Authorized Tool Nodes
from langchain_core.tools import tool
from wl_apdp.langgraph import authorized_tool_node
@tool
def search_web(query: str) -> str:
    """Search the web."""
    hits = do_search(query)
    return hits


# Build a tool node that authorizes each tool against its mapped resource,
# taking the principal from the "agent_id" state key.
search_node = authorized_tool_node(
    tools=[search_web],
    tool_resources={"search_web": "Tool::\"web-search\""},
    principal_key="agent_id",
)
graph.add_node("tools", search_node)
Dynamic Tool Authorization
from wl_apdp.langgraph import AuthorizedToolExecutor
executor = AuthorizedToolExecutor(
    watchlight_url="http://localhost:8081",
    # Tool name -> resource identifier
    tool_mapping={
        "search": "Tool::\"web-search\"",
        "calculator": "Tool::\"calculator\"",
        "database": "Tool::\"database-query\"",
    },
    # Tool name -> risk level
    risk_levels={"search": 1, "calculator": 1, "database": 3},
)
async def tool_node(state):
    """Run the pending tool call through the authorized executor.

    Reads ``pending_tool_call``, ``agent_id`` and ``authorization_context``
    from ``state`` and returns a partial update with a one-element
    ``tool_results`` list.
    """
    pending = state["pending_tool_call"]
    outcome = await executor.execute(
        tool_name=pending.name,
        args=pending.args,
        principal=state["agent_id"],
        context=state["authorization_context"],
    )
    return {"tool_results": [outcome]}
Subgraph Authorization
Authorized Subgraphs
from wl_apdp.langgraph import AuthorizedSubgraph
# Subgraph with its own authorization scope
@AuthorizedSubgraph(
    principal_key="agent_id",
    required_scope=["execute_subgraph"],
    resource="Subgraph::\"data-analysis\"",
)
def analysis_subgraph():
    """Build the data-analysis subgraph and return it compiled."""
    builder = StateGraph(AnalysisState)
    # ... define subgraph nodes
    compiled = builder.compile()
    return compiled


# Add to main graph
main_graph.add_node("analysis", analysis_subgraph())
Human-in-the-Loop
from wl_apdp.langgraph import human_approval_required
@human_approval_required(
    actions=["delete", "modify"],
    resources=["*::\"production-*\""],
)
async def dangerous_operation(state):
    """Perform the operation for ``state``; this waits for human approval."""
    outcome = await perform_operation(state)
    return outcome


# In graph
graph.add_node("dangerous_op", dangerous_operation)
Full Example
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from wl_apdp.langgraph import (
AuthorizedNode,
WatchlightCheckpoint,
authorization_gate,
DelegationAwareState
)
# State definition
class ResearchState(DelegationAwareState):
    """State schema for the research workflow graph."""
    # Conversation messages; nodes read the last one and append their own.
    messages: list
    # Principal checked by the routing gate before generating a report.
    user_id: str
    # Principal used by the authorized research and report nodes.
    agent_id: str
    # intent and goal are passed to authorization via context_keys.
    intent: str
    goal: str
    # Search results accumulated by the research node.
    research_results: list
    # Report text written by the report node.
    final_report: str
# Authorized nodes
@AuthorizedNode(
    principal_key="agent_id",
    action="execute",
    resource="Tool::\"web-search\"",
    context_keys=["intent", "goal"]
)
async def research_node(state: ResearchState):
    """Search the web for the latest message and record what was found.

    Returns a partial state update: ``research_results`` extended with the new
    results, and ``messages`` extended with an ``AIMessage`` reporting how many
    results were found.
    """
    query = state["messages"][-1].content
    # Perform authorized research
    # NOTE(review): search_web is not defined or imported in this example;
    # supply an awaitable search helper that returns a list of results.
    results = await search_web(query)
    return {
        "research_results": state["research_results"] + results,
        "messages": state["messages"] + [AIMessage(content=f"Found {len(results)} results")]
    }
@AuthorizedNode(
    principal_key="agent_id",
    action="execute",
    resource="Tool::\"report-generator\"",
    context_keys=["intent", "goal"],
)
async def report_node(state: ResearchState):
    """Summarize the collected research with the LLM.

    Returns a partial state update: ``final_report`` holds the reply's content
    and ``messages`` is extended with the reply itself.
    """
    llm = ChatOpenAI(model="gpt-4")
    # Generate report from research
    prompt = HumanMessage(content=f"Summarize: {state['research_results']}")
    reply = await llm.ainvoke([prompt])
    return {
        "final_report": reply.content,
        "messages": state["messages"] + [reply],
    }
# Router based on authorization
def route_to_report(state):
    """Pick the node that follows research.

    Returns ``"report"`` when the gate allows ``state["user_id"]`` to generate
    the research report, otherwise ``"end_without_report"``.
    """
    # Check if user is authorized to generate reports
    allowed = authorization_gate(
        state,
        principal=state["user_id"],
        action="generate",
        resource="Report::\"research\"",
    )
    return "report" if allowed else "end_without_report"
# Build graph
workflow = StateGraph(ResearchState)
workflow.add_node("research", research_node)
workflow.add_node("report", report_node)
workflow.add_node("end_without_report", lambda s: s)  # returns the state unchanged
workflow.set_entry_point("research")

# After research, route_to_report returns the name of the next node
report_routes = {
    "report": "report",
    "end_without_report": "end_without_report",
}
workflow.add_conditional_edges("research", route_to_report, report_routes)

# Both branches lead to END
for terminal_node in ("report", "end_without_report"):
    workflow.add_edge(terminal_node, END)

# Compile with Watchlight
watchlight_checkpointer = WatchlightCheckpoint(
    watchlight_url="http://localhost:8081",
    track_decisions=True,
    audit_logging=True,
)
app = workflow.compile(checkpointer=watchlight_checkpointer)
# Run
async def main():
    """Run the research workflow once and print the final report.

    The initial state names the user and agent principals, the intent and goal
    used as authorization context, and a delegation chain from the user to the
    agent.
    """
    result = await app.ainvoke(
        {
            "messages": [HumanMessage(content="Research AI agent frameworks")],
            "user_id": "User::\"researcher\"",
            "agent_id": "Agent::\"research-bot\"",
            "intent": "search_frameworks",
            "goal": "research",
            "research_results": [],
            "final_report": "",
            "delegation_chain": [
                {
                    "from": "User::\"researcher\"",
                    "to": "Agent::\"research-bot\"",
                    "scope": ["read", "execute"]
                }
            ]
        },
        # LangGraph checkpointers key persisted state by thread_id; this assumes
        # WatchlightCheckpoint follows the standard checkpointer contract.
        config={"configurable": {"thread_id": "research-1"}},
    )
    print(result["final_report"])


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
Configuration
# langgraph_config.yaml
watchlight:
  url: http://localhost:8081
  api_key: ${WATCHLIGHT_API_KEY}
checkpoint:
  track_decisions: true
  persist_delegations: true
  audit_logging: true
defaults:
  on_denied: redirect
  redirect_node: unauthorized_handler
Next Steps
- AutoGen Integration - Use with Microsoft AutoGen
- Authorization Concepts - Understand the model
- Delegation Chains - Advanced delegation