深入研究spring boot集成kafka之spring-kafka底層原理

前言

kafka是一個消息隊列產品,基於Topic partitions的設計,能達到非常高的消息發送處理性能。Spring創建瞭一個項目Spring-kafka,封裝瞭Apache 的Kafka-client,用於在Spring項目裡快速集成kafka。除瞭簡單的收發消息外,Spring-kafka還提供瞭很多高級功能,下面我們就來一一探秘這些用法。

項目地址:https://github.com/spring-projects/spring-kafka

簡單集成

引入依賴

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

添加配置

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

測試發送和接收

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
	private final Logger logger = LoggerFactory.getLogger(Application.class);
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	@Autowired
	private KafkaTemplate<Object, Object> template;
	@GetMapping("/send/{input}")
	public void sendFoo(@PathVariable String input) {
		this.template.send("topic_input", input);
	}
	@KafkaListener(id = "webGroup", topics = "topic_input")
	public void listen(String input) {
		logger.info("input value: {}" , input);
	}
}

啟動應用後,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制臺看到有日志輸出瞭:input value: "kl"。基礎的使用就這麼簡單。發送消息時註入一個KafkaTemplate,接收消息時添加一個@KafkaListener註解即可。

Spring-kafka-test嵌入式Kafka Server

不過上面的代碼能夠啟動成功,前提是你已經有瞭Kafka Server的服務環境,我們知道Kafka是由Scala + Zookeeper構建的,可以從官網下載部署包在本地部署。但是,我想告訴你,為瞭簡化開發環節驗證Kafka相關功能,Spring-Kafka-Test已經封裝瞭Kafka-test提供瞭註解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文後面的所有測試用例的Kafka都是使用這種嵌入式服務提供的。

引入依賴

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>2.2.6.RELEASE</version>
   <scope>test</scope>
</dependency>

啟動服務

下面使用Junit測試用例,直接啟動一個Kafka Server服務,包含四個Broker節點。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
	@Test
	public void contextLoads()throws IOException {
		System.in.read();
	}
}

如上:隻需要一個註解@EmbeddedKafka即可,就可以啟動一個功能完整的Kafka服務,是不是很酷。默認隻寫註解不加參數的情況下,是創建一個隨機端口的Broker,在啟動的日志中會輸出具體的端口以及默認的一些配置項。不過這些我們在Kafka安裝包配置文件中的配置項,在註解參數中都可以配置,下面詳解下@EmbeddedKafka註解中的可設置參數 :

  • value:broker節點數量
  • count:同value作用一樣,也是配置的broker的節點數量
  • controlledShutdown:控制關閉開關,主要用來在Broker意外關閉時減少此Broker上Partition的不可用時間

      Kafka是多Broker架構的高可用服務,一個Topic對應多個partition,一個Partition可以有多個副本Replication,這些Replication副本保存在多個Broker,用於高可用。但是,雖然存在多個分區副本集,當前工作副本集卻隻有一個,默認就是首次分配的副本集【首選副本】為Leader,負責寫入和讀取數據。當我們升級Broker或者更新Broker配置時需要重啟服務,這個時候需要將partition轉移到可用的Broker。下面涉及到三種情況

  •    直接關閉Broker:當Broker關閉時,Broker集群會重新進行選主操作,選出一個新的Broker來作為Partition Leader,選舉時此Broker上的Partition會短時不可用
  •   開啟controlledShutdown:當Broker關閉時,Broker本身會先嘗試將Leader角色轉移到其他可用的Broker上
  •   使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動觸發PartitionLeader角色轉移

ports:端口列表,是一個數組。對應瞭count參數,有幾個Broker,就要對應幾個端口號

brokerProperties:Broker參數設置,是一個數組結構,支持如下方式進行Broker參數設置:

@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})

okerPropertiesLocation:Broker參數文件設置

  功能同上面的brokerProperties,隻是Kafka Broker的可設置參數達182個之多,都像上面這樣配置肯定不是最優方案,所以提供瞭加載本地配置文件的功能,如:

@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")

創建新的Topic

默認情況下,如果在使用KafkaTemplate發送消息時,Topic不存在,會創建一個新的Topic,默認的分區數和副本數為如下Broker參數來設定

num.partitions = 1 #默認Topic分區數
num.replica.fetchers = 1 #默認副本數

程序啟動時創建TOPIC

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Configuration
public class KafkaConfig {
	@Bean
	public KafkaAdmin admin(KafkaProperties properties){
		KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
		admin.setFatalIfBrokerNotAvailable(true);
		return admin;
	}
	@Bean
	public NewTopic topic2() {
		return new NewTopic("topic-kl", 1, (short) 1);
	}
}

如果Kafka Broker支持(1.0.0或更高版本),則如果發現現有Topic的Partition 數少於設置的Partition 數,則會新增新的Partition分區。關於KafkaAdmin有幾個常用的用法如下:

setFatalIfBrokerNotAvailable(true):默認這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業務需要顯示的將這個值設置為True

setAutoCreate(false) : 默認值為True,也就是Kafka實例化後會自動創建已經實例化的NewTopic對象

initialize():當setAutoCreate為false時,需要我們程序顯示的調用admin的initialize()方法來初始化NewTopic對象

代碼邏輯中創建

有時候我們在程序啟動時並不知道某個Topic需要多少Partition數合適,但是又不能一股腦的直接使用Broker的默認設置,這個時候就需要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:

@Autowired
	private KafkaProperties properties;
	@Test
	public void testCreateToipc(){
		AdminClient client = AdminClient.create(properties.buildAdminProperties());
		if(client !=null){
			try {
				Collection<NewTopic> newTopics = new ArrayList<>(1);
				newTopics.add(new NewTopic("topic-kl",1,(short) 1));
				client.createTopics(newTopics);
			}catch (Throwable e){
				e.printStackTrace();
			}finally {
				client.close();
			}
		}
	}

PS:其他的方式創建TOPIC

上面的這些創建Topic方式前提是你的spring boot版本到2.x以上瞭,因為spring-kafka2.x版本隻支持spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補充一種在程序中通過Kafka_2.10創建Topic的方式

引入依賴

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
</dependency>

api方式創建

@Test
	public void testCreateTopic()throws Exception{
		ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)
		String topicName = "topic-kl";
		int partitions = 1;
		int replication = 1;
		AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties());
	}

註意下ZkClient最後一個構造入參,是一個序列化反序列化的接口實現,博主測試如果不填的話,創建的Topic在ZK上的數據是有問題的,默認的Kafka實現也很簡單,就是做瞭字符串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經實現好的一個接口實例,是一個Scala的伴生對象,在Java中直接調用點MODULE$就可以得到一個實例

命令方式創建

@Test
	public void testCreateTopic(){
		String [] options= new String[]{
				"--create",
				"--zookeeper","127.0.0.1:2181",
				"--replication-factor", "3",
				"--partitions", "3",
				"--topic", "topic-kl"
		};
		TopicCommand.main(options);
	}

消息發送之KafkaTemplate探秘

獲取發送結果

異步獲取

template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
			@Override
			public void onFailure(Throwable throwable) {
				......
			}
			@Override
			public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
				....
			}
		});

同步獲取

ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl");
		try {
			SendResult<Object,Object> result = future.get();
		}catch (Throwable e){
			e.printStackTrace();
		}

KAFKA事務消息

默認情況下,Spring-kafka自動生成的KafkaTemplate實例,是不具有事務消息發送能力的。需要使用如下配置激活事務特性。事務激活後,所有的消息發送隻能在發生事務的方法內執行瞭,不然就會拋一個沒有事務交易的異常

spring.kafka.producer.transaction-id-prefix=kafka_tx.

當發送消息有事務要求時,比如,當所有消息發送成功才算成功,如下面的例子:假設第一條消費發送後,在發第二條消息前出現瞭異常,那麼第一條已經發送的消息也會回滾。而且正常情況下,假設在消息一發送後休眠一段時間,在發送第二條消息,消費端也隻有在事務方法執行完成後才會接收到消息

@GetMapping("/send/{input}")
	public void sendFoo(@PathVariable String input) {
		template.executeInTransaction(t ->{
			t.send("topic_input","kl");
			if("error".equals(input)){
				throw new RuntimeException("failed");
			}
			t.send("topic_input","ckl");
			return true;
		});
	}

當事務特性激活時,同樣,在方法上面加@Transactional註解也會生效

@GetMapping("/send/{input}")
	@Transactional(rollbackFor = RuntimeException.class)
	public void sendFoo(@PathVariable String input) {
		template.send("topic_input", "kl");
		if ("error".equals(input)) {
			throw new RuntimeException("failed");
		}
		template.send("topic_input", "ckl");
	}

Spring-Kafka的事務消息是基於Kafka提供的事務消息功能的。而Kafka Broker默認的配置針對的三個或以上Broker高可用服務而設置的。這邊在測試的時候為瞭簡單方便,使用瞭嵌入式服務新建瞭一個單Broker的Kafka服務,出現瞭一些問題:如

1、事務日志副本集大於Broker數量,會拋如下異常:

Number of alive brokers '1' does not meet the required replication factor '3' 
for the transactions state topic (configured via 'transaction.state.log.replication.factor').
This error can be ignored if the cluster is starting up and not all brokers are up yet.

默認Broker的配置transaction.state.log.replication.factor=3,單節點隻能調整為1

2、副本數小於副本同步隊列數目,會拋如下異常

Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

默認Broker的配置transaction.state.log.min.isr=2,單節點隻能調整為1

REPLYINGKAFKATEMPLATE獲得消息回復

ReplyingKafkaTemplate是KafkaTemplate的一個子類,除瞭繼承父類的方法,新增瞭一個方法sendAndReceive,實現瞭消息發送回復語義

RequestReplyFuture sendAndReceive(ProducerRecord record);

也就是我發送一條消息,能夠拿到消費者給我返回的結果。就像傳統的RPC交互那樣。當消息的發送者需要知道消息消費者的具體的消費情況,非常適合這個api。如,一條消息中發送一批數據,需要知道消費者成功處理瞭哪些數據。下面代碼演示瞭怎麼集成以及使用ReplyingKafkaTemplate

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
	private final Logger logger = LoggerFactory.getLogger(Application.class);
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	@Bean
	public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
		ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies");
		repliesContainer.getContainerProperties().setGroupId("repliesGroup");
		repliesContainer.setAutoStartup(false);
		return repliesContainer;
	}
	@Bean
	public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) {
		return new ReplyingKafkaTemplate(pf, repliesContainer);
	}
	@Bean
	public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) {
		return new KafkaTemplate(pf);
	}
	@Autowired
	private ReplyingKafkaTemplate template;
	@GetMapping("/send/{input}")
	@Transactional(rollbackFor = RuntimeException.class)
	public void sendFoo(@PathVariable String input) throws Exception {
		ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
		RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
		ConsumerRecord<String, String> consumerRecord = replyFuture.get();
		System.err.println("Return value: " + consumerRecord.value());
	}
	@KafkaListener(id = "webGroup", topics = "topic-kl")
	@SendTo
	public String listen(String input) {
		logger.info("input value: {}", input);
		return "successful";
	}
}

Spring-kafka消息消費用法探秘

@KAFKALISTENER的使用

前面在簡單集成中已經演示過瞭@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:

  • 顯示的指定消費哪些Topic和分區的消息,
  • 設置每個Topic以及分區初始化的偏移量,
  • 設置消費線程並發度
  • 設置消息異常處理器
@KafkaListener(id = "webGroup", topicPartitions = {
			@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
					@TopicPartition(topic = "topic2", partitions = "0",
							partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
			},concurrency = "6",errorHandler = "myErrorHandler")
	public String listen(String input) {
		logger.info("input value: {}", input);
		return "successful";
	}

其他的註解參數都很好理解,errorHandler需要說明下,設置這個參數需要實現一個接口KafkaListenerErrorHandler。而且註解裡的配置,是你自定義實現實例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應該存在這樣一個實例:

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/31
 */
@Service("myErrorHandler")
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
	Logger logger =LoggerFactory.getLogger(getClass());
	@Override
	public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
		logger.info(message.getPayload().toString());
		return null;
	}
	@Override
	public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
		logger.info(message.getPayload().toString());
		return null;
	}
}

手動ACK模式

手動ACK模式,由業務邏輯控制提交偏移量。比如程序在消費時,有這種語義,特別異常情況下不確認ack,也就是不提交偏移量,那麼你隻能使用手動Ack模式來做瞭。開啟手動首先需要關閉自動提交,然後設置下consumer的消費模式

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

上面的設置好後,在消費時,隻需要在@KafkaListener監聽方法的入參加入Acknowledgment 即可,執行到ack.acknowledge()代表提交瞭偏移量

@KafkaListener(id = "webGroup", topics = "topic-kl")
	public String listen(String input, Acknowledgment ack) {
		logger.info("input value: {}", input);
		if ("kl".equals(input)) {
			ack.acknowledge();
		}
		return "successful";
	}

@KAFKALISTENER註解監聽器生命周期

@KafkaListener註解的監聽器的生命周期是可以控制的,默認情況下,@KafkaListener的參數autoStartup = "true"。也就是自動啟動消費,但是也可以同過KafkaListenerEndpointRegistry來幹預他的生命周期。KafkaListenerEndpointRegistry有三個動作方法分別如:start(),pause(),resume()/啟動,停止,繼續。如下代碼詳細演示瞭這種功能。

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
	private final Logger logger = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	@Autowired
	private KafkaTemplate template;
	@GetMapping("/send/{input}")
	@Transactional(rollbackFor = RuntimeException.class)
	public void sendFoo(@PathVariable String input) throws Exception {
		ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input);
		template.send(record);
	}
	@Autowired
	private KafkaListenerEndpointRegistry registry;
	@GetMapping("/stop/{listenerID}")
	public void stop(@PathVariable String listenerID){
		registry.getListenerContainer(listenerID).pause();
	}
	@GetMapping("/resume/{listenerID}")
	public void resume(@PathVariable String listenerID){
		registry.getListenerContainer(listenerID).resume();
	}
	@GetMapping("/start/{listenerID}")
	public void start(@PathVariable String listenerID){
		registry.getListenerContainer(listenerID).start();
	}
	@KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false")
	public String listen(String input) {
		logger.info("input value: {}", input);
		return "successful";
	}
}

在上面的代碼中,listenerID就是@KafkaListener中的id值“webGroup”。項目啟動好後,分別執行如下url,就可以看到效果瞭。

先發送一條消息:http://localhost:8081/send/ckl。因為autoStartup = "false",所以並不會看到有消息進入監聽器。

接著啟動監聽器:http://localhost:8081/start/webGroup。可以看到有一條消息進來瞭。

暫停和繼續消費的效果使用類似方法就可以測試出來瞭。

SENDTO消息轉發

前面的消息發送響應應用裡面已經見過@SendTo,其實除瞭做發送響應語義外,@SendTo註解還可以帶一個參數,指定轉發的Topic隊列。常見的場景如,一個消息需要做多重加工,不同的加工耗費的cup等資源不一致,那麼就可以通過跨不同Topic和部署在不同主機上的consumer來解決瞭。如:

@KafkaListener(id = "webGroup", topics = "topic-kl")
	@SendTo("topic-ckl")
	public String listen(String input) {
		logger.info("input value: {}", input);
		return input + "hello!";
	}
	@KafkaListener(id = "webGroup1", topics = "topic-ckl")
	public void listen2(String input) {
		logger.info("input value: {}", input);
	}

消息重試和死信隊列的應用

除瞭上面談到的通過手動Ack模式來控制消息偏移量外,其實Spring-kafka內部還封裝瞭可重試消費消息的語義,也就是可以設置為當消費數據出現異常時,重試這個消息。而且可以設置重試達到多少次後,讓消息進入預定好的Topic。也就是死信隊列裡。下面代碼演示瞭這種效果:

@Autowired
	private KafkaTemplate template;
	@Bean
	public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ConsumerFactory<Object, Object> kafkaConsumerFactory,
			KafkaTemplate<Object, Object> template) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		configurer.configure(factory, kafkaConsumerFactory);
		//最大重試三次
		factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
		return factory;
	}
	@GetMapping("/send/{input}")
	public void sendFoo(@PathVariable String input) {
		template.send("topic-kl", input);
	}
	@KafkaListener(id = "webGroup", topics = "topic-kl")
	public String listen(String input) {
		logger.info("input value: {}", input);
		throw new RuntimeException("dlt");
	}
	@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
	public void dltListen(String input) {
		logger.info("Received from DLT: " + input);
	}

上面應用,在topic-kl監聽到消息會,會觸發運行時異常,然後監聽器會嘗試三次調用,當到達最大的重試次數後。消息就會被丟掉重試死信隊列裡面去。死信隊列的Topic的規則是,業務Topic名字+“.DLT”。如上面業務Topic的name為“topic-kl”,那麼對應的死信隊列的Topic就是“topic-kl.DLT”

文末結語

最近業務上使用瞭kafka用到瞭Spring-kafka,所以系統性的探索瞭下Spring-kafka的各種用法,發現瞭很多好玩很酷的特性,比如,一個註解開啟嵌入式的Kafka服務、像RPC調用一樣的發送響應語義調用、事務消息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。

以上就是深入研究spring boot集成kafka之spring-kafka底層原理的詳細內容,更多關於spring boot集成kafka底層原理的資料請關註WalkonNet其它相關文章!

推薦閱讀: