多Agent协作
复杂任务往往需要多个Agent协作完成。本节介绍多Agent系统的架构和实现方法。
为什么需要多Agent?
单Agent的局限:
- 一个Agent精通所有领域很难
- 复杂任务难以有效分解
- 并行执行效率低
多Agent优势:
- 专业分工,各司其职
- 并行处理,提高效率
- 相互校验,减少错误

多Agent架构
层级架构
        ┌─────────────┐
        │  主控Agent  │  负责任务分发和汇总
        └──────┬──────┘
               │
    ┌──────────┼──────────┐
    ↓          ↓          ↓
┌───────┐  ┌───────┐  ┌───────┐
│Agent A│  │Agent B│  │Agent C│  专业Agent
└───────┘  └───────┘  └───────┘

对等架构
┌───────┐    ┌───────┐
│Agent A│ ←→ │Agent B│
└───┬───┘    └───┬───┘
    │            │
    ↓            ↓
┌───┴───┐    ┌───┴───┐
│Agent C│ ←→ │Agent D│
└───────┘    └───────┘

黑板架构
┌─────────────────────────────┐
│       黑板(共享状态)        │
│  - 任务队列                 │
│  - 中间结果                 │
│  - 决策记录                 │
└───────────┬─────────────────┘
            │
    ┌───────┼───────┐
    ↓       ↓       ↓
 ┌─────┐ ┌─────┐ ┌─────┐
 │Agt1 │ │Agt2 │ │Agt3 │
 └─────┘ └─────┘ └─────┘

使用LangGraph实现多Agent
主控+专业Agent
python
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
# Define the state
class MultiAgentState(TypedDict):
    """State schema for the supervisor + specialist-agent graph."""

    messages: list  # set to [] by the example invocation; not read by the nodes shown
    next_agent: str  # written by supervisor(), read by route_agent()
    task: str  # task description interpolated into every agent prompt
    results: dict  # each specialist returns a single-entry dict for this key
# Create the chat model used by the agent functions
model = ChatOpenAI(model="gpt-4o")
# Supervisor agent: decides which specialist agent to call
def supervisor(state: MultiAgentState):
    """Ask the model which specialist should handle ``state["task"]``.

    Returns a partial state update whose ``next_agent`` is the model's reply
    with surrounding whitespace stripped.
    """
    job = state["task"]
    # The prompt text is left-aligned because it is part of the string literal.
    prompt = f"""分析以下任务,决定由哪个专业Agent处理:
任务:{job}
可选Agent:
- research: 负责搜索和收集信息
- coder: 负责编写代码
- writer: 负责撰写文档
只输出Agent名称。
"""
    decision = model.invoke(prompt)
    chosen = decision.content.strip()
    return {"next_agent": chosen}
# Research agent
def research_agent(state: MultiAgentState):
    """Have the model analyse the task in the role of a research expert.

    Returns a partial state update storing the reply text under
    ``results["research"]``.
    """
    prompt = f"作为研究专家,请分析:{state['task']}"
    reply = model.invoke(prompt)
    return {"results": {"research": reply.content}}
# Coding agent
def coder_agent(state: MultiAgentState):
    """Have the model handle the task in the role of a programming expert.

    Returns a partial state update storing the reply text under
    ``results["code"]``.
    """
    prompt = f"作为编程专家,请处理:{state['task']}"
    reply = model.invoke(prompt)
    return {"results": {"code": reply.content}}
# Writing agent
def writer_agent(state: MultiAgentState):
    """Have the model write up the task in the role of a writing expert.

    Returns a partial state update storing the reply text under
    ``results["writing"]``.
    """
    prompt = f"作为写作专家,请撰写:{state['task']}"
    reply = model.invoke(prompt)
    return {"results": {"writing": reply.content}}
# Routing function
def route_agent(state: MultiAgentState):
    """Map the supervisor's free-text reply to the name of the next node.

    The reply is produced by a language model, so it is matched as a
    case-insensitive substring rather than as an exact node name. Candidates
    are checked in the order research, coder, writer; the first hit wins.

    Returns:
        ``"research"``, ``"coder"`` or ``"writer"``, or ``END`` when none of
        those names occurs in ``state["next_agent"]``.
    """
    reply = state["next_agent"].lower()
    for node_name in ("research", "coder", "writer"):
        if node_name in reply:
            return node_name
    return END
# Build the graph
workflow = StateGraph(MultiAgentState)
# Register the supervisor and the three specialist nodes
workflow.add_node("supervisor", supervisor)
workflow.add_node("research", research_agent)
workflow.add_node("coder", coder_agent)
workflow.add_node("writer", writer_agent)
workflow.set_entry_point("supervisor")
# After the supervisor runs, route_agent's return value selects the next node (or END)
workflow.add_conditional_edges("supervisor", route_agent)
# Every specialist is followed directly by END
workflow.add_edge("research", END)
workflow.add_edge("coder", END)
workflow.add_edge("writer", END)
app = workflow.compile()
# 使用
result = app.invoke({"task": "帮我写一个Python爬虫脚本", "messages": [], "results": {}})

顺序协作Agent
python
from langgraph.graph import StateGraph
class PipelineState(TypedDict):
    """State schema for the translate -> summarize -> format pipeline."""

    input_text: str  # original text, read by translator()
    translated: str  # written by translator(), read by summarizer()
    summarized: str  # written by summarizer(), read by formatter()
    final: str  # written by formatter()
def translator(state: PipelineState):
    """Translation agent: ask the model to translate ``input_text`` into English.

    Returns a partial state update with the reply text under ``translated``.
    """
    source_text = state["input_text"]
    reply = model.invoke(f"翻译成英文:{source_text}")
    return {"translated": reply.content}
def summarizer(state: PipelineState):
    """Summary agent: ask the model for a one-sentence summary of ``translated``.

    Returns a partial state update with the reply text under ``summarized``.
    """
    english_text = state["translated"]
    reply = model.invoke(f"用一句话总结:{english_text}")
    return {"summarized": reply.content}
def formatter(state: PipelineState):
    """Formatting agent: ask the model to format ``summarized`` as JSON.

    Returns a partial state update with the reply text under ``final``.
    """
    summary = state["summarized"]
    reply = model.invoke(f"将以下内容格式化为JSON:{summary}")
    return {"final": reply.content}
# Build the pipeline
pipeline = StateGraph(PipelineState)
pipeline.add_node("translator", translator)
pipeline.add_node("summarizer", summarizer)
pipeline.add_node("formatter", formatter)
pipeline.set_entry_point("translator")
# Fixed linear order: translator -> summarizer -> formatter -> END
pipeline.add_edge("translator", "summarizer")
pipeline.add_edge("summarizer", "formatter")
# NOTE(review): END, TypedDict and model are not imported/defined in this snippet;
# it presumably relies on the earlier snippet having been run - confirm.
pipeline.add_edge("formatter", END)
app = pipeline.compile()
result = app.invoke({"input_text": "人工智能正在改变世界"})

并行协作Agent
python
from langgraph.graph import StateGraph
from langchain_core.runnables import RunnableParallel
class ParallelState(TypedDict):
    """State schema for the parallel text-analysis graph."""

    input_text: str  # text interpolated into each analysis prompt
    analysis: dict  # analysis results returned by the nodes
def sentiment_agent(state: ParallelState):
    """Sentiment-analysis agent.

    Returns ``{"analysis": {"sentiment": <reply text>}}``.
    """
    text = state["input_text"]
    reply = model.invoke(f"分析情感:{text}")
    return {"analysis": {"sentiment": reply.content}}
def keyword_agent(state: ParallelState):
    """Keyword-extraction agent.

    Returns ``{"analysis": {"keywords": <reply text>}}``.
    """
    text = state["input_text"]
    reply = model.invoke(f"提取关键词:{text}")
    return {"analysis": {"keywords": reply.content}}
def category_agent(state: ParallelState):
    """Classification agent.

    Returns ``{"analysis": {"category": <reply text>}}``.
    """
    text = state["input_text"]
    reply = model.invoke(f"分类:{text}")
    return {"analysis": {"category": reply.content}}
# Run the analyses in parallel
def parallel_analysis(state: ParallelState):
    """Run the three analysis agents on the same state and merge their output.

    Each agent returns ``{"analysis": {<name>: <text>}}`` and
    ``RunnableParallel`` returns those dicts keyed by branch name. They are
    flattened here so the stored value is a single
    ``{"sentiment": ..., "keywords": ..., "category": ...}`` mapping rather
    than carrying an extra ``analysis`` level under every branch.

    Returns:
        ``{"analysis": merged}`` with one entry per agent.
    """
    branch_outputs = RunnableParallel(
        sentiment=sentiment_agent,
        keywords=keyword_agent,
        category=category_agent,
    ).invoke(state)
    merged = {}
    for partial_update in branch_outputs.values():
        merged.update(partial_update["analysis"])
    return {"analysis": merged}
# Build the graph
workflow = StateGraph(ParallelState)
# A single node performs all three analyses
workflow.add_node("parallel", parallel_analysis)
workflow.set_entry_point("parallel")
workflow.add_edge("parallel", END)
app = workflow.compile()

Agent通信机制
共享状态
python
class SharedState(TypedDict):
    """State schema shared by all agents for communication."""

    messages: list  # full message history
    task_queue: list  # tasks waiting to be processed
    results: dict  # results of the individual agents
    context: dict  # shared context
def agent_a(state: SharedState):
    """Produce agent A's result, passing agent B's result to the model as reference.

    Returns a partial state update storing the reply text under
    ``results["agent_a"]``.
    """
    # Read the other agent's result (None when no "agent_b" entry exists)
    peer_output = state["results"].get("agent_b")
    # Handle the task
    reply = model.invoke(f"处理任务,参考:{peer_output}")
    # Write the result
    return {"results": {"agent_a": reply.content}}

消息传递
python
def agent_communication(sender: str, receiver: str, message: str, state: dict):
    """Record one message between two agents and return the updated log.

    Args:
        sender: Name of the sending agent.
        receiver: Name of the receiving agent.
        message: Message text.
        state: Current state; an existing log is read from
            ``state["communication"]`` when present.

    Returns:
        ``{"communication": log}`` where ``log`` is the previous log plus one
        new entry with the keys ``from``, ``to``, ``message`` and ``timestamp``.
    """
    # Imported locally because the snippet uses datetime without importing it.
    from datetime import datetime

    # Copy the log so the list held by ``state`` is not mutated in place.
    communication_log = list(state.get("communication", []))
    communication_log.append({
        "from": sender,
        "to": receiver,
        "message": message,
        "timestamp": datetime.now(),
    })
    return {"communication": communication_log}

冲突解决
投票机制
python
def voting_resolution(state: MultiAgentState):
    """Resolve conflicting agent answers by majority vote.

    Every value in ``state["results"]`` is reduced to a key point with
    ``extract_key_point`` and the most frequent key point wins. On a tie the
    key point that was seen first is kept.

    Returns:
        ``{"final_result": winner}``, or ``{"final_result": None}`` when there
        are no results to vote on.
    """
    from collections import Counter

    # NOTE(review): extract_key_point is not defined in the visible source;
    # it is assumed to return a hashable summary of one answer - confirm.
    votes = Counter(
        extract_key_point(answer) for answer in state["results"].values()
    )
    if not votes:
        # max() would raise ValueError on an empty tally.
        return {"final_result": None}
    # Pick the key point with the most votes
    final = max(votes, key=votes.get)
    return {"final_result": final}

仲裁机制
python
def arbiter(state: MultiAgentState):
    """Arbiter agent: ask the model to pick the best of three agents' answers.

    Returns the object produced by ``model.invoke`` unchanged.
    """
    # NOTE(review): the other node functions in this file return a dict of
    # state updates; confirm that returning the raw model response is intended.
    answers = state["results"]
    # The prompt text is left-aligned because it is part of the string literal.
    question = f"""多个Agent给出了不同的答案,请选择最佳答案:
Agent1: {answers.get('agent1')}
Agent2: {answers.get('agent2')}
Agent3: {answers.get('agent3')}
选择最准确的答案并解释原因。
"""
    return model.invoke(question)

完整示例:软件开发团队
python
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
# Chat model shared by all dev-team agent functions
model = ChatOpenAI(model="gpt-4o")
class DevTeamState(TypedDict):
    """State schema for the software-development-team workflow."""

    requirements: str  # raw requirement text, read by product_manager()
    design: str  # written by product_manager(), read by developer()
    code: str  # written by developer(), read by tester() and reviewer()
    review: str  # written by reviewer()
    tests: str  # written by tester(), read by reviewer()
    final: str  # not written by any of the nodes shown
# Product manager: analyses the requirements
def product_manager(state: DevTeamState):
    """Ask the model, as a product manager, to analyse ``requirements``.

    Returns a partial state update with the reply text under ``design``.
    """
    requirements = state["requirements"]
    # The prompt text is left-aligned because it is part of the string literal.
    prompt = f"""作为产品经理,分析以下需求:
{requirements}
输出:
1. 功能列表
2. 技术要求
3. 验收标准
"""
    reply = model.invoke(prompt)
    return {"design": reply.content}
# Developer: writes the code
def developer(state: DevTeamState):
    """Ask the model, as a developer, to implement the ``design`` document.

    Returns a partial state update with the reply text under ``code``.
    """
    design_doc = state["design"]
    # The prompt text is left-aligned because it is part of the string literal.
    prompt = f"""作为开发工程师,根据设计编写代码:
设计文档:
{design_doc}
输出完整的代码实现。
"""
    reply = model.invoke(prompt)
    return {"code": reply.content}
# Test engineer: writes the tests
def tester(state: DevTeamState):
    """Ask the model, as a test engineer, to write unit tests for ``code``.

    Returns a partial state update with the reply text under ``tests``.
    """
    source_code = state["code"]
    # The prompt text is left-aligned because it is part of the string literal.
    prompt = f"""作为测试工程师,为代码编写测试:
代码:
{source_code}
输出单元测试代码。
"""
    reply = model.invoke(prompt)
    return {"tests": reply.content}
# Code reviewer: reviews the code
def reviewer(state: DevTeamState):
    """Ask the model, as a code reviewer, to review ``code`` and ``tests``.

    Returns a partial state update with the reply text under ``review``.
    """
    source_code = state["code"]
    test_code = state["tests"]
    # The prompt text is left-aligned because it is part of the string literal.
    prompt = f"""作为代码审查员,审查代码和测试:
代码:
{source_code}
测试:
{test_code}
输出:
1. 代码质量评估
2. 潜在问题
3. 改进建议
"""
    reply = model.invoke(prompt)
    return {"review": reply.content}
# Build the workflow
workflow = StateGraph(DevTeamState)
workflow.add_node("pm", product_manager)
workflow.add_node("dev", developer)
workflow.add_node("test", tester)
workflow.add_node("review", reviewer)
workflow.set_entry_point("pm")
# Fixed linear order: pm -> dev -> test -> review -> END
workflow.add_edge("pm", "dev")
workflow.add_edge("dev", "test")
workflow.add_edge("test", "review")
workflow.add_edge("review", END)
app = workflow.compile()
# Usage: only "requirements" is supplied; the nodes fill in the other keys they write
result = app.invoke({
    "requirements": "实现一个用户登录功能,支持用户名密码和第三方登录"
})
print("最终代码:")
print(result["code"])
print("\n代码审查:")
print(result["review"])

小结
| 模式 | 说明 | 适用场景 |
|---|---|---|
| 层级架构 | 主控+专业Agent | 任务类型明确 |
| 顺序协作 | 流水线处理 | 有固定流程 |
| 并行协作 | 同时处理 | 独立子任务 |
| 黑板架构 | 共享状态 | 需要紧密协作 |
下一步
学完了Agent开发,继续学习 项目实战,将所学知识应用到真实项目中。