kafka安裝部署超詳細步驟

概述

Kafka是最初由Linkedin公司開發,是一個分佈式、分區的、多副本的、多訂閱者,基於zookeeper協調的分佈式日志系統(也可以當做MQ系統),常見可以用於web/nginx日志、訪問日志,消息服務等等,Linkedin於2010年貢獻給瞭Apache基金會並成為頂級開源項目。

主要應用場景是:日志收集系統和消息系統。

Kafka主要設計目標如下:

  • 以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間的訪問性能。
  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條消息的傳輸。
  • 支持Kafka Server間的消息分區,及分佈式消費,同時保證每個partition內的消息順序傳輸。
  • 同時支持離線數據處理和實時數據處理。
  • Scale out:支持在線水平擴展

Step 1: 下載代碼

你可以登錄Apache kafka 官方下載。
http://kafka.apache.org/downloads.html
備註:2.11-1.1.0版本才與JDK1.7兼容,否則更高版本需要JDK1.8

在這裡插入圖片描述

Step 2: 啟動服務

運行kafka需要使用Zookeeper,所以你需要先啟動Zookeeper,如果你沒有Zookeeper,你可以使用kafka自帶打包和配置好的Zookeeper(PS:在kafka包裡)。

//這是前臺啟動,啟動以後,當前就無法進行其他操作(不推薦)
./zookeeper-server-start.sh ../config/zookeeper.properties

//後臺啟動(推薦)
./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &

現在啟動kafka

config/server1.properties:
	broker.id=0
	listeners=PLAINTEXT://192.168.10.130:9092
	log.dirs=kafka-logs
	zookeeper.connect=localhost:2181
//後臺啟動kafka
./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

Step 3:創建一個主題

  創建一個名為“test”的Topic,隻有一個分區和備份(2181是zookeeper的默認端口)

./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
命令解析:
--create: 指定創建topic動作

--topic:指定新建topic的名稱

--zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣

--config:指定當前topic上有效的參數值,參數列表參考文檔為: http://kafka.apache.org/082/documentation.html#brokerconfigs

--partitions:指定當前創建的kafka分區數量,默認為1個

--replication-factor:指定每個分區的復制因子個數,默認1個

創建好之後,可以通過運行以下命令,查看已創建的topic信息:

>./kafka-topics.sh --list --zookeeper localhost:2181
test

  或者,除瞭手工創建topic外,你也可以配置你的broker,當發佈一個不存在的topic時自動創建topic。

  補充:
(1)查看對應topic的描述信息

./kafka-topics.sh --describe --zookeeper localhost:2181  --topic test0
命令解析:
--describe: 指定是展示詳細信息命令

--zookeeper: 指定kafka連接zk的連接url,該值和server.properties文件中的配置項{zookeeper.connect}一樣

--topic:指定需要展示數據的topic名稱

這裡寫圖片描述

  (2)Topic信息修改

bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 10 
bin/kafka-topics.sh --zookeeper 192.168.187.146:2181 --alter --topic test0 --partitions 3 ## Kafka分區數量隻允許增加,不允許減少

  (3)Topic刪除
默認情況下Kafka的Topic是沒法直接刪除的,需要進行相關參數配置

bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181

加粗樣式
Note: This will have no impact if delete.topic.enable is not set to true.## 默認情況下,刪除是標記刪除,沒有實際刪除這個Topic;如果運行刪除Topic,兩種方式:
方式一:通過delete命令刪除後,手動將本地磁盤以及zk上的相關topic的信息刪除即可
方式二:配置server.properties文件,給定參數delete.topic.enable=true,重啟kafka服務,此時執行delete命令表示允許進行Topic的刪除

Step 4: 發送消息

  Kafka提供瞭一個命令行的工具,可以從輸入文件或者命令行中讀取消息並發送給Kafka集群。每一行是一條消息。

  運行producer(生產者),然後在控制臺輸入幾條消息到服務器。
  備註:這裡的localhost:9092不是固定的,需要根據server.properties中配置的地址來寫這裡的地址!

[root@administrator bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a message
>this is another message
//按`Ctrl+C`終止輸入

Step 5: 消費消息

  Kafka也提供瞭一個消費消息的命令行工具,將存儲的信息輸出出來。
  備註:這裡的localhost:9092不是固定的,需要根據server.properties中配置的地址來寫這裡的地址!

[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
this is a message
this is another message
//按`Ctrl+C`終止讀取消息

  如果你有2臺不同的終端上運行上述命令,那麼當你在運行生產者時,消費者就能消費到生產者發送的消息。

Step 6: 設置多個broker集群(單機偽集群的配置)

  到目前,我們隻是單一的運行一個broker,沒什麼意思。對於Kafka,一個broker僅僅隻是一個集群的大小,所有讓我們多設幾個broker。

  首先為每個broker創建一個配置文件:

cp config/server.properties config/server-1.properties 
cp config/server.properties config/server-2.properties 

現在編輯這些新建的文件,設置以下屬性:

vim config/server.properties 
config/server1.properties:
	broker.id=0
	listeners=PLAINTEXT://192.168.10.130:9092
	log.dirs=kafka-logs
	zookeeper.connect=localhost:2181
	
config/server-1.properties: 
    broker.id=1
	listeners=PLAINTEXT://192.168.10.130:9093
	log.dirs=kafka-logs-1
	zookeeper.connect=localhost:2181

config/server-2.properties: 
    broker.id=2
	listeners=PLAINTEXT://192.168.10.130:9094
	log.dirs=kafka-logs-2
	zookeeper.connect=localhost:2181

  備註1listeners一定要配置成為IP地址;如果配置為localhost或服務器的hostname,在使用java發送數據時就會拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因為在沒有配置advertised.host.name 的情況下,Kafka並沒有像官方文檔宣稱的那樣改為廣播我們配置的host.name,而是廣播瞭主機配置的hostname。遠端的客戶端並沒有配置 hosts,所以自然是連接不上這個hostname的。

  備註2:當使用java客戶端訪問遠程的kafka時,一定要把集群中所有的端口打開,否則會連接超時

/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9093 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 9094 -j ACCEPT
/etc/rc.d/init.d/iptables save

  broker.id是集群中每個節點的唯一且永久的名稱,我們修改端口和日志目錄是因為我們現在在同一臺機器上運行,我們要防止broker在同一端口上註冊和覆蓋對方的數據。

  我們已經運行瞭zookeeper和剛才的一個kafka節點,所有我們隻需要在啟動2個新的kafka節點。

./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 &
./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &

  現在,我們創建一個新topic,把備份設置為:3

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

  好瞭,現在我們已經有瞭一個集群瞭,我們怎麼知道每個集群在做什麼呢?運行命令“describe topics”

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分區的摘要
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
//提供一個分區信息,因為我們隻有一個分區,所以隻有一行。
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
  • “leader”:該節點負責該分區的所有的讀和寫,每個節點的leader都是隨機選擇的。
  • “replicas”:備份的節點列表,無論該節點是否是leader或者目前是否還活著,隻是顯示。
  • “isr”:“同步備份”的節點列表,也就是活著的節點並且正在同步leader

  其中ReplicasIsr中的1,2,0就對應著3個broker他們的broker.id屬性!

  我們運行這個命令,看看一開始我們創建的那個節點:

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

  這並不奇怪,剛才創建的主題沒有Replicas,並且在服務器“0”上,我們創建它的時候,集群中隻有一個服務器,所以是“0”。

Step 7: 測試集群的容錯能力

7.1發佈消息到集群

[root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic
>cluster message 1
>cluster message 2
//Ctrl+C終止產生消息

7.2消費消息

[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2
//Ctrl+C終止消費消息

7.3幹掉leader,測試集群容錯

首先查詢誰是leader

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分區的摘要
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
//提供一個分區信息,因為我們隻有一個分區,所以隻有一行。
Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

  可以看到Leaderbroker.id1,找到對應的Broker

[root@administrator bin]# jps -m
5130 Kafka ../config/server.properties
4861 QuorumPeerMain ../config/zookeeper.properties
1231 Bootstrap start start
7420 Kafka ../config/server-2.properties
7111 Kafka ../config/server-1.properties
9139 Jps -m

  通過以上查詢到LeaderPIDKafka ../config/server-1.properties)為7111,殺掉該進程

//殺掉該進程
kill -9 7111
//再查詢一下,確認新的Leader已經產生,新的Leader為broker.id=0
[root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3    Configs:
//備份節點之一成為新的leader,而broker1已經不在同步備份集合裡瞭
Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 1,0,2 Isr: 0,2


7.4再次消費消息,確認消息沒有丟失

[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2

  消息依然存在,故障轉移成功!!

到此這篇關於kafka安裝部署的文章就介紹到這瞭,更多相關kafka安裝部署內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: