AI Agent的核心里有两件事:让大模型「会思考」和让大模型「能动手」。LangChain负责前者,MCP(Model Context Protocol)负责后者。两者结合,才构成一套完整的Agent开发体系。
本文面向已有Python基础的AI开发者,从协议原理到代码实战,从单Tool调用到多Tool编排,从手写MCP Server到线上调试,手把手拆解每一个细节。没有营销话术,全是工程干货。
在大模型应用里,「给模型提供上下文」这件事经历了三个阶段:
MCP的核心设计目标是:让AI应用和外部工具之间的接口统一化。无论你的工具是搜索API、数据库还是文件系统,只要遵循MCP协议,任意兼容MCP的客户端都能连接使用。
MCP的消息格式完全遵循JSON-RPC 2.0规范。JSON-RPC是一种无状态的轻量级远程过程调用协议,所有数据以JSON格式传输。
// JSON-RPC 2.0 请求格式
{
"jsonrpc": "2.0", // 协议版本,固定为"2.0"
"id": 1, // 请求ID,用于匹配响应(整数或字符串)
"method": "tools/list", // 方法名,MCP定义了一系列标准方法
"params": {} // 方法参数,MCP规范定义了各方法的参数结构
}
// JSON-RPC 2.0 响应格式(成功)
{
"jsonrpc": "2.0",
"id": 1, // 与请求ID对应,证明这个响应匹配哪个请求
"result": { // 调用结果,各方法返回不同结构
"tools": [...]
}
}
// JSON-RPC 2.0 响应格式(错误)
{
"jsonrpc": "2.0",
"id": 1,
"error": { // 错误对象,包含错误信息
"code": -32600, // 错误码,JSON-RPC标准错误码或MCP扩展错误码
"message": "Invalid Request"
}
}
MCP采用经典的客户端-服务器架构,但引入了第三个角色Host,构成三层结构:
提供工具(tools)、资源(resources)和提示(prompts)的实际提供者。可以理解为一个「工具仓库」,对外暴露一系列可调用的能力。每个Server有独立的命名空间,避免工具名冲突。
每个Server对应一个Client,建立一对一的WebSocket连接。Client负责维护这个连接,接收Server的响应,并向Server发送请求。Client对上层(Host)屏蔽了通信细节。
用户直接交互的程序。Host持有所有Client的引用,协调整个MCP会话。当用户问一个问题,Host决定调用哪个Server的哪个Tool,汇总结果后返回给用户。
架构图解:
用户 / 应用程序(Host)
│
│ ① 用户提问:"帮我查一下北京天气"
▼
┌─────────────────────────────────────┐
│ Host 进程 │
│ (LangChain Agent / 主应用程序) │
│ │
│ · 持有 Client1, Client2, ... │
│ · 统一管理所有工具调用 │
│ · 将工具结果注入LLM上下文 │
└────────────────┬────────────────────┘
│
② Host通过Client1发送JSON-RPC请求
③ Client1通过WebSocket转发给Server1
▼
┌─────────────────────────────────────┐
│ MCP Server (工具提供方) │
│ 例:weather-server │
│ │
│ · 实现 tools/callable 接口 │
│ · 访问外部API/数据库/文件系统 │
│ · 返回JSON-RPC响应 │
└─────────────────────────────────────┘
| 维度 | Function Calling | Tool Use(OpenAI格式) | MCP协议 |
|---|---|---|---|
| 标准化程度 | 厂商私有,各家定义不同 | 厂商私有(OpenAI/Bing) | 社区开放标准,各家兼容 |
| 连接方式 | HTTP轮询 | HTTP轮询 | WebSocket长连接 |
| 工具发现机制 | 手动注册,每次请求需重复传递 | 每次请求传递tool定义 | 启动时发现,运行时缓存tool列表 |
| 双向通信 | 不支持(纯请求-响应) | 不支持 | 支持(Server可主动推送) |
| 资源管理 | 无 | 无 | 有(Resources接口) |
| 多工具协调 | 应用层自行实现 | 应用层自行实现 | Host统一管理,多Server协作 |
| 适用场景 | 单厂商模型、快速集成 | 单厂商模型、快速集成 | 多工具复杂应用、需要长期连接 |
# Python 3.10+ 环境验证
$ python --version
Python 3.10.13
# 创建虚拟环境(推荐,生产项目隔离依赖)
$ python -m venv .venv
$ source .venv/bin/activate
# 安装LangChain核心库
$ pip install langchain langchain-core
# 安装LangChain社区版(包含大量集成工具)
$ pip install langchain-community
# 安装OpenAI集成(如果用OpenAI模型)
$ pip install openai
# 安装Tavily搜索(常用搜索工具)
$ pip install tavily-python
# 安装JSON-RPC支持(MCP Server开发需要)
$ pip install python-jsonrpc-server jsonrpc-websocket
# 验证安装
$ python -c "import langchain; print('LangChain', langchain.__version__)"
LangChain 0.3.14
LangChain的Tool是连接LLM和外部世界的桥梁。每个Tool本质是一个函数,有名称、描述(供LLM理解何时调用)和执行逻辑。
# 步骤1:导入LangChain内置工具
from langchain_community.tools import TavilySearchResults
# 步骤2:创建工具实例,max_results限制返回条数
# description参数是给LLM看的"使用说明书",决定模型何时选择这个Tool
search_tool = TavilySearchResults(
max_results=5,
description=(
"当用户询问实时信息、新闻、天气预报、"
"股价、比赛结果等需要最新数据的问题时使用"
)
)
# 步骤3:测试工具直接调用(不通过Agent)
result = search_tool.invoke({"query": "Python 3.12 新特性"})
print(result)
# 输出格式:[{'url': '...', 'content': '...'}, ...]
# 步骤4:绑定到LLM,创建Agent
from langchain_openai import ChatOpenAI
from langchain.agents import AgentType, create_react_agent
from langchain import hub
# 初始化大模型(verbose=True开启详细日志)
llm = ChatOpenAI(
model="gpt-4o",
temperature=0,
api_key="your-api-key", # 实际使用时请替换,或通过环境变量 OPENAI_API_KEY
verbose=True # 开启后会在控制台打印Agent推理全过程
)
# 从LangChain Hub拉取ReAct Agent的提示词模板
# prompt = hub.pull("hwchase17/react-chat") # 聊天版本
prompt = hub.pull("hwchase17/react") # 标准ReAct版本
# 创建Agent,将工具列表传入
agent = create_react_agent(
llm=llm,
tools=[search_tool],
prompt=prompt
)
# 步骤5:运行Agent
from langchain.agents import AgentExecutor
agent_executor = AgentExecutor(
agent=agent,
tools=[search_tool],
verbose=True, # 打印完整推理过程,方便调试
max_iterations=10, # 限制最大推理步数,防止死循环
handle_parsing_errors=True # 解析出错时自动修正,而非直接崩溃
)
response = agent_executor.invoke({"input": "2024年诺贝尔物理学奖得主是谁?"})
print(response["output"])
ReAct(Reasoning + Acting)是目前最主流的Agent推理模式。核心思想是让模型交替进行「推理」和「行动」,每一步的推理决定下一步的行动。
# ============================================================
# ReAct 模式完整实现:不依赖LangChain内置Agent,手写推理循环
# 展示ReAct的内部原理,理解每一步在做什么
# ============================================================
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_community.tools import TavilySearchResults
import os
# 设置API Key(生产环境推荐用环境变量)
os.environ["OPENAI_API_KEY"] = "your-api-key"
os.environ["TAVILY_API_KEY"] = "your-tavily-key" # 从 tavily.ai 免费申请
# 初始化模型和工具
llm = ChatOpenAI(model="gpt-4o", temperature=0)
search_tool = TavilySearchResults(max_results=3)
tools = {search_tool.name: search_tool} # 用字典方便按名称查找Tool
# ============================================================
# ReAct提示词模板:包含推理格式的指令
# 格式为:Thought(思考)→ Action(行动)→ Observation(观察)
# ============================================================
REACT_SYSTEM_PROMPT = """你是一个善于使用工具的问题解决助手。
针对用户问题,按照以下格式循环:
Thought: 分析当前情况,决定下一步行动
Action: tool_name{"tool_argument"} # 格式:工具名{参数}
Observation: 工具返回结果 # 观察结果,进入下一轮推理
...(重复上述步骤直到得到答案)
Final Answer: 最终答案
可用的工具:
- tavily_search_results: 搜索互联网获取实时信息,参数格式:{"query": "搜索关键词"}
开始!"""
# ============================================================
# 手动实现ReAct推理循环
# ============================================================
def react_loop(question: str, max_steps: int = 10):
"""
手动实现ReAct循环,理解每一步发生了什么
参数:
question: 用户问题
max_steps: 最大推理步数,防止死循环
返回:最终答案字符串
"""
# 构建消息历史,以SystemMessage开头定义行为规则
messages = [
SystemMessage(content=REACT_SYSTEM_PROMPT),
HumanMessage(content=question)
]
for step in range(max_steps):
print(f"\n{'='*50}")
print(f"🔄 推理步骤 {step + 1}/{max_steps}")
# ----------------------------------------
# Step 1:让LLM生成下一步行动(包含Action)
# ----------------------------------------
response = llm.invoke(messages)
response_text = response.content
print(f"🤖 LLM输出:\n{response_text}")
# 将LLM回复加入历史(用于下一轮上下文)
messages.append(AIMessage(content=response_text))
# ----------------------------------------
# Step 2:解析LLM输出中的Action
# ----------------------------------------
# 查找 Action: 行的格式:Action: tool_name{"..."}
import re
# 匹配格式:Action: search_tool{"query": "..."}
action_pattern = r"Action:\s*(\w+)\s*\{(.+)\}"
match = re.search(action_pattern, response_text, re.DOTALL)
if not match:
# 没有匹配到Action,说明模型认为已经得到答案
# 检查是否有 Final Answer
final_pattern = r"Final Answer:\s*(.+)"
final_match = re.search(final_pattern, response_text, re.DOTALL)
if final_match:
return final_match.group(1).strip()
# 没有Action也没有Final Answer,说明输出格式异常
# 将解析错误反馈给模型让它自我修正
messages.append(HumanMessage(
content=f"你的输出无法解析,请严格按照格式输出。"
f"格式:Thought: ...\nAction: tool_name{{...参数...}}\n"
f"或者 Final Answer: 最终答案"
))
continue
# 提取工具名和参数
tool_name = match.group(1)
tool_args_str = match.group(2).strip()
# 解析参数JSON(处理单引号问题)
import json
try:
# 有些模型输出用单引号,转成双引号
tool_args_str_normalized = tool_args_str.replace("'", '"')
tool_args = json.loads(tool_args_str_normalized)
except json.JSONDecodeError as e:
print(f"⚠️ 参数JSON解析失败:{e},原始文本:{tool_args_str}")
messages.append(HumanMessage(
content=f"参数格式错误:{tool_args_str},请修正为合法的JSON格式。"
))
continue
# ----------------------------------------
# Step 3:检查工具是否存在
# ----------------------------------------
if tool_name not in tools:
print(f"⚠️ 工具 {tool_name} 不存在,跳过")
messages.append(HumanMessage(
content=f"工具 {tool_name} 不存在。可用工具:{list(tools.keys())}"
))
continue
# ----------------------------------------
# Step 4:调用工具
# ----------------------------------------
print(f"🔧 调用工具:{tool_name},参数:{tool_args}")
try:
tool_result = tools[tool_name].invoke(tool_args)
# 工具结果转字符串加入观察
observation = f"Observation: {str(tool_result)}"
print(f"✅ 工具返回:{tool_result}")
except Exception as e:
observation = f"Observation: 工具执行出错:{str(e)}"
print(f"❌ 工具执行出错:{e}")
# ----------------------------------------
# Step 5:将观察结果加入上下文,进入下一轮
# ----------------------------------------
messages.append(HumanMessage(content=observation))
# 超过最大步数仍未结束
return "推理超时,请简化问题或增加max_steps"
# 测试运行
if __name__ == "__main__":
result = react_loop("2026年Python最新版本是多少?")
print(f"\n🎯 最终答案:{result}")
# 方式一:使用 @tool 装饰器,最简单直接
from langchain_core.tools import tool
@tool
def calculate_bmi(weight_kg: float, height_m: float) -> str:
"""
计算BMI身体质量指数。
参数:
weight_kg: 体重(公斤)
height_m: 身高(米)
返回:
BMI值和健康建议
"""
bmi = weight_kg / (height_m ** 2)
if bmi < 18.5:
advice = "偏瘦,建议适当增重"
elif bmi < 24:
advice = "正常范围,继续保持"
elif bmi < 28:
advice = "偏胖,建议适当运动"
else:
advice = "肥胖,建议咨询医生"
return f"BMI = {bmi:.2f},{advice}"
# @tool装饰器自动提取函数签名和docstring生成Tool定义
# 无需手动指定name和description,name默认用函数名
# description默认从docstring第一行提取
# 测试调用
result = calculate_bmi.invoke({"weight_kg": 70, "height_m": 1.75})
print(result)
# 输出:BMI = 22.86,正常范围,继续保持
# ============================================================
# 带描述的@tool(推荐写法)
# ============================================================
@tool
def get_weather(city: str, country: str = "中国") -> str:
"""
查询指定城市的实时天气。
当用户询问"某城市天气如何"、"要不要带伞"时使用。
参数:
city: 城市名称(中文),如"北京"
country: 国家,默认为"中国",国外城市需指定
返回:
天气描述,包括温度、湿度、天气状况
"""
# 这里可以接真实天气API,此处简化演示
return f"{city}今天晴,气温18-26°C,湿度45%,适合出行"
# 在Agent中使用
from langchain.agents import create_react_agent
from langchain import hub
agent = create_react_agent(
llm=llm,
tools=[calculate_bmi, get_weather],
prompt=hub.pull("hwchase17/react")
)
from langchain.agents import AgentExecutor
executor = AgentExecutor(
agent=agent,
tools=[calculate_bmi, get_weather],
verbose=True
)
print(executor.invoke({"input": "我体重75公斤,身高1米8,帮我算一下BMI"})["output"])
# 方式二:继承 BaseTool,使用Pydantic模型定义参数
# 优点:参数类型检查更严格,支持嵌套参数,适合复杂工具
from langchain_core.tools import BaseTool, tool
from pydantic import BaseModel, Field
from typing import Optional
# ============================================================
# 第一步:定义工具的参数模型(输入)
# ============================================================
class SearchCodeInput(BaseModel):
"""搜索代码示例的输入参数模型"""
query: str = Field(
description="搜索关键词,如'Python异步编程'",
examples=["Python异步", "JavaScript事件循环"]
)
language: Optional[str] = Field(
default="python",
description="编程语言,默认为Python"
)
max_results: int = Field(
default=5,
description="最多返回结果数"
)
# ============================================================
# 第二步:继承 BaseTool,实现核心方法
# ============================================================
class CodeSearchTool(BaseTool):
"""
代码示例搜索引擎
当用户询问"怎么实现XXX功能"、
"给我一个XXX的代码示例"时使用。
"""
name: str = "code_search" # Tool的唯一标识名
description: str = "搜索编程代码示例" # 供LLM理解何时调用
args_schema: type = SearchCodeInput # 参数模型(用于验证和文档)
# ----------------------------------------
# _run:同步执行模式(最常用)
# ----------------------------------------
def _run(self, query: str, language: str = "python", max_results: int = 5):
"""
同步执行工具逻辑
参数由args_schema定义并验证,返回字符串作为工具结果
"""
# 这里接真实代码搜索API(如GitHub Search API)
mock_results = [
f"示例{i}:{query} - {language}实现(第{i}个结果)"
for i in range(1, max_results + 1)
]
return "\n".join(mock_results)
# ----------------------------------------
# _arun:异步执行模式(当工具需要异步IO时实现)
# ----------------------------------------
async def _arun(self, query: str, language: str = "python", max_results: int = 5):
"""
异步执行工具逻辑
当需要调用异步API(如aiohttp)时使用此方法
"""
import asyncio
await asyncio.sleep(0.1) # 模拟异步请求
return self._run(query, language, max_results)
# ============================================================
# 第三步:在Agent中使用Pydantic Tool
# ============================================================
code_search = CodeSearchTool()
agent = create_react_agent(
llm=llm,
tools=[code_search],
prompt=hub.pull("hwchase17/react")
)
executor = AgentExecutor(
agent=agent,
tools=[code_search],
verbose=True
)
result = executor.invoke({
"input": "怎么用Python实现一个简单的Web服务器?给我5个示例"
})
print(result["output"])
多Tool协作的核心问题:工具之间有依赖关系,顺序很重要。典型的编排模式有两种:顺序编排(结果链式传递)和并行编排(互不依赖的工具同时调用)。
# ============================================================
# 场景:用户问"帮我查一下腾讯最新季报,然后算一下同比增长"
#
# 需要两个Tool协作:
# 1. search_financial_report:查财报(先执行)
# 2. calculate_growth:计算增长率(依赖财报结果,后执行)
# ============================================================
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
llm = ChatOpenAI(model="gpt-4o", temperature=0)
@tool
def search_financial_report(company_name: str, period: str) -> str:
"""
搜索指定公司的财务报表数据。
参数:
company_name: 公司名称,如"腾讯"、"阿里巴巴"
period: 财报周期,如"2024Q1"、"2023年报"
"""
# 真实场景调用财务数据API,此处模拟
mock_data = {
"腾讯_2024Q1": {"revenue": 1595, "profit": 502, "currency": "亿元"},
"腾讯_2023Q1": {"revenue": 1500, "profit": 480, "currency": "亿元"},
}
key = f"{company_name}_{period}"
if key in mock_data:
return str(mock_data[key])
return f"未找到{company_name}{period}数据"
@tool
def calculate_growth(current_value: float, previous_value: float) -> str:
"""
计算增长率。
参数:
current_value: 本期数值
previous_value: 上期数值
"""
if previous_value == 0:
return "除数不能为0"
growth = ((current_value - previous_value) / previous_value) * 100
return f"同比增长 {growth:.2f}%"
# 使用LangChain的Runnable接口编排工具链
from langchain_core.runnables import RunnableSequence, RunnableParallel
# 方案A:使用LangChain Expression Language(LCEL)编排
# RunnableSequence:顺序执行,上一个输出传给下一个
financial_chain = RunnableSequence(
first=search_financial_report, # 第一步:查财报
last=calculate_growth # 第二步:算增长率
)
# 由于calculate_growth需要两个参数(current和previous),
# 而search_financial_report只返回一个值,我们需要写一个转换函数
def extract_for_growth(report_result: str) -> dict:
"""
从财报结果中提取数值,准备计算增长率
财报数据格式如:{'revenue': 1595, 'profit': 502, 'currency': '亿元'}
"""
import ast
data = ast.literal_eval(report_result)
return {
"current_value": data["revenue"],
"previous_value": data["revenue"] * 0.94 # 模拟去年同期94%
}
# 用LCEL链起来
chain_with_extraction = search_financial_report | extract_for_growth | calculate_growth
# 测试
result = chain_with_extraction.invoke({
"company_name": "腾讯",
"period": "2024Q1"
})
print(f"计算结果:{result}")
# 输出:同比增长 6.33%
# ============================================================
# 场景:用户问"帮我同时查一下北京、上海、广州今天的天气"
#
# 三个城市的天气查询互不依赖,可以并行执行提高效率
# ============================================================
from langchain_core.tools import tool
from langchain_core.runnables import RunnableParallel
import concurrent.futures
@tool
def get_weather(city: str) -> str:
"""查询单个城市的天气,参数:city-城市名"""
weathers = {
"北京": "晴,18-26°C,PM2.5优",
"上海": "多云,20-28°C,PM2.5良",
"广州": "雷阵雨,25-32°C,PM2.5中"
}
return weathers.get(city, f"未找到{city}数据")
# RunnableParallel:并行执行,所有工具同时运行
parallel_weather = RunnableParallel(
beijing=get_weather,
shanghai=get_weather,
guangzhou=get_weather
)
# invoke一次,所有城市并行查询
result = parallel_weather.invoke({
"beijing": {"city": "北京"},
"shanghai": {"city": "上海"},
"guangzhou": {"city": "广州"}
})
print(result)
# 输出:{'beijing': '晴,18-26°C,PM2.5优',
# 'shanghai': '多云,20-28°C,PM2.5良',
# 'guangzhou': '雷阵雨,25-32°C,PM2.5中'}
# ============================================================
# 复杂编排:并行查数据,汇总后再分析
# ============================================================
from langchain_core.runnables import RunnableBranch
# 并行执行多个独立查询(不互相依赖的工具)
parallel_search = RunnableParallel(
revenue=lambda _: "腾讯Q1营收1595亿元",
user_active=lambda _: "微信月活13.5亿",
market_share=lambda _: "市占率约35%"
)
# 汇总后做分析(等并行结果都返回后才执行)
def summarize_analysis(search_results: dict) -> str:
"""汇总分析三个查询结果"""
return (
f"财报分析:\n"
f"1. 营收:{search_results['revenue']}\n"
f"2. 用户活跃:{search_results['user_active']}\n"
f"3. 市占率:{search_results['market_share']}\n"
f"综合判断:业务稳健,用户基数大,营收增长空间可观"
)
full_chain = parallel_search | summarize_analysis
analysis_result = full_chain.invoke({})
print(analysis_result)
MCP Server的实现核心是暴露三个标准接口:initialize(初始化握手)、tools/list(列出可用工具)、tools/call(调用指定工具)。以下是一个最小可用MCP Server的完整实现:
# ============================================================
# mcp_simple_server.py
# 最小化MCP Server实现:搜索工具 + 计算器工具
# 运行方式:python mcp_simple_server.py
# 或通过stdio方式运行:python -m uvicorn mcp_simple_server:app
# ============================================================
import json
import sys
from datetime import datetime
# ----------------------------------------
# 工具定义:MCP规范下每个工具是一个对象
# 包含 name(唯一标识)、description(供LLM理解)、input_schema(参数校验)
# ----------------------------------------
TOOLS = [
{
"name": "web_search", # 工具唯一标识
"description": "搜索互联网获取信息。用户询问实时新闻、"
"天气、股价等需要最新数据的问题时使用。",
"inputSchema": { # JSON Schema格式的参数定义
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "搜索关键词"
},
"max_results": {
"type": "integer",
"description": "最多返回条数,默认5",
"default": 5
}
},
"required": ["query"]
}
},
{
"name": "calculator", # 计算器工具
"description": "执行数学计算,支持加减乘除和幂运算。",
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "数学表达式,如 '2+3*5' 或 '10**2'"
}
},
"required": ["expression"]
}
}
]
# ----------------------------------------
# 工具执行函数映射表
# key为工具名,value为执行函数
# ----------------------------------------
def execute_web_search(args: dict) -> str:
"""执行搜索(这里用模拟数据,生产环境接真实搜索API)"""
query = args["query"]
max_results = args.get("max_results", 5)
results = [
f"结果{i}:关于「{query}」的相关信息(第{i}条)"
for i in range(1, max_results + 1)
]
return "\n".join(results)
def execute_calculator(args: dict) -> str:
"""执行数学计算(使用安全eval,不直接用python eval)"""
expression = args["expression"]
# 安全的数学表达式计算(只允许数字和运算符)
allowed_chars = set("0123456789+-*/.()** ")
if not all(c in allowed_chars for c in expression):
return f"错误:表达式包含非法字符,只支持数字和运算符 +-*/()"
try:
result = eval(expression, {"__builtins__": {}}, {}) # 受限eval
return f"计算结果:{expression} = {result}"
except Exception as e:
return f"计算错误:{str(e)}"
TOOL_EXECATORS = {
"web_search": execute_web_search,
"calculator": execute_calculator
}
# ----------------------------------------
# JSON-RPC请求处理主函数
# ----------------------------------------
def handle_request(request: dict) -> dict:
"""
处理收到的JSON-RPC请求,返回符合规范的响应
参数:
request: JSON-RPC格式的请求对象
返回:
JSON-RPC格式的响应对象(成功用result,失败用error)
"""
jsonrpc = request.get("jsonrpc", "2.0")
req_id = request.get("id")
method = request.get("method")
params = request.get("params", {})
# ----------------------------------------
# 方法1:initialize - 握手初始化
# Client连接Server时首先发送此请求,交换协议版本和能力
# ----------------------------------------
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": "2024-11-05", # MCP协议版本
"capabilities": { # Server声明自己的能力
"tools": {}, # 声明自己支持tools能力
# "resources": {}, # 如需资源管理能力,在此声明
# "prompts": {} # 如需提示管理能力,在此声明
},
"serverInfo": {
"name": "simple-mcp-server",
"version": "1.0.0"
}
}
}
# ----------------------------------------
# 方法2:tools/list - 列出所有可用工具
# Host通过此请求获取Server提供哪些工具及其定义
# ----------------------------------------
elif method == "tools/list":
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"tools": TOOLS # 返回工具定义列表
}
}
# ----------------------------------------
# 方法3:tools/call - 调用具体工具
# Host决定调用某个工具时发送此请求
# params格式:{"name": "工具名", "arguments": {...}}
# ----------------------------------------
elif method == "tools/call":
tool_name = params.get("name")
tool_args = params.get("arguments", {})
# 工具不存在检查
if tool_name not in TOOL_EXECATORS:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32601, # JSON-RPC标准错误码:方法未找到
"message": f"工具 '{tool_name}' 不存在"
}
}
# 执行工具
try:
result = TOOL_EXECATORS[tool_name](tool_args)
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [ # MCP规定格式:content数组
{
"type": "text",
"text": result
}
]
}
}
except Exception as e:
# 工具执行出错
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32603, # JSON-RPC标准错误码:内部错误
"message": f"工具执行出错:{str(e)}"
}
}
# ----------------------------------------
# 方法4:notifications/initialized - 客户端握手完成通知
# Client在收到initialize响应后发送此通知,表示握手完成
# Server收到后可以开始正常通信
# ----------------------------------------
elif method == "notifications/initialized":
# 通知类方法不需要返回响应
return None
else:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32601,
"message": f"方法 '{method}' 不存在或不支持"
}
}
# ----------------------------------------
# 标准输入输出模式(stdio):
# MCP Client通过stdin/stdout与Server通信
# 适用于本地进程场景
# ----------------------------------------
def run_stdio_server():
"""通过标准输入输出运行MCP Server(最常用的本地模式)"""
print("INFO: MCP Server启动,使用stdio模式", file=sys.stderr)
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
request = json.loads(line)
except json.JSONDecodeError:
# JSON解析失败,返回错误响应
print(json.dumps({
"jsonrpc": "2.0",
"id": None,
"error": {
"code": -32700,
"message": "无效的JSON格式"
}
}), file=sys.stdout)
continue
# 处理请求
response = handle_request(request)
# notifications/initialized 等通知类返回None,不输出
if response is not None:
print(json.dumps(response), file=sys.stdout)
sys.stdout.flush()
if __name__ == "__main__":
run_stdio_server()
工具注册的核心是在启动时将所有Tool的定义(name、description、inputSchema)和执行函数(executor)绑定在一起。MCP Server启动后会:
# ============================================================
# 工具注册和请求处理的完整流程
# 演示如何组织代码结构,便于维护和扩展
# ============================================================
from typing import Callable, Any
from dataclasses import dataclass
# ----------------------------------------
# 工具定义数据类
# ----------------------------------------
@dataclass
class ToolDefinition:
"""
工具定义数据结构
"""
name: str # 唯一标识
description: str # 供LLM理解何时使用
input_schema: dict # JSON Schema格式的参数定义
executor: Callable[[dict], Any] # 执行函数
# ----------------------------------------
# 工具注册表(全局)
# 所有工具在此注册,tools/list返回此表
# ----------------------------------------
_tool_registry: dict[str, ToolDefinition] = {}
def register_tool(
name: str,
description: str,
input_schema: dict,
executor: Callable[[dict], Any]
):
"""
工具注册函数
参数:
name: 工具名,全局唯一
description: 给LLM看的描述
input_schema: 参数JSON Schema
executor: 执行函数,接收参数字典
"""
_tool_registry[name] = ToolDefinition(
name=name,
description=description,
input_schema=input_schema,
executor=executor
)
def get_tool_definitions() -> list[dict]:
"""
获取所有工具定义(用于tools/list响应)
不包含executor,只包含元数据
"""
return [
{
"name": t.name,
"description": t.description,
"inputSchema": t.input_schema
}
for t in _tool_registry.values()
]
def execute_tool(name: str, arguments: dict) -> Any:
"""
执行指定工具
参数:
name: 工具名
arguments: 参数字典
"""
if name not in _tool_registry:
raise ValueError(f"工具 {name} 不存在")
return _tool_registry[name].executor(arguments)
# ----------------------------------------
# 注册工具示例(用装饰器方式注册)
# ----------------------------------------
def define_tool(name: str, description: str, input_schema: dict):
"""
工具定义装饰器
用法:
@define_tool("tool_name", "description", {"type": "object", ...})
def my_tool(args):
...
"""
def decorator(func: Callable[[dict], Any]):
register_tool(name, description, input_schema, func)
return func
return decorator
# 使用装饰器注册工具
@define_tool(
name="currency_converter",
description="货币换算,将一个金额从一种货币换算成另一种货币",
input_schema={
"type": "object",
"properties": {
"amount": {"type": "number", "description": "金额"},
"from_currency": {"type": "string", "description": "源货币,如USD、CNY"},
"to_currency": {"type": "string", "description": "目标货币,如EUR、JPY"}
},
"required": ["amount", "from_currency", "to_currency"]
}
)
def currency_converter(args: dict) -> str:
"""货币换算工具"""
rates = {"USD_CNY": 7.2, "USD_EUR": 0.92, "CNY_JPY": 20.5}
amount = args["amount"]
from_c = args["from_currency"]
to_c = args["to_currency"]
key = f"{from_c}_{to_c}"
if key in rates:
result = amount * rates[key]
return f"{amount} {from_c} = {result:.2f} {to_c}"
return f"暂不支持 {from_c} 到 {to_c} 的换算"
@define_tool(
name="file_reader",
description="读取文件内容,返回文件文本",
input_schema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "文件路径"}
},
"required": ["path"]
}
)
def file_reader(args: dict) -> str:
"""读取文件工具"""
try:
with open(args["path"], "r", encoding="utf-8") as f:
return f.read(4096) # 最多读取4KB,防止内存溢出
except FileNotFoundError:
return f"文件不存在:{args['path']}"
except Exception as e:
return f"读取出错:{str(e)}"
# ----------------------------------------
# 初始化时注册所有工具
# ----------------------------------------
def initialize_tools():
"""在Server启动时调用,初始化所有工具注册"""
currency_converter({"amount": 1, "from_currency": "USD", "to_currency": "CNY"}) # 注册
file_reader({"path": "/tmp/dummy"}) # 注册
# 在模块加载时自动注册(也可以在Server启动时显式调用)
initialize_tools()
MCP的返回值规范要求工具结果放在result.content数组里,每个元素有type和text字段。错误遵循JSON-RPC 2.0标准错误码规范。
# ============================================================
# MCP 错误处理规范
# ============================================================
# JSON-RPC 标准错误码(部分)
MCP_ERROR_CODES = {
-32700: "解析错误 - 收到无效JSON", # Parse error
-32600: "非法请求 - JSON格式正确但结构非法", # Invalid Request
-32601: "方法未找到", # Method not found
-32602: "参数无效", # Invalid params
-32603: "内部错误", # DM Sansnal error
# MCP扩展错误码(从-32000开始)
-32000: "工具执行失败",
-32001: "工具超时",
-32002: "资源不可用",
}
# ----------------------------------------
# 错误响应构建函数
# ----------------------------------------
def mcp_error(code: int, message: str, req_id=None) -> dict:
"""构建MCP错误响应"""
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": code,
"message": message,
"data": None # 可扩展:附加错误详情
}
}
# ----------------------------------------
# 工具返回值规范
# MCP要求content字段是一个content数组
# 每个元素可以是text(文本)、image(图片)、blob(二进制)
# ----------------------------------------
def mcp_success(text: str, req_id=None) -> dict:
"""构建MCP成功响应"""
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{"type": "text", "text": text}
],
"isError": False # 标记是否为错误类型结果
}
}
# 工具执行失败的响应(isError=True,HTTP 200但内容含错误信息)
def mcp_tool_error(text: str, req_id=None) -> dict:
"""工具执行出错时的响应(isError=True但HTTP状态码200)"""
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{"type": "text", "text": f"工具执行失败:{text}"}
],
"isError": True
}
}
# ============================================================
# 完整错误处理示例:参数校验 → 执行 → 异常捕获
# ============================================================
def safe_execute_tool(tool_name: str, arguments: dict, req_id=None) -> dict:
"""
带完整错误处理的工具执行函数
错误处理流程:
1. 检查工具是否存在 → -32601
2. 校验参数格式 → -32602
3. 执行并捕获异常 → -32603 或 -32000
"""
# Step 1: 工具存在性检查
if tool_name not in _tool_registry:
return mcp_error(
-32601,
f"工具 '{tool_name}' 不存在,可用工具:{list(_tool_registry.keys())}",
req_id
)
tool_def = _tool_registry[tool_name]
# Step 2: 参数校验(基于inputSchema)
# 生产环境推荐用jsonschema库做严格校验
import jsonschema
try:
jsonschema.validate(instance=arguments, schema=tool_def.input_schema)
except jsonschema.ValidationError as e:
return mcp_error(
-32602,
f"参数校验失败:{e.message},期望格式:{tool_def.input_schema}",
req_id
)
# Step 3: 执行并捕获异常
try:
result = tool_def.executor(arguments)
return mcp_success(str(result), req_id)
except TimeoutError:
return mcp_error(-32001, "工具执行超时", req_id)
except Exception as e:
return mcp_error(-32000, f"工具执行异常:{str(e)}", req_id)
# ============================================================
# 开启verbose=True后,LangChain会打印完整的推理过程
# 分析这些日志可以定位Agent"在想什么"
# ============================================================
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
from langchain_community.tools import TavilySearchResults
llm = ChatOpenAI(model="gpt-4o", temperature=0)
search = TavilySearchResults(max_results=3)
agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
executor = AgentExecutor(
agent=agent,
tools=[search],
verbose=True, # 🔑 开启详细日志
max_iterations=5
)
executor.invoke({"input": "2026年FIFA世界杯在哪里举办?"})
# ============================================================
# verbose=True 输出的结构解析
# ============================================================
"""
👀 Entering new AgentExecutor chain...
# Step 1:第一轮推理
Thought: 用户问的是2026年世界杯举办地,这是实时信息,需要用搜索工具。
Action: tavily_search_results{"query": "2026年FIFA世界杯举办地"}
Action: tavily_search_results call started...
# Step 2:工具执行中
[tavily] Entering child run - call: tavily_search_results with input: {'query': '2026年FIFA世界杯举办地'}
[tavily] Finished chain.
# Step 3:工具返回结果给LLM
Observation: ['结果1:2026年FIFA世界杯将在美国、加拿大、墨西哥联合举办...']
# Step 4:第二轮推理(根据工具结果给出最终答案)
Thought: 已得到答案,2026年世界杯由北美三国联合举办。
Final Answer: 2026年FIFA世界杯将由美国、加拿大和墨西哥联合举办,这是历史上首次由三国联合举办的世界杯。
"""
# ============================================================
# 当Agent行为异常时,从日志中定位问题:
# 1. Thought行:检查LLM的推理是否合理
# 2. Action行:检查工具选择是否正确
# 3. Observation行:检查工具返回是否符合预期
# ============================================================
# ============================================================
# 自定义回调:实时监控每一步推理和工具调用
# 适用于生产环境需要记录完整执行轨迹
# ============================================================
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.agent import AgentFinish, AgentAction
class DebugCallback(BaseCallbackHandler):
"""
自定义调试回调,打印Agent执行过程中的每一步
继承BaseCallbackHandler,覆盖感兴趣的事件方法
"""
def on_agent_action(self, action: AgentAction, **kwargs):
"""每当Agent执行一个Tool时调用"""
print(f"\n🔧 [Tool调用] 工具名:{action.tool}")
print(f"📥 [Tool参数] {action.tool_input}")
def on_agent_finish(self, finish: AgentFinish, **kwargs):
"""当Agent完成最终输出时调用"""
print(f"\n✅ [最终答案] {finish.return_values['output']}")
def on_chain_end(self, chain_end: dict, **kwargs):
"""每个Chain执行完毕时调用,可用于统计耗时"""
print(f"\n📊 [Chain结束] 输出长度:{len(str(chain_end))}")
# 使用回调
from langchain_core.callbacks import StdOutCallbackHandler
agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
executor = AgentExecutor(
agent=agent,
tools=[search],
callbacks=[DebugCallback()] # 🔑 注入自定义回调
)
executor.invoke({"input": "Python之父是谁?"})
# ============================================================
# MCP协议调试:stdio模式下的日志输出
# 在Server代码中加入结构化日志,便于分析协议交互
# ============================================================
import json
import sys
from datetime import datetime
def log_json_rpc(direction: str, message: dict):
"""
打印JSON-RPC消息的日志(用于调试分析)
参数:
direction: "→" 表示收到的请求,"←" 表示发出的响应
message: JSON-RPC消息对象
"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
method = message.get("method", "response/batch")
req_id = message.get("id", "-")
# 彩色输出(Unix终端)
arrow = "→" if direction == "IN" else "←"
print(
f"[{timestamp}] {arrow} {method} (id={req_id})",
file=sys.stderr
)
# 如果是debug模式,打印完整payload
if len(sys.argv) > 1 and sys.argv[1] == "--debug":
print(
json.dumps(message, indent=2, ensure_ascii=False),
file=sys.stderr
)
sys.stderr.flush()
# 在stdio循环中:
def run_debug_server():
"""带调试输出的Server循环"""
for line in sys.stdin:
line = line.strip()
if not line:
continue
request = json.loads(line)
log_json_rpc("IN", request) # 打印收到的请求
response = handle_request(request)
if response is not None:
log_json_rpc("OUT", response) # 打印发出的响应
if response is not None:
print(json.dumps(response), file=sys.stdout)
sys.stdout.flush()
# ----------------------------------------
# 启动命令:
# python mcp_server.py --debug 2>&1 | grep "→\|←"
# ----------------------------------------
# ----------------------------------------
# WebSocket模式的抓包(使用python jsonrpc-websocket)
# ----------------------------------------
"""
import asyncio
import json
from jsonrpc_websocket import Server
async def test_mcp_ws():
server = Server("ws://localhost:8000/mcp")
# 初始化握手
init_result = await server.send("initialize", {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"clientInfo": {"name": "test-client", "version": "1.0.0"}
})
print("初始化响应:", init_result)
# 列出工具
tools_result = await server.send("tools/list")
print("工具列表:", tools_result)
# 调用工具
call_result = await server.send("tools/call", {
"name": "calculator",
"arguments": {"expression": "2**10"}
})
print("工具调用:", call_result)
asyncio.run(test_mcp_ws())
"""
排查步骤:
# 调试:打印Agent知道的工具列表
print("Agent可用工具:", [t.name for t in executor.tools])
# 应该包含你注册的所有工具
排查步骤:
# 常见错误:参数类型不匹配
# 定义:weight: int = Field(description="体重(公斤)")
# 传入:{"weight": "75"} ← 字符串,应该传整数 75
# 解决:让LLM传正确的类型,或在Tool内部做类型转换
def _run(self, weight_kg: int, **kwargs):
weight = int(weight_kg) # 做容错处理
...
排查步骤:
# WebSocket Server超时设置
import asyncio
from jsonrpc_websocket import Server
async def call_with_timeout():
server = Server("ws://localhost:8000/mcp")
try:
# 5秒超时
result = await asyncio.wait_for(
server.send("tools/list"),
timeout=5.0
)
return result
except asyncio.TimeoutError:
print("Server连接超时,检查Server是否正常运行")
return None
# ============================================================
# LangChain Agent的Streaming模式
# 将LLM的输出流式返回给用户,提升响应体验
# 适用于打字效果、长文本生成的场景
# ============================================================
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
llm = ChatOpenAI(model="gpt-4o", temperature=0, streaming=True) # 🔑 开启流式
search = TavilySearchResults(max_results=3)
agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
executor = AgentExecutor(
agent=agent,
tools=[search],
streaming=True # 🔑 AgentExecutor也要开启streaming
)
# 使用streaming模式
from langchain_core.callbacks import StdOutCallbackHandler
# 方法1:通过回调实时获取流式输出
print("Agent回答:", end="", flush=True)
for chunk in executor.stream({"input": "解释一下什么是量子计算"}):
# chunk是每个推理步骤的输出
if "output" in chunk:
print(chunk["output"], end="", flush=True)
elif "steps" in chunk:
# 工具调用步骤的输出
pass
print() # 最后换行
# ----------------------------------------
# Streaming注意事项:
# ----------------------------------------
# 1. Tool执行本身不支持Streaming(Tool是整块返回结果的)
# 只有LLM的思考过程是流式的
# 2. verbose=True和streaming=True同时开启时,日志会有交织
# 生产环境建议分开调试
# 3. 流式输出时需要前端配合(如SSE/WebSocket推送),后端改动不大
# ----------------------------------------
# ============================================================
# 批量Tool调用的并发控制
# 场景:Agent一次需要调用多个不相关的Tool
# 方案:asyncio + ThreadPoolExecutor 控制并发度
# ============================================================
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Callable, Any
# ----------------------------------------
# 方案1:信号量控制并发数(Semaphore)
# 适合IO密集型Tool(网络请求)
# ----------------------------------------
async def parallel_tool_calls_semaphore(
tool_calls: List[dict],
max_concurrency: int = 3
) -> List[Any]:
"""
使用asyncio.Semaphore控制Tool并发数
参数:
tool_calls: [{"tool": tool1, "args": {...}}, ...]
max_concurrency: 最大同时执行的Tool数
"""
semaphore = asyncio.Semaphore(max_concurrency)
async def call_with_semaphore(tool_call: dict) -> Any:
async with semaphore:
# 在这里调用Tool(如果是同步函数,用run_in_executor包装)
return await asyncio.get_event_loop().run_in_executor(
None, # 使用默认线程池
tool_call["tool"].invoke,
tool_call["args"]
)
tasks = [call_with_semaphore(tc) for tc in tool_calls]
# 🔑 asyncio.gather: 并发执行所有任务,结果按顺序返回
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果(某Tool出错不影响其他Tool)
return [
r if not isinstance(r, Exception) else f"错误:{str(r)}"
for r in results
]
# ----------------------------------------
# 方案2:ThreadPoolExecutor控制线程池大小
# 适合需要控制线程资源的场景
# ----------------------------------------
def parallel_tool_calls_threadpool(
tool_calls: List[dict],
max_workers: int = 5
) -> List[Any]:
"""
使用ThreadPoolExecutor控制Tool并发执行
参数:
tool_calls: [{"tool": tool, "args": {...}}, ...]
max_workers: 最大线程数
"""
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = [
pool.submit(tc["tool"].invoke, tc["args"])
for tc in tool_calls
]
results = [f.result() for f in futures]
return results
# ----------------------------------------
# 示例:同时搜索多个关键词
# ----------------------------------------
@tool
def search_news(query: str) -> str:
"""搜索新闻,参数:query-搜索词"""
return f"新闻搜索结果:{query}相关资讯..."
async def demo_batch_search():
queries = [
{"query": "AI大模型最新进展"},
{"query": "量子计算突破"},
{"query": "新能源技术"},
{"query": "自动驾驶现状"},
{"query": "AR/VR设备"},
]
tool_calls = [
{"tool": search_news, "args": q}
for q in queries
]
print("开始并发搜索(最大并发3)...")
results = await parallel_tool_calls_semaphore(tool_calls, max_concurrency=3)
for i, r in enumerate(results):
print(f" [{i+1}] {r}")
asyncio.run(demo_batch_search())
# ============================================================
# Tool结果缓存:避免重复调用耗时Tool
# 场景:同一查询短时间内被多次触发
# ============================================================
from functools import lru_cache
from typing import Any, Callable, Hashable
import hashlib
import json
# ----------------------------------------
# 方案1:基于函数参数的LRU缓存(适合纯函数)
# ----------------------------------------
@lru_cache(maxsize=128)
def cached_search(query: str, max_results: int = 5) -> str:
"""
带LRU缓存的搜索函数
相同参数(query + max_results)的搜索请求,在缓存有效期内直接返回结果
适合搜索、查库等幂等操作
"""
# 这里放真实搜索逻辑
return f"搜索结果:{query}"
# 清除缓存
# cached_search.cache_clear()
# ----------------------------------------
# 方案2:基于Tool输入的通用缓存装饰器
# 支持不可哈希的参数(字典)
# ----------------------------------------
def hash_args(args: dict) -> str:
"""将参数字典转成哈希字符串,用于缓存key"""
json_str = json.dumps(args, sort_keys=True, ensure_ascii=True)
return hashlib.md5(json_str.encode()).hexdigest()
def cache_tool_result(maxsize: int = 128, ttl_seconds: float = 300):
"""
Tool结果缓存装饰器
参数:
maxsize: 缓存容器大小(最近N次结果)
ttl_seconds: 缓存有效期(秒),默认5分钟
"""
cache: dict = {}
cache_time: dict = {}
def decorator(func: Callable) -> Callable:
def wrapper(args: dict) -> str:
cache_key = hash_args(args)
now = __import__("time").time()
# 命中缓存且未过期
if cache_key in cache:
if now - cache_time[cache_key] < ttl_seconds:
return f"[缓存命中] {cache[cache_key]}"
else:
# 过期了,删除缓存
del cache[cache_key]
del cache_time[cache_key]
# 执行真实逻辑
result = func(args)
cache[cache_key] = result
cache_time[cache_key] = now
# 超过maxsize,清除最老的缓存
if len(cache) > maxsize:
oldest_key = min(cache_time, key=cache_time.get)
del cache[oldest_key]
del cache_time[oldest_key]
return result
return wrapper
return decorator
# 使用示例
@cache_tool_result(maxsize=64, ttl_seconds=600)
def expensive_api_tool(args: dict) -> str:
"""需要调用外部付费API的Tool,应该加缓存"""
return f"API返回结果:{args}"
# ============================================================
# max_iterations / max_execution_time
# 防止Agent死循环或无限执行的核心手段
# ============================================================
from langchain.agents import AgentExecutor, create_react_agent
from langchain import hub
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)
search = TavilySearchResults(max_results=3)
agent = create_react_agent(llm=llm, tools=[search], prompt=hub.pull("hwchase17/react"))
# ----------------------------------------
# AgentExecutor 核心参数
# ----------------------------------------
executor = AgentExecutor(
agent=agent,
tools=[search],
# 🔑 max_iterations:最大推理步数(每个Tool调用算1步)
# 设置后,Agent超过此步数会停止并返回中间结果
max_iterations=10,
# 🔑 max_execution_time:最大执行时间(秒)
# 达到时间限制后强制停止
max_execution_time=60.0,
# 🔑 early_stopping_method:停止时用什么策略
# "force" = 强制停止,返回最终状态
# "generate" = 让Agent生成最终答案再停止
early_stopping_method="force",
# 🔑 handle_parsing_errors:遇到解析错误怎么办
# True = 把错误信息反馈给Agent让它自我修正
# False = 直接抛出异常
handle_parsing_errors=True,
# 🔑 return_intermediate_steps:是否返回中间步骤日志
# True = 返回所有Tool调用记录,便于调试
# False = 只返回最终答案(节省token)
return_intermediate_steps=True
)
# ----------------------------------------
# 检查中间步骤:看Agent走了哪些弯路
# ----------------------------------------
result = executor.invoke({"input": "解释一下什么是RAG"})
print("✅ 最终答案:", result["output"])
# 中间步骤:分析Agent的思考路径
print("\n🔍 Agent推理路径(共{}步):".format(len(result["intermediate_steps"])))
for i, step in enumerate(result["intermediate_steps"]):
action = step[0] # AgentAction
observation = step[1] # 工具返回值
print(f" Step {i+1}: 工具={action.tool}, 参数={action.tool_input}")
print(f" 工具返回={observation[:80]}...")
本文覆盖了LangChain Agent与MCP协议整合开发的全链路知识:
这套体系的核心价值在于:一次开发,多处复用。你写的MCP Server可以被任意MCP兼容的Agent连接调用,你定义的LangChain Tool可以在不同Agent里重复使用。在AI应用快速迭代的阶段,这种工程化复用能力是竞争力的关键。