專案概述
本文將探討如何使用 Rust 實作一個生產級的非同步任務佇列系統。這個專案展示了多個重要的 Rust 概念和系統設計原則,包括:
- 模組化設計與封裝
- Trait 系統的應用
- 並發安全與資料競爭處理
- 非同步程式設計 (Tokio)
- SQLite 資料持久化
- CLI 工具設計
專案原始碼: https://github.com/p47t/rust-52-projects/tree/master/async-job-queue
系統架構
核心組件
我們的任務佇列系統由三個主要模組組成:
async-job-queue/
├── src/
│ ├── job.rs # 任務定義與狀態管理
│ ├── storage.rs # SQLite 儲存層
│ ├── worker.rs # 工作池與任務處理
│ ├── lib.rs # 公開 API
│ └── bin/
│ ├── producer.rs # 生產者 CLI
│ └── worker.rs # 消費者 CLI
資料流程
Producer (CLI) → SQLite Database ← Worker Pool (多個 workers)
↓
[Pending Jobs]
↓
[Running Jobs]
↓
[Completed/Failed/Dead Letter]
Rust 核心概念解析
1. 模組可見性:mod vs pub mod
問題: 如何設計乾淨的公開 API?
在 lib.rs 中,我們使用私有模組配合公開重匯出:
// 私有模組 - 隱藏內部實作細節
mod job;
mod storage;
mod worker;
// 公開重匯出 - 只暴露需要的類型
pub use job::{Job, JobHandler, JobStatus, Priority};
pub use storage::{Storage, StorageError};
pub use worker::WorkerPool;優點:
- ✅ 使用者只能透過
use async_job_queue::{Job, Storage}匯入 - ✅ 內部模組結構可以自由重構而不影響公開 API
- ✅ 更好的封裝性,隱藏實作細節
替代方案(不推薦):
pub mod job; // 暴露整個模組,使用者可存取所有公開項目
2. Trait:定義行為抽象
JobHandler trait 定義了任務處理的介面:
pub trait JobHandler: Send + Sync {
fn handle(&self, payload: &[u8]) -> Result<(), String>;
}設計考量:
Send + Sync:確保可以在執行緒間安全傳遞和共享- 接受
&[u8]而非具體類型:保持通用性,支援任何序列化格式 - 回傳
Result<(), String>:簡單的錯誤處理
使用範例:
struct EchoHandler;
impl JobHandler for EchoHandler {
fn handle(&self, payload: &[u8]) -> Result<(), String> {
let message = String::from_utf8_lossy(payload);
println!("處理任務:{}", message);
Ok(())
}
}3. Newtype 模式的應用場景
討論: 應該使用 Vec<u8> 還是 JobPayload 新類型?
目前實作(Vec<u8>):
pub struct Job {
pub payload: Vec<u8>,
// ...
}Newtype 模式:
pub struct JobPayload(Vec<u8>);
impl JobPayload {
pub fn new(bytes: Vec<u8>) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
}
impl From<String> for JobPayload {
fn from(s: String) -> Self {
Self(s.into_bytes())
}
}何時使用 Newtype:
- ✅ 提供型別安全(避免混淆不同的
Vec<u8>) - ✅ 零成本抽象(編譯時展開,無執行時開銷)
- ✅ API 更清晰(
Job::new(JobPayload, ...)vsJob::new(Vec<u8>, ...)) - ✅ 未來擴充性(可加入驗證、壓縮等功能)
目前專案的選擇:
保持 Vec<u8> 是合理的,因為:
- 系統設計為通用型,不關心具體格式
- API 介面簡單,不易混淆
- 遵循 YAGNI 原則(You Aren’t Gonna Need It)
4. 列舉型別與 Display Trait
問題: 如何為列舉實作 Display?
手動實作(目前方式):
impl fmt::Display for Priority {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Priority::Low => write!(f, "low"),
Priority::Normal => write!(f, "normal"),
Priority::High => write!(f, "high"),
Priority::Critical => write!(f, "critical"),
}
}
}使用第三方 crate(strum):
use strum_macros::Display;
#[derive(Display)]
#[strum(serialize_all = "lowercase")]
pub enum Priority {
Low,
Normal,
High,
Critical,
}建議: 對於簡單列舉,手動實作更清晰,無需額外依賴。
5. 非同步程式設計:Tokio 的角色
Tokio 在專案中的使用:
#[tokio::main]- 設定非同步執行環境
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// ...
}tokio::spawn- 並發執行多個 worker
for worker_id in 0..self.num_workers {
let handle = tokio::spawn(async move {
worker_loop(worker_id, storage, handler, poll_interval).await;
});
handles.push(handle);
}tokio::time::sleep- 非阻塞延遲
sleep(poll_interval).await; // 輪詢間隔
有趣的觀察: 這個專案實際上是「假非同步」!核心操作都是同步的:
- SQLite 操作使用
rusqlite(同步 API) - 任務處理是同步函式
- 使用
std::sync::Mutex而非tokio::sync::Mutex
為什麼還要用 Tokio?
- 方便建立並發 worker(比
std::thread更輕量) - 非阻塞的 sleep(不會凍結整個執行緒)
- 為未來的非同步擴充做準備
6. std::sync::Mutex vs tokio::sync::Mutex
關鍵差異:
| 特性 | std::sync::Mutex |
tokio::sync::Mutex |
|---|---|---|
| 等待鎖時 | 阻塞整個執行緒 | 讓出執行權給其他 task |
| 使用方式 | mutex.lock().unwrap() |
mutex.lock().await |
| 適用場景 | 短時間持鎖、同步操作 | 長時間持鎖、跨 await |
| 效能 | 更快(作業系統級) | 稍慢(需協調排程) |
目前專案的正確選擇:
pub struct Storage {
conn: Arc<Mutex<Connection>>, // ✅ std::sync::Mutex
}
pub fn get_next_pending(&self) -> Result<Option<Job>, StorageError> {
let conn = self.conn.lock().unwrap();
// 執行快速的同步資料庫操作
// 沒有在持鎖期間 await
Ok(job)
}何時必須使用 tokio::sync::Mutex:
// ❌ 錯誤:在持有 std::sync::Mutex 時 await
let guard = std_mutex.lock().unwrap();
async_operation().await; // 會阻塞整個執行緒!
// ✅ 正確:使用 tokio::sync::Mutex
let guard = tokio_mutex.lock().await;
async_operation().await; // OK,會讓出執行權
並發安全與資料競爭
發現的競爭條件問題
原始設計的漏洞:
// ❌ 有競爭條件的程式碼
pub fn get_next_pending(&self) -> Result<Option<Job>, StorageError> {
// 步驟 1:讀取待處理任務
let job = SELECT ... WHERE status = Pending LIMIT 1;
// ⚠️ 問題:Worker 2 可能在這裡也讀取到相同任務!
Ok(job)
}
// Worker 處理邏輯
let job = storage.get_next_pending()?; // 取得任務
job.mark_running(); // 標記為執行中
storage.update(&job)?; // 更新資料庫
時序圖展示問題:
| 時間 | Worker 1 | Worker 2 | 資料庫狀態 |
|---|---|---|---|
| T1 | get_next_pending() | Job A: Pending | |
| T2 | 讀取 Job A | ||
| T3 | get_next_pending() | Job A: Pending | |
| T4 | 讀取 Job A (相同!) | ||
| T5 | mark_running() | ||
| T6 | update(Job A) | Job A: Running | |
| T7 | mark_running() | ||
| T8 | update(Job A) | Job A: Running | |
| T9 | 處理 Job A | 處理 Job A | ❌ 重複處理! |
解決方案:原子性操作
修正後的實作:
pub fn get_next_pending(&self) -> Result<Option<Job>, StorageError> {
let conn = self.conn.lock().unwrap();
// ✅ 原子性的 UPDATE + RETURNING(一個 SQL 語句完成)
let mut stmt = conn.prepare(
"UPDATE jobs
SET status = ?1, updated_at = ?2
WHERE id = (
SELECT id FROM jobs
WHERE status = ?3
ORDER BY priority DESC, created_at ASC
LIMIT 1
)
RETURNING id, payload, priority, status, retry_count, max_retries,
created_at, updated_at, error_message",
)?;
let now = Utc::now().to_rfc3339();
let job = stmt
.query_row(
params![JobStatus::Running as i32, now, JobStatus::Pending as i32],
|row| Ok(self.row_to_job(row)?),
)
.optional()?;
Ok(job)
}為什麼這樣可以解決問題:
- 原子性保證:
UPDATE ... RETURNING是單一 SQL 語句 - Mutex 保護: 同一時間只有一個 worker 能執行此查詢
- 狀態立即改變: 任務在被回傳前就已標記為 Running
- 資料庫層級鎖定: SQLite 的 EXCLUSIVE 鎖確保寫入安全
修正後的時序:
| 時間 | Worker 1 | Worker 2 | 資料庫狀態 |
|---|---|---|---|
| T1 | 取得 Mutex | ||
| T2 | UPDATE Job A → Running | 等待 Mutex | Job A: Running |
| T3 | 釋放 Mutex,回傳 Job A | ||
| T4 | 取得 Mutex | ||
| T5 | UPDATE (找不到 Pending) | Job A: Running | |
| T6 | 回傳 None | ||
| T7 | 處理 Job A | 等待下一輪 | ✅ 無重複處理 |
Worker 程式碼簡化
因為 get_next_pending() 已經原子性地標記任務,worker 程式碼更簡潔:
// ✅ 修正後
match storage.get_next_pending() {
Ok(Some(mut job)) => {
// 任務已經是 Running 狀態,直接處理
match handler.handle(&job.payload) {
Ok(()) => job.mark_completed(),
Err(e) => job.mark_failed(e),
}
storage.update(&job)?;
}
Ok(None) => {
sleep(poll_interval).await; // 無任務,等待
}
Err(e) => { /* 處理錯誤 */ }
}日誌與除錯
tracing vs println!
何時使用 println!:
// ✅ 使用者導向的輸出
println!("Job ID: {}", job.id);
println!("Status: {}", job.status);何時使用 tracing:
// ✅ 操作性日誌、除錯資訊
tracing::info!(job_id = %job.id, "Processing job");
tracing::error!("Database error: {}", e);在 CLI 工具中:
producer使用println!顯示命令結果(使用者期望的輸出)worker使用tracing記錄操作日誌(方便監控和除錯)
初始化 tracing
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 設定日誌格式與過濾級別
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.init();
// ...
}使用方式:
# 預設 INFO 級別
./worker
# 設定為 DEBUG 級別
RUST_LOG=debug ./worker資料持久化設計
SQLite Schema
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
payload BLOB NOT NULL,
priority INTEGER NOT NULL,
status INTEGER NOT NULL,
retry_count INTEGER NOT NULL,
max_retries INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
error_message TEXT
);
CREATE INDEX idx_status_priority
ON jobs(status, priority DESC, created_at ASC);索引設計考量:
status在前:快速過濾待處理任務priority DESC:高優先順序優先created_at ASC:相同優先順序時,先進先出 (FIFO)
狀態轉換
┌─────────────┐
│ Pending │
└──────┬──────┘
│
↓
┌──────────────┐
│ Running │
└──────┬───────┘
│
┌───────┴────────┐
│ │
↓ (成功) ↓ (失敗)
┌───────────┐ ┌─────────┐
│ Completed │ │ Failed │
└───────────┘ └────┬────┘
│
┌───────┴────────┐
│ │
↓ (可重試) ↓ (超過最大重試)
[Pending] ┌──────────────┐
(retry_count++) │ DeadLetter │
└──────────────┘
失敗處理:
pub fn mark_failed(&mut self, error: String) {
self.status = if self.can_retry() {
self.retry_count += 1;
JobStatus::Pending // 重新排程
} else {
JobStatus::DeadLetter // 移至死信佇列
};
self.error_message = Some(error);
self.updated_at = Utc::now();
}CLI 設計
Producer(生產者)
# 提交任務
producer submit -p "Hello, World!" -r high -m 3
# 查詢狀態
producer status --job-id <uuid>
# 統計資料
producer stats使用 Clap 實作:
#[derive(Parser)]
#[command(name = "producer")]
#[command(about = "Job queue producer - submit and manage jobs")]
struct Cli {
#[arg(short, long, default_value = "jobs.db")]
database: String,
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Submit {
#[arg(short, long)]
payload: String,
#[arg(short = 'r', long, default_value = "normal")]
priority: String,
#[arg(short, long, default_value = "3")]
max_retries: u32,
},
Status { /* ... */ },
Stats,
}Worker(消費者)
# 啟動 4 個 workers
worker -w 4
# 使用不同的資料庫
worker --database /path/to/jobs.db --workers 8效能考量
輪詢 vs 事件驅動
目前實作(輪詢):
loop {
match storage.get_next_pending() {
Ok(Some(job)) => { /* 處理 */ }
Ok(None) => sleep(poll_interval).await, // 等待 1 秒
Err(e) => { /* 錯誤處理 */ }
}
}優點:
- ✅ 實作簡單
- ✅ 可靠性高(無需額外機制)
- ✅ 易於理解和維護
缺點:
- ❌ 延遲:最多
poll_interval的延遲 - ❌ 資源浪費:無任務時仍在輪詢
改進方案(事件驅動):
可以使用 tokio::sync::Notify 或資料庫的 NOTIFY/LISTEN:
// 生產者插入任務後通知
notifier.notify_waiters();
// Worker 等待通知
notifier.notified().await;指數退避(Exponential Backoff)
if job.retry_count > 0 {
let backoff = Duration::from_secs(2_u64.pow(job.retry_count.min(5)));
sleep(backoff).await;
}退避時間:
- 第 1 次重試:2^1 = 2 秒
- 第 2 次重試:2^2 = 4 秒
- 第 3 次重試:2^3 = 8 秒
- 第 4 次重試:2^4 = 16 秒
- 第 5 次以上:2^5 = 32 秒(最大值)
目的: 避免快速重試造成系統負擔,給下游服務恢復時間。
最佳實踐總結
1. 模組設計
- 使用私有模組 + 公開重匯出控制 API
- 清晰的關注點分離
2. 型別安全
- 考慮使用 newtype 模式增加型別安全
- 平衡抽象與簡潔性
3. 並發處理
- 識別臨界區(critical section)
- 使用原子操作避免競爭條件
- 選擇適當的同步原語(Mutex、RwLock 等)
4. 錯誤處理
- 使用
thiserror定義清晰的錯誤類型 - 區分可恢復與不可恢復的錯誤
5. 日誌策略
- CLI 輸出用
println! - 操作日誌用
tracing - 使用環境變數控制日誌級別
6. 非同步選擇
- 評估是否真的需要 async/await
- 混合使用同步與非同步程式碼時要小心
- 理解
std::sync與tokio::sync的差異
擴充方向
-
真正的非同步化:
- 使用
sqlx或tokio-rusqlite - 非同步的
JobHandlertrait
- 使用
-
分散式支援:
- 使用 PostgreSQL 的
SELECT FOR UPDATE SKIP LOCKED - Redis 作為訊息佇列
- 使用 PostgreSQL 的
-
監控與觀測:
- Prometheus metrics
- 分散式追蹤
-
進階功能:
- 延遲任務(scheduled jobs)
- 任務優先順序調整
- 動態 worker 擴縮容
結論
這個專案展示了如何使用 Rust 構建可靠的系統軟體。關鍵要點:
- 型別系統讓我們在編譯期捕獲許多錯誤
- 所有權模型確保記憶體安全與執行緒安全
- Trait 系統提供零成本抽象
- 並發原語幫助我們正確處理競爭條件
Rust 的「如果編譯通過,通常就能正確運行」的特性,讓我們能夠自信地構建複雜的並發系統。