一文秒懂 kafka HA(高可用)

我們知道,kafka中每個topic被劃分為多個partition,每個partition又有多個副本,那麼這些分區副本是怎麼均勻的分佈在整個kafka集群的broker節點上的?partition副本的leader是通過什麼算法選舉出來的?partition副本的follower是怎麼復制備份leader的數據的?本文我們就來說一說和 kafka 高可用相關的一些策略。

01名詞解釋

要想說明白kafka的HA機制,我們必須先搞明白幾個縮寫名詞,
1、AR、ISR、OSR
AR:Assigned Replicas,某分區的所有副本(這裡所說的副本包括leader和follower)統稱為 AR。
ISR:In Sync Replicas,所有與leader副本保持”一定程度同步”的副本(包括leader副本在內)組成 ISR 。生產者發送消息時,隻有leader與客戶端發生交互,follower隻是同步備份leader的數據,以保障高可用,所以生產者的消息會先發送到leader,然後follower才能從leader中拉取消息進行同步,同步期間,follower的數據相對leader而言會有一定程度的滯後,前面所說的”一定程度同步”就是指可忍受的滯後范圍,這個范圍可以通過server.properties中的參數進行配置。
OSR :Out-of-Sync Replied,在上面的描述中,相對leader滯後過多的follower將組成OSR 。
由此可見,AR = ISR + OSR,理想情況下,所有的follower副本都應該與leader 保持一定程度的同步,即AR=ISR,OSR集合為空
2、ISR 的伸縮性
leader負責跟蹤維護 ISR 集合中所有follower副本的滯後狀態,當follower副本”落後太多” 或 “follower超過一定時間沒有向leader發送同步請求”時,leader副本會把它從 ISR 集合中剔除。如果 OSR 集合中有follower副本”追上”瞭leader副本,那麼leader副本會把它從 OSR 集合轉移至 ISR 集合。
上面描述的”落後太多”是指follower復制的消息落後於leader的條數超過預定值,這個預定值可在server.properties中通過replica.lag.max.messages配置,其默認值是4000。”超過一定時間沒有向leader發送同步請求”,這個”一定時間”可以在server.properties中通過replica.lag.time.max.ms來配置,其默認值是10000,默認情況下,當leader發生故障時,隻有 ISR 集合中的follower副本才有資格被選舉為新的leader,而在 OSR 集合中的副本則沒有任何機會(不過這個可以通過配置來改變)。
3、HW
HW (High Watermark)俗稱高水位,它標識瞭一個特定的消息偏移量(offset),消費者隻能消費HW之前的消息。
下圖表示一個日志文件,這個日志文件中有9條消息,第一條消息的offset為0,最後一條消息的offset為8,虛線表示的offset為9的消息,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者隻能拉取offset在 0 到 5 之間的消息,offset為6的消息對消費者而言是不可見的。
4、LEO
LEO (Log End Offset),標識當前日志文件中下一條待寫入的消息的offset。上圖中offset為9的位置即為當前日志文件的 LEO,分區 ISR 集合中的每個副本都會維護自身的 LEO ,而 ISR 集合中最小的 LEO 即為分區的 HW(你品,你細品…),對消費者而言隻能消費 HW 之前的消息。
5、 ISR 集合和 HW、LEO的關系
producer在發佈消息到partition時,隻會與該partition的leader發生交互將消息發送給leader,leader會將該消息寫入其本地log,每個follower都從leader上pull數據做同步備份,follower在pull到該消息並寫入其log後,會向leader發送ack,一旦leader收到瞭ISR中的所有follower的ack(隻關註ISR中的所有follower,不考慮OSR,一定程度上提升瞭吞吐),該消息就被認為已經commit瞭,leader將增加HW,然後向producer發送ack。
也就是說,在ISR中所有的follower還沒有完成數據備份之前,leader不會增加HW,也就是這條消息暫時還不能被消費者消費,隻有當ISR中所有的follower都備份完成後,leader才會將HW後移。
ISR集合中LEO最小的副本,即同步數據同步的最慢的一個,這個最慢副本的LEO即leader的HW,消費者隻能消費HW之前的消息。

02kafka HA

Tips:我們說的副本包括leader和follower,都叫副本,不要認為叫副本說的就是follower。
kafka在0.8以前的版本中是沒有分區副本的概念的,一旦某一個broker宕機,這個broker上的所有分區都將不可用。在0.8版本以後,引入瞭分區副本的概念,同一個partition可以有多個副本,在多個副本中會選出一個做leader,其餘的作為follower,隻有leader對外提供讀寫服務,follower隻負責從leader上同步拉取數據,已保障高可用。
1、partition副本的分配策略
每個topic有多個partition,每個partition有多個副本,這些partition副本分佈在不同的broker上,以保障高可用,那麼這些partition副本是怎麼均勻的分佈到集群中的每個broker上的呢?
※ kafka分配partition副本的算法如下,
① 將所有的broker(假設總共n個broker)和 待分配的partition排序;
② 將第i個partition分配到第(i mod n)個broker上;
③ 第i個partition的第j個副本分配到第((i+j) mod n)個broker上;
2、kafka的消息傳遞備份策略
生產者將消息發送給分區的leader,leader會將該消息寫入其本地log,然後每個follower都會從leader pull數據,follower pull到該消息並將其寫入log後,會向leader發送ack,當leader收到瞭ISR集合中所有follower的ack後,就認為這條消息已經commit瞭,leader將增加HW並且向生產者返回ack。在整個流程中,follower也可以批量的從leader復制數據,以提升復制性能。
producer在發送消息的時候,可指定參數acks,表示”在生產者認為發送請求完成之前,有多少分區副本必須接收到數據”,有三個可選值,0、1、all(或-1),默認為1,
  • acks=0,表示producer隻管發,隻要發出去就認為發發送請求完成瞭,不管leader有沒有收到,更不管follower有沒有備份完成。
  • acks=1,表示隻要leader收到消息,並將其寫入自己log後,就會返回給producer ack,不考慮follower有沒有備份完成。
  • acks=all(或-1),表示不僅要leader收到消息寫入本地log,還要等所有ISR集合中的follower都備份完成後,producer才認為發送成功。
實際上,為瞭提高性能,follower在pull到消息將其保存到內存中而尚未寫入磁盤時,就會向leader發送ack,所以也就不能完全保證異常發生後該條消息一定能被Consumer消費。
3、kafka中的Leader選舉
面試官在考查你kafka知識的時候如果問你:kafka中的選舉是怎麼回事?而不說具體哪種選舉,那這個面試官可能對kafka也是一知半解,這個時候就是”弄死”他的時候瞭,當然如果你沒有一定的知識儲備,那麼就是你被”弄死”的時候。
因為kafka中涉及到選舉的地方有多處,最常提及的也有:①cotroller選舉 、 ②分區leader選舉 和 ③consumer group leader的選舉。我們在前面說過同一個partition有多個副本,其中一個副本作為leader,其餘的作為follower。這裡我們再說一個角色:controller!kafka集群中多個broker,有一個會被選舉為controller,註意區分兩者,一個是broker的leader,我們稱為controller,一個是分區副本的leader,我們稱為leader。
① controller的選舉【broker的leader】
controller的選舉是通過broker在zookeeper的”/controller”節點下創建臨時節點來實現的,並在該節點中寫入當前broker的信息 {“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} ,利用zookeeper的強一致性特性,一個節點隻能被一個客戶端創建成功,創建成功的broker即為controller,即”先到先得”。 
當controller宕機或者和zookeeper失去連接時,zookeeper檢測不到心跳,zookeeper上的臨時節點會被刪除,而其它broker會監聽臨時節點的變化,當節點被刪除時,其它broker會收到通知,重新發起controller選舉。
② leader的選舉【分區副本的leader】
分區leader的選舉由 controller 負責管理和實施,當leader發生故障時,controller會將leader的改變直接通過RPC的方式通知需要為此作出響應的broker,需要為此作出響應的broker即該分區的ISR集合中follower所在的broker,kafka在zookeeper中動態維護瞭一個ISR,隻有ISR裡的follower才有被選為Leader的可能。
具體過程是這樣的:按照AR集合中副本的順序 查找到 第一個 存活的、並且屬於ISR集合的 副本作為新的leader。一個分區的AR集合在創建分區副本的時候就被指定,隻要不發生重分配的情況,AR集合內部副本的順序是保持不變的,而分區的ISR集合上面說過因為同步滯後等原因可能會改變,所以註意這裡是根據AR的順序而不是ISR的順序找。
※ 對於上面描述的過程我們假設一種極端的情況,如果partition的所有副本都不可用時,怎麼辦?這種情況下kafka提供瞭兩種可行的方案:
1、選擇 ISR中 第一個活過來的副本作為Leader;
2、選擇第一個活過來的副本(不一定是ISR中的)作為Leader;
這就需要在可用性和數據一致性當中做出選擇,如果一定要等待ISR中的副本活過來,那不可用的時間可能會相對較長。選擇第一個活過來的副本作為Leader,如果這個副本不在ISR中,那數據的一致性則難以保證。kafka支持用戶通過配置選擇,以根據業務場景在可用性和數據一致性之間做出權衡。
③消費組leader的選舉
組協調器會為消費組(consumer group)內的所有消費者選舉出一個leader,這個選舉的算法也很簡單,第一個加入consumer group的consumer即為leader,如果某一時刻leader消費者退出瞭消費組,那麼會重新 隨機 選舉一個新的leader。

03kafka架構中zookeeper的結構

1、查看方式
我們知道,kafka是基於zookeeper協調管理的,那麼zookeeper中究竟存儲瞭哪些信息?另外在後面分析 broker宕機 和 controller宕機 時,我們也需要先瞭解zookeeper的目錄結構,所以我們先學習一下怎麼查看zookeeper的目錄結構?
① 首先啟動zookeeper客戶端連接zk服務
# cd /usr/local/zookeeper-cluster/zk1/bin
# ./zkCli.sh
② 查看zk根節點的子目錄
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
③ 可以看到zk根節點下有很多子目錄,以brokers為例,查看brokers的層級結構
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.17.80.219:9092"],"jmx_port":-1,"host":"172.17.80.219","timestamp":"1584267365984","port":9092,"version":4}
cZxid = 0x300000535
ctime = Sun Mar 15 18:16:06 CST 2020
mZxid = 0x300000535
mtime = Sun Mar 15 18:16:06 CST 2020
pZxid = 0x300000535
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x20191d7053f0009
dataLength = 196
numChildren = 0
[zk: localhost:2181(CONNECTED) 4] 
[zk: localhost:2181(CONNECTED) 4]
[zk: localhost:2181(CONNECTED) 4]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, first]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/first
[partitions]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/first/partitions
[0, 1]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/first/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/first/partitions/0/state
{"controller_epoch":21,"leader":0,"version":1,"leader_epoch":8,"isr":[0]}
cZxid = 0x3000003e9
ctime = Sun Mar 08 16:24:37 CST 2020
mZxid = 0x3000005cb
mtime = Sun Mar 15 18:54:09 CST 2020
pZxid = 0x3000003e9
cversion = 0
dataVersion = 10
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 73
numChildren = 0
[zk: localhost:2181(CONNECTED) 9]
可以看到,brokers下包括[ids, topics, seqid],ids裡面存儲瞭存活的broker的信息,topics裡面存儲瞭kafka集群中topic的信息。同樣的方法,可以查看其餘節點的結構,這裡不再演示。
2、節點信息(這裡隻列出和HA相關的部分節點)
① controller
controller節點下存放的是kafka集群中controller的信息(controller即kafka集群中所有broker的leader)。
② controller_epoch
controller_epoch用於記錄controller發生變更的次數(controller宕機後會重新選舉controller,這時候controller_epoch的值會+1),即記錄當前的控制器是第幾代控制器,用於防止broker腦裂。
③ brokes
brokers下的ids存儲瞭存活的broker信息,topics存儲瞭kafka集群中topic的信息,其中有一個特殊的topic:_consumer_offsets,新版本的kafka將消費者的offset就存儲在__consumer_offsets下。

04broker failover

我們瞭解瞭kafka集群中zookpeeper的結構,本文的主題是kafka的高可用分析,所以我們還是結合zookpper的結構,來分析一下,當kafka集群中的一個broker節點宕機時(非controller節點),會發生什麼?
在講之前,我們再來回顧一下brokers的結構,
※ 當非controller的broker宕機時,會執行如下操作,
1、controller會在zookeeper的 ” /brokers/ids/” 節點註冊一個watcher(監視器),當有broker宕機時,zookeeper會觸發監視器(fire watch)通知controller。
2、controller 從 “/brokers/ids” 節點讀取到所有可用的broker。
3、controller會聲明一個set_p集合,該集合包含瞭宕機broker上所有的partition。
4、針對set_p中的每一個partition,
① 從 “/state”節點 讀取該partition當前的ISR;
② 決定該partition的新leader:如果該分區的 ISR中有存活的副本,則選擇其中一個作為新leader;如果該partition的ISR副本全部掛瞭,則選擇該partition的 AR集合 中任一幸存的副本作為leader;如果該partition的所有副本都掛,則將分區的leader設為-1;
③ 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節點;
5、通過RPC向set_p相關的broker發送LeaderAndISR Request命令。

05 controller failover

當 controller 宕機時會觸發 controller failover。每個 broker 都會在 zookeeper 的 “/controller” 節點註冊 watcher(監聽器),當 controller 宕機時 zookeeper 中的臨時節點消失,所有存活的 broker 收到 fire 的通知,每個 broker 都嘗試創建新的臨時節點,隻有一個會創建成功並當選為 controller。
當新的 controller 當選時,會回調KafkaController的onControllerFailover()方法,在這個方法中完成controller的初始化,controller 在初始化時,首先會利用 ZK 的 watch 機制註冊很多不同類型的監聽器,主要有以下幾種:
  • 監聽 /admin/reassign_partitions 節點,用於分區副本遷移的監聽;
  • 監聽 /isr_change_notification 節點,用於 Partition Isr 變動的監聽;
  • 監聽 /admin/preferred_replica_election 節點,用於 Partition 最優 leader 選舉的監聽;
  • 監聽 /brokers/topics 節點,用於 topic 新建的監聽;
  • 監聽 /brokers/topics/TOPIC_NAME 節點,用於 Topic Partition 擴容的監聽;
  • 監聽 /admin/delete_topics 節點,用於 topic 刪除的監聽;
  • 監聽 /brokers/ids 節點,用於 Broker 上下線的監聽;
除瞭註冊多種監聽器外,controller初始化時還做以下操作,
  • initializeControllerContext()
初始化controller上下文,設置當前所有broker、topic、partition的leader、ISR等;
  • replicaStateMachine.startup()
  • partitionStateMachine.startup()
啟動狀態機;
  • brokerState.newState(RunningAsController)
將 brokerState 狀態設置為 RunningAsController;
  • sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
把partition leadership信息發到所有brokers;
  • autoRebalanceScheduler.startup()
如果打開瞭autoLeaderRebalance,則啟動”partition-rebalance-thread”線程;
  • deleteTopicManager.start()
如果delete.topic.enable=true,且 /admin/delete_topics 節點下有值,則刪除相應的topic;
最後,把onControllerFailover()方法的源碼貼一下,上面說的這些操作就是在這個方法中完成的,感興趣的可以再去看下kafka源碼,
def onControllerFailover() {
    if (isRunning) {
        info("Broker %d starting become controller state transition".format(config.brokerId))
        //read controller epoch from zk
        readControllerEpochFromZookeeper()
        // increment the controller epoch
        incrementControllerEpoch(zkUtils.zkClient)
        // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
        registerReassignedPartitionsListener()
        registerIsrChangeNotificationListener()
        registerPreferredReplicaElectionListener()
        partitionStateMachine.registerListeners()
        replicaStateMachine.registerListeners()
        initializeControllerContext()
        replicaStateMachine.startup()
        partitionStateMachine.startup()
        // register the partition change listeners for all existing topics on failover
        controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
        info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
        brokerState.newState(RunningAsController)
        maybeTriggerPartitionReassignment()
        maybeTriggerPreferredReplicaElection()
        /* send partition leadership info to all live brokers */
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
        if (config.autoLeaderRebalanceEnable) {
            info("starting the partition rebalance scheduler")
            autoRebalanceScheduler.startup()
            autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
                5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
        }
        deleteTopicManager.start()
    }
    else
        info("Controller has been shut down, aborting startup/failover")
}

到此這篇關於秒懂 kafka HA(高可用)的文章就介紹到這瞭,更多相關kafka HA內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: