zeroclaw 的 Channel 負責接收與傳送訊息,支援 Telegram、Discord、Slack、Matrix、iMessage、Signal、IRC、Email 等 14 種平台。這篇記錄它如何用一個乾淨的 trait 把這些平台統一起來,以及 runtime 如何把訊息分派給 LLM 處理。

Trait 設計:三個必填,其餘自選

Channel trait 定義在 src/channels/traits.rs,只有三個方法是必須實作的:

#[async_trait]
pub trait Channel: Send + Sync {
    fn name(&self) -> &str;
    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
    async fn listen(&self, tx: tokio::sync::mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;

    // 以下都有預設空實作(no-op)
    async fn health_check(&self) -> bool { true }
    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }
    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> { Ok(()) }

    // 串流草稿更新(目前只有 Telegram 實作)
    fn supports_draft_updates(&self) -> bool { false }
    async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> { Ok(None) }
    async fn update_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
    async fn finalize_draft(&self, _recipient: &str, _message_id: &str, _text: &str) -> anyhow::Result<()> { Ok(()) }
}

最有趣的是 listen 的簽名:它拿到一個 mpsc::Sender<ChannelMessage>,然後把收到的訊息推進去——而不是回呼(callback)。這是 push-based observer pattern,讓所有平台的訊息都匯聚到同一條 mpsc 管道,runtime 只需要從一個地方消費。


訊息型別

進來的訊息(ChannelMessage)和出去的訊息(SendMessage)分開定義:

pub struct ChannelMessage {
    pub id: String,           // 去重用的 ID
    pub sender: String,       // 發送者識別碼
    pub reply_target: String, // 要回覆的目標(chat_id、channel_id 等)
    pub content: String,      // 正規化後的純文字內容
    pub channel: String,      // "telegram" | "discord" | "slack" | ...
    pub timestamp: u64,
}

pub struct SendMessage {
    pub content: String,
    pub recipient: String,        // 平台專屬的目標識別碼
    pub subject: Option<String>,  // Email 用的主旨
}

各平台的 sender/reply_target 格式天差地別(Telegram 是數字 chat_id、Discord 是 snowflake ID、Email 是郵件地址),但對上層 runtime 來說一律是字串,不需要知道細節。


14 個平台,一個介面

src/channels/mod.rs 列出了目前支援的所有 channel:

分類 平台 底層協議
終端 CLI stdin/stdout
即時通訊 Telegram HTTP long-polling
Discord WebSocket Gateway
Slack HTTP polling
Mattermost WebSocket / HTTP
WhatsApp Meta Cloud API webhook
iMessage macOS osascript
Matrix matrix-sdk 同步
Signal signal-cli HTTP daemon
IRC TCP/TLS
企業通訊 Lark/Feishu WebSocket + Protobuf
DingTalk WebSocket
QQ HTTP API
電子郵件 Email SMTP + IMAP

每個 channel 一個 struct,實作 Channel trait,差異只在各自的 struct 欄位和協議處理。


有趣的實作細節

Discord:手刻 Gateway WebSocket

Discord 沒有用任何 Discord SDK,而是用 tokio-tungstenite 手刻了整個 Gateway 協議:

  1. 先打 REST 拿 Gateway URL
  2. 建立 WebSocket 連線
  3. 走完 Hello → Identify → READY 握手流程
  4. 另開一個 task 發心跳(heartbeat)
  5. 處理各種 opcode:1(心跳請求)、7(重連)、9(session 失效)

這樣做的好處是二進位體積可控、依賴少,但代價是要自己維護協議細節。

Telegram:串流草稿更新

Telegram 是唯一實作 supports_draft_updates() 的 channel,可以做到「LLM 邊生成邊更新訊息」的效果:

sequenceDiagram participant U as 使用者 participant TG as Telegram participant A as Agent participant LLM as LLM U->>TG: 送出訊息 TG->>A: ChannelMessage A->>TG: send_draft("...") → message_id A->>+LLM: 串流請求 loop 每隔 N ms LLM-->>A: delta token A->>TG: update_draft(message_id, 累積文字) Note over TG,U: 使用者看到訊息逐漸變長 end LLM-->>-A: 生成完畢 A->>TG: finalize_draft(message_id, 最終格式)

這個效果類似 ChatGPT 網頁版的逐字輸出,但是在 Telegram 上實現。

Lark/Feishu:Protobuf Frame

Lark 的 WebSocket 協議不是 JSON,而是用 prost(Rust 的 protobuf 實作)解析自訂的 PbFrame 格式,區分 method=0(ping/pong)和 method=1(實際事件)。是這 14 個 channel 裡協議最複雜的一個。


Runtime:把一切串起來

所有 channel 都跑起來之後,start_channels() 做的事情:

  1. 一條共用 mpsc:所有 channel 的 listen 都推到同一個 Sender<ChannelMessage>
  2. 監控重啟(supervised restart):每個 listen 都包在一個重試迴圈裡,失敗後指數退避重試
  3. 信號量限流的 worker pool:從 mpsc 讀訊息,用 Semaphore 限制同時在飛的請求數
graph LR TG[Telegram] -->|listen| Q[(mpsc queue)] DC[Discord] -->|listen| Q SL[Slack] -->|listen| Q MT[Matrix ...] -->|listen| Q Q --> W1[Worker] Q --> W2[Worker] Q --> W3[Worker] W1 --> LLM[LLM Provider] W2 --> LLM W3 --> LLM
// 監控重啟
loop {
    let result = ch.listen(tx.clone()).await;
    // 正常退出 → 重置退避;錯誤 → 等待後重試
    tokio::time::sleep(Duration::from_secs(backoff)).await;
    backoff = backoff.saturating_mul(2).min(max_backoff);
}

// 信號量限流
let semaphore = Arc::new(Semaphore::new(max_in_flight));
while let Some(msg) = rx.recv().await {
    let permit = semaphore.acquire_owned().await?;
    workers.spawn(async move {
        let _permit = permit;  // drop 時自動釋放
        process_channel_message(worker_ctx, msg).await;
    });
}

最大並發數是 channel 數量 × 4,夾在 8 到 64 之間。

每則訊息的處理流程

process_channel_message() 做的事遠不只是「丟給 LLM」:

  1. 攔截 runtime 指令(/models/model)— 支援不重啟切換 LLM provider
  2. 從 per-sender 對話歷史取得上下文(最多 50 輪)
  3. 從記憶體召回相關內容(向量相似度搜尋)
  4. 依平台注入特殊指令(例如告訴 Telegram 版的 LLM 可以用 [IMAGE:path] 語法發圖)
  5. 如果支援 draft update,先發 "..." 佔位,再串流更新
  6. 送進 LLM + tool call 迴圈(最多 300 秒 timeout)
  7. 把結果存回對話歷史,回覆給使用者

Gateway vs Channel

zeroclaw 有兩種接收訊息的模式,不要搞混:

  • Channelstart_channels):agent 主動出去建立連線(WebSocket、long-polling),agent 是 client。
  • Gatewaysrc/gateway/):用 axum 開一個 HTTP server,等平台打 webhook 進來,agent 是 server。
graph TB subgraph channel_mode["Channel 模式(Agent 主動連出)"] direction LR A1[Agent] <-->|WebSocket / long-polling| P1["Telegram / Discord\nSlack / Matrix ..."] end subgraph gateway_mode["Gateway 模式(等 Webhook 打入)"] direction LR P2["WhatsApp\n(Meta Cloud API)"] -->|HTTP POST + HMAC 驗簽| A2["Agent\n(axum HTTP server)"] end

WhatsApp(Meta Cloud API)就是用 Gateway 模式,因為 Meta 只支援 webhook,不提供讓你長連的協議。Gateway 還內建了滑動視窗的 rate limiter、HMAC-SHA256 簽名驗證和請求去重(idempotency store)。


小結

zeroclaw Channel 設計的幾個值得學習的地方:

  • Push to mpsc,不用 callbacklisten 只管推訊息,runtime 只管消費,兩邊完全解耦。
  • Supervised restart 是一等公民:每個 channel 的生命週期都有人顧,不怕某個平台偶爾斷線。
  • Draft updates 是 opt-in 擴充:預設 no-op,只有真的需要的平台實作,不污染介面。
  • Gateway 和 Channel 分開:推(長連)和拉(webhook)兩種模式有各自的架構,互不混淆。