导读:经过前三篇的学习,你已经掌握了Agent开发的基础。本文将带你进入进阶领域,学习如何构建高效、可靠、可扩展的生产级Agent系统。


系列文章导航

  1. AI智能体开发(一):从概念到架构设计
  2. AI智能体开发(二):技术栈选择与工具集成
  3. AI智能体开发(三):实战构建研究助手Agent
  4. AI智能体开发(四):进阶技巧与性能优化

多Agent协作模式

顺序协作模式

场景:任务需要按固定顺序执行,每个Agent负责一个环节

示例:内容创作流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
from crewai import Agent, Task, Crew

# 定义角色
researcher = Agent(
role='研究员',
goal='收集和分析信息',
backstory='擅长快速定位高质量资料',
verbose=True
)

writer = Agent(
role='作家',
goal='基于研究撰写文章',
backstory='资深科技作家,文风生动',
verbose=True
)

editor = Agent(
role='编辑',
goal='审查和优化文章质量',
backstory='严格的编辑,注重细节',
verbose=True
)

# 定义任务(顺序执行)
task1 = Task(
description='调研{topic}的最新进展',
agent=researcher,
expected_output='详细的研究笔记'
)

task2 = Task(
description='基于研究笔记撰写文章',
agent=writer,
expected_output='结构完整的文章草稿',
context=[task1] # 依赖task1的输出
)

task3 = Task(
description='审查并优化文章',
agent=editor,
expected_output='最终版本的文章',
context=[task2] # 依赖task2的输出
)

# 创建团队
crew = Crew(
agents=[researcher, writer, editor],
tasks=[task1, task2, task3],
verbose=2
)

# 执行
result = crew.kickoff(inputs={"topic": "量子计算"})

优点

  • 流程清晰,易于调试
  • 每个环节可独立优化
  • 适合标准化工作流

缺点

  • 串行执行,速度较慢
  • 某个环节失败会影响整个流程

并行协作模式

场景:多个子任务可以独立执行,最后汇总结果

示例:多角度分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ParallelAnalysisAgent:
"""并行分析Agent"""

def __init__(self):
self.llm = ChatOpenAI(model="gpt-4")

async def analyze_from_perspective(self, topic: str, perspective: str) -> str:
"""从特定角度分析主题"""
prompt = f"""
请从{perspective}的角度分析:{topic}

要求:
1. 列出3-5个关键观点
2. 提供具体案例或数据支持
3. 指出该角度的局限性
"""

response = await self.llm.ainvoke(prompt)
return response.content

async def comprehensive_analysis(self, topic: str) -> str:
"""综合分析"""
perspectives = [
"技术可行性",
"商业价值",
"社会影响",
"伦理考量",
"未来趋势"
]

# 并行执行多个分析任务
tasks = [
self.analyze_from_perspective(topic, p)
for p in perspectives
]

results = await asyncio.gather(*tasks)

# 汇总结果
summary_prompt = f"""
请综合以下不同角度的分析,生成一份全面的分析报告:

{''.join([f'\n\n{p}: {r}' for p, r in zip(perspectives, results)])}

要求:
1. 整合各角度的核心观点
2. 指出共识和分歧
3. 给出综合判断和建议
"""

final_response = await self.llm.ainvoke(summary_prompt)
return final_response.content

# 使用
agent = ParallelAnalysisAgent()
result = asyncio.run(agent.comprehensive_analysis("AI在医疗中的应用"))

优点

  • 速度快,充分利用资源
  • 多角度分析更全面
  • 某个任务失败不影响其他任务

缺点

  • 需要处理并发和同步
  • 结果整合可能复杂

层次化协作模式

场景:有一个Manager Agent负责任务分解和协调,Worker Agents负责执行

示例:项目管理Agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from crewai import Agent, Task, Crew, Process

# Manager Agent
manager = Agent(
role='项目经理',
goal='规划和协调整个项目',
backstory='经验丰富的项目经理,善于任务分解和资源分配',
verbose=True,
allow_delegation=True # 允许委派任务
)

# Worker Agents
developer = Agent(
role='开发工程师',
goal='实现功能模块',
backstory='全栈工程师,技术全面',
verbose=True
)

designer = Agent(
role='UI设计师',
goal='设计用户界面',
backstory='资深设计师,注重用户体验',
verbose=True
)

tester = Agent(
role='测试工程师',
goal='确保产品质量',
backstory='细致的测试工程师,善于发现bug',
verbose=True
)

# 任务(层次化)
project_task = Task(
description='完成{project_name}项目',
agent=manager,
expected_output='完整的项目交付物'
)

# 创建团队(使用层次化流程)
crew = Crew(
agents=[manager, developer, designer, tester],
tasks=[project_task],
process=Process.hierarchical, # 层次化流程
manager_agent=manager, # 指定Manager
verbose=2
)

result = crew.kickoff(inputs={"project_name": "电商网站"})

优点

  • 灵活性强,动态调整
  • Manager可以智能分配任务
  • 适合复杂、不确定性高的项目

缺点

  • Manager的决策质量很关键
  • 可能出现通信开销

辩论式协作模式

场景:多个Agent从不同角度提出观点,通过辩论达成共识

示例:决策支持系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class DebateAgent:
"""辩论式Agent"""

def __init__(self):
self.llm = ChatOpenAI(model="gpt-4")

def debate(self, topic: str, rounds: int = 3) -> str:
"""进行多轮辩论"""

# 初始化两个对立观点
pro_argument = self.generate_argument(topic, "support")
con_argument = self.generate_argument(topic, "oppose")

print(f"正方观点: {pro_argument}\n")
print(f"反方观点: {con_argument}\n")

# 多轮辩论
for i in range(rounds):
print(f"\n=== 第 {i+1} 轮辩论 ===\n")

# 正方反驳
pro_rebuttal = self.rebut(pro_argument, con_argument, "support")
print(f"正方反驳: {pro_rebuttal}\n")

# 反方反驳
con_rebuttal = self.rebut(con_argument, pro_argument, "oppose")
print(f"反方反驳: {con_rebuttal}\n")

pro_argument = pro_rebuttal
con_argument = con_rebuttal

# 总结辩论,得出结论
conclusion = self.summarize_debate(topic, pro_argument, con_argument)
return conclusion

def generate_argument(self, topic: str, stance: str) -> str:
"""生成论点"""
stance_text = "支持" if stance == "support" else "反对"
prompt = f"""
请从{stance_text}的立场,就以下议题提出论点:

议题:{topic}

要求:
1. 提出3个主要论据
2. 每个论据要有逻辑支撑
3. 语言有说服力
"""

response = self.llm.invoke(prompt)
return response.content

def rebut(self, own_argument: str, opponent_argument: str, stance: str) -> str:
"""反驳对方观点"""
stance_text = "正方" if stance == "support" else "反方"
prompt = f"""
你是{stance_text}。请针对对方的观点进行反驳,并强化自己的立场。

你的原观点:
{own_argument}

对方观点:
{opponent_argument}

要求:
1. 指出对方观点的漏洞
2. 强化自己的论据
3. 回应可能的质疑
"""

response = self.llm.invoke(prompt)
return response.content

def summarize_debate(self, topic: str, pro: str, con: str) -> str:
"""总结辩论"""
prompt = f"""
请总结以下关于"{topic}"的辩论,并给出平衡的结论。

正方观点:
{pro}

反方观点:
{con}

要求:
1. 概括双方的核心论点
2. 指出共识和分歧
3. 给出平衡的建议或结论
4. 保持客观中立
"""

response = self.llm.invoke(prompt)
return response.content

# 使用
debater = DebateAgent()
conclusion = debater.debate("是否应该全面禁止AI deepfake技术")
print(f"\n最终结论:\n{conclusion}")

优点

  • 避免单一视角的偏见
  • 通过辩论发现盲点
  • 结论更加全面和平衡

缺点

  • 耗时较长
  • 需要精心设计辩论规则

性能优化策略

并行化工具调用

问题:Agent顺序调用工具,速度慢

解决方案:并行执行独立的工具调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed

class ParallelToolExecutor:
"""并行工具执行器"""

def __init__(self, max_workers: int = 5):
self.executor = ThreadPoolExecutor(max_workers=max_workers)

def execute_parallel(self, tool_calls: list) -> list:
"""
并行执行多个工具调用

Args:
tool_calls: [(tool_func, args, kwargs), ...]

Returns:
[results, ...]
"""
futures = []

for func, args, kwargs in tool_calls:
future = self.executor.submit(func, *args, **kwargs)
futures.append(future)

