Skip to content

多Agent协作

复杂任务往往需要多个Agent协作完成。本节介绍多Agent系统的架构和实现方法。

为什么需要多Agent?

单Agent的局限:
- 一个Agent精通所有领域很难
- 复杂任务难以有效分解
- 并行执行效率低

多Agent优势:
- 专业分工,各司其职
- 并行处理,提高效率
- 相互校验,减少错误

多Agent架构

层级架构

        ┌─────────────┐
        │  主控Agent  │  负责任务分发和汇总
        └──────┬──────┘

    ┌──────────┼──────────┐
    ↓          ↓          ↓
┌───────┐  ┌───────┐  ┌───────┐
│Agent A│  │Agent B│  │Agent C│  专业Agent
└───────┘  └───────┘  └───────┘

对等架构

┌───────┐     ┌───────┐
│Agent A│ ←→ │Agent B│
└───┬───┘     └───┬───┘
    │             │
    ↓             ↓
┌───┴───┐     ┌───┴───┐
│Agent C│ ←→ │Agent D│
└───────┘     └───────┘

黑板架构

┌─────────────────────────────┐
│          黑板(共享状态)       │
│  - 任务队列                  │
│  - 中间结果                  │
│  - 决策记录                  │
└───────────┬─────────────────┘

    ┌───────┼───────┐
    ↓       ↓       ↓
┌─────┐ ┌─────┐ ┌─────┐
│Agt1 │ │Agt2 │ │Agt3 │
└─────┘ └─────┘ └─────┘

使用LangGraph实现多Agent

主控+专业Agent

python
from typing import TypedDict, Literal
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

# 定义状态
class MultiAgentState(TypedDict):
    messages: list
    next_agent: str
    task: str
    results: dict

# 创建模型
model = ChatOpenAI(model="gpt-4o")

# 主控Agent:决定调用哪个专业Agent
def supervisor(state: MultiAgentState):
    task = state["task"]
    
    response = model.invoke(f"""分析以下任务,决定由哪个专业Agent处理:

任务:{task}

可选Agent:
- research: 负责搜索和收集信息
- coder: 负责编写代码
- writer: 负责撰写文档

只输出Agent名称。
""")
    
    return {"next_agent": response.content.strip()}

# 研究Agent
def research_agent(state: MultiAgentState):
    task = state["task"]
    result = model.invoke(f"作为研究专家,请分析:{task}")
    return {"results": {"research": result.content}}

# 编程Agent
def coder_agent(state: MultiAgentState):
    task = state["task"]
    result = model.invoke(f"作为编程专家,请处理:{task}")
    return {"results": {"code": result.content}}

# 写作Agent
def writer_agent(state: MultiAgentState):
    task = state["task"]
    result = model.invoke(f"作为写作专家,请撰写:{task}")
    return {"results": {"writing": result.content}}

# 路由函数
def route_agent(state: MultiAgentState):
    next_agent = state["next_agent"]
    if "research" in next_agent:
        return "research"
    elif "coder" in next_agent:
        return "coder"
    elif "writer" in next_agent:
        return "writer"
    return END

# 构建图
workflow = StateGraph(MultiAgentState)
workflow.add_node("supervisor", supervisor)
workflow.add_node("research", research_agent)
workflow.add_node("coder", coder_agent)
workflow.add_node("writer", writer_agent)

workflow.set_entry_point("supervisor")
workflow.add_conditional_edges("supervisor", route_agent)
workflow.add_edge("research", END)
workflow.add_edge("coder", END)
workflow.add_edge("writer", END)

app = workflow.compile()

# 使用
result = app.invoke({"task": "帮我写一个Python爬虫脚本", "messages": [], "results": {}})

顺序协作Agent

python
from langgraph.graph import StateGraph

class PipelineState(TypedDict):
    input_text: str
    translated: str
    summarized: str
    final: str

def translator(state: PipelineState):
    """翻译Agent"""
    result = model.invoke(f"翻译成英文:{state['input_text']}")
    return {"translated": result.content}

def summarizer(state: PipelineState):
    """总结Agent"""
    result = model.invoke(f"用一句话总结:{state['translated']}")
    return {"summarized": result.content}

def formatter(state: PipelineState):
    """格式化Agent"""
    result = model.invoke(f"将以下内容格式化为JSON:{state['summarized']}")
    return {"final": result.content}

# 构建流水线
pipeline = StateGraph(PipelineState)
pipeline.add_node("translator", translator)
pipeline.add_node("summarizer", summarizer)
pipeline.add_node("formatter", formatter)

pipeline.set_entry_point("translator")
pipeline.add_edge("translator", "summarizer")
pipeline.add_edge("summarizer", "formatter")
pipeline.add_edge("formatter", END)

app = pipeline.compile()
result = app.invoke({"input_text": "人工智能正在改变世界"})

并行协作Agent

python
from langgraph.graph import StateGraph
from langchain_core.runnables import RunnableParallel

class ParallelState(TypedDict):
    input_text: str
    analysis: dict

def sentiment_agent(state: ParallelState):
    """情感分析Agent"""
    result = model.invoke(f"分析情感:{state['input_text']}")
    return {"analysis": {"sentiment": result.content}}

def keyword_agent(state: ParallelState):
    """关键词提取Agent"""
    result = model.invoke(f"提取关键词:{state['input_text']}")
    return {"analysis": {"keywords": result.content}}

def category_agent(state: ParallelState):
    """分类Agent"""
    result = model.invoke(f"分类:{state['input_text']}")
    return {"analysis": {"category": result.content}}

# 并行执行
def parallel_analysis(state: ParallelState):
    results = RunnableParallel(
        sentiment=sentiment_agent,
        keywords=keyword_agent,
        category=category_agent
    ).invoke(state)
    
    return {"analysis": results}

# 构建图
workflow = StateGraph(ParallelState)
workflow.add_node("parallel", parallel_analysis)
workflow.set_entry_point("parallel")
workflow.add_edge("parallel", END)

app = workflow.compile()

Agent通信机制

共享状态

python
class SharedState(TypedDict):
    messages: list  # 所有消息历史
    task_queue: list  # 待处理任务
    results: dict  # 各Agent的结果
    context: dict  # 共享上下文

def agent_a(state: SharedState):
    # 读取其他Agent的结果
    other_result = state["results"].get("agent_b")
    
    # 处理任务
    result = model.invoke(f"处理任务,参考:{other_result}")
    
    # 写入结果
    return {"results": {"agent_a": result.content}}

消息传递

python
def agent_communication(sender: str, receiver: str, message: str, state: dict):
    """Agent间通信"""
    communication_log = state.get("communication", [])
    communication_log.append({
        "from": sender,
        "to": receiver,
        "message": message,
        "timestamp": datetime.now()
    })
    return {"communication": communication_log}

冲突解决

投票机制

python
def voting_resolution(state: MultiAgentState):
    """投票解决冲突"""
    results = state["results"]
    
    # 多个Agent给出答案,投票决定
    votes = {}
    for agent, result in results.items():
        key = extract_key_point(result)
        votes[key] = votes.get(key, 0) + 1
    
    # 选择票数最多的
    final = max(votes, key=votes.get)
    return {"final_result": final}

仲裁机制

python
def arbiter(state: MultiAgentState):
    """仲裁Agent"""
    results = state["results"]
    
    prompt = f"""多个Agent给出了不同的答案,请选择最佳答案:

Agent1: {results.get('agent1')}
Agent2: {results.get('agent2')}
Agent3: {results.get('agent3')}

选择最准确的答案并解释原因。
"""
    
    return model.invoke(prompt)

完整示例:软件开发团队

python
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o")

class DevTeamState(TypedDict):
    requirements: str
    design: str
    code: str
    review: str
    tests: str
    final: str

# 产品经理:分析需求
def product_manager(state: DevTeamState):
    result = model.invoke(f"""作为产品经理,分析以下需求:

{state['requirements']}

输出:
1. 功能列表
2. 技术要求
3. 验收标准
""")
    return {"design": result.content}

# 开发工程师:编写代码
def developer(state: DevTeamState):
    result = model.invoke(f"""作为开发工程师,根据设计编写代码:

设计文档:
{state['design']}

输出完整的代码实现。
""")
    return {"code": result.content}

# 测试工程师:编写测试
def tester(state: DevTeamState):
    result = model.invoke(f"""作为测试工程师,为代码编写测试:

代码:
{state['code']}

输出单元测试代码。
""")
    return {"tests": result.content}

# 代码审查员:审查代码
def reviewer(state: DevTeamState):
    result = model.invoke(f"""作为代码审查员,审查代码和测试:

代码:
{state['code']}

测试:
{state['tests']}

输出:
1. 代码质量评估
2. 潜在问题
3. 改进建议
""")
    return {"review": result.content}

# 构建工作流
workflow = StateGraph(DevTeamState)
workflow.add_node("pm", product_manager)
workflow.add_node("dev", developer)
workflow.add_node("test", tester)
workflow.add_node("review", reviewer)

workflow.set_entry_point("pm")
workflow.add_edge("pm", "dev")
workflow.add_edge("dev", "test")
workflow.add_edge("test", "review")
workflow.add_edge("review", END)

app = workflow.compile()

# 使用
result = app.invoke({
    "requirements": "实现一个用户登录功能,支持用户名密码和第三方登录"
})

print("最终代码:")
print(result["code"])
print("\n代码审查:")
print(result["review"])

小结

模式说明适用场景
层级架构主控+专业Agent任务类型明确
顺序协作流水线处理有固定流程
并行协作同时处理独立子任务
黑板架构共享状态需要紧密协作

下一步

学完了Agent开发,继续学习 项目实战,将所学知识应用到真实项目中。