解決線程並發redisson使用遇到的坑
線程並發redisson的坑
背景
因為業務上的一個購買需求,需要對庫存進行行程保護,防止超賣的出現(我們不是電商公司),經過調研,最終選擇使用Redission來進行控制。
主要因為Redission豐富的API,開源框架,已經被廣泛應用於實際生產環境。
問題描述
當我們使用Ression中Lock.lock()方法之後,如果存在線程並發常見情況下,會出現如下異常:
java.lang.IllegalMonitorStateException: attempt to unlock lock, not locked by current thread by node id: 9f178836-f7e1-44fe-a89d-2db52f399c0d thread-id: 22
問題分析
在thread-1還沒有結束的時候,也就是在thread-1在獲得鎖但是還沒有釋放鎖的時候, `thread-2由於被別的線程中斷停止瞭等待從lock.tryLock的阻塞狀態中返回繼續執行接下來的邏輯,並且由於嘗試去釋放一個屬於線程thread-1的鎖而拋出瞭一個運行時異常導致該線程thread-2結束瞭, 然而thread-2完成瞭一系列操作後,線程thread-1才釋放瞭自己的鎖.
所以thread-2並沒有獲得鎖,卻執行瞭需要同步的內容,還嘗試去釋放鎖。
那解決方式我們就知道瞭,當前線程加的鎖由當前線程去解鎖,也就是說當我們使用lock.unlock的時候加上線程的判斷即可。
問題解決
RLock lock = redissonClient.getLock(key); if(lock.isLocked()){ // 是否還是鎖定狀態 if(lock.isHeldByCurrentThread()){ // 時候是當前執行線程的鎖 lock.unlock(); // 釋放鎖 } }
redisson使用註意事項
Redisson 是一個在 Redis 的基礎上實現的 Java 駐內存數據網格,相較於暴露底層操作的Jedis,Redisson提供瞭一系列的分佈式的 Java 常用對象,還提供瞭許多分佈式服務。
特性 & 功能:
- 支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式
- 程序接口調用方式采用異步執行和異步流執行兩種方式
- 數據序列化,Redisson 的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在 Redis 裡的讀取和存儲
- 單個集合數據分片,在集群模式下,Redisson 為單個 Redis 集合類型提供瞭自動分片的功能
- 提供多種分佈式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
- 提供豐富的分佈式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
- 分佈式鎖和同步器的實現,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等
- 提供先進的分佈式服務,如分佈式遠程服務(Remote Service),分佈式實時對象(Live Object)服務,分佈式執行服務(Executor Service),分佈式調度任務服務(Schedule Service)和分佈式映射歸納服務(MapReduce)
- 更多特性和功能,請關註官網:http://redisson.org
實現原理
redis本身是不支持上述的分佈式對象和集合,Redisson是通過利用redis的特性在客戶端實現瞭高級數據結構和特性,例如優先隊列的實現,是通過客戶端排序整理後再存入redis。
客戶端實現,意味著當沒有任何客戶端在線時,這些所有的數據結構和特性都不會保留,也不會自動生效,例如過期事件的觸發或原來優先隊列的元素增加。
註意事項
實時性
RMap中有一個功能是可以設置鍵值對的過期時間的,並可以註冊鍵值對的事件監聽器
元素淘汰功能(Eviction)
Redisson的分佈式的RMapCache Java對象在基於RMap的前提下實現瞭針對單個元素的淘汰機制。同時仍然保留瞭元素的插入順序。由於RMapCache是基於RMap實現的,使它同時繼承瞭java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基於這樣的功能來實現的。
目前的Redis自身並不支持散列(Hash)當中的元素淘汰,因此所有過期元素都是通過org.redisson.EvictionScheduler實例來實現定期清理的。為瞭保證資源的有效利用,每次運行最多清理300個過期元素。任務的啟動時間將根據上次實際清理數量自動調整,間隔時間趨於1秒到1小時之間。比如該次清理時刪除瞭300條元素,那麼下次執行清理的時間將在1秒以後(最小間隔時間)。一旦該次清理數量少於上次清理數量,時間間隔將增加1.5倍。
正如官方wiki所述,這個功能是通過後臺線程定時去清理的, 所以這個是非實時的(issue-1234:on expired event is not executed in real-time.),延遲在5秒到2小時之間,因此對實時性要求比較高的場景就得自己衡量瞭。
由於過期時間的非實時性,所以導致過期事件的發生也是非實時的,相應的監聽器可能會延遲瞭一會兒才收到通知,在我的測試中,ttl設置在秒級誤差是比較大的,分鐘級別的ttl倒還好(左側設置值,右側實際耗時):
1s _ 5s
3s _ 5s
4s _ 5s
5s _ 9s
6s _ 10s
10s _ 15s
1m _ 1m11s
序列化
由Redisson默認的編碼器為JsonJacksonCodec,JsonJackson在序列化有雙向引用的對象時,會出現無限循環異常。而fastjson在檢查出雙向引用後會自動用引用符$ref替換,終止循環。
在我的情況中,我序列化瞭一個service,這個service已被spring托管,而且和另一個service之間也相互註入瞭,用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環異常。
為瞭序列化後的內容可見,所以不用redission其他自帶的二進制編碼器,自行實現編碼器:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; import org.redisson.client.codec.BaseCodec; import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Encoder; import java.io.IOException; public class FastjsonCodec extends BaseCodec { private final Encoder encoder = in -> { ByteBuf out = ByteBufAllocator.DEFAULT.buffer(); try { ByteBufOutputStream os = new ByteBufOutputStream(out); JSON.writeJSONString(os, in,SerializerFeature.WriteClassName); return os.buffer(); } catch (IOException e) { out.release(); throw e; } catch (Exception e) { out.release(); throw new IOException(e); } }; private final Decoder<Object> decoder = (buf, state) -> JSON.parseObject(new ByteBufInputStream(buf), Object.class); @Override public Decoder<Object> getValueDecoder() { return decoder; } @Override public Encoder getValueEncoder() { return encoder; } }
訂閱發佈
Redisson對訂閱發佈的封裝是RTopic,這也是Redisson中很多事件監聽的實現原理(例如鍵值對的事件監聽)。
使用單元測試時發現,在事件發佈後,訂閱方需要延時一下才能收到事件。具體原因待查。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- redisson分佈式鎖的用法大全
- 基於Redission的分佈式鎖實戰
- Redisson 主從一致性問題詳解
- 使用自定義註解實現redisson分佈式鎖
- spring整合redisson開啟緩存方式