跳转至

OpenClaw 智能体通信协议与高级模式 (2025-2026)

最后更新: 2026年4月30日 | 状态: 生产就绪 | 覆盖范围: 智能体间通信协议、事件驱动编排、LLM-as-Judge 评估、自愈回路、上下文压缩策略


## 1. 智能体间通信协议

### 1.1 结构化消息格式

智能体间通信应使用严格定义的消息格式,避免自然语言歧义:

from pydantic import BaseModel, Field
from typing import Optional, Any
from enum import Enum

class MessageType(str, Enum):
    """Kinds of messages that agents exchange.

    Mixing in ``str`` makes every member a string instance, so a member
    compares equal to its raw value, e.g.
    ``MessageType.TASK_REQUEST == "task_request"``.
    """

    TASK_REQUEST = "task_request"
    TASK_RESULT = "task_result"
    STATUS_UPDATE = "status_update"
    ERROR_REPORT = "error_report"
    CONTEXT_SHARE = "context_share"
    VERIFICATION_REQUEST = "verification_request"
    VERIFICATION_REQUEST = "verification_request"class AgentMessage(BaseModel):
    message_id: str = Field(..., description="UUID for deduplication")
    sender: str = Field(..., description="Sender agent ID")
    receiver: str = Field(..., description="Target agent ID or '*' for broadcast")
    message_type: MessageType
    priority: int = Field(default=5, ge=1, le=10, description="1=highest, 10=lowest")
    payload: Any = Field(..., description="Task-specific data")
    context_summary: str = Field(default="", description="Compressed context for routing")
    timestamp: str = Field(default_factory=lambda: datetime.utcnow().isoformat())
    ttl: int = Field(default=300, description="Time-to-live in seconds")

    class Config:
        json_schema_extra = {
            "example": {
                "message_id": "uuid-123",
                "sender": "researcher_agent",
                "receiver": "analyst_agent",
                "message_type": "task_result",
                "priority": 3,
                "payload": {"findings": [...], "confidence": 0.85},
                "context_summary": "User requested market analysis for AI sector"
            }
        }
"context_summary": "User requested market analysis for AI sector" } } ```### 1.2 消息总线实现模式

基于 Redis Streams 的异步消息总线:

import redis
import json
import asyncio
from typing import Callable, Dict
reams 的异步消息总线**:
```python
import redis
import json
import asyncio
class AgentMessageBus:
    """Asynchronous agent message bus backed by Redis Streams.

    Each agent reads from its own stream ``agent:<agent_id>``. Every
    published message is additionally mirrored to ``agent:broadcast``.
    The Redis client used here is synchronous, so its calls are run in the
    default executor to keep the event loop responsive.
    """

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        # Maps the key passed to ``subscribe`` to its handler coroutine.
        self.handlers: Dict[str, Callable] = {}
        self.running = False

    async def publish(self, message: AgentMessage):
        """Publish ``message`` to the receiver's stream and to the broadcast stream."""
        stream_name = f"agent:{message.receiver}"
        # Serialize once and reuse the same body for both streams.
        body = {"data": message.model_dump_json()}
        loop = asyncio.get_running_loop()

        await loop.run_in_executor(
            None, lambda: self.redis.xadd(stream_name, body)
        )
        # Also mirror the message to the broadcast stream.
        await loop.run_in_executor(
            None, lambda: self.redis.xadd("agent:broadcast", body)
        )

    async def subscribe(self, agent_id: str, handler: Callable):
        """Register ``handler`` for messages addressed to ``agent_id``."""
        self.handlers[agent_id] = handler

    async def run(self, agent_id: str):
        """Read ``agent:<agent_id>`` until ``stop`` is called and dispatch messages.

        A message goes to the handler registered for ``agent_id``. If none
        exists, a handler registered under the message type is used, which
        keeps the lookup the original loop performed working.
        """
        self.running = True
        stream_name = f"agent:{agent_id}"
        # "0" starts reading from the first entry in the stream.
        last_id = "0"
        loop = asyncio.get_running_loop()

        while self.running:
            cursor = last_id
            # xread blocks for up to one second; run it off the event loop.
            batches = await loop.run_in_executor(
                None,
                lambda: self.redis.xread(
                    {stream_name: cursor}, block=1000, count=10
                ),
            )
            for _, entries in batches or []:
                for msg_id, data in entries:
                    # IDs and field names are bytes unless the client was
                    # created with decode_responses=True; accept both.
                    last_id = (
                        msg_id.decode() if isinstance(msg_id, bytes) else msg_id
                    )
                    raw = data.get(b"data", data.get("data"))
                    if raw is None:
                        continue
                    message = AgentMessage.model_validate_json(raw)
                    handler = self.handlers.get(agent_id)
                    if handler is None:
                        handler = self.handlers.get(message.message_type)
                    if handler is not None:
                        await handler(message)

    async def stop(self):
        """Ask the ``run`` loop to exit after its current iteration."""
        self.running = False

```

---

## 2. 事件驱动编排

2.1 事件定义

from pydantic import BaseModel
from typing import Any, Dict


class AgentEvent(BaseModel):
    """Event emitted by an agent and consumed by the event-driven pipeline."""

    # Examples used in this document: "task_completed", "error_occurred",
    # "timeout", "guardrail_triggered".
    event_type: str
    agent_id: str
    payload: Dict[str, Any]
    metadata: Dict[str, Any] = {}

# Event-driven state machine: maps current state -> {event type -> next state}.
# States that do not appear as a top-level key have no outgoing transitions.
EVENT_TRANSITIONS = {
    "idle": {
        "task_assigned": "processing",
        "shutdown_requested": "shutting_down",
    },
    "processing": {
        "task_completed": "idle",
        "error_occurred": "error_recovery",
        "timeout": "error_recovery",
        "guardrail_triggered": "blocked",
    },
    "error_recovery": {
        "retry_successful": "processing",
        "retry_exhausted": "failed",
        "escalated": "waiting_human",
    },
    "blocked": {
        "guardrail_cleared": "processing",
        "task_cancelled": "idle",
    },
}

### 2.2 反应式智能体管线

```python

class ReactiveAgentPipeline:
    """Event-driven autonomous agent pipeline.

    Replaces traditional DAG orchestration with dynamic, event-based
    routing. ``StateMachine``, ``_run_agent_logic``, ``_handle_error`` and
    ``_trigger_guardrail`` are not defined in this snippet and must be
    provided elsewhere.
    """

    def __init__(self):
        self.state_machine = StateMachine(EVENT_TRANSITIONS)
        self.event_queue = asyncio.Queue()
        # event type -> list of async handlers
        self.subscribers = {}
        # Strong references to in-flight handler tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

    def subscribe(self, event_type, handler):
        """Register an async ``handler`` to be called for ``event_type`` events."""
        self.subscribers.setdefault(event_type, []).append(handler)

    async def emit_event(self, event: AgentEvent):
        """Queue ``event`` for the state machine and notify its subscribers."""
        await self.event_queue.put(event)

        # Notify handlers subscribed to this event type.
        for handler in self.subscribers.get(event.event_type, ()):
            task = asyncio.create_task(handler(event))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

    async def process_events(self):
        """Event loop: apply state transitions and run the follow-up action."""
        while True:
            event = await self.event_queue.get()
            try:
                # State transition
                current_state = self.state_machine.current_state
                next_state = EVENT_TRANSITIONS.get(current_state, {}).get(
                    event.event_type
                )

                if next_state:
                    self.state_machine.transition(next_state)

                    # Trigger the follow-up action for the new state.
                    if next_state == "processing":
                        await self._execute_task(event)
                    elif next_state == "error_recovery":
                        await self._handle_error(event)
                    elif next_state == "blocked":
                        await self._trigger_guardrail(event)
            finally:
                # Lets ``event_queue.join()`` callers make progress.
                self.event_queue.task_done()

    async def _execute_task(self, event):
        """Run the task and emit a completion or error event afterwards."""
        # Imported locally: the snippet uses traceback without importing it.
        import traceback

        try:
            result = await self._run_agent_logic(event.payload)
            await self.emit_event(AgentEvent(
                event_type="task_completed",
                agent_id=event.agent_id,
                payload={"result": result}
            ))
        except Exception as e:
            await self.emit_event(AgentEvent(
                event_type="error_occurred",
                agent_id=event.agent_id,
                payload={"error": str(e), "traceback": traceback.format_exc()}
            ))
```

---

## 3. LLM-as-Judge 评估循环

### 3.1 评估器设计

```python
from typing import List

class LLMEvaluator:
    """Scores an agent's output with a separate LLM.

    Acts as an automatic quality gate before output is delivered to the
    user. ``_call_llm`` is not defined in this snippet; it is awaited with
    the rendered prompt and must return the model's text reply.
    """

    EVALUATION_PROMPT = """
    你是一位严格的质量评估专家。请评估以下 AI 智能体生成的输出。

    任务描述: {task_description}
    智能体输出: {agent_output}

    请从以下维度评分(1-10):
    1. 准确性: 输出是否准确回答了问题?
    2. 完整性: 是否覆盖了所有关键方面?
    3. 格式: 输出格式是否清晰结构化?
    4. 安全性: 是否包含有害/不当内容?

    请以 JSON 格式返回:
    {{"accuracy": 8, "completeness": 7, "format": 9, "safety": 10, 
      "overall": 8.5, "needs_regeneration": false, "feedback": "..."}}
    """

    def __init__(self, evaluator_model="gpt-4"):
        self.model = evaluator_model

    @staticmethod
    def _parse_scores(response: str) -> dict:
        """Extract the JSON object from an LLM reply.

        Models often wrap JSON in a Markdown fence or add prose around it,
        so only the text from the first ``{`` to the last ``}`` is parsed.

        Raises:
            ValueError: if the reply contains no valid JSON object
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        import json

        text = response.strip()
        start, end = text.find("{"), text.rfind("}")
        if start != -1 and end > start:
            text = text[start:end + 1]
        scores = json.loads(text)
        if not isinstance(scores, dict):
            raise ValueError("evaluator reply is not a JSON object")
        return scores

    async def evaluate(self, task_desc: str, output: str) -> dict:
        """Score ``output`` for ``task_desc`` and add a ``"passed"`` flag.

        ``passed`` is true only when ``overall`` is at least 7.0, ``safety``
        is exactly 10 and ``needs_regeneration`` is false. A missing key
        counts as a failure instead of raising ``KeyError``.
        """
        prompt = self.EVALUATION_PROMPT.format(
            task_description=task_desc,
            agent_output=output[:4000],  # cap the context length
        )
        response = await self._call_llm(prompt)

        scores = self._parse_scores(response)
        scores["passed"] = bool(
            scores.get("overall", 0) >= 7.0
            and scores.get("safety") == 10
            and not scores.get("needs_regeneration", True)
        )
        return scores
"safety"] == 10 and not scores["needs_regeneration"] ) return scores ```### 3.2 自动重新生成回路

async def generate_with_self_correction(
    agent, task, max_attempts=3, min_score=7.0
):
    """Generate, evaluate, and regenerate until the output passes.

    Args:
        agent: object with an async ``generate(task)`` method.
        task: task object; must expose ``description``. ``feedback`` and
            ``previous_output`` are set on it between attempts.
        max_attempts: maximum number of generate/evaluate rounds (>= 1).
        min_score: minimum ``overall`` score required to pass.

    Returns:
        ``(output, scores)``. When every attempt fails, the last output is
        returned with ``passed`` set to False and ``max_attempts_reached``
        set to True.

    Raises:
        ValueError: if ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # Without this guard the loop never runs and the final return
        # would fail on unbound ``output`` / ``scores``.
        raise ValueError("max_attempts must be at least 1")

    evaluator = LLMEvaluator()

    for attempt in range(max_attempts):
        output = await agent.generate(task)
        scores = await evaluator.evaluate(task.description, output)

        # Apply the caller's threshold. The other two conditions mirror the
        # evaluator's own pass rule, so the default min_score of 7.0 gives
        # the same verdict as the evaluator.
        scores["passed"] = bool(
            scores.get("overall", 0) >= min_score
            and scores.get("safety") == 10
            and not scores.get("needs_regeneration", True)
        )

        if scores["passed"]:
            return output, scores

        if attempt < max_attempts - 1:
            # Hand the evaluator's feedback to the agent for the next round.
            task.feedback = scores.get("feedback", "请改进输出质量")
            task.previous_output = output

    # Every attempt failed: return the last result, flagged as such.
    return output, {**scores, "passed": False, "max_attempts_reached": True}

---

## 4. 自愈回路(Self-Healing)

### 4.1 智能体健康监控

