用Go+Redis實現分佈式鎖的示例代碼
為什麼需要分佈式鎖
用戶下單
鎖住 uid,防止重復下單。
庫存扣減
鎖住庫存,防止超賣。
餘額扣減
鎖住賬戶,防止並發操作。
分佈式系統中共享同一個資源時往往需要分佈式鎖來保證變更資源一致性。
分佈式鎖需要具備特性
排他性
鎖的基本特性,並且隻能被第一個持有者持有。
防死鎖
高並發場景下臨界資源一旦發生死鎖非常難以排查,通常可以通過設置超時時間到期自動釋放鎖來規避。
可重入
鎖持有者支持可重入,防止鎖持有者再次重入時鎖被超時釋放。
高性能高可用
鎖是代碼運行的關鍵前置節點,一旦不可用則業務直接就報故障瞭。高並發場景下,高性能高可用是基本要求。
實現 Redis 鎖應先掌握哪些知識點
set 命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
- EX second :設置鍵的過期時間為 second 秒。 SET key value EX second 效果等同於 SETEX key second value 。
- PX millisecond :設置鍵的過期時間為 millisecond 毫秒。 SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。
- NX :隻在鍵不存在時,才對鍵進行設置操作。 SET key value NX 效果等同於 SETNX key value 。
- XX :隻在鍵已經存在時,才對鍵進行設置操作。
Redis.lua 腳本
使用 redis lua 腳本能將一系列命令操作封裝成 pipline 實現整體操作的原子性。
go-zero 分佈式鎖 RedisLock 源碼分析
core/stores/redis/redislock.go
加鎖流程
-- KEYS[1]: 鎖key -- ARGV[1]: 鎖value,隨機字符串 -- ARGV[2]: 過期時間 -- 判斷鎖key持有的value是否等於傳入的value -- 如果相等說明是再次獲取鎖並更新獲取時間,防止重入時過期 -- 這裡說明是“可重入鎖” if redis.call("GET", KEYS[1]) == ARGV[1] then -- 設置 redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2]) return "OK" else -- 鎖key.value不等於傳入的value則說明是第一次獲取鎖 -- SET key value NX PX timeout : 當key不存在時才設置key的值 -- 設置成功會自動返回“OK”,設置失敗返回“NULL Bulk Reply” -- 為什麼這裡要加“NX”呢,因為需要防止把別人的鎖給覆蓋瞭 return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) end
解鎖流程
-- 釋放鎖 -- 不可以釋放別人的鎖 if redis.call("GET", KEYS[1]) == ARGV[1] then -- 執行成功返回“1” return redis.call("DEL", KEYS[1]) else return 0 end
源碼解析
package redis import ( "math/rand" "strconv" "sync/atomic" "time" red "github.com/go-redis/redis" "github.com/tal-tech/go-zero/core/logx" ) const ( letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2]) return "OK" else return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) end` delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end` randomLen = 16 // 默認超時時間,防止死鎖 tolerance = 500 // milliseconds millisPerSecond = 1000 ) // A RedisLock is a redis lock. type RedisLock struct { // redis客戶端 store *Redis // 超時時間 seconds uint32 // 鎖key key string // 鎖value,防止鎖被別人獲取到 id string } func init() { rand.Seed(time.Now().UnixNano()) } // NewRedisLock returns a RedisLock. func NewRedisLock(store *Redis, key string) *RedisLock { return &RedisLock{ store: store, key: key, // 獲取鎖時,鎖的值通過隨機字符串生成 // 實際上go-zero提供更加高效的隨機字符串生成方式 // 見core/stringx/random.go:Randn id: randomStr(randomLen), } } // Acquire acquires the lock. // 加鎖 func (rl *RedisLock) Acquire() (bool, error) { // 獲取過期時間 seconds := atomic.LoadUint32(&rl.seconds) // 默認鎖過期時間為500ms,防止死鎖 resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{ rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance), }) if err == red.Nil { return false, nil } else if err != nil { logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error()) return false, err } else if resp == nil { return false, nil } reply, ok := resp.(string) if ok && reply == "OK" { return true, nil } logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp) return false, nil } // Release releases the lock. // 釋放鎖 func (rl *RedisLock) Release() (bool, error) { resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id}) if err != nil { return false, err } reply, ok := resp.(int64) if !ok { return false, nil } return reply == 1, nil } // SetExpire sets the expire. // 需要註意的是需要在Acquire()之前調用 // 不然默認為500ms自動釋放 func (rl *RedisLock) SetExpire(seconds int) { atomic.StoreUint32(&rl.seconds, uint32(seconds)) } func randomStr(n int) string { b := make([]byte, n) for i := range b { b[i] = letters[rand.Intn(len(letters))] } return string(b) }
關於分佈式鎖還有哪些實現方案
etcd
redis redlock
項目地址
https://github.com/zeromicro/go-zero
到此這篇關於用Go+Redis實現分佈式鎖的示例代碼的文章就介紹到這瞭,更多相關Go Redis分佈式鎖內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Go 語言下基於Redis分佈式鎖的實現方式
- Golang分佈式應用之Redis示例詳解
- 關於SpringBoot 使用 Redis 分佈式鎖解決並發問題
- redis分佈式鎖的8大坑總結梳理
- redis中lua腳本使用教程