專案概述

本文將探討如何使用 Rust 實作一個生產級的非同步任務佇列系統。這個專案展示了多個重要的 Rust 概念和系統設計原則,包括:

  • 模組化設計與封裝
  • Trait 系統的應用
  • 並發安全與資料競爭處理
  • 非同步程式設計 (Tokio)
  • SQLite 資料持久化
  • CLI 工具設計

專案原始碼: https://github.com/p47t/rust-52-projects/tree/master/async-job-queue

系統架構

核心組件

我們的任務佇列系統由三個主要模組組成:

async-job-queue/
├── src/
│   ├── job.rs        # 任務定義與狀態管理
│   ├── storage.rs    # SQLite 儲存層
│   ├── worker.rs     # 工作池與任務處理
│   ├── lib.rs        # 公開 API
│   └── bin/
│       ├── producer.rs  # 生產者 CLI
│       └── worker.rs    # 消費者 CLI

資料流程

Producer (CLI) → SQLite Database ← Worker Pool (多個 workers)
                      ↓
                 [Pending Jobs]
                      ↓
                 [Running Jobs]
                      ↓
              [Completed/Failed/Dead Letter]

Rust 核心概念解析

1. 模組可見性:mod vs pub mod

問題: 如何設計乾淨的公開 API?

lib.rs 中,我們使用私有模組配合公開重匯出:

// 私有模組 - 隱藏內部實作細節
mod job;
mod storage;
mod worker;

// 公開重匯出 - 只暴露需要的類型
pub use job::{Job, JobHandler, JobStatus, Priority};
pub use storage::{Storage, StorageError};
pub use worker::WorkerPool;

優點:

  • ✅ 使用者只能透過 use async_job_queue::{Job, Storage} 匯入
  • ✅ 內部模組結構可以自由重構而不影響公開 API
  • ✅ 更好的封裝性,隱藏實作細節

替代方案(不推薦):

pub mod job;  // 暴露整個模組,使用者可存取所有公開項目

2. Trait:定義行為抽象

JobHandler trait 定義了任務處理的介面:

pub trait JobHandler: Send + Sync {
    fn handle(&self, payload: &[u8]) -> Result<(), String>;
}

設計考量:

  • Send + Sync:確保可以在執行緒間安全傳遞和共享
  • 接受 &[u8] 而非具體類型:保持通用性,支援任何序列化格式
  • 回傳 Result<(), String>:簡單的錯誤處理

使用範例:

struct EchoHandler;

impl JobHandler for EchoHandler {
    fn handle(&self, payload: &[u8]) -> Result&lt;(), String&gt; {
        let message = String::from_utf8_lossy(payload);
        println!("處理任務:{}", message);
        Ok(())
    }
}

3. Newtype 模式的應用場景

討論: 應該使用 Vec<u8> 還是 JobPayload 新類型?

目前實作(Vec<u8>):

pub struct Job {
    pub payload: Vec&lt;u8&gt;,
    // ...
}

Newtype 模式:

pub struct JobPayload(Vec&lt;u8&gt;);

impl JobPayload {
    pub fn new(bytes: Vec&lt;u8&gt;) -> Self {
        Self(bytes)
    }
    
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

impl From&lt;String&gt; for JobPayload {
    fn from(s: String) -> Self {
        Self(s.into_bytes())
    }
}

何時使用 Newtype:

  • ✅ 提供型別安全(避免混淆不同的 Vec<u8>
  • ✅ 零成本抽象(編譯時展開,無執行時開銷)
  • ✅ API 更清晰(Job::new(JobPayload, ...) vs Job::new(Vec<u8>, ...)
  • ✅ 未來擴充性(可加入驗證、壓縮等功能)

目前專案的選擇: 保持 Vec<u8> 是合理的,因為:

  • 系統設計為通用型,不關心具體格式
  • API 介面簡單,不易混淆
  • 遵循 YAGNI 原則(You Aren’t Gonna Need It)

4. 列舉型別與 Display Trait

問題: 如何為列舉實作 Display

手動實作(目前方式):

impl fmt::Display for Priority {
    fn fmt(&self, f: &mut fmt::Formatter&lt;'_&gt;) -> fmt::Result {
        match self {
            Priority::Low => write!(f, "low"),
            Priority::Normal => write!(f, "normal"),
            Priority::High => write!(f, "high"),
            Priority::Critical => write!(f, "critical"),
        }
    }
}

使用第三方 crate(strum):

use strum_macros::Display;

#[derive(Display)]
#[strum(serialize_all = "lowercase")]
pub enum Priority {
    Low,
    Normal,
    High,
    Critical,
}

建議: 對於簡單列舉,手動實作更清晰,無需額外依賴。

5. 非同步程式設計:Tokio 的角色

Tokio 在專案中的使用:

  1. #[tokio::main] - 設定非同步執行環境
#[tokio::main]
async fn main() -> Result&lt;(), Box&lt;dyn std::error::Error&gt;&gt; {
    // ...
}
  1. tokio::spawn - 並發執行多個 worker
for worker_id in 0..self.num_workers {
    let handle = tokio::spawn(async move {
        worker_loop(worker_id, storage, handler, poll_interval).await;
    });
    handles.push(handle);
}
  1. tokio::time::sleep - 非阻塞延遲
sleep(poll_interval).await;  // 輪詢間隔

有趣的觀察: 這個專案實際上是「假非同步」!核心操作都是同步的:

  • SQLite 操作使用 rusqlite(同步 API)
  • 任務處理是同步函式
  • 使用 std::sync::Mutex 而非 tokio::sync::Mutex

為什麼還要用 Tokio?

  • 方便建立並發 worker(比 std::thread 更輕量)
  • 非阻塞的 sleep(不會凍結整個執行緒)
  • 為未來的非同步擴充做準備

6. std::sync::Mutex vs tokio::sync::Mutex

關鍵差異:

特性 std::sync::Mutex tokio::sync::Mutex
等待鎖時 阻塞整個執行緒 讓出執行權給其他 task
使用方式 mutex.lock().unwrap() mutex.lock().await
適用場景 短時間持鎖、同步操作 長時間持鎖、跨 await
效能 更快(作業系統級) 稍慢(需協調排程)

目前專案的正確選擇:

pub struct Storage {
    conn: Arc&lt;Mutex&lt;Connection&gt;&gt;,  // ✅ std::sync::Mutex
}

pub fn get_next_pending(&self) -&gt; Result&lt;Option&lt;Job&gt;, StorageError&gt; {
    let conn = self.conn.lock().unwrap();
    // 執行快速的同步資料庫操作
    // 沒有在持鎖期間 await
    Ok(job)
}

何時必須使用 tokio::sync::Mutex

// ❌ 錯誤:在持有 std::sync::Mutex 時 await
let guard = std_mutex.lock().unwrap();
async_operation().await;  // 會阻塞整個執行緒!

// ✅ 正確:使用 tokio::sync::Mutex
let guard = tokio_mutex.lock().await;
async_operation().await;  // OK,會讓出執行權

並發安全與資料競爭

發現的競爭條件問題

原始設計的漏洞:

// ❌ 有競爭條件的程式碼
pub fn get_next_pending(&self) -&gt; Result&lt;Option&lt;Job&gt;, StorageError&gt; {
    // 步驟 1:讀取待處理任務
    let job = SELECT ... WHERE status = Pending LIMIT 1;
    // ⚠️ 問題:Worker 2 可能在這裡也讀取到相同任務!
    Ok(job)
}

// Worker 處理邏輯
let job = storage.get_next_pending()?;  // 取得任務
job.mark_running();                     // 標記為執行中
storage.update(&job)?;                  // 更新資料庫

時序圖展示問題:

時間 Worker 1 Worker 2 資料庫狀態
T1 get_next_pending() Job A: Pending
T2 讀取 Job A
T3 get_next_pending() Job A: Pending
T4 讀取 Job A (相同!)
T5 mark_running()
T6 update(Job A) Job A: Running
T7 mark_running()
T8 update(Job A) Job A: Running
T9 處理 Job A 處理 Job A ❌ 重複處理!

解決方案:原子性操作

修正後的實作:

pub fn get_next_pending(&self) -&gt; Result&lt;Option&lt;Job&gt;, StorageError&gt; {
    let conn = self.conn.lock().unwrap();
    
    // ✅ 原子性的 UPDATE + RETURNING(一個 SQL 語句完成)
    let mut stmt = conn.prepare(
        "UPDATE jobs
         SET status = ?1, updated_at = ?2
         WHERE id = (
             SELECT id FROM jobs
             WHERE status = ?3
             ORDER BY priority DESC, created_at ASC
             LIMIT 1
         )
         RETURNING id, payload, priority, status, retry_count, max_retries,
                   created_at, updated_at, error_message",
    )?;

    let now = Utc::now().to_rfc3339();
    let job = stmt
        .query_row(
            params![JobStatus::Running as i32, now, JobStatus::Pending as i32],
            |row| Ok(self.row_to_job(row)?),
        )
        .optional()?;

    Ok(job)
}

為什麼這樣可以解決問題:

  1. 原子性保證: UPDATE ... RETURNING 是單一 SQL 語句
  2. Mutex 保護: 同一時間只有一個 worker 能執行此查詢
  3. 狀態立即改變: 任務在被回傳前就已標記為 Running
  4. 資料庫層級鎖定: SQLite 的 EXCLUSIVE 鎖確保寫入安全

修正後的時序:

時間 Worker 1 Worker 2 資料庫狀態
T1 取得 Mutex
T2 UPDATE Job A → Running 等待 Mutex Job A: Running
T3 釋放 Mutex,回傳 Job A
T4 取得 Mutex
T5 UPDATE (找不到 Pending) Job A: Running
T6 回傳 None
T7 處理 Job A 等待下一輪 ✅ 無重複處理

Worker 程式碼簡化

因為 get_next_pending() 已經原子性地標記任務,worker 程式碼更簡潔:

// ✅ 修正後
match storage.get_next_pending() {
    Ok(Some(mut job)) => {
        // 任務已經是 Running 狀態,直接處理
        match handler.handle(&job.payload) {
            Ok(()) => job.mark_completed(),
            Err(e) => job.mark_failed(e),
        }
        storage.update(&job)?;
    }
    Ok(None) => {
        sleep(poll_interval).await;  // 無任務,等待
    }
    Err(e) => { /* 處理錯誤 */ }
}

日誌與除錯

tracing vs println!

何時使用 println!

// ✅ 使用者導向的輸出
println!("Job ID: {}", job.id);
println!("Status: {}", job.status);

何時使用 tracing

// ✅ 操作性日誌、除錯資訊
tracing::info!(job_id = %job.id, "Processing job");
tracing::error!("Database error: {}", e);

在 CLI 工具中:

  • producer 使用 println! 顯示命令結果(使用者期望的輸出)
  • worker 使用 tracing 記錄操作日誌(方便監控和除錯)

初始化 tracing

#[tokio::main]
async fn main() -&gt; Result&lt;(), Box&lt;dyn std::error::Error&gt;&gt; {
    // 設定日誌格式與過濾級別
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive(tracing::Level::INFO.into()),
        )
        .init();
    
    // ...
}

使用方式:

# 預設 INFO 級別
./worker

# 設定為 DEBUG 級別
RUST_LOG=debug ./worker

資料持久化設計

SQLite Schema

CREATE TABLE jobs (
    id TEXT PRIMARY KEY,
    payload BLOB NOT NULL,
    priority INTEGER NOT NULL,
    status INTEGER NOT NULL,
    retry_count INTEGER NOT NULL,
    max_retries INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    error_message TEXT
);

CREATE INDEX idx_status_priority
ON jobs(status, priority DESC, created_at ASC);

索引設計考量:

  • status 在前:快速過濾待處理任務
  • priority DESC:高優先順序優先
  • created_at ASC:相同優先順序時,先進先出 (FIFO)

狀態轉換

         ┌─────────────┐
         │   Pending   │
         └──────┬──────┘
                │
                ↓
         ┌──────────────┐
         │   Running    │
         └──────┬───────┘
                │
        ┌───────┴────────┐
        │                │
        ↓ (成功)          ↓ (失敗)
  ┌───────────┐    ┌─────────┐
  │ Completed │    │ Failed  │
  └───────────┘    └────┬────┘
                        │
                ┌───────┴────────┐
                │                │
                ↓ (可重試)        ↓ (超過最大重試)
           [Pending]        ┌──────────────┐
          (retry_count++)   │ DeadLetter   │
                            └──────────────┘

失敗處理:

pub fn mark_failed(&mut self, error: String) {
    self.status = if self.can_retry() {
        self.retry_count += 1;
        JobStatus::Pending  // 重新排程
    } else {
        JobStatus::DeadLetter  // 移至死信佇列
    };
    self.error_message = Some(error);
    self.updated_at = Utc::now();
}

CLI 設計

Producer(生產者)

# 提交任務
producer submit -p "Hello, World!" -r high -m 3

# 查詢狀態
producer status --job-id &lt;uuid&gt;

# 統計資料
producer stats

使用 Clap 實作:

#[derive(Parser)]
#[command(name = "producer")]
#[command(about = "Job queue producer - submit and manage jobs")]
struct Cli {
    #[arg(short, long, default_value = "jobs.db")]
    database: String,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    Submit {
        #[arg(short, long)]
        payload: String,
        
        #[arg(short = 'r', long, default_value = "normal")]
        priority: String,
        
        #[arg(short, long, default_value = "3")]
        max_retries: u32,
    },
    Status { /* ... */ },
    Stats,
}

Worker(消費者)

# 啟動 4 個 workers
worker -w 4

# 使用不同的資料庫
worker --database /path/to/jobs.db --workers 8

效能考量

輪詢 vs 事件驅動

目前實作(輪詢):

loop {
    match storage.get_next_pending() {
        Ok(Some(job)) => { /* 處理 */ }
        Ok(None) => sleep(poll_interval).await,  // 等待 1 秒
        Err(e) => { /* 錯誤處理 */ }
    }
}

優點:

  • ✅ 實作簡單
  • ✅ 可靠性高(無需額外機制)
  • ✅ 易於理解和維護

缺點:

  • ❌ 延遲:最多 poll_interval 的延遲
  • ❌ 資源浪費:無任務時仍在輪詢

改進方案(事件驅動): 可以使用 tokio::sync::Notify 或資料庫的 NOTIFY/LISTEN:

// 生產者插入任務後通知
notifier.notify_waiters();

// Worker 等待通知
notifier.notified().await;

指數退避(Exponential Backoff)

if job.retry_count > 0 {
    let backoff = Duration::from_secs(2_u64.pow(job.retry_count.min(5)));
    sleep(backoff).await;
}

退避時間:

  • 第 1 次重試:2^1 = 2 秒
  • 第 2 次重試:2^2 = 4 秒
  • 第 3 次重試:2^3 = 8 秒
  • 第 4 次重試:2^4 = 16 秒
  • 第 5 次以上:2^5 = 32 秒(最大值)

目的: 避免快速重試造成系統負擔,給下游服務恢復時間。

最佳實踐總結

1. 模組設計

  • 使用私有模組 + 公開重匯出控制 API
  • 清晰的關注點分離

2. 型別安全

  • 考慮使用 newtype 模式增加型別安全
  • 平衡抽象與簡潔性

3. 並發處理

  • 識別臨界區(critical section)
  • 使用原子操作避免競爭條件
  • 選擇適當的同步原語(Mutex、RwLock 等)

4. 錯誤處理

  • 使用 thiserror 定義清晰的錯誤類型
  • 區分可恢復與不可恢復的錯誤

5. 日誌策略

  • CLI 輸出用 println!
  • 操作日誌用 tracing
  • 使用環境變數控制日誌級別

6. 非同步選擇

  • 評估是否真的需要 async/await
  • 混合使用同步與非同步程式碼時要小心
  • 理解 std::synctokio::sync 的差異

擴充方向

  1. 真正的非同步化:

    • 使用 sqlxtokio-rusqlite
    • 非同步的 JobHandler trait
  2. 分散式支援:

    • 使用 PostgreSQL 的 SELECT FOR UPDATE SKIP LOCKED
    • Redis 作為訊息佇列
  3. 監控與觀測:

    • Prometheus metrics
    • 分散式追蹤
  4. 進階功能:

    • 延遲任務(scheduled jobs)
    • 任務優先順序調整
    • 動態 worker 擴縮容

結論

這個專案展示了如何使用 Rust 構建可靠的系統軟體。關鍵要點:

  • 型別系統讓我們在編譯期捕獲許多錯誤
  • 所有權模型確保記憶體安全與執行緒安全
  • Trait 系統提供零成本抽象
  • 並發原語幫助我們正確處理競爭條件

Rust 的「如果編譯通過,通常就能正確運行」的特性,讓我們能夠自信地構建複雜的並發系統。