导读:经过前三篇的学习,你已经掌握了Agent开发的基础。本文将带你进入进阶领域,学习如何构建高效、可靠、可扩展的生产级Agent系统。
系列文章导航
- AI智能体开发(一):从概念到架构设计
- AI智能体开发(二):技术栈选择与工具集成
- AI智能体开发(三):实战构建研究助手Agent
- AI智能体开发(四):进阶技巧与性能优化
多Agent协作模式
顺序协作模式
场景:任务需要按固定顺序执行,每个Agent负责一个环节
示例:内容创作流水线
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| from crewai import Agent, Task, Crew
researcher = Agent( role='研究员', goal='收集和分析信息', backstory='擅长快速定位高质量资料', verbose=True )
writer = Agent( role='作家', goal='基于研究撰写文章', backstory='资深科技作家,文风生动', verbose=True )
editor = Agent( role='编辑', goal='审查和优化文章质量', backstory='严格的编辑,注重细节', verbose=True )
task1 = Task( description='调研{topic}的最新进展', agent=researcher, expected_output='详细的研究笔记' )
task2 = Task( description='基于研究笔记撰写文章', agent=writer, expected_output='结构完整的文章草稿', context=[task1] )
task3 = Task( description='审查并优化文章', agent=editor, expected_output='最终版本的文章', context=[task2] )
crew = Crew( agents=[researcher, writer, editor], tasks=[task1, task2, task3], verbose=2 )
result = crew.kickoff(inputs={"topic": "量子计算"})
|
优点:
- 流程清晰,易于调试
- 每个环节可独立优化
- 适合标准化工作流
缺点:
并行协作模式
场景:多个子任务可以独立执行,最后汇总结果
示例:多角度分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import asyncio from concurrent.futures import ThreadPoolExecutor
class ParallelAnalysisAgent: """并行分析Agent""" def __init__(self): self.llm = ChatOpenAI(model="gpt-4") async def analyze_from_perspective(self, topic: str, perspective: str) -> str: """从特定角度分析主题""" prompt = f""" 请从{perspective}的角度分析:{topic} 要求: 1. 列出3-5个关键观点 2. 提供具体案例或数据支持 3. 指出该角度的局限性 """ response = await self.llm.ainvoke(prompt) return response.content async def comprehensive_analysis(self, topic: str) -> str: """综合分析""" perspectives = [ "技术可行性", "商业价值", "社会影响", "伦理考量", "未来趋势" ] tasks = [ self.analyze_from_perspective(topic, p) for p in perspectives ] results = await asyncio.gather(*tasks) summary_prompt = f""" 请综合以下不同角度的分析,生成一份全面的分析报告: {''.join([f'\n\n{p}: {r}' for p, r in zip(perspectives, results)])} 要求: 1. 整合各角度的核心观点 2. 指出共识和分歧 3. 给出综合判断和建议 """ final_response = await self.llm.ainvoke(summary_prompt) return final_response.content
agent = ParallelAnalysisAgent() result = asyncio.run(agent.comprehensive_analysis("AI在医疗中的应用"))
|
优点:
- 速度快,充分利用资源
- 多角度分析更全面
- 某个任务失败不影响其他任务
缺点:
层次化协作模式
场景:有一个Manager Agent负责任务分解和协调,Worker Agents负责执行
示例:项目管理Agent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| from crewai import Agent, Task, Crew, Process
manager = Agent( role='项目经理', goal='规划和协调整个项目', backstory='经验丰富的项目经理,善于任务分解和资源分配', verbose=True, allow_delegation=True )
developer = Agent( role='开发工程师', goal='实现功能模块', backstory='全栈工程师,技术全面', verbose=True )
designer = Agent( role='UI设计师', goal='设计用户界面', backstory='资深设计师,注重用户体验', verbose=True )
tester = Agent( role='测试工程师', goal='确保产品质量', backstory='细致的测试工程师,善于发现bug', verbose=True )
project_task = Task( description='完成{project_name}项目', agent=manager, expected_output='完整的项目交付物' )
crew = Crew( agents=[manager, developer, designer, tester], tasks=[project_task], process=Process.hierarchical, manager_agent=manager, verbose=2 )
result = crew.kickoff(inputs={"project_name": "电商网站"})
|
优点:
- 灵活性强,动态调整
- Manager可以智能分配任务
- 适合复杂、不确定性高的项目
缺点:
辩论式协作模式
场景:多个Agent从不同角度提出观点,通过辩论达成共识
示例:决策支持系统
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| class DebateAgent: """辩论式Agent""" def __init__(self): self.llm = ChatOpenAI(model="gpt-4") def debate(self, topic: str, rounds: int = 3) -> str: """进行多轮辩论""" pro_argument = self.generate_argument(topic, "support") con_argument = self.generate_argument(topic, "oppose") print(f"正方观点: {pro_argument}\n") print(f"反方观点: {con_argument}\n") for i in range(rounds): print(f"\n=== 第 {i+1} 轮辩论 ===\n") pro_rebuttal = self.rebut(pro_argument, con_argument, "support") print(f"正方反驳: {pro_rebuttal}\n") con_rebuttal = self.rebut(con_argument, pro_argument, "oppose") print(f"反方反驳: {con_rebuttal}\n") pro_argument = pro_rebuttal con_argument = con_rebuttal conclusion = self.summarize_debate(topic, pro_argument, con_argument) return conclusion def generate_argument(self, topic: str, stance: str) -> str: """生成论点""" stance_text = "支持" if stance == "support" else "反对" prompt = f""" 请从{stance_text}的立场,就以下议题提出论点: 议题:{topic} 要求: 1. 提出3个主要论据 2. 每个论据要有逻辑支撑 3. 语言有说服力 """ response = self.llm.invoke(prompt) return response.content def rebut(self, own_argument: str, opponent_argument: str, stance: str) -> str: """反驳对方观点""" stance_text = "正方" if stance == "support" else "反方" prompt = f""" 你是{stance_text}。请针对对方的观点进行反驳,并强化自己的立场。 你的原观点: {own_argument} 对方观点: {opponent_argument} 要求: 1. 指出对方观点的漏洞 2. 强化自己的论据 3. 回应可能的质疑 """ response = self.llm.invoke(prompt) return response.content def summarize_debate(self, topic: str, pro: str, con: str) -> str: """总结辩论""" prompt = f""" 请总结以下关于"{topic}"的辩论,并给出平衡的结论。 正方观点: {pro} 反方观点: {con} 要求: 1. 概括双方的核心论点 2. 指出共识和分歧 3. 给出平衡的建议或结论 4. 保持客观中立 """ response = self.llm.invoke(prompt) return response.content
debater = DebateAgent() conclusion = debater.debate("是否应该全面禁止AI deepfake技术") print(f"\n最终结论:\n{conclusion}")
|
优点:
- 避免单一视角的偏见
- 通过辩论发现盲点
- 结论更加全面和平衡
缺点:
性能优化策略
并行化工具调用
问题:Agent顺序调用工具,速度慢
解决方案:并行执行独立的工具调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| import asyncio from concurrent.futures import ThreadPoolExecutor, as_completed
class ParallelToolExecutor: """并行工具执行器""" def __init__(self, max_workers: int = 5): self.executor = ThreadPoolExecutor(max_workers=max_workers) def execute_parallel(self, tool_calls: list) -> list: """ 并行执行多个工具调用 Args: tool_calls: [(tool_func, args, kwargs), ...] Returns: [results, ...] """ futures = [] for func, args, kwargs in tool_calls: future = self.executor.submit(func, *args, **kwargs) futures.append(future) results = [] for future in as_completed(futures): try: result = future.result(timeout=30) results.append(result) except Exception as e: print(f"工具调用失败: {e}") results.append(None) return results
executor = ParallelToolExecutor(max_workers=5)
tool_calls = [ (search_papers, ("machine learning",), {}), (search_papers, ("deep learning",), {}), (search_papers, ("neural networks",), {}), (search_papers, ("transformer",), {}), (search_papers, ("attention mechanism",), {}), ]
results = executor.execute_parallel(tool_calls)
|
性能提升:
- 串行执行:5次 × 3秒 = 15秒
- 并行执行:max(3秒) = 3秒
- 提升5倍!
流式响应
问题:用户需要等待完整响应,体验差
解决方案:使用流式输出,实时显示进度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
llm_streaming = ChatOpenAI( model="gpt-4", streaming=True, callbacks=[StreamingStdOutCallbackHandler()] )
class ProgressCallback(BaseCallbackHandler): """进度回调""" def on_llm_start(self, serialized, prompts, **kwargs): print("\n🤖 Agent正在思考...") def on_llm_new_token(self, token: str, **kwargs): print(token, end="", flush=True) def on_tool_start(self, serialized, input_str, **kwargs): print(f"\n🔧 调用工具: {serialized['name']}") def on_tool_end(self, output: str, **kwargs): print(f"\n- 工具调用完成") def on_chain_end(self, outputs, **kwargs): print("\n✨ 任务完成!")
agent_executor = AgentExecutor( agent=agent, tools=tools, callbacks=[ProgressCallback()], verbose=False )
result = agent_executor.invoke({"input": query})
|
缓存机制
问题:重复的LLM调用浪费Token和时间
解决方案:多级缓存策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| import hashlib import json from functools import lru_cache from diskcache import Cache
class MultiLevelCache: """多级缓存""" def __init__(self, cache_dir="./cache"): self.memory_cache = {} self.disk_cache = Cache(cache_dir) self.stats = { 'hits': 0, 'misses': 0 } def get(self, key: str) -> any: """获取缓存""" if key in self.memory_cache: self.stats['hits'] += 1 return self.memory_cache[key] if key in self.disk_cache: self.stats['hits'] += 1 value = self.disk_cache[key] self.memory_cache[key] = value return value self.stats['misses'] += 1 return None def set(self, key: str, value: any, ttl: int = 3600): """设置缓存""" self.memory_cache[key] = value self.disk_cache.set(key, value, expire=ttl) def generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str: """生成缓存键""" data = { 'func': func_name, 'args': args, 'kwargs': kwargs } hash_str = hashlib.md5(json.dumps(data, sort_keys=True).encode()).hexdigest() return f"{func_name}:{hash_str}" def get_stats(self) -> dict: """获取缓存统计""" total = self.stats['hits'] + self.stats['misses'] hit_rate = self.stats['hits'] / total if total > 0 else 0 return { **self.stats, 'hit_rate': f"{hit_rate:.2%}", 'memory_size': len(self.memory_cache), 'disk_size': len(self.disk_cache) }
cache = MultiLevelCache()
def cached_llm_call(prompt: str) -> str: """带缓存的LLM调用""" key = cache.generate_key("llm_call", (prompt,), {}) cached_result = cache.get(key) if cached_result: print("💾 使用缓存结果") return cached_result print("🤖 调用LLM") response = llm.invoke(prompt) result = response.content cache.set(key, result, ttl=3600) return result
print(cache.get_stats())
|
缓存策略建议:
- 短期缓存(5-15分钟):搜索结果、API调用
- 中期缓存(1-24小时):论文总结、数据分析
- 长期缓存(7-30天):常见问题答案、模板内容
模型路由
问题:所有任务都用最贵的模型,成本高
解决方案:根据任务复杂度选择合适的模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| class ModelRouter: """模型路由器""" def __init__(self): self.models = { 'fast': ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7), 'balanced': ChatOpenAI(model="gpt-4", temperature=0.7), 'premium': ChatOpenAI(model="gpt-4-turbo", temperature=0.7) } self.costs = { 'fast': 0.002, 'balanced': 0.03, 'premium': 0.01 } def route(self, task_description: str, complexity: str = None) -> ChatOpenAI: """ 根据任务复杂度选择模型 Args: task_description: 任务描述 complexity: 手动指定复杂度(可选) Returns: 合适的LLM实例 """ if complexity: return self.models[complexity] complexity = self.assess_complexity(task_description) print(f"📊 任务复杂度: {complexity}") print(f"💰 预估成本: ${self.costs[complexity]}/1K tokens") return self.models[complexity] def assess_complexity(self, task: str) -> str: """评估任务复杂度""" simple_keywords = [ '翻译', '总结', '分类', '提取', '格式化', 'translate', 'summarize', 'classify' ] complex_keywords = [ '分析', '推理', '规划', '设计', '优化', 'analyze', 'reason', 'plan', 'design', 'optimize' ] task_lower = task.lower() simple_score = sum(1 for kw in simple_keywords if kw in task_lower) complex_score = sum(1 for kw in complex_keywords if kw in task_lower) length_factor = len(task) / 100 total_score = complex_score - simple_score + length_factor if total_score < 1: return 'fast' elif total_score < 3: return 'balanced' else: return 'premium'
router = ModelRouter()
llm_fast = router.route("将这段文字翻译成英文") result = llm_fast.invoke("你好世界")
llm_balanced = router.route("分析这篇文章的主要观点")
llm_premium = router.route("设计一个完整的AI系统架构,考虑性能、成本、可扩展性")
|
成本节省:
- 假设60%任务用fast模型,30%用balanced,10%用premium
- 相比全部用premium:节省约70%成本
批量处理
问题:逐个处理大量任务效率低
解决方案:批量处理和异步执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| import asyncio from typing import List
class BatchProcessor: """批量处理器""" def __init__(self, batch_size: int = 10, max_concurrent: int = 5): self.batch_size = batch_size self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) async def process_batch(self, items: List[str], process_func) -> List: """ 批量处理项目 Args: items: 待处理的项目列表 process_func: 处理函数(异步) Returns: 处理结果列表 """ results = [] for i in range(0, len(items), self.batch_size): batch = items[i:i + self.batch_size] print(f"\n处理批次 {i//self.batch_size + 1}/{(len(items)-1)//self.batch_size + 1}") batch_tasks = [ self._process_with_semaphore(item, process_func) for item in batch ] batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True) for result in batch_results: if isinstance(result, Exception): print(f"- 处理失败: {result}") results.append(None) else: results.append(result) await asyncio.sleep(1) return results async def _process_with_semaphore(self, item, process_func): """带信号量的处理(控制并发数)""" async with self.semaphore: return await process_func(item)
async def summarize_paper(paper_url: str) -> str: """总结单篇论文""" pass
processor = BatchProcessor(batch_size=10, max_concurrent=5)
paper_urls = [...] results = await processor.process_batch(paper_urls, summarize_paper)
|
人类介入(Human-in-the-Loop)
在某些场景下,需要人类审核Agent的决策或操作。
关键操作审批
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| class HumanApprovalSystem: """人类审批系统""" def __init__(self): self.approval_required_actions = [ 'send_email', 'delete_file', 'modify_database', 'make_payment' ] def requires_approval(self, action: str) -> bool: """检查是否需要审批""" return action in self.approval_required_actions def request_approval(self, action: str, details: dict) -> bool: """请求人类审批""" print("\n" + "="*60) print("⚠️ 需要您的批准") print("="*60) print(f"操作类型: {action}") print(f"详细信息:") for key, value in details.items(): print(f" {key}: {value}") print("="*60) while True: response = input("是否继续?(y/n/详情): ").lower().strip() if response == 'y': print("- 已批准") return True elif response == 'n': print("- 已拒绝") return False elif response == '详情': print("\n更多详细信息...") else: print("请输入 y、n 或 详情")
approval_system = HumanApprovalSystem()
def send_email_with_approval(recipient: str, subject: str, body: str) -> str: """带审批的邮件发送""" if approval_system.requires_approval('send_email'): approved = approval_system.request_approval( 'send_email', { '收件人': recipient, '主题': subject, '正文预览': body[:100] + '...' } ) if not approved: return "操作已被用户取消" return "邮件已发送"
|
不确定时询问用户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| class ClarificationSystem: """澄清系统""" def __init__(self, confidence_threshold: float = 0.8): self.confidence_threshold = confidence_threshold def needs_clarification(self, confidence: float) -> bool: """判断是否需要澄清""" return confidence < self.confidence_threshold def ask_user(self, question: str, options: List[str] = None) -> str: """向用户提问""" print(f"\n❓ {question}") if options: for i, option in enumerate(options, 1): print(f" {i}. {option}") return input("您的回答: ")
clarifier = ClarificationSystem(confidence_threshold=0.8)
def ambiguous_query_handler(query: str, confidence: float) -> str: """处理模糊查询""" if clarifier.needs_clarification(confidence): clarification = clarifier.ask_user( "您的问题有些模糊,请问您是想:", ["了解概念", "获取教程", "查找工具", "其他"] ) query = f"{query} - {clarification}" return process_query(query)
|
反馈学习
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| class FeedbackLearningSystem: """反馈学习系统""" def __init__(self): self.feedback_db = {} def collect_feedback(self, task_id: str, rating: int, comments: str = ""): """收集用户反馈""" self.feedback_db[task_id] = { 'rating': rating, 'comments': comments, 'timestamp': datetime.now() } print("感谢您的反馈!") def learn_from_feedback(self): """从反馈中学习(简化版)""" low_rated_tasks = [ tid for tid, data in self.feedback_db.items() if data['rating'] <= 2 ] if low_rated_tasks: print(f"\n发现 {len(low_rated_tasks)} 个低评分任务") print("建议优化以下方面:") def get_performance_report(self) -> dict: """获取性能报告""" if not self.feedback_db: return {"message": "暂无反馈数据"} ratings = [data['rating'] for data in self.feedback_db.values()] avg_rating = sum(ratings) / len(ratings) return { 'total_feedback': len(self.feedback_db), 'average_rating': f"{avg_rating:.2f}/5.0", 'distribution': { '5星': ratings.count(5), '4星': ratings.count(4), '3星': ratings.count(3), '2星': ratings.count(2), '1星': ratings.count(1) } }
feedback_system = FeedbackLearningSystem()
task_id = "task_001" rating = int(input("请为本次服务评分(1-5): ")) comments = input("有什么建议吗?(可选): ")
feedback_system.collect_feedback(task_id, rating, comments)
print(feedback_system.get_performance_report())
|
监控与日志
完整的日志系统
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import logging from logging.handlers import RotatingFileHandler
class AgentLogger: """Agent日志系统""" def __init__(self, log_dir="./logs"): os.makedirs(log_dir, exist_ok=True) self.logger = logging.getLogger("AgentLogger") self.logger.setLevel(logging.DEBUG) file_handler = RotatingFileHandler( f"{log_dir}/agent.log", maxBytes=10*1024*1024, backupCount=5 ) file_handler.setLevel(logging.DEBUG) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) console_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(console_handler) def log_agent_action(self, action: str, details: dict): """记录Agent行动""" self.logger.info(f"AGENT_ACTION: {action} | {details}") def log_tool_call(self, tool_name: str, input_data: any, output_data: any): """记录工具调用""" self.logger.debug(f"TOOL_CALL: {tool_name} | Input: {input_data} | Output: {output_data}") def log_error(self, error: Exception, context: dict): """记录错误""" self.logger.error(f"ERROR: {str(error)} | Context: {context}", exc_info=True) def log_performance(self, metric: str, value: float, unit: str = ""): """记录性能指标""" self.logger.info(f"PERFORMANCE: {metric} = {value}{unit}")
logger = AgentLogger()
logger.log_agent_action("start_research", {"topic": "AI"}) logger.log_tool_call("search", "query", "results") logger.log_performance("response_time", 2.5, "s")
|
性能监控仪表板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import time from collections import defaultdict
class PerformanceMonitor: """性能监控器""" def __init__(self): self.metrics = defaultdict(list) self.start_time = time.time() def record_metric(self, name: str, value: float): """记录指标""" self.metrics[name].append({ 'value': value, 'timestamp': time.time() }) def get_summary(self) -> dict: """获取性能摘要""" summary = {} for name, records in self.metrics.items(): values = [r['value'] for r in records] summary[name] = { 'count': len(values), 'avg': sum(values) / len(values), 'min': min(values), 'max': max(values), 'total': sum(values) } uptime = time.time() - self.start_time summary['uptime'] = uptime return summary def export_to_json(self, filepath: str = "performance_report.json"): """导出性能报告""" import json report = { 'generated_at': datetime.now().isoformat(), 'metrics': self.get_summary() } with open(filepath, 'w') as f: json.dump(report, f, indent=2) print(f"📊 性能报告已导出至: {filepath}")
monitor = PerformanceMonitor()
start = time.time()
duration = time.time() - start monitor.record_metric('task_duration', duration)
monitor.record_metric('tokens_used', 1500) monitor.record_metric('cost', 0.045)
print(monitor.get_summary())
monitor.export_to_json()
|
安全与隐私
API密钥管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| from cryptography.fernet import Fernet import base64
class SecureKeyManager: """安全的密钥管理器""" def __init__(self, key_file=".secret.key"): self.key_file = key_file self.fernet = self._load_or_generate_key() def _load_or_generate_key(self) -> Fernet: """加载或生成加密密钥""" if os.path.exists(self.key_file): with open(self.key_file, 'rb') as f: key = f.read() else: key = Fernet.generate_key() with open(self.key_file, 'wb') as f: f.write(key) os.chmod(self.key_file, 0o600) return Fernet(key) def encrypt(self, plaintext: str) -> str: """加密""" encrypted = self.fernet.encrypt(plaintext.encode()) return base64.urlsafe_b64encode(encrypted).decode() def decrypt(self, ciphertext: str) -> str: """解密""" encrypted = base64.urlsafe_b64decode(ciphertext.encode()) return self.fernet.decrypt(encrypted).decode() def store_api_key(self, service: str, api_key: str): """存储API密钥""" encrypted_key = self.encrypt(api_key) config = self._load_config() config[service] = encrypted_key self._save_config(config) print(f"- {service} 的API密钥已安全存储") def get_api_key(self, service: str) -> str: """获取API密钥""" config = self._load_config() if service not in config: raise ValueError(f"未找到 {service} 的API密钥") encrypted_key = config[service] return self.decrypt(encrypted_key) def _load_config(self) -> dict: """加载配置""" config_file = ".api_keys.json" if os.path.exists(config_file): with open(config_file, 'r') as f: return json.load(f) return {} def _save_config(self, config: dict): """保存配置""" config_file = ".api_keys.json" with open(config_file, 'w') as f: json.dump(config, f, indent=2) os.chmod(config_file, 0o600)
key_manager = SecureKeyManager()
key_manager.store_api_key("openai", "sk-your-api-key")
api_key = key_manager.get_api_key("openai") os.environ["OPENAI_API_KEY"] = api_key
|
敏感信息过滤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import re
class SensitiveDataFilter: """敏感数据过滤器""" def __init__(self): self.patterns = { 'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', 'credit_card': r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b', 'ssn': r'\b\d{3}-\d{2}-\d{4}\b', 'api_key': r'\bsk-[A-Za-z0-9]{20,}\b' } def filter_text(self, text: str) -> str: """过滤敏感信息""" filtered = text for data_type, pattern in self.patterns.items(): matches = re.finditer(pattern, filtered) for match in matches: original = match.group() replacement = f"[{data_type.upper()}_REDACTED]" filtered = filtered.replace(original, replacement) return filtered def safe_log(self, message: str): """安全日志(自动过滤)""" filtered_message = self.filter_text(message) logger.info(filtered_message)
filter = SensitiveDataFilter()
text = "联系邮箱: user@example.com, 电话: 123-456-7890" filtered = filter.filter_text(text) print(filtered)
filter.safe_log(f"用户数据: {text}")
|
部署最佳实践
Docker化部署
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN useradd -m appuser USER appuser
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD python -c "import requests; requests.get('http://localhost:8000/health')" || exit 1
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
|
环境变量配置
1 2 3 4 5 6 7
| OPENAI_API_KEY=${VAULT_OPENAI_KEY} DATABASE_URL=postgresql://user:pass@db:5432/agentdb REDIS_URL=redis://redis:6379 LOG_LEVEL=WARNING ENABLE_CACHE=true MAX_CONCURRENT_REQUESTS=10
|
Kubernetes部署(简化版)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| apiVersion: apps/v1 kind: Deployment metadata: name: agent-service spec: replicas: 3 selector: matchLabels: app: agent template: metadata: labels: app: agent spec: containers: - name: agent image: your-registry/agent:latest ports: - containerPort: 8000 envFrom: - secretRef: name: agent-secrets resources: requests: memory: "512Mi" cpu: "500m" limits: memory: "1Gi" cpu: "1000m" livenessProbe: httpGet: path: /health port: 8000 initialDelaySeconds: 30 periodSeconds: 10
|
系列总结
恭喜!你已经完成了整个AI智能体开发系列的学习。让我们回顾一下:
第一篇:概念与架构
- 理解Agent的定义和核心特征
- 掌握四层架构设计
- 了解关键组件(Planner、Memory、Tools、Decision Engine)
第二篇:技术栈与工具
- 对比主流框架(LangChain、CrewAI、AutoGen、LlamaIndex)
- 学习各类工具集成方法
- 获得不同规模项目的技术栈推荐
第三篇:实战构建
- 从零构建学术研究助手Agent
- 实现论文搜索、阅读、总结、报告生成
- 掌握模块化设计和工程实践
第四篇:进阶优化(本文)
- 多Agent协作模式(顺序、并行、层次化、辩论式)
- 性能优化策略(并行化、流式、缓存、模型路由、批量处理)
- 人类介入机制(审批、澄清、反馈学习)
- 监控与日志系统
- 安全与隐私保护
- 部署最佳实践
继续探索
完成本系列学习后,你可以:
动手实践:选择一个实际场景开始构建Agent,从简单的单Agent起步,逐步扩展到多Agent协作系统。
深入学习:关注Agent领域的最新研究进展,参与开源社区贡献,与全球开发者交流经验。
拓展方向:探索模型微调、多模态Agent、Agent自我进化等前沿技术,不断突破能力边界。
资源汇总
官方文档:
优秀项目:
社区资源:
- Reddit: r/LangChain, r/LocalLLaMA, r/ArtificialIntelligence
- Discord: LangChain官方社区、CrewAI社区
- GitHub: 关注 trending AI Agent repos
结语
AI智能体代表了AI应用的下一个演进方向。从被动的问答系统到主动的任务执行者,Agent正在改变我们与AI交互的方式,也将在未来几年重塑各行各业的工作流程。
希望这个系列能为你打开Agent开发的大门。记住,最好的学习方式就是动手实践。不要害怕犯错,每一个bug都是学习的机会。
期待看到你用Agent创造出令人惊叹的作品!