Spring Boot 集成 Kafkad的實現示例
Spring Boot 作為主流微服務框架,擁有成熟的社區生態。市場應用廣泛,為瞭方便大傢,整理瞭一個基於spring boot的常用中間件快速集成入門系列手冊,涉及RPC、緩存、消息隊列、分庫分表、註冊中心、分佈式配置等常用開源組件,大概有幾十篇文章,陸續會開放出來,感興趣同學請提前關註&收藏
消息通信有兩種基本模型,即發佈-訂閱(Pub-Sub)模型和點對點(Point to Point)模型,發佈-訂閱支持生產者消費者之間的一對多關系,而點對點模型中有且僅有一個消費者。
前言
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分佈式事務日志架構的大規模發佈/訂閱消息隊列”。
Kafka高效地處理實時流式數據,可以實現與Storm、HBase和Spark的集成。作為聚類部署到多臺服務器上,Kafka處理它所有的發佈和訂閱消息系統使用瞭四個API,即生產者API、消費者API、Stream API和Connector API。它能夠傳遞大規模流式消息,自帶容錯功能,已經取代瞭一些傳統消息系統,如JMS、AMQP等。
為什麼使用kafka?
- 削峰填谷。緩沖上下遊瞬時突發流量,保護 “脆弱” 的下遊系統不被壓垮,避免引發全鏈路服務 “雪崩”。
- 系統解耦。發送方和接收方的松耦合,一定程度簡化瞭開發成本,減少瞭系統間不必要的直接依賴。
- 異步通信:消息隊列允許用戶把消息放入隊列但不立即處理它。
- 可恢復性:即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。
業務場景
- 一些同步業務流程的非核心邏輯,對時間要求不是特別高,可以解耦異步來執行
- 系統日志收集,采集並同步到kafka,一般采用ELK組合玩法
- 一些大數據平臺,用於各個系統間數據傳遞
基本架構
Kafka 運行在一個由一臺或多臺服務器組成的集群上,並且分區可以跨集群節點分佈
1、Producer 生產消息,發送到Broker中
2、Leader狀態的Broker接收消息,寫入到相應topic中。在一個分區內,這些消息被索引並連同時間戳存儲在一起
3、Leader狀態的Broker接收完畢以後,傳給Follow狀態的Broker作為副本備份
4、 Consumer 消費者的進程可以從分區訂閱,並消費消息
常用術語
- Broker。負責接收和處理客戶端發送過來的請求,以及對消息進行持久化。雖然多個 Broker 進程能夠運行在同一臺機器上,但更常見的做法是將不同的 Broker 分散運行在不同的機器上
- 主題:Topic。主題是承載消息的邏輯容器,在實際使用中多用來區分具體的業務。
- 分區:Partition。一個有序不變的消息序列。每個主題下可以有多個分區。
- 消息:這裡的消息就是指 Kafka 處理的主要對象。
- 消息位移:Offset。表示分區中每條消息的位置信息,是一個單調遞增且不變的值。
- 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個地方以提供數據冗餘,這些地方就是所謂的副本。副本還分為領導者副本和追隨者副本,各自有不同的角色劃分。每個分區可配置多個副本實現高可用。一個分區的N個副本一定在N個不同的Broker上。
- Leader:每個分區多個副本的“主”副本,生產者發送數據的對象,以及消費者消費數據的對象,都是 Leader。
- Follower:每個分區多個副本的“從”副本,實時從 Leader 中同步數據,保持和 Leader 數據的同步。Leader 發生故障時,某個 Follower 還會成為新的 Leader。
- 生產者:Producer。向主題發佈新消息的應用程序。
- 消費者:Consumer。從主題訂閱新消息的應用程序。
- 消費者位移:Consumer Offset。表示消費者消費進度,每個消費者都有自己的消費者位移。offset保存在broker端的內部topic中,不是在clients中保存
- 消費者組:Consumer Group。多個消費者實例共同組成的一個組,同時消費多個分區以實現高吞吐。
- 重平衡:Rebalance。消費者組內某個消費者實例掛掉後,其他消費者實例自動重新分配訂閱主題分區的過程。Rebalance 是 Kafka 消費者端實現高可用的重要手段。
代碼演示
外部依賴:
在 pom.xml 中添加 Kafka 依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
由於spring-boot-starter-parent 指定的版本號是2.1.5.RELEASE,spring boot 會對外部框架的版本號統一管理,spring-kafka 引入的版本是 2.2.6.RELEASE
配置文件:
在配置文件 application.yaml 中配置 Kafka 的相關參數,具體內容如下:
Spring: kafka: bootstrap-servers: localhost:9092 producer: retries: 3 # 生產者發送失敗時,重試次數 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生產者消息key和消息value的序列化處理類 value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: tomge-consumer-group # 默認消費者group id auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
對應的配置類 org.springframework.boot.autoconfigure.kafka.KafkaProperties
,來初始化kafka相關的bean實例對象,並註冊到spring容器中。
發送消息:
Spring Boot 作為一款支持快速開發的集成性框架,同樣提供瞭一批以 -Template
命名的模板工具類用於實現消息通信。對於 Kafka 而言,這個工具類就是KafkaTemplate
。
KafkaTemplate 提供瞭一系列 send 方法用來發送消息,典型的 send 方法定義如下代碼所示:
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { 。。。。 省略 }
生產端提供瞭一個restful接口,模擬發送一條創建新用戶消息。
@GetMapping("/add_user") public Object add() { try { Long id = Long.valueOf(new Random().nextInt(1000)); User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build(); ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user)); // 提供回調方法,可以監控消息的成功或失敗的後續處理 listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() { @Override public void onFailure(Throwable throwable) { System.out.println("發送消息失敗," + throwable.getMessage()); } @Override public void onSuccess(SendResult sendResult) { // 消息發送到的topic String topic = sendResult.getRecordMetadata().topic(); // 消息發送到的分區 int partition = sendResult.getRecordMetadata().partition(); // 消息在分區內的offset long offset = sendResult.getRecordMetadata().offset(); System.out.println(String.format("發送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset)); } }); return "消息發送成功"; } catch (Exception e) { e.printStackTrace(); return "消息發送失敗"; } }
實際上開發使用的Kafka默認允許自動創建Topic,創建Topic時默認的分區數量是1,可以通過server.properties文件中的num.partitions=1修改默認分區數量。在生產環境中通常會關閉自動創建功能,Topic需要由運維人員先創建好。
消費消息:
在 Kafka 中消息通過服務器推送給各個消費者,而 Kafka 的消費者在消費消息時,需要提供一個監聽器(Listener)對某個 Topic 實現監聽,從而獲取消息,這也是 Kafka 消費消息的唯一方式。
定義一個消費類,在處理具體消息業務邏輯的方法上添加 @KafkaListener 註解,並配置要消費的topic,代碼如下所示:
@Component public class UserConsumer { @KafkaListener(topics = "add_user") public void receiveMesage(String content) { System.out.println("消費消息:" + content); } }
是不是很簡單,添加kafka依賴、使用KafkaTemplate、@KafkaListener註解就完成消息的生產和消費,其實是SpringBoot在背後默默的做瞭很多工作,如果感興趣可以研究下spring-boot-autoconfigure ,裡面提供瞭常用開源框架的客戶端實例封裝。
演示工程代碼
https://github.com/aalansehaiyang/spring-boot-bulking
模塊:spring-boot-bulking-kafka
以上就是Spring Boot 集成 Kafkad的示例的詳細內容,更多關於Spring Boot 集成 Kafka的資料請關註WalkonNet其它相關文章!
推薦閱讀:
- None Found