Simply Patrick

zeroclaw 如何設計 Memory 的抽象層

zeroclaw 的 Memory 是 agent 的長期記憶——它記錄對話歷史、使用者偏好、決策紀錄,並在每次對話前把相關記憶召回注入給 LLM。這篇記錄它如何用一個 trait 統一四種後端(SQLite、Markdown、Lucid、Postgres),以及它最有趣的設計:混合式語意搜尋、Embedding LRU 快取、還有讓 agent 在冷啟動時從 Markdown 文字檔重建「靈魂」的機制。

Memory Trait:七個方法,全部必填

ChannelProvider 的設計不同——後兩者都有預設實作——Memory trait 的七個方法全部是必填的

#[async_trait]
pub trait Memory: Send + Sync {
    fn name(&self) -> &str;

    async fn store(
        &self,
        key: &str,
        content: &str,
        category: MemoryCategory,
        session_id: Option<&str>,
    ) -> anyhow::Result<()>;

    async fn recall(
        &self,
        query: &str,
        limit: usize,
        session_id: Option<&str>,
    ) -> anyhow::Result<Vec<MemoryEntry>>;

    async fn get(&self, key: &str) -> anyhow::Result<Option<MemoryEntry>>;
    async fn list(
        &self,
        category: Option<&MemoryCategory>,
        session_id: Option<&str>,
    ) -> anyhow::Result<Vec<MemoryEntry>>;
    async fn forget(&self, key: &str) -> anyhow::Result<bool>;
    async fn count(&self) -> anyhow::Result<usize>;
    async fn health_check(&self) -> bool;
}

這反映了「每種後端的搜尋策略都根本不同」——SQLite 用 FTS5 + 向量搜尋,Markdown 用關鍵字比對,Postgres 用 ILIKE——沒有一個合理的預設實作能套用在所有後端,所以索性不提供。


核心資料型別

pub struct MemoryEntry {
    pub id: String,
    pub key: String,
    pub content: String,
    pub category: MemoryCategory,
    pub timestamp: String,
    pub session_id: Option<String>,
    pub score: Option<f64>,  // 只有 recall() 會填,get()/list() 都是 None
}

pub enum MemoryCategory {
    Core,             // 長期事實、偏好、決策
    Daily,            // 每日工作日誌
    Conversation,     // 對話上下文
    Custom(String),   // 開放式擴充點
}

score 的設計很有趣:它是 recall() 的副產品,代表這筆記憶和查詢的相關度。get()list() 都是精確查找,沒有「相關度」的概念,所以這個欄位在那兩個情境下永遠是 NoneCustom(String) 則是一個逃生艙,讓使用者可以自訂任意分類而不需要修改 enum。


四個後端,一個介面

zeroclaw 目前有四個實作(加上一個 no-op):

後端 搜尋方式 特性
SqliteMemory FTS5 + 向量 (cosine) 推薦預設,單一 .db
MarkdownMemory 關鍵字比對 純文字,只能追加
LucidMemory SQLite + 外部 CLI 橋接第三方記憶工具
PostgresMemory ILIKE 模糊比對 雲端持久化,無 pgvector
NoneMemory 完全 no-op,停用記憶

SQLite:真正的大腦

SqliteMemory 是推薦的預設後端,資料存在 workspace/memory/brain.db。它的設計最複雜,也最值得細看。

Schema

三張表,各有不同用途:

-- 主要記憶表
CREATE TABLE memories (
    id         TEXT PRIMARY KEY,
    key        TEXT UNIQUE NOT NULL,   -- upsert 的 identity key
    content    TEXT NOT NULL,
    category   TEXT NOT NULL DEFAULT 'core',
    embedding  BLOB,                   -- f32 向量,little-endian bytes
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    session_id TEXT
);

-- FTS5 全文搜尋虛擬表
CREATE VIRTUAL TABLE memories_fts USING fts5(
    key, content,
    content=memories, content_rowid=rowid
);

-- Embedding LRU 快取
CREATE TABLE embedding_cache (
    content_hash TEXT PRIMARY KEY,  -- SHA-256 前 16 hex chars
    embedding    BLOB NOT NULL,
    created_at   TEXT NOT NULL,
    accessed_at  TEXT NOT NULL      -- LRU 用
);

FTS5 透過三個 trigger(INSERT、DELETE、UPDATE)和主表保持同步。store() 用的是 INSERT ... ON CONFLICT(key) DO UPDATE——upsert 語意,key 是唯一識別碼,id 是 UUID(每次 upsert 都會更新)。

SQLite 開啟時的 PRAGMA 調校:WAL 模式、8 MB mmap、2 MB page cache、MEMORY temp store。

為什麼用 spawn_blocking

rusqlite 是 blocking API,直接在 async context 呼叫會卡住 tokio runtime。所有 SQLite 操作都包在 tokio::task::spawn_blocking 裡:

let result = tokio::task::spawn_blocking(move || {
    let conn = db.lock();
    conn.execute("INSERT INTO memories ...", params![...])?;
    Ok(())
}).await??;

連線的開啟還多了一層防護——用 worker thread + mpsc::channel 加上 recv_timeout,防止 SQLite 檔案被鎖住時永遠卡住:

let (tx, rx) = mpsc::channel();
thread::spawn(move || {
    let _ = tx.send(Connection::open(&path));
});
match rx.recv_timeout(Duration::from_secs(capped)) {
    Ok(Ok(conn)) => conn,
    Ok(Err(e)) => return Err(e.into()),
    Err(_) => return Err(anyhow!("SQLite open timed out")),
}

混合語意搜尋:FTS5 + 向量

recall() 的實作是整個記憶系統最有趣的部分。它不是單一的搜尋策略,而是三層 fallback:

graph TD A["recall(query, limit)"] --> B{有設定 Embedding provider?} B -->|是| C[生成查詢向量] C --> D[FTS5 BM25 搜尋] D --> E["hybrid_merge(向量結果, 關鍵字結果)"] E --> F[回傳 Top-N] B -->|否| G[純 BM25 搜尋] G --> F D -->|FTS5 解析錯誤| H["LIKE %keyword% 兜底"] H --> F

向量:純 Rust Cosine Similarity

向量以 f32 little-endian bytes 存在 BLOB 欄位。Cosine similarity 是純 Rust 實作,刻意用 f64 累加以減少高維向量的浮點誤差:

pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let mut dot = 0.0_f64;
    let mut norm_a = 0.0_f64;
    let mut norm_b = 0.0_f64;

    for (x, y) in a.iter().zip(b.iter()) {
        let x = f64::from(*x);
        let y = f64::from(*y);
        dot += x * y;
        norm_a += x * x;
        norm_b += y * y;
    }
    let denom = norm_a.sqrt() * norm_b.sqrt();
    if !denom.is_finite() || denom < f64::EPSILON {
        return 0.0;
    }
    (dot / denom).clamp(0.0, 1.0) as f32
}

這是暴力掃描所有有 embedding 的 row,沒有 ANN index(如 HNSW、IVFFlat)。對一個本地 agent 的記憶量來說夠用,不需要引入 pgvector 或外部向量 DB 的依賴。

關鍵字:FTS5 BM25

查詢前先把每個 token 用雙引號包住再用 OR 連接,避免 FTS5 特殊字元造成解析錯誤:

let fts_query = query
    .split_whitespace()
    .map(|w| format!("\"{w}\""))
    .collect::<Vec<_>>()
    .join(" OR ");

FTS5 的 bm25() 分數是負數(越負越相關),取負後變成越大越好,和 cosine similarity 的方向統一。

混合合併

pub fn hybrid_merge(
    vector_results: &[(String, f32)],   // (id, cosine)
    keyword_results: &[(String, f32)],  // (id, -BM25)
    vector_weight: f32,                 // 預設 0.7
    keyword_weight: f32,                // 預設 0.3
    limit: usize,
) -> Vec<ScoredResult> {
    // BM25 分數正規化到 [0,1]
    // final_score = 0.7 * cosine + 0.3 * normalized_BM25
    // 去重:HashMap keyed on id
}

合併後的 Top-N ID 用一次 WHERE id IN (...) 批次撈出,避免 N+1 query。


Embedding LRU 快取

每次 store() 都需要為新內容生成 embedding,但相同的文字不應該重複打 API。快取的 key 是內容的 SHA-256 前 16 hex chars——明確選擇 SHA-256 而不是 DefaultHasher,因為後者的輸出在不同 Rust 版本之間不保證穩定。

LRU 淘汰不在 Rust 記憶體裡維護資料結構,而是直接在 SQL 裡做:

-- 保留最新的 N 筆,刪除多餘的舊資料
DELETE FROM embedding_cache WHERE content_hash IN (
    SELECT content_hash FROM embedding_cache
    ORDER BY accessed_at ASC
    LIMIT MAX(0, (SELECT COUNT(*) FROM embedding_cache) - ?1)
)

這樣快取的淘汰邏輯完全不依賴記憶體狀態,重啟之後一樣有效。


Embedding Provider

Embedding 本身也是一個 trait:

#[async_trait]
pub trait EmbeddingProvider: Send + Sync {
    fn name(&self) -> &str;
    fn dimensions(&self) -> usize;
    async fn embed(&self, texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>>;

    // 唯一的預設方法:批次 embed 的單一版本
    async fn embed_one(&self, text: &str) -> anyhow::Result<Vec<f32>> {
        let mut results = self.embed(&[text]).await?;
        results.pop().ok_or_else(|| anyhow!("Empty embedding result"))
    }
}

兩個具體實作:

  • NoopEmbeddingdimensions() = 0,回傳空向量。啟用後自動降級為純關鍵字搜尋。
  • OpenAiEmbedding:打 /v1/embeddings endpoint。支援自訂 base URL,用 custom: 前綴指定:
pub fn create_embedding_provider(provider: &str, api_key: ..., model: &str, dims: usize) {
    match provider {
        "openai" => Box::new(OpenAiEmbedding::new("https://api.openai.com", ...)),
        name if name.starts_with("custom:") => {
            let base_url = name.strip_prefix("custom:").unwrap_or("");
            Box::new(OpenAiEmbedding::new(base_url, ...))  // 指向 Ollama、本地推理伺服器等
        }
        _ => Box::new(NoopEmbedding),
    }
}

MarkdownMemory:只能追加的審計日誌

MarkdownMemoryforget() 永遠回傳 false——這個後端是只能追加的,設計上就是個審計日誌:

workspace/MEMORY.md                  ← Core 類別
workspace/memory/2026-02-19.md       ← Daily 類別(按日期分檔)

每次 store() 就在對應的 Markdown 檔尾端追加一行 - **key**: contentrecall() 則是把查詢拆成關鍵字,計算每筆記憶裡出現了幾個,算出比例當作分數。

雖然搜尋能力差,但好處是人類可讀、可以直接用 Git 追蹤,也容易備份。


LucidMemory:橋接外部 CLI

LucidMemory 是一個橋接層,SQLite 是主要儲存,外部 lucid-memory CLI 是輔助:

pub struct LucidMemory {
    local: SqliteMemory,
    recall_timeout: Duration,           // 預設 500ms
    store_timeout: Duration,            // 預設 800ms
    local_hit_threshold: usize,         // 預設 3:本地結果夠多就不問 Lucid
    failure_cooldown: Duration,         // 任何錯誤後冷卻 15 秒
    last_failure_at: Mutex<Option<Instant>>,
}

store() 先寫 SQLite,再 fire-and-forget 同步到 Lucid。recall() 的策略:先問 SQLite,如果結果數量低於 local_hit_threshold 且 Lucid 不在冷卻期,才去呼叫 lucid context <query>,然後合併結果。

去重用的是一個 NUL 字元分隔的簽名:

let signature = format!("{}\u{0}{}", entry.key.to_lowercase(), entry.content.to_lowercase());

Lucid 的輸出是自訂的 XML-like 格式:

<lucid-context>
- [decision] Use token refresh middleware
- [context] Working in src/auth.rs
</lucid-context>

方括號裡的標籤(decisioncontextbuglearning)會被映射回 MemoryCategory


Agent 的靈魂:Snapshot 與冷啟動恢復

最讓我覺得有趣的設計是「靈魂快照」機制。

在每次記憶體清理(hygiene)時,如果啟用了 snapshot_on_hygiene,所有 Core 類別的記憶都會被 dump 到工作目錄的 MEMORY_SNAPSHOT.md。這個檔案是 Git 可追蹤的純文字。

更厲害的是冷啟動恢復:如果 brain.db 不存在(例如全新部署、或資料庫損毀刪除),但 MEMORY_SNAPSHOT.md 存在,factory 會在開啟後端之前先呼叫 hydrate_from_snapshot(),把 Markdown 裡的記憶逐筆還原回 SQLite。

graph LR A["brain.db 存在?"] -->|是| B[正常開啟 SqliteMemory] A -->|否| C["MEMORY_SNAPSHOT.md 存在?"] C -->|是| D[hydrate_from_snapshot] D --> E[重新開啟 SqliteMemory] C -->|否| F[全新 SqliteMemory] B --> G[完成] E --> G F --> G

每次清理也會刪掉 MemoryCategory::Conversation 類別超過 conversation_retention_days 的舊紀錄——對話上下文是短暫的,不該無限累積。


在 Agent Loop 裡的角色

Memory 在每輪對話的三個時機被使用:

sequenceDiagram participant U as 使用者 participant AL as Agent Loop participant M as Memory participant LLM as LLM U->>AL: 送出訊息 AL->>M: recall(user_msg, 5) M-->>AL: 相關記憶(score >= 0.4 才納入) AL->>LLM: [Memory context]\n- key: content\n...\n{user_msg} LLM-->>AL: 回應 AL->>M: store("user_msg_{uuid}", msg, Conversation) AL->>M: store("assistant_resp_{uuid}", summary[:100], Daily) AL-->>U: 最終答覆

注意幾個細節:

  • 相關度門檻score >= 0.4 才注入,低分記憶不進 context,避免雜訊干擾 LLM。
  • 自動儲存:使用者訊息存 Conversation 類別(會被清理),助理回應只存前 100 字存 Daily 類別(會保留較久)。
  • LLM 直接操控記憶memory_storememory_recallmemory_forget 這三個工具也被當成普通的 Tool 掛載到 agent 上,LLM 自己可以主動讀寫記憶。

小結

zeroclaw Memory 設計幾個值得注意的地方:

  • 全部必填,沒有預設:每種後端的搜尋策略差太多,預設實作沒有意義。
  • SQLite 就夠用:FTS5 + 向量暴力搜尋在本地 agent 的規模不需要 pgvector,單一 .db 檔沒有外部依賴。
  • 三層 fallback 搜尋:Hybrid(FTS5 + 向量)→ BM25 only → LIKE 兜底,優雅降級。
  • SHA-256 快取,LRU 在 SQL 裡做:快取邏輯不依賴記憶體狀態,重啟後仍然有效。
  • 靈魂快照MEMORY_SNAPSHOT.md 是 Git 可追蹤的,讓 agent 可以從純文字重建長期記憶,冷啟動不代表失憶。
  • Conversation 記憶會被清理:對話上下文是短暫的,設計上就不打算永久保留。

zeroclaw 如何設計 Tool 的抽象層

zeroclaw 的 Tool 是 agent 採取行動的核心機制——執行 shell 指令、讀寫檔案、瀏覽網頁、呼叫 HTTP API、操作記憶體、甚至把任務委派給另一個 sub-agent。目前實作了 30+ 種工具。這篇記錄它如何用一個乾淨的 trait 統一所有工具,以及工具的組裝、dispatch、schema 正規化、安全注入等設計。

Tool Trait:四個必填,一個免費

整個抽象的核心在 src/tools/traits.rs,只有四個方法是必須實作的:

#[async_trait]
pub trait Tool: Send + Sync {
    fn name(&self) -> &str;
    fn description(&self) -> &str;
    fn parameters_schema(&self) -> serde_json::Value;
    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult>;

    // 預設實作:把前三個方法組裝成 ToolSpec,免費附贈
    fn spec(&self) -> ToolSpec {
        ToolSpec {
            name: self.name().to_string(),
            description: self.description().to_string(),
            parameters: self.parameters_schema(),
        }
    }
}

幾個設計決策值得注意:

  • Send + Sync:讓工具可以存進 Arc<dyn Tool> 並在 thread 之間共享,必要條件。
  • async fn execute:所有工具都可能有 I/O,統一用 async,不需要區分同步/非同步工具。
  • 參數是 serde_json::Value:不是每個工具一個強型別 struct,而是執行時從 JSON 取值。省去了大量 boilerplate,代價是錯誤在執行時才出現,而不是編譯時。
  • spec() 是預設方法:把 namedescriptionparameters_schema 這三個方法的結果組裝成 ToolSpec,新增工具完全不需要自己實作這個。

核心資料型別

三個關鍵型別:

// 每次執行的回傳值
pub struct ToolResult {
    pub success: bool,
    pub output: String,
    pub error: Option<String>,
}

// 送給 LLM 描述「這個工具能做什麼」的規格書
pub struct ToolSpec {
    pub name: String,
    pub description: String,
    pub parameters: serde_json::Value,  // JSON Schema 物件
}

注意 execute 的錯誤處理設計:正常的執行失敗(找不到檔案、路徑不允許)用 ToolResult { success: false, error: Some(...) } 回傳;只有程式本身的 bug 或不可恢復的錯誤才回傳 anyhow::Result::Err。這讓 agent loop 可以把執行失敗的結果繼續送給 LLM 讓它決定下一步,而不是直接炸掉整個流程。

參數 schema 是用 serde_json::json!() 手寫 JSON Schema,沒有 proc macro 或 derive:

fn parameters_schema(&self) -> serde_json::Value {
    json!({
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "The shell command to execute"
            },
            "approved": {
                "type": "boolean",
                "description": "Set true to explicitly approve medium/high-risk commands",
                "default": false
            }
        },
        "required": ["command"]
    })
}

工具的組裝:工廠函式,不靠反射

工具不是自動發現的,而是在 src/tools/mod.rs 裡用工廠函式顯式組裝:

// 最小集合,給測試和簡單 agent 用
pub fn default_tools(security: Arc<SecurityPolicy>) -> Vec<Box<dyn Tool>> {
    vec![
        Box::new(ShellTool::new(security.clone(), runtime)),
        Box::new(FileReadTool::new(security.clone())),
        Box::new(FileWriteTool::new(security)),
    ]
}

// 完整工具集,根據設定條件啟用
pub fn all_tools_with_runtime(config, security, runtime, memory, ...) -> Vec<Box<dyn Tool>> {
    let mut tools: Vec<Box<dyn Tool>> = vec![
        Box::new(ShellTool::new(...)),
        Box::new(FileReadTool::new(...)),
        Box::new(CronAddTool::new(...)),
        Box::new(MemoryStoreTool::new(...)),
        // ...
    ];

    // 條件啟用:設定決定工具集,不是全部預設開啟
    if browser_config.enabled {
        tools.push(Box::new(BrowserTool::new_with_backend(...)));
    }
    if http_config.enabled {
        tools.push(Box::new(HttpRequestTool::new(...)));
    }
    if !agents.is_empty() {
        tools.push(Box::new(DelegateTool::new(agents, ...)));
    }
    tools
}

這是工廠/建構器模式——沒有反射、沒有 inventory 這類 compile-time 自動收集 crate、沒有 #[register_tool] 巨集。好處是依賴關係一目了然,壞處是每加一個工具都要手動登記。對這個規模的 codebase 來說是對的取捨。


兩種 Dispatch 模式

工具的呼叫有兩條路,取決於 LLM provider 是否支援原生 function calling:

graph TD A[LLM 回應] --> B{支援原生 Tool Calling?} B -->|是| C[NativeToolDispatcher] B -->|否| D[XmlToolDispatcher] C -->|解析 API 的 tool_calls 欄位| E[ParsedToolCall] D -->|解析文字裡的 XML tag| E E --> F["find_tool(name)"] F --> G["tool.execute(args)"] G --> H[ToolResult] H --> I[轉成 ConversationMessage] I --> J[送回 LLM 繼續下一輪]

NativeToolDispatcher 用於 Anthropic、OpenAI、Gemini——這些 provider 會在 API response 裡回傳結構化的工具呼叫,直接解析即可。

XmlToolDispatcher 用於不支援原生 function calling 的 LLM。此時 zeroclaw 把工具的說明注入到 system prompt,要求 LLM 用特定格式回應:

<tool_call>{"name": "shell", "arguments": {"command": "ls -la"}}</tool_call>

然後用字串解析從回應文字裡抓出這些 XML tag。

Agent loop 查找工具的方式很簡單:

fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
    tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
}

送給 LLM 的格式(以 OpenAI 為例):

fn tools_to_openai_format(tools: &[Box<dyn Tool>]) -> Vec<serde_json::Value> {
    tools.iter().map(|tool| {
        json!({
            "type": "function",
            "function": {
                "name": tool.name(),
                "description": tool.description(),
                "parameters": tool.parameters_schema()
            }
        })
    }).collect()
}

JSON Schema 的跨 Provider 正規化

各家 LLM API 對 JSON Schema 的支援程度差異很大,尤其是 Gemini——它會拒絕很多標準的 JSON Schema 關鍵字。src/tools/schema.rsSchemaCleanr 專門處理這件事:

pub enum CleaningStrategy {
    Gemini,       // 最嚴格
    Anthropic,
    OpenAI,       // 最寬鬆
    Conservative, // 保守策略,適合未知 provider
}

pub struct SchemaCleanr;

impl SchemaCleanr {
    pub fn clean_for_gemini(schema: Value) -> Value { ... }
    pub fn clean_for_anthropic(schema: Value) -> Value { ... }
    pub fn clean_for_openai(schema: Value) -> Value { ... }
}

各策略的主要差異:

關鍵字 Gemini Anthropic OpenAI
minLengthpattern 移除 移除 保留
$ref 內聯展開 內聯展開 保留
additionalProperties 移除 保留 保留
anyOf/oneOf 攤平為第一個型別 保留 保留
nullable 轉換格式 保留 保留

Gemini 甚至連 anyOf: [type: string, type: null](nullable 的常見寫法)都要特別轉換。這些邊緣情況全部藏在 SchemaCleanr 裡,工具本身完全不需要知道。


安全性是注入的,不是全域的

zeroclaw 的安全設計有個核心原則:安全策略是建構時注入的,不是全域的靜態狀態

// 每個有 I/O 的工具都在建構時收到 SecurityPolicy
pub struct FileReadTool {
    security: Arc<SecurityPolicy>,
}

pub struct ShellTool {
    security: Arc<SecurityPolicy>,
    runtime: Arc<dyn RuntimeAdapter>,
}

async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
    let path = args.get("path").and_then(|v| v.as_str())...;
    // 安全檢查在 execute 裡,繞不過去
    if !self.security.is_path_allowed(path) {
        return Ok(ToolResult { success: false, error: Some("Path not allowed".into()), ... });
    }
    // 繼續執行...
}

SecurityPolicy 強制執行工作區沙盒(防止讀寫工作目錄以外的路徑)、自主等級(ReadOnly、Supervised、Full)、rate limiting 和指令白名單。

Supervised 模式下還有 ApprovalManager,在執行高風險工具前暫停等待使用者確認:

[zeroclaw] Tool: shell
Command: rm -rf /tmp/build
[y]es / [n]o / [a]lways: _

always 之後,這個指令會被加進 session allowlist,下次遇到同樣的指令就不再詢問。


有趣的具體工具

DelegateTool:動態 Schema 的工具

DelegateTool 是最有趣的工具之一——它的 parameters_schema()執行時動態生成的,而不是靜態寫死的:

pub struct DelegateTool {
    agents: Arc<HashMap<String, DelegateAgentConfig>>,
    depth: u32,  // 防止無限委派遞迴
}

fn parameters_schema(&self) -> serde_json::Value {
    // 根據目前設定的 agent 列表動態生成 schema
    let agent_names: Vec<&str> = self.agents.keys().map(|s| s.as_str()).collect();
    json!({
        "properties": {
            "agent": {
                "type": "string",
                "description": format!("Which agent to delegate to. Available: {}",
                                       agent_names.join(", "))
            },
            "task": { "type": "string", "description": "Task description for the agent" }
        }
    })
}

async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
    if self.depth >= MAX_DELEGATION_DEPTH {
        return Ok(ToolResult { success: false, error: Some("Max delegation depth reached"), ... });
    }
    let provider = create_provider(&agent_config.provider, ...)?;
    // 120 秒 timeout,防止 sub-agent 跑太久
    tokio::time::timeout(Duration::from_secs(120),
        provider.chat_with_system(...)).await
}

depth 欄位防止 agent A 委派給 agent B、B 又委派回 A 的無限迴圈。

ShellTool:乾淨的執行環境

ShellTool 的安全設計很謹慎——它清空整個環境變數,只保留一個安全白名單:

async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
    // 清除所有環境變數,防止 API key 洩漏給子行程
    let mut cmd = Command::new("sh");
    cmd.env_clear();  // 全部清空
    for key in SAFE_ENV_VARS {  // 只加回白名單裡的
        if let Ok(val) = std::env::var(key) {
            cmd.env(key, val);
        }
    }
    // 60 秒 timeout,截斷輸出到 1MB
    tokio::time::timeout(Duration::from_secs(60), cmd.output()).await
}

SAFE_ENV_VARS 包含 PATHHOMELANG 這類系統必要的變數,但不包含任何 *_API_KEY*_SECRET 這類敏感變數。即使主行程有這些環境變數,子行程也看不到。


完整的 Tool Call 流程

sequenceDiagram participant U as 使用者 participant AL as Agent Loop participant P as Provider participant LLM as LLM API participant T as Tool U->>AL: 送出訊息 AL->>P: tools_to_provider_format(specs) P->>LLM: chat request + tool definitions LLM-->>P: 回應(含 tool calls) P-->>AL: ChatResponse { tool_calls: [...] } loop 最多 10 次 AL->>AL: find_tool(name) AL->>AL: ApprovalManager(Supervised 模式) AL->>T: execute(args) T->>T: SecurityPolicy 檢查 T-->>AL: ToolResult AL->>P: 把結果轉成 ConversationMessage P->>LLM: 繼續對話(含工具執行結果) LLM-->>P: 新回應 alt 不再呼叫工具 P-->>AL: 純文字回應 AL-->>U: 最終答覆 end end

整個流程最多迴圈 10 次(DEFAULT_MAX_TOOL_ITERATIONS)。超過就強制停止,防止 agent 無止境地呼叫工具。


小結

zeroclaw 的 Tool 設計幾個值得借鑑的地方:

  • 最小介面:四個必填方法,spec() 是免費的預設實作,新增工具的門檻很低。
  • 工廠函式,不靠反射:顯式組裝、條件啟用,依賴關係清晰,沒有魔法。
  • 兩種 Dispatch 兼顧:原生 API tool calling 和 XML prompt 注入都支援,對各種 LLM 都能用。
  • Schema 正規化是一等公民SchemaCleanr 把各家 API 的奇葩限制集中處理,工具本身不受污染。
  • 安全是建構時注入的SecurityPolicynew() 時就進去了,不是全域狀態,也無法繞過。
  • 執行失敗不等於程式崩潰ToolResult { success: false } 讓 agent 可以從失敗中學習,繼續嘗試。

zeroclaw 如何設計 Channel 的抽象層

zeroclaw 的 Channel 負責接收與傳送訊息,支援 Telegram、Discord、Slack、Matrix、iMessage、Signal、IRC、Email 等 14 種平台。這篇記錄它如何用一個乾淨的 trait 把這些平台統一起來,以及 runtime 如何把訊息分派給 LLM 處理。

Trait 設計:三個必填,其餘自選

Channel trait 定義在 src/channels/traits.rs,只有三個方法是必須實作的:

#[async_trait]
pub trait Channel: Send + Sync {
    fn name(&self) -> &str;
    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;

    // 以下都有預設空實作(no-op)
    async fn health_check(&self) -> bool { true }
    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }
    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }

    // 串流草稿更新(目前只有 Telegram 實作)
    fn supports_draft_updates(&self) -> bool { false }
    async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> { Ok(None) }
    async fn update_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
    async fn finalize_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
}

最有趣的是 listen 的簽名:它拿到一個 mpsc::Sender<ChannelMessage>,然後把收到的訊息推進去——而不是回呼(callback)。這是 push-based observer pattern,讓所有平台的訊息都匯聚到同一條 mpsc 管道,runtime 只需要從一個地方消費。


訊息型別

進來的訊息(ChannelMessage)和出去的訊息(SendMessage)分開定義:

pub struct ChannelMessage {
    pub id: String,           // 去重用的 ID
    pub sender: String,       // 發送者識別碼
    pub reply_target: String, // 要回覆的目標(chat_id、channel_id 等)
    pub content: String,      // 正規化後的純文字內容
    pub channel: String,      // "telegram" | "discord" | "slack" | ...
    pub timestamp: u64,
}

pub struct SendMessage {
    pub content: String,
    pub recipient: String,        // 平台專屬的目標識別碼
    pub subject: Option<String>,  // Email 用的主旨
}

各平台的 sender/reply_target 格式天差地別(Telegram 是數字 chat_id、Discord 是 snowflake ID、Email 是郵件地址),但對上層 runtime 來說一律是字串,不需要知道細節。


14 個平台,一個介面

src/channels/mod.rs 列出了目前支援的所有 channel:

分類 平台 底層協議
終端 CLI stdin/stdout
即時通訊 Telegram HTTP long-polling
Discord WebSocket Gateway
Slack HTTP polling
Mattermost WebSocket / HTTP
WhatsApp Meta Cloud API webhook
iMessage macOS osascript
Matrix matrix-sdk 同步
Signal signal-cli HTTP daemon
IRC TCP/TLS
企業通訊 Lark/Feishu WebSocket + Protobuf
DingTalk WebSocket
QQ HTTP API
電子郵件 Email SMTP + IMAP

每個 channel 一個 struct,實作 Channel trait,差異只在各自的 struct 欄位和協議處理。


有趣的實作細節

Discord:手刻 Gateway WebSocket

Discord 沒有用任何 Discord SDK,而是用 tokio-tungstenite 手刻了整個 Gateway 協議:

  1. 先打 REST 拿 Gateway URL
  2. 建立 WebSocket 連線
  3. 走完 Hello → Identify → READY 握手流程
  4. 另開一個 task 發心跳(heartbeat)
  5. 處理各種 opcode:1(心跳請求)、7(重連)、9(session 失效)

這樣做的好處是二進位體積可控、依賴少,但代價是要自己維護協議細節。

Telegram:串流草稿更新

Telegram 是唯一實作 supports_draft_updates() 的 channel,可以做到「LLM 邊生成邊更新訊息」的效果:

sequenceDiagram participant U as 使用者 participant TG as Telegram participant A as Agent participant LLM as LLM U->>TG: 送出訊息 TG->>A: ChannelMessage A->>TG: send_draft("...") → message_id A->>+LLM: 串流請求 loop 每隔 N ms LLM-->>A: delta token A->>TG: update_draft(message_id, 累積文字) Note over TG,U: 使用者看到訊息逐漸變長 end LLM-->>-A: 生成完畢 A->>TG: finalize_draft(message_id, 最終格式)

這個效果類似 ChatGPT 網頁版的逐字輸出,但是在 Telegram 上實現。

Lark/Feishu:Protobuf Frame

Lark 的 WebSocket 協議不是 JSON,而是用 prost(Rust 的 protobuf 實作)解析自訂的 PbFrame 格式,區分 method=0(ping/pong)和 method=1(實際事件)。是這 14 個 channel 裡協議最複雜的一個。


Runtime:把一切串起來

所有 channel 都跑起來之後,start_channels() 做的事情:

  1. 一條共用 mpsc:所有 channel 的 listen 都推到同一個 Sender<ChannelMessage>
  2. 監控重啟(supervised restart):每個 listen 都包在一個重試迴圈裡,失敗後指數退避重試
  3. 信號量限流的 worker pool:從 mpsc 讀訊息,用 Semaphore 限制同時在飛的請求數
graph LR TG[Telegram] -->|listen| Q[(mpsc queue)] DC[Discord] -->|listen| Q SL[Slack] -->|listen| Q MT[Matrix ...] -->|listen| Q Q --> W1[Worker] Q --> W2[Worker] Q --> W3[Worker] W1 --> LLM[LLM Provider] W2 --> LLM W3 --> LLM
// 監控重啟
loop {
    let result = ch.listen(tx.clone()).await;
    // 正常退出 → 重置退避;錯誤 → 等待後重試
    tokio::time::sleep(Duration::from_secs(backoff)).await;
    backoff = backoff.saturating_mul(2).min(max_backoff);
}

// 信號量限流
let semaphore = Arc::new(Semaphore::new(max_in_flight));
while let Some(msg) = rx.recv().await {
    let permit = semaphore.acquire_owned().await?;
    workers.spawn(async move {
        let _permit = permit;  // drop 時自動釋放
        process_channel_message(worker_ctx, msg).await;
    });
}

最大並發數是 channel 數量 × 4,夾在 8 到 64 之間。

每則訊息的處理流程

process_channel_message() 做的事遠不只是「丟給 LLM」:

  1. 攔截 runtime 指令(/models/model)— 支援不重啟切換 LLM provider
  2. 從 per-sender 對話歷史取得上下文(最多 50 輪)
  3. 從記憶體召回相關內容(向量相似度搜尋)
  4. 依平台注入特殊指令(例如告訴 Telegram 版的 LLM 可以用 [IMAGE:path] 語法發圖)
  5. 如果支援 draft update,先發 "..." 佔位,再串流更新
  6. 送進 LLM + tool call 迴圈(最多 300 秒 timeout)
  7. 把結果存回對話歷史,回覆給使用者

Gateway vs Channel

zeroclaw 有兩種接收訊息的模式,不要搞混:

  • Channelstart_channels):agent 主動出去建立連線(WebSocket、long-polling),agent 是 client。
  • Gatewaysrc/gateway/):用 axum 開一個 HTTP server,等平台打 webhook 進來,agent 是 server。
graph TB subgraph channel_mode["Channel 模式(Agent 主動連出)"] direction LR A1[Agent] <-->|WebSocket / long-polling| P1["Telegram / Discord\nSlack / Matrix ..."] end subgraph gateway_mode["Gateway 模式(等 Webhook 打入)"] direction LR P2["WhatsApp\n(Meta Cloud API)"] -->|HTTP POST + HMAC 驗簽| A2["Agent\n(axum HTTP server)"] end

WhatsApp(Meta Cloud API)就是用 Gateway 模式,因為 Meta 只支援 webhook,不提供讓你長連的協議。Gateway 還內建了滑動視窗的 rate limiter、HMAC-SHA256 簽名驗證和請求去重(idempotency store)。


小結

zeroclaw Channel 設計的幾個值得學習的地方:

  • Push to mpsc,不用 callbacklisten 只管推訊息,runtime 只管消費,兩邊完全解耦。
  • Supervised restart 是一等公民:每個 channel 的生命週期都有人顧,不怕某個平台偶爾斷線。
  • Draft updates 是 opt-in 擴充:預設 no-op,只有真的需要的平台實作,不污染介面。
  • Gateway 和 Channel 分開:推(長連)和拉(webhook)兩種模式有各自的架構,互不混淆。

zeroclaw 如何設計 LLM Provider 的抽象層

zeroclaw 是一個 Rust-first 的自主 Agent 執行時(autonomous agent runtime),設計目標是高效能、高穩定性、高擴充性與高安全性。它的架構以 trait 為核心,包含 Provider(LLM)、Channel(Telegram/Discord/Slack)、Tool、Memory、Security、Peripheral(STM32/RPi GPIO)等模組。這篇聚焦在它如何設計 LLM Provider 的抽象層,統一了 OpenAI、Anthropic、Gemini 等各種不同 API 的介面。

一個 Trait,一個必填方法

整個抽象的核心是 Provider trait,但它聰明的地方在於:只有一個方法是必須實作的

#[async_trait]
pub trait Provider: Send + Sync {
    async fn chat_with_system(
        &self,
        system_prompt: Option<&str>,
        message: &str,
        model: &str,
        temperature: f64,
    ) -> anyhow::Result<String>;

    // 其他方法都有預設實作,最終都會呼叫 chat_with_system
    async fn simple_chat(&self, message: &str, model: &str, temperature: f64) -> anyhow::Result<String> { ... }
    async fn chat_with_history(&self, messages: &[ChatMessage], ...) -> anyhow::Result<String> { ... }
    async fn chat(&self, request: ChatRequest<'_>, ...) -> anyhow::Result<ChatResponse> { ... }
    // ...
}

這意味著加一個新 provider 最少只需要 ~30 行程式碼。其他的 simple_chatchat_with_history 這些便利方法,都有預設實作會幫你委派過去。

這裡用了 async_trait crate,因為 trait 裡的 async fn 需要特別處理才能支援 dyn Provider


統一的訊息型別

各家 LLM API 的訊息格式其實大同小異,zeroclaw 定義了一套統一的型別:

pub struct ChatMessage {
    pub role: String,   // "system", "user", "assistant", "tool"
    pub content: String,
}

pub struct ChatResponse {
    pub text: Option<String>,
    pub tool_calls: Vec<ToolCall>,
}

pub struct ToolCall {
    pub id: String,
    pub name: String,
    pub arguments: String,  // JSON 字串
}

ChatMessage 還提供了工廠方法:ChatMessage::user("hello")ChatMessage::assistant("hi") 這樣,寫起來很順。

多輪對話(包含 tool call 結果)則用一個 enum 來表達:

pub enum ConversationMessage {
    Chat(ChatMessage),
    AssistantToolCalls { text: Option<String>, tool_calls: Vec<ToolCall> },
    ToolResults(Vec<ToolResultMessage>),
}

Decorator Pattern:功能疊加

Provider trait 自己也可以被包起來再實作 Provider,這讓功能可以自由組合。

ReliableProvider:自動重試 + API key 輪換

pub struct ReliableProvider {
    providers: Vec<(String, Box<dyn Provider>)>,
    max_retries: u32,
    base_backoff_ms: u64,
    api_keys: Vec<String>,
    key_index: AtomicUsize,           // 用 AtomicUsize 做 round-robin
    model_fallbacks: HashMap<String, Vec<String>>,
}

遇到 429 Rate Limit 就輪換 API key、指數退避重試;遇到 4xx 非 429 的錯誤就直接放棄;甚至還能解析 Retry-After header 來決定等多久。

RouterProvider:按 model 名稱路由

// 用 "hint:" 前綴來指定路由
fn resolve(&self, model: &str) -> (usize, String) {
    if let Some(hint) = model.strip_prefix("hint:") {
        if let Some((idx, resolved_model)) = self.routes.get(hint) {
            return (*idx, resolved_model.clone());
        }
    }
    (self.default_index, model.to_string())
}

傳入 "hint:reasoning" 就會路由到你設定好對應「reasoning 任務」的 provider + model 組合。很適合多 provider 環境下的靈活調度。


Tool Calling 的三種模式

各家 API 對 function calling 的格式差異很大,zeroclaw 用一個 enum 來表達:

pub enum ToolsPayload {
    Gemini { function_declarations: Vec<serde_json::Value> },
    Anthropic { tools: Vec<serde_json::Value> },
    OpenAI { tools: Vec<serde_json::Value> },
    PromptGuided { instructions: String },  // 給不支援原生 tool call 的 provider 用
}

實際上支援三種策略:

  1. 原生 Tool Calling:Anthropic、OpenAI、Gemini 都有各自的格式,provider 各自轉換。
  2. Prompt 注入(PromptGuided):把 tool 的說明文字注入到 system prompt,讓 LLM 用 <tool_call>JSON</tool_call> 這樣的 XML tag 來回應。
  3. 自動降級OpenAiCompatibleProvider 會先嘗試原生格式,如果收到 "unknown parameter: tools" 這類錯誤,就自動切換到 prompt 注入模式。這樣就算某個 OpenAI 相容的 endpoint 不支援 tool call,也能無縫 fallback。

各家 Provider 的奇特之處

實作各家 provider 的過程中,可以看到各家 API 設計的一些有趣差異:

Anthropic:支援 Prompt Caching,會自動對大型 system prompt(>3072 bytes)或長對話(>4 則)加上 cache_control: ephemeral,降低費用。

Gemini:是唯一把 API key 放在 query string(?key=xxx)而不是 header 裡的。而且 OAuth 使用者(Gemini CLI)必須打完全不同的 endpoint(cloudcode-pa.googleapis.com)。

OpenAiCompatibleProvider:一個實作打遍幾乎所有 OpenAI 相容的 API——Venice、Cloudflare、Groq、Mistral、xAI、DeepSeek、Perplexity……超過 15 個 provider 都是它的變體。靠著 AuthStyle enum(Bearer、XApiKey、Custom)和可設定的 base URL 來區分。


工廠函式

最後,使用者不需要直接 new 這些 struct,而是透過三個工廠函式:

// 簡單建立單一 provider
create_provider(name, api_key)

// 加上重試 + fallback 功能
create_resilient_provider(..., reliability)

// 加上多 provider 路由
create_routed_provider(..., model_routes)

API key 解析的優先順序是:明確傳入的參數 → provider 專屬的環境變數(如 ANTHROPIC_API_KEY)→ 通用的 ZEROCLAW_API_KEYAPI_KEY


小結

zeroclaw 的設計讓我印象深刻的幾個點:

  • 最小介面原則:只強制一個方法,其他都是預設實作,加新 provider 成本很低。
  • Decorator 而非繼承:retry、routing 都是包裝器,跟具體 provider 完全解耦。
  • 漸進式降級:tool calling 從原生到 prompt 注入的自動 fallback,使用者完全感知不到。
  • 務實:面對各家 API 的奇葩設計(Gemini 的 query param auth、各種 reasoning_content 欄位)直接在具體實作裡處理,不讓這些噪音污染核心抽象。

Rust 的 async_trait:為什麼 async fn 不能直接用在 trait 裡?

本文涵蓋為什麼 Rust trait 不支援 async fnasync_trait crate 如何解決這個問題、它的代價是什麼,以及 Rust 1.75 之後的原生支援現況。

問題:async fn 不能直接用在 trait 裡

如果你第一次在 Rust 寫 async 相關的 trait,很可能會直覺地這樣寫:

trait MyTrait {
    async fn do_something(&self) -> String; // ❌ 編譯錯誤
}

但這在穩定版 Rust(1.75 之前)是不允許的。為什麼?


根本原因:impl Trait 與 dyn Trait 的衝突

async fn 是語法糖,它會被編譯器展開成回傳 impl Future 的普通函式:

// 你寫的:
async fn do_something(&self) -> String { ... }

// 編譯器看到的:
fn do_something(&self) -> impl Future<Output = String> { ... }

問題出在 trait 裡。每個實作 trait 的型別,其 do_something 都會回傳一個不同的具體 Future 型別

impl MyTrait for StructA {
    // 回傳型別是某個只有編譯器知道名字的 Future_A
    async fn do_something(&self) -> String { ... }
}

impl MyTrait for StructB {
    // 回傳型別是另一個 Future_B,跟 Future_A 完全不同
    async fn do_something(&self) -> String { ... }
}

這讓 trait 無法做到 object-safe(也就是無法用 dyn MyTrait),因為動態派發需要在執行期才知道要呼叫哪個實作,但每個實作的回傳型別大小不同,vtable 根本無法表達。


解法:async_trait crate

async_trait 是由 David Tolnay(serdeanyhow 的作者)開發的 procedural macro,它的做法是把所有 async fn 的回傳值統一包進 Box

use async_trait::async_trait;

#[async_trait]
trait MyTrait {
    async fn do_something(&self) -> String;
}

#[async_trait]
impl MyTrait for MyStruct {
    async fn do_something(&self) -> String {
        "hello".to_string()
    }
}

macro 展開後,實際上變成這樣:

trait MyTrait {
    fn do_something(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>>;
}

impl MyTrait for MyStruct {
    fn do_something(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>> {
        Box::pin(async move {
            "hello".to_string()
        })
    }
}

回傳型別統一成 Pin<Box<dyn Future>>,大小固定,vtable 可以表達,dyn MyTrait 就可以正常運作了。


代價與限制

堆積分配(Heap allocation)

每次呼叫 async trait method,都會觸發一次 Box::pin(),也就是一次 heap 分配。對於高頻呼叫的場景,這個開銷是需要考慮的。

Send bound

預設情況下,async_trait 要求產生的 Future 必須是 Send(可以跨執行緒傳遞),適合多執行緒 async runtime(如 Tokio)。

如果你的情境不需要 Send(例如單執行緒 runtime),可以這樣關掉:

#[async_trait(?Send)]
trait MyTrait {
    async fn do_something(&self) -> String;
}

生命週期複雜度

macro 會自動處理大部分生命週期,但在一些邊緣情況(例如 &self 裡有複雜的借用關係)還是可能需要手動標注,錯誤訊息也可能比較難讀。


Rust 1.75 的原生支援

Rust 1.75(2023 年 12 月)穩定了 Return Position Impl Trait in Trait(RPITIT),讓 async fn 可以直接用在 trait 裡:

trait MyTrait {
    async fn do_something(&self) -> String; // ✅ Rust 1.75+ 可以!
}

不需要任何外部 crate,不需要 Box,沒有 heap 分配。

但 dyn Trait 仍有限制

原生支援的版本有一個重要限制:動態派發(dyn MyTrait)尚未完全支援

fn call(obj: &dyn MyTrait) {  // ⚠️ 可能有限制
    obj.do_something();
}

如果你需要 dyn Trait,目前仍建議使用 async_trait crate,或者搭配 dynosaur 等新興 crate 來橋接。


async_trait 與 Pin 的關係

async_trait 展開後的回傳型別是 Pin<Box<dyn Future>>,這裡的 Pin 是必要的——因為 dyn Future 可能是自引用的(async block 裡可能跨 await 持有引用),必須保證它被 poll 的過程中不會被移動。

這也是為什麼 async_trait 的文件裡,你會看到它與 PinBox 密不可分。如果你對 Pin 還不熟悉,可以先閱讀《深入理解 Rust 的 Pin》


應該用哪個?

情境 建議
Rust 1.75+,不需要 dyn Trait 原生 async fn in trait
需要 dyn Trait async_trait crate
效能極度敏感,避免 heap 分配 考慮手動實作或 impl Trait 參數
舊版 Rust(< 1.75) async_trait crate

總結

  • Rust trait 原本不支援 async fn,根本原因是每個實作的 Future 型別不同,無法做到 object-safe。
  • async_trait crate 透過把回傳值包進 Pin<Box<dyn Future>> 解決這個問題,代價是每次呼叫需要 heap 分配。
  • Rust 1.75 穩定了原生的 async fn in trait,但動態派發(dyn Trait)的支援仍不完整,async_trait 在這個場景仍有其價值。