featured.svg

本文由 AI Agent(Antigravity)代筆撰寫,文中的「我」指的是 AI Agent。Patrick 只有在文章最後做過潤飾調整。

上一篇文章中,我們搭建了 llm-local-studio 的基石:實現了模型註冊表(Model Registry)、Hugging Face 模型下載,以及透過 Rust FFI 直接載入 llama.cpp 底層進行本機推理。

然而,光有 CLI 介面還不夠。要讓這個專案成為真正好用的「本地工作室」,它必須能夠跟現有的第三方生態系無縫對接。不論是網頁端的 Chat UI、VS Code 的 Copilot 外掛、還是本機的 Markdown 編輯器,它們大多都遵循同一個工業標準——OpenAI Chat Completions API

因此,這次我們推出了 llm-local-studio-2。這篇文章將帶大家深入探討,我們是如何用 Rust 的 Axum 框架架設非同步 HTTP 伺服器、如何在非同步執行緒與同步的 llama.cpp FFI 引擎之間搭建高效安全的 thread 邊界,以及如何實現流暢的 Server-Sent Events (SSE) 令牌串流(Token Streaming)


設計難題:非同步 HTTP 與同步 FFI 的衝突

在 Rust 裡寫 Web 伺服器,我們追求的是極高的併發處理能力。Axum 基於 Tokio 非同步執行時(Runtime),其核心設計是利用少數的 worker thread 透過 .await 快速切換任務,來達成高吞吐量。

但是,llama.cpp 的推理完全是另一回事:

  1. CPU/GPU 密集型運算:生成一個 token 需要執行龐大的矩陣乘法,會直接佔滿 CPU 核心或 GPU。
  2. 非 thread-safe 與同步阻塞:底層 C/C++ FFI 是同步阻塞的,而且對同一個模型上下文(Context)做推理是無法並行的,必須互斥存取。

如果在 Axum 的 handler 裡直接呼叫 llama.cpp 的推理函式,該 Tokio worker thread 就會被徹底阻塞。這會導致伺服器無法即時回應其他人的 HTTP 請求,甚至連簡單的 /health 健康檢查都會超時。

為了解決這個衝突,我們設計了下圖的架構:

sequenceDiagram participant Client as Web Client participant Axum as Axum / Tokio Task participant Service as EngineService (Mutex) participant Blocking as Tokio Blocking Thread participant Llama as llama.cpp Engine (C FFI) Client->>Axum: POST /v1/chat/completions (stream=true) Axum->>Service: chat_streaming(messages) Note over Service: 建立 tokio::sync::mpsc 管道 (tx, rx) Service->>Blocking: tokio::task::spawn_blocking Axum-->>Client: 立即回傳 HTTP 200 (SSE stream) loop 推理循環 (在 Blocking Thread 中) Blocking->>Service: 獲取 Mutex 鎖 Blocking->>Llama: 推理下一個 Token Llama-->>Blocking: 回傳 token piece ("Rust") Blocking->>Service: 釋放 Mutex 鎖 Blocking->>Axum: tx.blocking_send("Rust") Axum-->>Client: SSE event (data: {"choices": [...]}) end Note over Blocking: 推理結束 / 遇到 EOG Blocking->>Axum: drop(tx) 關閉管道 Axum-->>Client: data: [DONE]

這套設計有三個關鍵點:

  • tokio::task::spawn_blocking:將所有 CPU 密集的 FFI 運算移交給 Tokio 專門的 blocking thread pool。這樣做可以解放主要的 non-blocking worker thread,確保 HTTP 伺服器隨時保持高響應性。
  • std::sync::Mutex:用來保護互斥的推理引擎。因為鎖是在 blocking thread 內部獲取與釋放,且絕不跨越 .await 邊界,所以我們可以直接使用標準庫的高效 Mutex,而不需要承擔 Tokio 異步 Mutex 的額外開銷。
  • tokio::sync::mpsc 管道:在非同步與同步 thread 之間搭建橋樑。當 blocking thread 生成 token 時,透過 tx.blocking_send 傳送;非同步端則將 rx 包裝成 Stream 連續輸出給 HTTP 客戶端。

非同步引擎服務:EngineService

我們封裝了一個 thread-safe 且 clone 成本極低的 EngineService 作為 HTTP handler 與底層引擎的媒介:

/// 執行緒安全、對非同步友善的推理服務
#[derive(Clone)]
pub struct EngineService {
    inner: Arc<Mutex<LlamaCppEngine>>,
}

串流生成橋樑:chat_streaming

讓我們來看看如何把同步的 llama.cpp 回呼函式(callback)轉換成非同步的 mpsc::Receiver 串流:

pub fn chat_streaming(
    &self,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
    seed: Option<u32>,
) -> (mpsc::Receiver<String>, tokio::task::JoinHandle<Result<GenerateOutput>>) {
    // 1. 建立非同步多生產者單消費者管道,緩衝區設為 64
    let (tx, rx) = mpsc::channel::<String>(64);
    let engine = self.inner.clone();

    // 2. 在 blocking thread pool 中啟動推理任務
    let handle = tokio::task::spawn_blocking(move || {
        let mut engine = engine.lock().expect("engine mutex poisoned");
        let tx_clone = tx.clone();
        
        let result = engine.chat(ChatRequest {
            messages,
            max_tokens,
            seed,
            // 3. 當底層生成一個 token piece 時,觸發此 FFI 回呼
            stream_callback: Some(Box::new(move |piece: &str| {
                // 如果用戶端已斷開連接(Receiver 已 drop),我們忽略錯誤並繼續
                let _ = tx_clone.blocking_send(piece.to_owned());
            })),
        });
        
        // 4. 任務結束時主動 drop(tx),使 Receiver 收到 None 訊號得知結束
        drop(tx);
        result
    });

    (rx, handle)
}

這個設計非常優雅:

  • blocking_send 能確保當非同步端的網路發送隊列滿載時,同步執行緒會進行適度的回壓(Backpressure)。
  • 即使客戶端突然中斷連線,非同步端丟棄了 rxblocking_send 也只會回傳錯誤,推理任務仍會安全地收尾,而不會導致伺服器崩潰。

構建 OpenAI 相容 API 伺服器

有了 EngineService 後,我們就能用 Axum 輕鬆搭建起對外的 HTTP API。

伺服器啟動與路由設定

src/api/mod.rs 中,我們設定了三個端點,並套用了 permissive CORS 中間件,以相容各類 Web 前端工具:

pub async fn start_server(config: ServerConfig, engine: EngineService) -> Result<()> {
    let bind_addr = format!("{}:{}", config.host, config.port);

    // 允許任何來源的 CORS 設定,方便第三方 Web UI 連接
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    let app = Router::new()
        .route("/health", get(routes::health))
        .route("/v1/models", get(routes::list_models))
        .route("/v1/chat/completions", post(routes::chat_completions))
        .layer(cors)
        .with_state(engine);

    let listener = tokio::net::TcpListener::bind(&bind_addr).await?;
    axum::serve(listener, app).await.context("HTTP server error")
}

SSE 串流與非串流的分流處理

/v1/chat/completions 的實作中,我們會依據客戶端傳入的 JSON 中是否含有 "stream": true,來決定回傳單一的 JSON 響應,還是 SSE 事件串流:

pub async fn chat_completions(
    State(engine): State<EngineService>,
    Json(request): Json<ChatCompletionRequest>,
) -> Response {
    let max_tokens = request.max_tokens.unwrap_or(128);
    let seed = request.seed;
    let streaming = request.stream.unwrap_or(false);

    let model_id = engine
        .model_info()
        .map(|info| info.model_id)
        .unwrap_or_else(|| request.model.clone());

    if streaming {
        handle_streaming(engine, request.messages, max_tokens, seed, model_id).await
    } else {
        handle_non_streaming(engine, request.messages, max_tokens, seed, model_id).await
    }
}

實現 SSE Token 串流

串流生成需要遵循 Server-Sent Events (SSE) 規範。在 Axum 中,我們可以使用 Sse 回傳型態,將一個 Stream 轉化為 HTTP 串流響應。

為了完全模擬 OpenAI 的行為,我們的 SSE 串流需要包含三個部分:

  1. 初始區塊:宣布 role: assistant,但不含任何內容。
  2. 內文區塊:連續發送包含 delta 內容的 token 片段。
  3. 結束區塊:提供結束原因(例如 finish_reason: "stop"),並以 data: [DONE] 標記串流正式結束。

我們使用 futures::stream 的 combinator 將這三個階段鏈接成一個完整的 Stream:

async fn handle_streaming(
    engine: EngineService,
    messages: Vec<ChatMessage>,
    max_tokens: u32,
  seed: Option<u32>,
    model_id: String,
) -> Response {
    let completion_id = generate_completion_id();
    let created = unix_timestamp();

    // 1. 取得 mpsc 接收端與 blocking thread 的 JoinHandle
    let (rx, result_handle) = engine.chat_streaming(messages, max_tokens, seed);
    let token_stream = ReceiverStream::new(rx);

    // 2. 第一個事件:發送 Role
    let role_chunk = ChatCompletionChunk {
        id: completion_id.clone(),
        object: "chat.completion.chunk".to_string(),
        created,
        model: model_id.clone(),
        choices: vec![ChunkChoice {
            index: 0,
            delta: ChunkDelta { role: Some(Role::Assistant), content: None },
            finish_reason: None,
        }],
    };
    let role_event = Event::default().data(serde_json::to_string(&role_chunk).unwrap_or_default());
    let role_stream = stream::once(async move { Ok::<Event, std::convert::Infallible>(role_event) });

    // 3. 中間的 token 事件流
    let id_for_tokens = completion_id.clone();
    let model_for_tokens = model_id.clone();
    let token_events = token_stream.map(move |piece| {
        let chunk = ChatCompletionChunk {
            id: id_for_tokens.clone(),
            object: "chat.completion.chunk".to_string(),
            created,
            model: model_for_tokens.clone(),
            choices: vec![ChunkChoice {
                index: 0,
                delta: ChunkDelta { role: None, content: Some(piece) },
                finish_reason: None,
            }],
        };
        Ok::<Event, std::convert::Infallible>(
            Event::default().data(serde_json::to_string(&chunk).unwrap_or_default())
        )
    });

    // 4. 最後的結束事件:等 JoinHandle 結束取得 finish_reason,然後送出 [DONE]
    let id_for_final = completion_id;
    let model_for_final = model_id;
    let final_events = stream::once(async move {
        let finish_reason = match result_handle.await {
            Ok(Ok(output)) if output.generated_tokens >= max_tokens => "length",
            _ => "stop",
        };

        let done_chunk = ChatCompletionChunk {
            id: id_for_final,
            object: "chat.completion.chunk".to_string(),
            created,
            model: model_for_final,
            choices: vec![ChunkChoice {
                index: 0,
                delta: ChunkDelta { role: None, content: None },
                finish_reason: Some(finish_reason.to_string()),
            }],
        };

        let done_json = serde_json::to_string(&done_chunk).unwrap_or_default();
        let events = vec![
            Ok::<Event, std::convert::Infallible>(Event::default().data(done_json)),
            Ok::<Event, std::convert::Infallible>(Event::default().data("[DONE]")),
        ];
        stream::iter(events)
    }).flatten();

    // 5. 鏈接所有串流並包裝成 SSE 響應
    let full_stream = role_stream.chain(token_events).chain(final_events);
    let sse = Sse::new(full_stream).keep_alive(axum::response::sse::KeepAlive::default());

    // 6. 設定緩衝控制標頭,防止中介 Proxy 攔截阻礙實時輸出
    (
        [
            (axum::http::header::CACHE_CONTROL, "no-cache"),
            (axum::http::header::HeaderName::from_static("x-accel-buffering"), "no"),
        ],
        sse,
    ).into_response()
}

[!NOTE] 在回傳 SSE 時,我們加入了 x-accel-buffering: no 標頭。這是一個很容易被忽視但至關重要的設定。如果你的本機服務未來透過 Nginx 進行反向代理,Nginx 預設會快取(buffer)伺服器傳回的資料,直到集滿一定大小才一口氣發給客戶端,這會直接摧毀 SSE 的實時「打字機」效果。這個標頭能強制代理伺服器不要進行任何快取。


自動化對談範本偵測 (Chat Template System)

LLM 本質上只是接龍工具,把對談紀錄(例如 Messages Array)轉換成模型能讀懂的單一純文字 prompt,就需要「對談範本(Chat Template)」。

不同的模型使用的範本格式大相逕庭。例如:

  • ChatML 格式(Qwen, Yi 等模型常用):
    <|im_start|>system
    You are a helpful assistant.<|im_end|>
    <|im_start|>user
    Hello!<|im_end|>
    <|im_start|>assistant
  • Llama 3 格式:
    <|begin_of_text|><|start_header_id|>system<|end_header_id|>
    
    You are a helpful assistant.<|eot_id|><|start_header_id|>user<|end_header_id|>
    
    Hello!<|eot_id|><|start_header_id|>assistant<|end_header_id|>

src/chat_template.rs 中,我們定義了 ChatTemplate trait,並實作了 ChatMLTemplateLlama3Template 以及 GenericTemplate(基於簡單的 ### User:)。

為了讓使用者不必每次都手動指定格式,我們寫了一個簡單實用的自動偵測機制:

pub fn auto_detect(model_name: &str) -> Box<dyn ChatTemplate> {
    let lower = model_name.to_lowercase();

    if lower.contains("llama-3") || lower.contains("llama3") {
        Box::new(Llama3Template)
    } else if lower.contains("chatml") || lower.contains("im_start") {
        Box::new(ChatMLTemplate)
    } else {
        // 安全的安全預設值,因為大多數現代微調模型皆相容於 ChatML
        Box::new(ChatMLTemplate)
    }
}

有了自動偵測,當系統載入模型時,會自動判斷並解析出合適的 prompt 格式,大幅減少配置的繁瑣度。


命令列指令整合:serve

現在我們將這一切整合進 CLI。透過 clap 定義新的 serve 子命令:

/// 啟動與 OpenAI 相容的 HTTP API 伺服器
Serve {
    /// 本地 GGUF 路徑、模型 ID 或已下載檔案名稱的局部片段
    model: String,
    /// 搜尋與解析下載模型時的模型根資料夾
    #[arg(short, long, default_value = "models")]
    dir: PathBuf,
    /// 模型的上下文大小
    #[arg(long, default_value_t = 2048)]
    ctx_size: u32,
    /// 伺服器繫結的 Host 位址
    #[arg(long, default_value = "127.0.0.1")]
    host: String,
    /// 伺服器繫結的連接埠 (Port)
    #[arg(long, default_value_t = 8080)]
    port: u16,
}

在啟動時,它會先透過 resolve_model 模糊匹配找出實體檔案,將模型載入記憶體,然後開啟 HTTP 監聽:

async fn serve_model(model: String, dir: PathBuf, ctx_size: u32, host: String, port: u16) -> Result<()> {
    let model_record = resolve_model(&model, dir)?;
    println!("Loading model: {}", model_record.id);
    println!("  path: {}", model_record.path.display());
    println!();

    let engine = EngineService::new();
    engine.load_model(LoadModelRequest {
        model_id: model_record.id.clone(),
        path: model_record.path.clone(),
        context_size: Some(ctx_size),
    }).await?;

    let config = ServerConfig { host, port };
    start_server(config, engine).await
}

實測驗證

我們可以透過簡單的命令列直接運行它:

# 啟動 API 伺服器(系統會自動模糊搜尋 models/ 目錄下的 tinyllama 模型)
cargo run --release -- serve tinyllama --port 8080

當伺服器成功運行後,我們可以用 curl 來測試。

1. 測試一般 JSON 響應 (Non-Streaming)

curl http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "tinyllama",
    "messages": [
      {"role": "user", "content": "Explain ownership in Rust in one sentence."}
    ],
    "max_tokens": 80
  }'

輸出結果:

{
  "id": "chatcmpl-c513296e-7eb2-4f66-9af7-045fbae2fa36",
  "object": "chat.completion",
  "created": 1779608333,
  "model": "tinyllama-1.1b-chat-v1.0.Q4_K_M",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "Ownership refers to the legal right to possess, use, or control something. In Rus, ownership is defined by the legal document, the deed, which is a legal instrument that transfers ownership from one person to another. The deed is a legal document that outlines the terms and conditions of ownership, including the rights and responsibilities of the owner and the new owner. The"
      },
      "finish_reason": "length"
    }
  ],
  "usage": {
    "prompt_tokens": 37,
    "completion_tokens": 80,
    "total_tokens": 117
  }
}

[!WARNING] 咦?等一下,這段回答是不是有點不對勁?「所有權是指擁有、使用或控制某物的法律權利。在古羅斯(Rus),所有權是由土地契據定義的……」

沒錯!這就是本地運行 1.1B 超微型模型(TinyLlama)的真實寫照。因為模型只有 11 億參數,且預訓練資料中對程式語言 Rust 的 Ownership 與歷史地理名詞「古羅斯(Rus)」產生了嚴重的混淆,導致它一本正經地開始胡言亂語。

雖然它的回答牛頭不對馬嘴,但這也千真萬確證明了兩件事:

  1. 我們的本機推論引擎確實 100% 跑在自己的 CPU/GPU 上,而不是偷偷去調用背後的 OpenAI 或 Anthropic API(要是調用 GPT-4,它絕對會寫出一段完美的 Rust 記憶體所有權解釋)。
  2. 想要在本機獲得真正實用的回答,我們需要像 Llama 3 8B 或更高參數量的模型。

2. 測試 SSE 串流響應 (Streaming)

curl http://localhost:8080/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "tinyllama",
    "messages": [
      {"role": "user", "content": "Hello!"}
    ],
    "stream": true,
    "max_tokens": 30
  }'

輸出串流結果:

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"\n\n2"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"."},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" \""},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"Goodbye"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"\""},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" -"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" This"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" is"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" a"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" more"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" formal"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" farewell"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":","},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" typically"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" used"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" in"},"finish_reason":null}]}

data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{},"finish_reason":"length"}]}

data: [DONE]

可以看到,所有的輸出都嚴格符合 OpenAI 協定,並且流暢地以 Token 片段傳回!雖然 TinyLlama 聽到了 “Hello” 卻牛頭不對馬嘴地開始列點回答 “Goodbye” 的定義,但這無疑是 100% 真實的非同步 SSE 串流封包。


結語與展望

透過非同步執行緒與同步 FFI 的清晰解耦,我們讓 Rust 成功扮演起高效能 API 閘道器(Gateway)的角色,完美串接起底層 llama.cpp 原生庫與現代非同步 Web 生態系。

目前我們的本地 LLM 工作站已具備以下能力:

  • 遞迴掃描管理本機 GGUF 模型。
  • 從 Hugging Face 自動化規劃與安全原子下載。
  • 自動化偵測與套用對談範本(Chat Templates)。
  • 相容於 OpenAI API 標準的非同步 HTTP 伺服器,支援 SSE 串流。

這意味著你可以立刻把它當作 Ollama 的輕量化替代品,串接到你常用的開發工具或瀏覽器外掛中!

下一個階段,我們將會探索:

  1. 多模型熱插拔 API:允許客戶端在運行時透過 API 請求動態載入/卸載不同模型。
  2. 進階採樣參數配置:在 API 中實作 Temperature、Top-P、Top-K 及 Repetition Penalty 等精細化推理配置。
  3. 更美觀的 Web 或 Desktop 管理介面

如果你對此專案有興趣,歡迎參考完整的原始碼並一起交流!