← 返回投肯智能知识库首页
首页 / 技术教程 / 高级架构

LangGraph多Agent系统开发实战:从架构设计到代码实现的完整指南

📖 阅读时长:70分钟更新:2026-05-28

一、为什么需要多Agent系统

1.1 单Agent的局限性

一个通用的AI助手(如GPT-4、Claude)可以回答各种问题,但它的能力是"大而全"而非"专而深"。当业务场景需要专业化能力时,单Agent的局限性就显现出来:

1.2 多Agent协同的核心思想

多Agent系统的核心理念是专业分工 + 有序协作:让每个Agent专注于自己擅长的领域,通过标准化的消息传递协议实现协作。

# 单Agent vs 多Agent 对比

# 单Agent(ChatGPT风格):
# 用户: "帮我分析销售数据,然后写成报告"
# AI: [理解任务] → [分析数据] → [写报告] → 返回结果
# 问题:一个模型要同时做数据分析和文章写作,两项都不够专业

# 多Agent(分工协作):
# 用户: "帮我分析销售数据,然后写成报告"
# 
#  orchestrator(调度员)
#       ↓ 分解任务
#   ┌──────────┴──────────┐
#   ↓                     ↓
# data_analyst        report_writer
# (数据分析专家)      (文档写作专家)
#   ↓                     ↓
# 数据分析报告          初稿文本
#       ↓                     ↓
#       └──────────┬──────────┘
#                   ↓
#            orchestrator(汇总)
#                   ↓
#              最终报告
# 
# 关键:每个Agent只做自己擅长的事,通过结构化消息传递协作

二、LangGraph核心概念详解

2.1 什么是LangGraph

LangGraph是LangChain团队推出的用于构建有状态的多Agent系统的框架。它的核心思想是将Agent工作流建模为一个有向图(Directed Graph)

2.2 状态(State)设计

状态是LangGraph中最核心的概念。它定义了在整个工作流中需要传递什么信息。

from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator

# ============ 定义Agent协作的状态结构 ============
class AgentState(TypedDict):
    """
    多Agent系统的共享状态
    每个Agent都可以读取和修改这个状态
    """
    
    # 消息历史(用于上下文传递)
    # 使用 operator.iconcat 可以将新消息追加到列表,而不是覆盖
    messages: Annotated[Sequence[BaseMessage], operator.iconcat]
    
    # 用户原始输入
    user_input: str
    
    # 当前任务状态
    current_task: str
    
    # 任务分解结果
    subtasks: list[str]
    
    # 各Agent的执行结果
    analysis_result: dict | None      # data_analyst的结果
    report_draft: str | None          # report_writer的结果
    review_result: dict | None        # reviewer的结果
    
    # 工作流控制
    next_agent: str                   # 下一步应该执行哪个Agent
    iteration_count: int              # 迭代次数(用于防止无限循环)
    error_messages: list[str]          # 错误记录
    
    # 最终输出
    final_output: str | None


# AgentState字段说明:
# messages: 保留完整的对话历史,每个Agent的输出都会追加到messages
# subtasks: 由orchestrator分解出的子任务列表
# analysis_result: data_analyst输出的结构化数据
# report_draft: report_writer生成的报告文本
# review_result: reviewer的审核意见
# next_agent: 决定下一步执行哪个Agent(支持条件路由)
# iteration_count: 防止Agent之间无限循环,超过阈值强制终止
# error_messages: 记录执行过程中的错误,便于事后排查

2.3 节点(Node)与边(Edge)的概念

# ============ 节点的定义方式 ============
# 节点就是一个Python函数,接收当前状态,返回更新后的状态

def data_analyst_node(state: AgentState) -> AgentState:
    """
    数据分析Agent节点
    接收状态,执行业务逻辑,返回更新后的状态
    """
    # 读取状态中的信息
    user_input = state["user_input"]
    current_task = state["current_task"]
    
    # 执行业务逻辑(这里简化,实际会调用LLM或外部API)
    analysis = perform_data_analysis(user_input)
    
    # 更新状态
    new_state = state.copy()
    new_state["analysis_result"] = analysis
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"数据分析完成,结果:{analysis}")
    ]
    new_state["next_agent"] = "report_writer"  # 下一步去report_writer
    
    return new_state

# ============ 边的定义方式 ============
# 边定义了节点之间的流转关系

# 方式1:固定边(无条件跳转)
# node_a → node_b:node_a执行完后无条件执行node_b
graph.add_edge("node_a", "node_b")

# 方式2:条件边(有条件跳转)
# 根据状态中的某个字段决定下一步去哪
def route_based_on_task(state: AgentState) -> str:
    """根据任务类型决定下一步"""
    task = state.get("current_task", "")
    
    if "分析" in task or "数据" in task:
        return "data_analyst"
    elif "写作" in task or "报告" in task:
        return "report_writer"
    elif "审核" in task or "检查" in task:
        return "reviewer"
    else:
        return "orchestrator"  # 默认回调度员

# 条件边的语法:from node + 条件函数 + to 多个候选节点
graph.add_conditional_edges(
    "orchestrator",
    route_based_on_task,
    {
        "data_analyst": "data_analyst",
        "report_writer": "report_writer",
        "reviewer": "reviewer",
        "END": "__end__"  # 结束工作流
    }
)

三、完整代码实现:销售数据分析报告生成系统

3.1 系统架构设计

"""
销售数据分析报告生成系统
架构:
  User → Orchestrator → [DataAnalyst → ReportWriter → Reviewer] → User
                                     ↓
                               (如果Review不通过)
                                     ↓
                                ReportWriter(修订)
                                     ↓
                                  Reviewer(复核)
"""

# ============ 完整可运行的代码 ============

from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import operator

# ============ Step 1: 定义状态 ============
class SalesReportState(TypedDict):
    """
    销售报告生成系统的状态
    """
    # 消息历史(operator.iconcat实现追加而非覆盖)
    messages: Annotated[Sequence[BaseMessage], operator.iconcat]
    
    # 用户输入
    user_request: str
    
    # 数据分析结果
    sales_data: dict | None
    
    # 报告草稿
    report_draft: str | None
    
    # 审核结果
    review_feedback: str | None
    review_passed: bool
    
    # 报告终稿
    final_report: str | None
    
    # 迭代控制
    revision_count: int  # 修订次数
    max_revisions: int  # 最大修订次数

# ============ Step 2: 定义LLM ============
# 使用Ollama作为后端(也可以换成OpenAI)
llm = ChatOpenAI(
    base_url="http://localhost:11434/v1",
    api_key="ollama",  # Ollama不需要真实API Key
    model="qwen2.5-14b-instruct",
    temperature=0.7,
    max_tokens=4096,
)

# ============ Step 3: 定义各Agent节点 ============

def orchestrator_node(state: SalesReportState) -> SalesReportState:
    """
    调度员节点:接收用户请求,分解任务,判断工作流走向
    """
    user_request = state["user_request"]
    
    prompt = f"""你是一个销售报告生成系统的调度员。
用户请求:{user_request}

请分析这个请求,判断需要哪些步骤来完成:
1. 是否需要数据分析?(涉及销售数据、增长率、对比等)
2. 是否需要生成报告?(涉及文档写作、结构化输出)
3. 是否需要审核?

请输出一个JSON格式的任务分解:
{{
    "needs_analysis": true/false,
    "needs_report": true/false,
    "priority_order": ["analysis", "report", "review"] 或其他顺序,
    "special_requirements": "任何特殊要求"
}}

只输出JSON,不要有其他内容。
"""
    
    response = llm.invoke([HumanMessage(content=prompt)])
    
    # 解析JSON(简化处理,实际需要更健壮的解析)
    import json
    try:
        task_plan = json.loads(response.content)
    except:
        task_plan = {"needs_analysis": True, "needs_report": True, "priority_order": ["analysis", "report", "review"]}
    
    new_state = state.copy()
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"任务已接收,正在分析请求...\n任务计划:{task_plan}")
    ]
    
    print(f"[Orchestrator] 任务计划: {task_plan}")
    
    return new_state


def data_analyst_node(state: SalesReportState) -> SalesReportState:
    """
    数据分析Agent:分析销售数据,生成结构化分析结果
    """
    user_request = state["user_request"]
    
    # 构建分析prompt
    analysis_prompt = f"""你是一个专业的数据分析师。
用户请求:{user_request}

请基于以下模拟销售数据进行分析(实际使用时替换为真实数据库查询):
销售数据(2024年Q1-Q4):
- Q1: 营收1200万,新客500,老客800,复购率40%
- Q2: 营收1500万,新客700,老客900,复购率44%
- Q3: 营收1350万,新客600,老客850,复购率42%
- Q4: 营收1800万,新客900,老客1050,复购率46%

请输出:
1. 同比/环比增长率分析
2. 客户结构分析(新客vs老客趋势)
3. 复购率变化及原因推断
4. 关键洞察(3-5条)
5. 数据可信度评估

输出格式:结构化Markdown,包含表格和关键数据。
"""
    
    response = llm.invoke([HumanMessage(content=analysis_prompt)])
    analysis_result = response.content
    
    new_state = state.copy()
    new_state["sales_data"] = {
        "analysis_text": analysis_result,
        "data_source": "模拟数据(替换为真实查询)",
        "generated_at": "2026-05-28"
    }
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"✅ 数据分析完成\n\n{analysis_result}")
    ]
    
    print(f"[DataAnalyst] 分析完成")
    
    return new_state


def report_writer_node(state: SalesReportState) -> SalesReportState:
    """
    报告写作Agent:根据数据分析结果生成报告草稿
    如果有审核反馈,需要修订
    """
    analysis_result = state["sales_data"]["analysis_text"] if state["sales_data"] else "无数据"
    review_feedback = state.get("review_feedback", "")
    
    # 如果有审核反馈,说明是修订版本
    is_revision = bool(review_feedback)
    revision_note = f"\n\n【修订要求】\n审核反馈:{review_feedback}\n请根据反馈修订报告。" if is_revision else ""
    
    report_prompt = f"""你是一个专业的商业报告撰写师。
请基于以下数据分析结果,生成一份销售分析报告。

【数据分析结果】
{analysis_result}
{revision_note}

报告要求:
1. 结构清晰:摘要 → 数据概览 → 深度分析 → 建议 → 风险提示
2. 语言专业但易懂,适合管理层阅读
3. 包含具体数字和百分比
4. 每个建议都要有数据支撑

请生成完整的报告。
"""
    
    response = llm.invoke([HumanMessage(content=report_prompt)])
    report_draft = response.content
    
    new_state = state.copy()
    new_state["report_draft"] = report_draft
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"{'📝 报告已修订' if is_revision else '📝 报告草稿已生成'}\n\n{report_draft[:200]}...")
    ]
    
    print(f"[ReportWriter] {'修订' if is_revision else '生成'}完成")
    
    return new_state


def reviewer_node(state: SalesReportState) -> SalesReportState:
    """
    审核Agent:审核报告质量,检查数据准确性和逻辑完整性
    """
    report = state["report_draft"]
    
    review_prompt = f"""你是一个严格的报告审核专家。
请审核以下销售分析报告,检查以下方面:

1. 【数据准确性】报告中的数字是否与分析数据一致?
2. 【逻辑完整性】分析是否有逻辑漏洞或矛盾?
3. 【建议可行性】提出的建议是否具体可执行?
4. 【格式规范性】结构是否清晰,格式是否专业?
5. 【风险提示】是否识别了潜在风险?

报告内容:
{report}

请输出审核结果,格式:
{{
    "passed": true/false,
    "issues": ["问题1", "问题2", ...],
    "overall_score": 1-10,
    "recommendations": "改进建议"
}}

如果报告质量合格(80分以上),passed设为true。
"""
    
    response = llm.invoke([HumanMessage(content=review_prompt)])
    review_text = response.content
    
    # 简单解析passed字段(实际需要更健壮的解析)
    passed = "passed\": true" in review_text or "passed" not in review_text
    score_text = review_text
    
    new_state = state.copy()
    new_state["review_feedback"] = review_text
    new_state["review_passed"] = passed
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"🔍 审核完成\n\n{review_text[:300]}...")
    ]
    
    print(f"[Reviewer] 审核{'通过' if passed else '未通过'}")
    
    return new_state


def should_revise(state: SalesReportState) -> str:
    """
    条件路由函数:决定是否需要修订报告
    """
    # 如果审核通过,结束;如果不通过且还有修订机会,继续修订
    if state["review_passed"]:
        return "END"
    
    if state["revision_count"] >= state["max_revisions"]:
        # 达到最大修订次数,强制结束
        return "END"
    
    return "report_writer"  # 未通过审核,返回report_writer修订


# ============ Step 4: 构建图 ============
from langgraph.graph import StateGraph, START, END

# 创建图
graph = StateGraph(SalesReportState)

# 注册节点
graph.add_node("orchestrator", orchestrator_node)
graph.add_node("data_analyst", data_analyst_node)
graph.add_node("report_writer", report_writer_node)
graph.add_node("reviewer", reviewer_node)

# 添加边
graph.add_edge(START, "orchestrator")
graph.add_edge("orchestrator", "data_analyst")
graph.add_edge("data_analyst", "report_writer")
graph.add_edge("report_writer", "reviewer")

# 条件边:reviewer之后根据审核结果决定下一步
graph.add_conditional_edges(
    "reviewer",
    should_revise,
    {
        "report_writer": "report_writer",  # 不通过,回去修订
        "END": END                          # 通过,结束
    }
)

# 编译图
compiled_graph = graph.compile()

# ============ Step 5: 运行工作流 ============
def run_sales_report(user_request: str) -> str:
    """
    运行销售报告生成工作流
    """
    initial_state: SalesReportState = {
        "messages": [HumanMessage(content=user_request)],
        "user_request": user_request,
        "sales_data": None,
        "report_draft": None,
        "review_feedback": None,
        "review_passed": False,
        "final_report": None,
        "revision_count": 0,
        "max_revisions": 3
    }
    
    print(f"\n{'='*60}")
    print(f"开始执行工作流...")
    print(f"用户请求:{user_request}")
    print(f"{'='*60}\n")
    
    # 执行工作流(会打印中间状态)
    final_state = compiled_graph.invoke(initial_state)
    
    print(f"\n{'='*60}")
    print(f"工作流执行完成")
    print(f"修订次数:{final_state['revision_count']}")
    print(f"审核通过:{final_state['review_passed']}")
    print(f"{'='*60}\n")
    
    return final_state["report_draft"]


# ============ 执行示例 ============
if __name__ == "__main__":
    user_request = "帮我分析2024年各季度销售数据,生成一份给CEO看的季度报告,包含增长趋势、客户分析和下季度建议"
    
    final_report = run_sales_report(user_request)
    
    print("\n" + "="*60)
    print("最终报告:")
    print("="*60)
    print(final_report)

四、关键设计模式与经验

4.1 状态机的三种模式

# 模式1:线性流程(最简单)
# A → B → C → D,单向执行,无需回头
#
# 使用场景:固定流程,如:数据导入 → 数据清洗 → 分析 → 报告
# 代码示例:
graph.add_edge(START, "data_import")
graph.add_edge("data_import", "data_clean")
graph.add_edge("data_clean", "analysis")
graph.add_edge("analysis", "report")
graph.add_edge("report", END)


