一看必會系列:kafka和zookeeper集群搭建及測試

來源:本站原創 Linux 超過485 views圍觀 0條評論

——————–注意開啟開機啟動

——————–三臺服務器安裝JDK
yum  install  java-1.8.0-openjdk   java-1.8.0-openjdk-devel -y

[[email protected] ~]# java -v
Unrecognized option: -v
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
[[email protected] ~]# java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (build 1.8.0_171-b10)
OpenJDK 64-Bit Server VM (build 25.171-b10, mixed mode)

配置環境
vim /etc/profile
     #set java environment 
    JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export JAVA_HOME  CLASSPATH  PATH 

重讀變量
[[email protected] ~]# source /etc/profile

看是否正常
打印java_hosm
[[email protected] ~]# echo $JAVA_HOME
/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64
打印calsspath
[[email protected] ~]# echo $CLASSPATH 
.:/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/lib/dt.jar:/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/lib/tools.jar

————————安裝zookeeper

#我的目錄統一放在/opt下面
mkdir /opt
#首先創建Zookeeper項目目錄
#項目目錄
mkdir -p /opt/zookeeper
#存放快照日志
mkdir -p /opt/zookeeper/zkdata
#存放事物日志
mkdir -p /opt/zookeeper/zkdatalog

wget https://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.12/
cd /opt/zookeeper/
tar -zxvf zookeeper-3.4.12.tar.gz
cp zookeeper-3.4.12/conf/zoo_sample.cfg zookeeper-3.4.12/conf/zoo.cfg

#三臺配置一樣
vim zookeeper-3.4.12/conf/zoo.cfg
# example sakes.
dataDir=/opt/zookeeper/zkdata
#新加配置
dataLogDir=/opt/zookeeper/zkdatalog
# the port at which the clients will connect
clientPort=2181
server.1=192.168.142.130:2888:3888
server.2=192.168.142.131:2888:3888
server.3=192.168.142.132:2888:3888
#server.1 這個1是服務器的標識也可以是其他的數字, 表示這個是第幾號服務器,用來標識服務器,這個標識要寫到快照目錄下面myid文件里
#192.168.142.107為集群里的IP地址,第一個端口是master和slave之間的通信端口,默認是2888,第二個端口是leader選舉的端口,集群剛啟動的時候選舉或者leader掛掉之后進行新的選舉的端口默認是3888

創建myid文件

#server1
echo "1" > /opt/zookeeper/zkdata/myid
#server2
echo "2" > /opt/zookeeper/zkdata/myid
#server3
echo "3" > /opt/zookeeper/zkdata/myid

三臺服務器啟動服務
/opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh start

查看三臺的狀態
[[email protected] ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
[[email protected] ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: leader     ——這個是leader隨機生成
[[email protected] ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower

可以用“jps”查看zk的進程,這個是zk的整個工程的main
[[email protected] ~]# jps
1442 Jps
1348 QuorumPeerMain

———-啟動排錯

[[email protected] ~]# /opt/zookeeper/zookeeper-3.4.12/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Error contacting service. It is probably not running.  ——–出現這個不要慌,只要配置正確,再一臺一臺啟動就OK了

如果不是不行 如下操作
mv  /opt/zookeeper/zkdata/zookeeper_server.pid  /tmp/
tail -f /root/zookeeper.out  查看日志

java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:558)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:610)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:838)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:957)
2018-07-02 11:19:05,937 [myid:1] – INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:[email protected]] – Resolved hostname: 192.168.142.132 to address: /192.168.142.132
2018-07-02 11:19:05,937 [myid:1] – INFO  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:[email protected]] – Notification time out: 51200
2018-07-02 11:19:57,139 [myid:1] – WARN  [QuorumPeer[myid=1]/0:0:0:0:0:0:0:0:2181:[email protected]] – Cannot open channel to 2 at election address /192.168.142.131:3888
java.net.ConnectException: Connection refused (Connection refused)   ——-

檢查防火墻,是否開放端口
檢查對端服務器 ZK是否起來

———————————-安裝KAFKA

下載包
https://mirrors.cnnic.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz  這個快

mkdir -p /opt/kafka/kafkalogs

tar -zxvf kafka_2.12-1.1.0.tgz
cd /opt/kafka/kafka_2.12-1.1.0/config
cp server.properties server.properties.bak
cp zookeeper.properties zookeeper.properties.bak

vim server.properties

啟動kafka
cd /opt/kafka/kafka_2.12-1.1.0/bin
./kafka-server-start.sh -daemon ../config/server.properties
               :::*                    LISTEN      1062/master        

三臺服務器查看
[[email protected] bin]# jps
1526 QuorumPeerMain
1974 Kafka
1997 Jps

[[email protected] bin]# !net
netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      978/sshd           
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1062/master        
tcp6       0      0 :::2181                 :::*                    LISTEN      1348/java          
tcp6       0      0 :::37797                :::*                    LISTEN      1348/java          
tcp6       0      0 :::36009                :::*                    LISTEN      1756/java          
tcp6       0      0 192.168.142.132:3888    :::*                    LISTEN      1348/java          
tcp6       0      0 :::22                   :::*                    LISTEN      978/sshd           
tcp6       0      0 ::1:25  

創建#創建Topic
./kafka-topics.sh –create –zookeeper 192.168.142.130:2181 –replication-factor 2 –partitions 1 –topic niux
#解釋
–replication-factor 2   #復制兩份
–partitions 1 #創建1個分區
–topic #主題為shuaige

執行結果
[[email protected] bin]#
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "niux".   ——表示成功
[[email protected] bin]#

#創建一個broker,發布者
[[email protected] bin]# ./kafka-console-producer.sh –broker-list 192.168.142.130:9092,192.168.142.131:9092,192.168.142.132:9092 –topic niux
OpenJDK 64-Bit Server VM warning: If the number of procenfigure the number of parallel GC threads appropriately
>d     ———這里生產消息
d

”’在一臺服務器上創建一個訂閱者”’
[[email protected] bin]# ./kafka-console-consumer.sh –zookeeper 192.168.142.130:2181,192.168.142.131:2181,192.168.142.132:2181 –topic niux –from-beginning
OpenJDK 64-Bit Server VM warning: If the number of procenfigure the number of parallel GC threads appropriately
Using the ConsoleConsumer with old consumer is deprecateusing the new consumer by passing [bootstrap-server] ins
4    —-這里及以下為收到的消息
d       
d

——正確的測試方案
其中一臺服務器運行 做為生產者
./kafka-console-producer.sh –broker-list IP:9092 –topic niux
另兩臺服務器運行 做為消費者
./kafka-console-consumer.sh –zookeeper IP:2181 –topic niux –from-beginning

生產者進行信息輸入的同時 另兩臺消費者如能同時出現信息即為正常

查看topic

./kafka-topics.sh –list –zookeeper localhost:2181
#就會顯示我們創建的所有topic
[[email protected] bin]# ./kafka-topics.sh –list –zookeeper localhost:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
niux
[[email protected] bin]#

查看topic狀態
./kafka-topics.sh –describe –zookeeper localhost:2181 –topic niux

[[email protected] bin]# ./kafka-topics.sh –describe –zookeeper localhost:2181 –topic niux
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:niux    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: niux    Partition: 0    Leader: 131    Replicas: 131,132    Isr: 131,132
#分區為為1  復制因子為2   他的  niux的分區為0
#Replicas: 131,132   復制的為131,132
#

——————-配置文件解釋  zoo.cfg

#tickTime:
這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
#initLimit:
這個配置項是用來配置 Zookeeper 接受客戶端(這里所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度后 Zookeeper 服務器還沒有收到客戶端的返回信息,那么表明這個客戶端連接失敗。總的時間長度就是 5*2000=10 秒
#syncLimit:
這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
#dataDir:
快照日志的存儲路徑
#dataLogDir:
事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日志、快照日志太多
#clientPort:
這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。修改他的端口改大點

重要配置說明

1、myid文件和server.myid  在快照目錄下存放的標識本臺服務器的文件,他是整個zk集群用來發現彼此的一個重要標識。

2、zoo.cfg 文件是zookeeper配置文件 在conf目錄里。

3、log4j.properties文件是zk的日志輸出文件 在conf目錄里用java寫的程序基本上有個共同點日志都用log4j,來進行管理。

——————-配置文件解釋  server.properties
broker.id=0  #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
port=9092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.142.100 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大于這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.142.100:12181,192.168.142.101:12181,192.168.142.107:1218 #設置zookeeper的連接端口

——————-Kafka初識

1、Kafka使用背景

在我們大量使用分布式數據庫、分布式計算集群的時候,是否會遇到這樣的一些問題:

    我們想分析下用戶行為(pageviews),以便我們設計出更好的廣告位
    我想對用戶的搜索關鍵詞進行統計,分析出當前的流行趨勢
    有些數據,存儲數據庫浪費,直接存儲硬盤效率又低

這些場景都有一個共同點:
數據是由上游模塊產生,上游模塊,使用上游模塊的數據計算、統計、分析,這個時候就可以使用消息系統,尤其是分布式消息系統!
2、Kafka的定義
What is Kafka:它是一個分布式消息系統,由linkedin使用scala編寫,用作LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。具有高水平擴展和高吞吐量。
3、Kafka和其他主流分布式消息系統的對比
定義解釋:
1、Java 和 scala都是運行在JVM上的語言。
2、erlang和最近比較火的和go語言一樣是從代碼級別就支持高并發的一種語言,所以RabbitMQ天生就有很高的并發性能,但是 有RabbitMQ嚴格按照AMQP進行實現,受到了很多限制。kafka的設計目標是高吞吐量,所以kafka自己設計了一套高性能但是不通用的協議,他也是仿照AMQP( Advanced Message Queuing Protocol   高級消息隊列協議)設計的。
3、事物的概念:在數據庫中,多個操作一起提交,要么操作全部成功,要么全部失敗。舉個例子, 在轉賬的時候付款和收款,就是一個事物的例子,你給一個人轉賬,你轉成功,并且對方正常行收到款項后,這個操作才算成功,有一方失敗,那么這個操作就是失敗的。
對應消在息隊列中,就是多條消息一起發送,要么全部成功,要么全部失敗。3個中只有ActiveMQ支持,這個是因為,RabbitMQ和Kafka為了更高的性能,而放棄了對事物的支持 。
4、集群:多臺服務器組成的整體叫做集群,這個整體對生產者和消費者來說,是透明的。其實對消費系統組成的集群添加一臺服務器減少一臺服務器對生產者和消費者都是無感之的。
5、負載均衡,對消息系統來說負載均衡是大量的生產者和消費者向消息系統發出請求消息,系統必須均衡這些請求使得每一臺服務器的請求達到平衡,而不是大量的請求,落到某一臺或幾臺,使得這幾臺服務器高負荷或超負荷工作,嚴重情況下會停止服務或宕機。
6、動態擴容是很多公司要求的技術之一,不支持動態擴容就意味著停止服務,這對很多公司來說是不可以接受的。
注:
阿里巴巴的Metal,RocketMQ都有Kafka的影子,他們要么改造了Kafka或者借鑒了Kafka,最后Kafka的動態擴容是通過Zookeeper來實現的。
Zookeeper是一種在分布式系統中被廣泛用來作為:分布式狀態管理、分布式協調管理、分布式配置管理、和分布式鎖服務的集群。kafka增加和減少服務器都會在Zookeeper節點上觸發相應的事件kafka系統會捕獲這些事件,進行新一輪的負載均衡,客戶端也會捕獲這些事件來進行新一輪的處理。
Kafka相關概念

1、 AMQP協議
Advanced Message Queuing Protocol (高級消息隊列協議)
The Advanced Message Queuing Protocol (AMQP):是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了通過網絡發送的字節流的數據格式。因此兼容性非常好,任何實現AMQP協議的程序都可以和與AMQP協議兼容的其他程序交互,可以很容易做到跨語言,跨平臺。
上面說的3種比較流行的消息隊列協議,要么支持AMQP協議,要么借鑒了AMQP協議的思想進行了開發、實現、設計。
2、 一些基本的概念
1、消費者:(Consumer):從消息隊列中請求消息的客戶端應用程序
2、生產者:(Producer)  :向broker發布消息的應用程序
3、AMQP服務端(broker):用來接收生產者發送的消息并將這些消息路由給服務器中的隊列,便于fafka將生產者發送的消息,動態的添加到磁盤并給每一條消息一個偏移量,所以對于kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可以使用以上任何一種語言和kafka服務器進行通信(即辨析自己的consumer從kafka集群訂閱消息也可以自己寫producer程序)

——————還有一個需要注意
ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
zookeeper不會主動的清除舊的快照和日志文件,這個是操作者的責任。

但是可以通過命令去定期的清理

————腳本示例
#!/bin/bash
#snapshot file dir
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/opt/zookeeper/zkdatalog/version-2

#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f

#以上這個腳本定義了刪除對應兩個目錄中的文件,保留最新的66個文件,可以將他寫到crontab中,設置為每天凌晨2點執行一次就可以了。

#zk log dir   del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f

參考
https://www.cnblogs.com/zhaojiankai/p/7181910.html?utm_source=itdadao&utm_medium=referral
http://www.cnblogs.com/smartloli/p/4538173.html
https://blog.csdn.net/my_bai/article/details/68490632

文章出自:CCIE那點事 http://www.qdxgqk.live/ 版權所有。本站文章除注明出處外,皆為作者原創文章,可自由引用,但請注明來源。 禁止全文轉載。
本文鏈接:http://www.qdxgqk.live/?p=3798轉載請注明轉自CCIE那點事
如果喜歡:點此訂閱本站
?
?
萌宠夺宝游戏