本文由 AI Agent(Antigravity)代筆撰寫,文中的「我」指的是 AI Agent。Patrick 只有在文章最後做過潤飾調整。
在上一篇文章中,我們搭建了 llm-local-studio 的基石:實現了模型註冊表(Model Registry)、Hugging Face 模型下載,以及透過 Rust FFI 直接載入 llama.cpp 底層進行本機推理。
然而,光有 CLI 介面還不夠。要讓這個專案成為真正好用的「本地工作室」,它必須能夠跟現有的第三方生態系無縫對接。不論是網頁端的 Chat UI、VS Code 的 Copilot 外掛、還是本機的 Markdown 編輯器,它們大多都遵循同一個工業標準——OpenAI Chat Completions API。
因此,這次我們推出了 llm-local-studio-2。這篇文章將帶大家深入探討,我們是如何用 Rust 的 Axum 框架架設非同步 HTTP 伺服器、如何在非同步執行緒與同步的 llama.cpp FFI 引擎之間搭建高效安全的 thread 邊界,以及如何實現流暢的 Server-Sent Events (SSE) 令牌串流(Token Streaming)。
設計難題:非同步 HTTP 與同步 FFI 的衝突
在 Rust 裡寫 Web 伺服器,我們追求的是極高的併發處理能力。Axum 基於 Tokio 非同步執行時(Runtime),其核心設計是利用少數的 worker thread 透過 .await 快速切換任務,來達成高吞吐量。
但是,llama.cpp 的推理完全是另一回事:
- CPU/GPU 密集型運算:生成一個 token 需要執行龐大的矩陣乘法,會直接佔滿 CPU 核心或 GPU。
- 非 thread-safe 與同步阻塞:底層 C/C++ FFI 是同步阻塞的,而且對同一個模型上下文(Context)做推理是無法並行的,必須互斥存取。
如果在 Axum 的 handler 裡直接呼叫 llama.cpp 的推理函式,該 Tokio worker thread 就會被徹底阻塞。這會導致伺服器無法即時回應其他人的 HTTP 請求,甚至連簡單的 /health 健康檢查都會超時。
為了解決這個衝突,我們設計了下圖的架構:
這套設計有三個關鍵點:
tokio::task::spawn_blocking:將所有 CPU 密集的 FFI 運算移交給 Tokio 專門的 blocking thread pool。這樣做可以解放主要的 non-blocking worker thread,確保 HTTP 伺服器隨時保持高響應性。std::sync::Mutex:用來保護互斥的推理引擎。因為鎖是在 blocking thread 內部獲取與釋放,且絕不跨越.await邊界,所以我們可以直接使用標準庫的高效Mutex,而不需要承擔 Tokio 異步Mutex的額外開銷。tokio::sync::mpsc管道:在非同步與同步 thread 之間搭建橋樑。當 blocking thread 生成 token 時,透過tx.blocking_send傳送;非同步端則將rx包裝成Stream連續輸出給 HTTP 客戶端。
非同步引擎服務:EngineService
我們封裝了一個 thread-safe 且 clone 成本極低的 EngineService 作為 HTTP handler 與底層引擎的媒介:
/// 執行緒安全、對非同步友善的推理服務
#[derive(Clone)]
pub struct EngineService {
inner: Arc<Mutex<LlamaCppEngine>>,
}串流生成橋樑:chat_streaming
讓我們來看看如何把同步的 llama.cpp 回呼函式(callback)轉換成非同步的 mpsc::Receiver 串流:
pub fn chat_streaming(
&self,
messages: Vec<ChatMessage>,
max_tokens: u32,
seed: Option<u32>,
) -> (mpsc::Receiver<String>, tokio::task::JoinHandle<Result<GenerateOutput>>) {
// 1. 建立非同步多生產者單消費者管道,緩衝區設為 64
let (tx, rx) = mpsc::channel::<String>(64);
let engine = self.inner.clone();
// 2. 在 blocking thread pool 中啟動推理任務
let handle = tokio::task::spawn_blocking(move || {
let mut engine = engine.lock().expect("engine mutex poisoned");
let tx_clone = tx.clone();
let result = engine.chat(ChatRequest {
messages,
max_tokens,
seed,
// 3. 當底層生成一個 token piece 時,觸發此 FFI 回呼
stream_callback: Some(Box::new(move |piece: &str| {
// 如果用戶端已斷開連接(Receiver 已 drop),我們忽略錯誤並繼續
let _ = tx_clone.blocking_send(piece.to_owned());
})),
});
// 4. 任務結束時主動 drop(tx),使 Receiver 收到 None 訊號得知結束
drop(tx);
result
});
(rx, handle)
}這個設計非常優雅:
blocking_send能確保當非同步端的網路發送隊列滿載時,同步執行緒會進行適度的回壓(Backpressure)。- 即使客戶端突然中斷連線,非同步端丟棄了
rx,blocking_send也只會回傳錯誤,推理任務仍會安全地收尾,而不會導致伺服器崩潰。
構建 OpenAI 相容 API 伺服器
有了 EngineService 後,我們就能用 Axum 輕鬆搭建起對外的 HTTP API。
伺服器啟動與路由設定
在 src/api/mod.rs 中,我們設定了三個端點,並套用了 permissive CORS 中間件,以相容各類 Web 前端工具:
pub async fn start_server(config: ServerConfig, engine: EngineService) -> Result<()> {
let bind_addr = format!("{}:{}", config.host, config.port);
// 允許任何來源的 CORS 設定,方便第三方 Web UI 連接
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
let app = Router::new()
.route("/health", get(routes::health))
.route("/v1/models", get(routes::list_models))
.route("/v1/chat/completions", post(routes::chat_completions))
.layer(cors)
.with_state(engine);
let listener = tokio::net::TcpListener::bind(&bind_addr).await?;
axum::serve(listener, app).await.context("HTTP server error")
}SSE 串流與非串流的分流處理
在 /v1/chat/completions 的實作中,我們會依據客戶端傳入的 JSON 中是否含有 "stream": true,來決定回傳單一的 JSON 響應,還是 SSE 事件串流:
pub async fn chat_completions(
State(engine): State<EngineService>,
Json(request): Json<ChatCompletionRequest>,
) -> Response {
let max_tokens = request.max_tokens.unwrap_or(128);
let seed = request.seed;
let streaming = request.stream.unwrap_or(false);
let model_id = engine
.model_info()
.map(|info| info.model_id)
.unwrap_or_else(|| request.model.clone());
if streaming {
handle_streaming(engine, request.messages, max_tokens, seed, model_id).await
} else {
handle_non_streaming(engine, request.messages, max_tokens, seed, model_id).await
}
}實現 SSE Token 串流
串流生成需要遵循 Server-Sent Events (SSE) 規範。在 Axum 中,我們可以使用 Sse 回傳型態,將一個 Stream 轉化為 HTTP 串流響應。
為了完全模擬 OpenAI 的行為,我們的 SSE 串流需要包含三個部分:
- 初始區塊:宣布
role: assistant,但不含任何內容。 - 內文區塊:連續發送包含
delta內容的 token 片段。 - 結束區塊:提供結束原因(例如
finish_reason: "stop"),並以data: [DONE]標記串流正式結束。
我們使用 futures::stream 的 combinator 將這三個階段鏈接成一個完整的 Stream:
async fn handle_streaming(
engine: EngineService,
messages: Vec<ChatMessage>,
max_tokens: u32,
seed: Option<u32>,
model_id: String,
) -> Response {
let completion_id = generate_completion_id();
let created = unix_timestamp();
// 1. 取得 mpsc 接收端與 blocking thread 的 JoinHandle
let (rx, result_handle) = engine.chat_streaming(messages, max_tokens, seed);
let token_stream = ReceiverStream::new(rx);
// 2. 第一個事件:發送 Role
let role_chunk = ChatCompletionChunk {
id: completion_id.clone(),
object: "chat.completion.chunk".to_string(),
created,
model: model_id.clone(),
choices: vec![ChunkChoice {
index: 0,
delta: ChunkDelta { role: Some(Role::Assistant), content: None },
finish_reason: None,
}],
};
let role_event = Event::default().data(serde_json::to_string(&role_chunk).unwrap_or_default());
let role_stream = stream::once(async move { Ok::<Event, std::convert::Infallible>(role_event) });
// 3. 中間的 token 事件流
let id_for_tokens = completion_id.clone();
let model_for_tokens = model_id.clone();
let token_events = token_stream.map(move |piece| {
let chunk = ChatCompletionChunk {
id: id_for_tokens.clone(),
object: "chat.completion.chunk".to_string(),
created,
model: model_for_tokens.clone(),
choices: vec![ChunkChoice {
index: 0,
delta: ChunkDelta { role: None, content: Some(piece) },
finish_reason: None,
}],
};
Ok::<Event, std::convert::Infallible>(
Event::default().data(serde_json::to_string(&chunk).unwrap_or_default())
)
});
// 4. 最後的結束事件:等 JoinHandle 結束取得 finish_reason,然後送出 [DONE]
let id_for_final = completion_id;
let model_for_final = model_id;
let final_events = stream::once(async move {
let finish_reason = match result_handle.await {
Ok(Ok(output)) if output.generated_tokens >= max_tokens => "length",
_ => "stop",
};
let done_chunk = ChatCompletionChunk {
id: id_for_final,
object: "chat.completion.chunk".to_string(),
created,
model: model_for_final,
choices: vec![ChunkChoice {
index: 0,
delta: ChunkDelta { role: None, content: None },
finish_reason: Some(finish_reason.to_string()),
}],
};
let done_json = serde_json::to_string(&done_chunk).unwrap_or_default();
let events = vec![
Ok::<Event, std::convert::Infallible>(Event::default().data(done_json)),
Ok::<Event, std::convert::Infallible>(Event::default().data("[DONE]")),
];
stream::iter(events)
}).flatten();
// 5. 鏈接所有串流並包裝成 SSE 響應
let full_stream = role_stream.chain(token_events).chain(final_events);
let sse = Sse::new(full_stream).keep_alive(axum::response::sse::KeepAlive::default());
// 6. 設定緩衝控制標頭,防止中介 Proxy 攔截阻礙實時輸出
(
[
(axum::http::header::CACHE_CONTROL, "no-cache"),
(axum::http::header::HeaderName::from_static("x-accel-buffering"), "no"),
],
sse,
).into_response()
}[!NOTE] 在回傳 SSE 時,我們加入了
x-accel-buffering: no標頭。這是一個很容易被忽視但至關重要的設定。如果你的本機服務未來透過 Nginx 進行反向代理,Nginx 預設會快取(buffer)伺服器傳回的資料,直到集滿一定大小才一口氣發給客戶端,這會直接摧毀 SSE 的實時「打字機」效果。這個標頭能強制代理伺服器不要進行任何快取。
自動化對談範本偵測 (Chat Template System)
LLM 本質上只是接龍工具,把對談紀錄(例如 Messages Array)轉換成模型能讀懂的單一純文字 prompt,就需要「對談範本(Chat Template)」。
不同的模型使用的範本格式大相逕庭。例如:
- ChatML 格式(Qwen, Yi 等模型常用):
<|im_start|>system You are a helpful assistant.<|im_end|> <|im_start|>user Hello!<|im_end|> <|im_start|>assistant - Llama 3 格式:
<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a helpful assistant.<|eot_id|><|start_header_id|>user<|end_header_id|> Hello!<|eot_id|><|start_header_id|>assistant<|end_header_id|>
在 src/chat_template.rs 中,我們定義了 ChatTemplate trait,並實作了 ChatMLTemplate、Llama3Template 以及 GenericTemplate(基於簡單的 ### User:)。
為了讓使用者不必每次都手動指定格式,我們寫了一個簡單實用的自動偵測機制:
pub fn auto_detect(model_name: &str) -> Box<dyn ChatTemplate> {
let lower = model_name.to_lowercase();
if lower.contains("llama-3") || lower.contains("llama3") {
Box::new(Llama3Template)
} else if lower.contains("chatml") || lower.contains("im_start") {
Box::new(ChatMLTemplate)
} else {
// 安全的安全預設值,因為大多數現代微調模型皆相容於 ChatML
Box::new(ChatMLTemplate)
}
}有了自動偵測,當系統載入模型時,會自動判斷並解析出合適的 prompt 格式,大幅減少配置的繁瑣度。
命令列指令整合:serve
現在我們將這一切整合進 CLI。透過 clap 定義新的 serve 子命令:
/// 啟動與 OpenAI 相容的 HTTP API 伺服器
Serve {
/// 本地 GGUF 路徑、模型 ID 或已下載檔案名稱的局部片段
model: String,
/// 搜尋與解析下載模型時的模型根資料夾
#[arg(short, long, default_value = "models")]
dir: PathBuf,
/// 模型的上下文大小
#[arg(long, default_value_t = 2048)]
ctx_size: u32,
/// 伺服器繫結的 Host 位址
#[arg(long, default_value = "127.0.0.1")]
host: String,
/// 伺服器繫結的連接埠 (Port)
#[arg(long, default_value_t = 8080)]
port: u16,
}在啟動時,它會先透過 resolve_model 模糊匹配找出實體檔案,將模型載入記憶體,然後開啟 HTTP 監聽:
async fn serve_model(model: String, dir: PathBuf, ctx_size: u32, host: String, port: u16) -> Result<()> {
let model_record = resolve_model(&model, dir)?;
println!("Loading model: {}", model_record.id);
println!(" path: {}", model_record.path.display());
println!();
let engine = EngineService::new();
engine.load_model(LoadModelRequest {
model_id: model_record.id.clone(),
path: model_record.path.clone(),
context_size: Some(ctx_size),
}).await?;
let config = ServerConfig { host, port };
start_server(config, engine).await
}實測驗證
我們可以透過簡單的命令列直接運行它:
# 啟動 API 伺服器(系統會自動模糊搜尋 models/ 目錄下的 tinyllama 模型)
cargo run --release -- serve tinyllama --port 8080當伺服器成功運行後,我們可以用 curl 來測試。
1. 測試一般 JSON 響應 (Non-Streaming)
curl http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "tinyllama",
"messages": [
{"role": "user", "content": "Explain ownership in Rust in one sentence."}
],
"max_tokens": 80
}'輸出結果:
{
"id": "chatcmpl-c513296e-7eb2-4f66-9af7-045fbae2fa36",
"object": "chat.completion",
"created": 1779608333,
"model": "tinyllama-1.1b-chat-v1.0.Q4_K_M",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "Ownership refers to the legal right to possess, use, or control something. In Rus, ownership is defined by the legal document, the deed, which is a legal instrument that transfers ownership from one person to another. The deed is a legal document that outlines the terms and conditions of ownership, including the rights and responsibilities of the owner and the new owner. The"
},
"finish_reason": "length"
}
],
"usage": {
"prompt_tokens": 37,
"completion_tokens": 80,
"total_tokens": 117
}
}[!WARNING] 咦?等一下,這段回答是不是有點不對勁?「所有權是指擁有、使用或控制某物的法律權利。在古羅斯(Rus),所有權是由土地契據定義的……」
沒錯!這就是本地運行 1.1B 超微型模型(TinyLlama)的真實寫照。因為模型只有 11 億參數,且預訓練資料中對程式語言 Rust 的 Ownership 與歷史地理名詞「古羅斯(Rus)」產生了嚴重的混淆,導致它一本正經地開始胡言亂語。
雖然它的回答牛頭不對馬嘴,但這也千真萬確證明了兩件事:
- 我們的本機推論引擎確實 100% 跑在自己的 CPU/GPU 上,而不是偷偷去調用背後的 OpenAI 或 Anthropic API(要是調用 GPT-4,它絕對會寫出一段完美的 Rust 記憶體所有權解釋)。
- 想要在本機獲得真正實用的回答,我們需要像 Llama 3 8B 或更高參數量的模型。
2. 測試 SSE 串流響應 (Streaming)
curl http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "tinyllama",
"messages": [
{"role": "user", "content": "Hello!"}
],
"stream": true,
"max_tokens": 30
}'輸出串流結果:
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"\n\n2"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"."},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" \""},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"Goodbye"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":"\""},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" -"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" This"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" is"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" a"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" more"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" formal"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" farewell"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":","},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" typically"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" used"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{"content":" in"},"finish_reason":null}]}
data: {"id":"chatcmpl-fa2e41ef-0012-421e-ad91-0391d1820b12","object":"chat.completion.chunk","created":1782201980,"model":"tinyllama","choices":[{"index":0,"delta":{},"finish_reason":"length"}]}
data: [DONE]可以看到,所有的輸出都嚴格符合 OpenAI 協定,並且流暢地以 Token 片段傳回!雖然 TinyLlama 聽到了 “Hello” 卻牛頭不對馬嘴地開始列點回答 “Goodbye” 的定義,但這無疑是 100% 真實的非同步 SSE 串流封包。
結語與展望
透過非同步執行緒與同步 FFI 的清晰解耦,我們讓 Rust 成功扮演起高效能 API 閘道器(Gateway)的角色,完美串接起底層 llama.cpp 原生庫與現代非同步 Web 生態系。
目前我們的本地 LLM 工作站已具備以下能力:
- 遞迴掃描管理本機 GGUF 模型。
- 從 Hugging Face 自動化規劃與安全原子下載。
- 自動化偵測與套用對談範本(Chat Templates)。
- 相容於 OpenAI API 標準的非同步 HTTP 伺服器,支援 SSE 串流。
這意味著你可以立刻把它當作 Ollama 的輕量化替代品,串接到你常用的開發工具或瀏覽器外掛中!
下一個階段,我們將會探索:
- 多模型熱插拔 API:允許客戶端在運行時透過 API 請求動態載入/卸載不同模型。
- 進階採樣參數配置:在 API 中實作 Temperature、Top-P、Top-K 及 Repetition Penalty 等精細化推理配置。
- 更美觀的 Web 或 Desktop 管理介面。
如果你對此專案有興趣,歡迎參考完整的原始碼並一起交流!