youyichannel

志于道,据于德,依于仁,游于艺!

0%

Kafka Basic

体系结构

一个典型的Kafka体系结构包含若干 Producer、若干 Consumer、若干 Broker,以及一个 ZooKeeper 集群。其中 Zookeeper 负责给 Kafka 实现集群元数据的管理、控制器的选举等功能

Producer 将消息发送到 Broker,Broker 负责将接收到的消息存储到磁盘中,Consumer 负责订阅 Broker 并消费消息。对于 Kafka 而言,Broker 可以简单看作一个独立的 Kafka 服务节点或 Kafka 服务实例

大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例;多个 Broker 组成了一个 Kafka 集群。

Topic-Partition

Kafka 中的消息以主题 (Topic) 为单位进行归类,生产者负责将消息发送到特定的主题,而消费者负责订阅主题并进行消费。

主题是一个逻辑概念,一个主题可以细分成多个分区,一个分区只属于一个主题。同一个主题下的不同分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志 (Log) 文件,消息在被追加到分区日志文件时会分配一个特定的偏移量 (offset)。offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,但是 offset 不跨分区,这也就意味着 Kafka 保证的是分区有序而不是主题有序。除此之外,Kafka 中的分区可以分布在不同的 Broker 上。

每一条消息被发送到 Broker 之前,会根据分区规则选择存储到一个具体的分区。如果一个主题只对应一个文件(分区 Log 文件),那么这个文件所在的机器 IO 将成为这个主题的性能瓶颈,多分区就解决了这个问题。

Replica

Kafka 为分区引入了多副本 (Replica) 机制, 通过增加副本数量可以提升容灾能力。副本之间是「一主多从」的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。多个副本位于不同的 Broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现故障自动转移,即当 Kafka 集群中某个 Broker 失效时依然能够保证服务可用。

⚠️注意:同一分区的不同副本中保存的是相同的消息,但是在同一时刻,副本之间并非完全一样。

【🌰栗子】Kafka 集群中存在 4 个 Broker,某个 Topic 存在三分区三副本。

PS:Kafka Consumer 也具备一定的容灾能力。 Consumer 使用拉 (Pull) 模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失 。

分区副本统称为 AR (Assigned Replicas)。所有与 leader 副本保持一定程度的同步的副本(包括 leader 副本)组成 ISR (In-Sync Replicas),其中「一定程度的同步」指的是可以忍受的滞后范围,这个范围可以通过参数配置。与 leader 副本同步之后过多的副本(不包括 leader 副本)组成 OSR (Out-Sync Replicas),即 AR = ISR + OSR

正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR = ISR,OSR 集合为空。

ISR 和 OSR 在 Kafka 运行过程中是动态变化的,leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader副本会把它从 ISR 集合中剔除;如果 OSR 集合中有 follower 副本一定程度同步追上了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。

默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本没有机会。

HW / LEO

ISR 与 HW (High Watermark) 和 LEO (Log End Offset) 也有紧密的联系。

  • HW,高水位,标识了一个特定的消息偏移量 (offset),消费者只能够拉取到这个 offset 之前的消息。
  • LEO,标识当前日志文件中下一条待写入消息的 offset。

分区 ISR 集合中的每个个副本都会维护自身的 LEO,min(LEO) = HW

【🌰栗子】假设某个分区的 ISR 集合中有 3 个副本,此时分区的 LEO=HW=3。消息 3 和消息 4 从生产者发出之后会存入 leader 副本。在消息写入 leader 副本之后,follower 副本会发送拉取请求来拉取消息 3 和消息 4 以进行消息同步。

由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。

  • 同步复制要求所有能工作的 follower 副本都复制完之后,这条消息才会被确认已成功提交,这种复制方式极影响性能;
  • 异步复制方式下,follower 副本异步地从 leader 副本中复制数据,数据只要被 leader 副本写入就被认为已经成功提交,在这种情况下,如果 follower 副本都还没有复制完而落后于 leader 副本,突然 leader 所在的 Broker 宕机,则会造成数据丢失。

Kafka 使用的这种 ISR 方式则权衡了数据可靠性和性能之间的关系。

Kafka安装和启动

1)下载Kafka和Zookeeper

此处选择 kafka_2.13-3.5.0 和 apache-zookeeper-3.6.4-bin

2)修改配置文件

修改Kafka安装目录中的config目录下server.properties中的log.dirs配置项和zookeeper.properties中的dataDir配置项

【🌰栗子】

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/Users/codejuzi/program/kafka/kafka_2.13-3.5.0/kafka-logs
# the directory where the snapshot is stored.
dataDir=/Users/codejuzi/program/zookeeper/apache-zookeeper-3.6.4-bin

3)启动Zookeeper

sh ./bin/zookeeper-server-start.sh config/zookeeper.properties

【🌰栗子】

➜  kafka_2.13-3.5.0 pwd
/Users/codejuzi/program/kafka/kafka_2.13-3.5.0
➜ kafka_2.13-3.5.0 sh ./bin/zookeeper-server-start.sh config/zookeeper.properties

4)启动Kafka

sh ./bin/kafka-server-start.sh config/server.properties

【🌰栗子】

➜  kafka_2.13-3.5.0 pwd
/Users/codejuzi/program/kafka/kafka_2.13-3.5.0
➜ kafka_2.13-3.5.0 sh ./bin/kafka-server-start.sh config/server.properties

5)创建Topic

sh ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test-topic

【🌰栗子】

➜  kafka_2.13-3.5.0 pwd
/Users/codejuzi/program/kafka/kafka_2.13-3.5.0
➜ kafka_2.13-3.5.0 sh ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test-topic

Kafka 创建 Topic 和 RabbitMQ 创建 Exchange 的对应关系

1)RabbitMQ 创建一个 Queue,其实就是创建了一个 Exchange 和一个 Queue,此时就可以向这个 Exchange 内发送消息;Kafka 创建一个 Topic,其实就是创建了一个接收消息的 Topic 和对应的 Partition,此时就能够向这个 Topic 内发送消息;

2)RabbitMQ 中的 Exchange,内部可以包含多个 RoutingKey 和 BindingKey,在发送消息到 Exchange 时,Exchange 可以根据交换机类型选择相应的 Key 匹配机制,将消息路由到其中符合规则的 Queue 中;Kafka 中的 Topic,内部可以包含多个 Partition,在发送消息到Topic时,如果不指定 Partition,Topic 会根据一些策略将消息分配到其中一个分区(相当于一个 Topic 中有多个 Partition,在往 Partition 中添加数据的时候,直接通知 Topic,Topic 选择一个 Partition 添加数据)。

6)创建生产者

sh ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

【🌰栗子】

➜  kafka_2.13-3.5.0 pwd
/Users/codejuzi/program/kafka/kafka_2.13-3.5.0
➜ kafka_2.13-3.5.0 sh ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

7)创建消费者

sh ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

【🌰栗子】

➜  kafka_2.13-3.5.0 pwd
/Users/codejuzi/program/kafka/kafka_2.13-3.5.0
➜ kafka_2.13-3.5.0 sh ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

服务端参数配置

$KAFKA_HOME/config/server.properties 下的部分重要参数说明

官方文档

1)zookeeper.connect

该参数指明 Broker 要连接的 ZooKeeper 集群的服务地址(包含端口号),没有默认值,且 此参数为必填项。可以配置为 localhost:2181,如果 ZooKeeper 集群中有多个节点,则可以用逗号将每个节点隔开,类似于 localhost1:2181,localhost2:2181,localhost3:2181 这种格式。

最佳的实践方式是再加一个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所用的, 也可以实现多个 Kafka 集群复用一套 ZooKeeper 集群,这样可以节省更 多的硬件资源。包含 chroot 路径的配置类似于 localhost1:2181,localhost2:2181,localhost3:2181/kafka 这种,如果不指定 chroot,那么默认使用 ZooKeeper 的根路径。

2)listeners

该参数指明 Broker 监听客户端连接的地址列表,即为客户端要连接 Broker 的入口地址列表, 配置格式为 protocol1://hostname1:port1, protocol2://hostname2:port2 ,其中 protocol 代表协议类型, Kafka 当前支持的协议类型有 PLAINTEXT、SSL、SASL_SSL 等, 如果未开启安全认证,则使用简单的 PLAINTEXT 即可。 此参数的默认值为 null。与此参数关联的还有 advertised.listeners,作用和 listeners 类似,默认值也为 null。

advertised.listeners 主要用于 IaaS (Infrastructure as a Service) 环境,比如公有云上的机器通常配备有多块网卡 ,即包含私网网卡和公网网卡,对于这种情况而言,可以设置 advertised.listeners 参数绑定公网 IP 供外部客户端使用,而配置 listeners 参数来绑定私网 IP 地址供 Broker 间通信使用。

3)broker.id

该参数用来指定 Kafka 集群中 Broker 的唯一标识,默认值为 -1。如果没有设置,那么 Kafka会自动生成一个。

4)log.dirs

Kafka 把所有的消息都保存在磁盘上,而这个参数用来配置 Kafka 日志文件存放的根目录。

5)message.max.bytes

该参数用来指定 Broker 所能接收消息的最大值,默认值为 1000012 (B),约等于 976.6KB。 如果 Producer 发送的消息大于这个参数所设置的值,那么 Producer 就会报出 RecordTooLargeException 的异常。如果需要修改这个参数,那么还要考虑 max.request . size (客户端参数)、 max.message.bytes (Topic 端参数) 等参数的影响。

为了避免修改此参数而引起级联的影响,建议在修改此参数之前考虑分拆消息的可行性。