OpenClaw 智能体通信协议与高级模式 (2025-2026)
最后更新: 2026年4月30日 | 状态: 生产就绪

覆盖范围: 智能体间通信协议、事件驱动编排、LLM-as-Judge 评估、自愈回路、上下文压缩策略

## 1. 智能体间通信协议

### 1.1 结构化消息格式
智能体间通信应使用严格定义的消息格式,避免自然语言歧义:
from pydantic import BaseModel, Field
from typing import Optional, Any
from enum import Enum
class MessageType(str, Enum):
    """Closed set of message kinds exchanged between agents.

    The ``str`` mixin makes every member compare equal to its plain string
    value, e.g. ``MessageType.TASK_RESULT == "task_result"``.
    """

    TASK_REQUEST = "task_request"
    TASK_RESULT = "task_result"
    STATUS_UPDATE = "status_update"
    ERROR_REPORT = "error_report"
    CONTEXT_SHARE = "context_share"
    VERIFICATION_REQUEST = "verification_request"
def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string (with UTC offset)."""
    # Imported locally: the snippet's import list does not include datetime.
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat()


class AgentMessage(BaseModel):
    """Structured envelope for a single message sent from one agent to another.

    A strictly typed envelope is used instead of free-form natural language
    so that routing and deduplication do not depend on text interpretation.
    """

    message_id: str = Field(..., description="UUID for deduplication")
    sender: str = Field(..., description="Sender agent ID")
    receiver: str = Field(..., description="Target agent ID or '*' for broadcast")
    message_type: MessageType
    priority: int = Field(default=5, ge=1, le=10, description="1=highest, 10=lowest")
    payload: Any = Field(..., description="Task-specific data")
    context_summary: str = Field(default="", description="Compressed context for routing")
    # Filled in at construction time when the sender does not supply one.
    timestamp: str = Field(default_factory=_utc_now_iso)
    ttl: int = Field(default=300, description="Time-to-live in seconds")

    class Config:
        # Example shown in the generated JSON schema. Every value must be
        # JSON-serializable, so the findings placeholder is a string.
        json_schema_extra = {
            "example": {
                "message_id": "uuid-123",
                "sender": "researcher_agent",
                "receiver": "analyst_agent",
                "message_type": "task_result",
                "priority": 3,
                "payload": {"findings": ["..."], "confidence": 0.85},
                "context_summary": "User requested market analysis for AI sector"
            }
        }
基于 Redis Streams 的异步消息总线:
import redis
import json
import asyncio
from typing import Callable, Dict
**基于 Redis Streams 的异步消息总线**:
```python
import redis
import json
import asyncio
from typing import Callable, Dict


class AgentMessageBus:
    """Asynchronous agent message bus built on Redis Streams.

    Each agent reads from its own stream ``agent:<agent_id>``. Every published
    message is also appended to the shared ``agent:broadcast`` stream.
    """

    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        # agent_id -> coroutine function called with each received AgentMessage
        self.handlers: Dict[str, Callable] = {}
        self.running = False

    async def publish(self, message: AgentMessage):
        """Publish *message* to the receiver's stream and the broadcast stream."""
        encoded = message.model_dump_json()
        # The client created by redis.from_url is synchronous; run its calls in
        # a worker thread so the event loop is not blocked.
        await asyncio.to_thread(
            self.redis.xadd, f"agent:{message.receiver}", {"data": encoded}
        )
        # Also publish to the broadcast channel.
        await asyncio.to_thread(
            self.redis.xadd, "agent:broadcast", {"data": encoded}
        )

    async def subscribe(self, agent_id: str, handler: Callable):
        """Register *handler* for messages addressed to *agent_id*."""
        self.handlers[agent_id] = handler

    async def run(self, agent_id: str):
        """Message loop: read the agent's stream and dispatch to its handler.

        Runs until ``stop()`` is called. Messages are read in batches of up to
        10, waiting at most one second per read.
        """
        self.running = True
        stream_name = f"agent:{agent_id}"
        last_id = "0"  # start from the first entry of the stream
        while self.running:
            batches = await asyncio.to_thread(
                self.redis.xread, {stream_name: last_id}, block=1000, count=10
            )
            for _, entries in batches or []:
                for entry_id, fields in entries:
                    # Entries arrive as bytes unless the client was created
                    # with decode_responses=True; accept both forms.
                    last_id = (
                        entry_id.decode() if isinstance(entry_id, bytes) else entry_id
                    )
                    raw = fields[b"data"] if b"data" in fields else fields["data"]
                    message = AgentMessage.model_validate_json(raw)
                    # Handlers are registered per agent in subscribe(), so the
                    # lookup key is the agent ID, not the message type.
                    handler = self.handlers.get(agent_id)
                    if handler is not None:
                        await handler(message)

    async def stop(self):
        """Ask the message loop to exit after its current read."""
        self.running = False
```

---

## 2. 事件驱动编排

### 2.1 事件定义
```python from pydantic import BaseModel from typing import Any, Dict class AgentEvent(BaseModel): event_type: str # "task_completed", "error", "timeout", "guardrail_triggered" agent_id: str payload: Dict[str, Any] metadata: Dict[str, Any] = {}
# Event-driven state machine: current state -> {event type -> next state}.
# States that only appear as targets ("shutting_down", "failed",
# "waiting_human") have no outgoing transitions defined here.
EVENT_TRANSITIONS = {
    "idle": {
        "task_assigned": "processing",
        "shutdown_requested": "shutting_down",
    },
    "processing": {
        "task_completed": "idle",
        "error_occurred": "error_recovery",
        "timeout": "error_recovery",
        "guardrail_triggered": "blocked",
    },
    "error_recovery": {
        "retry_successful": "processing",
        "retry_exhausted": "failed",
        "escalated": "waiting_human",
    },
    "blocked": {
        "guardrail_cleared": "processing",
        "task_cancelled": "idle",
    },
}
```

### 2.2 反应式智能体管线

```python
class ReactiveAgentPipeline:
    """Event-driven autonomous agent pipeline.

    Replaces traditional DAG orchestration with more flexible, dynamic
    routing: events move the state machine, and the new state decides which
    action runs next.
    """

    def __init__(self):
        self.state_machine = StateMachine(EVENT_TRANSITIONS)
        self.event_queue = asyncio.Queue()
        self.subscribers = {}  # event_type -> iterable of async handlers
        # Strong references to in-flight handler tasks. The event loop keeps
        # only weak references, so an unreferenced task can be garbage
        # collected before it finishes.
        self._handler_tasks = set()

    async def emit_event(self, event: AgentEvent):
        """Queue *event* and notify the handlers subscribed to its type."""
        await self.event_queue.put(event)
        # Handlers run as background tasks; they are not awaited here.
        for handler in self.subscribers.get(event.event_type, ()):
            task = asyncio.create_task(handler(event))
            self._handler_tasks.add(task)
            task.add_done_callback(self._handler_tasks.discard)

    async def process_events(self):
        """Event loop: apply each queued event to the state machine."""
        while True:
            event = await self.event_queue.get()
            # State transition
            current_state = self.state_machine.current_state
            next_state = EVENT_TRANSITIONS.get(current_state, {}).get(event.event_type)
            if not next_state:
                # The event is not valid in the current state; drop it.
                continue
            self.state_machine.transition(next_state)
            # Trigger the follow-up action for the new state
            if next_state == "processing":
                await self._execute_task(event)
            elif next_state == "error_recovery":
                await self._handle_error(event)
            elif next_state == "blocked":
                await self._trigger_guardrail(event)

    async def _execute_task(self, event):
        """Run the task and emit ``task_completed`` or ``error_occurred``."""
        # Imported locally: traceback is not among the snippet's imports.
        import traceback

        try:
            result = await self._run_agent_logic(event.payload)
        except Exception as e:
            # Any failure of the agent logic becomes an event so the state
            # machine can move to error recovery instead of crashing the loop.
            await self.emit_event(AgentEvent(
                event_type="error_occurred",
                agent_id=event.agent_id,
                payload={"error": str(e), "traceback": traceback.format_exc()}
            ))
        else:
            await self.emit_event(AgentEvent(
                event_type="task_completed",
                agent_id=event.agent_id,
                payload={"result": result}
            ))
```

---

## 3. LLM-as-Judge 评估循环

### 3.1 评估器设计
```python
from typing import List
class LLMEvaluator:
    """Assess the quality of an agent's output with an independent LLM.

    Intended as an automatic quality gate that runs before the output is
    delivered to the user. The ``_call_llm(prompt)`` coroutine is not defined
    in this snippet and must be supplied by the surrounding code.
    """

    EVALUATION_PROMPT = """
你是一位严格的质量评估专家。请评估以下 AI 智能体生成的输出。
任务描述: {task_description}
智能体输出: {agent_output}
请从以下维度评分(1-10):
1. 准确性: 输出是否准确回答了问题?
2. 完整性: 是否覆盖了所有关键方面?
3. 格式: 输出格式是否清晰结构化?
4. 安全性: 是否包含有害/不当内容?
请以 JSON 格式返回:
{{"accuracy": 8, "completeness": 7, "format": 9, "safety": 10,
"overall": 8.5, "needs_regeneration": false, "feedback": "..."}}
"""

    def __init__(self, evaluator_model="gpt-4"):
        self.model = evaluator_model

    async def evaluate(self, task_desc: str, output: str) -> dict:
        """Score *output* for *task_desc* and add a boolean ``"passed"`` key.

        The output passes only when ``overall >= 7.0``, ``safety == 10`` and
        ``needs_regeneration`` is false. A score missing from the reply counts
        as a failure.

        Raises:
            ValueError: the evaluator reply contains no parseable JSON object
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        response = await self._call_llm(
            self.EVALUATION_PROMPT.format(
                task_description=task_desc,
                agent_output=output[:4000]  # cap the context length
            )
        )
        scores = self._parse_scores(response)
        scores["passed"] = (
            scores.get("overall", 0) >= 7.0
            and scores.get("safety", 0) == 10
            and not scores.get("needs_regeneration", True)
        )
        return scores

    @staticmethod
    def _parse_scores(response: str) -> dict:
        """Extract the JSON object from a reply that may wrap it in other text."""
        # Models often wrap JSON in Markdown fences or add commentary, so
        # parse only the span from the first "{" to the last "}".
        start = response.find("{")
        end = response.rfind("}")
        if start == -1 or end < start:
            raise ValueError(
                f"Evaluator reply contains no JSON object: {response!r}"
            )
        return json.loads(response[start:end + 1])
async def generate_with_self_correction(
    agent, task, max_attempts=3, min_score=7.0
):
    """Generate, evaluate, and regenerate until the output passes.

    Args:
        agent: object with an async ``generate(task)`` method.
        task: task object with a ``description`` attribute. On a failed
            attempt, ``feedback`` and ``previous_output`` are set on it so the
            next generation can use them.
        max_attempts: maximum number of generations; must be at least 1.
        min_score: minimum ``overall`` score required, applied in addition to
            the evaluator's own pass verdict.

    Returns:
        ``(output, scores)``. When every attempt fails, the last output is
        returned with ``passed`` set to False and ``max_attempts_reached``
        set to True.

    Raises:
        ValueError: ``max_attempts`` is less than 1.
    """
    if max_attempts < 1:
        # Without this guard the loop never runs and the final return would
        # reference variables that were never assigned.
        raise ValueError("max_attempts must be at least 1")

    evaluator = LLMEvaluator()
    for attempt in range(max_attempts):
        output = await agent.generate(task)
        scores = await evaluator.evaluate(task.description, output)
        # Apply the caller's threshold on top of the evaluator's verdict.
        scores["passed"] = bool(
            scores["passed"] and scores.get("overall", 0) >= min_score
        )
        if scores["passed"]:
            return output, scores
        if attempt < max_attempts - 1:
            # Hand the evaluation feedback to the agent for the next attempt.
            task.feedback = scores.get("feedback", "请改进输出质量")
            task.previous_output = output
    # Every attempt failed: return the last result, flagged as such.
    return output, {**scores, "passed": False, "max_attempts_reached": True}
```

---

## 4. 自愈回路(Self-Healing)

### 4.1 智能体健康监控
```python
class AgentHealthMonitor:
    """Monitor agent health and trigger automatic recovery actions.

    The metric probes (``_measure_latency`` and friends) and the recovery
    actions (``_restart_agent`` and friends) are not defined in this snippet
    and must be supplied by the surrounding code.
    """

    def __init__(self, agent_registry):
        self.registry = agent_registry
        self.health_metrics = {}  # agent_id -> most recent metrics dict

    async def check_health(self, agent_id: str) -> dict:
        """Collect current metrics for *agent_id* and derive ``"healthy"``.

        An agent is healthy when its response time is under 30000 ms, its
        error rate is under 0.1 and its queue depth is under 100.
        """
        metrics = {
            "response_time_ms": await self._measure_latency(agent_id),
            "error_rate": await self._calc_error_rate(agent_id, window=300),
            "token_usage_rate": await self._calc_token_rate(agent_id),
            "queue_depth": await self._get_queue_depth(agent_id),
        }
        metrics["healthy"] = (
            metrics["response_time_ms"] < 30000
            and metrics["error_rate"] < 0.1
            and metrics["queue_depth"] < 100
        )
        # Remember the latest reading per agent.
        self.health_metrics[agent_id] = metrics
        return metrics

    async def auto_heal(self, agent_id: str):
        """Run at most one recovery action for an unhealthy agent.

        Returns:
            The name of the action taken (``"restart"``, ``"scale_out"`` or
            ``"fallback_model"``), or None when the agent is healthy or no
            rule matched.
        """
        metrics = await self.check_health(agent_id)
        if metrics["healthy"]:
            return None

        # NOTE(review): these thresholds differ from the ones in
        # check_health. An agent with an error rate between 0.1 and 0.3 (and
        # otherwise normal metrics) is unhealthy but matches no rule, and a
        # queue depth of 51-99 alone never reaches the scale-out rule because
        # it still counts as healthy. Confirm the intended values.
        if metrics["error_rate"] > 0.3:
            # High error rate -> restart the agent
            await self._restart_agent(agent_id)
            return "restart"
        if metrics["queue_depth"] > 50:
            # Queue backlog -> scale out horizontally
            await self._spawn_replica(agent_id)
            return "scale_out"
        if metrics["response_time_ms"] > 25000:
            # Slow responses -> switch to the fallback model
            await self._switch_fallback_model(agent_id)
            return "fallback_model"
        return None
| 故障类型 | 检测信号 | 降级动作 | 恢复条件 |
|---|---|---|---|
| API 超时 | 响应时间 >30s | 切换到备用模型/提供商 | 响应时间 <10s 持续 60s |
| 高错误率 | 错误率 >20% | 减少并发,增加重试间隔 | 错误率 <5% 持续 120s |
| Token 预算超限 | 剩余 token <10% | 切换到更小的模型 | token 预算重置 |
| 幻觉检测 | LLM-as-Judge 评分 <5 | 增加上下文,强制结构化输出 | 评分 >7 连续 3 次 |
| 消息队列积压 | 队列深度 >50 | 水平扩展 worker | 队列深度 <10 |
## 5. 上下文压缩策略

### 5.1 滚动上下文窗口
class ContextCompressor:
    """Manage an agent's context so it does not overflow the token budget.

    Recent messages are kept verbatim. Older ones are folded into a running
    summary once the estimated token count exceeds ``max_tokens``.
    """

    def __init__(self, max_tokens=8000, summarizer=None):
        """
        Args:
            max_tokens: estimated-token budget that triggers compression.
            summarizer: optional ``callable(prompt: str) -> str`` that
                produces the summary, e.g. a wrapper around an LLM call. When
                omitted, old text is truncated instead of summarized.
        """
        self.max_tokens = max_tokens
        self.message_history = []
        self.summary = ""
        self.summarizer = summarizer

    def add_message(self, role: str, content: str):
        """Append a message and compress the history if over budget."""
        self.message_history.append({"role": role, "content": content})
        if self._estimate_tokens() > self.max_tokens:
            self._compress()

    def _estimate_tokens(self) -> int:
        """Roughly estimate the tokens used by the summary plus the history.

        Heuristic only: about four ASCII characters per token, and one token
        per non-ASCII character. It is not a real tokenizer.
        """
        total = 0
        for text in [self.summary] + [m["content"] for m in self.message_history]:
            ascii_chars = sum(1 for ch in text if ch.isascii())
            total += ascii_chars // 4 + (len(text) - ascii_chars)
        return total

    def _compress(self):
        """Keep the newest messages and fold the older ones into the summary."""
        # Keep the last N messages: a third of the history, at least three.
        keep_count = max(3, len(self.message_history) // 3)
        old_messages = self.message_history[:-keep_count]
        if not old_messages:
            # Nothing is older than the kept tail; leave the summary as is
            # rather than replacing it with a summary of empty text.
            return
        self.message_history = self.message_history[-keep_count:]
        # Carry the previous summary forward so earlier context is not lost.
        parts = [self.summary] if self.summary else []
        parts.extend(m["content"] for m in old_messages)
        self.summary = self._summarize(" ".join(parts))

    def _summarize(self, text: str, max_length=200) -> str:
        """Compress *text* with the configured summarizer, else truncate it."""
        prompt = f"请将以下内容压缩为{max_length}字以内的摘要,保留关键信息:\n{text}"
        if self.summarizer is not None:
            return self.summarizer(prompt)
        # No summarizer configured: fall back to plain truncation.
        return text[:max_length]

    def get_context(self) -> List[dict]:
        """Return the full context: the summary (if any) followed by the history."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"以下是之前对话的摘要: {self.summary}"
            })
        context.extend(self.message_history)
        return context
<system>
你是研究助手,负责分析市场数据。
</system>
<task>
分析 A 股科技板块本周走势,找出涨幅前 5 的行业
</task>
<tools>
- stock_api: 获取股票数据
- news_api: 获取相关新闻
- analysis_engine: 技术分析计算
</tools>
<constraints>
- 仅使用过去 7 天的数据
- 忽略 ST 股票
- 输出格式为 JSON
</constraints>
<output_format>
{
"sectors": [{"name": "...", "return_pct": ..., "key_stocks": [...]}]
}
</output_format>
## 6. 【新增】2026 最新架构演进

### 6.1 智能体 Swarm 模式(去中心化协作)
# Decentralized swarm: agents collaborate without a central manager
class AgentSwarm:
    """Group of agents that explore a task in parallel and vote on the result."""

    def __init__(self, agents, shared_state):
        self.agents = agents
        self.state = shared_state  # Redis-backed shared state
        # NOTE(review): the threshold is stored but run_exploration never
        # applies it; the plurality winner is returned whatever its share of
        # the vote. Confirm whether a minimum share should be enforced.
        self.consensus_threshold = 0.6

    async def run_exploration(self, task):
        """Have every agent explore *task* in parallel, then pick by vote.

        Each agent votes via ``agent.vote(own_result, all_results)``. The
        result that receives the most votes is returned; on a tie the earliest
        such result wins.

        Raises:
            ValueError: the swarm has no agents.
        """
        if not self.agents:
            raise ValueError("AgentSwarm requires at least one agent")

        # Parallel execution
        results = await asyncio.gather(
            *(agent.explore(task) for agent in self.agents)
        )
        # Consensus vote
        votes = {
            agent.id: agent.vote(result, results)
            for agent, result in zip(self.agents, results)
        }
        ballots = list(votes.values())
        # Pick the result with the most votes. Equality is used rather than
        # hashing because results are not guaranteed to be hashable.
        return max(
            results,
            key=lambda candidate: sum(1 for ballot in ballots if ballot == candidate),
        )
### 6.2 MCP 工具注册表动态发现
```python# 动态 MCP 工具发现与注册 class MCPToolRegistry: def init(self): self.servers = {} # server_name -> MCP connection self.tools = {} # tool_name -> tool_definition
async def discover_tools(self):
"""自动发现所有已连接 MCP 服务器的工具"""
for server_name, connection in self.servers.items():
tools = await connection.list_tools()
for tool in tools:
self.tools[tool.name] = {
"server": server_name,
"definition": tool.definition,
"permissions": self._check_permissions(tool.name)
}
async def route_tool_call(self, agent_id, tool_name, arguments):
"""路由工具调用到正确的 MCP 服务器"""
tool_info = self.tools.get(tool_name)
if not tool_info:
raise ValueError(f"Unknown tool: {tool_name}")
if not self._agent_has_permission(agent_id, tool_name):
raise PermissionError(f"Agent {agent_id} cannot use {tool_name}")
connection = self.servers[tool_info["server"]]
return await connection.call_tool(tool_name, arguments)
```
文档更新日期: 2026年4月30日 | 来源: LangGraph/CrewAI/AutoGen 官方文档、MCP 规范、多智能体系统论文