Simply Patrick

用 Iced 打造漫畫閱讀器:Arc<dyn Trait>、LRU 快取與非同步預載

featured.svg

最近用 Iced 實作了一個漫畫閱讀器——能打開 CBZ、CB7 格式的漫畫壓縮檔,支援鍵盤翻頁,還有背景預載機制讓翻頁幾乎感受不到延遲。

這篇文章聚焦在幾個有趣的設計決策:trait object 搭配 Arc 的多格式抽象、以「距離」驅逐的 LRU 快取、以及 smol::unblock 把同步 I/O 推進執行緒池的技巧。

專案概述

comic-viewer 支援的功能:

  • 開啟 CBZ(ZIP)、CB7(7-Zip)格式漫畫檔
  • CBR(RAR)格式顯示友善的錯誤訊息(需外部函式庫,目前為 stub)
  • 鍵盤翻頁(方向鍵、PageUp/PageDown)
  • 頁面快取(最多 7 頁),快取命中時翻頁瞬間完成
  • 背景預載:目前頁面前後各 2 頁
  • Tokyo Night Storm 主題的深色 UI

專案結構很清楚:

comic-viewer/
├── Cargo.toml
└── src/
    ├── main.rs          # Iced 應用程式主體(App、Message、PageCache)
    └── reader/
        ├── mod.rs       # ComicReader trait + open() 工廠函式
        ├── cbz.rs       # ZIP 格式實作
        ├── cb7.rs       # 7-Zip 格式實作
        └── cbr.rs       # RAR stub

1. Arc<dyn ComicReader>:多格式的 Trait Object 抽象

不同壓縮格式的解壓縮方式差異很大,但對應用程式來說只需要「給我第 N 頁的圖片」。ComicReader trait 完美地封裝這個差異:

pub trait ComicReader: Send + Sync + std::fmt::Debug {
    fn title(&self) -> &str;
    fn page_count(&self) -> usize;
    fn extract_page(&self, index: usize) -> Result<iced::widget::image::Handle, String>;
}

幾個設計細節值得注意:

  • Send + Sync:因為 App.comicOption<Arc<dyn ComicReader>>,而 Arc<T> 要求 T: Send + Sync 才能跨執行緒共享。背景預載任務會 clone 這個 Arc 並在 smol 的執行緒池跑,所以必須是 Sync
  • &self 而非 &mut selfextract_page 刻意設計成不可變借用,這樣就能從多個執行緒同時呼叫,不需要加鎖
  • std::fmt::Debug:讓整個 App struct 可以 derive Debug(Iced 有時會要求)

工廠函式用副檔名做格式分派:

pub fn open(path: &Path) -> Result<Box<dyn ComicReader>, String> {
    match path
        .extension()
        .and_then(|e| e.to_str())
        .map(str::to_lowercase)
        .as_deref()
    {
        Some("cbz") => cbz::CbzReader::open(path).map(|r| Box::new(r) as Box<dyn ComicReader>),
        Some("cbr") => cbr::CbrReader::open(path).map(|r| Box::new(r) as Box<dyn ComicReader>),
        Some("cb7") => cb7::Cb7Reader::open(path).map(|r| Box::new(r) as Box<dyn ComicReader>),
        ext => Err(format!("Unsupported format: {}", ext.unwrap_or("none"))),
    }
}

呼叫端接到 Box<dyn ComicReader> 後立刻包進 Arc::from(box),讓後續可以廉價 clone 給多個背景任務:

Task::perform(
    async move { smol::unblock(move || reader::open(&path).map(Arc::from)).await },
    Message::ComicLoaded,
)

Arc::from(box)Box<dyn ComicReader> 轉成 Arc<dyn ComicReader>——這個轉換是零成本的,只是改變了「誰擁有這塊記憶體的所有權計數器」。

2. CBZ 與 CB7 的不同解壓策略

ZIP 和 7-Zip 的壓縮原理不同,導致兩種 reader 的實作策略截然不同。

CBZ(ZIP):隨機存取,按需解壓

ZIP 格式的每個檔案是獨立壓縮的,可以不看前面的 entry 直接跳到任意 entry 開始解壓。CbzReader 利用這個特性,只在記憶體中保留整份 .cbz 的原始位元組,每次 extract_page 再用 Cursor 包起來重新開一個 ZipArchive

#[derive(Debug)]
pub struct CbzReader {
    title: String,
    pages: Vec<String>,      // 排序後的圖片檔名列表
    archive_bytes: Vec<u8>,  // 整份 .cbz 的原始位元組
}

impl ComicReader for CbzReader {
    fn extract_page(&self, index: usize) -> Result<image::Handle, String> {
        let filename = self.pages.get(index)
            .ok_or_else(|| format!("Page index {index} out of bounds"))?;
        let cursor = std::io::Cursor::new(&self.archive_bytes);
        let mut archive = zip::ZipArchive::new(cursor).map_err(|e| e.to_string())?;
        let mut file = archive.by_name(filename).map_err(|e| e.to_string())?;
        let mut bytes = Vec::new();
        file.read_to_end(&mut bytes).map_err(|e| e.to_string())?;
        Ok(image::Handle::from_bytes(bytes))
    }
}

每次 extract_page 都重新建立 ZipArchive,看似浪費,但因為 ZipArchive::new 只是 parse ZIP 的 central directory(在檔案末尾,很快),解壓單頁的成本主要在 inflate,整體還是很快的。

CB7(7-Zip):區塊壓縮,開檔時全部解壓

7-Zip 使用 LZMA/LZMA2 區塊壓縮,整個壓縮包的資料是相互依賴的,沒辦法隨機存取單一 entry。Cb7Reader 乾脆在 open() 時一次解壓全部圖片,把位元組存進記憶體:

#[derive(Debug)]
pub struct Cb7Reader {
    title: String,
    pages: Vec<Vec<u8>>,  // 每頁的原始圖片位元組,開檔時全部解壓好
}

impl Cb7Reader {
    pub fn open(path: &Path) -> Result<Self, String> {
        let mut reader = SevenZReader::open(path, Password::empty())
            .map_err(|e| e.to_string())?;

        let mut entries: Vec<(String, Vec<u8>)> = Vec::new();
        reader.for_each_entries(|entry, reader| {
            if entry.is_directory() || !super::is_image_file(entry.name()) {
                return Ok(true); // 跳過,繼續
            }
            let mut bytes = Vec::new();
            reader.read_to_end(&mut bytes).map_err(sevenz_rust::Error::from)?;
            entries.push((entry.name().to_owned(), bytes));
            Ok(true)
        }).map_err(|e| e.to_string())?;

        entries.sort_by(|(a, _), (b, _)| a.cmp(b));
        // ...
    }
}

開檔比 CBZ 慢(要解壓全部內容),但 extract_page 就只是 .clone() 一個 Vec<u8>,非常快。

這是一個典型的「把開銷前移」設計:把慢的動作(解壓縮)移到啟動時,換取後續每次存取都接近零成本。

CBR(RAR):誠實的 Stub

RAR 格式需要 unrar native library,不是純 Rust 可以輕鬆搞定的。CbrReader 選擇一個直白但很務實的做法:

impl CbrReader {
    pub fn open(_path: &Path) -> Result<Self, String> {
        Err(
            "CBR (RAR) format is not yet supported. \
             Consider converting to CBZ using Calibre or 7-Zip."
                .to_string(),
        )
    }
}

open() 時就立刻回傳清楚的錯誤和建議,不讓使用者走到一半才踩雷。這樣的 stub 還有個好處:整個程式的型別系統是完整的,未來只需要填充 CbrReader::open 的實作就好,其餘不用改。

3. 以「距離」驅逐的 LRU 快取

傳統 LRU 快取驅逐最久未使用的項目,但對漫畫閱讀器來說「最久未使用」不是最好的策略——你更想保留目前頁面周圍的頁面,而不是剛才看完的頁面。

PageCache 改用「距離目前頁面最遠的項目優先驅逐」:

const CACHE_CAPACITY: usize = 7;

struct PageCache {
    entries: HashMap<usize, image::Handle>,
}

impl PageCache {
    fn insert(&mut self, index: usize, handle: image::Handle, current_page: usize) {
        if !self.entries.contains_key(&index) && self.entries.len() >= CACHE_CAPACITY {
            // 找距離 current_page 最遠的 key
            let evict = *self
                .entries
                .keys()
                .max_by_key(|&&k| k.abs_diff(current_page))
                .expect("cache is non-empty");
            self.entries.remove(&evict);
        }
        self.entries.insert(index, handle);
    }
}

usize::abs_diff 計算兩個 usize 的絕對差值,不需要轉成有號整數,在 Rust 1.60 加入標準函式庫。

有一個借用技巧值得注意:

// usize is Copy,所以可以在可變借用前先把 key 複製出來
let evict = *self
    .entries
    .keys()
    .max_by_key(|&&k| k.abs_diff(current_page))
    .expect("cache is non-empty");
self.entries.remove(&evict);  // 這裡才開始可變借用

如果寫成 let evict_ref = self.entries.keys().max_by_key(...) 然後直接傳給 remove,借用檢查器會拒絕——因為 keys() 返回的 iterator 持有對 self.entries 的不可變借用,而 remove 需要可變借用。提前 * 解引用並把 usize(Copy 型別)複製出來,借用就在那一行結束了。

為什麼 capacity 選 7?前後各 2 頁(4 頁)加上目前頁面(1 頁)= 5,多留 2 頁緩衝,讓快速翻頁時舊頁面還在快取裡。

持有 image::Handle 的好處是:Iced 會把解碼後的圖片上傳到 GPU 紋理,而 Handle 是對那份紋理的引用(Arc 語義)。只要 Handle 活著,GPU 上的資料就不會被回收——快取命中時翻頁是真的零解碼。

4. Iced 的 MVU 架構與 Task 系統

Iced 採用 Elm 架構(Model-View-Update,MVU)。整個應用程式的運作流程是:

graph LR A[用戶操作 / 事件] -->|產生| B[Message] B -->|傳入| C[update 函式] C -->|修改| D[App 狀態] C -->|回傳| E[Task] E -->|完成後產生| B D -->|傳入| F[view 函式] F -->|產生| G[Element UI 樹] G -->|渲染到螢幕| A

Message enum 是所有事件的完整清單:

#[derive(Debug, Clone)]
enum Message {
    OpenFile,
    FileSelected(Option<PathBuf>),
    ComicLoaded(Result<Arc<dyn reader::ComicReader>, String>),
    NextPage,
    PrevPage,
    PagePreloaded(usize, Option<image::Handle>),
}

Task<Message> 是 Iced 的非同步工作單元。update 函式回傳 Task,Iced 的 runtime 會執行它,完成後把結果包成 Message 再餵回 update。這讓狀態轉換邏輯永遠是同步、純粹的,而非同步副作用完全由 Task 處理。

開檔的流程就是兩個 Task 串接:

// 第一步:呼叫原生對話框
Message::OpenFile => {
    Task::perform(
        async {
            rfd::AsyncFileDialog::new()
                .add_filter("Comic Book Archive", &["cbz", "cbr", "cb7"])
                .pick_file()
                .await
                .map(|f| f.path().to_owned())
        },
        Message::FileSelected,  // 完成後產生 FileSelected(Option<PathBuf>)
    )
}
// 第二步:在執行緒池解壓縮並建立 Reader
Message::FileSelected(Some(path)) => Task::perform(
    async move { smol::unblock(move || reader::open(&path).map(Arc::from)).await },
    Message::ComicLoaded,  // 完成後產生 ComicLoaded(Result<Arc<...>, String>)
),

5. smol::unblock:把同步 I/O 推進執行緒池

Iced 的 async runtime 是基於 futures-executor(smol 生態系),不是 Tokio——從 Cargo.lock 可以確認整個依賴樹裡根本沒有 tokio。但不管底層是哪套 executor,async 執行緒都不適合跑耗時的同步運算(如解壓縮)。如果直接在 async task 裡呼叫 zip::ZipArchive::new(),會阻塞整個執行緒,造成 UI 卡頓。

smol::unblock 是解法:把一個同步的閉包包起來,丟進 smol 的阻塞執行緒池(blocking thread pool)跑,並回傳一個 Future

// 背景預載的單頁提取
let comic = Arc::clone(comic);
Task::perform(
    async move {
        let handle = smol::unblock(move || comic.extract_page(idx).ok()).await;
        (idx, handle)
    },
    |(idx, handle)| Message::PagePreloaded(idx, handle),
)

smol::unblock 的原理很直觀:它維護一個獨立的執行緒池,專門用來跑阻塞工作。move || 閉包在那個執行緒上同步執行,.await 等它完成,結果就像一個普通的 Future

這裡 Arc::clone(comic) 非常關鍵——每個預載任務需要獨立持有對 ComicReader 的引用,才能在自己的執行緒上呼叫 extract_page,而且這份 clone 是 O(1) 的引用計數遞增,不是深複製。

6. 預載窗口與 Task::batch

preload_adjacent 計算出需要預載的頁面集合,然後用 Task::batch 同時啟動所有任務:

fn preload_adjacent(&self, around: usize) -> Task<Message> {
    let Some(comic) = &self.comic else { return Task::none(); };
    let page_count = comic.page_count();

    let start = around.saturating_sub(PRELOAD_LOOKAHEAD);  // 避免 usize 下溢
    let end = (around + PRELOAD_LOOKAHEAD).min(page_count - 1);

    let candidates: Vec<usize> = (start..=end)
        .filter(|&i| i != around && !self.page_cache.contains(i))
        .collect();

    if candidates.is_empty() {
        return Task::none();
    }

    let tasks: Vec<Task<Message>> = candidates
        .into_iter()
        .map(|idx| {
            let comic = Arc::clone(comic);
            Task::perform(
                async move {
                    let handle = smol::unblock(move || comic.extract_page(idx).ok()).await;
                    (idx, handle)
                },
                |(idx, handle)| Message::PagePreloaded(idx, handle),
            )
        })
        .collect();

    Task::batch(tasks)  // 並行啟動所有預載任務
}

saturating_sub 是個小細節:aroundusize,如果 around < PRELOAD_LOOKAHEAD 直接相減會 panic(或在 debug 模式 panic,release 模式則 wrap around)。saturating_sub 讓結果最低為 0,不需要額外的 if around >= PRELOAD_LOOKAHEAD 判斷。

Task::batch 把多個 Task 打包成一個,Iced 會並行執行它們。這意味著如果需要預載頁面 4 和頁面 6,兩個解壓縮任務會同時在執行緒池跑,而不是串行。

7. on_press_maybe:條件式按鈕啟用

Iced 提供了一個很方便的 API 來處理「條件式可按」的按鈕:

let can_prev = self.current_page > 0;
let can_next = self
    .comic
    .as_ref()
    .is_some_and(|c| self.current_page + 1 < c.page_count());

let prev_btn = button(text("◄ Previous").size(14))
    .on_press_maybe((has_comic && can_prev).then_some(Message::PrevPage));

let next_btn = button(text("Next ►").size(14))
    .on_press_maybe((has_comic && can_next).then_some(Message::NextPage));

on_press_maybe 接受 Option<Message>Some(msg) 代表按鈕可按,None 代表禁用。搭配 bool::then_sometrue 回傳 Some(value)false 回傳 None),可以非常簡潔地表達「有符合條件才能按」。

禁用狀態的視覺樣式也透過 nav_button_style 函式裡的 button::Status::Disabled 分支處理:

fn nav_button_style(_theme: &Theme, status: button::Status) -> button::Style {
    let bg = match status {
        button::Status::Hovered | button::Status::Pressed => iced::color!(0x565f89),
        button::Status::Disabled => iced::color!(0x292e42),  // 暗色,表示禁用
        _ => iced::color!(0x414868),
    };
    let text_color = match status {
        button::Status::Disabled => iced::color!(0x3d4168),  // 文字也變暗
        _ => iced::color!(0xc0caf5),
    };
    // ...
}

8. Subscription:鍵盤事件的全域監聽

Subscription 是 Iced 中持續監聽某個事件來源的機制,不同於 Task(一次性任務),Subscription 會在應用程式整個生命週期都保持活躍。

鍵盤翻頁用 iced::keyboard::on_key_press 建立訂閱:

fn subscription(&self) -> Subscription<Message> {
    iced::keyboard::on_key_press(|key, _modifiers| match key.as_ref() {
        iced::keyboard::Key::Named(Named::ArrowRight)
        | iced::keyboard::Key::Named(Named::PageDown) => Some(Message::NextPage),
        iced::keyboard::Key::Named(Named::ArrowLeft)
        | iced::keyboard::Key::Named(Named::PageUp) => Some(Message::PrevPage),
        _ => None,
    })
}

閉包接受按鍵和修飾鍵,回傳 Option<Message>Some(msg) 代表這個按鍵要觸發對應的 message,None 代表忽略。這讓翻頁鍵(方向鍵 + PageUp/PageDown)不需要點擊按鈕就能使用,閱讀體驗更自然。

key.as_ref() 是必要的——iced::keyboard::Key 包含 owned 的字元資料,.as_ref() 轉成借用版本才能做 pattern matching 而不移動所有權。

總結

概念 在專案中的應用
Arc<dyn Trait> ComicReader 跨執行緒共享,零成本 clone 給預載任務
Send + Sync bound 確保 trait object 可安全跨執行緒使用
&self on trait method 允許多執行緒並發呼叫 extract_page,不需鎖
按需解壓 vs 全量預載 ZIP 隨機存取 vs 7-Zip 區塊壓縮,各自最佳策略
距離驅逐快取 比傳統 LRU 更適合線性翻頁場景
abs_diff + Copy 借用技巧 規避借用檢查器限制的慣用寫法
Task + Task::batch Iced 的非同步副作用模型,並行預載
smol::unblock 把同步阻塞 I/O 推進獨立執行緒池
on_press_maybe 條件式按鈕啟用,搭配 then_some 很簡潔
Subscription 全域鍵盤事件監聽,生命週期同整個應用程式

漫畫閱讀器是一個很好的 Rust GUI 練習專案:需要處理多種檔案格式(trait object 的最佳用武之地)、需要快取和預載(資料結構設計)、需要在不卡 UI 的前提下做 I/O(非同步設計)。如果你正在學 Iced 的 MVU 架構,這個專案的規模剛好——不會太小而沒有學習點,也不會大到難以消化。

參考資源


zeroclaw 如何設計 Observability 抽象層

zeroclaw 的 observability 模組是個教科書等級的 Rust trait-based plugin 設計。它讓你在編譯期決定要不要開啟可觀測性,同時支援 Prometheus、OpenTelemetry 等多種後端,還能零成本地完全關閉。

問題:框架的 Observability 該怎麼設計?

你寫了一個 AI agent 框架,你想讓使用者可以選擇怎麼觀察它的行為——有人想接 Prometheus,有人想接 OpenTelemetry,有人根本不在乎、只要 log 就好,還有人在測試時一個 I/O 都不想多做。

這種「插件式後端選擇」的設計,zeroclaw 用了一個很乾淨的 trait object 架構來解決。


核心抽象:Observer Trait

src/observability/traits.rs 定義了整個系統的骨架:

pub trait Observer: Send + Sync + 'static {
    fn record_event(&self, event: &ObserverEvent);
    fn record_metric(&self, metric: &ObserverMetric);
    fn flush(&self) {}
    fn name(&self) -> &str;
    fn as_any(&self) -> &dyn std::any::Any;
}

Send + Sync + 'static 是為了讓 Arc<dyn Observer> 可以跨 async task 傳遞——這個組合在 tokio 生態系幾乎是標配。

flush() 有預設實作(空的),只有需要批次傳送的後端(例如 OTel)才需要覆寫。

as_any() 則是個小技巧,讓你可以在必要時從 dyn Observer 向下轉型(downcast)到具體型別——後面會看到它的用途。


兩種觀測維度:Event vs Metric

zeroclaw 把可觀測的資訊分成兩類:

ObserverEvent:領域事件

pub enum ObserverEvent {
    AgentStart { provider: String, model: String },
    LlmRequest { provider: String, model: String, messages_count: usize },
    LlmResponse {
        provider: String,
        model: String,
        duration: Duration,
        success: bool,
        error_message: Option<String>,
    },
    AgentEnd {
        provider: String,
        model: String,
        duration: Duration,
        tokens_used: Option<u64>,
        cost_usd: Option<f64>,
    },
    ToolCallStart { tool: String },
    ToolCall { tool: String, duration: Duration, success: bool },
    TurnComplete,
    ChannelMessage { channel: String, direction: String },
    HeartbeatTick,
    Error { component: String, message: String },
}

Event 是「某件事發生了」——agent 啟動、LLM 回應、工具被呼叫等。每個 variant 都帶有足夠的上下文,讓後端可以根據需要提取資訊。

ObserverMetric:數值度量

pub enum ObserverMetric {
    RequestLatency(Duration),
    TokensUsed(u64),
    ActiveSessions(u64),
    QueueDepth(u64),
}

Metric 是「一個數值」——延遲、用量、隊列深度。這類資料更適合作為 gauge 或 histogram 送到監控系統。

這個分法很務實:Prometheus 要的是數值,OTel 兩者都要,log 後端則把兩者都轉成文字。


後端實作:從零成本到全功能

Noop:真正的零成本

pub struct NoopObserver;

impl Observer for NoopObserver {
    #[inline(always)]
    fn record_event(&self, _event: &ObserverEvent) {}

    #[inline(always)]
    fn record_metric(&self, _metric: &ObserverMetric) {}

    fn name(&self) -> &str { "noop" }
    fn as_any(&self) -> &dyn Any { self }
}

#[inline(always)] 是關鍵。LLVM 看到這個後會把所有呼叫點直接展開為空操作,最終生成的機器碼裡根本沒有 observer 相關的指令。這就是 Rust 的「零成本抽象」在實務中的樣子。

Log:接 tracing 生態系

impl Observer for LogObserver {
    fn record_event(&self, event: &ObserverEvent) {
        match event {
            ObserverEvent::AgentStart { provider, model } => {
                info!(provider = %provider, model = %model, "agent.start");
            }
            ObserverEvent::LlmResponse { duration, success, .. } => {
                info!(duration_ms = duration.as_millis(), success, "llm.response");
            }
            // ...
        }
    }
}

tracing crate 做結構化 logging,每個事件對應一條有 key-value 欄位的 log 記錄。

Verbose:給互動式 CLI 的人看

impl Observer for VerboseObserver {
    fn record_event(&self, event: &ObserverEvent) {
        match event {
            ObserverEvent::LlmRequest { provider, model, messages_count } => {
                eprintln!("> Thinking");
                eprintln!("> Send (provider={}, model={}, messages={})",
                    provider, model, messages_count);
            }
            ObserverEvent::LlmResponse { duration, success, .. } => {
                eprintln!("< Receive (success={}, duration_ms={})",
                    success, duration.as_millis());
            }
            // ...
        }
    }
}

>< 表示「送出」和「收到」,讓使用者在終端機就能感受到 agent 的呼吸感,不需要看 log 或架監控。

Prometheus:指標抓取

pub struct PrometheusObserver {
    registry: Registry,
    agent_starts: IntCounterVec,
    agent_duration: HistogramVec,
    llm_calls: IntCounterVec,
    llm_duration: Histogram,
    tool_calls: IntCounterVec,
    // ...
}

每個欄位對應一個 Prometheus 指標。Event 進來後,依照類型對相應的 counter 或 histogram 做操作。

Prometheus 後端有個特別之處:它需要暴露一個 /metrics HTTP endpoint 讓 scraper 來抓。但 Observer trait 本身沒有這個方法——這就是 as_any() 的用武之地:

// 在 gateway 需要回應 /metrics 請求時
state.observer
    .as_any()
    .downcast_ref::<PrometheusObserver>()
    .map(|prom| prom.encode())

透過 downcast 繞過了 trait 的限制,讓 Prometheus 特有的操作可以被存取,而不需要把這個方法塞進通用的 Observer trait。

OpenTelemetry:完整的追蹤 + 指標

pub struct OtelObserver {
    tracer_provider: SdkTracerProvider,
    meter_provider: SdkMeterProvider,
    agent_starts: Counter<u64>,
    agent_duration: Histogram<f64>,
    llm_calls: Counter<u64>,
    llm_duration: Histogram<f64>,
    // ...
}

OTel 後端最有意思的地方是它會從 event 裡還原 span

// 收到 LlmResponse 事件後,用 duration 計算出 span 的開始時間
let start_time = SystemTime::now() - duration;
let mut span = tracer.build(
    SpanBuilder::from_name("llm.call")
        .with_start_time(start_time)
        .with_attributes(vec![
            KeyValue::new("provider", provider.clone()),
            KeyValue::new("model", model.clone()),
            KeyValue::new("success", *success),
        ])
);
span.set_status(if *success { Status::Ok } else { Status::error("") });
span.end();

因為 ObserverEvent 是事後才收到的(帶有 duration 欄位),OTel 後端就把開始時間「倒推」回去,製造出一個正確時間範圍的 span。這讓整個系統可以保持 event-sourced 的設計,而不需要在程式碼的每個角落都維護 span context。

Multi:扇出組合

pub struct MultiObserver {
    observers: Vec<Box<dyn Observer>>,
}

impl Observer for MultiObserver {
    fn record_event(&self, event: &ObserverEvent) {
        for obs in &self.observers {
            obs.record_event(event);
        }
    }
}

想同時送 Prometheus 又送 OTel?把它們包進 MultiObserver 就好。組合模式(Composite Pattern)在這裡用得剛好。


工廠函數:從設定到實體

pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
    match config.backend.as_str() {
        "log" => Box::new(LogObserver::new()),
        "prometheus" => Box::new(PrometheusObserver::new()),
        "otel" | "opentelemetry" | "otlp" => {
            match OtelObserver::new(config.otel_endpoint.as_deref(), ...) {
                Ok(obs) => Box::new(obs),
                Err(e) => {
                    tracing::error!("Failed to create OTel observer: {e}. Falling back to noop.");
                    Box::new(NoopObserver)
                }
            }
        }
        _ => Box::new(NoopObserver),
    }
}

設定結構很精簡:

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObservabilityConfig {
    pub backend: String,          // "none" | "log" | "prometheus" | "otel"
    pub otel_endpoint: Option<String>,      // 預設 http://localhost:4318
    pub otel_service_name: Option<String>,  // 預設 "zeroclaw"
}

impl Default for ObservabilityConfig {
    fn default() -> Self {
        Self { backend: "none".into(), .. }
    }
}

預設是 "none",也就是 NoopObserver——除非你明確設定,否則完全沒有額外開銷。

這個工廠函數還有個好設計:OTel 初始化失敗(例如 endpoint 連不上)時,它不會 panic,而是靜默降級成 Noop,同時印一個 error log。可觀測性本身不應該成為系統的單點故障。


整合:Agent 和 Gateway 的用法

Agent 和 Gateway 都用 Arc<dyn Observer> 持有 observer:

pub struct Agent {
    observer: Arc<dyn Observer>,
    // ...
}

整個 agent 生命週期裡,事件就這樣流出去:

self.observer.record_event(&ObserverEvent::AgentStart {
    provider: provider_name.clone(),
    model: self.model_name.clone(),
});

// ... 執行 LLM 呼叫 ...

self.observer.record_event(&ObserverEvent::LlmResponse {
    provider: provider_name.clone(),
    model: self.model_name.clone(),
    duration,
    success: true,
    error_message: None,
});

Arc 讓 observer 可以廉價地被 clone 並跨 async task 共享,每個後端自己處理執行緒安全的問題。


設計亮點總結

zeroclaw 的 observability 層之所以好,是因為它做了幾個清晰的取捨:

設計決策 理由
Arc<dyn Observer> 而非泛型參數 避免 generic viral 污染整個 call stack
領域 enum 而非字串 event 型別安全,編譯器幫你確認所有 variant 都有處理
NoopObserver + #[inline(always)] 確保關閉時真的零成本,不只是概念上的零成本
工廠函數失敗時降級 可觀測性不是核心功能,不能影響主流程
as_any() 開後門 讓 Prometheus 這類需要特殊 API 的後端有辦法被使用,而不需要污染通用 trait
OTel 用倒推時間還原 span 保持呼叫點的 API 簡單,不需要到處傳 span context

這套設計很適合作為「如何在 Rust 框架裡設計可插拔後端」的參考範本。


為什麼 Pin::as_mut() 很重要

延伸自 理解 Rust 的 Pin:從問題到解法as_mut() 是在 Pin 世界裡做 reborrow 的核心操作,如果要手動實作 Future 或 Combinator,你幾乎一定會用到它。

問題:Pin 被消費了怎麼辦?

Future::poll 的簽名長這樣:

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Output>

注意 self: Pin<&mut Self>值傳遞——Pin<&mut T> 沒有 Copy,傳進函數就被 move 走了。

如果你的 Future 裡面有一個 inner future 需要被 poll,問題就來了:

struct MyFuture {
    inner: SomeFuture,
}

impl Future for MyFuture {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
        // inner 也需要 Pin<&mut SomeFuture> 才能被 poll
        // 但 self 是 Pin<&mut MyFuture>,
        // 不能直接取出 &mut self.inner,因為那樣可能讓 inner 被 move
        self.inner.poll(cx) // ❌ SomeFuture::poll 需要 Pin<&mut Self>,不是 &mut Self
    }
}

as_mut():Pin 的 Reborrow

先看普通 &mut T:Rust 在函數呼叫點自動 reborrow,所以 r 不會被消費:

fn takes_mut(r: &mut i32) { *r += 1; }

let mut x = 0;
let r = &mut x;

takes_mut(r);  // Rust 自動 reborrow,等同於 takes_mut(&mut *r)
takes_mut(r);  // ✅ r 沒有被消費,可以繼續使用

Pin<&mut T> 沒有這個自動 reborrow。把 Pin<&mut T> 傳進函數,它真的會被 move 掉:

fn poll_it<F: Future>(p: Pin<&mut F>, cx: &mut Context<'_>) -> Poll<F::Output> {
    p.poll(cx)
}

let mut fut = some_async_fn();
// 用 unsafe 建立 Pin<&mut _> 做示範
let mut pinned = unsafe { Pin::new_unchecked(&mut fut) };

poll_it(pinned, cx);  // pinned 被 move 進去,消費掉了
poll_it(pinned, cx);  // ❌ 編譯錯誤:use of moved value: `pinned`

Pin::as_mut() 就是為 Pin<&mut T> 補上這個能力——手動 reborrow:

impl<P: DerefMut> Pin<P> {
    pub fn as_mut(&mut self) -> Pin<&mut P::Target>
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
    // as_mut() 借出一個新的 Pin<&mut MyFuture>
    // 原本的 self 還活著,可以繼續使用
    let _pin1 = self.as_mut(); // Pin<&mut MyFuture>
    let _pin2 = self.as_mut(); // 還可以再借
}

但這樣還是拿到 Pin<&mut MyFuture>,不是 Pin<&mut SomeFuture>,怎麼取得 inner 欄位的 Pin?


Projection:從外層 Pin 投影到內層欄位

「把 Pin<&mut Outer> 轉成 Pin<&mut Inner>」這個操作叫做 Pin Projection(投影)

手動做需要 unsafe

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
    // as_mut() 先做 reborrow
    // map_unchecked_mut() 再投影到 inner 欄位
    let inner_pin: Pin<&mut SomeFuture> = unsafe {
        self.as_mut().map_unchecked_mut(|s| &mut s.inner)
    };
    inner_pin.poll(cx)
}

map_unchecked_mut 標記為 unsafe 是因為你必須自己保證:

  • inner 欄位在整個 MyFuture 被 drop 之前不會被 move 出去
  • 你不會同時製造兩個指向同一個欄位的可變引用

實務:用 pin_project 消除 unsafe

手動寫 projection 容易出錯,實務上幾乎都用 pin-project crate:

use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

#[pin_project]
struct Timeout<F: Future> {
    #[pin]          // ← 標記這個欄位需要被 pin
    inner: F,
    deadline: std::time::Instant,
}

impl<F: Future> Future for Timeout<F> {
    type Output = Option<F::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.project(); // 安全地做 projection,無需 unsafe

        // this.inner 的型別是 Pin<&mut F>   ← 已經 pin 好,可以直接 poll
        // this.deadline 的型別是 &mut Instant ← 普通可變引用
        if std::time::Instant::now() >= *this.deadline {
            return Poll::Ready(None);
        }

        match this.inner.poll(cx) {
            Poll::Ready(v) => Poll::Ready(Some(v)),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[pin_project] 這個 proc macro 幫你在背後生成了安全的 projection 程式碼,你只需要在欄位上標 #[pin] 就好。


as_mut()select! 裡的作用

as_mut() 不只在自訂 Future 裡用到,tokio::select! 在 loop 裡重複 poll 同一個 Future 時,也依賴相同的概念:

use tokio::pin;

let long_op = some_slow_task();
pin!(long_op); // 現在 long_op 是 Pin<&mut impl Future>

loop {
    tokio::select! {
        result = &mut long_op => { // &mut 在這裡做的正是 reborrow
            println!("完成:{:?}", result);
            break;
        }
        _ = tokio::time::sleep(Duration::from_secs(1)) => {
            println!("還在等...");
        }
    }
}

&mut long_opPin<&mut F> 做 reborrow,每次進入 select! 都借出一個新的 Pin<&mut F>,讓同一個 future 可以被跨多次迴圈反覆 poll,狀態不會丟失。


整理:哪時用哪個方法?

Pin<&mut T>
├── as_mut() → Pin<&mut T>
│   ├── 用於:reborrow,讓同一個 Pin 可以被多次使用
│   └── 然後可以接 map_unchecked_mut() 做 projection(通常讓 pin_project 來)
│
├── get_mut() → &mut T    (只有 T: Unpin 才能用)
│   └── 用於:徹底解除 Pin 約束,拿回普通可變引用
│
└── map_unchecked_mut() → Pin<&mut U>   (unsafe)
    └── 用於:手動 projection 到欄位

小結

  • as_mut() 是 Pin 版的 reborrow,解決「Pin<&mut T> 傳進函數就消費掉」的問題
  • 手動 Future 實作幾乎都需要 as_mut() + projection 來取得 inner future 的 Pin
  • 實務上用 pin_project crate 的 #[pin] 標記讓 macro 幫你安全地做 projection
  • tokio::select!&mut pinned_future 語法本質上也是 as_mut() 的 reborrow observability

zeroclaw 如何設計 Memory 的抽象層

zeroclaw 的 Memory 是 agent 的長期記憶——它記錄對話歷史、使用者偏好、決策紀錄,並在每次對話前把相關記憶召回注入給 LLM。這篇記錄它如何用一個 trait 統一四種後端(SQLite、Markdown、Lucid、Postgres),以及它最有趣的設計:混合式語意搜尋、Embedding LRU 快取、還有讓 agent 在冷啟動時從 Markdown 文字檔重建「靈魂」的機制。

Memory Trait:七個方法,全部必填

ChannelProvider 的設計不同——後兩者都有預設實作——Memory trait 的七個方法全部是必填的

#[async_trait]
pub trait Memory: Send + Sync {
    fn name(&self) -> &str;

    async fn store(
        &self,
        key: &str,
        content: &str,
        category: MemoryCategory,
        session_id: Option<&str>,
    ) -> anyhow::Result<()>;

    async fn recall(
        &self,
        query: &str,
        limit: usize,
        session_id: Option<&str>,
    ) -> anyhow::Result<Vec<MemoryEntry>>;

    async fn get(&self, key: &str) -> anyhow::Result<Option<MemoryEntry>>;
    async fn list(
        &self,
        category: Option<&MemoryCategory>,
        session_id: Option<&str>,
    ) -> anyhow::Result<Vec<MemoryEntry>>;
    async fn forget(&self, key: &str) -> anyhow::Result<bool>;
    async fn count(&self) -> anyhow::Result<usize>;
    async fn health_check(&self) -> bool;
}

這反映了「每種後端的搜尋策略都根本不同」——SQLite 用 FTS5 + 向量搜尋,Markdown 用關鍵字比對,Postgres 用 ILIKE——沒有一個合理的預設實作能套用在所有後端,所以索性不提供。


核心資料型別

pub struct MemoryEntry {
    pub id: String,
    pub key: String,
    pub content: String,
    pub category: MemoryCategory,
    pub timestamp: String,
    pub session_id: Option<String>,
    pub score: Option<f64>,  // 只有 recall() 會填,get()/list() 都是 None
}

pub enum MemoryCategory {
    Core,             // 長期事實、偏好、決策
    Daily,            // 每日工作日誌
    Conversation,     // 對話上下文
    Custom(String),   // 開放式擴充點
}

score 的設計很有趣:它是 recall() 的副產品,代表這筆記憶和查詢的相關度。get()list() 都是精確查找,沒有「相關度」的概念,所以這個欄位在那兩個情境下永遠是 NoneCustom(String) 則是一個逃生艙,讓使用者可以自訂任意分類而不需要修改 enum。


四個後端,一個介面

zeroclaw 目前有四個實作(加上一個 no-op):

後端 搜尋方式 特性
SqliteMemory FTS5 + 向量 (cosine) 推薦預設,單一 .db
MarkdownMemory 關鍵字比對 純文字,只能追加
LucidMemory SQLite + 外部 CLI 橋接第三方記憶工具
PostgresMemory ILIKE 模糊比對 雲端持久化,無 pgvector
NoneMemory 完全 no-op,停用記憶

SQLite:真正的大腦

SqliteMemory 是推薦的預設後端,資料存在 workspace/memory/brain.db。它的設計最複雜,也最值得細看。

Schema

三張表,各有不同用途:

-- 主要記憶表
CREATE TABLE memories (
    id         TEXT PRIMARY KEY,
    key        TEXT UNIQUE NOT NULL,   -- upsert 的 identity key
    content    TEXT NOT NULL,
    category   TEXT NOT NULL DEFAULT 'core',
    embedding  BLOB,                   -- f32 向量,little-endian bytes
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    session_id TEXT
);

-- FTS5 全文搜尋虛擬表
CREATE VIRTUAL TABLE memories_fts USING fts5(
    key, content,
    content=memories, content_rowid=rowid
);

-- Embedding LRU 快取
CREATE TABLE embedding_cache (
    content_hash TEXT PRIMARY KEY,  -- SHA-256 前 16 hex chars
    embedding    BLOB NOT NULL,
    created_at   TEXT NOT NULL,
    accessed_at  TEXT NOT NULL      -- LRU 用
);

FTS5 透過三個 trigger(INSERT、DELETE、UPDATE)和主表保持同步。store() 用的是 INSERT ... ON CONFLICT(key) DO UPDATE——upsert 語意,key 是唯一識別碼,id 是 UUID(每次 upsert 都會更新)。

SQLite 開啟時的 PRAGMA 調校:WAL 模式、8 MB mmap、2 MB page cache、MEMORY temp store。

為什麼用 spawn_blocking

rusqlite 是 blocking API,直接在 async context 呼叫會卡住 tokio runtime。所有 SQLite 操作都包在 tokio::task::spawn_blocking 裡:

let result = tokio::task::spawn_blocking(move || {
    let conn = db.lock();
    conn.execute("INSERT INTO memories ...", params![...])?;
    Ok(())
}).await??;

連線的開啟還多了一層防護——用 worker thread + mpsc::channel 加上 recv_timeout,防止 SQLite 檔案被鎖住時永遠卡住:

let (tx, rx) = mpsc::channel();
thread::spawn(move || {
    let _ = tx.send(Connection::open(&path));
});
match rx.recv_timeout(Duration::from_secs(capped)) {
    Ok(Ok(conn)) => conn,
    Ok(Err(e)) => return Err(e.into()),
    Err(_) => return Err(anyhow!("SQLite open timed out")),
}

混合語意搜尋:FTS5 + 向量

recall() 的實作是整個記憶系統最有趣的部分。它不是單一的搜尋策略,而是三層 fallback:

graph TD A["recall(query, limit)"] --> B{有設定 Embedding provider?} B -->|是| C[生成查詢向量] C --> D[FTS5 BM25 搜尋] D --> E["hybrid_merge(向量結果, 關鍵字結果)"] E --> F[回傳 Top-N] B -->|否| G[純 BM25 搜尋] G --> F D -->|FTS5 解析錯誤| H["LIKE %keyword% 兜底"] H --> F

向量:純 Rust Cosine Similarity

向量以 f32 little-endian bytes 存在 BLOB 欄位。Cosine similarity 是純 Rust 實作,刻意用 f64 累加以減少高維向量的浮點誤差:

pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let mut dot = 0.0_f64;
    let mut norm_a = 0.0_f64;
    let mut norm_b = 0.0_f64;

    for (x, y) in a.iter().zip(b.iter()) {
        let x = f64::from(*x);
        let y = f64::from(*y);
        dot += x * y;
        norm_a += x * x;
        norm_b += y * y;
    }
    let denom = norm_a.sqrt() * norm_b.sqrt();
    if !denom.is_finite() || denom < f64::EPSILON {
        return 0.0;
    }
    (dot / denom).clamp(0.0, 1.0) as f32
}

這是暴力掃描所有有 embedding 的 row,沒有 ANN index(如 HNSW、IVFFlat)。對一個本地 agent 的記憶量來說夠用,不需要引入 pgvector 或外部向量 DB 的依賴。

關鍵字:FTS5 BM25

查詢前先把每個 token 用雙引號包住再用 OR 連接,避免 FTS5 特殊字元造成解析錯誤:

let fts_query = query
    .split_whitespace()
    .map(|w| format!("\"{w}\""))
    .collect::<Vec<_>>()
    .join(" OR ");

FTS5 的 bm25() 分數是負數(越負越相關),取負後變成越大越好,和 cosine similarity 的方向統一。

混合合併

pub fn hybrid_merge(
    vector_results: &[(String, f32)],   // (id, cosine)
    keyword_results: &[(String, f32)],  // (id, -BM25)
    vector_weight: f32,                 // 預設 0.7
    keyword_weight: f32,                // 預設 0.3
    limit: usize,
) -> Vec<ScoredResult> {
    // BM25 分數正規化到 [0,1]
    // final_score = 0.7 * cosine + 0.3 * normalized_BM25
    // 去重:HashMap keyed on id
}

合併後的 Top-N ID 用一次 WHERE id IN (...) 批次撈出,避免 N+1 query。


Embedding LRU 快取

每次 store() 都需要為新內容生成 embedding,但相同的文字不應該重複打 API。快取的 key 是內容的 SHA-256 前 16 hex chars——明確選擇 SHA-256 而不是 DefaultHasher,因為後者的輸出在不同 Rust 版本之間不保證穩定。

LRU 淘汰不在 Rust 記憶體裡維護資料結構,而是直接在 SQL 裡做:

-- 保留最新的 N 筆,刪除多餘的舊資料
DELETE FROM embedding_cache WHERE content_hash IN (
    SELECT content_hash FROM embedding_cache
    ORDER BY accessed_at ASC
    LIMIT MAX(0, (SELECT COUNT(*) FROM embedding_cache) - ?1)
)

這樣快取的淘汰邏輯完全不依賴記憶體狀態,重啟之後一樣有效。


Embedding Provider

Embedding 本身也是一個 trait:

#[async_trait]
pub trait EmbeddingProvider: Send + Sync {
    fn name(&self) -> &str;
    fn dimensions(&self) -> usize;
    async fn embed(&self, texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>>;

    // 唯一的預設方法:批次 embed 的單一版本
    async fn embed_one(&self, text: &str) -> anyhow::Result<Vec<f32>> {
        let mut results = self.embed(&[text]).await?;
        results.pop().ok_or_else(|| anyhow!("Empty embedding result"))
    }
}

兩個具體實作:

  • NoopEmbeddingdimensions() = 0,回傳空向量。啟用後自動降級為純關鍵字搜尋。
  • OpenAiEmbedding:打 /v1/embeddings endpoint。支援自訂 base URL,用 custom: 前綴指定:
pub fn create_embedding_provider(provider: &str, api_key: ..., model: &str, dims: usize) {
    match provider {
        "openai" => Box::new(OpenAiEmbedding::new("https://api.openai.com", ...)),
        name if name.starts_with("custom:") => {
            let base_url = name.strip_prefix("custom:").unwrap_or("");
            Box::new(OpenAiEmbedding::new(base_url, ...))  // 指向 Ollama、本地推理伺服器等
        }
        _ => Box::new(NoopEmbedding),
    }
}

MarkdownMemory:只能追加的審計日誌

MarkdownMemoryforget() 永遠回傳 false——這個後端是只能追加的,設計上就是個審計日誌:

workspace/MEMORY.md                  ← Core 類別
workspace/memory/2026-02-19.md       ← Daily 類別(按日期分檔)

每次 store() 就在對應的 Markdown 檔尾端追加一行 - **key**: contentrecall() 則是把查詢拆成關鍵字,計算每筆記憶裡出現了幾個,算出比例當作分數。

雖然搜尋能力差,但好處是人類可讀、可以直接用 Git 追蹤,也容易備份。


LucidMemory:橋接外部 CLI

LucidMemory 是一個橋接層,SQLite 是主要儲存,外部 lucid-memory CLI 是輔助:

pub struct LucidMemory {
    local: SqliteMemory,
    recall_timeout: Duration,           // 預設 500ms
    store_timeout: Duration,            // 預設 800ms
    local_hit_threshold: usize,         // 預設 3:本地結果夠多就不問 Lucid
    failure_cooldown: Duration,         // 任何錯誤後冷卻 15 秒
    last_failure_at: Mutex<Option<Instant>>,
}

store() 先寫 SQLite,再 fire-and-forget 同步到 Lucid。recall() 的策略:先問 SQLite,如果結果數量低於 local_hit_threshold 且 Lucid 不在冷卻期,才去呼叫 lucid context <query>,然後合併結果。

去重用的是一個 NUL 字元分隔的簽名:

let signature = format!("{}\u{0}{}", entry.key.to_lowercase(), entry.content.to_lowercase());

Lucid 的輸出是自訂的 XML-like 格式:

<lucid-context>
- [decision] Use token refresh middleware
- [context] Working in src/auth.rs
</lucid-context>

方括號裡的標籤(decisioncontextbuglearning)會被映射回 MemoryCategory


Agent 的靈魂:Snapshot 與冷啟動恢復

最讓我覺得有趣的設計是「靈魂快照」機制。

在每次記憶體清理(hygiene)時,如果啟用了 snapshot_on_hygiene,所有 Core 類別的記憶都會被 dump 到工作目錄的 MEMORY_SNAPSHOT.md。這個檔案是 Git 可追蹤的純文字。

更厲害的是冷啟動恢復:如果 brain.db 不存在(例如全新部署、或資料庫損毀刪除),但 MEMORY_SNAPSHOT.md 存在,factory 會在開啟後端之前先呼叫 hydrate_from_snapshot(),把 Markdown 裡的記憶逐筆還原回 SQLite。

graph LR A["brain.db 存在?"] -->|是| B[正常開啟 SqliteMemory] A -->|否| C["MEMORY_SNAPSHOT.md 存在?"] C -->|是| D[hydrate_from_snapshot] D --> E[重新開啟 SqliteMemory] C -->|否| F[全新 SqliteMemory] B --> G[完成] E --> G F --> G

每次清理也會刪掉 MemoryCategory::Conversation 類別超過 conversation_retention_days 的舊紀錄——對話上下文是短暫的,不該無限累積。


在 Agent Loop 裡的角色

Memory 在每輪對話的三個時機被使用:

sequenceDiagram participant U as 使用者 participant AL as Agent Loop participant M as Memory participant LLM as LLM U->>AL: 送出訊息 AL->>M: recall(user_msg, 5) M-->>AL: 相關記憶(score >= 0.4 才納入) AL->>LLM: [Memory context]\n- key: content\n...\n{user_msg} LLM-->>AL: 回應 AL->>M: store("user_msg_{uuid}", msg, Conversation) AL->>M: store("assistant_resp_{uuid}", summary[:100], Daily) AL-->>U: 最終答覆

注意幾個細節:

  • 相關度門檻score >= 0.4 才注入,低分記憶不進 context,避免雜訊干擾 LLM。
  • 自動儲存:使用者訊息存 Conversation 類別(會被清理),助理回應只存前 100 字存 Daily 類別(會保留較久)。
  • LLM 直接操控記憶memory_storememory_recallmemory_forget 這三個工具也被當成普通的 Tool 掛載到 agent 上,LLM 自己可以主動讀寫記憶。

小結

zeroclaw Memory 設計幾個值得注意的地方:

  • 全部必填,沒有預設:每種後端的搜尋策略差太多,預設實作沒有意義。
  • SQLite 就夠用:FTS5 + 向量暴力搜尋在本地 agent 的規模不需要 pgvector,單一 .db 檔沒有外部依賴。
  • 三層 fallback 搜尋:Hybrid(FTS5 + 向量)→ BM25 only → LIKE 兜底,優雅降級。
  • SHA-256 快取,LRU 在 SQL 裡做:快取邏輯不依賴記憶體狀態,重啟後仍然有效。
  • 靈魂快照MEMORY_SNAPSHOT.md 是 Git 可追蹤的,讓 agent 可以從純文字重建長期記憶,冷啟動不代表失憶。
  • Conversation 記憶會被清理:對話上下文是短暫的,設計上就不打算永久保留。

zeroclaw 如何設計 Tool 的抽象層

zeroclaw 的 Tool 是 agent 採取行動的核心機制——執行 shell 指令、讀寫檔案、瀏覽網頁、呼叫 HTTP API、操作記憶體、甚至把任務委派給另一個 sub-agent。目前實作了 30+ 種工具。這篇記錄它如何用一個乾淨的 trait 統一所有工具,以及工具的組裝、dispatch、schema 正規化、安全注入等設計。

Tool Trait:四個必填,一個免費

整個抽象的核心在 src/tools/traits.rs,只有四個方法是必須實作的:

#[async_trait]
pub trait Tool: Send + Sync {
    fn name(&self) -> &str;
    fn description(&self) -> &str;
    fn parameters_schema(&self) -> serde_json::Value;
    async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult>;

    // 預設實作:把前三個方法組裝成 ToolSpec,免費附贈
    fn spec(&self) -> ToolSpec {
        ToolSpec {
            name: self.name().to_string(),
            description: self.description().to_string(),
            parameters: self.parameters_schema(),
        }
    }
}

幾個設計決策值得注意:

  • Send + Sync:讓工具可以存進 Arc<dyn Tool> 並在 thread 之間共享,必要條件。
  • async fn execute:所有工具都可能有 I/O,統一用 async,不需要區分同步/非同步工具。
  • 參數是 serde_json::Value:不是每個工具一個強型別 struct,而是執行時從 JSON 取值。省去了大量 boilerplate,代價是錯誤在執行時才出現,而不是編譯時。
  • spec() 是預設方法:把 namedescriptionparameters_schema 這三個方法的結果組裝成 ToolSpec,新增工具完全不需要自己實作這個。

核心資料型別

三個關鍵型別:

// 每次執行的回傳值
pub struct ToolResult {
    pub success: bool,
    pub output: String,
    pub error: Option<String>,
}

// 送給 LLM 描述「這個工具能做什麼」的規格書
pub struct ToolSpec {
    pub name: String,
    pub description: String,
    pub parameters: serde_json::Value,  // JSON Schema 物件
}

注意 execute 的錯誤處理設計:正常的執行失敗(找不到檔案、路徑不允許)用 ToolResult { success: false, error: Some(...) } 回傳;只有程式本身的 bug 或不可恢復的錯誤才回傳 anyhow::Result::Err。這讓 agent loop 可以把執行失敗的結果繼續送給 LLM 讓它決定下一步,而不是直接炸掉整個流程。

參數 schema 是用 serde_json::json!() 手寫 JSON Schema,沒有 proc macro 或 derive:

fn parameters_schema(&self) -> serde_json::Value {
    json!({
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "The shell command to execute"
            },
            "approved": {
                "type": "boolean",
                "description": "Set true to explicitly approve medium/high-risk commands",
                "default": false
            }
        },
        "required": ["command"]
    })
}

工具的組裝:工廠函式,不靠反射

工具不是自動發現的,而是在 src/tools/mod.rs 裡用工廠函式顯式組裝:

// 最小集合,給測試和簡單 agent 用
pub fn default_tools(security: Arc<SecurityPolicy>) -> Vec<Box<dyn Tool>> {
    vec![
        Box::new(ShellTool::new(security.clone(), runtime)),
        Box::new(FileReadTool::new(security.clone())),
        Box::new(FileWriteTool::new(security)),
    ]
}

// 完整工具集,根據設定條件啟用
pub fn all_tools_with_runtime(config, security, runtime, memory, ...) -> Vec<Box<dyn Tool>> {
    let mut tools: Vec<Box<dyn Tool>> = vec![
        Box::new(ShellTool::new(...)),
        Box::new(FileReadTool::new(...)),
        Box::new(CronAddTool::new(...)),
        Box::new(MemoryStoreTool::new(...)),
        // ...
    ];

    // 條件啟用:設定決定工具集,不是全部預設開啟
    if browser_config.enabled {
        tools.push(Box::new(BrowserTool::new_with_backend(...)));
    }
    if http_config.enabled {
        tools.push(Box::new(HttpRequestTool::new(...)));
    }
    if !agents.is_empty() {
        tools.push(Box::new(DelegateTool::new(agents, ...)));
    }
    tools
}

這是工廠/建構器模式——沒有反射、沒有 inventory 這類 compile-time 自動收集 crate、沒有 #[register_tool] 巨集。好處是依賴關係一目了然,壞處是每加一個工具都要手動登記。對這個規模的 codebase 來說是對的取捨。


兩種 Dispatch 模式

工具的呼叫有兩條路,取決於 LLM provider 是否支援原生 function calling:

graph TD A[LLM 回應] --> B{支援原生 Tool Calling?} B -->|是| C[NativeToolDispatcher] B -->|否| D[XmlToolDispatcher] C -->|解析 API 的 tool_calls 欄位| E[ParsedToolCall] D -->|解析文字裡的 XML tag| E E --> F["find_tool(name)"] F --> G["tool.execute(args)"] G --> H[ToolResult] H --> I[轉成 ConversationMessage] I --> J[送回 LLM 繼續下一輪]

NativeToolDispatcher 用於 Anthropic、OpenAI、Gemini——這些 provider 會在 API response 裡回傳結構化的工具呼叫,直接解析即可。

XmlToolDispatcher 用於不支援原生 function calling 的 LLM。此時 zeroclaw 把工具的說明注入到 system prompt,要求 LLM 用特定格式回應:

<tool_call>{"name": "shell", "arguments": {"command": "ls -la"}}</tool_call>

然後用字串解析從回應文字裡抓出這些 XML tag。

Agent loop 查找工具的方式很簡單:

fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
    tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
}

送給 LLM 的格式(以 OpenAI 為例):

fn tools_to_openai_format(tools: &[Box<dyn Tool>]) -> Vec<serde_json::Value> {
    tools.iter().map(|tool| {
        json!({
            "type": "function",
            "function": {
                "name": tool.name(),
                "description": tool.description(),
                "parameters": tool.parameters_schema()
            }
        })
    }).collect()
}

JSON Schema 的跨 Provider 正規化

各家 LLM API 對 JSON Schema 的支援程度差異很大,尤其是 Gemini——它會拒絕很多標準的 JSON Schema 關鍵字。src/tools/schema.rsSchemaCleanr 專門處理這件事:

pub enum CleaningStrategy {
    Gemini,       // 最嚴格
    Anthropic,
    OpenAI,       // 最寬鬆
    Conservative, // 保守策略,適合未知 provider
}

pub struct SchemaCleanr;

impl SchemaCleanr {
    pub fn clean_for_gemini(schema: Value) -> Value { ... }
    pub fn clean_for_anthropic(schema: Value) -> Value { ... }
    pub fn clean_for_openai(schema: Value) -> Value { ... }
}

各策略的主要差異:

關鍵字 Gemini Anthropic OpenAI
minLengthpattern 移除 移除 保留
$ref 內聯展開 內聯展開 保留
additionalProperties 移除 保留 保留
anyOf/oneOf 攤平為第一個型別 保留 保留
nullable 轉換格式 保留 保留

Gemini 甚至連 anyOf: [type: string, type: null](nullable 的常見寫法)都要特別轉換。這些邊緣情況全部藏在 SchemaCleanr 裡,工具本身完全不需要知道。


安全性是注入的,不是全域的

zeroclaw 的安全設計有個核心原則:安全策略是建構時注入的,不是全域的靜態狀態

// 每個有 I/O 的工具都在建構時收到 SecurityPolicy
pub struct FileReadTool {
    security: Arc<SecurityPolicy>,
}

pub struct ShellTool {
    security: Arc<SecurityPolicy>,
    runtime: Arc<dyn RuntimeAdapter>,
}

async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
    let path = args.get("path").and_then(|v| v.as_str())...;
    // 安全檢查在 execute 裡,繞不過去
    if !self.security.is_path_allowed(path) {
        return Ok(ToolResult { success: false, error: Some("Path not allowed".into()), ... });
    }
    // 繼續執行...
}

SecurityPolicy 強制執行工作區沙盒(防止讀寫工作目錄以外的路徑)、自主等級(ReadOnly、Supervised、Full)、rate limiting 和指令白名單。

Supervised 模式下還有 ApprovalManager,在執行高風險工具前暫停等待使用者確認:

[zeroclaw] Tool: shell
Command: rm -rf /tmp/build
[y]es / [n]o / [a]lways: _

always 之後,這個指令會被加進 session allowlist,下次遇到同樣的指令就不再詢問。


有趣的具體工具

DelegateTool:動態 Schema 的工具

DelegateTool 是最有趣的工具之一——它的 parameters_schema()執行時動態生成的,而不是靜態寫死的:

pub struct DelegateTool {
    agents: Arc<HashMap<String, DelegateAgentConfig>>,
    depth: u32,  // 防止無限委派遞迴
}

fn parameters_schema(&self) -> serde_json::Value {
    // 根據目前設定的 agent 列表動態生成 schema
    let agent_names: Vec<&str> = self.agents.keys().map(|s| s.as_str()).collect();
    json!({
        "properties": {
            "agent": {
                "type": "string",
                "description": format!("Which agent to delegate to. Available: {}",
                                       agent_names.join(", "))
            },
            "task": { "type": "string", "description": "Task description for the agent" }
        }
    })
}

async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
    if self.depth >= MAX_DELEGATION_DEPTH {
        return Ok(ToolResult { success: false, error: Some("Max delegation depth reached"), ... });
    }
    let provider = create_provider(&agent_config.provider, ...)?;
    // 120 秒 timeout,防止 sub-agent 跑太久
    tokio::time::timeout(Duration::from_secs(120),
        provider.chat_with_system(...)).await
}

depth 欄位防止 agent A 委派給 agent B、B 又委派回 A 的無限迴圈。

ShellTool:乾淨的執行環境

ShellTool 的安全設計很謹慎——它清空整個環境變數,只保留一個安全白名單:

async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
    // 清除所有環境變數,防止 API key 洩漏給子行程
    let mut cmd = Command::new("sh");
    cmd.env_clear();  // 全部清空
    for key in SAFE_ENV_VARS {  // 只加回白名單裡的
        if let Ok(val) = std::env::var(key) {
            cmd.env(key, val);
        }
    }
    // 60 秒 timeout,截斷輸出到 1MB
    tokio::time::timeout(Duration::from_secs(60), cmd.output()).await
}

SAFE_ENV_VARS 包含 PATHHOMELANG 這類系統必要的變數,但不包含任何 *_API_KEY*_SECRET 這類敏感變數。即使主行程有這些環境變數,子行程也看不到。


完整的 Tool Call 流程

sequenceDiagram participant U as 使用者 participant AL as Agent Loop participant P as Provider participant LLM as LLM API participant T as Tool U->>AL: 送出訊息 AL->>P: tools_to_provider_format(specs) P->>LLM: chat request + tool definitions LLM-->>P: 回應(含 tool calls) P-->>AL: ChatResponse { tool_calls: [...] } loop 最多 10 次 AL->>AL: find_tool(name) AL->>AL: ApprovalManager(Supervised 模式) AL->>T: execute(args) T->>T: SecurityPolicy 檢查 T-->>AL: ToolResult AL->>P: 把結果轉成 ConversationMessage P->>LLM: 繼續對話(含工具執行結果) LLM-->>P: 新回應 alt 不再呼叫工具 P-->>AL: 純文字回應 AL-->>U: 最終答覆 end end

整個流程最多迴圈 10 次(DEFAULT_MAX_TOOL_ITERATIONS)。超過就強制停止,防止 agent 無止境地呼叫工具。


小結

zeroclaw 的 Tool 設計幾個值得借鑑的地方:

  • 最小介面:四個必填方法,spec() 是免費的預設實作,新增工具的門檻很低。
  • 工廠函式,不靠反射:顯式組裝、條件啟用,依賴關係清晰,沒有魔法。
  • 兩種 Dispatch 兼顧:原生 API tool calling 和 XML prompt 注入都支援,對各種 LLM 都能用。
  • Schema 正規化是一等公民SchemaCleanr 把各家 API 的奇葩限制集中處理,工具本身不受污染。
  • 安全是建構時注入的SecurityPolicynew() 時就進去了,不是全域狀態,也無法繞過。
  • 執行失敗不等於程式崩潰ToolResult { success: false } 讓 agent 可以從失敗中學習,繼續嘗試。