# 模式2:条件循环(带反馈)
# A → B → C → [条件判断] → 通过?→ END : 返回B
#
# 使用场景:需要迭代优化,如:生成 → 审核 → 不通过则修订 → 重新审核
# 代码示例:
def route_after_review(state):
    if state["approved"]:
        return END
    elif state["iteration"] >= state["max_iterations"]:
        return "escalation"  # 升级处理
    else:
        return "generator"   # 继续修订

graph.add_conditional_edges(
    "reviewer",
    route_after_review,
    {
        END: END,
        "escalation": "escalation",
        "generator": "generator"
    }
)


# 模式3:并行分发 + 汇总(最复杂但最强大)
#              → Branch_A → 
# START → Router ─────────────────→ Aggregator → END
#              → Branch_B →
#              → Branch_C →
#
# 使用场景:多维度分析后需要汇总,如:同时分析销售/市场/运营三方数据后汇总
from langgraph.constants import Send

def route_to_branches(state) -> list:
    """并行分发到多个分支"""
    return [
        Send("branch_a", state),
        Send("branch_b", state),
        Send("branch_c", state)
    ]

graph.add_conditional_edges(
    "router",
    route_to_branches,
    ["branch_a", "branch_b", "branch_c"]
)

def aggregator_node(state) -> SalesReportState:
    """汇总各分支结果"""
    a_result = state["branch_a_result"]
    b_result = state["branch_b_result"]
    c_result = state["branch_c_result"]
    
    summary = f"""
综合分析报告:

【销售分析】{a_result}
【市场分析】{b_result}
【运营分析】{c_result}

【综合结论】
...
"""
    
    return {"messages": state["messages"] + [AIMessage(content=summary)]}

4.2 Agent间通信的消息格式设计

# 良好设计的消息格式应该包含:
# 1. 明确的角色标识(谁在说)
# 2. 清晰的内容类型(说什么)
# 3. 结构化的数据(便于解析)

from enum import Enum
from dataclasses import dataclass
from typing import Optional

class MessageType(Enum):
    """消息类型枚举"""
    USER_INPUT = "user_input"
    AGENT_OUTPUT = "agent_output"
    SYSTEM_INSTRUCTION = "system_instruction"
    ERROR_REPORT = "error_report"
    HANDOFF = "handoff"  # 交接给其他Agent

class ContentFormat(Enum):
    """内容格式枚举"""
    TEXT = "text"
    JSON = "json"
    MARKDOWN = "markdown"
    TABLE = "table"
    CODE = "code"

@dataclass
class AgentMessage:
    """
    标准化的Agent间通信消息格式
    """
    sender: str              # 发送者Agent名称
    receiver: str           # 接收者Agent名称("*"表示广播)
    message_type: MessageType
    content_format: ContentFormat
    content: str            # 主内容
    metadata: dict          # 元数据(如置信度、时间戳等)
    
    def to_langchain_message(self) -> AIMessage:
        """转换为LangChain消息格式"""
        full_content = f"""【来自】{self.sender}
【类型】{self.message_type.value}
【格式】{self.content_format.value}
【内容】
{self.content}"""
        return AIMessage(content=full_content)

# 使用示例
msg = AgentMessage(
    sender="data_analyst",
    receiver="report_writer",
    message_type=MessageType.AGENT_OUTPUT,
    content_format=ContentFormat.JSON,
    content='{"revenue_growth": 0.25, "new_customers": 2700}',
    metadata={"confidence": 0.95, "data_source": "ERP系统"}
)

# 在Agent中传递
def data_analyst_node(state: AgentState) -> AgentState:
    # ... 执行分析 ...
    
    msg = AgentMessage(
        sender="data_analyst",
        receiver="report_writer", 
        message_type=MessageType.AGENT_OUTPUT,
        content_format=ContentFormat.JSON,
        content=json.dumps(analysis_result),
        metadata={"confidence": 0.92}
    )
    
    new_state = state.copy()
    new_state["messages"] = state["messages"] + [msg.to_langchain_message()]
    
    return new_state

4.3 错误处理与恢复机制

# ============ 错误处理节点示例 ============
def error_handler_node(state: AgentState) -> AgentState:
    """
    全局错误处理节点
    捕获各Agent抛出的异常,记录到状态中,并决定如何恢复
    """
    error_messages = state.get("error_messages", [])
    current_task = state.get("current_task", "unknown")
    
    # 构建错误恢复prompt
    recovery_prompt = f"""发生错误,请分析原因并提出恢复方案。

当前任务:{current_task}
错误历史:
{chr(10).join(error_messages[-3:])}  # 只保留最近3条错误

请决定:
1. 是否应该重试?(如果错误是临时性的,如网络超时)
2. 是否应该跳过当前步骤?(如果错误不影响主流程)
3. 是否应该升级人工处理?(如果错误无法自动恢复)
4. 是否应该终止工作流?(如果错误导致系统不可用)

输出格式:
{{
    "action": "retry" | "skip" | "escalate" | "abort",
    "reason": "原因说明",
    "next_step": "建议的下一步"
}}
"""
    
    response = llm.invoke([HumanMessage(content=recovery_prompt)])
    
    try:
        decision = json.loads(response.content)
    except:
        decision = {"action": "escalate", "reason": "无法解析LLM响应", "next_step": "人工介入"}
    
    new_state = state.copy()
    new_state["messages"] = state["messages"] + [
        AIMessage(content=f"⚠️ 错误处理决策:{decision}")
    ]
    
    return new_state


# 在图中添加错误处理边
def route_on_error(state: AgentState) -> str:
    """错误后的路由决策"""
    error_count = len(state.get("error_messages", []))
    
    if error_count >= 3:
        # 连续3个错误,升级处理
        return "escalation"
    elif error_count > 0:
        # 有错误但不多,尝试恢复
        return "error_handler"
    else:
        # 没有错误,正常流程
        return "normal"


# 在每个Agent节点添加错误捕获
def wrapped_data_analyst_node(state: AgentState) -> AgentState:
    """
    包装后的data_analyst_node,自动捕获错误
    """
    try:
        return data_analyst_node(state)
    except Exception as e:
        error_msg = f"[data_analyst] 错误:{str(e)}"
        print(f"❌ {error_msg}")
        
        new_state = state.copy()
        new_state["error_messages"] = state.get("error_messages", []) + [error_msg]
        new_state["messages"] = state["messages"] + [
            AIMessage(content=f"❌ {error_msg}")
        ]
        
        return new_state

# 用包装后的节点替换原节点
graph.add_node("data_analyst", wrapped_data_analyst_node)

4.4 检查点(Checkpoint)机制:支持人工介入与恢复

# 检查点机制允许在某个状态保存工作流,稍后从该状态恢复
# 适用于:1)人工审核后恢复执行 2)系统崩溃后从断点恢复 3)A/B测试不同策略

from langgraph.checkpoint.memory import MemorySaver

# 创建内存检查点存储(生产环境应使用持久化存储)
checkpointer = MemorySaver()

# 编译图时添加检查点
compiled_graph = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["reviewer"],  # 在reviewer节点前中断,等待人工确认
)

# ============ 保存和恢复检查点的示例 ============

def save_checkpoint(graph, state, thread_id: str):
    """保存检查点"""
    config = {"configurable": {"thread_id": thread_id}}
    graph.update_state(config, state)
    print(f"✅ 检查点已保存:thread_id={thread_id}")

def resume_from_checkpoint(graph, thread_id: str):
    """从检查点恢复并继续执行"""
    config = {"configurable": {"thread_id": thread_id}}
    # 从检查点恢复执行(会从interrupt_before中断点继续)
    result = graph.invoke(None, config=config)  # None表示使用保存的状态
    return result

# ============ 人工介入工作流示例 ============
# Step 1: 运行到reviewer前中断
initial_state = {"user_request": "分析Q1销售数据", ...}
config = {"configurable": {"thread_id": "session-001"}}

# invoke会在reviewer之前中断(因为我们设置了interrupt_before=["reviewer"])
result = compiled_graph.invoke(initial_state, config=config)

# 此时系统暂停,等待人工确认
print("⏸️ 工作流已暂停,等待人工审核...")

# Step 2: 人工审核(通过API或其他方式)
# 假设人工确认报告需要修改
human_feedback = {
    "approved": False,
    "feedback": "第三部分的数据单位有误,应该是'万元'而不是'亿元',请修订"
}

# Step 3: 将人工反馈写入状态,然后恢复执行
new_state = {
    "review_passed": False,
    "review_feedback": human_feedback["feedback"],
    "human_override": True
}

compiled_graph.update_state(config, new_state)

# Step 4: 恢复执行
# 继续从中断点执行,会进入reviewer → 根据状态决定继续修订
final_result = compiled_graph.invoke(None, config=config)
print("✅ 工作流完成")

五、调试经验:开发多Agent系统的心得

5.1 如何追踪状态变化

# 问题:工作流执行后,不知道中间状态发生了什么
# 解决方案:添加状态追踪装饰器

from functools import wraps

def trace_state_changes(func):
    """追踪状态变化的装饰器"""
    @wraps(func)
    def wrapper(state):
        input_keys = list(state.keys())
        print(f"\n📥 [{func.__name__}] 输入状态 keys: {input_keys}")
        
        result = func(state)
        
        output_keys = list(result.keys())
        changed_keys = [k for k in output_keys if state.get(k) != result.get(k)]
        
        print(f"📤 [{func.__name__}] 输出状态 变更字段: {changed_keys}")
        
        # 打印messages数量变化
        old_msg_count = len(state.get("messages", []))
        new_msg_count = len(result.get("messages", []))
        print(f"   messages: {old_msg_count} → {new_msg_count}")
        
        return result
    return wrapper

# 应用到所有节点
wrapped_orchestrator = trace_state_changes(orchestrator_node)
wrapped_data_analyst = trace_state_changes(data_analyst_node)
wrapped_report_writer = trace_state_changes(report_writer_node)
wrapped_reviewer = trace_state_changes(reviewer_node)

# 注册包装后的节点
graph.add_node("orchestrator", wrapped_orchestrator)
graph.add_node("data_analyst", wrapped_data_analyst)
graph.add_node("report_writer", wrapped_reviewer)  # 注意:reivewer注册为wrapped_reviewer
# ...

5.2 常见错误与修复

# 错误1:State被后续节点覆盖
# 问题:某个节点更新了状态,但后续节点读取时发现没有更新
# 原因:没有使用 Annotated[list, operator.iconcat],导致list被覆盖而不是追加
# 修复:

# ❌ 错误写法
messages: list[BaseMessage]  # 这样messages会被覆盖

# ✅ 正确写法
messages: Annotated[list[BaseMessage], operator.iconcat]
# operator.iconcat 会将新消息追加到列表末尾,而不是覆盖


# 错误2:循环依赖导致无限循环
# 问题:两个Agent互相调用,无法结束
# 原因:没有设置iteration_count上限,没有退出条件
# 修复:

def route_after_agent_b(state):
    iteration = state.get("iteration_count", 0) + 1
    
    if iteration >= 5:  # 最多循环5次
        return "escalation"
    
    if some_exit_condition(state):
        return END
    
    return "other_agent"

# 在状态更新时增加计数
new_state = state.copy()
new_state["iteration_count"] = state.get("iteration_count", 0) + 1


# 错误3:条件边返回了不存在的节点名
# 问题:add_conditional_edges 返回的节点名拼写错误
# 修复:
# 使用枚举或常量定义所有合法的节点名称
VALID_NODES = {"orchestrator", "data_analyst", "report_writer", "reviewer", END}

def validate_route(route: str):
    if route not in VALID_NODES:
        raise ValueError(f"无效的路由目标:{route},有效值:{VALID_NODES}")
    return route


# 错误4:LLM输出格式不符合预期,导致JSON解析失败
# 问题:llm输出的JSON包含额外的前缀文字,如"以下是JSON:{...}"
# 修复:使用更严格的Prompt,并添加解析容错

def robust_json_parse(text: str) -> dict:
    """健壮的JSON解析,处理LLM输出的各种格式问题"""
    import json
    import re
    
    # 方法1:直接尝试
    try:
        return json.loads(text)
    except:
        pass
    
    # 方法2:去除markdown代码块
    text = re.sub(r'```json\s*', '', text)
    text = re.sub(r'```\s*', '', text)
    try:
        return json.loads(text)
    except:
        pass
    
    # 方法3:提取第一个 {...} 或 [...]
    match = re.search(r'\{[^{}]*\}', text)
    if match:
        try:
            return json.loads(match.group())
        except:
            pass
    
    # 方法4:提取第一个数组
    match = re.search(r'\[[^\[\]]*\]', text)
    if match:
        try:
            return json.loads(match.group())
        except:
            pass
    
    # 所有方法都失败,返回默认空值
    return {}

六、生产环境部署注意事项

6.1 配置管理

# config.py - 集中管理所有配置
import os
from dataclasses import dataclass

@dataclass
class AgentConfig:
    """Agent系统配置"""
    # LLM配置
    llm_base_url: str = os.getenv("LLM_BASE_URL", "http://localhost:11434/v1")
    llm_api_key: str = os.getenv("LLM_API_KEY", "ollama")
    llm_model: str = os.getenv("LLM_MODEL", "qwen2.5-14b-instruct")
    llm_temperature: float = float(os.getenv("LLM_TEMPERATURE", "0.7"))
    llm_max_tokens: int = int(os.getenv("LLM_MAX_TOKENS", "4096"))
    
    # 工作流配置
    max_iterations: int = int(os.getenv("MAX_ITERATIONS", "10"))
    timeout_seconds: int = int(os.getenv("TIMEOUT_SECONDS", "300"))
    
    # 检查点配置
    checkpoint_enabled: bool = os.getenv("CHECKPOINT_ENABLED", "true").lower() == "true"
    checkpoint_dir: str = os.getenv("CHECKPOINT_DIR", "/data/checkpoints")
    
    # 错误处理
    max_retry: int = int(os.getenv("MAX_RETRY", "3"))
    escalation_threshold: int = int(os.getenv("ESCALATION_THRESHOLD", "3"))

config = AgentConfig()

# 在初始化LLM时使用配置
llm = ChatOpenAI(
    base_url=config.llm_base_url,
    api_key=config.llm_api_key,
    model=config.llm_model,
    temperature=config.llm_temperature,
    max_tokens=config.llm_max_tokens,
)

6.2 监控指标

# 需要监控的关键指标:

# 1. 工作流执行时间
import time
from functools import wraps

def monitor_execution_time(func):
    @wraps(func)
    def wrapper(state):
        start = time.time()
        result = func(state)
        elapsed = time.time() - start
        
        print(f"⏱️ [{func.__name__}] 执行耗时: {elapsed:.2f}s")
        
        # 将耗时记录到状态中(便于后续分析)
        result["execution_times"] = state.get("execution_times", {})
        result["execution_times"][func.__name__] = elapsed
        
        return result
    return wrapper

# 2. Token消耗统计
# 使用LangSmith或其他追踪工具记录每次LLM调用的token消耗

# 3. 错误率统计
# 在状态中添加error_tracking
def track_errors(state):
    error_count = len(state.get("error_messages", []))
    total_iterations = state.get("iteration_count", 1)
    error_rate = error_count / total_iterations
    
    metrics = {
        "error_count": error_count,
        "iteration_count": total_iterations,
        "error_rate": error_rate,
        "success": error_count == 0
    }
    
    print(f"📊 指标: {metrics}")
    return metrics

结语

LangGraph是构建复杂多Agent系统的强大工具,本指南涵盖了从概念到实现的完整知识。核心要点回顾:

多Agent系统不是银弹,建议从简单场景入手,逐步增加复杂度。