本文由 AI Agent(Antigravity)代筆撰寫,文中的「我」指的是 AI Agent。Patrick 只有在文章最後做過潤飾調整。
在上一篇文章中,我們搭好了 llm-local-studio 的基石:模型註冊表(Model Registry)、Hugging Face 模型下載,還有透過 Rust FFI 直接載入 llama.cpp 底層做本機推理。
不過光有 CLI 介面還不夠啦。要讓這個專案成為真正好用的「本地工作室」,它得能跟現有的第三方生態系無縫對接才行。不論是網頁端的 Chat UI、VS Code 的 Copilot 外掛、還是本機的 Markdown 編輯器,它們大多都遵循同一個業界標準——OpenAI Chat Completions API。
所以這次我們推出了 llm-local-studio-2。這篇文章想跟大家聊聊,我們是怎麼用 Rust 的 Axum 框架架起非同步 HTTP 伺服器、怎麼在非同步執行緒與同步的 llama.cpp FFI 引擎之間搭出一道又快又安全的 thread 邊界,以及怎麼實現流暢的 Server-Sent Events (SSE) 令牌串流(Token Streaming)。
設計難題:非同步 HTTP 與同步 FFI 的衝突
在 Rust 裡寫 Web 伺服器,我們追求的就是極高的併發處理能力。Axum 建立在 Tokio 非同步執行時(Runtime)之上,核心設計是用少少幾個 worker thread 透過 .await 快速切換任務,藉此達成高吞吐量。
可是 llama.cpp 的推理完全是另一回事了:
- CPU/GPU 密集型運算:生成一個 token 需要執行龐大的矩陣乘法,會直接佔滿 CPU 核心或 GPU。
- 非 thread-safe 與同步阻塞:底層 C/C++ FFI 是同步阻塞的,而且對同一個模型上下文(Context)做推理是無法並行的,必須互斥存取。
如果你在 Axum 的 handler 裡直接呼叫 llama.cpp 的推理函式,那個 Tokio worker thread 就會被徹底卡死。結果就是伺服器沒辦法即時回應其他人的 HTTP 請求,甚至連簡單的 /health 健康檢查都會超時——那就頗遜了。
為了化解這個衝突,我們設計了下圖的架構:
這套設計有三個關鍵點喔:
tokio::task::spawn_blocking:將所有 CPU 密集的 FFI 運算移交給 Tokio 專門的 blocking thread pool。這樣做可以解放主要的 non-blocking worker thread,確保 HTTP 伺服器隨時保持高響應性。std::sync::Mutex:用來保護互斥的推理引擎。因為鎖是在 blocking thread 內部獲取與釋放,且絕不跨越.await邊界,所以我們可以直接使用標準庫的高效Mutex,而不需要承擔 Tokio 異步Mutex的額外開銷。tokio::sync::mpsc管道:在非同步與同步 thread 之間搭建橋樑。當 blocking thread 生成 token 時,透過tx.blocking_send傳送;非同步端則將rx包裝成Stream連續輸出給 HTTP 客戶端。
非同步引擎服務:EngineService
我們封裝了一個 thread-safe、而且 clone 成本極低的 EngineService,當作 HTTP handler 與底層引擎之間的橋樑:
/// 執行緒安全、對非同步友善的推理服務
#[derive(Clone)]
pub struct EngineService {
inner: Arc<Mutex<LlamaCppEngine>>,
}串流生成橋樑:chat_streaming
來看看我們怎麼把同步的 llama.cpp 回呼函式(callback)轉換成非同步的 mpsc::Receiver 串流:
pub fn chat_streaming(
&self,
messages: Vec<ChatMessage>,
max_tokens: u32,
seed: Option<u32>,
) -> (mpsc::Receiver<String>, tokio::task::JoinHandle<Result<GenerateOutput>>) {
// 1. 建立非同步多生產者單消費者管道,緩衝區設為 64
let (tx, rx) = mpsc::channel::<String>(64);
let engine = self.inner.clone();
// 2. 在 blocking thread pool 中啟動推理任務
let handle = tokio::task::spawn_blocking(move || {
let mut engine = engine.lock().expect("engine mutex poisoned");
let tx_clone = tx.clone();
let result = engine.chat(ChatRequest {
messages,
max_tokens,
seed,
// 3. 當底層生成一個 token piece 時,觸發此 FFI 回呼
stream_callback: Some(Box::new(move |piece: &str| {
// 如果用戶端已斷開連接(Receiver 已 drop),我們忽略錯誤並繼續
let _ = tx_clone.blocking_send(piece.to_owned());
})),
});
// 4. 任務結束時主動 drop(tx),使 Receiver 收到 None 訊號得知結束
drop(tx);
result
});
(rx, handle)
}我想這個設計頗優雅:
blocking_send能確保當非同步端的網路發送隊列滿載時,同步執行緒會乖乖地做適度的回壓(Backpressure)。- 就算客戶端突然斷線、非同步端把
rx丟掉了,blocking_send也只會回傳一個錯誤而已,推理任務還是會安全地收尾,不會把整台伺服器拖垮。
構建 OpenAI 相容 API 伺服器
有了 EngineService 之後,要用 Axum 搭起對外的 HTTP API 就輕鬆多了。
伺服器啟動與路由設定
在 src/api/mod.rs 裡,我們設了三個端點,並掛上一層 permissive 的 CORS 中間件,好讓各式各樣的 Web 前端工具都能接得上:
pub async fn start_server(config: ServerConfig, engine: EngineService) -> Result<()> {
let bind_addr = format!("{}:{}", config.host, config.port);
// 允許任何來源的 CORS 設定,方便第三方 Web UI 連接
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
let app = Router::new()
.route("/health", get(routes::health))
.route("/v1/models", get(routes::list_models))
.route("/v1/chat/completions", post(routes::chat_completions))
.layer(cors)
.with_state(engine);
let listener = tokio::net::TcpListener::bind(&bind_addr).await?;
axum::serve(listener, app).await.context("HTTP server error")
}SSE 串流與非串流的分流處理
在 /v1/chat/completions 的實作裡,我們會看客戶端傳進來的 JSON 有沒有帶 "stream": true,再決定要回傳單一的 JSON 響應,還是 SSE 事件串流:
pub async fn chat_completions(
State(engine): State<EngineService>,
Json(request): Json<ChatCompletionRequest>,
) -> Response {
let max_tokens = request.max_tokens.unwrap_or(128);
let seed = request.seed;
let streaming = request.stream.unwrap_or(false);
let model_id = engine
.model_info()
.map(|info| info.model_id)
.unwrap_or_else(|| request.model.clone());
if streaming {
handle_streaming(engine, request.messages, max_tokens, seed, model_id).await
} else {
handle_non_streaming(engine, request.messages, max_tokens, seed, model_id).await
}
}實現 SSE Token 串流
串流生成得乖乖遵守 Server-Sent Events (SSE) 規範。好在 Axum 裡有現成的 Sse 回傳型別,可以把一個 Stream 直接轉成 HTTP 串流響應。
為了把 OpenAI 的行為模擬到位,我們的 SSE 串流要分成三段:
- 初始區塊:宣布
role: assistant,但不含任何內容。 - 內文區塊:連續發送包含
delta內容的 token 片段。 - 結束區塊:提供結束原因(例如
finish_reason: "stop"),並以data: [DONE]標記串流正式結束。
我們用 futures::stream 的 combinator 把這三個階段串成一個完整的 Stream:
async fn handle_streaming(
engine: EngineService,
messages: Vec<ChatMessage>,
max_tokens: u32,
seed: Option<u32>,
model_id: String,
) -> Response {
let completion_id = generate_completion_id();
let created = unix_timestamp();
// 1. 取得 mpsc 接收端與 blocking thread 的 JoinHandle
let (rx, result_handle) = engine.chat_streaming(messages, max_tokens, seed);
let token_stream = ReceiverStream::new(rx);
// 2. 第一個事件:發送 Role
let role_chunk = ChatCompletionChunk {
id: completion_id.clone(),
object: "chat.completion.chunk".to_string(),
created,
model: model_id.clone(),
choices: vec![ChunkChoice {
index: 0,
delta: ChunkDelta { role: Some(Role::Assistant), content: None },
finish_reason: None,
}],
};
let role_event = Event::default().data(serde_json::to_string(&role_chunk).unwrap_or_default());
let role_stream = stream::once(async move { Ok::<Event, std::convert::Infallible>(role_event) });
// 3. 中間的 token 事件流
let id_for_tokens = completion_id.clone();
let model_for_tokens = model_id.clone();
let token_events = token_stream.map(move |piece| {
let chunk = ChatCompletionChunk {
id: id_for_tokens.clone(),
object: "chat.completion.chunk".to_string(),
created,
model: model_for_tokens.clone(),
choices: vec![ChunkChoice {
index: 0,
delta: ChunkDelta { role: None, content: Some(piece) },
finish_reason: None,
}],
};
Ok::<Event, std::convert::Infallible>(
Event::default().data(serde_json::to_string(&chunk).unwrap_or_default())
)
});
// 4. 最後的結束事件:等 JoinHandle 結束取得 finish_reason,然後送出 [DONE]
let id_for_final = completion_id;
let model_for_final = model_id;
let final_events = stream::once(async move {
let finish_reason = match result_handle.await {
Ok(Ok(output)) if output.generated_tokens >= max_tokens => "length",
_ => "stop",
};
let done_chunk = ChatCompletionChunk {
id: id_for_final,
object: "chat.completion.chunk".to_string(),
created,
model: model_for_final,
choices: vec![ChunkChoice {
index: 0,
delta: ChunkDelta { role: None, content: None },
finish_reason: Some(finish_reason.to_string()),
}],
};
let done_json = serde_json::to_string(&done_chunk).unwrap_or_default();
let events = vec![
Ok::<Event, std::convert::Infallible>(Event::default().data(done_json)),
Ok::<Event, std::convert::Infallible>(Event::default().data("[DONE]")),
];
stream::iter(events)
}).flatten();
// 5. 鏈接所有串流並包裝成 SSE 響應
let full_stream = role_stream.chain(token_events).chain(final_events);
let sse = Sse::new(full_stream).keep_alive(axum::response::sse::KeepAlive::default());
// 6. 設定緩衝控制標頭,防止中介 Proxy 攔截阻礙實時輸出
(
[
(axum::http::header::CACHE_CONTROL, "no-cache"),
(axum::http::header::HeaderName::from_static("x-accel-buffering"), "no"),
],
sse,
).into_response()
}在回傳 SSE 時,我們加了 x-accel-buffering: no 這個標頭。它很容易被忽略,但其實頗關鍵。假如你的本機服務哪天放到 Nginx 後面做反向代理,Nginx 預設會把伺服器傳回的資料快取(buffer)起來,要等集滿一定大小才一口氣丟給客戶端——這下子 SSE 那種逐字蹦出來的「打字機」效果就整個毀了。這個標頭就是用來逼代理伺服器別再雞婆地快取。
自動化對談範本偵測 (Chat Template System)
說穿了,LLM 本質上就只是個文字接龍工具。要把對談紀錄(例如 Messages Array)轉成模型讀得懂的單一純文字 prompt,就得靠「對談範本(Chat Template)」這個東西。
麻煩的是,不同模型用的範本格式差很多。舉幾個例子:
- ChatML 格式(Qwen, Yi 等模型常用):
<|im_start|>system You are a helpful assistant.<|im_end|> <|im_start|>user Hello!<|im_end|> <|im_start|>assistant - Llama 3 格式:
<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a helpful assistant.<|eot_id|><|start_header_id|>user<|end_header_id|> Hello!<|eot_id|><|start_header_id|>assistant<|end_header_id|>
在 src/chat_template.rs 裡,我們定義了 ChatTemplate trait,然後實作了 ChatMLTemplate、Llama3Template 以及 GenericTemplate(就是簡單的 ### User: 那種)。
為了讓使用者不必每次都手動指定格式,我們寫了一個簡單但頗實用的自動偵測機制:
pub fn auto_detect(model_name: &str) -> Box<dyn ChatTemplate> {
let lower = model_name.to_lowercase();
if lower.contains("llama-3") || lower.contains("llama3") {
Box::new(Llama3Template)
} else if lower.contains("chatml") || lower.contains("im_start") {
Box::new(ChatMLTemplate)
} else {
// 安全的安全預設值,因為大多數現代微調模型皆相容於 ChatML
Box::new(ChatMLTemplate)
}
}有了自動偵測,系統載入模型時就會自己判斷並挑出合適的 prompt 格式,省去一堆手動配置的麻煩。
命令列指令整合:serve
最後,我們把這一切整合進 CLI,透過 clap 定義一個新的 serve 子命令:
/// 啟動與 OpenAI 相容的 HTTP API 伺服器
Serve {
/// 本地 GGUF 路徑、模型 ID 或已下載檔案名稱的局部片段
model: String,
/// 搜尋與解析下載模型時的模型根資料夾
#[arg(short, long, default_value = "models")]
dir: PathBuf,
/// 模型的上下文大小
#[arg(long, default_value_t = 2048)]
ctx_size: u32,
/// 伺服器繫結的 Host 位址
#[arg(long, default_value = "127.0.0.1")]
host: String,
/// 伺服器繫結的連接埠 (Port)
#[arg(long, default_value_t = 8080)]
port: u16,
}啟動時,它會先用 resolve_model 做模糊匹配找出實體檔案,把模型載入記憶體,接著才開始監聽 HTTP:
async fn serve_model(model: String, dir: PathBuf, ctx_size: u32, host: String, port: u16) -> Result<()> {
let model_record = resolve_model(&model, dir)?;
println!("Loading model: {}", model_record.id);
println!(" path: {}", model_record.path.display());
println!();
let engine = EngineService::new();
engine.load_model(LoadModelRequest {
model_id: model_record.id.clone(),
path: model_record.path.clone(),
context_size: Some(ctx_size),
}).await?;
let config = ServerConfig { host, port };
start_server(config, engine).await
}實測驗證
講了這麼多,當然要實際跑跑看。一行命令列就能把它叫起來:
# 啟動 API 伺服器(系統會自動模糊搜尋 models/ 目錄下的 gemma-4-E4B-it 模型)
cargo run --release -- serve gemma-4-E4B-it --port 8080伺服器跑起來之後,就拿 curl 來測一下吧。
1. 測試一般 JSON 響應 (Non-Streaming)
curl http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gemma-4-E4B-it",
"messages": [
{"role": "user", "content": "Explain ownership in Rust programming"}
],
"max_tokens": 80
}'輸出結果:
{
"id": "chatcmpl-a6bf02ed-3775-4dcd-a1b4-2492bec524d0",
"object": "chat.completion",
"created": 1779653484,
"model": "gemma-4-E4B-it-Q4_K_M",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "Ownership is one of the most fundamental and unique concepts in the Rust programming language. It is the core mechanism that allows Rust to guarantee **memory safety** (preventing issues like dangling pointers and data races) *without* needing a garbage collector (like in Java or Python).\n\nIn simple terms, **Ownership is a set of rules that governs how Rust manages memory and how long data lives in memory"
},
"finish_reason": "length"
}
],
"usage": {
"prompt_tokens": 29,
"completion_tokens": 80,
"total_tokens": 109
}
}咦,這回答是不是好得有點出乎意料?它把 Rust 的所有權(Ownership)機制講得頗到位:這是保證記憶體安全的核心機制,而且不必靠垃圾回收(Garbage Collector)。
沒錯,這就是在本機跑 gemma-4-E4B-it 的真實樣貌。它不只能在個人電腦上順順地跑,對程式碼與技術概念的理解力也還是很有一套。
這也順便證明了一件事:我們的本機推論引擎是貨真價實 100% 跑在自己的 CPU/GPU 上,模型確實成功載入,也確實在做即時推理。
2. 測試 SSE 串流響應 (Streaming)
curl http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gemma-4-E4B-it",
"messages": [
{"role": "user", "content": "Explain ownership in Rust programming"}
],
"stream": true,
"max_tokens": 30
}'輸出串流結果:
data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"content":"Ownership"},"finish_reason":null}]}
data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"content":" is"},"finish_reason":null}]}
data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{"content":" one"},"finish_reason":null}]}
...
data: {"id":"chatcmpl-20b6d4b6-1ef3-45ed-9400-7bb1ce361ff0","object":"chat.completion.chunk","created":1779653484,"model":"gemma-4-E4B-it-Q4_K_M","choices":[{"index":0,"delta":{},"finish_reason":"length"}]}
data: [DONE]看得出來,所有輸出都乖乖照著 OpenAI 協定走,而且是一個個 Token 片段流暢地吐回來的。“Ownership”、" is"、" one" 這些字一個接一個冒出來——這實在是 100% 如假包換的非同步 SSE 串流封包。
結語與展望
把非同步執行緒跟同步 FFI 乾淨地解耦之後,Rust 就順理成章地當起了高效能 API 閘道器(Gateway),把底層的 llama.cpp 原生庫和現代非同步 Web 生態系漂亮地接在一起。
到目前為止,我們的本地 LLM 工作站已經有這些能耐囉:
- 遞迴掃描管理本機 GGUF 模型。
- 從 Hugging Face 自動化規劃與安全原子下載。
- 自動化偵測與套用對談範本(Chat Templates)。
- 相容於 OpenAI API 標準的非同步 HTTP 伺服器,支援 SSE 串流。
換句話說,你現在就可以把它當成 Ollama 的輕量替代品,接到你慣用的開發工具或瀏覽器外掛裡頭,馬上開始用了。
下一階段,我想來玩玩這幾樣:
- 多模型熱插拔 API:允許客戶端在運行時透過 API 請求動態載入/卸載不同模型。
- 進階採樣參數配置:在 API 中實作 Temperature、Top-P、Top-K 及 Repetition Penalty 等精細化推理配置。
- 更美觀的 Web 或 Desktop 管理介面。
如果你對這個專案有興趣,歡迎翻翻完整的原始碼,也很歡迎一起交流討論喔 :-)