從一個任務佇列開始
任務佇列這東西,幾乎每個後端系統都會用到。但你有沒有想過,如果自己從零刻一個出來,到底要踩過哪些坑?我這次拿 Rust 來實作一個任務佇列,本來想說應該很單純,結果一路寫下來,意外地把好幾個 Rust 的核心概念都複習了一遍,還順手挖出一個藏得很深的競爭條件 bug,實在是頗有收穫。
這篇就帶你走一遍我的思路,沿途會碰到的東西大概是這些:
- 模組化設計與封裝
- Trait 系統的應用
- 並發安全與資料競爭處理
- 非同步程式設計 (Tokio)
- SQLite 資料持久化
- CLI 工具設計
專案原始碼: https://github.com/p47t/rust-52-projects/tree/master/async-job-queue
系統架構
核心組件
整個系統其實不大,主要就三個模組分工合作:
async-job-queue/
├── src/
│ ├── job.rs # 任務定義與狀態管理
│ ├── storage.rs # SQLite 儲存層
│ ├── worker.rs # 工作池與任務處理
│ ├── lib.rs # 公開 API
│ └── bin/
│ ├── producer.rs # 生產者 CLI
│ └── worker.rs # 消費者 CLI
資料流程
Producer (CLI) → SQLite Database ← Worker Pool (多個 workers)
↓
[Pending Jobs]
↓
[Running Jobs]
↓
[Completed/Failed/Dead Letter]
Rust 核心概念解析
1. 模組可見性:mod vs pub mod
問題: 怎麼設計一個乾淨、不會洩漏內部細節的公開 API?
我在 lib.rs 裡的做法是:模組通通設成私有,再挑需要的型別重新匯出。
// 私有模組 - 隱藏內部實作細節
mod job;
mod storage;
mod worker;
// 公開重匯出 - 只暴露需要的類型
pub use job::{Job, JobHandler, JobStatus, Priority};
pub use storage::{Storage, StorageError};
pub use worker::WorkerPool;優點:
- ✅ 使用者只能透過
use async_job_queue::{Job, Storage}匯入 - ✅ 內部模組結構可以自由重構而不影響公開 API
- ✅ 更好的封裝性,隱藏實作細節
替代方案(我不太推薦):
pub mod job; // 暴露整個模組,使用者可存取所有公開項目
這樣一寫,內部結構就攤在陽光下了,之後想重構都綁手綁腳。
2. Trait:定義行為抽象
要讓使用者塞自己的任務處理邏輯進來,trait 是最自然的選擇。JobHandler 就定義了這個介面:
pub trait JobHandler: Send + Sync {
fn handle(&self, payload: &[u8]) -> Result<(), String>;
}設計考量:
Send + Sync:確保可以在執行緒間安全傳遞和共享- 接受
&[u8]而非具體類型:保持通用性,支援任何序列化格式 - 回傳
Result<(), String>:簡單的錯誤處理
使用範例:
struct EchoHandler;
impl JobHandler for EchoHandler {
fn handle(&self, payload: &[u8]) -> Result<(), String> {
let message = String::from_utf8_lossy(payload);
println!("處理任務:{}", message);
Ok(())
}
}3. Newtype 模式的應用場景
這裡我猶豫了一下:任務的內容該直接用 Vec<u8>,還是包一層 JobPayload 新型別呢?
目前實作(Vec<u8>):
pub struct Job {
pub payload: Vec<u8>,
// ...
}Newtype 模式:
pub struct JobPayload(Vec<u8>);
impl JobPayload {
pub fn new(bytes: Vec<u8>) -> Self {
Self(bytes)
}
pub fn as_bytes(&self) -> &[u8] {
&self.0
}
}
impl From<String> for JobPayload {
fn from(s: String) -> Self {
Self(s.into_bytes())
}
}何時使用 Newtype:
- ✅ 提供型別安全(避免混淆不同的
Vec<u8>) - ✅ 零成本抽象(編譯時展開,無執行時開銷)
- ✅ API 更清晰(
Job::new(JobPayload, ...)vsJob::new(Vec<u8>, ...)) - ✅ 未來擴充性(可加入驗證、壓縮等功能)
我最後的選擇:
還是留著 Vec<u8> 就好,理由是:
- 系統本來就設計成通用型,不在乎你塞進來的是什麼格式
- API 介面夠簡單,目前也不太可能混淆
- 遵循 YAGNI 原則(You Aren’t Gonna Need It)——還沒需要就先別過度設計
4. 列舉型別與 Display Trait
問題: 列舉要怎麼實作 Display,把它印成好看的字串?
手動實作(我目前的做法):
impl fmt::Display for Priority {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Priority::Low => write!(f, "low"),
Priority::Normal => write!(f, "normal"),
Priority::High => write!(f, "high"),
Priority::Critical => write!(f, "critical"),
}
}
}使用第三方 crate(strum):
use strum_macros::Display;
#[derive(Display)]
#[strum(serialize_all = "lowercase")]
pub enum Priority {
Low,
Normal,
High,
Critical,
}我的看法: 像這種簡單列舉,手動寫反而更清楚,何必為了省四行字多拉一個依賴進來呢?strum 當然很方便,但要看場合用啦。
5. 非同步程式設計:Tokio 的角色
Tokio 在專案中的使用:
#[tokio::main]- 設定非同步執行環境
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// ...
}tokio::spawn- 並發執行多個 worker
for worker_id in 0..self.num_workers {
let handle = tokio::spawn(async move {
worker_loop(worker_id, storage, handler, poll_interval).await;
});
handles.push(handle);
}tokio::time::sleep- 非阻塞延遲
sleep(poll_interval).await; // 輪詢間隔
一個有點好笑的觀察: 講了半天 Tokio,但這個專案其實是「假非同步」喔!核心操作骨子裡通通是同步的:
- SQLite 操作使用
rusqlite(同步 API) - 任務處理是同步函式
- 使用
std::sync::Mutex而非tokio::sync::Mutex
那既然是假的,為什麼還要拖 Tokio 下水?
- 開並發 worker 很方便(比
std::thread輕量) - sleep 不會阻塞(不會把整個執行緒凍住)
- 為未來真的要非同步化時鋪路
說穿了就是先佔個位子。哪天想把儲存層換成 async,骨架已經在那邊等著了。
6. std::sync::Mutex vs tokio::sync::Mutex
這兩個 Mutex 長得很像,很多人會直覺以為「寫 async 就該用 tokio 的」,其實不一定。關鍵差異在這裡:
| 特性 | std::sync::Mutex |
tokio::sync::Mutex |
|---|---|---|
| 等待鎖時 | 阻塞整個執行緒 | 讓出執行權給其他 task |
| 使用方式 | mutex.lock().unwrap() |
mutex.lock().await |
| 適用場景 | 短時間持鎖、同步操作 | 長時間持鎖、跨 await |
| 效能 | 更快(作業系統級) | 稍慢(需協調排程) |
這個專案的鎖只在快速的資料庫操作那一小段持有,中間完全沒有 await,所以 std::sync::Mutex 才是對的選擇——又快又夠用:
pub struct Storage {
conn: Arc<Mutex<Connection>>, // ✅ std::sync::Mutex
}
pub fn get_next_pending(&self) -> Result<Option<Job>, StorageError> {
let conn = self.conn.lock().unwrap();
// 執行快速的同步資料庫操作
// 沒有在持鎖期間 await
Ok(job)
}那什麼時候非得用 tokio::sync::Mutex 不可? 答案是:當你需要在持鎖期間 await 的時候。
// ❌ 錯誤:在持有 std::sync::Mutex 時 await
let guard = std_mutex.lock().unwrap();
async_operation().await; // 會阻塞整個執行緒!
// ✅ 正確:使用 tokio::sync::Mutex
let guard = tokio_mutex.lock().await;
async_operation().await; // OK,會讓出執行權
並發安全與資料競爭
我踩到的競爭條件
這就是我前面說的那個藏很深的 bug。一開始我的 get_next_pending() 寫得很「直覺」,結果直覺害死人:
// ❌ 有競爭條件的程式碼
pub fn get_next_pending(&self) -> Result<Option<Job>, StorageError> {
// 步驟 1:讀取待處理任務
let job = SELECT ... WHERE status = Pending LIMIT 1;
// ⚠️ 問題:Worker 2 可能在這裡也讀取到相同任務!
Ok(job)
}
// Worker 處理邏輯
let job = storage.get_next_pending()?; // 取得任務
job.mark_running(); // 標記為執行中
storage.update(&job)?; // 更新資料庫
問題出在「讀取」跟「標記」是兩個分開的步驟。只要兩個 worker 在中間那個空隙同時撈到同一個任務,就會雙雙開工。用時序圖一攤開就很清楚了:
| 時間 | Worker 1 | Worker 2 | 資料庫狀態 |
|---|---|---|---|
| T1 | get_next_pending() | Job A: Pending | |
| T2 | 讀取 Job A | ||
| T3 | get_next_pending() | Job A: Pending | |
| T4 | 讀取 Job A (相同!) | ||
| T5 | mark_running() | ||
| T6 | update(Job A) | Job A: Running | |
| T7 | mark_running() | ||
| T8 | update(Job A) | Job A: Running | |
| T9 | 處理 Job A | 處理 Job A | ❌ 重複處理! |
解法:把它變成一個原子操作
關鍵想法很簡單:與其先讀再寫,不如讓「找出任務」跟「標記成 Running」在同一個 SQL 語句裡一口氣完成。SQLite 的 UPDATE ... RETURNING 正好能辦到:
pub fn get_next_pending(&self) -> Result<Option<Job>, StorageError> {
let conn = self.conn.lock().unwrap();
// ✅ 原子性的 UPDATE + RETURNING(一個 SQL 語句完成)
let mut stmt = conn.prepare(
"UPDATE jobs
SET status = ?1, updated_at = ?2
WHERE id = (
SELECT id FROM jobs
WHERE status = ?3
ORDER BY priority DESC, created_at ASC
LIMIT 1
)
RETURNING id, payload, priority, status, retry_count, max_retries,
created_at, updated_at, error_message",
)?;
let now = Utc::now().to_rfc3339();
let job = stmt
.query_row(
params![JobStatus::Running as i32, now, JobStatus::Pending as i32],
|row| Ok(self.row_to_job(row)?),
)
.optional()?;
Ok(job)
}為什麼這樣就解掉了? 簡單講就是讓任務在「被交出去之前」就已經是 Running 狀態,沒有任何空隙留給第二個 worker:
- 原子性保證:
UPDATE ... RETURNING是單一 SQL 語句 - Mutex 保護: 同一時間只有一個 worker 能執行此查詢
- 狀態立即改變: 任務在被回傳前就已標記為 Running
- 資料庫層級鎖定: SQLite 的 EXCLUSIVE 鎖確保寫入安全
修正後的時序:
| 時間 | Worker 1 | Worker 2 | 資料庫狀態 |
|---|---|---|---|
| T1 | 取得 Mutex | ||
| T2 | UPDATE Job A → Running | 等待 Mutex | Job A: Running |
| T3 | 釋放 Mutex,回傳 Job A | ||
| T4 | 取得 Mutex | ||
| T5 | UPDATE (找不到 Pending) | Job A: Running | |
| T6 | 回傳 None | ||
| T7 | 處理 Job A | 等待下一輪 | ✅ 無重複處理 |
順手把 Worker 程式碼變乾淨了
意外的好處是:既然 get_next_pending() 已經幫忙把任務標記好了,worker 那邊就不用再多此一舉,程式碼整個清爽不少:
// ✅ 修正後
match storage.get_next_pending() {
Ok(Some(mut job)) => {
// 任務已經是 Running 狀態,直接處理
match handler.handle(&job.payload) {
Ok(()) => job.mark_completed(),
Err(e) => job.mark_failed(e),
}
storage.update(&job)?;
}
Ok(None) => {
sleep(poll_interval).await; // 無任務,等待
}
Err(e) => { /* 處理錯誤 */ }
}日誌與除錯
tracing vs println!
何時使用 println!:
// ✅ 使用者導向的輸出
println!("Job ID: {}", job.id);
println!("Status: {}", job.status);何時使用 tracing:
// ✅ 操作性日誌、除錯資訊
tracing::info!(job_id = %job.id, "Processing job");
tracing::error!("Database error: {}", e);我的分法是這樣:兩個 CLI 工具各司其職——
producer用println!顯示命令結果,因為這是使用者主動下指令、預期會看到的輸出worker用tracing記錄操作日誌,方便事後監控和除錯
初始化 tracing
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 設定日誌格式與過濾級別
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive(tracing::Level::INFO.into()),
)
.init();
// ...
}使用方式:
# 預設 INFO 級別
./worker
# 設定為 DEBUG 級別
RUST_LOG=debug ./worker資料持久化設計
SQLite Schema
CREATE TABLE jobs (
id TEXT PRIMARY KEY,
payload BLOB NOT NULL,
priority INTEGER NOT NULL,
status INTEGER NOT NULL,
retry_count INTEGER NOT NULL,
max_retries INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
error_message TEXT
);
CREATE INDEX idx_status_priority
ON jobs(status, priority DESC, created_at ASC);這個索引的欄位順序是有講究的,不是隨便排的:
status擺最前面:先快速過濾出待處理的任務priority DESC:優先順序高的先處理created_at ASC:優先順序一樣時,就先進先出 (FIFO),公平一點
狀態轉換
┌─────────────┐
│ Pending │
└──────┬──────┘
│
↓
┌──────────────┐
│ Running │
└──────┬───────┘
│
┌───────┴────────┐
│ │
↓ (成功) ↓ (失敗)
┌───────────┐ ┌─────────┐
│ Completed │ │ Failed │
└───────────┘ └────┬────┘
│
┌───────┴────────┐
│ │
↓ (可重試) ↓ (超過最大重試)
[Pending] ┌──────────────┐
(retry_count++) │ DeadLetter │
└──────────────┘
失敗處理:
pub fn mark_failed(&mut self, error: String) {
self.status = if self.can_retry() {
self.retry_count += 1;
JobStatus::Pending // 重新排程
} else {
JobStatus::DeadLetter // 移至死信佇列
};
self.error_message = Some(error);
self.updated_at = Utc::now();
}CLI 設計
Producer(生產者)
# 提交任務
producer submit -p "Hello, World!" -r high -m 3
# 查詢狀態
producer status --job-id <uuid>
# 統計資料
producer stats使用 Clap 實作:
#[derive(Parser)]
#[command(name = "producer")]
#[command(about = "Job queue producer - submit and manage jobs")]
struct Cli {
#[arg(short, long, default_value = "jobs.db")]
database: String,
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand)]
enum Commands {
Submit {
#[arg(short, long)]
payload: String,
#[arg(short = 'r', long, default_value = "normal")]
priority: String,
#[arg(short, long, default_value = "3")]
max_retries: u32,
},
Status { /* ... */ },
Stats,
}Worker(消費者)
# 啟動 4 個 workers
worker -w 4
# 使用不同的資料庫
worker --database /path/to/jobs.db --workers 8效能考量
輪詢 vs 事件驅動
worker 怎麼知道有新任務進來?我這版偷懶用了最簡單的輪詢:每隔一段時間就去問資料庫一次。
loop {
match storage.get_next_pending() {
Ok(Some(job)) => { /* 處理 */ }
Ok(None) => sleep(poll_interval).await, // 等待 1 秒
Err(e) => { /* 錯誤處理 */ }
}
}優點:
- ✅ 實作簡單
- ✅ 可靠性高(無需額外機制)
- ✅ 易於理解和維護
缺點:
- ❌ 延遲:最多
poll_interval的延遲 - ❌ 資源浪費:無任務時仍在輪詢
想改善的話(事件驅動):
可以改用 tokio::sync::Notify,或是資料庫的 NOTIFY/LISTEN 機制,讓生產者主動敲門通知,而不是讓 worker 在那邊空轉:
// 生產者插入任務後通知
notifier.notify_waiters();
// Worker 等待通知
notifier.notified().await;指數退避(Exponential Backoff)
if job.retry_count > 0 {
let backoff = Duration::from_secs(2_u64.pow(job.retry_count.min(5)));
sleep(backoff).await;
}退避時間:
- 第 1 次重試:2^1 = 2 秒
- 第 2 次重試:2^2 = 4 秒
- 第 3 次重試:2^3 = 8 秒
- 第 4 次重試:2^4 = 16 秒
- 第 5 次以上:2^5 = 32 秒(最大值)
為什麼要這樣做? 任務失敗常常是下游服務暫時掛了,這時候你還死命地每秒重試,只會雪上加霜。退一步、給對方喘口氣,反而比較快好。
我這趟學到的幾件事
下面這些算是我寫完之後的心得整理。當然啦,我不建議你盲目地照單全收,每個專案的取捨都不一樣,看著辦。
1. 模組設計
- 使用私有模組 + 公開重匯出控制 API
- 清晰的關注點分離
2. 型別安全
- 考慮使用 newtype 模式增加型別安全
- 平衡抽象與簡潔性
3. 並發處理
- 識別臨界區(critical section)
- 使用原子操作避免競爭條件
- 選擇適當的同步原語(Mutex、RwLock 等)
4. 錯誤處理
- 使用
thiserror定義清晰的錯誤類型 - 區分可恢復與不可恢復的錯誤
5. 日誌策略
- CLI 輸出用
println! - 操作日誌用
tracing - 使用環境變數控制日誌級別
6. 非同步選擇
- 評估是否真的需要 async/await
- 混合使用同步與非同步程式碼時要小心
- 理解
std::sync與tokio::sync的差異
擴充方向
-
真正的非同步化:
- 使用
sqlx或tokio-rusqlite - 非同步的
JobHandlertrait
- 使用
-
分散式支援:
- 使用 PostgreSQL 的
SELECT FOR UPDATE SKIP LOCKED - Redis 作為訊息佇列
- 使用 PostgreSQL 的
-
監控與觀測:
- Prometheus metrics
- 分散式追蹤
-
進階功能:
- 延遲任務(scheduled jobs)
- 任務優先順序調整
- 動態 worker 擴縮容
結論
一個看似簡單的任務佇列,寫下來居然把 Rust 的好幾個招牌特性都用上了:
- 型別系統讓我在編譯期就攔下一堆錯誤
- 所有權模型幫我顧好記憶體安全跟執行緒安全
- Trait 系統提供零成本抽象,要彈性又不犧牲效能
- 並發原語讓我能正面對決競爭條件
不過老實說,最有趣的還是那個競爭條件 bug——它提醒我,Rust 能保證你不會踩到記憶體和資料競爭的雷,但邏輯上的競爭條件,編譯器可管不著,那還是得靠自己想清楚。常聽人說 Rust「編譯過了通常就能跑」,這話大致沒錯,只是別忘了,編譯器再聰明,也讀不出你腦袋裡的並發模型呢。