spring cache註解@Cacheable緩存穿透詳解
最近發現線上監控有個SQL調用量很大,但是方法的調用量不是很大,查看接口實現,發現接口是做瞭緩存操作的,使用Spring cache緩存註解結合tair實現緩存操作。
但是為啥SQL調用量這麼大,難道緩存沒有生效。測試發現緩存是正常的,分析瞭代碼發現,代碼存在緩存穿透的風險。
具體註解是這樣的
@Cacheable(value = "storeDeliveryCoverage", key = "#sellerId + '|' + #cityCode", unless = "#result == null")
unless = “#result == null”表明接口返回值不為空的時候才緩存,如果線上有大量不合法的請求參數過來,由於為空的不會緩存起來,每次請求都打到DB上,導致DB的sql調用量巨大,給瞭黑客可乘之機,風險還是很大的。
找到原因之後就修改,查詢結果為空的時候兜底一個null,把這句unless = “#result == null”條件去掉測試瞭一下,發現為空的話還是不會緩存。於是debug分析瞭一波源碼,終於發現原來是tair的問題。
由於tair自身的特性,無法緩存null。既然無法緩存null,那我們就兜底一個空對象進去,取出來的時候把空對象轉化為null。
基於這個思路,我們把Cache的實現改造瞭一下
@Override public void put(Object key, Object value) { if (value == null) { // 為空的話,兜底一個空對象,防止緩存穿透(由於tair自身特性不允許緩存null對象的原因,這裡緩存一個空對象) value = new Nil(); } if (value instanceof Serializable) { final String tairKey = String.format("%s:%s", this.name, key); final ResultCode resultCode = this.tairManager.put( this.namespace, tairKey, (Serializable) value, 0, this.timeout ); if (resultCode != ResultCode.SUCCESS) { TairSpringCache.log.error( String.format( "[CachePut]: unable to put %s => %s into tair due to: %s", key, value, resultCode.getMessage() ) ); } } else { throw new RuntimeException( String.format( "[CachePut]: value %s is not Serializable", value ) ); } }
Nil類默認是一個空對象,這裡給瞭個內部類:
static class Nil implements Serializable { private static final long serialVersionUID = -9138993336039047508L; }
取緩存的get方法實現
@Override public ValueWrapper get(Object key) { final String tairKey = String.format("%s:%s", this.name, key); final Result<DataEntry> result = this.tairManager.get(this.namespace, tairKey); if (result.isSuccess() && (result.getRc() == ResultCode.SUCCESS)) { final Object obj = result.getValue().getValue(); // 緩存為空兜底的是Nil對象,這裡返回的時候需要轉為null if (obj instanceof Nil) { return null; } return () -> obj; } return null; }
改好瞭之後,測試一下,結果發現還是沒有生效,緩存沒有兜底,請求都打到DB上瞭。
debug走一遍,看瞭下Cache的源碼,終於發現關鍵問題所在(具體實現流程參考上一篇:Spring Cache- 緩存攔截器( CacheInterceptor)):
private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { // Special handling of synchronized invocation if (contexts.isSynchronized()) { CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); Cache cache = context.getCaches().iterator().next(); try { return wrapCacheValue(method, cache.get(key, new Callable<Object>() { @Override public Object call() throws Exception { return unwrapReturnValue(invokeOperation(invoker)); } })); } catch (Cache.ValueRetrievalException ex) { // The invoker wraps any Throwable in a ThrowableWrapper instance so we // can just make sure that one bubbles up the stack. throw (CacheOperationInvoker.ThrowableWrapper) ex.getCause(); } } else { // No caching required, only call the underlying method return invokeOperation(invoker); } } // 處理beforeIntercepte=true的緩存刪除操作 processCacheEvicts(contexts.get(CacheEvictOperation.class), true, CacheOperationExpressionEvaluator.NO_RESULT); // 從緩存中查找,是否有匹配@Cacheable的緩存數據 Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class)); // 如果@Cacheable沒有被緩存,那麼就需要將數據緩存起來,這裡將@Cacheable操作收集成CachePutRequest集合,以便後續做@CachePut緩存數據存放。 List<CachePutRequest> cachePutRequests = new LinkedList<CachePutRequest>(); if (cacheHit == null) { collectPutRequests(contexts.get(CacheableOperation.class), CacheOperationExpressionEvaluator.NO_RESULT, cachePutRequests); } Object cacheValue; Object returnValue; //如果沒有@CachePut操作,就使用@Cacheable獲取的結果(可能也沒有@Cableable,所以result可能為空)。 if (cacheHit != null && cachePutRequests.isEmpty() && !hasCachePut(contexts)) { //如果沒有@CachePut操作,並且cacheHit不為空,說明命中緩存瞭,直接返回緩存結果 cacheValue = cacheHit.get(); returnValue = wrapCacheValue(method, cacheValue); } else { // 否則執行具體方法內容,返回緩存的結果 returnValue = invokeOperation(invoker); cacheValue = unwrapReturnValue(returnValue); } // Collect any explicit @CachePuts collectPutRequests(contexts.get(CachePutOperation.class), cacheValue, cachePutRequests); // Process any collected put requests, either from @CachePut or a @Cacheable miss for (CachePutRequest cachePutRequest : cachePutRequests) { cachePutRequest.apply(cacheValue); } // Process any late evictions processCacheEvicts(contexts.get(CacheEvictOperation.class), false, cacheValue); return returnValue; }
根據key從緩存中查找,返回的結果是ValueWrapper,它是返回結果的包裝器:
private Cache.ValueWrapper findCachedItem(Collection<CacheOperationContext> contexts) { Object result = CacheOperationExpressionEvaluator.NO_RESULT; for (CacheOperationContext context : contexts) { if (isConditionPassing(context, result)) { Object key = generateKey(context, result); Cache.ValueWrapper cached = findInCaches(context, key); if (cached != null) { return cached; } else { if (logger.isTraceEnabled()) { logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames()); } } } } return null; }
private Cache.ValueWrapper findInCaches(CacheOperationContext context, Object key) { for (Cache cache : context.getCaches()) { Cache.ValueWrapper wrapper = doGet(cache, key); if (wrapper != null) { if (logger.isTraceEnabled()) { logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'"); } return wrapper; } } return null; }
這裡判斷緩存是否命中的邏輯是根據cacheHit是否為空,而cacheHit是ValueWrapper類型,查看ValueWrapper是一個接口,它的實現類是SimpleValueWrapper,這是一個包裝器,將緩存的結果包裝起來瞭。
而我們前面的get方法取緩存的時候如果為Nil對象,返回的是null,這樣緩存判斷出來是沒有命中,即cacheHit==null,就會去執行具體方法朔源。
所以到這裡已經很清晰瞭,關鍵問題是get取緩存的結果如果是兜底的Nil對象,應該返回new SimpleValueWrapper(null)。
應該返回包裝器,包裝的是緩存的對象為null。
測試瞭一下,發現ok瞭
具體源碼如下:
/** * 基於tair的緩存,適配spring緩存框架 */ public class TairSpringCache implements Cache { private static final Logger log = LoggerFactory.getLogger(TairSpringCache.class); private TairManager tairManager; private final String name; private int namespace; private int timeout; public TairSpringCache(String name, TairManager tairManager, int namespace) { this(name, tairManager, namespace, 0); } public TairSpringCache(String name, TairManager tairManager, int namespace, int timeout) { this.name = name; this.tairManager = tairManager; this.namespace = namespace; this.timeout = timeout; } @Override public String getName() { return this.name; } @Override public Object getNativeCache() { return this.tairManager; } @Override public ValueWrapper get(Object key) { final String tairKey = String.format("%s:%s", this.name, key); final Result<DataEntry> result = this.tairManager.get(this.namespace, tairKey); if (result.isSuccess() && (result.getRc() == ResultCode.SUCCESS)) { final Object obj = result.getValue().getValue(); // 緩存為空兜底的是Nil對象,這裡返回的時候需要轉為null if (obj instanceof Nil) { return () -> null; } return () -> obj; } return null; } @Override public <T> T get(Object key, Class<T> type) { return (T) this.get(key).get(); } public <T> T get(Object o, Callable<T> callable) { return null; } @Override public void put(Object key, Object value) { if (value == null) { // 為空的話,兜底一個空對象,防止緩存穿透(由於tair自身特性不允許緩存null對象的原因,這裡緩存一個空對象) value = new Nil(); } if (value instanceof Serializable) { final String tairKey = String.format("%s:%s", this.name, key); final ResultCode resultCode = this.tairManager.put( this.namespace, tairKey, (Serializable) value, 0, this.timeout ); if (resultCode != ResultCode.SUCCESS) { TairSpringCache.log.error( String.format( "[CachePut]: unable to put %s => %s into tair due to: %s", key, value, resultCode.getMessage() ) ); } } else { throw new RuntimeException( String.format( "[CachePut]: value %s is not Serializable", value ) ); } } public ValueWrapper putIfAbsent(Object key, Object value) { final ValueWrapper vw = this.get(key); if (vw.get() == null) { this.put(key, value); } return vw; } @Override public void evict(Object key) { final String tairKey = String.format("%s:%s", this.name, key); final ResultCode resultCode = this.tairManager.delete(this.namespace, tairKey); if ((resultCode == ResultCode.SUCCESS) || (resultCode == ResultCode.DATANOTEXSITS) || (resultCode == ResultCode.DATAEXPIRED)) { return; } else { final String errMsg = String.format( "[CacheDelete]: unable to evict key %s, resultCode: %s", key, resultCode ); TairSpringCache.log.error(errMsg); throw new RuntimeException(errMsg); } } @Override public void clear() { //TODO fgz: implement here later } public void setTairManager(TairManager tairManager) { this.tairManager = tairManager; } public void setNamespace(int namespace) { this.namespace = namespace; } public void setTimeout(int timeout) { this.timeout = timeout; } static class Nil implements Serializable { private static final long serialVersionUID = -9138993336039047508L; } }
測試用例就不貼瞭。
以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。
推薦閱讀:
- Spring @Cacheable指定失效時間實例
- SpringBoot實現統一封裝返回前端結果集的示例代碼
- 關於Spring Cache 緩存攔截器( CacheInterceptor)
- SpringBoot 統一公共返回類的實現
- 基於spring @Cacheable 註解的spel表達式解析執行邏輯