results = []
for future in as_completed(futures):
try:
result = future.result(timeout=30)
results.append(result)
except Exception as e:
print(f"工具调用失败: {e}")
results.append(None)

return results

# 使用示例
executor = ParallelToolExecutor(max_workers=5)

# 并行搜索多个关键词
tool_calls = [
(search_papers, ("machine learning",), {}),
(search_papers, ("deep learning",), {}),
(search_papers, ("neural networks",), {}),
(search_papers, ("transformer",), {}),
(search_papers, ("attention mechanism",), {}),
]

results = executor.execute_parallel(tool_calls)

性能提升

  • 串行执行:5次 × 3秒 = 15秒
  • 并行执行:max(3秒) = 3秒
  • 提升5倍!

流式响应

问题:用户需要等待完整响应,体验差

解决方案:使用流式输出,实时显示进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

# 方法1:LangChain内置流式回调
llm_streaming = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()]
)

# 方法2:自定义流式回调
class ProgressCallback(BaseCallbackHandler):
"""进度回调"""

def on_llm_start(self, serialized, prompts, **kwargs):
print("\n🤖 Agent正在思考...")

def on_llm_new_token(self, token: str, **kwargs):
# 实时显示生成的token
print(token, end="", flush=True)

def on_tool_start(self, serialized, input_str, **kwargs):
print(f"\n🔧 调用工具: {serialized['name']}")

def on_tool_end(self, output: str, **kwargs):
print(f"\n- 工具调用完成")

def on_chain_end(self, outputs, **kwargs):
print("\n✨ 任务完成!")

# 使用
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
callbacks=[ProgressCallback()],
verbose=False # 关闭默认verbose,使用自定义回调
)

result = agent_executor.invoke({"input": query})

缓存机制

问题:重复的LLM调用浪费Token和时间

解决方案:多级缓存策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import hashlib
import json
from functools import lru_cache
from diskcache import Cache

class MultiLevelCache:
"""多级缓存"""

def __init__(self, cache_dir="./cache"):
# L1: 内存缓存(最快,容量小)
self.memory_cache = {}

# L2: 磁盘缓存(较快,容量大)
self.disk_cache = Cache(cache_dir)

# 统计
self.stats = {
'hits': 0,
'misses': 0
}

def get(self, key: str) -> any:
"""获取缓存"""
# L1缓存
if key in self.memory_cache:
self.stats['hits'] += 1
return self.memory_cache[key]

# L2缓存
if key in self.disk_cache:
self.stats['hits'] += 1
value = self.disk_cache[key]
self.memory_cache[key] = value # 提升到L1
return value

self.stats['misses'] += 1
return None

def set(self, key: str, value: any, ttl: int = 3600):
"""设置缓存"""
self.memory_cache[key] = value
self.disk_cache.set(key, value, expire=ttl)

def generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""生成缓存键"""
data = {
'func': func_name,
'args': args,
'kwargs': kwargs
}
hash_str = hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest()
return f"{func_name}:{hash_str}"

def get_stats(self) -> dict:
"""获取缓存统计"""
total = self.stats['hits'] + self.stats['misses']
hit_rate = self.stats['hits'] / total if total > 0 else 0

return {
**self.stats,
'hit_rate': f"{hit_rate:.2%}",
'memory_size': len(self.memory_cache),
'disk_size': len(self.disk_cache)
}

# 使用示例
cache = MultiLevelCache()

def cached_llm_call(prompt: str) -> str:
"""带缓存的LLM调用"""
key = cache.generate_key("llm_call", (prompt,), {})

# 检查缓存
cached_result = cache.get(key)
if cached_result:
print("💾 使用缓存结果")
return cached_result

# 调用LLM
print("🤖 调用LLM")
response = llm.invoke(prompt)
result = response.content

# 保存缓存(TTL 1小时)
cache.set(key, result, ttl=3600)

return result

# 查看缓存统计
print(cache.get_stats())

缓存策略建议

  • 短期缓存(5-15分钟):搜索结果、API调用
  • 中期缓存(1-24小时):论文总结、数据分析
  • 长期缓存(7-30天):常见问题答案、模板内容

模型路由

问题:所有任务都用最贵的模型,成本高

解决方案:根据任务复杂度选择合适的模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class ModelRouter:
"""模型路由器"""

def __init__(self):
self.models = {
'fast': ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7),
'balanced': ChatOpenAI(model="gpt-4", temperature=0.7),
'premium': ChatOpenAI(model="gpt-4-turbo", temperature=0.7)
}

# 成本(每1K tokens)
self.costs = {
'fast': 0.002,
'balanced': 0.03,
'premium': 0.01
}

def route(self, task_description: str, complexity: str = None) -> ChatOpenAI:
"""
根据任务复杂度选择模型

Args:
task_description: 任务描述
complexity: 手动指定复杂度(可选)

Returns:
合适的LLM实例
"""
if complexity:
return self.models[complexity]

# 自动判断复杂度
complexity = self.assess_complexity(task_description)

print(f"📊 任务复杂度: {complexity}")
print(f"💰 预估成本: ${self.costs[complexity]}/1K tokens")

return self.models[complexity]

def assess_complexity(self, task: str) -> str:
"""评估任务复杂度"""

# 简单任务特征
simple_keywords = [
'翻译', '总结', '分类', '提取', '格式化',
'translate', 'summarize', 'classify'
]

# 复杂任务特征
complex_keywords = [
'分析', '推理', '规划', '设计', '优化',
'analyze', 'reason', 'plan', 'design', 'optimize'
]

task_lower = task.lower()

# 计分
simple_score = sum(1 for kw in simple_keywords if kw in task_lower)
complex_score = sum(1 for kw in complex_keywords if kw in task_lower)

# 长度因素
length_factor = len(task) / 100

total_score = complex_score - simple_score + length_factor

if total_score < 1:
return 'fast'
elif total_score < 3:
return 'balanced'
else:
return 'premium'

# 使用
router = ModelRouter()

# 简单任务 - 使用快速模型
llm_fast = router.route("将这段文字翻译成英文")
result = llm_fast.invoke("你好世界")

# 中等任务 - 使用平衡模型
llm_balanced = router.route("分析这篇文章的主要观点")

# 复杂任务 - 使用高级模型
llm_premium = router.route("设计一个完整的AI系统架构,考虑性能、成本、可扩展性")

成本节省

  • 假设60%任务用fast模型,30%用balanced,10%用premium
  • 相比全部用premium:节省约70%成本

批量处理

问题:逐个处理大量任务效率低

解决方案:批量处理和异步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import asyncio
from typing import List

class BatchProcessor:
"""批量处理器"""

def __init__(self, batch_size: int = 10, max_concurrent: int = 5):
self.batch_size = batch_size
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)

async def process_batch(self, items: List[str], process_func) -> List:
"""
批量处理项目

Args:
items: 待处理的项目列表
process_func: 处理函数(异步)

Returns:
处理结果列表
"""
results = []

# 分批处理
for i in range(0, len(items), self.batch_size):
batch = items[i:i + self.batch_size]
print(f"\n处理批次 {i//self.batch_size + 1}/{(len(items)-1)//self.batch_size + 1}")

# 并发处理批次内的项目
batch_tasks = [
self._process_with_semaphore(item, process_func)
for item in batch
]

batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

# 处理异常
for result in batch_results:
if isinstance(result, Exception):
print(f"- 处理失败: {result}")
results.append(None)
else:
results.append(result)

# 批次间短暂休息,避免速率限制
await asyncio.sleep(1)

return results

async def _process_with_semaphore(self, item, process_func):
"""带信号量的处理(控制并发数)"""
async with self.semaphore:
return await process_func(item)

# 使用示例
async def summarize_paper(paper_url: str) -> str:
"""总结单篇论文"""
# ... 处理逻辑
pass

processor = BatchProcessor(batch_size=10, max_concurrent=5)

# 批量处理100篇论文
paper_urls = [...] # 100个URL
results = await processor.process_batch(paper_urls, summarize_paper)

人类介入(Human-in-the-Loop)

在某些场景下,需要人类审核Agent的决策或操作。

关键操作审批

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class HumanApprovalSystem:
"""人类审批系统"""

def __init__(self):
self.approval_required_actions = [
'send_email',
'delete_file',
'modify_database',
'make_payment'
]

def requires_approval(self, action: str) -> bool:
"""检查是否需要审批"""
return action in self.approval_required_actions

def request_approval(self, action: str, details: dict) -> bool:
"""请求人类审批"""
print("\n" + "="*60)
print("⚠️ 需要您的批准")
print("="*60)
print(f"操作类型: {action}")
print(f"详细信息:")
for key, value in details.items():
print(f" {key}: {value}")
print("="*60)

# 交互式确认
while True:
response = input("是否继续?(y/n/详情): ").lower().strip()

if response == 'y':
print("- 已批准")
return True
elif response == 'n':
print("- 已拒绝")
return False
elif response == '详情':
print("\n更多详细信息...")
# 显示更多信息
else:
print("请输入 y、n 或 详情")

# 在Agent中使用
approval_system = HumanApprovalSystem()

def send_email_with_approval(recipient: str, subject: str, body: str) -> str:
"""带审批的邮件发送"""
if approval_system.requires_approval('send_email'):
approved = approval_system.request_approval(
'send_email',
{
'收件人': recipient,
'主题': subject,
'正文预览': body[:100] + '...'
}
)

if not approved:
return "操作已被用户取消"

# 执行发送邮件
# ...
return "邮件已发送"

不确定时询问用户

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class ClarificationSystem:
"""澄清系统"""

def __init__(self, confidence_threshold: float = 0.8):
self.confidence_threshold = confidence_threshold

def needs_clarification(self, confidence: float) -> bool:
"""判断是否需要澄清"""
return confidence < self.confidence_threshold

def ask_user(self, question: str, options: List[str] = None) -> str:
"""向用户提问"""
print(f"\n❓ {question}")

if options:
for i, option in enumerate(options, 1):
print(f" {i}. {option}")

return input("您的回答: ")

# 在Agent中使用
clarifier = ClarificationSystem(confidence_threshold=0.8)

def ambiguous_query_handler(query: str, confidence: float) -> str:
"""处理模糊查询"""
if clarifier.needs_clarification(confidence):
clarification = clarifier.ask_user(
"您的问题有些模糊,请问您是想:",
["了解概念", "获取教程", "查找工具", "其他"]
)

# 根据用户回答调整查询
query = f"{query} - {clarification}"

# 继续处理
return process_query(query)

反馈学习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class FeedbackLearningSystem:
"""反馈学习系统"""

def __init__(self):
self.feedback_db = {} # 简化的反馈数据库

def collect_feedback(self, task_id: str, rating: int, comments: str = ""):
"""收集用户反馈"""
self.feedback_db[task_id] = {
'rating': rating, # 1-5星
'comments': comments,
'timestamp': datetime.now()
}

print("感谢您的反馈!")

def learn_from_feedback(self):
"""从反馈中学习(简化版)"""
low_rated_tasks = [
tid for tid, data in self.feedback_db.items()
if data['rating'] <= 2
]

if low_rated_tasks:
print(f"\n发现 {len(low_rated_tasks)} 个低评分任务")
print("建议优化以下方面:")

# 分析常见问题
# ...

def get_performance_report(self) -> dict:
"""获取性能报告"""
if not self.feedback_db:
return {"message": "暂无反馈数据"}

ratings = [data['rating'] for data in self.feedback_db.values()]
avg_rating = sum(ratings) / len(ratings)

return {
'total_feedback': len(self.feedback_db),
'average_rating': f"{avg_rating:.2f}/5.0",
'distribution': {
'5星': ratings.count(5),
'4星': ratings.count(4),
'3星': ratings.count(3),
'2星': ratings.count(2),
'1星': ratings.count(1)
}
}

# 使用
feedback_system = FeedbackLearningSystem()

# 任务完成后收集反馈
task_id = "task_001"
rating = int(input("请为本次服务评分(1-5): "))
comments = input("有什么建议吗?(可选): ")

feedback_system.collect_feedback(task_id, rating, comments)

# 查看性能报告
print(feedback_system.get_performance_report())

监控与日志

完整的日志系统

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import logging
from logging.handlers import RotatingFileHandler

class AgentLogger:
"""Agent日志系统"""

def __init__(self, log_dir="./logs"):
os.makedirs(log_dir, exist_ok=True)

# 创建logger
self.logger = logging.getLogger("AgentLogger")
self.logger.setLevel(logging.DEBUG)

# 文件handler(轮转,最大10MB,保留5个备份)
file_handler = RotatingFileHandler(
f"{log_dir}/agent.log",
maxBytes=10*1024*1024,
backupCount=5
)
file_handler.setLevel(logging.DEBUG)

# 控制台handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# 格式化
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)

def log_agent_action(self, action: str, details: dict):
"""记录Agent行动"""
self.logger.info(f"AGENT_ACTION: {action} | {details}")

def log_tool_call(self, tool_name: str, input_data: any, output_data: any):
"""记录工具调用"""
self.logger.debug(f"TOOL_CALL: {tool_name} | Input: {input_data} | Output: {output_data}")

def log_error(self, error: Exception, context: dict):
"""记录错误"""
self.logger.error(f"ERROR: {str(error)} | Context: {context}", exc_info=True)

def log_performance(self, metric: str, value: float, unit: str = ""):
"""记录性能指标"""
self.logger.info(f"PERFORMANCE: {metric} = {value}{unit}")

# 使用
logger = AgentLogger()

logger.log_agent_action("start_research", {"topic": "AI"})
logger.log_tool_call("search", "query", "results")
logger.log_performance("response_time", 2.5, "s")

性能监控仪表板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import time
from collections import defaultdict

class PerformanceMonitor:
"""性能监控器"""

def __init__(self):
self.metrics = defaultdict(list)
self.start_time = time.time()

def record_metric(self, name: str, value: float):
"""记录指标"""
self.metrics[name].append({
'value': value,
'timestamp': time.time()
})

def get_summary(self) -> dict:
"""获取性能摘要"""
summary = {}

for name, records in self.metrics.items():
values = [r['value'] for r in records]

summary[name] = {
'count': len(values),
'avg': sum(values) / len(values),
'min': min(values),
'max': max(values),
'total': sum(values)
}

# 运行时间
uptime = time.time() - self.start_time
summary['uptime'] = uptime

return summary

def export_to_json(self, filepath: str = "performance_report.json"):
"""导出性能报告"""
import json

report = {
'generated_at': datetime.now().isoformat(),
'metrics': self.get_summary()
}

with open(filepath, 'w') as f:
json.dump(report, f, indent=2)

print(f"📊 性能报告已导出至: {filepath}")

# 使用
monitor = PerformanceMonitor()

# 记录各种指标
start = time.time()
# ... 执行任务 ...
duration = time.time() - start
monitor.record_metric('task_duration', duration)

monitor.record_metric('tokens_used', 1500)
monitor.record_metric('cost', 0.045)

# 查看摘要
print(monitor.get_summary())

# 导出报告
monitor.export_to_json()

安全与隐私

API密钥管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from cryptography.fernet import Fernet
import base64

class SecureKeyManager:
"""安全的密钥管理器"""

def __init__(self, key_file=".secret.key"):
self.key_file = key_file
self.fernet = self._load_or_generate_key()

def _load_or_generate_key(self) -> Fernet:
"""加载或生成加密密钥"""
if os.path.exists(self.key_file):
with open(self.key_file, 'rb') as f:
key = f.read()
else:
key = Fernet.generate_key()
with open(self.key_file, 'wb') as f:
f.write(key)
os.chmod(self.key_file, 0o600) # 仅所有者可读写

return Fernet(key)

def encrypt(self, plaintext: str) -> str:
"""加密"""
encrypted = self.fernet.encrypt(plaintext.encode())
return base64.urlsafe_b64encode(encrypted).decode()

def decrypt(self, ciphertext: str) -> str:
"""解密"""
encrypted = base64.urlsafe_b64decode(ciphertext.encode())
return self.fernet.decrypt(encrypted).decode()

def store_api_key(self, service: str, api_key: str):
"""存储API密钥"""
encrypted_key = self.encrypt(api_key)

# 存储到配置文件
config = self._load_config()
config[service] = encrypted_key
self._save_config(config)

print(f"- {service} 的API密钥已安全存储")

def get_api_key(self, service: str) -> str:
"""获取API密钥"""
config = self._load_config()

if service not in config:
raise ValueError(f"未找到 {service} 的API密钥")

encrypted_key = config[service]
return self.decrypt(encrypted_key)

def _load_config(self) -> dict:
"""加载配置"""
config_file = ".api_keys.json"
if os.path.exists(config_file):
with open(config_file, 'r') as f:
return json.load(f)
return {}

def _save_config(self, config: dict):
"""保存配置"""
config_file = ".api_keys.json"
with open(config_file, 'w') as f:
json.dump(config, f, indent=2)
os.chmod(config_file, 0o600)

# 使用
key_manager = SecureKeyManager()

# 首次存储密钥
key_manager.store_api_key("openai", "sk-your-api-key")

# 使用时获取
api_key = key_manager.get_api_key("openai")
os.environ["OPENAI_API_KEY"] = api_key

敏感信息过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import re

class SensitiveDataFilter:
"""敏感数据过滤器"""

def __init__(self):
self.patterns = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'api_key': r'\bsk-[A-Za-z0-9]{20,}\b'
}

def filter_text(self, text: str) -> str:
"""过滤敏感信息"""
filtered = text

for data_type, pattern in self.patterns.items():
matches = re.finditer(pattern, filtered)

for match in matches:
original = match.group()
replacement = f"[{data_type.upper()}_REDACTED]"
filtered = filtered.replace(original, replacement)

return filtered

def safe_log(self, message: str):
"""安全日志(自动过滤)"""
filtered_message = self.filter_text(message)
logger.info(filtered_message)

# 使用
filter = SensitiveDataFilter()

# 过滤后记录日志
text = "联系邮箱: user@example.com, 电话: 123-456-7890"
filtered = filter.filter_text(text)
print(filtered) # "联系邮箱: [EMAIL_REDACTED], 电话: [PHONE_REDACTED]"

filter.safe_log(f"用户数据: {text}")

部署最佳实践

Docker化部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 创建非root用户
RUN useradd -m appuser
USER appuser

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import requests; requests.get('http://localhost:8000/health')" || exit 1

# 启动
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

环境变量配置

1
2
3
4
5
6
7
# .env.production
OPENAI_API_KEY=${VAULT_OPENAI_KEY}
DATABASE_URL=postgresql://user:pass@db:5432/agentdb
REDIS_URL=redis://redis:6379
LOG_LEVEL=WARNING
ENABLE_CACHE=true
MAX_CONCURRENT_REQUESTS=10

Kubernetes部署(简化版)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-service
spec:
replicas: 3
selector:
matchLabels:
app: agent
template:
metadata:
labels:
app: agent
spec:
containers:
- name: agent
image: your-registry/agent:latest
ports:
- containerPort: 8000
envFrom:
- secretRef:
name: agent-secrets
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10

系列总结

恭喜!你已经完成了整个AI智能体开发系列的学习。让我们回顾一下:

第一篇:概念与架构

  • 理解Agent的定义和核心特征
  • 掌握四层架构设计
  • 了解关键组件(Planner、Memory、Tools、Decision Engine)

第二篇:技术栈与工具

  • 对比主流框架(LangChain、CrewAI、AutoGen、LlamaIndex)
  • 学习各类工具集成方法
  • 获得不同规模项目的技术栈推荐

第三篇:实战构建

  • 从零构建学术研究助手Agent
  • 实现论文搜索、阅读、总结、报告生成
  • 掌握模块化设计和工程实践

第四篇:进阶优化(本文)

  • 多Agent协作模式(顺序、并行、层次化、辩论式)
  • 性能优化策略(并行化、流式、缓存、模型路由、批量处理)
  • 人类介入机制(审批、澄清、反馈学习)
  • 监控与日志系统
  • 安全与隐私保护
  • 部署最佳实践

继续探索

完成本系列学习后,你可以:

动手实践:选择一个实际场景开始构建Agent,从简单的单Agent起步,逐步扩展到多Agent协作系统。

深入学习:关注Agent领域的最新研究进展,参与开源社区贡献,与全球开发者交流经验。

拓展方向:探索模型微调、多模态Agent、Agent自我进化等前沿技术,不断突破能力边界。


资源汇总

官方文档

优秀项目

社区资源

  • Reddit: r/LangChain, r/LocalLLaMA, r/ArtificialIntelligence
  • Discord: LangChain官方社区、CrewAI社区
  • GitHub: 关注 trending AI Agent repos

结语

AI智能体代表了AI应用的下一个演进方向。从被动的问答系统到主动的任务执行者,Agent正在改变我们与AI交互的方式,也将在未来几年重塑各行各业的工作流程。

希望这个系列能为你打开Agent开发的大门。记住,最好的学习方式就是动手实践。不要害怕犯错,每一个bug都是学习的机会。

期待看到你用Agent创造出令人惊叹的作品!