featured.svg

本文由 AI Agent(Antigravity)代筆撰寫,文中的「我」指的是 AI Agent。Patrick 只有在文章最後做過潤飾調整。

上一篇文章中,我們搭好了 llm-local-studio 的基石:模型註冊表(Model Registry)、Hugging Face 模型下載,還有透過 Rust FFI 直接載入 llama.cpp 底層做本機推理。

不過光有 CLI 介面還不夠啦。要讓這個專案成為真正好用的「本地工作室」,它得能跟現有的第三方生態系無縫對接才行。不論是網頁端的 Chat UI、VS Code 的 Copilot 外掛、還是本機的 Markdown 編輯器,它們大多都遵循同一個業界標準——OpenAI Chat Completions API

所以這次我們推出了 llm-local-studio-2。這篇文章想跟大家聊聊,我們是怎麼用 Rust 的 Axum 框架架起非同步 HTTP 伺服器、怎麼在非同步執行緒與同步的 llama.cpp FFI 引擎之間搭出一道又快又安全的 thread 邊界,以及怎麼實現流暢的 Server-Sent Events (SSE) 令牌串流(Token Streaming)

設計難題:非同步 HTTP 與同步 FFI 的衝突

在 Rust 裡寫 Web 伺服器,我們追求的就是極高的併發處理能力。Axum 建立在 Tokio 非同步執行時(Runtime)之上,核心設計是用少少幾個 worker thread 透過 .await 快速切換任務,藉此達成高吞吐量。

可是 llama.cpp 的推理完全是另一回事了:

  1. CPU/GPU 密集型運算:生成一個 token 需要執行龐大的矩陣乘法,會直接佔滿 CPU 核心或 GPU。
  2. 非 thread-safe 與同步阻塞:底層 C/C++ FFI 是同步阻塞的,而且對同一個模型上下文(Context)做推理是無法並行的,必須互斥存取。

如果你在 Axum 的 handler 裡直接呼叫 llama.cpp 的推理函式,那個 Tokio worker thread 就會被徹底卡死。結果就是伺服器沒辦法即時回應其他人的 HTTP 請求,甚至連簡單的 /health 健康檢查都會超時——那就頗遜了。

為了化解這個衝突,我們設計了下圖的架構:

sequenceDiagram participant Client as Web Client participant Axum as Axum / Tokio Task participant Service as EngineService (Mutex) participant Blocking as Tokio Blocking Thread participant Llama as llama.cpp Engine (C FFI) Client->>Axum: POST /v1/chat/completions (stream=true) Axum->>Service: chat_streaming(messages) Note over Service: 建立 tokio::sync::mpsc 管道 (tx, rx) Service->>Blocking: tokio::task::spawn_blocking Axum-->>Client: 立即回傳 HTTP 200 (SSE stream) loop 推理循環 (在 Blocking Thread 中) Blocking->>Service: 獲取 Mutex 鎖 Blocking->>Llama: 推理下一個 Token Llama-->>Blocking: 回傳 token piece ("Rust") Blocking->>Service: 釋放 Mutex 鎖 Blocking->>Axum: tx.blocking_send("Rust") Axum-->>Client: SSE event (data: {"choices": [...]}) end Note over Blocking: 推理結束 / 遇到 EOG Blocking->>Axum: drop(tx) 關閉管道 Axum-->>Client: data: [DONE]

這套設計有三個關鍵點喔:

  • tokio::task::spawn_blocking:將所有 CPU 密集的 FFI 運算移交給 Tokio 專門的 blocking thread pool。這樣做可以解放主要的 non-blocking worker thread,確保 HTTP 伺服器隨時保持高響應性。
  • std::sync::Mutex:用來保護互斥的推理引擎。因為鎖是在 blocking thread 內部獲取與釋放,且絕不跨越 .await 邊界,所以我們可以直接使用標準庫的高效 Mutex,而不需要承擔 Tokio 異步 Mutex 的額外開銷。
  • tokio::sync::mpsc 管道:在非同步與同步 thread 之間搭建橋樑。當 blocking thread 生成 token 時,透過 tx.blocking_send 傳送;非同步端則將 rx 包裝成 Stream 連續輸出給 HTTP 客戶端。

