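Overview
Traditional ops automation relies on predefined scripts and rules, which often fall short when a system problem is complex or unanticipated. AI agents change this: an agent can interpret a problem described in natural language, inspect system state on its own, draw up a remediation plan, and carry it out.
In this tutorial you will build an AI ops agent from scratch. It can:
- Monitor server state (CPU, memory, disk, network)
- Analyze logs automatically and locate problems
- Perform common ops tasks (restarting services, cleaning up disks, scaling out, and so on)
- Interact in natural language, lowering the barrier to ops work
- Learn from the history of past operations to keep improving its decisions
Tech stack: Python 3.11+ / LangChain / OpenAI API / Docker / Prometheus / Redis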
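Prerequisites
| Item | Requirement |
|---|---|
| Python | 3.11 or later |
| OpenAI API | A valid API key (GPT-4o or GPT-3.5-turbo) |
| Docker | Used for containerized deployment |
| Linux server | Used for testing (Ubuntu 22.04 recommended) |
| Background knowledge | Python programming, basic Linux operations, Docker basics |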
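The first three rows of the table can be checked from a script before going further. The following is a minimal sketch, not part of the project itself: the function name `check_prerequisites` and its injectable parameters are illustrative assumptions, and it only verifies the Python version, the presence of the `OPENAI_API_KEY` environment variable, and whether a `docker` executable is on `PATH`.

```python
import os
import shutil
import sys


def check_prerequisites(version_info=None, env=None, which=shutil.which):
    """Return a list of human-readable problems; an empty list means all checks passed."""
    # Parameters are injectable so the checks can be exercised without a real environment.
    version_info = sys.version_info if version_info is None else version_info
    env = os.environ if env is None else env
    problems = []
    if tuple(version_info[:2]) < (3, 11):
        problems.append("Python 3.11 or later is required")
    if not env.get("OPENAI_API_KEY"):
        problems.append("OPENAI_API_KEY is not set")
    if which("docker") is None:
        problems.append("docker was not found on PATH")
    return problems


if __name__ == "__main__":
    for problem in check_prerequisites():
        print(f"[missing] {problem}")
```

Running the file directly prints one line per unmet prerequisite and nothing when everything is in place. It does not validate that the API key actually works.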
Environment setup
bash
# Create the project
mkdir -p ~/ops-agent && cd ~/ops-agent
# Create a virtual environment
python3 -m venv venv
source venv/bin/activate
# Install dependencies
pip install langchain langchain-openai langchain-community \
    openai anthropic redis docker \
    prometheus-client psutil \
    pyyaml rich typer \
    fastapi uvicorn
System architecture design
The AI ops agent is organized into the following layers:
2.1 Architecture overview
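┌──────────────────────────────────────────────────────────────────────┐
│                        User interaction layer                        │
│                      (CLI / Web UI / Slack Bot)                      │
├──────────────────────────────────────────────────────────────────────┤
│                         Agent decision layer                         │
│    ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐    │
│    │Intent recognition│→│ Plan formulation │→│  Task execution  │    │
│    └──────────────────┘ └──────────────────┘ └──────────────────┘    │
├──────────────────────────────────────────────────────────────────────┤
│                          Tool layer (Tools)                          │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐  │
│ │  Monitoring  │ │ Log analysis │ │ Service mgmt │ │   Alerting   │  │
│ └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘  │
├──────────────────────────────────────────────────────────────────────┤
│                         Infrastructure layer                         │
│               Linux Server / Docker / K8s / Cloud API                │
└──────────────────────────────────────────────────────────────────────┘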
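The decision layer in the diagram (intent recognition, then plan formulation, then task execution) can be sketched as three small functions. This is an illustrative sketch only: the keyword table, the `Plan` dataclass, and the stub tools are assumptions made for the example, and keyword matching stands in for the LLM call that a real agent would use to classify intent.

```python
from dataclasses import dataclass, field
from typing import Callable

# Hypothetical keyword table; a real agent would ask the LLM to classify intent.
INTENT_KEYWORDS = {
    "check_status": ("status", "cpu", "memory", "disk"),
    "analyze_logs": ("log", "error"),
    "restart_service": ("restart",),
}


@dataclass
class Plan:
    intent: str
    steps: list[str] = field(default_factory=list)


def recognize_intent(text: str) -> str:
    """Intent recognition: return the first intent whose keyword appears in the text."""
    lowered = text.lower()
    for intent, keywords in INTENT_KEYWORDS.items():
        if any(word in lowered for word in keywords):
            return intent
    return "unknown"


def make_plan(intent: str) -> Plan:
    """Plan formulation: map an intent to an ordered list of tool names."""
    steps = {
        "check_status": ["system_monitor"],
        "analyze_logs": ["log_analyzer"],
        # Diagnose first, then act, then verify.
        "restart_service": ["system_monitor", "service_manager", "system_monitor"],
    }.get(intent, [])
    return Plan(intent=intent, steps=steps)


def execute(plan: Plan, tools: dict[str, Callable[[], str]]) -> list[str]:
    """Task execution: run each step through the tool layer and collect the results."""
    return [tools[name]() for name in plan.steps]


# Stub tools standing in for the real tool layer.
stub_tools = {
    "system_monitor": lambda: "cpu ok",
    "log_analyzer": lambda: "0 errors",
    "service_manager": lambda: "restarted",
}
plan = make_plan(recognize_intent("please restart nginx"))
results = execute(plan, stub_tools)
print(plan.intent, results)
```

Keeping the three stages as separate functions means each one can be swapped independently, for example replacing `recognize_intent` with an LLM call while leaving the executor untouched.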
2.2 Project structure
text
ops-agent/
├── agent/
│   ├── __init__.py
│   ├── core.py              # Agent core logic
│   ├── intent.py            # Intent recognition
│   └── planner.py           # Task planning
├── tools/
│   ├── __init__.py
│   ├── system_monitor.py    # System monitoring tool
│   ├── log_analyzer.py      # Log analysis tool
│   ├── service_manager.py   # Service management tool
│   ├── disk_manager.py      # Disk management tool
│   └── network_tools.py     # Network diagnostics tool
├── memory/
│   ├── __init__.py
│   ├── short_term.py        # Short-term memory (Redis)
│   └── long_term.py         # Long-term memory (SQLite)
├── config.yaml              # Configuration file
├── main.py                  # Entry point
├── requirements.txt
└── Dockerfile
Core implementation
3.1 Configuration file
yaml
# config.yaml
app:
  name: ops-agent
  version: "1.0.0"
  debug: true
llm:
  provider: openai
  model: gpt-4o
  # yaml.safe_load keeps this placeholder as a literal string;
  # ChatOpenAI reads OPENAI_API_KEY from the environment.
  api_key: ${OPENAI_API_KEY}
  temperature: 0.3
  max_tokens: 4096
safety:
  confirm_required:
    - "rm -rf"
    - "systemctl stop"
    - "docker rm"
    - "DROP TABLE"
    - "shutdown"
  deny_commands:
    - "rm -rf /"
    - "mkfs"
    - "dd if=/dev/zero"
  log_retention_days: 90
monitor:
  interval: 60
  thresholds:
    cpu_percent: 80
    memory_percent: 85
    disk_percent: 90
    load_average: 4.0
alert:
  channels:
    - type: console
      enabled: true
    - type: webhook
      enabled: false
memory:
  redis_url: redis://localhost:6379/0
  sqlite_path: ./data/ops_agent.db
3.2 Agent core logic
python
# agent/core.py
"""Core module of the AI ops agent."""
import json
import logging
import platform
from datetime import datetime

import psutil
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.tools import BaseTool

from tools.system_monitor import SystemMonitorTool
from tools.log_analyzer import LogAnalyzerTool
from tools.service_manager import ServiceManagerTool
# disk_manager.py and network_tools.py appear in the project structure
# but their listings are not shown in this tutorial.
from tools.disk_manager import DiskManagerTool
from tools.network_tools import NetworkDiagnosticTool

logger = logging.getLogger(__name__)


class OpsAgent:
    """AI ops agent."""

    SYSTEM_PROMPT = """You are a professional AI ops assistant responsible for the automated operation of servers and applications.
Your responsibilities:
1. Monitor system state and detect anomalies promptly
2. Analyze logs and locate the root cause of problems
3. Perform ops actions to resolve common problems
4. Provide ops advice and best practices
Working principles:
- Safety first: any operation that could affect system stability requires confirmation
- Diagnose before acting: do not run commands blindly; analyze the problem, then make a plan
- Record every operation: each operation must be written to the log
- Escalate gradually: start with the safest approach and escalate step by step
You can use the following tools to complete tasks:
{tool_descriptions}
Current system information:
{system_info}
"""

    def __init__(self, config: dict):
        self.config = config
        # The API key is read from the OPENAI_API_KEY environment variable.
        self.llm = ChatOpenAI(
            model=config["llm"]["model"],
            temperature=config["llm"]["temperature"],
            max_tokens=config["llm"].get("max_tokens", 4096),
        )
        self.tools = self._init_tools()
        self.conversation_history = []
        self.operation_log = []
        logger.info("OpsAgent initialized")

    def _init_tools(self) -> dict[str, BaseTool]:
        """Instantiate every tool, keyed by the name used in the prompt."""
        tools = {
            "system_monitor": SystemMonitorTool(),
            "log_analyzer": LogAnalyzerTool(),
            "service_manager": ServiceManagerTool(),
            "disk_manager": DiskManagerTool(),
            "network_diagnostic": NetworkDiagnosticTool(),
        }
        return tools

    def _get_system_info(self) -> str:
        """Return a JSON snapshot of the host for the system prompt."""
        info = {
            "hostname": platform.node(),
            "os": f"{platform.system()} {platform.release()}",
            "cpu_count": psutil.cpu_count(),
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory_total_gb": round(psutil.virtual_memory().total / 1024**3, 1),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage("/").percent,
        }
        return json.dumps(info, ensure_ascii=False, indent=2)

    def _build_tool_descriptions(self) -> str:
        descriptions = []
        for name, tool in self.tools.items():
            descriptions.append(f"- {name}: {tool.description}")
        # Join with a real newline so each tool sits on its own prompt line.
        return "\n".join(descriptions)

    def chat(self, user_input: str) -> str:
        # Note: the tools are only described in the prompt here; this method
        # sends one LLM request and does not execute any tool itself.
        system_info = self._get_system_info()
        system_prompt = self.SYSTEM_PROMPT.format(
            tool_descriptions=self._build_tool_descriptions(),
            system_info=system_info,
        )
        self.conversation_history.append(HumanMessage(content=user_input))
        messages = [SystemMessage(content=system_prompt)]
        messages.extend(self.conversation_history)
        response = self.llm.invoke(messages)
        self.conversation_history.append(AIMessage(content=response.content))
        self._log_operation(user_input, response.content)
        return response.content

    def _log_operation(self, query: str, response: str):
        """Keep an in-memory record of each exchange (response truncated to 500 chars)."""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "query": query,
            "response": response[:500],
        }
        self.operation_log.append(log_entry)
        logger.info(f"Operation recorded: {query[:50]}...")
3.3 System monitoring tool
python
# tools/system_monitor.py
"""System monitoring tool."""
import json

import psutil
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field


class SystemMonitorInput(BaseModel):
    metric: str = Field(default="all",
                        description="Metric to inspect: cpu, memory, disk, network, process, all")
    top_n: int = Field(default=10, description="Return the top N results")


class SystemMonitorTool(BaseTool):
    name: str = "system_monitor"
    description: str = (
        "Monitor system state. Supports CPU usage, memory usage, disk space, "
        "network traffic, and the process list."
    )
    args_schema: type[BaseModel] = SystemMonitorInput

    def _run(self, metric: str = "all", top_n: int = 10) -> str:
        result = {}
        if metric in ("all", "cpu"):
            result["cpu"] = self._get_cpu_info()
        if metric in ("all", "memory"):
            result["memory"] = self._get_memory_info()
        if metric in ("all", "disk"):
            result["disk"] = self._get_disk_info()
        if metric in ("all", "network"):
            result["network"] = self._get_network_info()
        if metric in ("all", "process"):
            result["top_processes"] = self._get_top_processes(top_n)
        return json.dumps(result, ensure_ascii=False, indent=2)

    def _get_cpu_info(self) -> dict:
        return {
            "overall_percent": psutil.cpu_percent(interval=1),
            "cpu_count_physical": psutil.cpu_count(logical=False),
            "cpu_count_logical": psutil.cpu_count(logical=True),
            "load_avg": list(psutil.getloadavg()),
        }

    def _get_memory_info(self) -> dict:
        mem = psutil.virtual_memory()
        return {
            "total_gb": round(mem.total / 1024**3, 2),
            "used_gb": round(mem.used / 1024**3, 2),
            "available_gb": round(mem.available / 1024**3, 2),
            "percent": mem.percent,
        }

    def _get_disk_info(self) -> dict:
        disks = []
        for p in psutil.disk_partitions():
            try:
                u = psutil.disk_usage(p.mountpoint)
                disks.append({
                    "mountpoint": p.mountpoint,
                    "total_gb": round(u.total / 1024**3, 2),
                    "used_gb": round(u.used / 1024**3, 2),
                    "percent": u.percent,
                })
            except PermissionError:
                # Skip mount points the current user cannot read.
                continue
        return {"partitions": disks}

    def _get_network_info(self) -> dict:
        # Minimal sketch: cumulative counters since boot, summed over all interfaces.
        net = psutil.net_io_counters()
        return {
            "bytes_sent": net.bytes_sent,
            "bytes_recv": net.bytes_recv,
            "packets_sent": net.packets_sent,
            "packets_recv": net.packets_recv,
        }

    def _get_top_processes(self, n: int) -> list:
        procs = []
        for p in psutil.process_iter(["pid", "name", "cpu_percent", "memory_percent"]):
            try:
                procs.append(p.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process exited or is not accessible; skip it.
                pass
        # Sort by CPU usage, treating missing values as 0.
        procs.sort(key=lambda x: x.get("cpu_percent", 0) or 0, reverse=True)
        return procs[:n]
3.4 Log analysis tool
python
# tools/log_analyzer.py
"""Log analysis tool."""
import gzip
import json
import os
import re

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field


class LogAnalyzerInput(BaseModel):
    log_path: str = Field(description="Path to the log file")
    pattern: str = Field(default="", description="Search pattern (regular expression)")
    tail: int = Field(default=100, description="Read the last N lines")
    level: str = Field(default="", description="Log level to filter by")


class LogAnalyzerTool(BaseTool):
    name: str = "log_analyzer"
    description: str = "Analyze a log file. Supports keyword search and filtering by log level."
    args_schema: type[BaseModel] = LogAnalyzerInput

    def _run(self, log_path: str, pattern: str = "",
             tail: int = 100, level: str = "") -> str:
        if not os.path.exists(log_path):
            return f"Error: log file does not exist: {log_path}"
        lines = self._read_log(log_path, tail)
        if level:
            lines = [line for line in lines if level.upper() in line.upper()]
        if pattern:
            try:
                regex = re.compile(pattern, re.IGNORECASE)
                lines = [line for line in lines if regex.search(line)]
            except re.error:
                # Not a valid regex: fall back to a case-insensitive substring match.
                lines = [line for line in lines if pattern.lower() in line.lower()]
        # Counts are computed over the lines that survived the filters above.
        analysis = {
            "file": log_path,
            "total_lines": len(lines),
            "error_count": sum(1 for line in lines if "ERROR" in line.upper()),
            "warn_count": sum(1 for line in lines if "WARN" in line.upper()),
        }
        return json.dumps(analysis, ensure_ascii=False, indent=2)

    def _read_log(self, path: str, tail: int) -> list:
        """Return the last `tail` non-empty lines; .gz files are read transparently."""
        opener = gzip.open if path.endswith(".gz") else open
        mode = "rt" if path.endswith(".gz") else "r"
        with opener(path, mode, encoding="utf-8", errors="ignore") as f:
            lines = f.readlines()
        return [line.strip() for line in lines[-tail:] if line.strip()]
3.5 Service management tool
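python
# tools/service_manager.py
"""Service management tool."""
import subprocess
from typing import ClassVar

from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field


class ServiceManagerInput(BaseModel):
    action: str = Field(description="Action: status, start, stop, restart, list")
    service_name: str = Field(default="", description="Service name")


class ServiceManagerTool(BaseTool):
    name: str = "service_manager"
    description: str = "Manage system services. Supports status checks and start/stop/restart."
    args_schema: type[BaseModel] = ServiceManagerInput
    # ClassVar keeps pydantic from treating these constants as model fields.
    SAFE_SERVICES: ClassVar[set[str]] = {
        "nginx", "docker", "redis", "postgresql", "mysql", "supervisord"}
    ALLOWED_ACTIONS: ClassVar[set[str]] = {"status", "start", "stop", "restart"}

    def _run(self, action: str, service_name: str = "") -> str:
        action = action.lower().strip()
        if action == "list":
            result = subprocess.run(
                ["systemctl", "list-units", "--type=service",
                 "--state=running", "--no-pager"],
                capture_output=True, text=True, timeout=15)
            return result.stdout.strip()
        if action not in self.ALLOWED_ACTIONS:
            return f"Error: unsupported action: {action}"
        if not service_name:
            return "Error: please specify a service name"
        if action in ("stop", "restart") and service_name not in self.SAFE_SERVICES:
            return (f"Safety warning: service {service_name} is not on the safe list; "
                    "please run the command manually.")
        # Pass arguments as a list (no shell) so the service name cannot inject extra commands.
        cmd = ["sudo", "systemctl", action, service_name]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        except subprocess.TimeoutExpired:
            return "Error: command timed out"
        output = (result.stdout or result.stderr).strip()
        if result.returncode != 0:
            return f"Command exited with code {result.returncode}: {output}"
        return output or "Operation succeeded"
Main program entry point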
python
# main.py
"""AI ops agent entry point."""
import logging

import typer
import yaml
from rich.console import Console
from rich.panel import Panel

from agent.core import OpsAgent

app = typer.Typer(help="AI ops assistant")
console = Console()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[logging.FileHandler("ops_agent.log"), logging.StreamHandler()])


def load_config(path="config.yaml"):
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


# With only one command and no callback, Typer runs that command directly and
# rejects `python main.py chat`; the callback keeps `chat` as a subcommand.
@app.callback()
def main():
    pass


@app.command()
def chat():
    """Start an interactive session with the agent."""
    config = load_config()
    agent = OpsAgent(config)
    console.print(Panel(
        "[bold cyan]AI ops assistant[/bold cyan]\n"
        "Describe your ops request in natural language; type quit to exit.\n"
        "Examples:\n - Check system status\n - Analyze errors in the logs\n - Restart nginx",
        title="投肯智能 OpsAgent v1.0", border_style="cyan"))
    while True:
        try:
            user_input = console.input("\n[bold green]You: [/bold green]").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not user_input or user_input.lower() in ("quit", "exit", "q"):
            break
        with console.status("[cyan]Thinking...[/cyan]"):
            response = agent.chat(user_input)
        console.print(Panel(response, title="[bold cyan]AI assistant[/bold cyan]",
                            border_style="cyan"))


if __name__ == "__main__":
    app()
Docker deployment
dockerfile
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
systemd procps net-tools iputils-ping curl \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV OPENAI_API_KEY=""
ENV PYTHONPATH=/app
ENTRYPOINT ["python", "main.py"]
CMD ["chat"]
bash
# Build and run
docker build -t ops-agent:latest .
docker run -it --rm \
    -e OPENAI_API_KEY="sk-your-key-here" \
    -v /var/log:/host-logs:ro \
    --pid=host \
    ops-agent:latest chat
FAQ
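Q1: The agent lacks permission when executing commands
bash
# Add the runtime user to sudoers (passwordless systemctl only)
echo "your_user ALL=(ALL) NOPASSWD: /usr/bin/systemctl" | sudo tee /etc/sudoers.d/ops-agent
# Or use Docker privileged mode
docker run --privileged ...
Q2: How to add more tools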
python
# tools/custom_tool.py
from langchain_core.tools import BaseTool


class MyCustomTool(BaseTool):
    # BaseTool is a pydantic model, so its fields need type annotations.
    name: str = "my_custom_tool"
    description: str = "Description of the custom tool"

    def _run(self, param: str) -> str:
        return f"Result: {param}"


# Register it in agent/core.py
# self.tools["my_custom_tool"] = MyCustomTool()
Q3: How to reduce API call costs
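- Use GPT-3.5-turbo for simple subtasks and GPT-4o for complex decisions
- Cache the outputs of common tasks
- Limit max_tokens for each agent
- Use prompt templates to cut down on repeated descriptions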
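The first two suggestions (routing by task complexity and caching) can be combined in a thin wrapper around the LLM call. The sketch below is an assumption-laden illustration rather than part of the project: `pick_model`, `CachedLLM`, the hint keywords, and the 500-character threshold are all hypothetical, and a fake backend replaces the real API so the example runs offline.

```python
import hashlib
from typing import Callable

# Hypothetical routing rule: long or diagnosis-style prompts go to the larger model.
COMPLEX_HINTS = ("why", "root cause", "diagnose", "plan")


def pick_model(prompt: str) -> str:
    lowered = prompt.lower()
    if len(prompt) > 500 or any(hint in lowered for hint in COMPLEX_HINTS):
        return "gpt-4o"
    return "gpt-3.5-turbo"


class CachedLLM:
    """Wrap an LLM call so identical (model, prompt) pairs are answered from memory."""

    def __init__(self, call_llm: Callable[[str, str], str]):
        self._call_llm = call_llm  # call_llm(model, prompt) -> answer
        self._cache: dict[str, str] = {}
        self.hits = 0

    def ask(self, prompt: str) -> str:
        model = pick_model(prompt)
        key = hashlib.sha256(f"{model}\n{prompt}".encode("utf-8")).hexdigest()
        if key in self._cache:
            self.hits += 1
            return self._cache[key]
        answer = self._call_llm(model, prompt)
        self._cache[key] = answer
        return answer


# Fake backend so the example runs offline; swap in a real client call.
calls = []


def fake_llm(model: str, prompt: str) -> str:
    calls.append(model)
    return f"[{model}] answer"


llm = CachedLLM(fake_llm)
llm.ask("check disk usage")
llm.ask("check disk usage")  # served from the cache, no backend call
llm.ask("diagnose why nginx returns 502")
print(calls, llm.hits)
```

A cache like this only makes sense for prompts whose answer does not depend on live system state. Anything that embeds current metrics, as the agent's system prompt does, would need the state excluded from the key or the cache bypassed.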
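Summary
This tutorial walked through building an AI ops agent end to end, covering:
- Architecture design: a layered architecture with a tool-based design
- Core implementation: an agent framework built on LangChain
- Tool development: system monitoring, log analysis, service management
- Safety mechanisms: confirmation for dangerous operations, a command deny list, and a service allowlist
- Docker deployment: a containerized deployment option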
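The safety mechanisms in this list are configured in the `safety` section of `config.yaml`, but none of the listings in this tutorial reads that section. The following is a minimal sketch of how it could be enforced before a command runs. The names `Verdict` and `check_command` are assumptions made for the example, and the patterns are copied from the configuration file.

```python
from enum import Enum

# Mirrors the `safety` section of config.yaml.
SAFETY = {
    "confirm_required": ["rm -rf", "systemctl stop", "docker rm", "DROP TABLE", "shutdown"],
    "deny_commands": ["rm -rf /", "mkfs", "dd if=/dev/zero"],
}


class Verdict(Enum):
    ALLOW = "allow"
    CONFIRM = "confirm"
    DENY = "deny"


def _normalize(text: str) -> str:
    # Collapse whitespace and case so "RM   -rf" still matches "rm -rf".
    return " ".join(text.lower().split())


def check_command(command: str, safety: dict = SAFETY) -> Verdict:
    """Classify a shell command before the agent is allowed to run it."""
    normalized = _normalize(command)
    # Deny patterns are checked first so they take priority over confirmation.
    if any(_normalize(p) in normalized for p in safety["deny_commands"]):
        return Verdict.DENY
    if any(_normalize(p) in normalized for p in safety["confirm_required"]):
        return Verdict.CONFIRM
    return Verdict.ALLOW


for cmd in ("df -h", "sudo systemctl stop nginx", "rm -rf ./build", "rm -rf /"):
    print(f"{cmd!r}: {check_command(cmd).value}")
```

Substring matching is deliberately conservative: `rm -rf /tmp/cache` is also denied because it contains `rm -rf /`. It is also easy to bypass with shell quoting or variables, so a check like this is a guard against accidents, not a security boundary.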
If you have any questions, feel free to contact us on WeChat at toukenai.