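This article briefly introduces Kafka's basic concepts, its cluster architecture and Topic design, and the configuration and deployment of a Kafka cluster. It then tests message handling on the producer and consumer sides to deepen the understanding of Kafka's design as a message queue.
Kafka is a distributed, publish/subscribe-based messaging system. Its main design goals are as follows: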
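ZooKeeper is a cluster service widely used in distributed systems for distributed state management, distributed coordination, distributed configuration management, and distributed locking. Adding or removing Kafka servers triggers corresponding events on ZooKeeper nodes. The Kafka system captures these events and performs a new round of load balancing, and clients also capture them and start a new round of processing.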
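On the client side, this new round of processing amounts to splitting a topic's partitions again among the current members of a consumer group. The Python sketch below is a simplified, single-topic version of range-style assignment. It follows the idea behind Kafka's `RangeAssignor` but is not its code, and the consumer names are hypothetical.

```python
from typing import Dict, List


def range_assign(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Give each consumer a contiguous block of partitions 0..num_partitions-1."""
    members = sorted(consumers)  # sorted so the result does not depend on join order
    if not members:
        return {}
    per_consumer, extra = divmod(num_partitions, len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` consumers each take one additional partition.
        count = per_consumer + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


if __name__ == "__main__":
    # Two consumers share a 4-partition topic ...
    print(range_assign(4, ["consumer-a", "consumer-b"]))
    # ... then a third member joins, which triggers a rebalance.
    print(range_assign(4, ["consumer-a", "consumer-b", "consumer-c"]))
```

When `consumer-c` joins, partition 3 moves from `consumer-b` to `consumer-c` while `consumer-a` keeps partitions 0 and 1. This kind of reshuffle is what a rebalance produces.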
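Some of Kafka's basic terms are listed below:
Partitions are the key to Kafka's performance: when a cluster performs poorly, a common remedy is to add partitions to a Topic. Within a partition, messages are kept in the order they arrive, and each one gets an increasing offset. Producers append messages at the tail of the log, and consumers read them sequentially starting from the head (or from any offset they choose).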
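The partition model can be made concrete with a toy, in-memory topic in Python. Each partition is an append-only list, and a message's offset is its index in that list. Producers append at the tail, and consumers read forward from an offset. Keyed messages are routed with a MurmurHash2-based partitioner. My assumption, to the best of my knowledge, is that this matches the Java producer's default partitioner for keyed records in Kafka 1.x. Records without a key are distributed differently and are not modeled here. `ToyTopic` and `partition_for` are hypothetical names, not Kafka APIs.

```python
from typing import List, Tuple

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """32-bit MurmurHash2 with seed 0x9747b28c, returned as an unsigned int."""
    length = len(data)
    m, r = 0x5BD1E995, 24
    h = (0x9747B28C ^ length) & MASK32
    # Mix the input four bytes (little-endian) at a time.
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    # Mix the remaining 1-3 bytes.
    tail, rem = length - length % 4, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def partition_for(key: bytes, num_partitions: int) -> int:
    """Clear the sign bit of the hash, then take it modulo the partition count."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


class ToyTopic:
    """A topic as a list of append-only partition logs; offset = list index."""

    def __init__(self, num_partitions: int) -> None:
        self.partitions: List[List[bytes]] = [[] for _ in range(num_partitions)]

    def append(self, key: bytes, value: bytes) -> Tuple[int, int]:
        """Producer side: route by key, append at the tail, return (partition, offset)."""
        p = partition_for(key, len(self.partitions))
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1

    def read(self, partition: int, offset: int) -> List[bytes]:
        """Consumer side: everything from `offset` to the tail, oldest first."""
        return self.partitions[partition][offset:]


if __name__ == "__main__":
    topic = ToyTopic(4)
    for value in (b"This is a Message", b"Welcome to Kafka"):
        print(topic.append(b"hypothetical-key", value))
```

The partition depends only on the key and the partition count. Messages with the same key therefore stay in order within one partition. However, adding partitions (the tuning step mentioned above) can move a key to a different partition.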
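1) Kafka cluster architecture
As shown in the figure above, a typical Kafka cluster contains several producers, several brokers, several consumer groups, and a ZooKeeper cluster. Producers can be sources such as page views generated by a web front end, server logs, or system CPU and memory metrics. Kafka scales horizontally, so in general the more brokers there are, the higher the cluster's throughput. Kafka uses ZooKeeper to manage the cluster configuration, elect leaders, and rebalance when a consumer group changes. Producers publish messages to brokers in push mode, and consumers subscribe to and consume messages from brokers in pull mode.
2) Kafka Topic structure
Messages in a Kafka cluster are organized by Topic, as shown in the figure below.
Every message is appended to the end of its partition, so writes to disk are sequential and therefore very efficient. Sequential disk writes can even be faster than random writes to memory, which is one of the main reasons for Kafka's high throughput. When a message is sent to a broker, the partitioning rule (applied by the producer's partitioner) decides which partition stores it. With a well-chosen partitioning rule, messages spread evenly across partitions, which achieves horizontal scaling. If a topic were backed by a single file, the I/O of the machine holding that file would become the topic's bottleneck; partitions remove this bottleneck.
The server environment is configured as shown in the table below: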
Role | Hostname | IP | OS |
---|---|---|---|
Kafka cluster | tango-centos01 | 192.168.112.101 | Centos7-X86_64 |
Kafka cluster | tango-centos02 | 192.168.112.102 | Centos7-X86_64 |
Kafka cluster | tango-centos03 | 192.168.112.103 | Centos7-X86_64 |
ZooKeeper cluster | tango-centos01 | 192.168.112.101 | Centos7-X86_64 |
ZooKeeper cluster | tango-centos02 | 192.168.112.102 | Centos7-X86_64 |
ZooKeeper cluster | tango-centos03 | 192.168.112.103 | Centos7-X86_64 |
1) Extract the installation package to the target directory
[root@tango-centos01 src-install]# tar -xzvf kafka_2.11-1.1.0.tgz -C /usr/local/elk
2) Configure the ZooKeeper cluster by editing zookeeper.properties
[root@tango-centos01 config]# pwd
/usr/local/elk/kafka_2.11-1.1.0/config
[root@tango-centos01 config]# vi zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/usr/local/elk/kafka-data/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.112.101:2888:3888
server.2=192.168.112.102:2888:3888
server.3=192.168.112.103:2888:3888
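The file is identical on all three nodes, so it can be generated from one host list. Below is a minimal Python sketch; the function name is hypothetical and the values are copied from the file above.

```python
from typing import List


def render_zookeeper_properties(hosts: List[str], data_dir: str) -> str:
    """Build zookeeper.properties for an ensemble of `hosts`."""
    lines = [
        f"dataDir={data_dir}",
        "clientPort=2181",
        "maxClientCnxns=0",
        "tickTime=2000",
        "initLimit=20",
        "syncLimit=10",
    ]
    # server.N=host:quorumPort:electionPort; N must equal that host's myid.
    for server_id, host in enumerate(hosts, start=1):
        lines.append(f"server.{server_id}={host}:2888:3888")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(render_zookeeper_properties(
        ["192.168.112.101", "192.168.112.102", "192.168.112.103"],
        "/usr/local/elk/kafka-data/zookeeper",
    ), end="")
```

tickTime is in milliseconds, and initLimit and syncLimit are counted in ticks. Followers therefore get 40 s (20 × 2000 ms) to connect to and sync with the leader, and may fall at most 20 s (10 × 2000 ms) behind it.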
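3) Create the ZooKeeper data directory
[root@tango-centos01 kafka-data]# mkdir zookeeper
[root@tango-centos01 kafka-data]# ls
zookeeper
4) Create a myid file in the ZooKeeper data directory (dataDir, /usr/local/elk/kafka-data/zookeeper). The file contains a single number that identifies the host and must match N in the corresponding server.N line. Without this file, ZooKeeper cannot start in cluster mode.
[root@tango-centos01 zookeeper]# echo "1" > myid
[root@tango-centos01 zookeeper]# ls
myid
[root@tango-centos01 zookeeper]# cat myid
1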
5) The ZooKeeper configuration on the other two nodes is identical except for the contents of the myid file
[root@tango-centos02 zookeeper]# cat myid
2
[root@tango-centos03 zookeeper]# cat myid
3
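The only per-node difference is the myid value, which is the node's position N in the server.N list. Below is a Python sketch of that mapping and of writing the file. The helpers are hypothetical, and the demo writes to a temporary directory rather than the real dataDir.

```python
import tempfile
from pathlib import Path

# Host order must match server.1, server.2, server.3 in zookeeper.properties.
HOSTS = ["192.168.112.101", "192.168.112.102", "192.168.112.103"]


def myid_for(host: str) -> int:
    """1-based position of the host in the server.N list."""
    return HOSTS.index(host) + 1


def write_myid(data_dir: str, server_id: int) -> Path:
    """Create dataDir if needed and write the myid file (like `echo "N" > myid`)."""
    directory = Path(data_dir)
    directory.mkdir(parents=True, exist_ok=True)
    myid = directory / "myid"
    myid.write_text(f"{server_id}\n")
    return myid


if __name__ == "__main__":
    # Demo in a throwaway directory instead of /usr/local/elk/kafka-data/zookeeper.
    with tempfile.TemporaryDirectory() as tmp:
        for host in HOSTS:
            path = write_myid(f"{tmp}/{host}/zookeeper", myid_for(host))
            print(host, path.read_text().strip())
```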
6) Start the ZooKeeper cluster on each node; node 2 (tango-centos02) was elected Leader
[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
7) Check the ZooKeeper service. Every node listens on 2181 (client connections) and 3888 (leader election). Only the leader listens on 2888, the port that followers connect to. In the output below, 2888 appears only on tango-centos02, which matches the election result.
[root@tango-centos01 kafka_2.11-1.1.0]# netstat -nlpt | grep -E "2181|2888|3888"
tcp6 0 0 192.168.112.101:3888 :::* LISTEN 1318/java
tcp6 0 0 :::2181 :::* LISTEN 1318/java
[root@tango-centos02 kafka_2.11-1.1.0]# netstat -nlpt | grep -E "2181|2888|3888"
tcp6 0 0 192.168.112.102:3888 :::* LISTEN 1131/java
tcp6 0 0 :::2181 :::* LISTEN 1131/java
tcp6 0 0 192.168.112.102:2888 :::* LISTEN 1131/java
[root@tango-centos03 kafka_2.11-1.1.0]# netstat -nlpt | grep -E "2181|2888|3888"
tcp6 0 0 192.168.112.103:3888 :::* LISTEN 1126/java
tcp6 0 0 :::2181 :::* LISTEN 1126/java
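This leader check can be automated by parsing the netstat output. The Python sketch below assumes the `netstat -nlpt` line format shown above. It relies on the fact that only the leader listens on the quorum port (2888 in this setup). The helper names are hypothetical.

```python
from typing import List


def listening_ports(netstat_output: str) -> List[int]:
    """Extract local ports of LISTEN sockets from `netstat -nlpt` lines."""
    ports = []
    for line in netstat_output.splitlines():
        fields = line.split()
        # Expected: proto recv-q send-q local-address foreign-address state pid/program
        if len(fields) >= 6 and fields[5] == "LISTEN":
            ports.append(int(fields[3].rsplit(":", 1)[1]))
    return ports


def zk_role(netstat_output: str, quorum_port: int = 2888) -> str:
    """Only the leader listens on the quorum port, so use it to infer the role."""
    return "leader" if quorum_port in listening_ports(netstat_output) else "follower"


if __name__ == "__main__":
    # Output captured on tango-centos02 in the check above.
    centos02 = """tcp6 0 0 192.168.112.102:3888 :::* LISTEN 1131/java
tcp6 0 0 :::2181 :::* LISTEN 1131/java
tcp6 0 0 192.168.112.102:2888 :::* LISTEN 1131/java"""
    print(zk_role(centos02))
```

Alternatively, you can send ZooKeeper's `srvr` four-letter command to port 2181, for example `echo srvr | nc 192.168.112.102 2181`. It should report `Mode: leader` or `Mode: follower` directly.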
1) Edit the Kafka configuration file config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
listeners=PLAINTEXT://192.168.112.101:9092
advertised.listeners=PLAINTEXT://192.168.112.101:9092
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/elk/kafka-data/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181
2) Configure Kafka on the other nodes by changing only the following settings and keeping everything else the same:
# tango-centos02
broker.id=2
listeners=PLAINTEXT://192.168.112.102:9092
advertised.listeners=PLAINTEXT://192.168.112.102:9092
# tango-centos03
broker.id=3
listeners=PLAINTEXT://192.168.112.103:9092
advertised.listeners=PLAINTEXT://192.168.112.103:9092
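Only three settings differ between brokers, so they can be derived from the host table. Below is a Python sketch under that assumption; the helper names are hypothetical and the IPs come from the table above.

```python
from typing import Dict

HOSTS = ["192.168.112.101", "192.168.112.102", "192.168.112.103"]


def broker_overrides(host: str, port: int = 9092) -> Dict[str, str]:
    """Per-broker settings; everything else in server.properties is shared."""
    broker_id = HOSTS.index(host) + 1  # must be unique per broker
    listener = f"PLAINTEXT://{host}:{port}"
    return {
        "broker.id": str(broker_id),
        "listeners": listener,             # address the broker binds to
        "advertised.listeners": listener,  # address handed out to clients
    }


def render(overrides: Dict[str, str]) -> str:
    return "".join(f"{key}={value}\n" for key, value in overrides.items())


if __name__ == "__main__":
    for host in HOSTS:
        print(f"# {host}")
        print(render(broker_overrides(host)), end="")
```

This setup uses the same value for listeners and advertised.listeners. The two only need to differ when clients reach the broker through another address, for example across NAT.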
3) Create the directory Kafka needs (the log.dirs path)
[root@tango-centos01 kafka-data]# ls
zookeeper
[root@tango-centos01 kafka-data]# mkdir kafka-logs
[root@tango-centos01 kafka-data]# ls
kafka-logs zookeeper
4) Start the Kafka cluster
[root@tango-centos01 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
[root@tango-centos02 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
[root@tango-centos03 kafka_2.11-1.1.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties &
5) Check the ports
[root@tango-centos01 kafka_2.11-1.1.0]# netstat -nlpt | grep -E "9092|2181"
tcp6 0 0 192.168.112.101:9092 :::* LISTEN 1821/java
tcp6 0 0 :::2181 :::* LISTEN 1407/java
[root@tango-centos02 kafka_2.11-1.1.0]# netstat -nlpt | grep -E "9092|2181"
tcp6 0 0 192.168.112.102:9092 :::* LISTEN 1489/java
tcp6 0 0 :::2181 :::* LISTEN 1160/java
[root@tango-centos03 kafka_2.11-1.1.0]# netstat -nlpt | grep -E "9092|2181"
tcp6 0 0 192.168.112.103:9092 :::* LISTEN 1432/java
tcp6 0 0 :::2181 :::* LISTEN 1099/java
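The netstat checks above run on each broker itself. From a client machine, a quick reachability probe can be sketched with Python's standard `socket` module. `port_open` and `probe` are hypothetical helpers. A successful connection only shows that something accepts TCP connections on the port; it is not a Kafka protocol health check.

```python
import socket
from typing import Dict, Iterable


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


def probe(hosts: Iterable[str], port: int = 9092) -> Dict[str, bool]:
    """Check the same port on several hosts."""
    return {host: port_open(host, port) for host in hosts}
```

For example, `probe(["192.168.112.101", "192.168.112.102", "192.168.112.103"])` should map every broker to `True` when the advertised addresses are reachable from the client.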
1) Create a topic
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic summer
Created topic "summer".
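Topic names such as `summer` must satisfy Kafka's naming rules. Below is a small pre-check in Python. It assumes the rules as I understand them: only ASCII letters, digits, `.`, `_` and `-`; at most 249 characters; and not `.` or `..`. `topic_name_problems` is a hypothetical helper, and the broker-side validation remains authoritative.

```python
import re
from typing import List

LEGAL_CHARS = re.compile(r"[a-zA-Z0-9._-]+")
MAX_LENGTH = 249


def topic_name_problems(name: str) -> List[str]:
    """Return the reasons a topic name would be rejected (empty list if none)."""
    if not name:
        return ["name is empty"]
    problems = []
    if name in (".", ".."):
        problems.append("'.' and '..' are not allowed")
    if len(name) > MAX_LENGTH:
        problems.append(f"longer than {MAX_LENGTH} characters")
    if not LEGAL_CHARS.fullmatch(name):
        problems.append("contains characters outside [a-zA-Z0-9._-]")
    return problems


if __name__ == "__main__":
    for candidate in ("summer", "TestKafka", "bad topic", ".."):
        print(candidate, topic_name_problems(candidate))
```

kafka-topics.sh also warns that names containing `.` or `_` may collide in metric names. It is safer to use one of the two characters but not both.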
2) List the topics that have been created
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
summer
3) Show the details of the summer topic
[root@tango-centos02 kafka_2.11-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic summer
Topic:summer PartitionCount:1 ReplicationFactor:2 Configs:
 Topic: summer Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
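The describe output shows partition 0 led by broker 2, with replicas on brokers 2 and 1, both in the ISR (in-sync replica set). When checking many topics, it helps to parse these lines. The Python sketch below assumes the line format printed above; other Kafka versions may format it differently. It flags a partition as under-replicated when its ISR differs from its replica list. `parse_partitions` is a hypothetical helper.

```python
import re
from typing import Dict, List

PARTITION_LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def parse_partitions(describe_output: str) -> List[Dict]:
    """One dict per partition line; the topic summary line is skipped."""
    rows = []
    for m in PARTITION_LINE.finditer(describe_output):
        replicas = [int(x) for x in m["replicas"].split(",")]
        isr = [int(x) for x in m["isr"].split(",") if x]
        rows.append({
            "topic": m["topic"],
            "partition": int(m["partition"]),
            "leader": int(m["leader"]),
            "replicas": replicas,
            "isr": isr,
            # Under-replicated: some assigned replica is not in sync.
            "under_replicated": set(isr) != set(replicas),
        })
    return rows
```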
4) Send messages (producer role)
[root@tango-centos01 kafka_2.11-1.1.0]# ./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic summer
>This is a Message
>Welcome to Kafka
5) Receive messages (consumer role)
[root@tango-centos03 kafka_2.11-1.1.0]# ./bin/kafka-console-consumer.sh --zookeeper 192.168.112.103:2181 --topic summer --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
This is a Message
Welcome to Kafka
As the warning says, --zookeeper selects the deprecated old consumer. Passing --bootstrap-server 192.168.112.103:9092 instead uses the new consumer.
1) Start the Kafka process
nohup ./bin/kafka-server-start.sh ./config/server.properties &
2) Stop the Kafka process
./bin/kafka-server-stop.sh
3) Check the service status
netstat -nlpt | grep -E "9092|2181"
ps -ef | grep kafka
4) Create a Topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic TestKafka
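With `--replication-factor 2 --partitions 4` on three brokers, Kafka spreads partition leaders and their followers round-robin across the brokers. The Python sketch below models this placement on my understanding of Kafka's rack-unaware assignment, so treat it as an approximation. Kafka picks the start index and shift at random, so the exact placement varies between topics. Here they are parameters so that the output is reproducible. `assign_replicas` is a hypothetical name.

```python
from typing import Dict, List


def assign_replicas(num_partitions: int, replication_factor: int,
                    brokers: List[int], start_index: int = 0,
                    shift: int = 0) -> Dict[int, List[int]]:
    """Map each partition to its replica list; the first entry is the preferred leader."""
    n = len(brokers)
    if replication_factor > n:
        raise ValueError("replication factor larger than the number of brokers")
    assignment: Dict[int, List[int]] = {}
    for p in range(num_partitions):
        # After each full pass over the brokers, change the follower offset.
        if p > 0 and p % n == 0:
            shift += 1
        first = (p + start_index) % n  # leaders go round-robin over brokers
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            # Offset in 1..n-1, so a follower never lands on the leader's broker.
            offset = 1 + (shift + j) % (n - 1)
            replicas.append(brokers[(first + offset) % n])
        assignment[p] = replicas
    return assignment


if __name__ == "__main__":
    print(assign_replicas(4, 2, [1, 2, 3]))
```

With the defaults, the result is `{0: [1, 2], 1: [2, 3], 2: [3, 1], 3: [1, 3]}`. The first broker in each list is the preferred leader, so leadership is spread across all three brokers.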
5) Inspect Topics
./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181
./bin/kafka-topics.sh --describe --zookeeper 192.168.112.101:2181 --topic TestKafka
6) Inspect consumer groups
./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.112.101:9092 --list
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.112.101:9092 --group TestKafka-consumer-group --describe
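For each partition, the `--describe` output reports the group's committed offset (CURRENT-OFFSET), the partition's log end offset (LOG-END-OFFSET), and their difference (LAG). The arithmetic is simple enough to sketch in Python. The offsets below are made-up values, and the helper names are hypothetical.

```python
from typing import Dict, Optional, Tuple


def partition_lag(log_end_offset: int, committed_offset: Optional[int]) -> Optional[int]:
    """Messages written to a partition but not yet consumed by the group."""
    if committed_offset is None:
        return None  # nothing committed yet, so lag is undefined
    return max(log_end_offset - committed_offset, 0)


def group_lag(offsets: Dict[int, Tuple[int, Optional[int]]]) -> int:
    """Sum lag over partitions; `offsets` maps partition -> (log end, committed)."""
    total = 0
    for log_end, committed in offsets.values():
        lag = partition_lag(log_end, committed)
        if lag is not None:
            total += lag
    return total


if __name__ == "__main__":
    # Made-up offsets for the 4 partitions of TestKafka.
    sample = {0: (26800, 26700), 1: (1200, 1200), 2: (950, 900), 3: (40, None)}
    for partition, (log_end, committed) in sorted(sample.items()):
        print(partition, partition_lag(log_end, committed))
    print("total", group_lag(sample))
```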
7) Producer from the shell
./bin/kafka-console-producer.sh --broker-list 192.168.112.101:9092 --topic TestKafka
8) Consumer from the shell
./bin/kafka-console-consumer.sh --zookeeper 192.168.112.101:2181 --topic TestKafka --from-beginning
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.101:9092 --topic TestKafka --partition 0 --offset 26700
When reposting, please credit the original article: https://blog.csdn.net/solihawk/article/details/115901371
This article is also published on the WeChat official account "牧羊人的方向"; follow it if you are interested. Thanks!