最近用 Iced 實作了一個漫畫閱讀器——能打開 CBZ、CB7 格式的漫畫壓縮檔,支援鍵盤翻頁,還有背景預載機制讓翻頁幾乎感受不到延遲。
這篇文章聚焦在幾個有趣的設計決策:trait object 搭配 Arc 的多格式抽象、以「距離」驅逐的 LRU 快取、以及 smol::unblock 把同步 I/O 推進執行緒池的技巧。
專案概述
comic-viewer 支援的功能:
- 開啟 CBZ(ZIP)、CB7(7-Zip)格式漫畫檔
- CBR(RAR)格式顯示友善的錯誤訊息(需外部函式庫,目前為 stub)
- 鍵盤翻頁(方向鍵、PageUp/PageDown)
- 頁面快取(最多 7 頁),快取命中時翻頁瞬間完成
- 背景預載:目前頁面前後各 2 頁
- Tokyo Night Storm 主題的深色 UI
專案結構很清楚:
comic-viewer/
├── Cargo.toml
└── src/
├── main.rs # Iced 應用程式主體(App、Message、PageCache)
└── reader/
├── mod.rs # ComicReader trait + open() 工廠函式
├── cbz.rs # ZIP 格式實作
├── cb7.rs # 7-Zip 格式實作
└── cbr.rs # RAR stub
1. Arc<dyn ComicReader>:多格式的 Trait Object 抽象
不同壓縮格式的解壓縮方式差異很大,但對應用程式來說只需要「給我第 N 頁的圖片」。ComicReader trait 完美地封裝這個差異:
pub trait ComicReader: Send + Sync + std::fmt::Debug {
fn title(&self) -> &str;
fn page_count(&self) -> usize;
fn extract_page(&self, index: usize) -> Result<iced::widget::image::Handle, String>;
}
幾個設計細節值得注意:
Send + Sync:因為 App.comic 是 Option<Arc<dyn ComicReader>>,而 Arc<T> 要求 T: Send + Sync 才能跨執行緒共享。背景預載任務會 clone 這個 Arc 並在 smol 的執行緒池跑,所以必須是 Sync
&self 而非 &mut self:extract_page 刻意設計成不可變借用,這樣就能從多個執行緒同時呼叫,不需要加鎖
std::fmt::Debug:讓整個 App struct 可以 derive Debug(Iced 有時會要求)
工廠函式用副檔名做格式分派:
pub fn open(path: &Path) -> Result<Box<dyn ComicReader>, String> {
match path
.extension()
.and_then(|e| e.to_str())
.map(str::to_lowercase)
.as_deref()
{
Some("cbz") => cbz::CbzReader::open(path).map(|r| Box::new(r) as Box<dyn ComicReader>),
Some("cbr") => cbr::CbrReader::open(path).map(|r| Box::new(r) as Box<dyn ComicReader>),
Some("cb7") => cb7::Cb7Reader::open(path).map(|r| Box::new(r) as Box<dyn ComicReader>),
ext => Err(format!("Unsupported format: {}", ext.unwrap_or("none"))),
}
}
呼叫端接到 Box<dyn ComicReader> 後立刻包進 Arc::from(box),讓後續可以廉價 clone 給多個背景任務:
Task::perform(
async move { smol::unblock(move || reader::open(&path).map(Arc::from)).await },
Message::ComicLoaded,
)
Arc::from(box) 把 Box<dyn ComicReader> 轉成 Arc<dyn ComicReader>——這個轉換是零成本的,只是改變了「誰擁有這塊記憶體的所有權計數器」。
2. CBZ 與 CB7 的不同解壓策略
ZIP 和 7-Zip 的壓縮原理不同,導致兩種 reader 的實作策略截然不同。
CBZ(ZIP):隨機存取,按需解壓
ZIP 格式的每個檔案是獨立壓縮的,可以不看前面的 entry 直接跳到任意 entry 開始解壓。CbzReader 利用這個特性,只在記憶體中保留整份 .cbz 的原始位元組,每次 extract_page 再用 Cursor 包起來重新開一個 ZipArchive:
#[derive(Debug)]
pub struct CbzReader {
title: String,
pages: Vec<String>, // 排序後的圖片檔名列表
archive_bytes: Vec<u8>, // 整份 .cbz 的原始位元組
}
impl ComicReader for CbzReader {
fn extract_page(&self, index: usize) -> Result<image::Handle, String> {
let filename = self.pages.get(index)
.ok_or_else(|| format!("Page index {index} out of bounds"))?;
let cursor = std::io::Cursor::new(&self.archive_bytes);
let mut archive = zip::ZipArchive::new(cursor).map_err(|e| e.to_string())?;
let mut file = archive.by_name(filename).map_err(|e| e.to_string())?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes).map_err(|e| e.to_string())?;
Ok(image::Handle::from_bytes(bytes))
}
}
每次 extract_page 都重新建立 ZipArchive,看似浪費,但因為 ZipArchive::new 只是 parse ZIP 的 central directory(在檔案末尾,很快),解壓單頁的成本主要在 inflate,整體還是很快的。
CB7(7-Zip):區塊壓縮,開檔時全部解壓
7-Zip 使用 LZMA/LZMA2 區塊壓縮,整個壓縮包的資料是相互依賴的,沒辦法隨機存取單一 entry。Cb7Reader 乾脆在 open() 時一次解壓全部圖片,把位元組存進記憶體:
#[derive(Debug)]
pub struct Cb7Reader {
title: String,
pages: Vec<Vec<u8>>, // 每頁的原始圖片位元組,開檔時全部解壓好
}
impl Cb7Reader {
pub fn open(path: &Path) -> Result<Self, String> {
let mut reader = SevenZReader::open(path, Password::empty())
.map_err(|e| e.to_string())?;
let mut entries: Vec<(String, Vec<u8>)> = Vec::new();
reader.for_each_entries(|entry, reader| {
if entry.is_directory() || !super::is_image_file(entry.name()) {
return Ok(true); // 跳過,繼續
}
let mut bytes = Vec::new();
reader.read_to_end(&mut bytes).map_err(sevenz_rust::Error::from)?;
entries.push((entry.name().to_owned(), bytes));
Ok(true)
}).map_err(|e| e.to_string())?;
entries.sort_by(|(a, _), (b, _)| a.cmp(b));
// ...
}
}
開檔比 CBZ 慢(要解壓全部內容),但 extract_page 就只是 .clone() 一個 Vec<u8>,非常快。
這是一個典型的「把開銷前移」設計:把慢的動作(解壓縮)移到啟動時,換取後續每次存取都接近零成本。
CBR(RAR):誠實的 Stub
RAR 格式需要 unrar native library,不是純 Rust 可以輕鬆搞定的。CbrReader 選擇一個直白但很務實的做法:
impl CbrReader {
pub fn open(_path: &Path) -> Result<Self, String> {
Err(
"CBR (RAR) format is not yet supported. \
Consider converting to CBZ using Calibre or 7-Zip."
.to_string(),
)
}
}
在 open() 時就立刻回傳清楚的錯誤和建議,不讓使用者走到一半才踩雷。這樣的 stub 還有個好處:整個程式的型別系統是完整的,未來只需要填充 CbrReader::open 的實作就好,其餘不用改。
3. 以「距離」驅逐的 LRU 快取
傳統 LRU 快取驅逐最久未使用的項目,但對漫畫閱讀器來說「最久未使用」不是最好的策略——你更想保留目前頁面周圍的頁面,而不是剛才看完的頁面。
PageCache 改用「距離目前頁面最遠的項目優先驅逐」:
const CACHE_CAPACITY: usize = 7;
struct PageCache {
entries: HashMap<usize, image::Handle>,
}
impl PageCache {
fn insert(&mut self, index: usize, handle: image::Handle, current_page: usize) {
if !self.entries.contains_key(&index) && self.entries.len() >= CACHE_CAPACITY {
// 找距離 current_page 最遠的 key
let evict = *self
.entries
.keys()
.max_by_key(|&&k| k.abs_diff(current_page))
.expect("cache is non-empty");
self.entries.remove(&evict);
}
self.entries.insert(index, handle);
}
}
usize::abs_diff 計算兩個 usize 的絕對差值,不需要轉成有號整數,在 Rust 1.60 加入標準函式庫。
有一個借用技巧值得注意:
// usize is Copy,所以可以在可變借用前先把 key 複製出來
let evict = *self
.entries
.keys()
.max_by_key(|&&k| k.abs_diff(current_page))
.expect("cache is non-empty");
self.entries.remove(&evict); // 這裡才開始可變借用
如果寫成 let evict_ref = self.entries.keys().max_by_key(...) 然後直接傳給 remove,借用檢查器會拒絕——因為 keys() 返回的 iterator 持有對 self.entries 的不可變借用,而 remove 需要可變借用。提前 * 解引用並把 usize(Copy 型別)複製出來,借用就在那一行結束了。
為什麼 capacity 選 7?前後各 2 頁(4 頁)加上目前頁面(1 頁)= 5,多留 2 頁緩衝,讓快速翻頁時舊頁面還在快取裡。
持有 image::Handle 的好處是:Iced 會把解碼後的圖片上傳到 GPU 紋理,而 Handle 是對那份紋理的引用(Arc 語義)。只要 Handle 活著,GPU 上的資料就不會被回收——快取命中時翻頁是真的零解碼。
4. Iced 的 MVU 架構與 Task 系統
Iced 採用 Elm 架構(Model-View-Update,MVU)。整個應用程式的運作流程是:
graph LR
A[用戶操作 / 事件] -->|產生| B[Message]
B -->|傳入| C[update 函式]
C -->|修改| D[App 狀態]
C -->|回傳| E[Task]
E -->|完成後產生| B
D -->|傳入| F[view 函式]
F -->|產生| G[Element UI 樹]
G -->|渲染到螢幕| A
Message enum 是所有事件的完整清單:
#[derive(Debug, Clone)]
enum Message {
OpenFile,
FileSelected(Option<PathBuf>),
ComicLoaded(Result<Arc<dyn reader::ComicReader>, String>),
NextPage,
PrevPage,
PagePreloaded(usize, Option<image::Handle>),
}
Task<Message> 是 Iced 的非同步工作單元。update 函式回傳 Task,Iced 的 runtime 會執行它,完成後把結果包成 Message 再餵回 update。這讓狀態轉換邏輯永遠是同步、純粹的,而非同步副作用完全由 Task 處理。
開檔的流程就是兩個 Task 串接:
// 第一步:呼叫原生對話框
Message::OpenFile => {
Task::perform(
async {
rfd::AsyncFileDialog::new()
.add_filter("Comic Book Archive", &["cbz", "cbr", "cb7"])
.pick_file()
.await
.map(|f| f.path().to_owned())
},
Message::FileSelected, // 完成後產生 FileSelected(Option<PathBuf>)
)
}
// 第二步:在執行緒池解壓縮並建立 Reader
Message::FileSelected(Some(path)) => Task::perform(
async move { smol::unblock(move || reader::open(&path).map(Arc::from)).await },
Message::ComicLoaded, // 完成後產生 ComicLoaded(Result<Arc<...>, String>)
),
5. smol::unblock:把同步 I/O 推進執行緒池
Iced 的 async runtime 是基於 futures-executor(smol 生態系),不是 Tokio——從 Cargo.lock 可以確認整個依賴樹裡根本沒有 tokio。但不管底層是哪套 executor,async 執行緒都不適合跑耗時的同步運算(如解壓縮)。如果直接在 async task 裡呼叫 zip::ZipArchive::new(),會阻塞整個執行緒,造成 UI 卡頓。
smol::unblock 是解法:把一個同步的閉包包起來,丟進 smol 的阻塞執行緒池(blocking thread pool)跑,並回傳一個 Future:
// 背景預載的單頁提取
let comic = Arc::clone(comic);
Task::perform(
async move {
let handle = smol::unblock(move || comic.extract_page(idx).ok()).await;
(idx, handle)
},
|(idx, handle)| Message::PagePreloaded(idx, handle),
)
smol::unblock 的原理很直觀:它維護一個獨立的執行緒池,專門用來跑阻塞工作。move || 閉包在那個執行緒上同步執行,.await 等它完成,結果就像一個普通的 Future。
這裡 Arc::clone(comic) 非常關鍵——每個預載任務需要獨立持有對 ComicReader 的引用,才能在自己的執行緒上呼叫 extract_page,而且這份 clone 是 O(1) 的引用計數遞增,不是深複製。
6. 預載窗口與 Task::batch
preload_adjacent 計算出需要預載的頁面集合,然後用 Task::batch 同時啟動所有任務:
fn preload_adjacent(&self, around: usize) -> Task<Message> {
let Some(comic) = &self.comic else { return Task::none(); };
let page_count = comic.page_count();
let start = around.saturating_sub(PRELOAD_LOOKAHEAD); // 避免 usize 下溢
let end = (around + PRELOAD_LOOKAHEAD).min(page_count - 1);
let candidates: Vec<usize> = (start..=end)
.filter(|&i| i != around && !self.page_cache.contains(i))
.collect();
if candidates.is_empty() {
return Task::none();
}
let tasks: Vec<Task<Message>> = candidates
.into_iter()
.map(|idx| {
let comic = Arc::clone(comic);
Task::perform(
async move {
let handle = smol::unblock(move || comic.extract_page(idx).ok()).await;
(idx, handle)
},
|(idx, handle)| Message::PagePreloaded(idx, handle),
)
})
.collect();
Task::batch(tasks) // 並行啟動所有預載任務
}
saturating_sub 是個小細節:around 是 usize,如果 around < PRELOAD_LOOKAHEAD 直接相減會 panic(或在 debug 模式 panic,release 模式則 wrap around)。saturating_sub 讓結果最低為 0,不需要額外的 if around >= PRELOAD_LOOKAHEAD 判斷。
Task::batch 把多個 Task 打包成一個,Iced 會並行執行它們。這意味著如果需要預載頁面 4 和頁面 6,兩個解壓縮任務會同時在執行緒池跑,而不是串行。
7. on_press_maybe:條件式按鈕啟用
Iced 提供了一個很方便的 API 來處理「條件式可按」的按鈕:
let can_prev = self.current_page > 0;
let can_next = self
.comic
.as_ref()
.is_some_and(|c| self.current_page + 1 < c.page_count());
let prev_btn = button(text("◄ Previous").size(14))
.on_press_maybe((has_comic && can_prev).then_some(Message::PrevPage));
let next_btn = button(text("Next ►").size(14))
.on_press_maybe((has_comic && can_next).then_some(Message::NextPage));
on_press_maybe 接受 Option<Message>:Some(msg) 代表按鈕可按,None 代表禁用。搭配 bool::then_some(true 回傳 Some(value),false 回傳 None),可以非常簡潔地表達「有符合條件才能按」。
禁用狀態的視覺樣式也透過 nav_button_style 函式裡的 button::Status::Disabled 分支處理:
fn nav_button_style(_theme: &Theme, status: button::Status) -> button::Style {
let bg = match status {
button::Status::Hovered | button::Status::Pressed => iced::color!(0x565f89),
button::Status::Disabled => iced::color!(0x292e42), // 暗色,表示禁用
_ => iced::color!(0x414868),
};
let text_color = match status {
button::Status::Disabled => iced::color!(0x3d4168), // 文字也變暗
_ => iced::color!(0xc0caf5),
};
// ...
}
8. Subscription:鍵盤事件的全域監聽
Subscription 是 Iced 中持續監聽某個事件來源的機制,不同於 Task(一次性任務),Subscription 會在應用程式整個生命週期都保持活躍。
鍵盤翻頁用 iced::keyboard::on_key_press 建立訂閱:
fn subscription(&self) -> Subscription<Message> {
iced::keyboard::on_key_press(|key, _modifiers| match key.as_ref() {
iced::keyboard::Key::Named(Named::ArrowRight)
| iced::keyboard::Key::Named(Named::PageDown) => Some(Message::NextPage),
iced::keyboard::Key::Named(Named::ArrowLeft)
| iced::keyboard::Key::Named(Named::PageUp) => Some(Message::PrevPage),
_ => None,
})
}
閉包接受按鍵和修飾鍵,回傳 Option<Message>:Some(msg) 代表這個按鍵要觸發對應的 message,None 代表忽略。這讓翻頁鍵(方向鍵 + PageUp/PageDown)不需要點擊按鈕就能使用,閱讀體驗更自然。
key.as_ref() 是必要的——iced::keyboard::Key 包含 owned 的字元資料,.as_ref() 轉成借用版本才能做 pattern matching 而不移動所有權。
總結
| 概念 |
在專案中的應用 |
Arc<dyn Trait> |
ComicReader 跨執行緒共享,零成本 clone 給預載任務 |
Send + Sync bound |
確保 trait object 可安全跨執行緒使用 |
&self on trait method |
允許多執行緒並發呼叫 extract_page,不需鎖 |
| 按需解壓 vs 全量預載 |
ZIP 隨機存取 vs 7-Zip 區塊壓縮,各自最佳策略 |
| 距離驅逐快取 |
比傳統 LRU 更適合線性翻頁場景 |
abs_diff + Copy 借用技巧 |
規避借用檢查器限制的慣用寫法 |
Task + Task::batch |
Iced 的非同步副作用模型,並行預載 |
smol::unblock |
把同步阻塞 I/O 推進獨立執行緒池 |
on_press_maybe |
條件式按鈕啟用,搭配 then_some 很簡潔 |
Subscription |
全域鍵盤事件監聽,生命週期同整個應用程式 |
漫畫閱讀器是一個很好的 Rust GUI 練習專案:需要處理多種檔案格式(trait object 的最佳用武之地)、需要快取和預載(資料結構設計)、需要在不卡 UI 的前提下做 I/O(非同步設計)。如果你正在學 Iced 的 MVU 架構,這個專案的規模剛好——不會太小而沒有學習點,也不會大到難以消化。
參考資源
zeroclaw 的 observability 模組是個教科書等級的 Rust trait-based plugin 設計。它讓你在編譯期決定要不要開啟可觀測性,同時支援 Prometheus、OpenTelemetry 等多種後端,還能零成本地完全關閉。
問題:框架的 Observability 該怎麼設計?
你寫了一個 AI agent 框架,你想讓使用者可以選擇怎麼觀察它的行為——有人想接 Prometheus,有人想接 OpenTelemetry,有人根本不在乎、只要 log 就好,還有人在測試時一個 I/O 都不想多做。
這種「插件式後端選擇」的設計,zeroclaw 用了一個很乾淨的 trait object 架構來解決。
核心抽象:Observer Trait
src/observability/traits.rs 定義了整個系統的骨架:
pub trait Observer: Send + Sync + 'static {
fn record_event(&self, event: &ObserverEvent);
fn record_metric(&self, metric: &ObserverMetric);
fn flush(&self) {}
fn name(&self) -> &str;
fn as_any(&self) -> &dyn std::any::Any;
}
Send + Sync + 'static 是為了讓 Arc<dyn Observer> 可以跨 async task 傳遞——這個組合在 tokio 生態系幾乎是標配。
flush() 有預設實作(空的),只有需要批次傳送的後端(例如 OTel)才需要覆寫。
as_any() 則是個小技巧,讓你可以在必要時從 dyn Observer 向下轉型(downcast)到具體型別——後面會看到它的用途。
兩種觀測維度:Event vs Metric
zeroclaw 把可觀測的資訊分成兩類:
ObserverEvent:領域事件
pub enum ObserverEvent {
AgentStart { provider: String, model: String },
LlmRequest { provider: String, model: String, messages_count: usize },
LlmResponse {
provider: String,
model: String,
duration: Duration,
success: bool,
error_message: Option<String>,
},
AgentEnd {
provider: String,
model: String,
duration: Duration,
tokens_used: Option<u64>,
cost_usd: Option<f64>,
},
ToolCallStart { tool: String },
ToolCall { tool: String, duration: Duration, success: bool },
TurnComplete,
ChannelMessage { channel: String, direction: String },
HeartbeatTick,
Error { component: String, message: String },
}
Event 是「某件事發生了」——agent 啟動、LLM 回應、工具被呼叫等。每個 variant 都帶有足夠的上下文,讓後端可以根據需要提取資訊。
ObserverMetric:數值度量
pub enum ObserverMetric {
RequestLatency(Duration),
TokensUsed(u64),
ActiveSessions(u64),
QueueDepth(u64),
}
Metric 是「一個數值」——延遲、用量、隊列深度。這類資料更適合作為 gauge 或 histogram 送到監控系統。
這個分法很務實:Prometheus 要的是數值,OTel 兩者都要,log 後端則把兩者都轉成文字。
後端實作:從零成本到全功能
Noop:真正的零成本
pub struct NoopObserver;
impl Observer for NoopObserver {
#[inline(always)]
fn record_event(&self, _event: &ObserverEvent) {}
#[inline(always)]
fn record_metric(&self, _metric: &ObserverMetric) {}
fn name(&self) -> &str { "noop" }
fn as_any(&self) -> &dyn Any { self }
}
#[inline(always)] 是關鍵。LLVM 看到這個後會把所有呼叫點直接展開為空操作,最終生成的機器碼裡根本沒有 observer 相關的指令。這就是 Rust 的「零成本抽象」在實務中的樣子。
Log:接 tracing 生態系
impl Observer for LogObserver {
fn record_event(&self, event: &ObserverEvent) {
match event {
ObserverEvent::AgentStart { provider, model } => {
info!(provider = %provider, model = %model, "agent.start");
}
ObserverEvent::LlmResponse { duration, success, .. } => {
info!(duration_ms = duration.as_millis(), success, "llm.response");
}
// ...
}
}
}
用 tracing crate 做結構化 logging,每個事件對應一條有 key-value 欄位的 log 記錄。
Verbose:給互動式 CLI 的人看
impl Observer for VerboseObserver {
fn record_event(&self, event: &ObserverEvent) {
match event {
ObserverEvent::LlmRequest { provider, model, messages_count } => {
eprintln!("> Thinking");
eprintln!("> Send (provider={}, model={}, messages={})",
provider, model, messages_count);
}
ObserverEvent::LlmResponse { duration, success, .. } => {
eprintln!("< Receive (success={}, duration_ms={})",
success, duration.as_millis());
}
// ...
}
}
}
用 > 和 < 表示「送出」和「收到」,讓使用者在終端機就能感受到 agent 的呼吸感,不需要看 log 或架監控。
Prometheus:指標抓取
pub struct PrometheusObserver {
registry: Registry,
agent_starts: IntCounterVec,
agent_duration: HistogramVec,
llm_calls: IntCounterVec,
llm_duration: Histogram,
tool_calls: IntCounterVec,
// ...
}
每個欄位對應一個 Prometheus 指標。Event 進來後,依照類型對相應的 counter 或 histogram 做操作。
Prometheus 後端有個特別之處:它需要暴露一個 /metrics HTTP endpoint 讓 scraper 來抓。但 Observer trait 本身沒有這個方法——這就是 as_any() 的用武之地:
// 在 gateway 需要回應 /metrics 請求時
state.observer
.as_any()
.downcast_ref::<PrometheusObserver>()
.map(|prom| prom.encode())
透過 downcast 繞過了 trait 的限制,讓 Prometheus 特有的操作可以被存取,而不需要把這個方法塞進通用的 Observer trait。
OpenTelemetry:完整的追蹤 + 指標
pub struct OtelObserver {
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
agent_starts: Counter<u64>,
agent_duration: Histogram<f64>,
llm_calls: Counter<u64>,
llm_duration: Histogram<f64>,
// ...
}
OTel 後端最有意思的地方是它會從 event 裡還原 span:
// 收到 LlmResponse 事件後,用 duration 計算出 span 的開始時間
let start_time = SystemTime::now() - duration;
let mut span = tracer.build(
SpanBuilder::from_name("llm.call")
.with_start_time(start_time)
.with_attributes(vec![
KeyValue::new("provider", provider.clone()),
KeyValue::new("model", model.clone()),
KeyValue::new("success", *success),
])
);
span.set_status(if *success { Status::Ok } else { Status::error("") });
span.end();
因為 ObserverEvent 是事後才收到的(帶有 duration 欄位),OTel 後端就把開始時間「倒推」回去,製造出一個正確時間範圍的 span。這讓整個系統可以保持 event-sourced 的設計,而不需要在程式碼的每個角落都維護 span context。
Multi:扇出組合
pub struct MultiObserver {
observers: Vec<Box<dyn Observer>>,
}
impl Observer for MultiObserver {
fn record_event(&self, event: &ObserverEvent) {
for obs in &self.observers {
obs.record_event(event);
}
}
}
想同時送 Prometheus 又送 OTel?把它們包進 MultiObserver 就好。組合模式(Composite Pattern)在這裡用得剛好。
工廠函數:從設定到實體
pub fn create_observer(config: &ObservabilityConfig) -> Box<dyn Observer> {
match config.backend.as_str() {
"log" => Box::new(LogObserver::new()),
"prometheus" => Box::new(PrometheusObserver::new()),
"otel" | "opentelemetry" | "otlp" => {
match OtelObserver::new(config.otel_endpoint.as_deref(), ...) {
Ok(obs) => Box::new(obs),
Err(e) => {
tracing::error!("Failed to create OTel observer: {e}. Falling back to noop.");
Box::new(NoopObserver)
}
}
}
_ => Box::new(NoopObserver),
}
}
設定結構很精簡:
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObservabilityConfig {
pub backend: String, // "none" | "log" | "prometheus" | "otel"
pub otel_endpoint: Option<String>, // 預設 http://localhost:4318
pub otel_service_name: Option<String>, // 預設 "zeroclaw"
}
impl Default for ObservabilityConfig {
fn default() -> Self {
Self { backend: "none".into(), .. }
}
}
預設是 "none",也就是 NoopObserver——除非你明確設定,否則完全沒有額外開銷。
這個工廠函數還有個好設計:OTel 初始化失敗(例如 endpoint 連不上)時,它不會 panic,而是靜默降級成 Noop,同時印一個 error log。可觀測性本身不應該成為系統的單點故障。
整合:Agent 和 Gateway 的用法
Agent 和 Gateway 都用 Arc<dyn Observer> 持有 observer:
pub struct Agent {
observer: Arc<dyn Observer>,
// ...
}
整個 agent 生命週期裡,事件就這樣流出去:
self.observer.record_event(&ObserverEvent::AgentStart {
provider: provider_name.clone(),
model: self.model_name.clone(),
});
// ... 執行 LLM 呼叫 ...
self.observer.record_event(&ObserverEvent::LlmResponse {
provider: provider_name.clone(),
model: self.model_name.clone(),
duration,
success: true,
error_message: None,
});
Arc 讓 observer 可以廉價地被 clone 並跨 async task 共享,每個後端自己處理執行緒安全的問題。
設計亮點總結
zeroclaw 的 observability 層之所以好,是因為它做了幾個清晰的取捨:
| 設計決策 |
理由 |
Arc<dyn Observer> 而非泛型參數 |
避免 generic viral 污染整個 call stack |
| 領域 enum 而非字串 event |
型別安全,編譯器幫你確認所有 variant 都有處理 |
NoopObserver + #[inline(always)] |
確保關閉時真的零成本,不只是概念上的零成本 |
| 工廠函數失敗時降級 |
可觀測性不是核心功能,不能影響主流程 |
as_any() 開後門 |
讓 Prometheus 這類需要特殊 API 的後端有辦法被使用,而不需要污染通用 trait |
| OTel 用倒推時間還原 span |
保持呼叫點的 API 簡單,不需要到處傳 span context |
這套設計很適合作為「如何在 Rust 框架裡設計可插拔後端」的參考範本。
延伸自 理解 Rust 的 Pin:從問題到解法。as_mut() 是在 Pin 世界裡做 reborrow 的核心操作,如果要手動實作 Future 或 Combinator,你幾乎一定會用到它。
問題:Pin 被消費了怎麼辦?
Future::poll 的簽名長這樣:
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Output>
注意 self: Pin<&mut Self> 是值傳遞——Pin<&mut T> 沒有 Copy,傳進函數就被 move 走了。
如果你的 Future 裡面有一個 inner future 需要被 poll,問題就來了:
struct MyFuture {
inner: SomeFuture,
}
impl Future for MyFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
// inner 也需要 Pin<&mut SomeFuture> 才能被 poll
// 但 self 是 Pin<&mut MyFuture>,
// 不能直接取出 &mut self.inner,因為那樣可能讓 inner 被 move
self.inner.poll(cx) // ❌ SomeFuture::poll 需要 Pin<&mut Self>,不是 &mut Self
}
}
as_mut():Pin 的 Reborrow
先看普通 &mut T:Rust 在函數呼叫點自動 reborrow,所以 r 不會被消費:
fn takes_mut(r: &mut i32) { *r += 1; }
let mut x = 0;
let r = &mut x;
takes_mut(r); // Rust 自動 reborrow,等同於 takes_mut(&mut *r)
takes_mut(r); // ✅ r 沒有被消費,可以繼續使用
但 Pin<&mut T> 沒有這個自動 reborrow。把 Pin<&mut T> 傳進函數,它真的會被 move 掉:
fn poll_it<F: Future>(p: Pin<&mut F>, cx: &mut Context<'_>) -> Poll<F::Output> {
p.poll(cx)
}
let mut fut = some_async_fn();
// 用 unsafe 建立 Pin<&mut _> 做示範
let mut pinned = unsafe { Pin::new_unchecked(&mut fut) };
poll_it(pinned, cx); // pinned 被 move 進去,消費掉了
poll_it(pinned, cx); // ❌ 編譯錯誤:use of moved value: `pinned`
Pin::as_mut() 就是為 Pin<&mut T> 補上這個能力——手動 reborrow:
impl<P: DerefMut> Pin<P> {
pub fn as_mut(&mut self) -> Pin<&mut P::Target>
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
// as_mut() 借出一個新的 Pin<&mut MyFuture>
// 原本的 self 還活著,可以繼續使用
let _pin1 = self.as_mut(); // Pin<&mut MyFuture>
let _pin2 = self.as_mut(); // 還可以再借
}
但這樣還是拿到 Pin<&mut MyFuture>,不是 Pin<&mut SomeFuture>,怎麼取得 inner 欄位的 Pin?
Projection:從外層 Pin 投影到內層欄位
「把 Pin<&mut Outer> 轉成 Pin<&mut Inner>」這個操作叫做 Pin Projection(投影)。
手動做需要 unsafe:
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<i32> {
// as_mut() 先做 reborrow
// map_unchecked_mut() 再投影到 inner 欄位
let inner_pin: Pin<&mut SomeFuture> = unsafe {
self.as_mut().map_unchecked_mut(|s| &mut s.inner)
};
inner_pin.poll(cx)
}
map_unchecked_mut 標記為 unsafe 是因為你必須自己保證:
- inner 欄位在整個
MyFuture 被 drop 之前不會被 move 出去
- 你不會同時製造兩個指向同一個欄位的可變引用
實務:用 pin_project 消除 unsafe
手動寫 projection 容易出錯,實務上幾乎都用 pin-project crate:
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[pin_project]
struct Timeout<F: Future> {
#[pin] // ← 標記這個欄位需要被 pin
inner: F,
deadline: std::time::Instant,
}
impl<F: Future> Future for Timeout<F> {
type Output = Option<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project(); // 安全地做 projection,無需 unsafe
// this.inner 的型別是 Pin<&mut F> ← 已經 pin 好,可以直接 poll
// this.deadline 的型別是 &mut Instant ← 普通可變引用
if std::time::Instant::now() >= *this.deadline {
return Poll::Ready(None);
}
match this.inner.poll(cx) {
Poll::Ready(v) => Poll::Ready(Some(v)),
Poll::Pending => Poll::Pending,
}
}
}
#[pin_project] 這個 proc macro 幫你在背後生成了安全的 projection 程式碼,你只需要在欄位上標 #[pin] 就好。
as_mut() 在 select! 裡的作用
as_mut() 不只在自訂 Future 裡用到,tokio::select! 在 loop 裡重複 poll 同一個 Future 時,也依賴相同的概念:
use tokio::pin;
let long_op = some_slow_task();
pin!(long_op); // 現在 long_op 是 Pin<&mut impl Future>
loop {
tokio::select! {
result = &mut long_op => { // &mut 在這裡做的正是 reborrow
println!("完成:{:?}", result);
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
println!("還在等...");
}
}
}
&mut long_op 對 Pin<&mut F> 做 reborrow,每次進入 select! 都借出一個新的 Pin<&mut F>,讓同一個 future 可以被跨多次迴圈反覆 poll,狀態不會丟失。
整理:哪時用哪個方法?
Pin<&mut T>
├── as_mut() → Pin<&mut T>
│ ├── 用於:reborrow,讓同一個 Pin 可以被多次使用
│ └── 然後可以接 map_unchecked_mut() 做 projection(通常讓 pin_project 來)
│
├── get_mut() → &mut T (只有 T: Unpin 才能用)
│ └── 用於:徹底解除 Pin 約束,拿回普通可變引用
│
└── map_unchecked_mut() → Pin<&mut U> (unsafe)
└── 用於:手動 projection 到欄位
小結
as_mut() 是 Pin 版的 reborrow,解決「Pin<&mut T> 傳進函數就消費掉」的問題
- 手動 Future 實作幾乎都需要
as_mut() + projection 來取得 inner future 的 Pin
- 實務上用
pin_project crate 的 #[pin] 標記讓 macro 幫你安全地做 projection
tokio::select! 的 &mut pinned_future 語法本質上也是 as_mut() 的 reborrow
observability
zeroclaw 的 Memory 是 agent 的長期記憶——它記錄對話歷史、使用者偏好、決策紀錄,並在每次對話前把相關記憶召回注入給 LLM。這篇記錄它如何用一個 trait 統一四種後端(SQLite、Markdown、Lucid、Postgres),以及它最有趣的設計:混合式語意搜尋、Embedding LRU 快取、還有讓 agent 在冷啟動時從 Markdown 文字檔重建「靈魂」的機制。
Memory Trait:七個方法,全部必填
跟 Channel 和 Provider 的設計不同——後兩者都有預設實作——Memory trait 的七個方法全部是必填的:
#[async_trait]
pub trait Memory: Send + Sync {
fn name(&self) -> &str;
async fn store(
&self,
key: &str,
content: &str,
category: MemoryCategory,
session_id: Option<&str>,
) -> anyhow::Result<()>;
async fn recall(
&self,
query: &str,
limit: usize,
session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>>;
async fn get(&self, key: &str) -> anyhow::Result<Option<MemoryEntry>>;
async fn list(
&self,
category: Option<&MemoryCategory>,
session_id: Option<&str>,
) -> anyhow::Result<Vec<MemoryEntry>>;
async fn forget(&self, key: &str) -> anyhow::Result<bool>;
async fn count(&self) -> anyhow::Result<usize>;
async fn health_check(&self) -> bool;
}
這反映了「每種後端的搜尋策略都根本不同」——SQLite 用 FTS5 + 向量搜尋,Markdown 用關鍵字比對,Postgres 用 ILIKE——沒有一個合理的預設實作能套用在所有後端,所以索性不提供。
核心資料型別
pub struct MemoryEntry {
pub id: String,
pub key: String,
pub content: String,
pub category: MemoryCategory,
pub timestamp: String,
pub session_id: Option<String>,
pub score: Option<f64>, // 只有 recall() 會填,get()/list() 都是 None
}
pub enum MemoryCategory {
Core, // 長期事實、偏好、決策
Daily, // 每日工作日誌
Conversation, // 對話上下文
Custom(String), // 開放式擴充點
}
score 的設計很有趣:它是 recall() 的副產品,代表這筆記憶和查詢的相關度。get() 和 list() 都是精確查找,沒有「相關度」的概念,所以這個欄位在那兩個情境下永遠是 None。Custom(String) 則是一個逃生艙,讓使用者可以自訂任意分類而不需要修改 enum。
四個後端,一個介面
zeroclaw 目前有四個實作(加上一個 no-op):
| 後端 |
搜尋方式 |
特性 |
SqliteMemory |
FTS5 + 向量 (cosine) |
推薦預設,單一 .db 檔 |
MarkdownMemory |
關鍵字比對 |
純文字,只能追加 |
LucidMemory |
SQLite + 外部 CLI |
橋接第三方記憶工具 |
PostgresMemory |
ILIKE 模糊比對 |
雲端持久化,無 pgvector |
NoneMemory |
— |
完全 no-op,停用記憶 |
SQLite:真正的大腦
SqliteMemory 是推薦的預設後端,資料存在 workspace/memory/brain.db。它的設計最複雜,也最值得細看。
Schema
三張表,各有不同用途:
-- 主要記憶表
CREATE TABLE memories (
id TEXT PRIMARY KEY,
key TEXT UNIQUE NOT NULL, -- upsert 的 identity key
content TEXT NOT NULL,
category TEXT NOT NULL DEFAULT 'core',
embedding BLOB, -- f32 向量,little-endian bytes
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
session_id TEXT
);
-- FTS5 全文搜尋虛擬表
CREATE VIRTUAL TABLE memories_fts USING fts5(
key, content,
content=memories, content_rowid=rowid
);
-- Embedding LRU 快取
CREATE TABLE embedding_cache (
content_hash TEXT PRIMARY KEY, -- SHA-256 前 16 hex chars
embedding BLOB NOT NULL,
created_at TEXT NOT NULL,
accessed_at TEXT NOT NULL -- LRU 用
);
FTS5 透過三個 trigger(INSERT、DELETE、UPDATE)和主表保持同步。store() 用的是 INSERT ... ON CONFLICT(key) DO UPDATE——upsert 語意,key 是唯一識別碼,id 是 UUID(每次 upsert 都會更新)。
SQLite 開啟時的 PRAGMA 調校:WAL 模式、8 MB mmap、2 MB page cache、MEMORY temp store。
為什麼用 spawn_blocking?
rusqlite 是 blocking API,直接在 async context 呼叫會卡住 tokio runtime。所有 SQLite 操作都包在 tokio::task::spawn_blocking 裡:
let result = tokio::task::spawn_blocking(move || {
let conn = db.lock();
conn.execute("INSERT INTO memories ...", params![...])?;
Ok(())
}).await??;
連線的開啟還多了一層防護——用 worker thread + mpsc::channel 加上 recv_timeout,防止 SQLite 檔案被鎖住時永遠卡住:
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let _ = tx.send(Connection::open(&path));
});
match rx.recv_timeout(Duration::from_secs(capped)) {
Ok(Ok(conn)) => conn,
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Err(anyhow!("SQLite open timed out")),
}
混合語意搜尋:FTS5 + 向量
recall() 的實作是整個記憶系統最有趣的部分。它不是單一的搜尋策略,而是三層 fallback:
graph TD
A["recall(query, limit)"] --> B{有設定 Embedding provider?}
B -->|是| C[生成查詢向量]
C --> D[FTS5 BM25 搜尋]
D --> E["hybrid_merge(向量結果, 關鍵字結果)"]
E --> F[回傳 Top-N]
B -->|否| G[純 BM25 搜尋]
G --> F
D -->|FTS5 解析錯誤| H["LIKE %keyword% 兜底"]
H --> F
向量:純 Rust Cosine Similarity
向量以 f32 little-endian bytes 存在 BLOB 欄位。Cosine similarity 是純 Rust 實作,刻意用 f64 累加以減少高維向量的浮點誤差:
pub fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
let mut dot = 0.0_f64;
let mut norm_a = 0.0_f64;
let mut norm_b = 0.0_f64;
for (x, y) in a.iter().zip(b.iter()) {
let x = f64::from(*x);
let y = f64::from(*y);
dot += x * y;
norm_a += x * x;
norm_b += y * y;
}
let denom = norm_a.sqrt() * norm_b.sqrt();
if !denom.is_finite() || denom < f64::EPSILON {
return 0.0;
}
(dot / denom).clamp(0.0, 1.0) as f32
}
這是暴力掃描所有有 embedding 的 row,沒有 ANN index(如 HNSW、IVFFlat)。對一個本地 agent 的記憶量來說夠用,不需要引入 pgvector 或外部向量 DB 的依賴。
關鍵字:FTS5 BM25
查詢前先把每個 token 用雙引號包住再用 OR 連接,避免 FTS5 特殊字元造成解析錯誤:
let fts_query = query
.split_whitespace()
.map(|w| format!("\"{w}\""))
.collect::<Vec<_>>()
.join(" OR ");
FTS5 的 bm25() 分數是負數(越負越相關),取負後變成越大越好,和 cosine similarity 的方向統一。
混合合併
pub fn hybrid_merge(
vector_results: &[(String, f32)], // (id, cosine)
keyword_results: &[(String, f32)], // (id, -BM25)
vector_weight: f32, // 預設 0.7
keyword_weight: f32, // 預設 0.3
limit: usize,
) -> Vec<ScoredResult> {
// BM25 分數正規化到 [0,1]
// final_score = 0.7 * cosine + 0.3 * normalized_BM25
// 去重:HashMap keyed on id
}
合併後的 Top-N ID 用一次 WHERE id IN (...) 批次撈出,避免 N+1 query。
Embedding LRU 快取
每次 store() 都需要為新內容生成 embedding,但相同的文字不應該重複打 API。快取的 key 是內容的 SHA-256 前 16 hex chars——明確選擇 SHA-256 而不是 DefaultHasher,因為後者的輸出在不同 Rust 版本之間不保證穩定。
LRU 淘汰不在 Rust 記憶體裡維護資料結構,而是直接在 SQL 裡做:
-- 保留最新的 N 筆,刪除多餘的舊資料
DELETE FROM embedding_cache WHERE content_hash IN (
SELECT content_hash FROM embedding_cache
ORDER BY accessed_at ASC
LIMIT MAX(0, (SELECT COUNT(*) FROM embedding_cache) - ?1)
)
這樣快取的淘汰邏輯完全不依賴記憶體狀態,重啟之後一樣有效。
Embedding Provider
Embedding 本身也是一個 trait:
#[async_trait]
pub trait EmbeddingProvider: Send + Sync {
fn name(&self) -> &str;
fn dimensions(&self) -> usize;
async fn embed(&self, texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>>;
// 唯一的預設方法:批次 embed 的單一版本
async fn embed_one(&self, text: &str) -> anyhow::Result<Vec<f32>> {
let mut results = self.embed(&[text]).await?;
results.pop().ok_or_else(|| anyhow!("Empty embedding result"))
}
}
兩個具體實作:
NoopEmbedding:dimensions() = 0,回傳空向量。啟用後自動降級為純關鍵字搜尋。
OpenAiEmbedding:打 /v1/embeddings endpoint。支援自訂 base URL,用 custom: 前綴指定:
pub fn create_embedding_provider(provider: &str, api_key: ..., model: &str, dims: usize) {
match provider {
"openai" => Box::new(OpenAiEmbedding::new("https://api.openai.com", ...)),
name if name.starts_with("custom:") => {
let base_url = name.strip_prefix("custom:").unwrap_or("");
Box::new(OpenAiEmbedding::new(base_url, ...)) // 指向 Ollama、本地推理伺服器等
}
_ => Box::new(NoopEmbedding),
}
}
MarkdownMemory:只能追加的審計日誌
MarkdownMemory 的 forget() 永遠回傳 false——這個後端是只能追加的,設計上就是個審計日誌:
workspace/MEMORY.md ← Core 類別
workspace/memory/2026-02-19.md ← Daily 類別(按日期分檔)
每次 store() 就在對應的 Markdown 檔尾端追加一行 - **key**: content。recall() 則是把查詢拆成關鍵字,計算每筆記憶裡出現了幾個,算出比例當作分數。
雖然搜尋能力差,但好處是人類可讀、可以直接用 Git 追蹤,也容易備份。
LucidMemory:橋接外部 CLI
LucidMemory 是一個橋接層,SQLite 是主要儲存,外部 lucid-memory CLI 是輔助:
pub struct LucidMemory {
local: SqliteMemory,
recall_timeout: Duration, // 預設 500ms
store_timeout: Duration, // 預設 800ms
local_hit_threshold: usize, // 預設 3:本地結果夠多就不問 Lucid
failure_cooldown: Duration, // 任何錯誤後冷卻 15 秒
last_failure_at: Mutex<Option<Instant>>,
}
store() 先寫 SQLite,再 fire-and-forget 同步到 Lucid。recall() 的策略:先問 SQLite,如果結果數量低於 local_hit_threshold 且 Lucid 不在冷卻期,才去呼叫 lucid context <query>,然後合併結果。
去重用的是一個 NUL 字元分隔的簽名:
let signature = format!("{}\u{0}{}", entry.key.to_lowercase(), entry.content.to_lowercase());
Lucid 的輸出是自訂的 XML-like 格式:
<lucid-context>
- [decision] Use token refresh middleware
- [context] Working in src/auth.rs
</lucid-context>
方括號裡的標籤(decision、context、bug、learning)會被映射回 MemoryCategory。
Agent 的靈魂:Snapshot 與冷啟動恢復
最讓我覺得有趣的設計是「靈魂快照」機制。
在每次記憶體清理(hygiene)時,如果啟用了 snapshot_on_hygiene,所有 Core 類別的記憶都會被 dump 到工作目錄的 MEMORY_SNAPSHOT.md。這個檔案是 Git 可追蹤的純文字。
更厲害的是冷啟動恢復:如果 brain.db 不存在(例如全新部署、或資料庫損毀刪除),但 MEMORY_SNAPSHOT.md 存在,factory 會在開啟後端之前先呼叫 hydrate_from_snapshot(),把 Markdown 裡的記憶逐筆還原回 SQLite。
graph LR
A["brain.db 存在?"] -->|是| B[正常開啟 SqliteMemory]
A -->|否| C["MEMORY_SNAPSHOT.md 存在?"]
C -->|是| D[hydrate_from_snapshot]
D --> E[重新開啟 SqliteMemory]
C -->|否| F[全新 SqliteMemory]
B --> G[完成]
E --> G
F --> G
每次清理也會刪掉 MemoryCategory::Conversation 類別超過 conversation_retention_days 的舊紀錄——對話上下文是短暫的,不該無限累積。
在 Agent Loop 裡的角色
Memory 在每輪對話的三個時機被使用:
sequenceDiagram
participant U as 使用者
participant AL as Agent Loop
participant M as Memory
participant LLM as LLM
U->>AL: 送出訊息
AL->>M: recall(user_msg, 5)
M-->>AL: 相關記憶(score >= 0.4 才納入)
AL->>LLM: [Memory context]\n- key: content\n...\n{user_msg}
LLM-->>AL: 回應
AL->>M: store("user_msg_{uuid}", msg, Conversation)
AL->>M: store("assistant_resp_{uuid}", summary[:100], Daily)
AL-->>U: 最終答覆
注意幾個細節:
- 相關度門檻:
score >= 0.4 才注入,低分記憶不進 context,避免雜訊干擾 LLM。
- 自動儲存:使用者訊息存
Conversation 類別(會被清理),助理回應只存前 100 字存 Daily 類別(會保留較久)。
- LLM 直接操控記憶:
memory_store、memory_recall、memory_forget 這三個工具也被當成普通的 Tool 掛載到 agent 上,LLM 自己可以主動讀寫記憶。
小結
zeroclaw Memory 設計幾個值得注意的地方:
- 全部必填,沒有預設:每種後端的搜尋策略差太多,預設實作沒有意義。
- SQLite 就夠用:FTS5 + 向量暴力搜尋在本地 agent 的規模不需要 pgvector,單一
.db 檔沒有外部依賴。
- 三層 fallback 搜尋:Hybrid(FTS5 + 向量)→ BM25 only →
LIKE 兜底,優雅降級。
- SHA-256 快取,LRU 在 SQL 裡做:快取邏輯不依賴記憶體狀態,重啟後仍然有效。
- 靈魂快照:
MEMORY_SNAPSHOT.md 是 Git 可追蹤的,讓 agent 可以從純文字重建長期記憶,冷啟動不代表失憶。
- Conversation 記憶會被清理:對話上下文是短暫的,設計上就不打算永久保留。
zeroclaw 的 Tool 是 agent 採取行動的核心機制——執行 shell 指令、讀寫檔案、瀏覽網頁、呼叫 HTTP API、操作記憶體、甚至把任務委派給另一個 sub-agent。目前實作了 30+ 種工具。這篇記錄它如何用一個乾淨的 trait 統一所有工具,以及工具的組裝、dispatch、schema 正規化、安全注入等設計。
整個抽象的核心在 src/tools/traits.rs,只有四個方法是必須實作的:
#[async_trait]
pub trait Tool: Send + Sync {
fn name(&self) -> &str;
fn description(&self) -> &str;
fn parameters_schema(&self) -> serde_json::Value;
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult>;
// 預設實作:把前三個方法組裝成 ToolSpec,免費附贈
fn spec(&self) -> ToolSpec {
ToolSpec {
name: self.name().to_string(),
description: self.description().to_string(),
parameters: self.parameters_schema(),
}
}
}
幾個設計決策值得注意:
Send + Sync:讓工具可以存進 Arc<dyn Tool> 並在 thread 之間共享,必要條件。
async fn execute:所有工具都可能有 I/O,統一用 async,不需要區分同步/非同步工具。
- 參數是
serde_json::Value:不是每個工具一個強型別 struct,而是執行時從 JSON 取值。省去了大量 boilerplate,代價是錯誤在執行時才出現,而不是編譯時。
spec() 是預設方法:把 name、description、parameters_schema 這三個方法的結果組裝成 ToolSpec,新增工具完全不需要自己實作這個。
核心資料型別
三個關鍵型別:
// 每次執行的回傳值
pub struct ToolResult {
pub success: bool,
pub output: String,
pub error: Option<String>,
}
// 送給 LLM 描述「這個工具能做什麼」的規格書
pub struct ToolSpec {
pub name: String,
pub description: String,
pub parameters: serde_json::Value, // JSON Schema 物件
}
注意 execute 的錯誤處理設計:正常的執行失敗(找不到檔案、路徑不允許)用 ToolResult { success: false, error: Some(...) } 回傳;只有程式本身的 bug 或不可恢復的錯誤才回傳 anyhow::Result::Err。這讓 agent loop 可以把執行失敗的結果繼續送給 LLM 讓它決定下一步,而不是直接炸掉整個流程。
參數 schema 是用 serde_json::json!() 手寫 JSON Schema,沒有 proc macro 或 derive:
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute"
},
"approved": {
"type": "boolean",
"description": "Set true to explicitly approve medium/high-risk commands",
"default": false
}
},
"required": ["command"]
})
}
工具的組裝:工廠函式,不靠反射
工具不是自動發現的,而是在 src/tools/mod.rs 裡用工廠函式顯式組裝:
// 最小集合,給測試和簡單 agent 用
pub fn default_tools(security: Arc<SecurityPolicy>) -> Vec<Box<dyn Tool>> {
vec![
Box::new(ShellTool::new(security.clone(), runtime)),
Box::new(FileReadTool::new(security.clone())),
Box::new(FileWriteTool::new(security)),
]
}
// 完整工具集,根據設定條件啟用
pub fn all_tools_with_runtime(config, security, runtime, memory, ...) -> Vec<Box<dyn Tool>> {
let mut tools: Vec<Box<dyn Tool>> = vec![
Box::new(ShellTool::new(...)),
Box::new(FileReadTool::new(...)),
Box::new(CronAddTool::new(...)),
Box::new(MemoryStoreTool::new(...)),
// ...
];
// 條件啟用:設定決定工具集,不是全部預設開啟
if browser_config.enabled {
tools.push(Box::new(BrowserTool::new_with_backend(...)));
}
if http_config.enabled {
tools.push(Box::new(HttpRequestTool::new(...)));
}
if !agents.is_empty() {
tools.push(Box::new(DelegateTool::new(agents, ...)));
}
tools
}
這是工廠/建構器模式——沒有反射、沒有 inventory 這類 compile-time 自動收集 crate、沒有 #[register_tool] 巨集。好處是依賴關係一目了然,壞處是每加一個工具都要手動登記。對這個規模的 codebase 來說是對的取捨。
兩種 Dispatch 模式
工具的呼叫有兩條路,取決於 LLM provider 是否支援原生 function calling:
graph TD
A[LLM 回應] --> B{支援原生 Tool Calling?}
B -->|是| C[NativeToolDispatcher]
B -->|否| D[XmlToolDispatcher]
C -->|解析 API 的 tool_calls 欄位| E[ParsedToolCall]
D -->|解析文字裡的 XML tag| E
E --> F["find_tool(name)"]
F --> G["tool.execute(args)"]
G --> H[ToolResult]
H --> I[轉成 ConversationMessage]
I --> J[送回 LLM 繼續下一輪]
NativeToolDispatcher 用於 Anthropic、OpenAI、Gemini——這些 provider 會在 API response 裡回傳結構化的工具呼叫,直接解析即可。
XmlToolDispatcher 用於不支援原生 function calling 的 LLM。此時 zeroclaw 把工具的說明注入到 system prompt,要求 LLM 用特定格式回應:
<tool_call>{"name": "shell", "arguments": {"command": "ls -la"}}</tool_call>
然後用字串解析從回應文字裡抓出這些 XML tag。
Agent loop 查找工具的方式很簡單:
fn find_tool<'a>(tools: &'a [Box<dyn Tool>], name: &str) -> Option<&'a dyn Tool> {
tools.iter().find(|t| t.name() == name).map(|t| t.as_ref())
}
送給 LLM 的格式(以 OpenAI 為例):
fn tools_to_openai_format(tools: &[Box<dyn Tool>]) -> Vec<serde_json::Value> {
tools.iter().map(|tool| {
json!({
"type": "function",
"function": {
"name": tool.name(),
"description": tool.description(),
"parameters": tool.parameters_schema()
}
})
}).collect()
}
JSON Schema 的跨 Provider 正規化
各家 LLM API 對 JSON Schema 的支援程度差異很大,尤其是 Gemini——它會拒絕很多標準的 JSON Schema 關鍵字。src/tools/schema.rs 的 SchemaCleanr 專門處理這件事:
pub enum CleaningStrategy {
Gemini, // 最嚴格
Anthropic,
OpenAI, // 最寬鬆
Conservative, // 保守策略,適合未知 provider
}
pub struct SchemaCleanr;
impl SchemaCleanr {
pub fn clean_for_gemini(schema: Value) -> Value { ... }
pub fn clean_for_anthropic(schema: Value) -> Value { ... }
pub fn clean_for_openai(schema: Value) -> Value { ... }
}
各策略的主要差異:
| 關鍵字 |
Gemini |
Anthropic |
OpenAI |
minLength、pattern |
移除 |
移除 |
保留 |
$ref |
內聯展開 |
內聯展開 |
保留 |
additionalProperties |
移除 |
保留 |
保留 |
anyOf/oneOf |
攤平為第一個型別 |
保留 |
保留 |
nullable |
轉換格式 |
保留 |
保留 |
Gemini 甚至連 anyOf: [type: string, type: null](nullable 的常見寫法)都要特別轉換。這些邊緣情況全部藏在 SchemaCleanr 裡,工具本身完全不需要知道。
安全性是注入的,不是全域的
zeroclaw 的安全設計有個核心原則:安全策略是建構時注入的,不是全域的靜態狀態。
// 每個有 I/O 的工具都在建構時收到 SecurityPolicy
pub struct FileReadTool {
security: Arc<SecurityPolicy>,
}
pub struct ShellTool {
security: Arc<SecurityPolicy>,
runtime: Arc<dyn RuntimeAdapter>,
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let path = args.get("path").and_then(|v| v.as_str())...;
// 安全檢查在 execute 裡,繞不過去
if !self.security.is_path_allowed(path) {
return Ok(ToolResult { success: false, error: Some("Path not allowed".into()), ... });
}
// 繼續執行...
}
SecurityPolicy 強制執行工作區沙盒(防止讀寫工作目錄以外的路徑)、自主等級(ReadOnly、Supervised、Full)、rate limiting 和指令白名單。
Supervised 模式下還有 ApprovalManager,在執行高風險工具前暫停等待使用者確認:
[zeroclaw] Tool: shell
Command: rm -rf /tmp/build
[y]es / [n]o / [a]lways: _
選 always 之後,這個指令會被加進 session allowlist,下次遇到同樣的指令就不再詢問。
有趣的具體工具
DelegateTool 是最有趣的工具之一——它的 parameters_schema() 是執行時動態生成的,而不是靜態寫死的:
pub struct DelegateTool {
agents: Arc<HashMap<String, DelegateAgentConfig>>,
depth: u32, // 防止無限委派遞迴
}
fn parameters_schema(&self) -> serde_json::Value {
// 根據目前設定的 agent 列表動態生成 schema
let agent_names: Vec<&str> = self.agents.keys().map(|s| s.as_str()).collect();
json!({
"properties": {
"agent": {
"type": "string",
"description": format!("Which agent to delegate to. Available: {}",
agent_names.join(", "))
},
"task": { "type": "string", "description": "Task description for the agent" }
}
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
if self.depth >= MAX_DELEGATION_DEPTH {
return Ok(ToolResult { success: false, error: Some("Max delegation depth reached"), ... });
}
let provider = create_provider(&agent_config.provider, ...)?;
// 120 秒 timeout,防止 sub-agent 跑太久
tokio::time::timeout(Duration::from_secs(120),
provider.chat_with_system(...)).await
}
depth 欄位防止 agent A 委派給 agent B、B 又委派回 A 的無限迴圈。
ShellTool 的安全設計很謹慎——它清空整個環境變數,只保留一個安全白名單:
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
// 清除所有環境變數,防止 API key 洩漏給子行程
let mut cmd = Command::new("sh");
cmd.env_clear(); // 全部清空
for key in SAFE_ENV_VARS { // 只加回白名單裡的
if let Ok(val) = std::env::var(key) {
cmd.env(key, val);
}
}
// 60 秒 timeout,截斷輸出到 1MB
tokio::time::timeout(Duration::from_secs(60), cmd.output()).await
}
SAFE_ENV_VARS 包含 PATH、HOME、LANG 這類系統必要的變數,但不包含任何 *_API_KEY、*_SECRET 這類敏感變數。即使主行程有這些環境變數,子行程也看不到。
sequenceDiagram
participant U as 使用者
participant AL as Agent Loop
participant P as Provider
participant LLM as LLM API
participant T as Tool
U->>AL: 送出訊息
AL->>P: tools_to_provider_format(specs)
P->>LLM: chat request + tool definitions
LLM-->>P: 回應(含 tool calls)
P-->>AL: ChatResponse { tool_calls: [...] }
loop 最多 10 次
AL->>AL: find_tool(name)
AL->>AL: ApprovalManager(Supervised 模式)
AL->>T: execute(args)
T->>T: SecurityPolicy 檢查
T-->>AL: ToolResult
AL->>P: 把結果轉成 ConversationMessage
P->>LLM: 繼續對話(含工具執行結果)
LLM-->>P: 新回應
alt 不再呼叫工具
P-->>AL: 純文字回應
AL-->>U: 最終答覆
end
end
整個流程最多迴圈 10 次(DEFAULT_MAX_TOOL_ITERATIONS)。超過就強制停止,防止 agent 無止境地呼叫工具。
小結
zeroclaw 的 Tool 設計幾個值得借鑑的地方:
- 最小介面:四個必填方法,
spec() 是免費的預設實作,新增工具的門檻很低。
- 工廠函式,不靠反射:顯式組裝、條件啟用,依賴關係清晰,沒有魔法。
- 兩種 Dispatch 兼顧:原生 API tool calling 和 XML prompt 注入都支援,對各種 LLM 都能用。
- Schema 正規化是一等公民:
SchemaCleanr 把各家 API 的奇葩限制集中處理,工具本身不受污染。
- 安全是建構時注入的:
SecurityPolicy 在 new() 時就進去了,不是全域狀態,也無法繞過。
- 執行失敗不等於程式崩潰:
ToolResult { success: false } 讓 agent 可以從失敗中學習,繼續嘗試。