SkyWalking 自定義插件(Spring RabbitMQ)具體分析過程

SkyWalking 自定義插件(Spring RabbitMQ) 官方

RabbitMQ插件問題

skywalking官方提供的RabbitMQ插件存在缺陷,其隻針對RabbitMQ官方原生Client實現擴展,但我們在項目中一般不直接使用原生Client,而是使用Spring RabitMQ Client,因Spring RabitMQ Consumer中存在跨線程操作,導致跟蹤ID斷鏈。

具體分析過程

1.官方插件源碼的攔截點是原生Consumer的handleDelivery方法,源碼如下:

2.而Spring RabbitMQ消費者的默認實現是BlockingQueueConsumer, handleDelivery核心邏輯是把消息放到內部的BlockingQueue隊列,不做真正的消費處理,因此攔截此處無法關聯到消費者邏輯,源碼如下

@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
				byte[] body) {
		...
			try {
				if (BlockingQueueConsumer.this.abortStarted > 0) {
					if (!BlockingQueueConsumer.this.queue.offer(
							new Delivery(consumerTag, envelope, properties, body, this.queueName),
							BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {

						Channel channelToClose = super.getChannel();
						RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
						// Defensive - should never happen
						BlockingQueueConsumer.this.queue.clear();
						if (!this.canceled) {
							RabbitUtils.cancel(channelToClose, consumerTag);
						}
						try {
							channelToClose.close();
						catch (@SuppressWarnings("unused") TimeoutException e) {
							// no-op
					}
				}
				else {
					BlockingQueueConsumer.this.queue
							.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
			}
			catch (@SuppressWarnings("unused") InterruptedException e) {
				Thread.currentThread().interrupt();
			catch (Exception e) {
				BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
		}

3.真正的消費處理在SimpleMessageListenerContainer,SimpleMessageListenerContainer繼承Runnable接口,在其run方法中while循環調用mainLoop方法,整體調用鏈路為

4.SimpleMessageListenerContainer.run() -> SimpleMessageListenerContainer.mainLoop() -> SimpleMessageListenerContainer.receiveAndExecute() -> SimpleMessageListenerContainer.doReceiveAndExecute() -> AbstractMessageListenerContainer.executeListener()。最終在executeListener中執行消費邏輯

protected void executeListener(Channel channel, Object data) {
	...
		try {
      // 執行消費邏輯
			doExecuteListener(channel, data);
			if (sample != null) {
				this.micrometerHolder.success(sample, data instanceof Message
						? ((Message) data).getMessageProperties().getConsumerQueue()
						: queuesAsListString());
			}
		}
		catch (RuntimeException ex) {
		....
		}
	}

實現自定義插件

從上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的攔截點
實現源碼已放到碼雲倉庫:https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/

效果展示

SkyWalking調用鏈路

logback日志

到此這篇關於SkyWalking 自定義插件(Spring RabbitMQ)的文章就介紹到這瞭,更多相關SkyWalking 自定義插件內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: