zeroclaw 的 Channel 負責接收與傳送訊息,支援 Telegram、Discord、Slack、Matrix、iMessage、Signal、IRC、Email 等 14 種平台。這篇記錄它如何用一個乾淨的 trait 把這些平台統一起來,以及 runtime 如何把訊息分派給 LLM 處理。
Trait 設計:三個必填,其餘自選
Channel trait 定義在 src/channels/traits.rs,只有三個方法是必須實作的:
#[async_trait]
pub trait Channel: Send + Sync {
fn name(&self) -> &str;
async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
// 以下都有預設空實作(no-op)
async fn health_check(&self) -> bool { true }
async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }
// 串流草稿更新(目前只有 Telegram 實作)
fn supports_draft_updates(&self) -> bool { false }
async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> { Ok(None) }
async fn update_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
async fn finalize_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
}最有趣的是 listen 的簽名:它拿到一個 mpsc::Sender<ChannelMessage>,然後把收到的訊息推進去——而不是回呼(callback)。這是 push-based observer pattern,讓所有平台的訊息都匯聚到同一條 mpsc 管道,runtime 只需要從一個地方消費。
訊息型別
進來的訊息(ChannelMessage)和出去的訊息(SendMessage)分開定義:
pub struct ChannelMessage {
pub id: String, // 去重用的 ID
pub sender: String, // 發送者識別碼
pub reply_target: String, // 要回覆的目標(chat_id、channel_id 等)
pub content: String, // 正規化後的純文字內容
pub channel: String, // "telegram" | "discord" | "slack" | ...
pub timestamp: u64,
}
pub struct SendMessage {
pub content: String,
pub recipient: String, // 平台專屬的目標識別碼
pub subject: Option<String>, // Email 用的主旨
}各平台的 sender/reply_target 格式天差地別(Telegram 是數字 chat_id、Discord 是 snowflake ID、Email 是郵件地址),但對上層 runtime 來說一律是字串,不需要知道細節。
14 個平台,一個介面
src/channels/mod.rs 列出了目前支援的所有 channel:
| 分類 | 平台 | 底層協議 |
|---|---|---|
| 終端 | CLI | stdin/stdout |
| 即時通訊 | Telegram | HTTP long-polling |
| Discord | WebSocket Gateway | |
| Slack | HTTP polling | |
| Mattermost | WebSocket / HTTP | |
| Meta Cloud API webhook | ||
| iMessage | macOS osascript |
|
| Matrix | matrix-sdk 同步 | |
| Signal | signal-cli HTTP daemon | |
| IRC | TCP/TLS | |
| 企業通訊 | Lark/Feishu | WebSocket + Protobuf |
| DingTalk | WebSocket | |
| HTTP API | ||
| 電子郵件 | SMTP + IMAP |
每個 channel 一個 struct,實作 Channel trait,差異只在各自的 struct 欄位和協議處理。
有趣的實作細節
Discord:手刻 Gateway WebSocket
Discord 沒有用任何 Discord SDK,而是用 tokio-tungstenite 手刻了整個 Gateway 協議:
- 先打 REST 拿 Gateway URL
- 建立 WebSocket 連線
- 走完
Hello → Identify → READY握手流程 - 另開一個 task 發心跳(heartbeat)
- 處理各種 opcode:1(心跳請求)、7(重連)、9(session 失效)
這樣做的好處是二進位體積可控、依賴少,但代價是要自己維護協議細節。
Telegram:串流草稿更新
Telegram 是唯一實作 supports_draft_updates() 的 channel,可以做到「LLM 邊生成邊更新訊息」的效果:
這個效果類似 ChatGPT 網頁版的逐字輸出,但是在 Telegram 上實現。
Lark/Feishu:Protobuf Frame
Lark 的 WebSocket 協議不是 JSON,而是用 prost(Rust 的 protobuf 實作)解析自訂的 PbFrame 格式,區分 method=0(ping/pong)和 method=1(實際事件)。是這 14 個 channel 裡協議最複雜的一個。
Runtime:把一切串起來
所有 channel 都跑起來之後,start_channels() 做的事情:
- 一條共用 mpsc:所有 channel 的
listen都推到同一個Sender<ChannelMessage> - 監控重啟(supervised restart):每個
listen都包在一個重試迴圈裡,失敗後指數退避重試 - 信號量限流的 worker pool:從 mpsc 讀訊息,用
Semaphore限制同時在飛的請求數
// 監控重啟
loop {
let result = ch.listen(tx.clone()).await;
// 正常退出 → 重置退避;錯誤 → 等待後重試
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = backoff.saturating_mul(2).min(max_backoff);
}
// 信號量限流
let semaphore = Arc::new(Semaphore::new(max_in_flight));
while let Some(msg) = rx.recv().await {
let permit = semaphore.acquire_owned().await?;
workers.spawn(async move {
let _permit = permit; // drop 時自動釋放
process_channel_message(worker_ctx, msg).await;
});
}最大並發數是 channel 數量 × 4,夾在 8 到 64 之間。
每則訊息的處理流程
process_channel_message() 做的事遠不只是「丟給 LLM」:
- 攔截 runtime 指令(
/models、/model)— 支援不重啟切換 LLM provider - 從 per-sender 對話歷史取得上下文(最多 50 輪)
- 從記憶體召回相關內容(向量相似度搜尋)
- 依平台注入特殊指令(例如告訴 Telegram 版的 LLM 可以用
[IMAGE:path]語法發圖) - 如果支援 draft update,先發
"..."佔位,再串流更新 - 送進 LLM + tool call 迴圈(最多 300 秒 timeout)
- 把結果存回對話歷史,回覆給使用者
Gateway vs Channel
zeroclaw 有兩種接收訊息的模式,不要搞混:
- Channel(
start_channels):agent 主動出去建立連線(WebSocket、long-polling),agent 是 client。 - Gateway(
src/gateway/):用 axum 開一個 HTTP server,等平台打 webhook 進來,agent 是 server。
WhatsApp(Meta Cloud API)就是用 Gateway 模式,因為 Meta 只支援 webhook,不提供讓你長連的協議。Gateway 還內建了滑動視窗的 rate limiter、HMAC-SHA256 簽名驗證和請求去重(idempotency store)。
小結
zeroclaw Channel 設計的幾個值得學習的地方:
- Push to mpsc,不用 callback:
listen只管推訊息,runtime 只管消費,兩邊完全解耦。 - Supervised restart 是一等公民:每個 channel 的生命週期都有人顧,不怕某個平台偶爾斷線。
- Draft updates 是 opt-in 擴充:預設 no-op,只有真的需要的平台實作,不污染介面。
- Gateway 和 Channel 分開:推(長連)和拉(webhook)兩種模式有各自的架構,互不混淆。