```python
class AgentHealthMonitor:
    """Monitors agent health and applies an automatic remedy when unhealthy.

    The metric collectors (``_measure_latency``, ``_calc_error_rate``,
    ``_calc_token_rate``, ``_get_queue_depth``) and the remedies
    (``_restart_agent``, ``_spawn_replica``, ``_switch_fallback_model``) are
    not defined in this snippet and must be provided elsewhere.
    """

    # An agent is healthy only while every metric stays below these limits.
    MAX_RESPONSE_TIME_MS = 30000
    MAX_ERROR_RATE = 0.1
    MAX_QUEUE_DEPTH = 100

    # Thresholds that select the remedy once an agent is unhealthy.
    RESTART_ERROR_RATE = 0.3
    SCALE_OUT_QUEUE_DEPTH = 50
    FALLBACK_RESPONSE_TIME_MS = 25000

    def __init__(self, agent_registry):
        self.registry = agent_registry
        # agent_id -> most recent metrics dict from check_health
        self.health_metrics = {}

    async def check_health(self, agent_id: str) -> dict:
        """Collect metrics for ``agent_id`` and add a ``"healthy"`` flag."""
        metrics = {
            "response_time_ms": await self._measure_latency(agent_id),
            "error_rate": await self._calc_error_rate(agent_id, window=300),
            "token_usage_rate": await self._calc_token_rate(agent_id),
            "queue_depth": await self._get_queue_depth(agent_id),
        }

        metrics["healthy"] = (
            metrics["response_time_ms"] < self.MAX_RESPONSE_TIME_MS and
            metrics["error_rate"] < self.MAX_ERROR_RATE and
            metrics["queue_depth"] < self.MAX_QUEUE_DEPTH
        )

        self.health_metrics[agent_id] = metrics
        return metrics

    async def auto_heal(self, agent_id: str):
        """Run at most one remedy for an unhealthy agent.

        Returns:
            ``"restart"``, ``"spawn_replica"`` or ``"switch_fallback_model"``
            for the remedy applied, or ``None`` when the agent is healthy or
            no remedy threshold was crossed.
        """
        metrics = await self.check_health(agent_id)

        if metrics["healthy"]:
            return None

        if metrics["error_rate"] > self.RESTART_ERROR_RATE:
            # High error rate -> restart the agent.
            await self._restart_agent(agent_id)
            return "restart"

        if metrics["queue_depth"] > self.SCALE_OUT_QUEUE_DEPTH:
            # Queue backlog -> scale out horizontally.
            await self._spawn_replica(agent_id)
            return "spawn_replica"

        if metrics["response_time_ms"] > self.FALLBACK_RESPONSE_TIME_MS:
            # Slow responses -> switch to the fallback model.
            await self._switch_fallback_model(agent_id)
            return "switch_fallback_model"

        # Unhealthy, but below every remedy threshold (for example an error
        # rate between MAX_ERROR_RATE and RESTART_ERROR_RATE). Report it
        # instead of returning silently.
        import logging

        logging.getLogger(__name__).warning(
            "Agent %s is unhealthy but no remedy threshold was crossed: %s",
            agent_id,
            metrics,
        )
        return None
```

### 4.2 降级策略矩阵

| 故障类型 | 检测信号 | 降级动作 | 恢复条件 |
|---|---|---|---|
| API 超时 | 响应时间 >30s | 切换到备用模型/提供商 | 响应时间 <10s 持续 60s |
| 高错误率 | 错误率 >20% | 减少并发,增加重试间隔 | 错误率 <5% 持续 120s |
| Token 预算超限 | 剩余 token <10% | 切换到更小的模型 | token 预算重置 |
| 幻觉检测 | LLM-as-Judge 评分 <5 | 增加上下文,强制结构化输出 | 评分 >7 连续 3 次 |
| 消息队列积压 | 队列深度 >50 | 水平扩展 worker | 队列深度 <10 |

## 5. 上下文压缩策略

### 5.1 滚动上下文窗口

class ContextCompressor:
    """Keeps an agent's conversation context from overflowing.

    Recent messages are kept verbatim and older ones are folded into a
    running summary. ``_estimate_tokens`` is not defined in this snippet
    and must be provided elsewhere (for example by a subclass).
    """

    def __init__(self, max_tokens=8000, summarizer=None):
        """
        Args:
            max_tokens: token budget above which the history is compressed.
            summarizer: optional callable that takes the summarization
                prompt and returns the summary text (e.g. an LLM call).
                Without it, old text is truncated instead.
        """
        self.max_tokens = max_tokens
        self.message_history = []
        self.summary = ""
        self._summarizer = summarizer

    def add_message(self, role: str, content: str):
        """Append a message and compress the history if over budget."""
        self.message_history.append({"role": role, "content": content})

        # Check whether the token limit has been exceeded.
        total_tokens = self._estimate_tokens()
        if total_tokens > self.max_tokens:
            self._compress()

    def _compress(self):
        """Keep the newest messages and fold the older ones into the summary."""
        # Keep the last N messages.
        keep_count = max(3, len(self.message_history) // 3)
        old_messages = self.message_history[:-keep_count]
        if not old_messages:
            # Nothing old enough to fold; do not replace the existing
            # summary with a summary of empty text.
            return
        self.message_history = self.message_history[-keep_count:]

        # Carry the previous summary forward so that earlier context is
        # not lost on the second and later compressions.
        parts = [self.summary] if self.summary else []
        parts.extend(m["content"] for m in old_messages)
        self.summary = self._summarize(" ".join(parts))

    def _summarize(self, text: str, max_length=200) -> str:
        """Compress ``text`` to roughly ``max_length`` characters."""
        prompt = f"请将以下内容压缩为{max_length}字以内的摘要,保留关键信息:\n{text}"
        summarizer = getattr(self, "_summarizer", None)
        if summarizer is not None:
            return summarizer(prompt)
        # No summarizer configured: fall back to plain truncation so the
        # class still works (the original returned an undefined name).
        return text[:max_length]

    def get_context(self) -> List[dict]:
        """Return the full context: summary (if any) followed by the history."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"以下是之前对话的摘要: {self.summary}"
            })
        context.extend(self.message_history)
        return context
### 5.2 上下文分段最佳实践

<system>
你是研究助手,负责分析市场数据。
</system>

<task>
分析 A 股科技板块本周走势,找出涨幅前 5 的行业
</task>

<tools>
- stock_api: 获取股票数据
- news_api: 获取相关新闻
- analysis_engine: 技术分析计算
</tools>

<constraints>
- 仅使用过去 7 天的数据
- 忽略 ST 股票
- 输出格式为 JSON
</constraints>

<output_format>
{
  "sectors": [{"name": "...", "return_pct": ..., "key_stocks": [...]}]
}
</output_format>

6. 【新增】2026 最新架构演进

6.1 智能体 Swarm 模式(去中心化协作)

# Decentralized swarm: agents collaborate without a central manager.
class AgentSwarm:
    """Group of agents that explore a task in parallel and vote on the result."""

    def __init__(self, agents, shared_state):
        self.agents = agents
        self.state = shared_state  # Redis-backed shared state
        self.consensus_threshold = 0.6

    async def run_exploration(self, task):
        """Let every agent explore ``task`` concurrently, then pick by vote.

        Each agent votes once, given its own result and the full list of
        results. The result that equals the most votes is returned; on a
        tie the earliest result wins.
        """
        # Fan out: all agents explore at the same time.
        explorations = (member.explore(task) for member in self.agents)
        results = await asyncio.gather(*explorations)

        # Collect one ballot per agent id.
        ballots = {
            member.id: member.vote(own_result, results)
            for member, own_result in zip(self.agents, results)
        }

        def tally(candidate):
            """Count the ballots that equal ``candidate``."""
            return sum(bool(ballot == candidate) for ballot in ballots.values())

        # Return the result with the highest tally.
        return max(results, key=tally)

6.2 MCP 工具注册表动态发现

# Dynamic MCP tool discovery and registration
class MCPToolRegistry:
    """Registry that discovers tools on MCP servers and routes calls to them.

    ``_check_permissions`` and ``_agent_has_permission`` are not defined in
    this snippet and must be provided elsewhere.
    """

    def __init__(self):
        self.servers = {}  # server_name -> MCP connection
        self.tools = {}  # tool_name -> tool definition record

    async def discover_tools(self):
        """Discover the tools of every connected MCP server.

        A tool name reported by more than one server is overwritten by the
        server visited last.
        """
        for server_name, connection in self.servers.items():
            tools = await connection.list_tools()
            for tool in tools:
                self.tools[tool.name] = {
                    "server": server_name,
                    "definition": tool.definition,
                    "permissions": self._check_permissions(tool.name)
                }

    async def route_tool_call(self, agent_id, tool_name, arguments):
        """Route a tool call to the MCP server that owns ``tool_name``.

        Raises:
            ValueError: if ``tool_name`` has not been discovered.
            PermissionError: if ``agent_id`` may not use ``tool_name``.
        """
        tool_info = self.tools.get(tool_name)
        if not tool_info:
            raise ValueError(f"Unknown tool: {tool_name}")

        if not self._agent_has_permission(agent_id, tool_name):
            raise PermissionError(f"Agent {agent_id} cannot use {tool_name}")

        connection = self.servers[tool_info["server"]]
        return await connection.call_tool(tool_name, arguments)


文档更新日期: 2026年4月30日 | 来源: LangGraph/CrewAI/AutoGen 官方文档、MCP 规范、多智能体系统论文