Springboot集成Kafka進行批量消費及踩坑點
引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> </dependency>
因為我的項目的 springboot 版本是 1.5.22.RELEASE,所以引的是 1.3.11.RELEASE 的包。讀者可以根據下圖來自行選擇對應的版本。圖片更新可能不及時,詳情可查看spring-kafka 官方網站。
註:這裡有個踩坑點,如果引入包版本不對,項目啟動時會拋出org.springframework.core.log.LogAccessor 異常:
java.lang.ClassNotFoundException: org.springframework.core.log.LogAccessor
創建配置類
/** * kafka 配置類 */ @Configuration @EnableKafka public class KafkaConsumerConfig { private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerConfig.class); @Value("${kafka.bootstrap.servers}") private String kafkaBootstrapServers; @Value("${kafka.group.id}") private String kafkaGroupId; @Value("${kafka.topic}") private String kafkaTopic; public static final String CONFIG_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka_client_jaas.conf"; public static final String LOCATION_PATH = "/home/admin/xxx/BOOT-INF/classes/kafka.client.truststore.jks"; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 設置並發量,小於或者等於 Topic 的分區數 factory.setConcurrency(5); // 設置為批量監聽 factory.setBatchListener(Boolean.TRUE); factory.getContainerProperties().setPollTimeout(30000); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); //設置接入點,請通過控制臺獲取對應Topic的接入點。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers); //設置SSL根證書的路徑,請記得將XXX修改為自己的路徑。 //與SASL路徑類似,該文件也不能被打包到jar中。 System.setProperty("java.security.auth.login.config", CONFIG_PATH); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, LOCATION_PATH); //根證書存儲的密碼,保持不變。 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient"); //接入協議,目前支持使用SASL_SSL協議接入。 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); //SASL鑒權方式,保持不變。 props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); // 自動提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); //兩次Poll之間的最大允許間隔。 //消費者超過該值沒有返回心跳,服務端判斷消費者處於非存活狀態,服務端將消費者從Consumer Group移除並觸發Rebalance,默認30s。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); //設置單次拉取的量,走公網訪問時,該參數會有較大影響。 props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000); //每次Poll的最大數量。 //註意該值不要改得太大,如果Poll太多數據,而不能在下次Poll之前消費完,則會觸發一次負載均衡,產生卡頓。 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30); //消息的反序列化方式。 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //當前消費實例所屬的消費組,請在控制臺申請之後填寫。 //屬於同一個組的消費實例,會負載消費消息。 props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); //Hostname校驗改成空。 props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); return props; } }
註:此處通過 factory.setConcurrency(5); 配置瞭並發量為 5 ,假設我們線上的 Topic 有 12 個分區。那麼將會是 3 個線程分配到 2 個分區,2 個線程分配到 3 個分區,3 * 2 + 2 * 3 = 12。
Kafka 消費者
/** * kafka 消息消費類 */ @Component public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); @KafkaListener(topics = {"${kafka.topic}"}) public void listen(List<ConsumerRecord<String, String>> recordList) { for (ConsumerRecord<String,String> record : recordList) { // 打印消息的分區以及偏移量 LOGGER.info("Kafka Consume partition:{}, offset:{}", record.partition(), record.offset()); String value = record.value(); System.out.println("value = " + value); // 處理業務邏輯 ... } } }
因為我在配置類中設置瞭批量監聽,所以此處 listen 方法的入參是List:List<ConsumerRecord<String, String>>。
到此這篇關於Springboot集成Kafka進行批量消費及踩坑點的文章就介紹到這瞭,更多相關Springboot Kafka批量消費內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- 解決kafka消息堆積及分區不均勻的問題
- Java Kafka實現延遲隊列的示例代碼
- springboot項目配置多個kafka的示例代碼
- Java實現Kafka生產者和消費者的示例
- springboot之配置雙kafka全過程