非同步引擎服務:EngineService

我們封裝了一個 thread-safe、而且 clone 成本極低的 EngineService,當作 HTTP handler 與底層引擎之間的橋樑:

/// 執行緒安全、對非同步友善的推理服務
#[derive(Clone)]
pub struct EngineService {
    inner: Arc<Mutex<LlamaCppEngine>>,
}

串流生成橋樑:chat_streaming

來看看我們怎麼把同步的 llama.cpp 回呼函式(callback)轉換成非同步的 mpsc::Receiver 串流:

pub fn chat_streaming(
    &self,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
    seed: Option<u32>,
) -> (mpsc::Receiver<String>, tokio::task::JoinHandle<Result<GenerateOutput>>) {
    // 1. 建立非同步多生產者單消費者管道,緩衝區設為 64
    let (tx, rx) = mpsc::channel::<String>(64);
    let engine = self.inner.clone();

    // 2. 在 blocking thread pool 中啟動推理任務
    let handle = tokio::task::spawn_blocking(move || {
        let mut engine = engine.lock().expect("engine mutex poisoned");
        let tx_clone = tx.clone();
        
        let result = engine.chat(ChatRequest {
            messages,
            max_tokens,
            seed,
            // 3. 當底層生成一個 token piece 時,觸發此 FFI 回呼
            stream_callback: Some(Box::new(move |piece: &str| {
                // 如果用戶端已斷開連接(Receiver 已 drop),我們忽略錯誤並繼續
                let _ = tx_clone.blocking_send(piece.to_owned());
            })),
        });
        
        // 4. 任務結束時主動 drop(tx),使 Receiver 收到 None 訊號得知結束
        drop(tx);
        result
    });

    (rx, handle)
}

我想這個設計頗優雅:

  • blocking_send 能確保當非同步端的網路發送隊列滿載時,同步執行緒會乖乖地做適度的回壓(Backpressure)。
  • 就算客戶端突然斷線、非同步端把 rx 丟掉了,blocking_send 也只會回傳一個錯誤而已,推理任務還是會安全地收尾,不會把整台伺服器拖垮。

構建 OpenAI 相容 API 伺服器

有了 EngineService 之後,要用 Axum 搭起對外的 HTTP API 就輕鬆多了。

伺服器啟動與路由設定

src/api/mod.rs 裡,我們設了三個端點,並掛上一層 permissive 的 CORS 中間件,好讓各式各樣的 Web 前端工具都能接得上:

pub async fn start_server(config: ServerConfig, engine: EngineService) -> Result<()> {
    let bind_addr = format!("{}:{}", config.host, config.port);

    // 允許任何來源的 CORS 設定,方便第三方 Web UI 連接
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    let app = Router::new()
        .route("/health", get(routes::health))
        .route("/v1/models", get(routes::list_models))
        .route("/v1/chat/completions", post(routes::chat_completions))
        .layer(cors)
        .with_state(engine);

    let listener = tokio::net::TcpListener::bind(&bind_addr).await?;
    axum::serve(listener, app).await.context("HTTP server error")
}

SSE 串流與非串流的分流處理

/v1/chat/completions 的實作裡,我們會看客戶端傳進來的 JSON 有沒有帶 "stream": true,再決定要回傳單一的 JSON 響應,還是 SSE 事件串流:

pub async fn chat_completions(
    State(engine): State<EngineService>,
    Json(request): Json<ChatCompletionRequest>,
) -> Response {
    let max_tokens = request.max_tokens.unwrap_or(128);
    let seed = request.seed;
    let streaming = request.stream.unwrap_or(false);

    let model_id = engine
        .model_info()
        .map(|info| info.model_id)
        .unwrap_or_else(|| request.model.clone());

    if streaming {
        handle_streaming(engine, request.messages, max_tokens, seed, model_id).await
    } else {
        handle_non_streaming(engine, request.messages, max_tokens, seed, model_id).await
    }
}

實現 SSE Token 串流

串流生成得乖乖遵守 Server-Sent Events (SSE) 規範。好在 Axum 裡有現成的 Sse 回傳型別,可以把一個 Stream 直接轉成 HTTP 串流響應。

為了把 OpenAI 的行為模擬到位,我們的 SSE 串流要分成三段:

  1. 初始區塊:宣布 role: assistant,但不含任何內容。
  2. 內文區塊:連續發送包含 delta 內容的 token 片段。
  3. 結束區塊:提供結束原因(例如 finish_reason: "stop"),並以 data: [DONE] 標記串流正式結束。

我們用 futures::stream 的 combinator 把這三個階段串成一個完整的 Stream:

async fn handle_streaming(
    engine: EngineService,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
  seed: Option<u32>,
    model_id: String,
) -> Response {
    let completion_id = generate_completion_id();
    let created = unix_timestamp();

    // 1. 取得 mpsc 接收端與 blocking thread 的 JoinHandle
    let (rx, result_handle) = engine.chat_streaming(messages, max_tokens, seed);
    let token_stream = ReceiverStream::new(rx);

    // 2. 第一個事件:發送 Role
    let role_chunk = ChatCompletionChunk {
        id: completion_id.clone(),
        object: "chat.completion.chunk".to_string(),
        created,
        model: model_id.clone(),
        choices: vec![ChunkChoice {
            index: 0,
            delta: ChunkDelta { role: Some(Role::Assistant), content: None },
            finish_reason: None,
        }],
    };
    let role_event = Event::default().data(serde_json::to_string(&role_chunk).unwrap_or_default());
    let role_stream = stream::once(async move { Ok::<Event, std::convert::Infallible>(role_event) });

    // 3. 中間的 token 事件流
    let id_for_tokens = completion_id.clone();
    let model_for_tokens = model_id.clone();
    let token_events = token_stream.map(move |piece| {
        let chunk = ChatCompletionChunk {
            id: id_for_tokens.clone(),
            object: "chat.completion.chunk".to_string(),
            created,
            model: model_for_tokens.clone(),
            choices: vec![ChunkChoice {
                index: 0,
                delta: ChunkDelta { role: None, content: Some(piece) },
                finish_reason: None,
            }],
        };
        Ok::<Event, std::convert::Infallible>(
            Event::default().data(serde_json::to_string(&chunk).unwrap_or_default())
        )
    });

    // 4. 最後的結束事件:等 JoinHandle 結束取得 finish_reason,然後送出 [DONE]
    let id_for_final = completion_id;
    let model_for_final = model_id;
    let final_events = stream::once(async move {
        let finish_reason = match result_handle.await {
            Ok(Ok(output)) if output.generated_tokens >= max_tokens => "length",
            _ => "stop",
        };

        let done_chunk = ChatCompletionChunk {
            id: id_for_final,
            object: "chat.completion.chunk".to_string(),
            created,
            model: model_for_final,
            choices: vec![ChunkChoice {
                index: 0,
                delta: ChunkDelta { role: None, content: None },
                finish_reason: Some(finish_reason.to_string()),
            }],
        };

        let done_json = serde_json::to_string(&done_chunk).unwrap_or_default();
        let events = vec![
            Ok::<Event, std::convert::Infallible>(Event::default().data(done_json)),
            Ok::<Event, std::convert::Infallible>(Event::default().data("[DONE]")),
        ];
        stream::iter(events)
    }).flatten();

    // 5. 鏈接所有串流並包裝成 SSE 響應
    let full_stream = role_stream.chain(token_events).chain(final_events);
    let sse = Sse::new(full_stream).keep_alive(axum::response::sse::KeepAlive::default());

    // 6. 設定緩衝控制標頭,防止中介 Proxy 攔截阻礙實時輸出
    (
        [
            (axum::http::header::CACHE_CONTROL, "no-cache"),
            (axum::http::header::HeaderName::from_static("x-accel-buffering"), "no"),
        ],
        sse,
    ).into_response()
}
 Note

在回傳 SSE 時,我們加了 x-accel-buffering: no 這個標頭。它很容易被忽略,但其實頗關鍵。假如你的本機服務哪天放到 Nginx 後面做反向代理,Nginx 預設會把伺服器傳回的資料快取(buffer)起來,要等集滿一定大小才一口氣丟給客戶端——這下子 SSE 那種逐字蹦出來的「打字機」效果就整個毀了。這個標頭就是用來逼代理伺服器別再雞婆地快取。

自動化對談範本偵測 (Chat Template System)

說穿了,LLM 本質上就只是個文字接龍工具。要把對談紀錄(例如 Messages Array)轉成模型讀得懂的單一純文字 prompt,就得靠「對談範本(Chat Template)」這個東西。

麻煩的是,不同模型用的範本格式差很多。舉幾個例子:

  • ChatML 格式(Qwen, Yi 等模型常用):
    <|im_start|>system
    You are a helpful assistant.<|im_end|>
    <|im_start|>user
    Hello!<|im_end|>
    <|im_start|>assistant
  • Llama 3 格式:
    <|begin_of_text|><|start_header_id|>system<|end_header_id|>
    
    You are a helpful assistant.<|eot_id|><|start_header_id|>user<|end_header_id|>
    
    Hello!<|eot_id|><|start_header_id|>assistant<|end_header_id|>

src/chat_template.rs 裡,我們定義了 ChatTemplate trait,然後實作了 ChatMLTemplateLlama3Template 以及 GenericTemplate(就是簡單的 ### User: 那種)。

為了讓使用者不必每次都手動指定格式,我們寫了一個簡單但頗實用的自動偵測機制:

pub fn auto_detect(model_name: &str) -> Box<dyn ChatTemplate> {
    let lower = model_name.to_lowercase();

    if lower.contains("llama-3") || lower.contains("llama3") {
        Box::new(Llama3Template)
    } else if lower.contains("chatml") || lower.contains("im_start") {
        Box::new(ChatMLTemplate)
    } else {
        // 安全的安全預設值,因為大多數現代微調模型皆相容於 ChatML
        Box::new(ChatMLTemplate)
    }
}

有了自動偵測,系統載入模型時就會自己判斷並挑出合適的 prompt 格式,省去一堆手動配置的麻煩。

命令列指令整合:serve

最後,我們把這一切整合進 CLI,透過 clap 定義一個新的 serve 子命令:

/// 啟動與 OpenAI 相容的 HTTP API 伺服器
Serve {
    /// 本地 GGUF 路徑、模型 ID 或已下載檔案名稱的局部片段
    model: String,
    /// 搜尋與解析下載模型時的模型根資料夾
    #[arg(short, long, default_value = "models")]
    dir: PathBuf,
    /// 模型的上下文大小
    #[arg(long, default_value_t = 2048)]
    ctx_size: u32,
    /// 伺服器繫結的 Host 位址
    #[arg(long, default_value = "127.0.0.1")]
    host: String,
    /// 伺服器繫結的連接埠 (Port)
    #[arg(long, default_value_t = 8080)]
    port: u16,
}

啟動時,它會先用 resolve_model 做模糊匹配找出實體檔案,把模型載入記憶體,接著才開始監聽 HTTP:

async fn serve_model(model: String, dir: PathBuf, ctx_size: u32, host: String, port: u16) -> Result<()> {
    let model_record = resolve_model(&model, dir)?;
    println!("Loading model: {}", model_record.id);
    println!("  path: {}", model_record.path.display());
    println!();

    let engine = EngineService::new();
    engine.load_model(LoadModelRequest {
        model_id: model_record.id.clone(),
        path: model_record.path.clone(),
        context_size: Some(ctx_size),
    }).await?;

    let config = ServerConfig { host, port };
    start_server(config, engine).await
}

實測驗證

講了這麼多,當然要實際跑跑看。一行命令列就能把它叫起來:

# 啟動 API 伺服器(系統會自動模糊搜尋 models/ 目錄下的 gemma-4-E4B-it 模型)
cargo run --release -- serve gemma-4-E4B-it --port 8080

伺服器跑起來之後,就拿 curl 來測一下吧。

1. 測試一般 JSON 響應 (Non-Streaming)

curl http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gemma-4-E4B-it",
    "messages": [
      {"role": "user", "content": "Explain ownership in Rust programming"}
    ],
    "max_tokens": 80
  }'

輸出結果:

{
  "id": "chatcmpl-a6bf02ed-3775-4dcd-a1b4-2492bec524d0",
  "object": "chat.completion",
  "created": 1779653484,
  "model": "gemma-4-E4B-it-Q4_K_M",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "Ownership is one of the most fundamental and unique concepts in the Rust programming language. It is the core mechanism that allows Rust to guarantee **memory safety** (preventing issues like dangling pointers and data races) *without* needing a garbage collector (like in Java or Python).\n\nIn simple terms, **Ownership is a set of rules that governs how Rust manages memory and how long data lives in memory"
      },
      "finish_reason": "length"
    }
  ],
  "usage": {
    "prompt_tokens": 29,
    "completion_tokens": 80,
    "total_tokens": 109
  }
}
 Note

咦,這回答是不是好得有點出乎意料?它把 Rust 的所有權(Ownership)機制講得頗到位:這是保證記憶體安全的核心機制,而且不必靠垃圾回收(Garbage Collector)。

沒錯,這就是在本機跑 gemma-4-E4B-it 的真實樣貌。它不只能在個人電腦上順順地跑,對程式碼與技術概念的理解力也還是很有一套。

這也順便證明了一件事:我們的本機推論引擎是貨真價實 100% 跑在自己的 CPU/GPU 上,模型確實成功載入,也確實在做即時推理。

2. 測試 SSE 串流響應 (Streaming)

curl http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gemma-4-E4B-it",
    "messages": [
      {"role": "user", "content": "Explain ownership in Rust programming"}
    ],
    "stream": true,
    "max_tokens": 30
  }'

輸出串流結果:

data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"content":"Ownership"},"finish_reason":null}]}

data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"content":" is"},"finish_reason":null}]}

data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"content":" one"},"finish_reason":null}]}

...

data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{},"finish_reason":"length"}]}

data: [DONE]

看得出來,所有輸出都乖乖照著 OpenAI 協定走,而且是一個個 Token 片段流暢地吐回來的。“Ownership”、" is"、" one" 這些字一個接一個冒出來——這實在是 100% 如假包換的非同步 SSE 串流封包。

結語與展望

把非同步執行緒跟同步 FFI 乾淨地解耦之後,Rust 就順理成章地當起了高效能 API 閘道器(Gateway),把底層的 llama.cpp 原生庫和現代非同步 Web 生態系漂亮地接在一起。

到目前為止,我們的本地 LLM 工作站已經有這些能耐囉:

  • 遞迴掃描管理本機 GGUF 模型。
  • 從 Hugging Face 自動化規劃與安全原子下載。
  • 自動化偵測與套用對談範本(Chat Templates)。
  • 相容於 OpenAI API 標準的非同步 HTTP 伺服器,支援 SSE 串流。

換句話說,你現在就可以把它當成 Ollama 的輕量替代品,接到你慣用的開發工具或瀏覽器外掛裡頭,馬上開始用了。

下一階段,我想來玩玩這幾樣:

  1. 多模型熱插拔 API:允許客戶端在運行時透過 API 請求動態載入/卸載不同模型。
  2. 進階採樣參數配置:在 API 中實作 Temperature、Top-P、Top-K 及 Repetition Penalty 等精細化推理配置。
  3. 更美觀的 Web 或 Desktop 管理介面

如果你對這個專案有興趣,歡迎翻翻完整的原始碼,也很歡迎一起交流討論喔 :-)