百行代碼實現基於Redis的可靠延遲隊列

在之前探討延時隊列的文章中我們提到瞭 redisson delayqueue 使用 redis 的有序集合結構實現延時隊列,遺憾的是 go 語言社區中並無類似的庫。不過問題不大,沒有輪子我們自己造😎。

本文的完整代碼實現在hdt3213/delayqueue,可以直接 go get 安裝使用。

使用有序集合結構實現延時隊列的方法已經廣為人知,無非是將消息作為有序集合的 member, 投遞時間戳作為 score 使用 zrangebyscore 命令搜索已到投遞時間的消息然後將其發給消費者。

然而消息隊列不是將消息發給消費者就萬事大吉,它們還有一個重要職責是確保送達和消費。通常的實現方式是當消費者收到消息後向消息隊列返回確認(ack),若消費者返回否定確認(nack)或超時未返回,消息隊列則會按照預定規則重新發送,直到到達最大重試次數後停止。如何實現 ack 和重試機制是我們要重點考慮的問題。

我們的消息隊列允許分佈式地部署多個生產者和消費者,消費者實例定時執行 lua 腳本驅動消息在隊列中的流轉無需部署額外組件。由於 Redis 保證瞭 lua 腳本執行的原子性,整個流程無需加鎖。

消費者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊列會重試超時或者被否定確認的消息(nack) 直至到達最大重試次數。一條消息最多有一個消費者正在處理,減少瞭要考慮的並發問題。

請註意:若消費時間超過瞭 MaxConsumeDuration 消息隊列會認為消費超時並重新投遞,此時可能有多個消費者同時消費。

具體使用也非常簡單,隻需要註冊處理消息的回調函數並調用 start() 即可:

package main

import (
	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// 註冊處理消息的回調函數
        // 返回 true 表示已成功消費,返回 false 消息隊列會重新投遞次消息
		return true
	})
	// 發送延時消息
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}

	// start consume
	done := queue.StartConsume()
	<-done
}

由於數據存儲在 redis 中所以我們最多能保證在 redis 無故障且消息隊列相關 key 未被外部篡改的情況下不會丟失消息。

原理詳解

消息隊列涉及幾個關鍵的 redis 數據結構:

  • msgKey: 為瞭避免兩條內容完全相同的消息造成意外的影響,我們將每條消息放到一個字符串類型的鍵中,並分配一個 UUID 作為它的唯一標識。其它數據結構中隻存儲 UUID 而不存儲完整的消息內容。每個 msg 擁有一個獨立的 key 而不是將所有消息放到一個哈希表是為瞭利用 TTL 機制避免
  • pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時間的 unix 時間戳。
  • readyKey: 列表類型,需要投遞的消息 ID。
  • unAckKey: 有序集合類型,member 為消息 ID, score 為重試時間的 unix 時間戳。
  • retryKey: 列表類型,已到重試時間的消息 ID
  • garbageKey: 集合類型,用於暫存已達重試上線的消息 ID
  • retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩餘的重試次數

流程如下圖所示:

由於我們允許分佈式地部署多個消費者,每個消費者都在定時執行 lua 腳本,所以多個消費者可能處於上述流程中不同狀態,我們無法預知(或控制)上圖中五個操作發生的先後順序,也無法控制有多少實例正在執行同一個操作。

因此我們需要保證上圖中五個操作滿足三個條件:

  • 都是原子性的
  • 不會重復處理同一條消息
  • 操作前後消息隊列始終處於正確的狀態

隻要滿足這三個條件,我們就可以部署多個實例且不需要使用分佈式鎖等技術來進行狀態同步。

是不是聽起來有點嚇人?😂 其實簡單的很,讓我們一起來詳細看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的消息ID並把它們移動到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中找出已到投遞時間的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中刪除已投遞的消息

ready2UnackScript

ready2UnackScript 從 ready 或者 retry 中取出一條消息發送給消費者並放入 unack 中,類似於 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 從 retry 中找出所有已到重試時間的消息並把它們移動到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 找到已到重試時間的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩餘重試次數
for i,v in ipairs(retryCounts) do
	local k = msgs[i]
	if tonumber(v) > 0 then -- 剩餘次數大於 0
		redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩餘重試次數
		redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
	else -- 剩餘重試次數為 0
		redis.call("HDel", KEYS[2], k) -- 刪除重試次數記錄
		redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待後續刪除
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 將已處理的消息從 unack key 中刪除

因為 redis 要求 lua 腳本必須在執行前在 KEYS 參數中聲明自己要訪問的 key, 而我們將每個 msg 有一個獨立的 key,我們在執行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本隻將需要刪除的消息記在 garbage key 中,腳本執行完後再通過 del 命令將他們刪除:

func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

之前提到的 lua 腳本都是原子性執行的,不會有其它命令插入其中。 gc 函數由 3 條 redis 命令組成,在執行過程中可能會有其它命令插入執行過程中,不過考慮到一條消息進入垃圾回收流程之後不會復活所以不需要保證 3 條命令原子性。

ack

ack 隻需要將消息徹底刪除即可:

func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}

否定確認隻需要將 unack key 中消息的重試時間改為現在,隨後執行的 unack2RetryScript 會立即將它移動到 retry key

func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}

consume

消息隊列的核心邏輯是每秒執行一次的 consume 函數,它負責調用上述腳本將消息轉移到正確的集合中並回調 consumer 來消費消息:

func (q *DelayQueue) consume() error {
	// 執行 pending2ready,將已到時間的消息轉移到 ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// 循環調用 ready2Unack 拉取消息進行消費
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// 將 nack 或超時的消息放入重試隊列
	err = q.unack2Retry()
	if err != nil {
		return err
	}
    // 清理已達到最大重試次數的消息
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// 消費重試隊列
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}

至此一個簡單可靠的延時隊列就做好瞭,何不趕緊開始試用呢😘😋?

到此這篇關於百行代碼實現基於Redis的可靠延遲隊列的文章就介紹到這瞭,更多相關百行代碼實現基於Redis的可靠延遲隊列內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: