featured.svg

從一個任務佇列開始

任務佇列這東西,幾乎每個後端系統都會用到。但你有沒有想過,如果自己從零刻一個出來,到底要踩過哪些坑?我這次拿 Rust 來實作一個任務佇列,本來想說應該很單純,結果一路寫下來,意外地把好幾個 Rust 的核心概念都複習了一遍,還順手挖出一個藏得很深的競爭條件 bug,實在是頗有收穫。

這篇就帶你走一遍我的思路,沿途會碰到的東西大概是這些:

  • 模組化設計與封裝
  • Trait 系統的應用
  • 並發安全與資料競爭處理
  • 非同步程式設計 (Tokio)
  • SQLite 資料持久化
  • CLI 工具設計

專案原始碼: https://github.com/p47t/rust-52-projects/tree/master/async-job-queue

系統架構

核心組件

整個系統其實不大,主要就三個模組分工合作:

async-job-queue/
├── src/
│   ├── job.rs        # 任務定義與狀態管理
│   ├── storage.rs    # SQLite 儲存層
│   ├── worker.rs     # 工作池與任務處理
│   ├── lib.rs        # 公開 API
│   └── bin/
│       ├── producer.rs  # 生產者 CLI
│       └── worker.rs    # 消費者 CLI

資料流程

Producer (CLI) → SQLite Database ← Worker Pool (多個 workers)
                      ↓
                 [Pending Jobs]
                      ↓
                 [Running Jobs]
                      ↓
              [Completed/Failed/Dead Letter]

Rust 核心概念解析

1. 模組可見性:mod vs pub mod

問題: 怎麼設計一個乾淨、不會洩漏內部細節的公開 API?

我在 lib.rs 裡的做法是:模組通通設成私有,再挑需要的型別重新匯出。

// 私有模組 - 隱藏內部實作細節
mod job;
mod storage;
mod worker;

// 公開重匯出 - 只暴露需要的類型
pub use job::{Job, JobHandler, JobStatus, Priority};
pub use storage::{Storage, StorageError};
pub use worker::WorkerPool;

優點:

  • ✅ 使用者只能透過 use async_job_queue::{Job, Storage} 匯入
  • ✅ 內部模組結構可以自由重構而不影響公開 API
  • ✅ 更好的封裝性,隱藏實作細節

替代方案(我不太推薦):

pub mod job;  // 暴露整個模組,使用者可存取所有公開項目

這樣一寫,內部結構就攤在陽光下了,之後想重構都綁手綁腳。

2. Trait:定義行為抽象

要讓使用者塞自己的任務處理邏輯進來,trait 是最自然的選擇。JobHandler 就定義了這個介面:

pub trait JobHandler: Send + Sync {
    fn handle(&self, payload: &[u8]) -> Result<(), String>;
}

設計考量:

  • Send + Sync:確保可以在執行緒間安全傳遞和共享
  • 接受 &[u8] 而非具體類型:保持通用性,支援任何序列化格式
  • 回傳 Result<(), String>:簡單的錯誤處理

使用範例:

struct EchoHandler;

impl JobHandler for EchoHandler {
    fn handle(&self, payload: &[u8]) -> Result&lt;(), String&gt; {
        let message = String::from_utf8_lossy(payload);
        println!("處理任務:{}", message);
        Ok(())
    }
}

3. Newtype 模式的應用場景

這裡我猶豫了一下:任務的內容該直接用 Vec<u8>,還是包一層 JobPayload 新型別呢?

目前實作(Vec<u8>):

pub struct Job {
    pub payload: Vec&lt;u8&gt;,
    // ...
}

Newtype 模式:

pub struct JobPayload(Vec&lt;u8&gt;);

impl JobPayload {
    pub fn new(bytes: Vec&lt;u8&gt;) -> Self {
        Self(bytes)
    }
    
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

impl From&lt;String&gt; for JobPayload {
    fn from(s: String) -> Self {
        Self(s.into_bytes())
    }
}

何時使用 Newtype:

  • ✅ 提供型別安全(避免混淆不同的 Vec<u8>
  • ✅ 零成本抽象(編譯時展開,無執行時開銷)
  • ✅ API 更清晰(Job::new(JobPayload, ...) vs Job::new(Vec<u8>, ...)
  • ✅ 未來擴充性(可加入驗證、壓縮等功能)

我最後的選擇: 還是留著 Vec<u8> 就好,理由是:

  • 系統本來就設計成通用型,不在乎你塞進來的是什麼格式
  • API 介面夠簡單,目前也不太可能混淆
  • 遵循 YAGNI 原則(You Aren’t Gonna Need It)——還沒需要就先別過度設計

4. 列舉型別與 Display Trait

問題: 列舉要怎麼實作 Display,把它印成好看的字串?

手動實作(我目前的做法):

impl fmt::Display for Priority {
    fn fmt(&self, f: &mut fmt::Formatter&lt;'_&gt;) -> fmt::Result {
        match self {
            Priority::Low => write!(f, "low"),
            Priority::Normal => write!(f, "normal"),
            Priority::High => write!(f, "high"),
            Priority::Critical => write!(f, "critical"),
        }
    }
}

使用第三方 crate(strum):

use strum_macros::Display;

#[derive(Display)]
#[strum(serialize_all = "lowercase")]
pub enum Priority {
    Low,
    Normal,
    High,
    Critical,
}

我的看法: 像這種簡單列舉,手動寫反而更清楚,何必為了省四行字多拉一個依賴進來呢?strum 當然很方便,但要看場合用啦。

5. 非同步程式設計:Tokio 的角色

Tokio 在專案中的使用:

  1. #[tokio::main] - 設定非同步執行環境
#[tokio::main]
async fn main() -> Result&lt;(), Box&lt;dyn std::error::Error&gt;&gt; {
    // ...
}
  1. tokio::spawn - 並發執行多個 worker
for worker_id in 0..self.num_workers {
    let handle = tokio::spawn(async move {
        worker_loop(worker_id, storage, handler, poll_interval).await;
    });
    handles.push(handle);
}
  1. tokio::time::sleep - 非阻塞延遲
sleep(poll_interval).await;  // 輪詢間隔

一個有點好笑的觀察: 講了半天 Tokio,但這個專案其實是「假非同步」喔!核心操作骨子裡通通是同步的:

  • SQLite 操作使用 rusqlite(同步 API)
  • 任務處理是同步函式
  • 使用 std::sync::Mutex 而非 tokio::sync::Mutex

那既然是假的,為什麼還要拖 Tokio 下水?

  • 開並發 worker 很方便(比 std::thread 輕量)
  • sleep 不會阻塞(不會把整個執行緒凍住)
  • 為未來真的要非同步化時鋪路

說穿了就是先佔個位子。哪天想把儲存層換成 async,骨架已經在那邊等著了。

6. std::sync::Mutex vs tokio::sync::Mutex

這兩個 Mutex 長得很像,很多人會直覺以為「寫 async 就該用 tokio 的」,其實不一定。關鍵差異在這裡:

特性 std::sync::Mutex tokio::sync::Mutex
等待鎖時 阻塞整個執行緒 讓出執行權給其他 task
使用方式 mutex.lock().unwrap() mutex.lock().await
適用場景 短時間持鎖、同步操作 長時間持鎖、跨 await
效能 更快(作業系統級) 稍慢(需協調排程)

這個專案的鎖只在快速的資料庫操作那一小段持有,中間完全沒有 await,所以 std::sync::Mutex 才是對的選擇——又快又夠用:

pub struct Storage {
    conn: Arc&lt;Mutex&lt;Connection&gt;&gt;,  // ✅ std::sync::Mutex
}

pub fn get_next_pending(&self) -&gt; Result&lt;Option&lt;Job&gt;, StorageError&gt; {
    let conn = self.conn.lock().unwrap();
    // 執行快速的同步資料庫操作
    // 沒有在持鎖期間 await
    Ok(job)
}

那什麼時候非得用 tokio::sync::Mutex 不可? 答案是:當你需要在持鎖期間 await 的時候。

// ❌ 錯誤:在持有 std::sync::Mutex 時 await
let guard = std_mutex.lock().unwrap();
async_operation().await;  // 會阻塞整個執行緒!

// ✅ 正確:使用 tokio::sync::Mutex
let guard = tokio_mutex.lock().await;
async_operation().await;  // OK,會讓出執行權

並發安全與資料競爭

我踩到的競爭條件

這就是我前面說的那個藏很深的 bug。一開始我的 get_next_pending() 寫得很「直覺」,結果直覺害死人:

// ❌ 有競爭條件的程式碼
pub fn get_next_pending(&self) -&gt; Result&lt;Option&lt;Job&gt;, StorageError&gt; {
    // 步驟 1:讀取待處理任務
    let job = SELECT ... WHERE status = Pending LIMIT 1;
    // ⚠️ 問題:Worker 2 可能在這裡也讀取到相同任務!
    Ok(job)
}

// Worker 處理邏輯
let job = storage.get_next_pending()?;  // 取得任務
job.mark_running();                     // 標記為執行中
storage.update(&job)?;                  // 更新資料庫

問題出在「讀取」跟「標記」是兩個分開的步驟。只要兩個 worker 在中間那個空隙同時撈到同一個任務,就會雙雙開工。用時序圖一攤開就很清楚了:

時間 Worker 1 Worker 2 資料庫狀態
T1 get_next_pending() Job A: Pending
T2 讀取 Job A
T3 get_next_pending() Job A: Pending
T4 讀取 Job A (相同!)
T5 mark_running()
T6 update(Job A) Job A: Running
T7 mark_running()
T8 update(Job A) Job A: Running
T9 處理 Job A 處理 Job A ❌ 重複處理!

解法:把它變成一個原子操作

關鍵想法很簡單:與其先讀再寫,不如讓「找出任務」跟「標記成 Running」在同一個 SQL 語句裡一口氣完成。SQLite 的 UPDATE ... RETURNING 正好能辦到:

pub fn get_next_pending(&self) -&gt; Result&lt;Option&lt;Job&gt;, StorageError&gt; {
    let conn = self.conn.lock().unwrap();
    
    // ✅ 原子性的 UPDATE + RETURNING(一個 SQL 語句完成)
    let mut stmt = conn.prepare(
        "UPDATE jobs
         SET status = ?1, updated_at = ?2
         WHERE id = (
             SELECT id FROM jobs
             WHERE status = ?3
             ORDER BY priority DESC, created_at ASC
             LIMIT 1
         )
         RETURNING id, payload, priority, status, retry_count, max_retries,
                   created_at, updated_at, error_message",
    )?;

    let now = Utc::now().to_rfc3339();
    let job = stmt
        .query_row(
            params![JobStatus::Running as i32, now, JobStatus::Pending as i32],
            |row| Ok(self.row_to_job(row)?),
        )
        .optional()?;

    Ok(job)
}

為什麼這樣就解掉了? 簡單講就是讓任務在「被交出去之前」就已經是 Running 狀態,沒有任何空隙留給第二個 worker:

  1. 原子性保證: UPDATE ... RETURNING 是單一 SQL 語句
  2. Mutex 保護: 同一時間只有一個 worker 能執行此查詢
  3. 狀態立即改變: 任務在被回傳前就已標記為 Running
  4. 資料庫層級鎖定: SQLite 的 EXCLUSIVE 鎖確保寫入安全

修正後的時序:

時間 Worker 1 Worker 2 資料庫狀態
T1 取得 Mutex
T2 UPDATE Job A → Running 等待 Mutex Job A: Running
T3 釋放 Mutex,回傳 Job A
T4 取得 Mutex
T5 UPDATE (找不到 Pending) Job A: Running
T6 回傳 None
T7 處理 Job A 等待下一輪 ✅ 無重複處理

順手把 Worker 程式碼變乾淨了

意外的好處是:既然 get_next_pending() 已經幫忙把任務標記好了,worker 那邊就不用再多此一舉,程式碼整個清爽不少:

// ✅ 修正後
match storage.get_next_pending() {
    Ok(Some(mut job)) => {
        // 任務已經是 Running 狀態,直接處理
        match handler.handle(&job.payload) {
            Ok(()) => job.mark_completed(),
            Err(e) => job.mark_failed(e),
        }
        storage.update(&job)?;
    }
    Ok(None) => {
        sleep(poll_interval).await;  // 無任務,等待
    }
    Err(e) => { /* 處理錯誤 */ }
}

日誌與除錯

tracing vs println!

何時使用 println!

// ✅ 使用者導向的輸出
println!("Job ID: {}", job.id);
println!("Status: {}", job.status);

何時使用 tracing

// ✅ 操作性日誌、除錯資訊
tracing::info!(job_id = %job.id, "Processing job");
tracing::error!("Database error: {}", e);

我的分法是這樣:兩個 CLI 工具各司其職——

  • producerprintln! 顯示命令結果,因為這是使用者主動下指令、預期會看到的輸出
  • workertracing 記錄操作日誌,方便事後監控和除錯

初始化 tracing

#[tokio::main]
async fn main() -&gt; Result&lt;(), Box&lt;dyn std::error::Error&gt;&gt; {
    // 設定日誌格式與過濾級別
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive(tracing::Level::INFO.into()),
        )
        .init();
    
    // ...
}

使用方式:

# 預設 INFO 級別
./worker

# 設定為 DEBUG 級別
RUST_LOG=debug ./worker

資料持久化設計

SQLite Schema

CREATE TABLE jobs (
    id TEXT PRIMARY KEY,
    payload BLOB NOT NULL,
    priority INTEGER NOT NULL,
    status INTEGER NOT NULL,
    retry_count INTEGER NOT NULL,
    max_retries INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    error_message TEXT
);

CREATE INDEX idx_status_priority
ON jobs(status, priority DESC, created_at ASC);

這個索引的欄位順序是有講究的,不是隨便排的:

  • status 擺最前面:先快速過濾出待處理的任務
  • priority DESC:優先順序高的先處理
  • created_at ASC:優先順序一樣時,就先進先出 (FIFO),公平一點

狀態轉換

         ┌─────────────┐
         │   Pending   │
         └──────┬──────┘
                │
                ↓
         ┌──────────────┐
         │   Running    │
         └──────┬───────┘
                │
        ┌───────┴────────┐
        │                │
        ↓ (成功)          ↓ (失敗)
  ┌───────────┐    ┌─────────┐
  │ Completed │    │ Failed  │
  └───────────┘    └────┬────┘
                        │
                ┌───────┴────────┐
                │                │
                ↓ (可重試)        ↓ (超過最大重試)
           [Pending]        ┌──────────────┐
          (retry_count++)   │ DeadLetter   │
                            └──────────────┘

失敗處理:

pub fn mark_failed(&mut self, error: String) {
    self.status = if self.can_retry() {
        self.retry_count += 1;
        JobStatus::Pending  // 重新排程
    } else {
        JobStatus::DeadLetter  // 移至死信佇列
    };
    self.error_message = Some(error);
    self.updated_at = Utc::now();
}

CLI 設計

Producer(生產者)

# 提交任務
producer submit -p "Hello, World!" -r high -m 3

# 查詢狀態
producer status --job-id &lt;uuid&gt;

# 統計資料
producer stats

使用 Clap 實作:

#[derive(Parser)]
#[command(name = "producer")]
#[command(about = "Job queue producer - submit and manage jobs")]
struct Cli {
    #[arg(short, long, default_value = "jobs.db")]
    database: String,

    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    Submit {
        #[arg(short, long)]
        payload: String,
        
        #[arg(short = 'r', long, default_value = "normal")]
        priority: String,
        
        #[arg(short, long, default_value = "3")]
        max_retries: u32,
    },
    Status { /* ... */ },
    Stats,
}

Worker(消費者)

# 啟動 4 個 workers
worker -w 4

# 使用不同的資料庫
worker --database /path/to/jobs.db --workers 8

效能考量

輪詢 vs 事件驅動

worker 怎麼知道有新任務進來?我這版偷懶用了最簡單的輪詢:每隔一段時間就去問資料庫一次。

loop {
    match storage.get_next_pending() {
        Ok(Some(job)) => { /* 處理 */ }
        Ok(None) => sleep(poll_interval).await,  // 等待 1 秒
        Err(e) => { /* 錯誤處理 */ }
    }
}

優點:

  • ✅ 實作簡單
  • ✅ 可靠性高(無需額外機制)
  • ✅ 易於理解和維護

缺點:

  • ❌ 延遲:最多 poll_interval 的延遲
  • ❌ 資源浪費:無任務時仍在輪詢

想改善的話(事件驅動): 可以改用 tokio::sync::Notify,或是資料庫的 NOTIFY/LISTEN 機制,讓生產者主動敲門通知,而不是讓 worker 在那邊空轉:

// 生產者插入任務後通知
notifier.notify_waiters();

// Worker 等待通知
notifier.notified().await;

指數退避(Exponential Backoff)

if job.retry_count > 0 {
    let backoff = Duration::from_secs(2_u64.pow(job.retry_count.min(5)));
    sleep(backoff).await;
}

退避時間:

  • 第 1 次重試:2^1 = 2 秒
  • 第 2 次重試:2^2 = 4 秒
  • 第 3 次重試:2^3 = 8 秒
  • 第 4 次重試:2^4 = 16 秒
  • 第 5 次以上:2^5 = 32 秒(最大值)

為什麼要這樣做? 任務失敗常常是下游服務暫時掛了,這時候你還死命地每秒重試,只會雪上加霜。退一步、給對方喘口氣,反而比較快好。

我這趟學到的幾件事

下面這些算是我寫完之後的心得整理。當然啦,我不建議你盲目地照單全收,每個專案的取捨都不一樣,看著辦。

1. 模組設計

  • 使用私有模組 + 公開重匯出控制 API
  • 清晰的關注點分離

2. 型別安全

  • 考慮使用 newtype 模式增加型別安全
  • 平衡抽象與簡潔性

3. 並發處理

  • 識別臨界區(critical section)
  • 使用原子操作避免競爭條件
  • 選擇適當的同步原語(Mutex、RwLock 等)

4. 錯誤處理

  • 使用 thiserror 定義清晰的錯誤類型
  • 區分可恢復與不可恢復的錯誤

5. 日誌策略

  • CLI 輸出用 println!
  • 操作日誌用 tracing
  • 使用環境變數控制日誌級別

6. 非同步選擇

  • 評估是否真的需要 async/await
  • 混合使用同步與非同步程式碼時要小心
  • 理解 std::synctokio::sync 的差異

擴充方向

  1. 真正的非同步化:

    • 使用 sqlxtokio-rusqlite
    • 非同步的 JobHandler trait
  2. 分散式支援:

    • 使用 PostgreSQL 的 SELECT FOR UPDATE SKIP LOCKED
    • Redis 作為訊息佇列
  3. 監控與觀測:

    • Prometheus metrics
    • 分散式追蹤
  4. 進階功能:

    • 延遲任務(scheduled jobs)
    • 任務優先順序調整
    • 動態 worker 擴縮容

結論

一個看似簡單的任務佇列,寫下來居然把 Rust 的好幾個招牌特性都用上了:

  • 型別系統讓我在編譯期就攔下一堆錯誤
  • 所有權模型幫我顧好記憶體安全跟執行緒安全
  • Trait 系統提供零成本抽象,要彈性又不犧牲效能
  • 並發原語讓我能正面對決競爭條件

不過老實說,最有趣的還是那個競爭條件 bug——它提醒我,Rust 能保證你不會踩到記憶體和資料競爭的雷,但邏輯上的競爭條件,編譯器可管不著,那還是得靠自己想清楚。常聽人說 Rust「編譯過了通常就能跑」,這話大致沒錯,只是別忘了,編譯器再聰明,也讀不出你腦袋裡的並發模型呢。