zeroclaw 的 Memory 是 agent 的長期記憶——它記錄對話歷史、使用者偏好、決策紀錄,並在每次對話前把相關記憶召回注入給 LLM。這篇記錄它如何用一個 trait 統一四種後端(SQLite、Markdown、Lucid、Postgres),以及它最有趣的設計:混合式語意搜尋、Embedding LRU 快取、還有讓 agent 在冷啟動時從 Markdown 文字檔重建「靈魂」的機制。
Memory Trait:七個方法,全部必填
跟 Channel 和 Provider 的設計不同——後兩者都有預設實作——Memory trait 的七個方法全部是必填的:
#[async_trait]
pub trait Memory: Send + Sync {
fn name(&self) -> &str;
async fn store(
&self,
key: &str,
content: &str,
category: MemoryCategory,
session_id: Option<&str>,
) -> anyhow::Result<()>;
async fn recall(
&self,
query: &str,
limit: usize,
session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>>;
async fn get(&self, key: &str) -> anyhow::Result<Option<MemoryEntry>>;
async fn list(
&self,
category: Option<&MemoryCategory>,
session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>>;
async fn forget(&self, key: &str) -> anyhow::Result<bool>;
async fn count(&self) -> anyhow::Result<usize>;
async fn health_check(&self) -> bool;
}
這反映了「每種後端的搜尋策略都根本不同」——SQLite 用 FTS5 + 向量搜尋,Markdown 用關鍵字比對,Postgres 用 ILIKE——沒有一個合理的預設實作能套用在所有後端,所以索性不提供。
核心資料型別
pub struct MemoryEntry {
pub id: String,
pub key: String,
pub content: String,
pub category: MemoryCategory,
pub timestamp: String,
pub session_id: Option<String>,
pub score: Option<f64>, // 只有 recall() 會填,get()/list() 都是 None
}
pub enum MemoryCategory {
Core, // 長期事實、偏好、決策
Daily, // 每日工作日誌
Conversation, // 對話上下文
Custom(String), // 開放式擴充點
}
score 的設計很有趣:它是 recall() 的副產品,代表這筆記憶和查詢的相關度。get() 和 list() 都是精確查找,沒有「相關度」的概念,所以這個欄位在那兩個情境下永遠是 None。Custom(String) 則是一個逃生艙,讓使用者可以自訂任意分類而不需要修改 enum。
四個後端,一個介面
zeroclaw 目前有四個實作(加上一個 no-op):
| 後端 |
搜尋方式 |
特性 |
SqliteMemory |
FTS5 + 向量 (cosine) |
推薦預設,單一 .db 檔 |
MarkdownMemory |
關鍵字比對 |
純文字,只能追加 |
LucidMemory |
SQLite + 外部 CLI |
橋接第三方記憶工具 |
PostgresMemory |
ILIKE 模糊比對 |
雲端持久化,無 pgvector |
NoneMemory |
— |
完全 no-op,停用記憶 |
SQLite:真正的大腦
SqliteMemory 是推薦的預設後端,資料存在 workspace/memory/brain.db。它的設計最複雜,也最值得細看。
Schema
三張表,各有不同用途:
-- 主要記憶表
CREATE TABLE memories (
id TEXT PRIMARY KEY,
key TEXT UNIQUE NOT NULL, -- upsert 的 identity key
content TEXT NOT NULL,
category TEXT NOT NULL DEFAULT 'core',
embedding BLOB, -- f32 向量,little-endian bytes
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
session_id TEXT
);
-- FTS5 全文搜尋虛擬表
CREATE VIRTUAL TABLE memories_fts USING fts5(
key, content,
content=memories, content_rowid=rowid
);
-- Embedding LRU 快取
CREATE TABLE embedding_cache (
content_hash TEXT PRIMARY KEY, -- SHA-256 前 16 hex chars
embedding BLOB NOT NULL,
created_at TEXT NOT NULL,
accessed_at TEXT NOT NULL -- LRU 用
);
FTS5 透過三個 trigger(INSERT、DELETE、UPDATE)和主表保持同步。store() 用的是 INSERT ... ON CONFLICT(key) DO UPDATE——upsert 語意,key 是唯一識別碼,id 是 UUID(每次 upsert 都會更新)。
SQLite 開啟時的 PRAGMA 調校:WAL 模式、8 MB mmap、2 MB page cache、MEMORY temp store。
為什麼用 spawn_blocking?
rusqlite 是 blocking API,直接在 async context 呼叫會卡住 tokio runtime。所有 SQLite 操作都包在 tokio::task::spawn_blocking 裡:
let result = tokio::task::spawn_blocking(move || {
let conn = db.lock();
conn.execute("INSERT INTO memories ...", params![...])?;
Ok(())
}).await??;
連線的開啟還多了一層防護——用 worker thread + mpsc::channel 加上 recv_timeout,防止 SQLite 檔案被鎖住時永遠卡住:
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let _ = tx.send(Connection::open(&path));
});
match rx.recv_timeout(Duration::from_secs(capped)) {
Ok(Ok(conn)) => conn,
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(anyhow!("SQLite open timed out")),
}
混合語意搜尋:FTS5 + 向量
recall() 的實作是整個記憶系統最有趣的部分。它不是單一的搜尋策略,而是三層 fallback:
graph TD
A["recall(query, limit)"] --> B{有設定 Embedding provider?}
B -->|是| C[生成查詢向量]
C --> D[FTS5 BM25 搜尋]
D --> E["hybrid_merge(向量結果, 關鍵字結果)"]
E --> F[回傳 Top-N]
B -->|否| G[純 BM25 搜尋]
G --> F
D -->|FTS5 解析錯誤| H["LIKE %keyword% 兜底"]
H --> F
向量:純 Rust Cosine Similarity
向量以 f32 little-endian bytes 存在 BLOB 欄位。Cosine similarity 是純 Rust 實作,刻意用 f64 累加以減少高維向量的浮點誤差:
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
let mut dot = 0.0_f64;
let mut norm_a = 0.0_f64;
let mut norm_b = 0.0_f64;
for (x, y) in a.iter().zip(b.iter()) {
let x = f64::from(*x);
let y = f64::from(*y);
dot += x * y;
norm_a += x * x;
norm_b += y * y;
}
let denom = norm_a.sqrt() * norm_b.sqrt();
if !denom.is_finite() || denom < f64::EPSILON {
return 0.0;
}
(dot / denom).clamp(0.0, 1.0) as f32
}
這是暴力掃描所有有 embedding 的 row,沒有 ANN index(如 HNSW、IVFFlat)。對一個本地 agent 的記憶量來說夠用,不需要引入 pgvector 或外部向量 DB 的依賴。
關鍵字:FTS5 BM25
查詢前先把每個 token 用雙引號包住再用 OR 連接,避免 FTS5 特殊字元造成解析錯誤:
let fts_query = query
.split_whitespace()
.map(|w| format!("\"{w}\""))
.collect::<Vec<_>>()
.join(" OR ");
FTS5 的 bm25() 分數是負數(越負越相關),取負後變成越大越好,和 cosine similarity 的方向統一。
混合合併
pub fn hybrid_merge(
vector_results: &[(String, f32)], // (id, cosine)
keyword_results: &[(String, f32)], // (id, -BM25)
vector_weight: f32, // 預設 0.7
keyword_weight: f32, // 預設 0.3
limit: usize,
) -> Vec<ScoredResult> {
// BM25 分數正規化到 [0,1]
// final_score = 0.7 * cosine + 0.3 * normalized_BM25
// 去重:HashMap keyed on id
}
合併後的 Top-N ID 用一次 WHERE id IN (...) 批次撈出,避免 N+1 query。
Embedding LRU 快取
每次 store() 都需要為新內容生成 embedding,但相同的文字不應該重複打 API。快取的 key 是內容的 SHA-256 前 16 hex chars——明確選擇 SHA-256 而不是 DefaultHasher,因為後者的輸出在不同 Rust 版本之間不保證穩定。
LRU 淘汰不在 Rust 記憶體裡維護資料結構,而是直接在 SQL 裡做:
-- 保留最新的 N 筆,刪除多餘的舊資料
DELETE FROM embedding_cache WHERE content_hash IN (
SELECT content_hash FROM embedding_cache
ORDER BY accessed_at ASC
LIMIT MAX(0, (SELECT COUNT(*) FROM embedding_cache) - ?1)
)
這樣快取的淘汰邏輯完全不依賴記憶體狀態,重啟之後一樣有效。
Embedding Provider
Embedding 本身也是一個 trait:
#[async_trait]
pub trait EmbeddingProvider: Send + Sync {
fn name(&self) -> &str;
fn dimensions(&self) -> usize;
async fn embed(&self, texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>>;
// 唯一的預設方法:批次 embed 的單一版本
async fn embed_one(&self, text: &str) -> anyhow::Result<Vec<f32>> {
let mut results = self.embed(&[text]).await?;
results.pop().ok_or_else(|| anyhow!("Empty embedding result"))
}
}
兩個具體實作:
NoopEmbedding:dimensions() = 0,回傳空向量。啟用後自動降級為純關鍵字搜尋。
OpenAiEmbedding:打 /v1/embeddings endpoint。支援自訂 base URL,用 custom: 前綴指定:
pub fn create_embedding_provider(provider: &str, api_key: ..., model: &str, dims: usize) {
match provider {
"openai" => Box::new(OpenAiEmbedding::new("https://api.openai.com", ...)),
name if name.starts_with("custom:") => {
let base_url = name.strip_prefix("custom:").unwrap_or("");
Box::new(OpenAiEmbedding::new(base_url, ...)) // 指向 Ollama、本地推理伺服器等
}
_ => Box::new(NoopEmbedding),
}
}
MarkdownMemory:只能追加的審計日誌
MarkdownMemory 的 forget() 永遠回傳 false——這個後端是只能追加的,設計上就是個審計日誌:
workspace/MEMORY.md ← Core 類別
workspace/memory/2026-02-19.md ← Daily 類別(按日期分檔)
每次 store() 就在對應的 Markdown 檔尾端追加一行 - **key**: content。recall() 則是把查詢拆成關鍵字,計算每筆記憶裡出現了幾個,算出比例當作分數。
雖然搜尋能力差,但好處是人類可讀、可以直接用 Git 追蹤,也容易備份。
LucidMemory:橋接外部 CLI
LucidMemory 是一個橋接層,SQLite 是主要儲存,外部 lucid-memory CLI 是輔助:
pub struct LucidMemory {
local: SqliteMemory,
recall_timeout: Duration, // 預設 500ms
store_timeout: Duration, // 預設 800ms
local_hit_threshold: usize, // 預設 3:本地結果夠多就不問 Lucid
failure_cooldown: Duration, // 任何錯誤後冷卻 15 秒
last_failure_at: Mutex<Option<Instant>>,
}
store() 先寫 SQLite,再 fire-and-forget 同步到 Lucid。recall() 的策略:先問 SQLite,如果結果數量低於 local_hit_threshold 且 Lucid 不在冷卻期,才去呼叫 lucid context <query>,然後合併結果。
去重用的是一個 NUL 字元分隔的簽名:
let signature = format!("{}\u{0}{}", entry.key.to_lowercase(), entry.content.to_lowercase());
Lucid 的輸出是自訂的 XML-like 格式:
<lucid-context>
- [decision] Use token refresh middleware
- [context] Working in src/auth.rs
</lucid-context>
方括號裡的標籤(decision、context、bug、learning)會被映射回 MemoryCategory。
Agent 的靈魂:Snapshot 與冷啟動恢復
最讓我覺得有趣的設計是「靈魂快照」機制。
在每次記憶體清理(hygiene)時,如果啟用了 snapshot_on_hygiene,所有 Core 類別的記憶都會被 dump 到工作目錄的 MEMORY_SNAPSHOT.md。這個檔案是 Git 可追蹤的純文字。
更厲害的是冷啟動恢復:如果 brain.db 不存在(例如全新部署、或資料庫損毀刪除),但 MEMORY_SNAPSHOT.md 存在,factory 會在開啟後端之前先呼叫 hydrate_from_snapshot(),把 Markdown 裡的記憶逐筆還原回 SQLite。
graph LR
A["brain.db 存在?"] -->|是| B[正常開啟 SqliteMemory]
A -->|否| C["MEMORY_SNAPSHOT.md 存在?"]
C -->|是| D[hydrate_from_snapshot]
D --> E[重新開啟 SqliteMemory]
C -->|否| F[全新 SqliteMemory]
B --> G[完成]
E --> G
F --> G
每次清理也會刪掉 MemoryCategory::Conversation 類別超過 conversation_retention_days 的舊紀錄——對話上下文是短暫的,不該無限累積。
在 Agent Loop 裡的角色
Memory 在每輪對話的三個時機被使用:
sequenceDiagram
participant U as 使用者
participant AL as Agent Loop
participant M as Memory
participant LLM as LLM
U->>AL: 送出訊息
AL->>M: recall(user_msg, 5)
M-->>AL: 相關記憶(score >= 0.4 才納入)
AL->>LLM: [Memory context]\n- key: content\n...\n{user_msg}
LLM-->>AL: 回應
AL->>M: store("user_msg_{uuid}", msg, Conversation)
AL->>M: store("assistant_resp_{uuid}", summary[:100], Daily)
AL-->>U: 最終答覆
注意幾個細節:
- 相關度門檻:
score >= 0.4 才注入,低分記憶不進 context,避免雜訊干擾 LLM。
- 自動儲存:使用者訊息存
Conversation 類別(會被清理),助理回應只存前 100 字存 Daily 類別(會保留較久)。
- LLM 直接操控記憶:
memory_store、memory_recall、memory_forget 這三個工具也被當成普通的 Tool 掛載到 agent 上,LLM 自己可以主動讀寫記憶。
小結
zeroclaw Memory 設計幾個值得注意的地方:
- 全部必填,沒有預設:每種後端的搜尋策略差太多,預設實作沒有意義。
- SQLite 就夠用:FTS5 + 向量暴力搜尋在本地 agent 的規模不需要 pgvector,單一
.db 檔沒有外部依賴。
- 三層 fallback 搜尋:Hybrid(FTS5 + 向量)→ BM25 only →
LIKE 兜底,優雅降級。
- SHA-256 快取,LRU 在 SQL 裡做:快取邏輯不依賴記憶體狀態,重啟後仍然有效。
- 靈魂快照:
MEMORY_SNAPSHOT.md 是 Git 可追蹤的,讓 agent 可以從純文字重建長期記憶,冷啟動不代表失憶。
- Conversation 記憶會被清理:對話上下文是短暫的,設計上就不打算永久保留。
zeroclaw 的 Tool 是 agent 採取行動的核心機制——執行 shell 指令、讀寫檔案、瀏覽網頁、呼叫 HTTP API、操作記憶體、甚至把任務委派給另一個 sub-agent。目前實作了 30+ 種工具。這篇記錄它如何用一個乾淨的 trait 統一所有工具,以及工具的組裝、dispatch、schema 正規化、安全注入等設計。
整個抽象的核心在 src/tools/traits.rs,只有四個方法是必須實作的:
#[async_trait]
pub trait Tool: Send + Sync {
fn name(&self) -> &str;
fn description(&self) -> &str;
fn parameters_schema(&self) -> serde_json::Value;
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult>;
// 預設實作:把前三個方法組裝成 ToolSpec,免費附贈
fn spec(&self) -> ToolSpec {
ToolSpec {
name: self.name().to_string(),
description: self.description().to_string(),
parameters: self.parameters_schema(),
}
}
}
幾個設計決策值得注意:
Send + Sync:讓工具可以存進 Arc<dyn Tool> 並在 thread 之間共享,必要條件。
async fn execute:所有工具都可能有 I/O,統一用 async,不需要區分同步/非同步工具。
- 參數是
serde_json::Value:不是每個工具一個強型別 struct,而是執行時從 JSON 取值。省去了大量 boilerplate,代價是錯誤在執行時才出現,而不是編譯時。
spec() 是預設方法:把 name、description、parameters_schema 這三個方法的結果組裝成 ToolSpec,新增工具完全不需要自己實作這個。
核心資料型別
三個關鍵型別:
// 每次執行的回傳值
pub struct ToolResult {
pub success: bool,
pub output: String,
pub error: Option<String>,
}
// 送給 LLM 描述「這個工具能做什麼」的規格書
pub struct ToolSpec {
pub name: String,
pub description: String,
pub parameters: serde_json::Value, // JSON Schema 物件
}
注意 execute 的錯誤處理設計:正常的執行失敗(找不到檔案、路徑不允許)用 ToolResult { success: false, error: Some(...) } 回傳;只有程式本身的 bug 或不可恢復的錯誤才回傳 anyhow::Result::Err。這讓 agent loop 可以把執行失敗的結果繼續送給 LLM 讓它決定下一步,而不是直接炸掉整個流程。
參數 schema 是用 serde_json::json!() 手寫 JSON Schema,沒有 proc macro 或 derive:
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute"
},
"approved": {
"type": "boolean",
"description": "Set true to explicitly approve medium/high-risk commands",
"default": false
}
},
"required": ["command"]
})
}
工具的組裝:工廠函式,不靠反射
工具不是自動發現的,而是在 src/tools/mod.rs 裡用工廠函式顯式組裝:
// 最小集合,給測試和簡單 agent 用
pub fn default_tools(security: Arc<SecurityPolicy>) -> Vec<Box<dyn Tool>> {
vec![
Box::new(ShellTool::new(security.clone(), runtime)),
Box::new(FileReadTool::new(security.clone())),
Box::new(FileWriteTool::new(security)),
]
}
// 完整工具集,根據設定條件啟用
pub fn all_tools_with_runtime(config, security, runtime, memory, ...) -> Vec<Box<dyn Tool>> {
let mut tools: Vec<Box<dyn Tool>> = vec![
Box::new(ShellTool::new(...)),
Box::new(FileReadTool::new(...)),
Box::new(CronAddTool::new(...)),
Box::new(MemoryStoreTool::new(...)),
// ...
];
// 條件啟用:設定決定工具集,不是全部預設開啟
if browser_config.enabled {
tools.push(Box::new(BrowserTool::new_with_backend(...)));
}
if http_config.enabled {
tools.push(Box::new(HttpRequestTool::new(...)));
}
if !agents.is_empty() {
tools.push(Box::new(DelegateTool::new(agents, ...)));
}
tools
}
這是工廠/建構器模式——沒有反射、沒有 inventory 這類 compile-time 自動收集 crate、沒有 #[register_tool] 巨集。好處是依賴關係一目了然,壞處是每加一個工具都要手動登記。對這個規模的 codebase 來說是對的取捨。
兩種 Dispatch 模式
工具的呼叫有兩條路,取決於 LLM provider 是否支援原生 function calling:
graph TD
A[LLM 回應] --> B{支援原生 Tool Calling?}
B -->|是| C[NativeToolDispatcher]
B -->|否| D[XmlToolDispatcher]
C -->|解析 API 的 tool_calls 欄位| E[ParsedToolCall]
D -->|解析文字裡的 XML tag| E
E --> F["find_tool(name)"]
F --> G["tool.execute(args)"]
G --> H[ToolResult]
H --> I[轉成 ConversationMessage]
I --> J[送回 LLM 繼續下一輪]
NativeToolDispatcher 用於 Anthropic、OpenAI、Gemini——這些 provider 會在 API response 裡回傳結構化的工具呼叫,直接解析即可。
XmlToolDispatcher 用於不支援原生 function calling 的 LLM。此時 zeroclaw 把工具的說明注入到 system prompt,要求 LLM 用特定格式回應:
<tool_call>{"name": "shell", "arguments": {"command": "ls -la"}}</tool_call>
然後用字串解析從回應文字裡抓出這些 XML tag。
Agent loop 查找工具的方式很簡單:
fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
}
送給 LLM 的格式(以 OpenAI 為例):
fn tools_to_openai_format(tools: &[Box<dyn Tool>]) -> Vec<serde_json::Value> {
tools.iter().map(|tool| {
json!({
"type": "function",
"function": {
"name": tool.name(),
"description": tool.description(),
"parameters": tool.parameters_schema()
}
})
}).collect()
}
JSON Schema 的跨 Provider 正規化
各家 LLM API 對 JSON Schema 的支援程度差異很大,尤其是 Gemini——它會拒絕很多標準的 JSON Schema 關鍵字。src/tools/schema.rs 的 SchemaCleanr 專門處理這件事:
pub enum CleaningStrategy {
Gemini, // 最嚴格
Anthropic,
OpenAI, // 最寬鬆
Conservative, // 保守策略,適合未知 provider
}
pub struct SchemaCleanr;
impl SchemaCleanr {
pub fn clean_for_gemini(schema: Value) -> Value { ... }
pub fn clean_for_anthropic(schema: Value) -> Value { ... }
pub fn clean_for_openai(schema: Value) -> Value { ... }
}
各策略的主要差異:
| 關鍵字 |
Gemini |
Anthropic |
OpenAI |
minLength、pattern |
移除 |
移除 |
保留 |
$ref |
內聯展開 |
內聯展開 |
保留 |
additionalProperties |
移除 |
保留 |
保留 |
anyOf/oneOf |
攤平為第一個型別 |
保留 |
保留 |
nullable |
轉換格式 |
保留 |
保留 |
Gemini 甚至連 anyOf: [type: string, type: null](nullable 的常見寫法)都要特別轉換。這些邊緣情況全部藏在 SchemaCleanr 裡,工具本身完全不需要知道。
安全性是注入的,不是全域的
zeroclaw 的安全設計有個核心原則:安全策略是建構時注入的,不是全域的靜態狀態。
// 每個有 I/O 的工具都在建構時收到 SecurityPolicy
pub struct FileReadTool {
security: Arc<SecurityPolicy>,
}
pub struct ShellTool {
security: Arc<SecurityPolicy>,
runtime: Arc<dyn RuntimeAdapter>,
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let path = args.get("path").and_then(|v| v.as_str())...;
// 安全檢查在 execute 裡,繞不過去
if !self.security.is_path_allowed(path) {
return Ok(ToolResult { success: false, error: Some("Path not allowed".into()), ... });
}
// 繼續執行...
}
SecurityPolicy 強制執行工作區沙盒(防止讀寫工作目錄以外的路徑)、自主等級(ReadOnly、Supervised、Full)、rate limiting 和指令白名單。
Supervised 模式下還有 ApprovalManager,在執行高風險工具前暫停等待使用者確認:
[zeroclaw] Tool: shell
Command: rm -rf /tmp/build
[y]es / [n]o / [a]lways: _
選 always 之後,這個指令會被加進 session allowlist,下次遇到同樣的指令就不再詢問。
有趣的具體工具
DelegateTool 是最有趣的工具之一——它的 parameters_schema() 是執行時動態生成的,而不是靜態寫死的:
pub struct DelegateTool {
agents: Arc<HashMap<String, DelegateAgentConfig>>,
depth: u32, // 防止無限委派遞迴
}
fn parameters_schema(&self) -> serde_json::Value {
// 根據目前設定的 agent 列表動態生成 schema
let agent_names: Vec<&str> = self.agents.keys().map(|s| s.as_str()).collect();
json!({
"properties": {
"agent": {
"type": "string",
"description": format!("Which agent to delegate to. Available: {}",
agent_names.join(", "))
},
"task": { "type": "string", "description": "Task description for the agent" }
}
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
if self.depth >= MAX_DELEGATION_DEPTH {
return Ok(ToolResult { success: false, error: Some("Max delegation depth reached"), ... });
}
let provider = create_provider(&agent_config.provider, ...)?;
// 120 秒 timeout,防止 sub-agent 跑太久
tokio::time::timeout(Duration::from_secs(120),
provider.chat_with_system(...)).await
}
depth 欄位防止 agent A 委派給 agent B、B 又委派回 A 的無限迴圈。
ShellTool 的安全設計很謹慎——它清空整個環境變數,只保留一個安全白名單:
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
// 清除所有環境變數,防止 API key 洩漏給子行程
let mut cmd = Command::new("sh");
cmd.env_clear(); // 全部清空
for key in SAFE_ENV_VARS { // 只加回白名單裡的
if let Ok(val) = std::env::var(key) {
cmd.env(key, val);
}
}
// 60 秒 timeout,截斷輸出到 1MB
tokio::time::timeout(Duration::from_secs(60), cmd.output()).await
}
SAFE_ENV_VARS 包含 PATH、HOME、LANG 這類系統必要的變數,但不包含任何 *_API_KEY、*_SECRET 這類敏感變數。即使主行程有這些環境變數,子行程也看不到。
sequenceDiagram
participant U as 使用者
participant AL as Agent Loop
participant P as Provider
participant LLM as LLM API
participant T as Tool
U->>AL: 送出訊息
AL->>P: tools_to_provider_format(specs)
P->>LLM: chat request + tool definitions
LLM-->>P: 回應(含 tool calls)
P-->>AL: ChatResponse { tool_calls: [...] }
loop 最多 10 次
AL->>AL: find_tool(name)
AL->>AL: ApprovalManager(Supervised 模式)
AL->>T: execute(args)
T->>T: SecurityPolicy 檢查
T-->>AL: ToolResult
AL->>P: 把結果轉成 ConversationMessage
P->>LLM: 繼續對話(含工具執行結果)
LLM-->>P: 新回應
alt 不再呼叫工具
P-->>AL: 純文字回應
AL-->>U: 最終答覆
end
end
整個流程最多迴圈 10 次(DEFAULT_MAX_TOOL_ITERATIONS)。超過就強制停止,防止 agent 無止境地呼叫工具。
小結
zeroclaw 的 Tool 設計幾個值得借鑑的地方:
- 最小介面:四個必填方法,
spec() 是免費的預設實作,新增工具的門檻很低。
- 工廠函式,不靠反射:顯式組裝、條件啟用,依賴關係清晰,沒有魔法。
- 兩種 Dispatch 兼顧:原生 API tool calling 和 XML prompt 注入都支援,對各種 LLM 都能用。
- Schema 正規化是一等公民:
SchemaCleanr 把各家 API 的奇葩限制集中處理,工具本身不受污染。
- 安全是建構時注入的:
SecurityPolicy 在 new() 時就進去了,不是全域狀態,也無法繞過。
- 執行失敗不等於程式崩潰:
ToolResult { success: false } 讓 agent 可以從失敗中學習,繼續嘗試。
zeroclaw 的 Channel 負責接收與傳送訊息,支援 Telegram、Discord、Slack、Matrix、iMessage、Signal、IRC、Email 等 14 種平台。這篇記錄它如何用一個乾淨的 trait 把這些平台統一起來,以及 runtime 如何把訊息分派給 LLM 處理。
Trait 設計:三個必填,其餘自選
Channel trait 定義在 src/channels/traits.rs,只有三個方法是必須實作的:
#[async_trait]
pub trait Channel: Send + Sync {
fn name(&self) -> &str;
async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
// 以下都有預設空實作(no-op)
async fn health_check(&self) -> bool { true }
async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }
// 串流草稿更新(目前只有 Telegram 實作)
fn supports_draft_updates(&self) -> bool { false }
async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> { Ok(None) }
async fn update_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
async fn finalize_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
}
最有趣的是 listen 的簽名:它拿到一個 mpsc::Sender<ChannelMessage>,然後把收到的訊息推進去——而不是回呼(callback)。這是 push-based observer pattern,讓所有平台的訊息都匯聚到同一條 mpsc 管道,runtime 只需要從一個地方消費。
訊息型別
進來的訊息(ChannelMessage)和出去的訊息(SendMessage)分開定義:
pub struct ChannelMessage {
pub id: String, // 去重用的 ID
pub sender: String, // 發送者識別碼
pub reply_target: String, // 要回覆的目標(chat_id、channel_id 等)
pub content: String, // 正規化後的純文字內容
pub channel: String, // "telegram" | "discord" | "slack" | ...
pub timestamp: u64,
}
pub struct SendMessage {
pub content: String,
pub recipient: String, // 平台專屬的目標識別碼
pub subject: Option<String>, // Email 用的主旨
}
各平台的 sender/reply_target 格式天差地別(Telegram 是數字 chat_id、Discord 是 snowflake ID、Email 是郵件地址),但對上層 runtime 來說一律是字串,不需要知道細節。
14 個平台,一個介面
src/channels/mod.rs 列出了目前支援的所有 channel:
| 分類 |
平台 |
底層協議 |
| 終端 |
CLI |
stdin/stdout |
| 即時通訊 |
Telegram |
HTTP long-polling |
|
Discord |
WebSocket Gateway |
|
Slack |
HTTP polling |
|
Mattermost |
WebSocket / HTTP |
|
WhatsApp |
Meta Cloud API webhook |
|
iMessage |
macOS osascript |
|
Matrix |
matrix-sdk 同步 |
|
Signal |
signal-cli HTTP daemon |
|
IRC |
TCP/TLS |
| 企業通訊 |
Lark/Feishu |
WebSocket + Protobuf |
|
DingTalk |
WebSocket |
|
QQ |
HTTP API |
| 電子郵件 |
Email |
SMTP + IMAP |
每個 channel 一個 struct,實作 Channel trait,差異只在各自的 struct 欄位和協議處理。
有趣的實作細節
Discord:手刻 Gateway WebSocket
Discord 沒有用任何 Discord SDK,而是用 tokio-tungstenite 手刻了整個 Gateway 協議:
- 先打 REST 拿 Gateway URL
- 建立 WebSocket 連線
- 走完
Hello → Identify → READY 握手流程
- 另開一個 task 發心跳(heartbeat)
- 處理各種 opcode:1(心跳請求)、7(重連)、9(session 失效)
這樣做的好處是二進位體積可控、依賴少,但代價是要自己維護協議細節。
Telegram:串流草稿更新
Telegram 是唯一實作 supports_draft_updates() 的 channel,可以做到「LLM 邊生成邊更新訊息」的效果:
sequenceDiagram
participant U as 使用者
participant TG as Telegram
participant A as Agent
participant LLM as LLM
U->>TG: 送出訊息
TG->>A: ChannelMessage
A->>TG: send_draft("...") → message_id
A->>+LLM: 串流請求
loop 每隔 N ms
LLM-->>A: delta token
A->>TG: update_draft(message_id, 累積文字)
Note over TG,U: 使用者看到訊息逐漸變長
end
LLM-->>-A: 生成完畢
A->>TG: finalize_draft(message_id, 最終格式)
這個效果類似 ChatGPT 網頁版的逐字輸出,但是在 Telegram 上實現。
Lark/Feishu:Protobuf Frame
Lark 的 WebSocket 協議不是 JSON,而是用 prost(Rust 的 protobuf 實作)解析自訂的 PbFrame 格式,區分 method=0(ping/pong)和 method=1(實際事件)。是這 14 個 channel 裡協議最複雜的一個。
Runtime:把一切串起來
所有 channel 都跑起來之後,start_channels() 做的事情:
- 一條共用 mpsc:所有 channel 的
listen 都推到同一個 Sender<ChannelMessage>
- 監控重啟(supervised restart):每個
listen 都包在一個重試迴圈裡,失敗後指數退避重試
- 信號量限流的 worker pool:從 mpsc 讀訊息,用
Semaphore 限制同時在飛的請求數
graph LR
TG[Telegram] -->|listen| Q[(mpsc queue)]
DC[Discord] -->|listen| Q
SL[Slack] -->|listen| Q
MT[Matrix ...] -->|listen| Q
Q --> W1[Worker]
Q --> W2[Worker]
Q --> W3[Worker]
W1 --> LLM[LLM Provider]
W2 --> LLM
W3 --> LLM
// 監控重啟
loop {
let result = ch.listen(tx.clone()).await;
// 正常退出 → 重置退避;錯誤 → 等待後重試
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = backoff.saturating_mul(2).min(max_backoff);
}
// 信號量限流
let semaphore = Arc::new(Semaphore::new(max_in_flight));
while let Some(msg) = rx.recv().await {
let permit = semaphore.acquire_owned().await?;
workers.spawn(async move {
let _permit = permit; // drop 時自動釋放
process_channel_message(worker_ctx, msg).await;
});
}
最大並發數是 channel 數量 × 4,夾在 8 到 64 之間。
每則訊息的處理流程
process_channel_message() 做的事遠不只是「丟給 LLM」:
- 攔截 runtime 指令(
/models、/model)— 支援不重啟切換 LLM provider
- 從 per-sender 對話歷史取得上下文(最多 50 輪)
- 從記憶體召回相關內容(向量相似度搜尋)
- 依平台注入特殊指令(例如告訴 Telegram 版的 LLM 可以用
[IMAGE:path] 語法發圖)
- 如果支援 draft update,先發
"..." 佔位,再串流更新
- 送進 LLM + tool call 迴圈(最多 300 秒 timeout)
- 把結果存回對話歷史,回覆給使用者
Gateway vs Channel
zeroclaw 有兩種接收訊息的模式,不要搞混:
- Channel(
start_channels):agent 主動出去建立連線(WebSocket、long-polling),agent 是 client。
- Gateway(
src/gateway/):用 axum 開一個 HTTP server,等平台打 webhook 進來,agent 是 server。
graph TB
subgraph channel_mode["Channel 模式(Agent 主動連出)"]
direction LR
A1[Agent] <-->|WebSocket / long-polling| P1["Telegram / Discord\nSlack / Matrix ..."]
end
subgraph gateway_mode["Gateway 模式(等 Webhook 打入)"]
direction LR
P2["WhatsApp\n(Meta Cloud API)"] -->|HTTP POST + HMAC 驗簽| A2["Agent\n(axum HTTP server)"]
end
WhatsApp(Meta Cloud API)就是用 Gateway 模式,因為 Meta 只支援 webhook,不提供讓你長連的協議。Gateway 還內建了滑動視窗的 rate limiter、HMAC-SHA256 簽名驗證和請求去重(idempotency store)。
小結
zeroclaw Channel 設計的幾個值得學習的地方:
- Push to mpsc,不用 callback:
listen 只管推訊息,runtime 只管消費,兩邊完全解耦。
- Supervised restart 是一等公民:每個 channel 的生命週期都有人顧,不怕某個平台偶爾斷線。
- Draft updates 是 opt-in 擴充:預設 no-op,只有真的需要的平台實作,不污染介面。
- Gateway 和 Channel 分開:推(長連)和拉(webhook)兩種模式有各自的架構,互不混淆。
zeroclaw 是一個 Rust-first 的自主 Agent 執行時(autonomous agent runtime),設計目標是高效能、高穩定性、高擴充性與高安全性。它的架構以 trait 為核心,包含 Provider(LLM)、Channel(Telegram/Discord/Slack)、Tool、Memory、Security、Peripheral(STM32/RPi GPIO)等模組。這篇聚焦在它如何設計 LLM Provider 的抽象層,統一了 OpenAI、Anthropic、Gemini 等各種不同 API 的介面。
一個 Trait,一個必填方法
整個抽象的核心是 Provider trait,但它聰明的地方在於:只有一個方法是必須實作的:
#[async_trait]
pub trait Provider: Send + Sync {
async fn chat_with_system(
&self,
system_prompt: Option<&str>,
message: &str,
model: &str,
temperature: f64,
) -> anyhow::Result<String>;
// 其他方法都有預設實作,最終都會呼叫 chat_with_system
async fn simple_chat(&self, message: &str, model: &str, temperature: f64) -> anyhow::Result<String> { ... }
async fn chat_with_history(&self, messages: &[ChatMessage], ...) -> anyhow::Result<String> { ... }
async fn chat(&self, request: ChatRequest<'_>, ...) -> anyhow::Result<ChatResponse> { ... }
// ...
}
這意味著加一個新 provider 最少只需要 ~30 行程式碼。其他的 simple_chat、chat_with_history 這些便利方法,都有預設實作會幫你委派過去。
這裡用了 async_trait crate,因為 trait 裡的 async fn 需要特別處理才能支援 dyn Provider。
統一的訊息型別
各家 LLM API 的訊息格式其實大同小異,zeroclaw 定義了一套統一的型別:
pub struct ChatMessage {
pub role: String, // "system", "user", "assistant", "tool"
pub content: String,
}
pub struct ChatResponse {
pub text: Option<String>,
pub tool_calls: Vec<ToolCall>,
}
pub struct ToolCall {
pub id: String,
pub name: String,
pub arguments: String, // JSON 字串
}
ChatMessage 還提供了工廠方法:ChatMessage::user("hello")、ChatMessage::assistant("hi") 這樣,寫起來很順。
多輪對話(包含 tool call 結果)則用一個 enum 來表達:
pub enum ConversationMessage {
Chat(ChatMessage),
AssistantToolCalls { text: Option<String>, tool_calls: Vec<ToolCall> },
ToolResults(Vec<ToolResultMessage>),
}
Decorator Pattern:功能疊加
Provider trait 自己也可以被包起來再實作 Provider,這讓功能可以自由組合。
ReliableProvider:自動重試 + API key 輪換
pub struct ReliableProvider {
providers: Vec<(String, Box<dyn Provider>)>,
max_retries: u32,
base_backoff_ms: u64,
api_keys: Vec<String>,
key_index: AtomicUsize, // 用 AtomicUsize 做 round-robin
model_fallbacks: HashMap<String, Vec<String>>,
}
遇到 429 Rate Limit 就輪換 API key、指數退避重試;遇到 4xx 非 429 的錯誤就直接放棄;甚至還能解析 Retry-After header 來決定等多久。
RouterProvider:按 model 名稱路由
// 用 "hint:" 前綴來指定路由
fn resolve(&self, model: &str) -> (usize, String) {
if let Some(hint) = model.strip_prefix("hint:") {
if let Some((idx, resolved_model)) = self.routes.get(hint) {
return (*idx, resolved_model.clone());
}
}
(self.default_index, model.to_string())
}
傳入 "hint:reasoning" 就會路由到你設定好對應「reasoning 任務」的 provider + model 組合。很適合多 provider 環境下的靈活調度。
各家 API 對 function calling 的格式差異很大,zeroclaw 用一個 enum 來表達:
pub enum ToolsPayload {
Gemini { function_declarations: Vec<serde_json::Value> },
Anthropic { tools: Vec<serde_json::Value> },
OpenAI { tools: Vec<serde_json::Value> },
PromptGuided { instructions: String }, // 給不支援原生 tool call 的 provider 用
}
實際上支援三種策略:
- 原生 Tool Calling:Anthropic、OpenAI、Gemini 都有各自的格式,provider 各自轉換。
- Prompt 注入(PromptGuided):把 tool 的說明文字注入到 system prompt,讓 LLM 用
<tool_call>JSON</tool_call> 這樣的 XML tag 來回應。
- 自動降級:
OpenAiCompatibleProvider 會先嘗試原生格式,如果收到 "unknown parameter: tools" 這類錯誤,就自動切換到 prompt 注入模式。這樣就算某個 OpenAI 相容的 endpoint 不支援 tool call,也能無縫 fallback。
各家 Provider 的奇特之處
實作各家 provider 的過程中,可以看到各家 API 設計的一些有趣差異:
Anthropic:支援 Prompt Caching,會自動對大型 system prompt(>3072 bytes)或長對話(>4 則)加上 cache_control: ephemeral,降低費用。
Gemini:是唯一把 API key 放在 query string(?key=xxx)而不是 header 裡的。而且 OAuth 使用者(Gemini CLI)必須打完全不同的 endpoint(cloudcode-pa.googleapis.com)。
OpenAiCompatibleProvider:一個實作打遍幾乎所有 OpenAI 相容的 API——Venice、Cloudflare、Groq、Mistral、xAI、DeepSeek、Perplexity……超過 15 個 provider 都是它的變體。靠著 AuthStyle enum(Bearer、XApiKey、Custom)和可設定的 base URL 來區分。
工廠函式
最後,使用者不需要直接 new 這些 struct,而是透過三個工廠函式:
// 簡單建立單一 provider
create_provider(name, api_key)
// 加上重試 + fallback 功能
create_resilient_provider(..., reliability)
// 加上多 provider 路由
create_routed_provider(..., model_routes)
API key 解析的優先順序是:明確傳入的參數 → provider 專屬的環境變數(如 ANTHROPIC_API_KEY)→ 通用的 ZEROCLAW_API_KEY → API_KEY。
小結
zeroclaw 的設計讓我印象深刻的幾個點:
- 最小介面原則:只強制一個方法,其他都是預設實作,加新 provider 成本很低。
- Decorator 而非繼承:retry、routing 都是包裝器,跟具體 provider 完全解耦。
- 漸進式降級:tool calling 從原生到 prompt 注入的自動 fallback,使用者完全感知不到。
- 務實:面對各家 API 的奇葩設計(Gemini 的 query param auth、各種 reasoning_content 欄位)直接在具體實作裡處理,不讓這些噪音污染核心抽象。
本文涵蓋為什麼 Rust trait 不支援 async fn、async_trait crate 如何解決這個問題、它的代價是什麼,以及 Rust 1.75 之後的原生支援現況。
問題:async fn 不能直接用在 trait 裡
如果你第一次在 Rust 寫 async 相關的 trait,很可能會直覺地這樣寫:
trait MyTrait {
async fn do_something(&self) -> String; // ❌ 編譯錯誤
}
但這在穩定版 Rust(1.75 之前)是不允許的。為什麼?
根本原因:impl Trait 與 dyn Trait 的衝突
async fn 是語法糖,它會被編譯器展開成回傳 impl Future 的普通函式:
// 你寫的:
async fn do_something(&self) -> String { ... }
// 編譯器看到的:
fn do_something(&self) -> impl Future<Output = String> { ... }
問題出在 trait 裡。每個實作 trait 的型別,其 do_something 都會回傳一個不同的具體 Future 型別:
impl MyTrait for StructA {
// 回傳型別是某個只有編譯器知道名字的 Future_A
async fn do_something(&self) -> String { ... }
}
impl MyTrait for StructB {
// 回傳型別是另一個 Future_B,跟 Future_A 完全不同
async fn do_something(&self) -> String { ... }
}
這讓 trait 無法做到 object-safe(也就是無法用 dyn MyTrait),因為動態派發需要在執行期才知道要呼叫哪個實作,但每個實作的回傳型別大小不同,vtable 根本無法表達。
解法:async_trait crate
async_trait 是由 David Tolnay(serde、anyhow 的作者)開發的 procedural macro,它的做法是把所有 async fn 的回傳值統一包進 Box:
use async_trait::async_trait;
#[async_trait]
trait MyTrait {
async fn do_something(&self) -> String;
}
#[async_trait]
impl MyTrait for MyStruct {
async fn do_something(&self) -> String {
"hello".to_string()
}
}
macro 展開後,實際上變成這樣:
trait MyTrait {
fn do_something(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>>;
}
impl MyTrait for MyStruct {
fn do_something(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>> {
Box::pin(async move {
"hello".to_string()
})
}
}
回傳型別統一成 Pin<Box<dyn Future>>,大小固定,vtable 可以表達,dyn MyTrait 就可以正常運作了。
代價與限制
堆積分配(Heap allocation)
每次呼叫 async trait method,都會觸發一次 Box::pin(),也就是一次 heap 分配。對於高頻呼叫的場景,這個開銷是需要考慮的。
Send bound
預設情況下,async_trait 要求產生的 Future 必須是 Send(可以跨執行緒傳遞),適合多執行緒 async runtime(如 Tokio)。
如果你的情境不需要 Send(例如單執行緒 runtime),可以這樣關掉:
#[async_trait(?Send)]
trait MyTrait {
async fn do_something(&self) -> String;
}
生命週期複雜度
macro 會自動處理大部分生命週期,但在一些邊緣情況(例如 &self 裡有複雜的借用關係)還是可能需要手動標注,錯誤訊息也可能比較難讀。
Rust 1.75 的原生支援
Rust 1.75(2023 年 12 月)穩定了 Return Position Impl Trait in Trait(RPITIT),讓 async fn 可以直接用在 trait 裡:
trait MyTrait {
async fn do_something(&self) -> String; // ✅ Rust 1.75+ 可以!
}
不需要任何外部 crate,不需要 Box,沒有 heap 分配。
但 dyn Trait 仍有限制
原生支援的版本有一個重要限制:動態派發(dyn MyTrait)尚未完全支援。
fn call(obj: &dyn MyTrait) { // ⚠️ 可能有限制
obj.do_something();
}
如果你需要 dyn Trait,目前仍建議使用 async_trait crate,或者搭配 dynosaur 等新興 crate 來橋接。
async_trait 與 Pin 的關係
async_trait 展開後的回傳型別是 Pin<Box<dyn Future>>,這裡的 Pin 是必要的——因為 dyn Future 可能是自引用的(async block 裡可能跨 await 持有引用),必須保證它被 poll 的過程中不會被移動。
這也是為什麼 async_trait 的文件裡,你會看到它與 Pin、Box 密不可分。如果你對 Pin 還不熟悉,可以先閱讀《深入理解 Rust 的 Pin》。
應該用哪個?
| 情境 |
建議 |
Rust 1.75+,不需要 dyn Trait |
原生 async fn in trait |
需要 dyn Trait |
async_trait crate |
| 效能極度敏感,避免 heap 分配 |
考慮手動實作或 impl Trait 參數 |
| 舊版 Rust(< 1.75) |
async_trait crate |
總結
- Rust trait 原本不支援
async fn,根本原因是每個實作的 Future 型別不同,無法做到 object-safe。
async_trait crate 透過把回傳值包進 Pin<Box<dyn Future>> 解決這個問題,代價是每次呼叫需要 heap 分配。
- Rust 1.75 穩定了原生的
async fn in trait,但動態派發(dyn Trait)的支援仍不完整,async_trait 在這個場景仍有其價值。