How Spring's @EventListener Works: Analyzing the Event Listener Mechanism
How Spring's @EventListener works
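Spring provides an implementation of event listening and subscription that is built on the observer design pattern. Its purpose is to decouple business logic and so improve extensibility and maintainability. The publisher does not need to know who is listening or what the listeners do; its only job is to publish the event.
In Spring, event-driven code can be written either by implementing the ApplicationListener interface or by annotating a method with @EventListener (which is an annotation, not an interface).
Take an e-commerce system as an example: once a user's order has been paid, we usually notify the user by SMS or email. That notification can be moved into a separate event listener that simply waits for the event, which decouples it from the order flow.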
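import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

// OrderEvent.java: the event that is published
public class OrderEvent extends ApplicationEvent {
    public OrderEvent(Object source) {
        super(source);
    }
}

// OrderEventListener.java: the listener, discovered through @EventListener
@Component
public class OrderEventListener {
    @EventListener
    public void listener(OrderEvent event) {
        System.out.println("i do OrderEventListener");
    }
}

// PersonController.java: the publisher, which knows nothing about the listener
@Controller
@RequestMapping("person")
public class PersonController implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @ResponseBody
    @GetMapping("publishOrderEvent")
    public String publishOrderEvent() {
        applicationContext.publishEvent(new OrderEvent("I published an event!"));
        System.out.println(" publishOrderEvent ");
        return "Event sent!";
    }
}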
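To see the decoupling without any framework involved, the observer pattern behind this example can be sketched in plain Java. This is only an illustration, not how Spring implements it: SimpleEventBus, OrderPaid and ObserverSketch are hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Spring-free stand-in for an event bus (illustrative names only).
class SimpleEventBus {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    // Listeners register themselves; the publisher never learns who they are.
    void subscribe(Consumer<Object> listener) {
        listeners.add(listener);
    }

    // Deliver the event to every registered listener, in registration order.
    void publish(Object event) {
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
    }
}

// Hypothetical event type carrying the id of the paid order.
class OrderPaid {
    final String orderId;

    OrderPaid(String orderId) {
        this.orderId = orderId;
    }
}

class ObserverSketch {
    // Publishes one OrderPaid event and returns what the two listeners recorded.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SimpleEventBus bus = new SimpleEventBus();
        bus.subscribe(e -> log.add("sms:" + ((OrderPaid) e).orderId));
        bus.subscribe(e -> log.add("email:" + ((OrderPaid) e).orderId));
        bus.publish(new OrderPaid("42"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Adding a third notification channel only requires another subscribe call; the publishing code stays unchanged, which is the property the Spring example relies on.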
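EventListenerMethodProcessor is the class that processes @EventListener. It is both a SmartInitializingSingleton and a BeanFactoryPostProcessor.
1. Preparation before @EventListener is parsed
1.1 Registering EventListenerFactory and EventListenerMethodProcessor
EventListenerFactory is what turns a method annotated with @EventListener into an ApplicationListener. Its bean definition is registered with the container at the very beginning, before the refresh method runs, by AnnotationConfigUtils.registerAnnotationConfigProcessors: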
public static Set<BeanDefinitionHolder> registerAnnotationConfigProcessors(
        BeanDefinitionRegistry registry, Object source) {

    // Unwrap the underlying bean factory
    DefaultListableBeanFactory beanFactory = unwrapDefaultListableBeanFactory(registry);

    // Registration of the other annotation processors is omitted here
    Set<BeanDefinitionHolder> beanDefs = new LinkedHashSet<>(8);

    // org.springframework.context.event.internalEventListenerProcessor
    // The processor for the @EventListener annotation
    if (!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME)) {
        RootBeanDefinition def = new RootBeanDefinition(EventListenerMethodProcessor.class);
        def.setSource(source);
        beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_PROCESSOR_BEAN_NAME));
    }

    // org.springframework.context.event.internalEventListenerFactory
    // Bean name of the internally managed EventListenerFactory
    if (!registry.containsBeanDefinition(EVENT_LISTENER_FACTORY_BEAN_NAME)) {
        RootBeanDefinition def = new RootBeanDefinition(DefaultEventListenerFactory.class);
        def.setSource(source);
        beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_FACTORY_BEAN_NAME));
    }

    return beanDefs;
}
- If the container has no bean definition named org.springframework.context.event.internalEventListenerProcessor, an EventListenerMethodProcessor is registered.
- If the container has no bean definition named org.springframework.context.event.internalEventListenerFactory, a DefaultEventListenerFactory is registered.
1.2 Linking EventListenerMethodProcessor with EventListenerFactory
EventListenerMethodProcessor is registered when the container starts. Because it is a BeanFactoryPostProcessor, the link between it and the EventListenerFactory beans is established in its postProcessBeanFactory method.
public class EventListenerMethodProcessor
        implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {

    @Nullable
    private ConfigurableListableBeanFactory beanFactory;

    @Nullable
    private List<EventListenerFactory> eventListenerFactories;

    // Initializes eventListenerFactories
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        // Look up every EventListenerFactory in the container, instantiating them in the process
        Map<String, EventListenerFactory> beans =
                beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
        List<EventListenerFactory> factories = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(factories);
        // Cache the sorted factories in eventListenerFactories for later use
        this.eventListenerFactories = factories;
    }
}
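EventListenerFactory beans are therefore instantiated only slightly later than the BeanFactoryPostProcessor beans, and earlier than the BeanPostProcessor beans.
2. Parsing @EventListener
EventListenerMethodProcessor is a SmartInitializingSingleton, so its afterSingletonsInstantiated method is executed after all non-lazy singleton beans have been instantiated.
Note: afterSingletonsInstantiated is only invoked on SmartInitializingSingleton beans that are themselves singletons.
2.1 Basic flow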
public class EventListenerMethodProcessor
        implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {

    // Fields (beanFactory, applicationContext, eventListenerFactories, evaluator,
    // nonAnnotatedClasses) are omitted here.

    @Override
    public void afterSingletonsInstantiated() {
        ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        // Asking for Object.class returns the names of all beans in the container,
        // which are then checked one by one.
        String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
        for (String beanName : beanNames) {
            if (!ScopedProxyUtils.isScopedTarget(beanName)) {
                Class<?> type = null;
                try {
                    // The bean may be a proxy, so resolve the real target class.
                    type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
                }
                catch (Throwable ex) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
                    }
                }
                if (type != null) {
                    // Special handling for scoped proxies (rarely needed in practice)
                    if (ScopedObject.class.isAssignableFrom(type)) {
                        try {
                            Class<?> targetClass = AutoProxyUtils.determineTargetClass(
                                    beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
                            if (targetClass != null) {
                                type = targetClass;
                            }
                        }
                        catch (Throwable ex) {
                            // An invalid scoped proxy arrangement - let's ignore it.
                            if (logger.isDebugEnabled()) {
                                logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
                            }
                        }
                    }
                    try {
                        // This is where the methods of the bean are actually processed.
                        processBean(beanName, type);
                    }
                    catch (Throwable ex) {
                        throw new BeanInitializationException("Failed to process @EventListener " +
                                "annotation on bean with name '" + beanName + "'", ex);
                    }
                }
            }
        }
    }

    private void processBean(final String beanName, final Class<?> targetType) {
        // Skip classes already known to have no @EventListener methods,
        // JDK classes, and Spring's own container classes.
        if (!this.nonAnnotatedClasses.contains(targetType) &&
                !targetType.getName().startsWith("java") &&
                !isSpringContainerClass(targetType)) {

            Map<Method, EventListener> annotatedMethods = null;
            try {
                // Collect the methods of the class that are annotated with @EventListener.
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<EventListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
            }
            catch (Throwable ex) {
                // An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
                if (logger.isDebugEnabled()) {
                    logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
                }
            }

            // An empty result means the class has no @EventListener methods.
            if (CollectionUtils.isEmpty(annotatedMethods)) {
                this.nonAnnotatedClasses.add(targetType);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
                }
            }
            else {
                // The class has at least one @EventListener method.
                ConfigurableApplicationContext context = this.applicationContext;
                Assert.state(context != null, "No ApplicationContext set");
                // The EventListenerFactory list cached in postProcessBeanFactory
                List<EventListenerFactory> factories = this.eventListenerFactories;
                Assert.state(factories != null, "EventListenerFactory List not initialized");
                for (Method method : annotatedMethods.keySet()) {
                    for (EventListenerFactory factory : factories) {
                        if (factory.supportsMethod(method)) {
                            // Select a method that can actually be invoked on the bean (mainly about
                            // accessibility). Note: if the bean is a JDK dynamic proxy, an @EventListener
                            // method declared only on the implementation class causes an error here.
                            // CGLIB proxies are not affected.
                            Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
                            // Let the EventListenerFactory create the ApplicationListener (details below).
                            ApplicationListener<?> applicationListener =
                                    factory.createApplicationListener(beanName, targetType, methodToUse);
                            // An ApplicationListenerMethodAdapter additionally needs its init method called.
                            if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                                ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                            }
                            // Register the listener with the application context.
                            context.addApplicationListener(applicationListener);
                            // Each @EventListener method is handled by one factory only.
                            break;
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
                            beanName + "': " + annotatedMethods);
                }
            }
        }
    }
}
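In short: the processor walks over every bean in the container and, for each method annotated with @EventListener, uses an EventListenerFactory to turn that method into an ApplicationListener. The bean does not have to be annotated with @Component; any bean registered in the container is inspected.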
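The scan-and-wrap step can be imitated with plain reflection. The sketch below is a simplified illustration under stated assumptions, not Spring's code: the @Listen annotation, MailNotifier and ListenerScanner are hypothetical names, and java.util.function.Consumer stands in for ApplicationListener.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical marker annotation standing in for @EventListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {
}

// Hypothetical bean with one listener method and one ordinary method.
class MailNotifier {
    final List<String> sent = new ArrayList<>();

    @Listen
    public void onEvent(String event) {
        sent.add("mail:" + event);
    }

    public void notAListener(String event) {
        sent.add("ignored:" + event);
    }
}

class ListenerScanner {
    // Wraps every @Listen method of the bean in a Consumer, loosely mirroring how
    // processBean wraps @EventListener methods in ApplicationListener instances.
    static List<Consumer<Object>> scan(Object bean) {
        List<Consumer<Object>> listeners = new ArrayList<>();
        for (Method method : bean.getClass().getMethods()) {
            if (!method.isAnnotationPresent(Listen.class) || method.getParameterCount() != 1) {
                continue;
            }
            method.setAccessible(true); // the declaring class may not be public
            Class<?> eventType = method.getParameterTypes()[0];
            listeners.add(event -> {
                // Only deliver events that the method parameter can accept.
                if (eventType.isInstance(event)) {
                    try {
                        method.invoke(bean, event);
                    } catch (ReflectiveOperationException ex) {
                        throw new IllegalStateException("Listener invocation failed", ex);
                    }
                }
            });
        }
        return listeners;
    }

    // Sends a String and an Integer to every discovered listener.
    static List<String> demo() {
        MailNotifier bean = new MailNotifier();
        for (Consumer<Object> listener : scan(bean)) {
            listener.accept("order-paid"); // delivered to onEvent
            listener.accept(123);          // skipped: not a String
        }
        return bean.sent;
    }
}
```

Only the annotated method is wrapped, and the type check inside the wrapper plays the role that the declared event types play in the real adapter.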
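As soon as one EventListenerFactory that supports an @EventListener method is found, no other EventListenerFactory gets to process that method. If the container holds several EventListenerFactory beans, we therefore need to pay attention to their order.
2.2 How EventListenerFactory parses @EventListener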
public interface EventListenerFactory {

    // Whether this factory supports the given method
    boolean supportsMethod(Method method);

    // Creates an ApplicationListener for the given method
    ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method);
}
EventListenerFactory has two implementations: DefaultEventListenerFactory and TransactionalEventListenerFactory. DefaultEventListenerFactory handles @EventListener, while TransactionalEventListenerFactory handles @TransactionalEventListener. Spring registers DefaultEventListenerFactory by default but not TransactionalEventListenerFactory, so to support @TransactionalEventListener a TransactionalEventListenerFactory has to be registered, for example by using the @EnableTransactionManagement annotation.
public class DefaultEventListenerFactory implements EventListenerFactory, Ordered {

    // Lowest precedence: this factory is consulted after all others
    private int order = LOWEST_PRECEDENCE;

    @Override
    public int getOrder() {
        return this.order;
    }

    // Supports every method, so it acts as the catch-all
    @Override
    public boolean supportsMethod(Method method) {
        return true;
    }

    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        return new ApplicationListenerMethodAdapter(beanName, type, method);
    }
}
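Since DefaultEventListenerFactory accepts every method and sits at the lowest precedence, a more specific factory gets first pick and the default one only handles what is left. The following plain-Java sketch shows that "sort, then first match wins" selection. ListenerFactory and FactoryChain are hypothetical simplifications, and the order values and the "tx" prefix rule are arbitrary choices made for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified counterpart of EventListenerFactory.
interface ListenerFactory {
    int order();                          // lower value means higher precedence

    boolean supports(String methodName);

    String name();
}

class FactoryChain {
    // Builds a factory that supports method names starting with the given prefix.
    static ListenerFactory factory(String name, int order, String prefix) {
        return new ListenerFactory() {
            public int order() {
                return order;
            }

            public boolean supports(String methodName) {
                return methodName.startsWith(prefix);
            }

            public String name() {
                return name;
            }
        };
    }

    // Sorts by order and returns the name of the first factory that supports the method.
    static String pick(List<ListenerFactory> factories, String methodName) {
        List<ListenerFactory> sorted = new ArrayList<>(factories);
        sorted.sort(Comparator.comparingInt(ListenerFactory::order));
        for (ListenerFactory candidate : sorted) {
            if (candidate.supports(methodName)) {
                return candidate.name(); // first match wins, later factories are never asked
            }
        }
        return null;
    }

    static List<String> demo() {
        List<ListenerFactory> factories = Arrays.asList(
                factory("default", Integer.MAX_VALUE, ""), // catch-all, lowest precedence
                factory("transactional", 50, "tx"));       // specific, higher precedence
        return Arrays.asList(pick(factories, "txOrderPaid"), pick(factories, "orderPaid"));
    }
}
```

If the catch-all factory were given the smaller order value instead, it would claim every method and the specific factory would never be consulted, which is why the ordering matters.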
ApplicationListenerMethodAdapter is an ApplicationListener that wraps a method annotated with @EventListener.
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {

    private final String beanName;      // name of the bean that owns the @EventListener method
    private final Method method;        // the method annotated with @EventListener
    private final Method targetMethod;  // the real method on the target class, in case of a proxy
    // Key identifying the method, e.g. public void demo.Ball.applicationContextEvent(demo.OrderEvent)
    private final AnnotatedElementKey methodKey;
    private final List<ResolvableType> declaredEventTypes; // event types the method declares
    private final String condition;     // the condition attribute of @EventListener
    private final int order;
    private ApplicationContext applicationContext;
    private EventExpressionEvaluator evaluator; // evaluates the condition expression

    public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
        this.beanName = beanName;
        this.method = BridgeMethodResolver.findBridgedMethod(method);
        this.targetMethod = (!Proxy.isProxyClass(targetClass) ?
                AopUtils.getMostSpecificMethod(method, targetClass) : this.method);
        this.methodKey = new AnnotatedElementKey(this.targetMethod, targetClass);
        // Look up the @EventListener annotation on the method
        EventListener ann = AnnotatedElementUtils.findMergedAnnotation(this.targetMethod, EventListener.class);
        this.declaredEventTypes = resolveDeclaredEventTypes(method, ann);
        this.condition = (ann != null ? ann.condition() : null);
        this.order = resolveOrder(this.targetMethod);
    }

    public void onApplicationEvent(ApplicationEvent event) {
        processEvent(event);
    }

    public void processEvent(ApplicationEvent event) {
        Object[] args = resolveArguments(event);
        // Decide, based on the condition of @EventListener, whether to handle the event
        if (shouldHandle(event, args)) {
            // Invoke the listener method
            Object result = doInvoke(args);
            if (result != null) {
                // A non-null return value is published as a new event,
                // so listeners for that result type are triggered in turn
                handleResult(result);
            }
            else {
                logger.trace("No result object given - no result to handle");
            }
        }
    }
}
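The processEvent flow (check the event type, evaluate the condition, invoke the method, hand a non-null result on) can be condensed into a small plain-Java sketch. MethodAdapterSketch is a hypothetical name; a Predicate replaces the SpEL condition and a Function replaces the reflective method call, so this illustrates the control flow rather than Spring's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified adapter: type check, condition check, invocation, result handling.
class MethodAdapterSketch<E> {
    private final Class<E> eventType;
    private final Predicate<E> condition;       // stands in for @EventListener(condition = ...)
    private final Function<E, Object> handler;  // stands in for the annotated method
    private final List<Object> republished = new ArrayList<>();

    MethodAdapterSketch(Class<E> eventType, Predicate<E> condition, Function<E, Object> handler) {
        this.eventType = eventType;
        this.condition = condition;
        this.handler = handler;
    }

    void onEvent(Object event) {
        if (!eventType.isInstance(event)) {
            return; // not an event type this listener declares
        }
        E typed = eventType.cast(event);
        if (!condition.test(typed)) {
            return; // the condition rejected the event
        }
        Object result = handler.apply(typed);
        if (result != null) {
            republished.add(result); // the real adapter would publish this as a new event
        }
    }

    List<Object> republished() {
        return republished;
    }

    // Feeds three events through an adapter that only accepts integers above 10.
    static List<Object> demo() {
        MethodAdapterSketch<Integer> adapter =
                new MethodAdapterSketch<>(Integer.class, n -> n > 10, n -> "big:" + n);
        adapter.onEvent(5);      // rejected by the condition
        adapter.onEvent(42);     // handled, result recorded
        adapter.onEvent("text"); // wrong event type
        return adapter.republished();
    }
}
```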
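EventListener.Factory
Monitoring the whole network request with EventListener.Factory
Not much has been written about it online, and there are quite a few pitfalls in how it is used.
Its main purpose is to monitor the stages of a network request.
First, OkHttpClient.Builder.eventListenerFactory expects a factory that implements the EventListener.Factory interface; the listeners it creates extend the abstract class EventListener.
A simple implementation: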
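import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import okhttp3.Call;
import okhttp3.EventListener;

public class HttpEventListener extends EventListener {

    // create() belongs to EventListener.Factory, not to EventListener itself,
    // so it lives in a shared factory with a single call-id counter.
    public static final Factory FACTORY = new Factory() {
        final AtomicLong nextCallId = new AtomicLong(1L);

        @Override
        public EventListener create(Call call) {
            long callId = nextCallId.getAndIncrement();
            return new HttpEventListener(callId, System.nanoTime());
        }
    };

    private final long callId;
    private final long callStartNanos;
    private long dnsStartTime;
    private long dnsParseTime;

    public HttpEventListener(long callId, long callStartNanos) {
        this.callId = callId;
        this.callStartNanos = callStartNanos;
    }

    @Override
    public void dnsStart(Call call, String domainName) {
        super.dnsStart(call, domainName);
        dnsStartTime = System.nanoTime();
    }

    @Override
    public void dnsEnd(Call call, String domainName, List<InetAddress> inetAddressList) {
        super.dnsEnd(call, domainName, inetAddressList);
        dnsParseTime = System.nanoTime() - dnsStartTime; // DNS resolution time, in nanoseconds
    }

    // Override the remaining callbacks as needed.
}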
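EventListener.Factory.create is invoked once per call, when okHttpClient.newCall creates the call.
dnsParseTime gives the time spent on DNS resolution. The same callbacks also expose the domain being resolved on each lookup and the resulting inetAddressList.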
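Because every stage follows the same "record the start, subtract at the end" pattern, the bookkeeping can be pulled into a small helper. The sketch below is a hypothetical utility, not part of OkHttp: StageTimer and its method names are invented here, and the clock is injectable only so that the arithmetic can be checked deterministically. System.nanoTime is used by default because it is monotonic and intended for measuring elapsed time.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper that pairs start/end callbacks and stores elapsed milliseconds.
class StageTimer {
    private final LongSupplier nanoClock;
    private final Map<String, Long> starts = new LinkedHashMap<>();
    private final Map<String, Long> elapsedMillis = new LinkedHashMap<>();

    StageTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    StageTimer() {
        this(System::nanoTime);
    }

    // Call from a "...Start" callback such as dnsStart.
    void start(String stage) {
        starts.put(stage, nanoClock.getAsLong());
    }

    // Call from the matching "...End" callback; ignored if the stage was never started.
    void end(String stage) {
        Long startedAt = starts.remove(stage);
        if (startedAt != null) {
            long elapsedNanos = nanoClock.getAsLong() - startedAt;
            elapsedMillis.put(stage, TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
        }
    }

    Map<String, Long> elapsedMillis() {
        return elapsedMillis;
    }

    // Drives the timer with a fake clock: "dns" takes 5 ms, "connect" is never started.
    static Map<String, Long> demo() {
        long[] now = {0L};
        StageTimer timer = new StageTimer(() -> now[0]);
        timer.start("dns");
        now[0] = 5_000_000L; // 5 ms later, expressed in nanoseconds
        timer.end("dns");
        timer.end("connect");
        return timer.elapsedMillis();
    }
}
```

A listener could hold one StageTimer per call and invoke start("dns") in dnsStart and end("dns") in dnsEnd; since one listener instance serves one call, no extra synchronization is assumed here.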
This is quite handy.
The question is how to get this data back to the caller.
Pass a custom object through the request tag when building the Request
// Building the client and sending a request
OkHttpClient.Builder builder = new OkHttpClient.Builder();
// Register the factory so that every call gets its own HttpEventListener.
builder.eventListenerFactory(HttpEventListener.FACTORY);
OkHttpClient httpClient = builder.build();

final ResponseTag tag = new ResponseTag();
tag.logHandler = logHandler;
httpClient.newCall(requestBuilder.tag(tag).build()).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
    }
});
// Fill in the callback bodies as needed.

public class HttpEventListener extends EventListener {
    /** Identifier of the call */
    private long callId = 1L;
    /** Start time of the call, in nanoseconds */
    private final long callStartNanos;

    // Elapsed times, in milliseconds
    private long total_elapsed_time;
    private long dns_elapsed_time;
    private long connect_elapsed_time;
    private long tls_connect_elapsed_time;
    private long request_elapsed_time;
    private long wait_elapsed_time;
    private long response_elapsed_time;

    private Client.ResponseTag responseTag;
    private LogHandler logHandler;

    // Start timestamps of the individual stages, in milliseconds
    private long start_dns_elapsed_time;
    private long start_total_elapsed_time;
    private long start_connect_elapsed_time;
    private long start_tls_connect_elapsed_time;
    private long start_request_elapsed_time;
    private long start_response_elapsed_time;

    public HttpEventListener(long callId, Client.ResponseTag responseTag, long callStartNanos) {
        this.callId = callId;
        this.callStartNanos = callStartNanos;
        this.responseTag = responseTag;
        this.logHandler = responseTag.logHandler;
    }

    public static final Factory FACTORY = new Factory() {
        final AtomicLong nextCallId = new AtomicLong(1L);

        @Override
        public EventListener create(@NotNull Call call) {
            long callId = nextCallId.getAndIncrement();
            // The tag attached to the request carries the LogHandler into the listener.
            return new HttpEventListener(callId, (Client.ResponseTag) call.request().tag(), System.nanoTime());
        }
    };

    @Override
    public void callStart(Call call) {
        super.callStart(call);
        start_total_elapsed_time = System.currentTimeMillis();
    }

    @Override
    public void dnsStart(Call call, String domainName) {
        super.dnsStart(call, domainName);
        start_dns_elapsed_time = System.currentTimeMillis();
    }

    @Override
    public void dnsEnd(Call call, String domainName, List<InetAddress> inetAddressList) {
        super.dnsEnd(call, domainName, inetAddressList);
        dns_elapsed_time = System.currentTimeMillis() - start_dns_elapsed_time; // DNS resolution time
        logHandler.send("dns_elapsed_time", dns_elapsed_time);
    }

    @Override
    public void connectStart(Call call, InetSocketAddress inetSocketAddress, Proxy proxy) {
        super.connectStart(call, inetSocketAddress, proxy);
        start_connect_elapsed_time = System.currentTimeMillis();
    }

    @Override
    public void secureConnectStart(Call call) {
        super.secureConnectStart(call);
        start_tls_connect_elapsed_time = System.currentTimeMillis();
    }

    @Override
    public void secureConnectEnd(Call call, Handshake handshake) {
        super.secureConnectEnd(call, handshake);
        tls_connect_elapsed_time = System.currentTimeMillis() - start_tls_connect_elapsed_time;
        logHandler.send("tls_connect_elapsed_time", tls_connect_elapsed_time);
    }

    @Override
    public void connectEnd(Call call, InetSocketAddress inetSocketAddress, Proxy proxy, Protocol protocol) {
        super.connectEnd(call, inetSocketAddress, proxy, protocol);
        connect_elapsed_time = System.currentTimeMillis() - start_connect_elapsed_time;
        logHandler.send("connect_elapsed_time", connect_elapsed_time);
    }

    @Override
    public void connectFailed(Call call, InetSocketAddress inetSocketAddress, Proxy proxy,
            Protocol protocol, IOException ioe) {
        super.connectFailed(call, inetSocketAddress, proxy, protocol, ioe);
    }

    @Override
    public void connectionAcquired(Call call, Connection connection) {
        super.connectionAcquired(call, connection);
    }

    @Override
    public void connectionReleased(Call call, Connection connection) {
        super.connectionReleased(call, connection);
    }

    @Override
    public void requestHeadersStart(Call call) {
        super.requestHeadersStart(call);
        start_request_elapsed_time = System.currentTimeMillis();
    }

    @Override
    public void requestHeadersEnd(Call call, Request request) {
        super.requestHeadersEnd(call, request);
    }

    @Override
    public void requestBodyStart(Call call) {
        super.requestBodyStart(call);
    }

    // Note: the request body callbacks are only invoked for requests that have a body.
    @Override
    public void requestBodyEnd(Call call, long byteCount) {
        super.requestBodyEnd(call, byteCount);
        request_elapsed_time = System.currentTimeMillis() - start_request_elapsed_time;
        logHandler.send("request_elapsed_time", request_elapsed_time);
    }

    @Override
    public void responseHeadersStart(Call call) {
        super.responseHeadersStart(call);
        start_response_elapsed_time = System.currentTimeMillis();
    }

    @Override
    public void responseHeadersEnd(Call call, Response response) {
        super.responseHeadersEnd(call, response);
    }

    @Override
    public void responseBodyStart(Call call) {
        super.responseBodyStart(call);
    }

    @Override
    public void responseBodyEnd(Call call, long byteCount) {
        super.responseBodyEnd(call, byteCount);
        response_elapsed_time = System.currentTimeMillis() - start_response_elapsed_time;
        wait_elapsed_time = System.currentTimeMillis() - start_request_elapsed_time;
        logHandler.send("response_elapsed_time", response_elapsed_time);
        logHandler.send("wait_elapsed_time", wait_elapsed_time);
    }

    @Override
    public void callEnd(Call call) {
        super.callEnd(call);
        total_elapsed_time = System.currentTimeMillis() - start_total_elapsed_time;
        logHandler.send("total_elapsed_time", total_elapsed_time);
    }

    @Override
    public void callFailed(Call call, IOException ioe) {
        super.callFailed(call, ioe);
    }
}

// Uses reflection to store the values reported through logHandler on the given object.
// The setter name is "set" plus the upper-cased key, e.g. setDNS_ELAPSED_TIME.
public static LogHandler getUplogHandler(final Object obj) {
    final String setMethod = "set";
    LogHandler logHandler = new LogHandler() {
        @Override
        public void send(String key, Object value) {
            try {
                String setterName = setMethod + StringUtils.upperCase(key);
                if (value instanceof String) {
                    Method setByKey = obj.getClass().getMethod(setterName, String.class);
                    setByKey.invoke(obj, value);
                } else if (value instanceof Integer) {
                    Method setByKey = obj.getClass().getMethod(setterName, int.class);
                    setByKey.invoke(obj, value);
                } else if (value instanceof Long) {
                    Method setByKey = obj.getClass().getMethod(setterName, long.class);
                    setByKey.invoke(obj, value);
                }
            } catch (ReflectiveOperationException e) {
                // Covers NoSuchMethodException, IllegalAccessException and InvocationTargetException
                e.printStackTrace();
            }
        }

        @Override
        public Object getUploadInfo() {
            return obj;
        }
    };
    return logHandler;
}
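If a dedicated result object with matching setters is not needed, the same values can be collected in a map instead of going through reflection. This is an alternative sketch, not the approach used above: the LogHandler interface is reproduced here only in the shape inferred from how it is called in the listing (send and getUploadInfo), and MapLogHandler is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Assumed shape of LogHandler, inferred from its usage in the listing above.
interface LogHandler {
    void send(String key, Object value);

    Object getUploadInfo();
}

// Hypothetical map-backed implementation; no setters or reflection required.
class MapLogHandler implements LogHandler {
    // Callbacks may arrive on different threads, so a concurrent map is used.
    // Note that ConcurrentHashMap does not accept null keys or values.
    private final Map<String, Object> metrics = new ConcurrentHashMap<>();

    @Override
    public void send(String key, Object value) {
        metrics.put(key, value);
    }

    @Override
    public Object getUploadInfo() {
        return metrics;
    }

    // Records two timings and returns the collected metrics.
    static Object demo() {
        LogHandler handler = new MapLogHandler();
        handler.send("dns_elapsed_time", 12L);
        handler.send("total_elapsed_time", 340L);
        return handler.getUploadInfo();
    }
}
```

The trade-off is type safety: the reflective version fails visibly when a setter is missing, whereas the map accepts any key, so a misspelled metric name goes unnoticed.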
The above is based on my personal experience; I hope it serves as a useful reference.