spring cache註解@Cacheable緩存穿透詳解

最近發現線上監控有個SQL調用量很大,但是方法的調用量不是很大,查看接口實現,發現接口是做瞭緩存操作的,使用Spring cache緩存註解結合tair實現緩存操作。

但是為啥SQL調用量這麼大,難道緩存沒有生效。測試發現緩存是正常的,分析瞭代碼發現,代碼存在緩存穿透的風險。

具體註解是這樣的

@Cacheable(value = "storeDeliveryCoverage", key = "#sellerId + '|' + #cityCode", unless = "#result == null")

unless = “#result == null”表明接口返回值不為空的時候才緩存,如果線上有大量不合法的請求參數過來,由於為空的不會緩存起來,每次請求都打到DB上,導致DB的sql調用量巨大,給瞭黑客可乘之機,風險還是很大的。

找到原因之後就修改,查詢結果為空的時候兜底一個null,把這句unless = “#result == null”條件去掉測試瞭一下,發現為空的話還是不會緩存。於是debug分析瞭一波源碼,終於發現原來是tair的問題。

由於tair自身的特性,無法緩存null。既然無法緩存null,那我們就兜底一個空對象進去,取出來的時候把空對象轉化為null。

基於這個思路,我們把Cache的實現改造瞭一下

@Override
    public void put(Object key, Object value) {
        if (value == null) {
            // 為空的話,兜底一個空對象,防止緩存穿透(由於tair自身特性不允許緩存null對象的原因,這裡緩存一個空對象)
            value = new Nil();
        }
        if (value instanceof Serializable) {
            final String tairKey = String.format("%s:%s", this.name, key);
            final ResultCode resultCode = this.tairManager.put(
                    this.namespace,
                    tairKey,
                    (Serializable) value,
                    0,
                    this.timeout
            );
            if (resultCode != ResultCode.SUCCESS) {
                TairSpringCache.log.error(
                        String.format(
                                "[CachePut]: unable to put %s => %s into tair due to: %s",
                                key,
                                value,
                                resultCode.getMessage()
                        )
                );
            }
        } else {
            throw new RuntimeException(
                    String.format(
                            "[CachePut]: value %s is not Serializable",
                            value
                    )
            );
        }
    }

Nil類默認是一個空對象,這裡給瞭個內部類:

static class Nil implements Serializable {
        private static final long serialVersionUID = -9138993336039047508L;
    }

取緩存的get方法實現

@Override
    public ValueWrapper get(Object key) {
        final String tairKey = String.format("%s:%s", this.name, key);
        final Result<DataEntry> result = this.tairManager.get(this.namespace, tairKey);
        if (result.isSuccess() && (result.getRc() == ResultCode.SUCCESS)) {
            final Object obj = result.getValue().getValue();
            // 緩存為空兜底的是Nil對象,這裡返回的時候需要轉為null
            if (obj instanceof Nil) {
                return null;
            }
            return () -> obj;
        }
        return null;
    }

改好瞭之後,測試一下,結果發現還是沒有生效,緩存沒有兜底,請求都打到DB上瞭。

debug走一遍,看瞭下Cache的源碼,終於發現關鍵問題所在(具體實現流程參考上一篇:Spring Cache- 緩存攔截器( CacheInterceptor)):

private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
		// Special handling of synchronized invocation
		if (contexts.isSynchronized()) {
			CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next();
			if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
				Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
				Cache cache = context.getCaches().iterator().next();
				try {
					return wrapCacheValue(method, cache.get(key, new Callable<Object>() {
						@Override
						public Object call() throws Exception {
							return unwrapReturnValue(invokeOperation(invoker));
						}
					}));
				}
				catch (Cache.ValueRetrievalException ex) {
					// The invoker wraps any Throwable in a ThrowableWrapper instance so we
					// can just make sure that one bubbles up the stack.
					throw (CacheOperationInvoker.ThrowableWrapper) ex.getCause();
				}
			}
			else {
				// No caching required, only call the underlying method
				return invokeOperation(invoker);
			}
		}
		// 處理beforeIntercepte=true的緩存刪除操作
		processCacheEvicts(contexts.get(CacheEvictOperation.class), true,
				CacheOperationExpressionEvaluator.NO_RESULT);
		// 從緩存中查找,是否有匹配@Cacheable的緩存數據
		Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class));
		// 如果@Cacheable沒有被緩存,那麼就需要將數據緩存起來,這裡將@Cacheable操作收集成CachePutRequest集合,以便後續做@CachePut緩存數據存放。
		List<CachePutRequest> cachePutRequests = new LinkedList<CachePutRequest>();
		if (cacheHit == null) {
			collectPutRequests(contexts.get(CacheableOperation.class),
					CacheOperationExpressionEvaluator.NO_RESULT, cachePutRequests);
		}
		Object cacheValue;
		Object returnValue;
		//如果沒有@CachePut操作,就使用@Cacheable獲取的結果(可能也沒有@Cableable,所以result可能為空)。
		if (cacheHit != null && cachePutRequests.isEmpty() && !hasCachePut(contexts)) {
			//如果沒有@CachePut操作,並且cacheHit不為空,說明命中緩存瞭,直接返回緩存結果
			cacheValue = cacheHit.get();
			returnValue = wrapCacheValue(method, cacheValue);
		}
		else {
			// 否則執行具體方法內容,返回緩存的結果
			returnValue = invokeOperation(invoker);
			cacheValue = unwrapReturnValue(returnValue);
		}
		// Collect any explicit @CachePuts
		collectPutRequests(contexts.get(CachePutOperation.class), cacheValue, cachePutRequests);
		// Process any collected put requests, either from @CachePut or a @Cacheable miss
		for (CachePutRequest cachePutRequest : cachePutRequests) {
			cachePutRequest.apply(cacheValue);
		}
		// Process any late evictions
		processCacheEvicts(contexts.get(CacheEvictOperation.class), false, cacheValue);
		return returnValue;
	}

根據key從緩存中查找,返回的結果是ValueWrapper,它是返回結果的包裝器:

private Cache.ValueWrapper findCachedItem(Collection<CacheOperationContext> contexts) {
		Object result = CacheOperationExpressionEvaluator.NO_RESULT;
		for (CacheOperationContext context : contexts) {
			if (isConditionPassing(context, result)) {
				Object key = generateKey(context, result);
				Cache.ValueWrapper cached = findInCaches(context, key);
				if (cached != null) {
					return cached;
				}
				else {
					if (logger.isTraceEnabled()) {
						logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames());
					}
				}
			}
		}
		return null;
	}
private Cache.ValueWrapper findInCaches(CacheOperationContext context, Object key) {
		for (Cache cache : context.getCaches()) {
			Cache.ValueWrapper wrapper = doGet(cache, key);
			if (wrapper != null) {
				if (logger.isTraceEnabled()) {
					logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'");
				}
				return wrapper;
			}
		}
		return null;
	}

這裡判斷緩存是否命中的邏輯是根據cacheHit是否為空,而cacheHit是ValueWrapper類型,查看ValueWrapper是一個接口,它的實現類是SimpleValueWrapper,這是一個包裝器,將緩存的結果包裝起來瞭。

而我們前面的get方法取緩存的時候如果為Nil對象,返回的是null,這樣緩存判斷出來是沒有命中,即cacheHit==null,就會去執行具體方法朔源。

所以到這裡已經很清晰瞭,關鍵問題是get取緩存的結果如果是兜底的Nil對象,應該返回new SimpleValueWrapper(null)。

應該返回包裝器,包裝的是緩存的對象為null。

測試瞭一下,發現ok瞭

具體源碼如下:

/**
 * 基於tair的緩存,適配spring緩存框架
 */
public class TairSpringCache implements Cache {
    private static final Logger log = LoggerFactory.getLogger(TairSpringCache.class);
    private TairManager tairManager;
    private final String name;
    private int namespace;
    private int timeout;
    public TairSpringCache(String name, TairManager tairManager, int namespace) {
        this(name, tairManager, namespace, 0);
    }
    public TairSpringCache(String name, TairManager tairManager, int namespace, int timeout) {
        this.name = name;
        this.tairManager = tairManager;
        this.namespace = namespace;
        this.timeout = timeout;
    }
    @Override
    public String getName() {
        return this.name;
    }
    @Override
    public Object getNativeCache() {
        return this.tairManager;
    }
    @Override
    public ValueWrapper get(Object key) {
        final String tairKey = String.format("%s:%s", this.name, key);
        final Result<DataEntry> result = this.tairManager.get(this.namespace, tairKey);
        if (result.isSuccess() && (result.getRc() == ResultCode.SUCCESS)) {
            final Object obj = result.getValue().getValue();
            // 緩存為空兜底的是Nil對象,這裡返回的時候需要轉為null
            if (obj instanceof Nil) {
                return () -> null;
            }
            return () -> obj;
        }
        return null;
    }
    @Override
    public <T> T get(Object key, Class<T> type) {
        return (T) this.get(key).get();
    }
    public <T> T get(Object o, Callable<T> callable) {
        return null;
    }
    @Override
    public void put(Object key, Object value) {
        if (value == null) {
            // 為空的話,兜底一個空對象,防止緩存穿透(由於tair自身特性不允許緩存null對象的原因,這裡緩存一個空對象)
            value = new Nil();
        }
        if (value instanceof Serializable) {
            final String tairKey = String.format("%s:%s", this.name, key);
            final ResultCode resultCode = this.tairManager.put(
                    this.namespace,
                    tairKey,
                    (Serializable) value,
                    0,
                    this.timeout
            );
            if (resultCode != ResultCode.SUCCESS) {
                TairSpringCache.log.error(
                        String.format(
                                "[CachePut]: unable to put %s => %s into tair due to: %s",
                                key,
                                value,
                                resultCode.getMessage()
                        )
                );
            }
        } else {
            throw new RuntimeException(
                    String.format(
                            "[CachePut]: value %s is not Serializable",
                            value
                    )
            );
        }
    }
    public ValueWrapper putIfAbsent(Object key, Object value) {
        final ValueWrapper vw = this.get(key);
        if (vw.get() == null) {
            this.put(key, value);
        }
        return vw;
    }
    @Override
    public void evict(Object key) {
        final String tairKey = String.format("%s:%s", this.name, key);
        final ResultCode resultCode = this.tairManager.delete(this.namespace, tairKey);
        if ((resultCode == ResultCode.SUCCESS)
                || (resultCode == ResultCode.DATANOTEXSITS)
                || (resultCode == ResultCode.DATAEXPIRED)) {
            return;
        }
        else {
            final String errMsg = String.format(
                    "[CacheDelete]: unable to evict key %s, resultCode: %s",
                    key,
                    resultCode
            );
            TairSpringCache.log.error(errMsg);
            throw new RuntimeException(errMsg);
        }
    }
    @Override
    public void clear() {
        //TODO fgz: implement here later
    }
    public void setTairManager(TairManager tairManager) {
        this.tairManager = tairManager;
    }
    public void setNamespace(int namespace) {
        this.namespace = namespace;
    }
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }
    static class Nil implements Serializable {
        private static final long serialVersionUID = -9138993336039047508L;
    }
}

測試用例就不貼瞭。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: