詳解Redis 緩存刪除機制(源碼解析)
刪除的范圍
- 過期的 key
- 在內存滿瞭的情況下,如果繼續執行 set 等命令,且所有 key 都沒有過期,那麼會按照緩存淘汰策略選中的 key
過期刪除
redis 中設置瞭過期時間的 key 會單獨存儲一份
typedef struct redisDb { dict *dict; // 所有的鍵值對 dict *expires; //設置瞭過期時間的鍵值對 // ... } redisDb;
設置有效期
Redis 中有 4 個命令可以給 key 設置過期時間,分別是 expire pexpire expireat pexpireat
設置相對時間
expire <key> <ttl>:將 key 值的過期時間設置為 ttl 秒。
// src/expire.c /* EXPIRE key seconds */ void expireCommand(client *c) { expireGenericCommand(c,mstime(),UNIT_SECONDS); }
pexpire <key> <ttl>:將 key 值的過期時間設置為 ttl 毫秒。
// src/expire.c /* PEXPIRE key milliseconds */ void pexpireCommand(client *c) { expireGenericCommand(c,mstime(),UNIT_MILLISECONDS); }
設置絕對時間
expireat <key> <timestamp>:將 key 值的過期時間設置為指定的 timestamp 秒數。
// src/expire.c /* EXPIREAT key time */ void expireatCommand(client *c) { expireGenericCommand(c,0,UNIT_SECONDS); }
pexpireat <key> <timestamp>:將 key 值的過期時間設置為指定的 timestamp 毫秒數。
// src/expire.c /* PEXPIREAT key ms_time */ void pexpireatCommand(client *c) { expireGenericCommand(c,0,UNIT_MILLISECONDS); }
以上 4 種方法最終都會調用下面的通用函數 expireGenericCommand :
// src/expire.c void expireGenericCommand(client *c, long long basetime, int unit) { robj *key = c->argv[1], *param = c->argv[2]; // 獲取數據對象 long long when; if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK) return; // 將時間轉化成以 ms 為單位 if (unit == UNIT_SECONDS) when *= 1000; when += basetime; // 在 master 節點上,如果設置的過期時間小於當前時間,那麼將命令轉化成 DEL 指令 if (when <= mstime() && !server.loading && !server.masterhost) { robj *aux; int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : dbSyncDelete(c->db,key); // ... // 將刪除命令同步給 slave 和 AOF // ... } else { // 設置過期時間 setExpire(c,c->db,key,when); // ... // 構造返回值和發佈對象更新消息 // ... return; } }
設置過期時間的操作由 setExpire 執行,他將 dictEntry 的 union v 中的 s64 設為過期時間
// src/db.c void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de; // 找出 db->dict 中對應的存儲對象,這裡的查詢和用 get 查詢數據是邏輯一樣,通過 hashFunc(key) & sizemask // 找到 bucket 後在鏈表中遍歷 kde = dictFind(db->dict,key->ptr); // 找出 db->expires 中對應的存儲對象,如果沒有則新建一個 de = dictAddOrFind(db->expires,dictGetKey(kde)); // dictSetSignedIntegerVal(de,when); // ... } #define dictSetSignedIntegerVal(entry, _val_) \ do { (entry)->v.s64 = _val_; } while(0)
db->expires 中存儲的 dictEntry 表示的是過期 key 和過期時間,存儲過期時間的 v 是一個 union ,可見在 redis 中不同使用場景或不同編碼下 v 的意義不同
typedef struct dictEntry { void *key; union { void *val; uint64_t u64; int64_t s64; double d; } v; struct dictEntry *next; } dictEntry;
查詢過期時間
ttl key 返回 key 剩餘過期秒數。
// src/expire.c /* TTL key */ void ttlCommand(client *c) { ttlGenericCommand(c, 0); }
pttl key 返回 key 剩餘過期的毫秒數。
// src/expire.c /* PTTL key */ void pttlCommand(client *c) { ttlGenericCommand(c, 1); }
以上 2 種查看方式最終都會調用下面的通用函數 ttlGenericCommand :
// src/expire.c /* Implements TTL and PTTL */ void ttlGenericCommand(client *c, int output_ms) { // ... // key 不存在時報錯 // ... // 獲取過期時間,如果沒有過期時間則 expire = getExpire(c->db,c->argv[1]); if (expire != -1) { ttl = expire-mstime(); if (ttl < 0) ttl = 0; } if (ttl == -1) { addReplyLongLong(c,-1); } else { // 根據指定的單位返回結果,以秒為單位時向上取整 addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000)); } }
獲取過期時間的操作由 getExpire 執行,在 db->expires 中查詢到對象後,獲取 union v 中的成員 s64
// src/expire.c // 返回過期時間的絕對時間 long long getExpire(redisDb *db, robj *key) { dictEntry *de; // 查詢對象 if (dictSize(db->expires) == 0 || // 如果返回為 NULL 表示沒有設置過期時間,向上返回 -1 (de = dictFind(db->expires,key->ptr)) == NULL) return -1; // 獲取 v.s64 return dictGetSignedIntegerVal(de); } #define dictGetSignedIntegerVal(he) ((he)->v.s64)
過期策略
Redis 綜合使用 惰性刪除 和 定期掃描 實現
惰性刪除
每次訪問時會調用 expireIfNeeded 判斷 key 是否過期,如果過期就刪除該鍵,否則返回鍵對應的值。單獨使用這種策略可能會浪費很多內存。
// src/db.c int expireIfNeeded(redisDb *db, robj *key) { mstime_t when = getExpire(db,key); mstime_t now; // 沒有設置過期時間,直接返回 if (when < 0) return 0; // 從硬盤中加載數據時不執行過期操作 if (server.loading) return 0; // 參考 GitHub Issue #1525 // 對於 master,在執行 Lua Script 的過程中,可能會用某個 key 是否存在當作判斷條件 // 為瞭避免一個腳本中前後條件不一致,將當前時間強制設為腳本開始時間 now = server.lua_caller ? server.lua_time_start : mstime(); // 對於 slave,返回此時 key 是否已過期,但不執行後續刪除操作 if (server.masterhost != NULL) return now > when; // key 未過期 if (now <= when) return 0; // 統計過期 key 的個數 server.stat_expiredkeys++; // 向所有的 slave 和 AOF 文件寫入一條 DEL 指令 propagateExpire(db,key,server.lazyfree_lazy_expire); // 向 keyspace channel 中發佈一條 key 過期的消息 notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); // 根據配置決定是同步刪除還是異步刪除(僅刪除引用,由後臺線程執行物理刪除) return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) : dbSyncDelete(db,key); }
特殊處理
在 master 節點執行 Lua 腳本時
參考 GitHub Issue #1525,對於 master,在執行 Lua Script 的過程中,可能會用某個 key 是否存在當作判斷條件。為瞭避免一個腳本中前後條件不一致,將當前時間強制設為腳本開始時間。
例如多次執行如下 Lua 腳本 /tmp/myscript.lua 出現的結果可能不一致
-- /tmp/myscript.lua if redis.call("exists",KEYS[1]) == 1 then redis.call("incr","mycounter") end if redis.call("exists",KEYS[1]) == 1 then return redis.call("incr","mycounter") end
具體復現操作可以參考下面的 bash 腳本:
while [ 1 ] do redis-cli set x foo px 100 > /dev/null sleep 0.092 redis-cli --eval /tmp/myscript.lua x > /dev/null sleep 0.1 redis-cli get mycounter redis-cli -p 6380 get mycounter done
對於 slave 節點
在 slave 節點上,key 的刪除操作由 master 發來的 DEL 執行,因此這裡隻返回是否過期的結果給客戶端,而不執行刪除操作
正在從 RDB 和 AOF 讀取數據時跳過這個步驟
定期掃描
系統每隔一段時間就定期掃描一次,發現過期的鍵就進行刪除。單獨使用這種策略可能出現鍵已經過期但沒有刪除的情況
Redis 默認每 100ms 執行一次(通過 hz 參數配置,執行周期為 1s/hz)過期掃描。由於 redisDb 中設置瞭過期時間的 key 會單獨存儲,所以不會出現掃描所有 key 的情況
具體步驟由 activeExpireCycle 函數執行
activeExpireCycle、incrementallyRehash 等後臺操作都是由 databasesCron 觸發的
void activeExpireCycle(int type) { // ... // 依次遍歷各個 db for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { int expired; redisDb *db = server.db+(current_db % server.dbnum); // 記錄下一個執行的 db,這樣如果因為超時意外退出,下次可以繼續從這個 db 開始, // 從而在所有 db 上均勻執行清除操作 current_db++; do { // ... // 跳過沒有設置過期時間的 key 等不需要執行的情況 // ... // 抽樣個數,默認為 20 if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; // 從設置瞭過期時間的 key 中隨機抽取 20 個 while (num--) { dictEntry *de; long long ttl; // 隨機挑選 dict 中的一個 key if ((de = dictGetRandomKey(db->expires)) == NULL) break; ttl = dictGetSignedIntegerVal(de)-now; // 執行刪除,具體刪除操作和惰性刪除中類似 if (activeExpireCycleTryExpire(db,de,now)) expired++; // ... } // ... // 更新統計數據等操作 // ... // 如果每次刪除的 key 超過瞭樣本數的 25%,說明過期鍵占的比例較高,需要再重復執行依次 } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); } // ... }
隨機抽樣由 dictGetRandomKey 執行
// src/dict.c /* Return a random entry from the hash table. Useful to * implement randomized algorithms */ dictEntry *dictGetRandomKey(dict *d) { dictEntry *he, *orighe; unsigned long h; int listlen, listele; // 沒有數據,返回為 NULL,外層函數接收到 NULL 後會中斷過期操作的執行 if (dictSize(d) == 0) return NULL; // 根據 rehashidx 參數判斷是否正在執行 rehash,如果正在執行, // 則先執行 rehash 中的一個步驟 if (dictIsRehashing(d)) _dictRehashStep(d); if (dictIsRehashing(d)) { do { // 正在執行 rehash,所以兩個 ht 中的對象都要考慮 // // 由於正在執行 rehash,所以可以肯定 ht[0] 中下標小於等於 rehashidx 的 bucket // 肯定沒有數據,所以隻從 ht[0] 中大於 rehashidx 的 bucket 和 ht[1] 中抽取 h = d->rehashidx + (random() % (d->ht[0].size + d->ht[1].size - d->rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] : d->ht[0].table[h]; // 取到空 bucket 時重試 } while(he == NULL); } else { do { // 參考寫入 ht 時計算下標的規則 hashFunc(key) & sizemake // 這裡 random() & sizemask 是隨機取一個下標 h = random() & d->ht[0].sizemask; he = d->ht[0].table[h]; // 取到空 bucket 時重試 } while(he == NULL); } // 到這一步 he 是 ht[n] 中某個 bucket 中完整的鏈表 // 所以還要從這個鏈表中隨機取一個對象 // 遍歷計算整個鏈表的長度 listlen = 0; orighe = he; while(he) { he = he->next; listlen++; } // 隨機取鏈表中某個對象的下標 listele = random() % listlen; he = orighe; // 重新遍歷鏈表獲取指定下標的對象 while(listele--) he = he->next; return he; }
緩存淘汰
配置最大內存限制
在 redis.conf 中配置
redis server 啟動時加載配置文件和命令行參數中的 maxmemory ,存入 Server 對象的 maxmemory 字段
main 中在 redis server 啟動時執行初始化等操作,其中會執行加載配置文件的 loadServerConfig 函數
// src/server.c int main(int argc, char **argv) { // .. // 加載配置 loadServerConfig(configfile,options); // .. // 警告過小的配置 if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } }
loadServerConfig 中將配置文件、stdin、命令行參數加載到 config 字符串中,然後調用 loadServerConfigFromString
// src/config.c void loadServerConfig(char *filename, char *options) { sds config = sdsempty(); char buf[CONFIG_MAX_LINE+1]; // 加載配置文件 if (filename) { FILE *fp; // 啟動命令為 ./redis-server - 則從 stdin 中讀取,需要用 <C-D> 觸發 EOF if (filename[0] == '-' && filename[1] == '\0') { fp = stdin; } else { // 第一個參數不是 -,則嘗試打開這個參數指定的文件 if ((fp = fopen(filename,"r")) == NULL) { serverLog(LL_WARNING, "Fatal error, can't open config file '%s'", filename); exit(1); } } // 將配置文件中的每一行追加到 config 中 while(fgets(buf,CONFIG_MAX_LINE+1,fp) != NULL) config = sdscat(config,buf); if (fp != stdin) fclose(fp); } // 添加其他選項,例如 ./redis-server --port 8080 後面的參數,直接加到 config 中 if (options) { config = sdscat(config,"\n"); config = sdscat(config,options); } loadServerConfigFromString(config); sdsfree(config); }
loadServerConfigFromString 從上一步中的 config 字符串中逐行讀取配置,並寫入 server 對象
// src/config.c void loadServerConfigFromString(char *config) { // ... // 按行讀取配置文件 lines = sdssplitlen(config,strlen(config),"\n",1,&totlines); for (i = 0; i < totlines; i++) { // 跳過無效的配置和註釋 // ... argv = sdssplitargs(lines[i],&argc); // 將配置命令轉化成小寫 sdstolower(argv[0]); // 根據配置命令初始化配置,strcasecmp 比較 if (!strcasecmp(argv[0],"timeout") && argc == 2) { server.maxidletime = atoi(argv[1]); if (server.maxidletime < 0) { err = "Invalid timeout value"; goto loaderr; } // ... } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { // memtoll 將字符串形式的配置轉化成對應的 long long 值 // 例如 1kb -> 1024 server.maxmemory = memtoll(argv[1],NULL); } } }
使用 CONFIG SET 命令配置
Redis Server 接收到客戶端的 CONFIG SET 命令後調用 configSetCommand 函數
服務端接收到命令時將命令和參數存入 Redis Server 的 argc 和 argv
argc: 4 argv: 0 1 2 3 config set maxmemory 10mb
動態配置 maxmemory 後會立即嘗試觸發內存回收,而修改其他內存相關配置(例如: maxmemory_policy)時不會觸發
if (0) { // ... } config_set_memory_field("maxmemory",server.maxmemory) { // 配置不為 0,表示之前限制過內存 if (server.maxmemory) { if (server.maxmemory < zmalloc_used_memory()) { serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in keys eviction and/or inability to accept new write commands depending on the maxmemory-policy."); } freeMemoryIfNeeded(); } // ... }
32 位機器的內存限制
對於 64 位機器,將 maxmemory 設置為 0 表示不限制內存,但由於 32 位尋址空間最多隻有 4 GB,所以默認內存限制設為 3 GB,緩存淘汰策略設為 noeviction
// src/server.c // ... if (server.arch_bits == 32 && server.maxmemory == 0) { serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = MAXMEMORY_NO_EVICTION; }
淘汰策略
淘汰策略使用 CONFIG SET maxmemory-policy 配置
默認:
- **noeviction: **內存滿瞭後對於 set 等命令直接返回錯誤
針對所有 key:
- allkeys-lru: 在所有 key 的范圍內使用 LRU 算法執行刪除,如果內存仍然不夠,則報錯
- **allkeys-lfu: **在所有 key 的范圍內使用 LRU 算法執行刪除,如果內存仍然不夠,則報錯
- **allkeys-random: **在所有 key 的范圍內隨機執行刪除,如果內存仍然不夠,則報錯
針對設置瞭過期時間的 key:
- **volatile-lru: **在設置瞭過期時間的 key 中使用 LRU 算法執行刪除,如果內存仍然不夠,則報錯
- **volatile-lfu: **在設置瞭過期時間的 key 中使用 LRU 算法執行刪除,如果內存仍然不夠,則報錯
- **volatile-random: **在設置瞭過期時間的 key 中隨機執行刪除,如果內存仍然不夠,則報錯
- **volatile-ttl: **刪除即將過期的 key,如果內存仍然不夠,則報錯
Redis 在執行淘汰之前會計算部分對象的 idle 值,使用不同淘汰策略時計算 idle 值的方法也不同, idle 值越大表示這個值越需要優先刪除。
下面主要介紹 LRU 和 LFU 中 idle 值的計算方法
LRU 淘汰策略
抽樣刪除,樣本數量通過 CONFIG SET maxmemory-samples 100 控制,對應 RedisObject 中的 maxmemory_samples 參數,抽樣數量越多和傳統的 LRU 算法越接近
優化策略
為瞭避免傳統的 LRU 算法通常使用 hashmap + 鏈表實現帶來的開銷,Redis 進行瞭如下優化:
RedisObject 結構中設置瞭一個 lru 字段,用來記錄數據的訪問時間戳,而不是每次調整對象在鏈表中的位置
typedef struct redisObject { // 對象類型 unsigned type:4; // 對象編碼 unsigned encoding:4; // LRU 算法和 LFU 算法公用 lru 這個字段 // // LRU_BITS 默認為 24,因此最大隻能存儲 194 天的時間戳, // 創建對象時會寫入這個字段,訪問對象時會更新這個字段, // 超過之後再從 0 開始計算 unsigned lru:LRU_BITS; int refcount; void *ptr; } robj;
使用抽樣數組代替鏈表,後續在候選集合中根據 lru 字段值的大小進行篩選,避免瞭鏈表帶來的開銷。候選集合中的對象用 evictionPoolEntry 表示
struct evictionPoolEntry { unsigned long long idle; // 用於淘汰排序,在不同算法中意義不同 sds key; // 鍵的名字 // ... };
計算方法
全局對象 lru_clock 記錄瞭當前的 unix 時間戳,由 serverCron 調用 updateCachedTime 默認每 100 ms 更新一次。更新頻率與 hz 參數有關, 1s/hz 即為更新間隔時間。
LRU_CLOCK_RESOLUTION 的值為 1000,因此使用 LRU_CLOCK 函數獲取 lru_clock 時,如果每秒更新頻率在 1 次以上,會使用全局變量中緩存的 lrulcock
unsigned int LRU_CLOCK(void) { unsigned int lruclock; if (1000/server.hz <= LRU_CLOCK_RESOLUTION) { atomicGet(server.lruclock,lruclock); } else { lruclock = getLRUClock(); } return lruclock; }
如果更新頻率不到每秒 1 次,則會用函數 getLRUClock 實時計算 lruclock
unsigned int getLRUClock(void) { // mstime() 獲取 unix 時間戳,單位時毫秒 // 除以 LRU_CLOCK_RESOLUTION(值為 1000),將時間戳轉化為秒 return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX; }
其中 LRU_CLOCK_MAX 表示 lru_clock 最大的可能值,這個值與 redisObject 中 lru 最大的可能值一樣,定義如下:
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1)
所以最終比較時 lru_clock 和 robj.lru 的值都在 [0, LRU_CLOCK_MAX] 的范圍內。
從邏輯上講, 當前時間戳應該永遠大於上次訪問的時間戳 ,因此正常的計算規則應該是 lru_clock-robj.lru 。
但是由於 lru_clock 和 robj.lru 是當前時間戳取模後的值,所以可能出現 lru_clock 小於 robj.lru 的情況,所以這種情況下計算規則應該改為 lru_clock+194天-robj.lru
但是對於 lru_clock 和 robj.lru 間隔超過 194 天的情況仍然無法判斷,所以更能存在刪除不準確的情況。
將上述的邏輯組合起來就是 LRU 算法下獲取 idle 值的函數:
// src/evict.c // 以秒為精度計算對象距離上一次訪問的間隔時間,然後轉化成毫秒返回 unsigned long long estimateObjectIdleTime(robj *o) { unsigned long long lruclock = LRU_CLOCK(); if (lruclock >= o->lru) { return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION; } else { return (lruclock + (LRU_CLOCK_MAX - o->lru)) * LRU_CLOCK_RESOLUTION; } }
在 Redis 3.0 中,當取樣數量設為 10 時,已經和傳統的 LRU 算法效果很接近瞭
LFU 淘汰策略
LFU 算法復用 robj.lru 字段,將這個 24 bit 的字段拆分成瞭兩部分:
- ldt(last decrement time,單位:分鐘):lru 字段的前 16bit,表示數據的訪問時間戳,最多隻能存儲 45 天。
- counter 值:lru 字段的後 8bit,表示數據的訪問頻率
遞增策略
counter 能表示的最大值是 255,因此 counter 與訪問次數不能是線性關系,這裡采用的計算步驟如下:
- 隨機取 0 到 1 之間的隨機數 r
- 比較 r 與 1/((counter-LFU_INIT_VAL)*lfu_log_factor+1) 的大小,其中 LFU_INIT_VAL 是常量,默認為 5,lfu_log_factor 是可配置參數,默認為 10
- 如果 r 小則 counter 增加 1,否則 counter 不變
實現代碼如下:
uint8_t LFULogIncr(uint8_t counter) { // counter 值已經到達瞭 255,不能再增加,直接返回 if (counter == 255) return 255; double r = (double)rand()/RAND_MAX; double baseval = counter - LFU_INIT_VAL; // LFU_INIT_VAL 值為 5 if (baseval < 0) baseval = 0; double p = 1.0/(baseval*server.lfu_log_factor+1); if (r < p) counter++; return counter; }
訪問次數與 counter 值之間大概是對數關系,counter 值越大,增速越低
// https://redis.io/topics/lru-cache +--------+------------+------------+------------+------------+------------+ | factor | 100 hits | 1000 hits | 100K hits | 1M hits | 10M hits | +--------+------------+------------+------------+------------+------------+ | 0 | 104 | 255 | 255 | 255 | 255 | +--------+------------+------------+------------+------------+------------+ | 1 | 18 | 49 | 255 | 255 | 255 | +--------+------------+------------+------------+------------+------------+ | 10 | 10 | 18 | 142 | 255 | 255 | +--------+------------+------------+------------+------------+------------+ | 100 | 8 | 11 | 49 | 143 | 255 | +--------+------------+------------+------------+------------+------------+
衰減策略
除瞭訪問對象時 counter 需要增加,對於一段時間內沒有訪問的對象還要相應地減少 counter 值,遞減的速率由 lfu-decay-time 參數控制。
counter 衰減步驟如下:
取當前時間戳(單位:分鐘)的低 16 位記為 now ,計算與 ldt 的差值。這裡與 LRU 算法中計算 lru_clock 和 robj.lru 時可能出現一樣的問題,由於 ldt 最多隻能表示 45 天,所以如果距離對象上次訪問超過 45 天,則無法準確計算訪問的時間間隔
unsigned long LFUDecrAndReturn(robj *o) { // 取高 16 位 unsigned long ldt = o->lru >> 8; // 取低 8 位 unsigned long counter = o->lru & 255; // 如果 lfu_decay_time 為 0,則步修改 counter,否則將 counter 減少 LFUTimeElapsed(ldt)/lfu_decay_time unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0; if (num_periods) // 保證 counter 的最小值位 0 counter = (num_periods > counter) ? 0 : counter - num_periods; return counter; } // 計算距離上次訪問的間隔時間 unsigned long LFUTimeElapsed(unsigned long ldt) { // 取當前時間戳(單位:分鐘) unsigned long now = LFUGetTimeInMinutes(); // 計算時間差 if (now >= ldt) return now-ldt; return 65535-ldt+now; } // 獲取當前時間戳,以分鐘為單位,取低 8 位 unsigned long LFUGetTimeInMinutes(void) { return (server.unixtime/60) & 65535; }
如果 lfu_decay_time 為 0,則步修改 counter,否則將 counter 減少 LFUTimeElapsed(ldt)/lfu_decay_time
例如,在 lfu_decay_time 為 1 的情況下,如果有 N 分鐘沒有訪問這個對象,那麼 counter 值減 N
每次訪問一個對象時都會調用 updateLFU 更新 counter 的值:
void updateLFU(robj *val) { unsigned long counter = LFUDecrAndReturn(val); counter = LFULogIncr(counter); val->lru = (LFUGetTimeInMinutes()<<8) | counter; }
執行淘汰
當 Redis 需要淘汰一批數據時,會調用 evictionPoolPopulate 獲取一批待刪除對象,根據設置的淘汰范圍的不同,會決定傳遞給 evictionPoolPopulate 的 sampledict 參數是存有全部數據的 db->dict 還是隻有設置瞭過期時間的數據的 db->expires
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { int j, k, count; dictEntry *samples[server.maxmemory_samples]; // 隨機獲取 server.maxmemory_samples 個對象,寫入 samples 中 count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples); // 遍歷每個對象 for (j = 0; j < count; j++) { // ... // 初始化 // ... de = samples[j]; key = dictGetKey(de); // 如果獲取樣本的字典不是 db->dict(還可能是 db->expires),並且不是按 volatile-ttl 淘汰 // 那麼還要將對象轉化成數據字典中對應的對象,然後取其值 if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { if (sampledict != keydict) de = dictFind(keydict, key); // #define dictGetVal(he) ((he)->v.val) // 這裡還是利用 union 的特性,如果是 db->dict 中的元素,返回的是鍵的值 // 如果是 db->expires 中的元素,返回的是過期時間 o = dictGetVal(de); } // 按各算法計算 idle 分值,idle 越大的越應該被先淘汰 // // 如果使用 LRU 淘汰算法,則計算對象的空閑時間 if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) { idle = estimateObjectIdleTime(o); // 使用 LFU 淘汰算法, } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { idle = 255-LFUDecrAndReturn(o); // 使用 volatile-ttl 算法,用 ULLONG_MAX 減去過期時間作為分值 } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { idle = ULLONG_MAX - (long)dictGetVal(de); } else { serverPanic("Unknown eviction policy in evictionPoolPopulate()"); } k = 0; // 與原 pool 中的 idle 值進行比較,找出應該比當前對象先淘汰出去的對象 while (k < EVPOOL_SIZE && pool[k].key && pool[k].idle < idle) k++; if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) { // 沒有發現更需要被淘汰的對象,並且 pool 中也沒有多餘的位置 // 那麼當前對象仍然留在 samples 中 continue; } else if (k < EVPOOL_SIZE && pool[k].key == NULL) { // 沒有發現更需要被淘汰的對象,但 pool 中有多餘的位置 // 於是將這個對象插入 pool 中 } else { // 當前對象 // | // V // Pool: [ 0 1 2 3 ...k-1 k ... EVPOOL_SIZE-1] // 為瞭保證 pool 中的數據按 idle 從小到大排列,這裡將當前對象插入第 k 個對象後面的位置 if (pool[EVPOOL_SIZE-1].key == NULL) { // pool 的右邊還有空餘的位置,因此將從第 k 個開始後面的元素整體後移 memmove(pool+k+1,pool+k, sizeof(pool[0])*(EVPOOL_SIZE-k-1)); } else { // pool 的右邊沒有空餘的位置瞭,那麼將 pool 中前 k 個元素整體左移 sds cached = pool[0].cached; memmove(pool,pool+1,sizeof(pool[0])*k); } } // ... // 將當前對象的屬性賦值到下標為 k 的元素 // ... } }
完成上述操作後,pool 中剩下的就是新篩選出來的最需要淘汰的對象瞭。
在 freeMemoryIfNeeded 中會調用 evictionPoolPopulate 來篩選需要淘汰的對象,每次刪除一個,直到釋放瞭足夠的內存。如果始終無法達到內存需求,則會報錯。
到此這篇關於Redis 緩存刪除機制(源碼解析)的文章就介紹到這瞭,更多相關Redis 緩存刪除內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Redis 的內存淘汰策略和過期刪除策略的區別
- redis 過期策略及內存回收機制解析
- 緩存替換策略及應用(以Redis、InnoDB為例)
- redis 限制內存使用大小的實現
- Redis過期數據是否會被立馬刪除