kafka基础


kafka基础

什么是 Kafka

Kafka 是一个分布式流式平台,它有三个关键能力

  1. 订阅发布记录流,它类似于企业中的消息队列企业消息传递系统
  2. 以容错的方式存储记录流
  3. 实时记录流

Kafka 的应用

  1. 作为消息系统
  2. 作为存储系统
  3. 作为流处理器

Kafka 可以建立流数据管道,可靠性的在系统或应用之间获取数据。

建立流式应用传输和响应数据。

Kafka 的其他特性

  • 持久化存储: 消息被持久化到磁盘,具有高可靠性,可以防止数据丢失。
  • 分布式架构: Kafka 集群可以扩展到数百台服务器,处理海量数据。
  • 高吞吐量: Kafka 能够处理每秒数百万条消息。
  • 实时性: Kafka 能够提供低延迟的消息传输。

Kafka 作为消息系统

Kafka 作为消息系统,它有三个基本组件

img

  • Producer : 发布消息的客户端
  • Broker:一个从生产者接受并存储消息的客户端
  • Consumer : 消费者从 Broker 中读取消息

在大型系统中,会需要和很多子系统做交互,也需要消息传递,在诸如此类系统中,你会找到源系统(消息发送方)和 目的系统(消息接收方)。为了在这样的消息系统中传输数据,你需要有合适的数据管道

img

这种数据的交互看起来就很混乱,如果我们使用消息传递系统,那么系统就会变得更加简单和整洁

img

  • Kafka 运行在一个或多个数据中心的服务器上作为集群运行
  • Kafka 集群存储消息记录的目录被称为 topics
  • 每一条消息记录包含三个要素:键(key)、值(value)、时间戳(Timestamp)

Kafka 的架构

Kafka 的核心设计理念是基于分布式日志,它将消息持久化地存储在磁盘上,并通过分区和副本机制实现高吞吐量、高可靠性和可扩展性。Kafka 只写数据到 leader 副本,也只从 leader 副本获取数据。如果 leader 失效,会重新选择出 leader。这种架构类似于 MySQL 的主从复制,但 Kafka 的读取操作也只能从 Leader 节点进行。

img

  1. Kafka 存储的消息来自任意多被称为 Producer(生产者) 的进程。数据从而可以被发布到不同的 Topic(主题) 下的不同 Partition(分区)。
  2. 在一个分区内,这些消息被索引并连同时间戳存储在一起。其它被称为 Consumer(消费者) 的进程可以从分区订阅消息。
  3. Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群结点分布。

核心 API

Kafka 有四个核心API,它们分别是

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

img

Kafka 基本概念

Kafka 作为一个高度可扩展可容错的消息系统,它有很多基本概念,下面就来认识一下这些 Kafka 专属的概念

topic

Topic 被称为主题,在 kafka 中,使用一个类别属性来划分消息的所属类,划分消息的这个类称为 topic。topic 相当于消息的分配标签,是一个逻辑概念。主题好比是数据库的表,或者文件系统中的文件夹。

partition(分区)

partition 译为分区,Topic 的物理分片,topic 中的消息被分割为一个或多个的 partition,它是一个物理概念,对应到系统上的就是一个或若干个目录,一个分区就是一个 提交日志。消息以追加的形式写入分区,先后以顺序的方式读取。

img

注意:由于一个主题包含无数个分区,因此无法保证在整个 topic 中有序,但是单个 Partition 分区可以保证有序。消息被迫加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性

分区可以分布在不同的服务器上,也就是说,一个主题可以跨越多个服务器,以此来提供比单个服务器更强大的性能。

segment

Segment 被译为段,将 Partition 进一步细分为若干个 segment,每个 segment 文件的大小相等。

Replication(副本)

为了保证数据的可靠性和高可用性,Kafka 会为每个 Partition 创建多个副本。

  • 概念:每一个分区都有多个副本,用于数据冗余和故障恢复。
  • 作用:
    • 提高数据可靠性:当 Leader 分区发生故障时,可以从 Follower 分区中选举出新的 Leader,保证数据不丢失。
    • 提高可用性:即使部分 Broker 宕机,Kafka 集群仍然可以正常提供服务。
  • 数量限制:
    • Kafka 中默认副本的最大数量是 10 个。
    • 副本的数量不能大于 Broker 的数量。
    • Follower 和 Leader 必须在不同的机器上,同一机器对同一个分区也只可能存放一个副本。

broker

Kafka 集群包含一个或多个服务器每个 Kafka 中服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。

broker 是集群的组成部分,每个集群中都会有一个 broker 同时充当了 集群控制器(Leader)的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包括将分区分配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,但是一个分区可以分配给多个 broker(非Leader),这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个 broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。

img

producer

生产者,即消息的发布者,其会将某 topic 的消息发布到相应的 partition 中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。

consumer

消费者,即消息的使用者,一个消费者可以消费多个 topic 的消息,对于某一个 topic 的消息,其只会消费同一个 partition 中的消息

img

在了解完 Kafka 的基本概念之后,我们通过搭建 Kafka 集群来进一步深刻认识一下 Kafka。

Leader

每个 Partition 的多个副本中,只有一个会被选为 Leader。

  • 作用:
    • 负责处理来自 Producer 的写入请求。
    • 负责处理来自 Consumer 的读取请求。
    • 将数据同步到 Follower 副本。

Follower

每个 Partition 的其他副本被称为 Follower。

  • 作用:
    • 实时从 Leader 中同步数据,保持和 Leader 数据的同步。
    • 当 Leader 发生故障时,某个 Follower 还会被选举成为新的 Leader。

Offset

分区中每条消息的唯一标识,表示 Consumer 消费的位置。

  • 概念:消费者消费的位置信息,用于跟踪数据消费进度。
  • 作用:
    • 监控数据消费到什么位置。
    • 当 Consumer 挂掉再重新恢复的时候,可以从上次的消费位置继续消费,实现断点续传。
  • 特点:同一主题,不同的分区,它们的 Offset 是独立的。

ZooKeeper

Kafka 集群依赖 ZooKeeper 来存储和管理集群的元数据信息。

  • 作用:
    • 存储 Broker 的注册信息。
    • 维护 Topic 的分区信息。
    • 管理 Consumer Group 的信息。
    • 进行 Leader 选举。

工作流程

img

  1. Offset 独立性:不同的 Partition 的 Offset 是独立的,每个 Consumer Group 维护自己的 Offset。
  2. Topic 与 Partition 关系:Kafka 中消息是以 Topic 进行分类的,Producer 生产消息,Consumer 消费消息,面向的都是同一个 Topic。
  3. Topic 与 Partition 物理对应:Topic 是逻辑上的概念,而 Partition 是物理上的概念,每个 Partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。
  4. 消息追加与 Offset:Producer 生产的数据会不断追加到该 log 文件末端,且每条数据都有自己的 Offset。
  5. Consumer Offset 记录:Consumer 组中的每个 Consumer,都会实时记录自己消费到了哪个 Offset,以便出错恢复时,从上次的位置继续消费。
  6. 日志存储位置:默认情况下,日志文件存储在 /tmp/kafka-logs 目录下 (生产环境需要修改配置)。

数据可靠性保证

为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个 Partition 收到 Producer 发送的数据后,都需要向 Producer 发送 ACK(ACKnowledge 确认收到)。如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。

img

副本数据同步策略

确保有 Follower 与 Leader 同步完成,Leader 再发送 ACK,这样才能保证 Leader 挂掉之后,能在 Follower 中选举出新的 Leader 而不丢数据。

方案 优点 缺点
半数以上完成同步,就发送 ACK 延迟低 选举新的 leader 时,容忍 n 台节点故障,需要 2n+1 个副本。
全部完成同步,才发送 ACK 选举新的 leader 时,容忍 n 台节点故障,需要 n+1 个副本。 延迟高。

当采用第二种方案时,所有 Follower 完成同步,Producer 才能继续发送数据。设想有一个 Follower 因为某种原因出现故障,那 Leader 就要一直等到它完成同步。这个问题怎么解决?

  1. Leader 维护了一个动态的 in-sync replica set(ISR):和 Leader 保持同步的 Follower 集合。
  2. 当 ISR 集合中的 Follower 完成数据的同步之后,Leader 就会给 Follower 发送 ACK。
  3. 如果 Follower 长时间未向 Leader 同步数据,则该 Follower 将被踢出 ISR 集合,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障后,就会从 ISR 中选举出新的 Leader。

ACK 应答机制

Kafka 为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择以下的配置。

img

ACK 参数配置:

  1. acks=0:Producer 不等待 Broker 的 ACK,这提供了最低延迟,Broker 一收到数据还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据。
  2. acks=1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盘成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据。
  3. acks=-1 或 all:Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。但是在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复。 这是最安全的模式,但延迟也相对较高。

可靠性指标

  1. 分区副本:你可以创建更多的分区来提升可靠性,但是分区数过多也会带来性能上的开销,一般来说,3 个副本就能满足对大部分场景的可靠性要求。

  2. ACKS:生产者发送消息的可靠性,也就是我要保证我这个消息一定是到了 broker 并且完成了多副本的持久化。

  3. 保障消息到了 broker 之后,消费者也需要有一定的保证,因为消费者也可能出现某些问题导致消息没有消费到。

  4. enable.auto.commit 默认为 true,也就是自动提交 offset。自动提交是批量执行的,有一个时间窗口,这种方式会带来重复提交或者消息丢失的问题,所以对于高可靠性要求的程序,要使用手动提交。对于高可靠要求的应用来说,宁愿重复消费也不应该因为消费异常而导致消息丢失。

kafka幂等性

Kafka为啥需要幂等性?

Producer在生产发送消息时,难免会重复发送消息。Producer进行retry时会产生重试机制,发生消息重复发送。而引入幂等性后,重复发送只会生成一条有效的消息。Kafka作为分布式消息系统,它的使用场景常见与分布式系统中,比如消息推送系统、业务平台系统(如物流平台、银行结算平台等)。以银行结算平台来说,业务方作为上游把数据上报到银行结算平台,如果一份数据被计算、处理多次,那么产生的影响会很严重。

影响Kafka幂等性的因素有哪些?

在使用Kafka时,需要确保Exactly-Once语义。分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack时,出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。可能出现的情况如下:

img

Kafka的幂等性是如何实现的?

Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。那这两个概念的用途是什么呢?

  • ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
  • SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

幂等性引入之前的问题?

Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:

img

上图的实现流程是一种理想状态下的消息发送情况,但是实际情况中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:

img

上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(比如网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。

幂等性引入之后解决了什么问题?

面对这样的问题,Kafka引入了幂等性。那么幂等性是如何解决这类重复发送消息的问题的呢?下面我们可以先来看看流程图:

img

同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:

img

当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

ProducerID是如何生成的?

客户端在生成Producer时,会实例化如下代码:

// 实例化一个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender类中,在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:

private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

事务

与幂等性有关的另外一个特性就是事务。Kafka中的事务与数据库的事务类似,Kafka中的事务属性是指一系列的Producer生产消息和消费消息提交Offsets的操作在一个事务中,即原子性操作。对应的结果是同时成功或者同时失败。

这里需要与数据库中事务进行区别,操作数据库中的事务指一系列的增删查改,对Kafka来说,操作事务是指一系列的生产和消费等原子性操作。

Kafka引入事务的用途?

在事务属性引入之前,先引入Producer的幂等性,它的作用为:

  • Producer多次发送消息可以封装成一个原子性操作,即同时成功,或者同时失败;
  • 消费者&生产者模式下,因为Consumer在Commit Offsets出现问题时,导致重复消费消息时,Producer重复生产消息。需要将这个模式下Consumer的Commit Offsets操作和Producer一系列生产消息的操作封装成一个原子性操作。

产生的场景有:

比如,在Consumer中Commit Offsets时,当Consumer在消费完成时Commit的Offsets为100(假设最近一次Commit的Offsets为50),那么执行触发Balance时,其他Consumer就会重复消费消息(消费的Offsets介于50~100之间的消息)。

kafka的生产者事务是为了实现消息的精准处理一致性而产生的

事务提供了哪些可使用的API?

Producer提供了五种事务方法,它们分别是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代码定义在org.apache.kafka.clients.producer.Producer<K,V>接口中,具体定义接口如下:

// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();

// 开启事务
void beginTransaction() throws ProducerFencedException;

// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事务
void commitTransaction() throws ProducerFencedException;

// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

事务的实际应用场景有哪些?

在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况。情况如下:

  • 只有Producer生产消息,这种场景需要事务的介入;
  • 消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入;
  • 只有Consumer消费消息,这种操作在实际项目中意义不大,和手动Commit Offsets的结果一样,而且这种场景不是事务的引入目的。

安装环境

安装 Java 环境

在安装 Kafka 之前,先确保Linux 环境上是否有 Java 环境,使用 java -version 命令查看 Java 版本,推荐使用Jdk 1.8 ,如果没有安装 Java 环境的话,可以按照这篇文章进行安装(https://www.cnblogs.com/zs-notes/p/8535275.html)

安装 Zookeeper 环境

Kafka 的底层使用 Zookeeper 储存元数据,确保一致性,所以安装 Kafka 前需要先安装 Zookeeper,Kafka 的发行版自带了 Zookeeper ,可以直接使用脚本来启动,不过安装一个 Zookeeper 也不费劲

Zookeeper 单机搭建

Zookeeper 单机搭建比较简单,直接从

https://www.apache.org/dyn/closer.cgi/zookeeper/

官网下载一个稳定版本的 Zookeeper ,这里我使用的是 3.4.10,下载完成后,在 Linux 系统中的 /usr/local 目录下创建 zookeeper 文件夹,使用 xftp 工具(xftp 和 xshell 工具都可以在官网

www.netsarang.com/zh/xshell/

申请免费的家庭版)把下载好的 zookeeper 压缩包放到 /usr/local/zookeeper 目录下。

如果下载的是一个 tar.gz 包的话,直接使用 tar -zxvf zookeeper-3.4.10.tar.gz解压即可

如果下载的是 zip 包的话,还要检查一下 Linux 中是否有 unzip 工具,如果没有的话,使用 yum install unzip 安装 zip 解压工具,完成后使用 unzip zookeeper-3.4.10.zip 解压即可。

解压完成后,cd 到 /usr/local/zookeeper/zookeeper-3.4.10 ,创建一个 data 文件夹,然后进入到 conf 文件夹下,使用 mv zoo_sample.cfg zoo.cfg 进行重命名操作

然后使用 vi 打开 zoo.cfg ,更改一下dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data ,保存。

进入bin目录,启动服务输入命令 ./zkServer.sh start 输出下面内容表示搭建成功

img

关闭服务输入命令,./zkServer.sh stop

img

使用 ./zkServer.sh status 可以查看状态信息。

Zookeeper 集群搭建
准备条件

准备条件:需要三个服务器,这里我使用了CentOS7 并安装了三个虚拟机,并为各自的虚拟机分配了1GB的内存,在每个 /usr/local/ 下面新建 zookeeper 文件夹,把 zookeeper 的压缩包挪过来,解压,完成后会有 zookeeper-3.4.10 文件夹,进入到文件夹,新建两个文件夹,分别是 datalog文件夹

注:上一节单机搭建中已经创建了一个data 文件夹,就不需要重新创建了,直接新建一个 log 文件夹,对另外两个新增的服务需要新建这两个文件夹。

设置集群

新建完成后,需要编辑 conf/zoo.cfg 文件,三个文件的内容如下

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data
dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log
clientPort=12181
server.1=192.168.1.7:12888:13888
server.2=192.168.1.8:12888:13888
server.3=192.168.1.9:12888:13888

server.1 中的这个 1 表示的是服务器的标识也可以是其他数字,表示这是第几号服务器,这个标识要和下面我们配置的 myid 的标识一致可以。

192.168.1.7:12888:13888 为集群中的 ip 地址,第一个端口表示的是 master 与 slave 之间的通信接口,默认是 2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口,默认是 3888

现在对上面的配置文件进行解释

tickTime: 这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

syncLimit: 这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒

dataDir: 快照日志的存储路径

dataLogDir: 事务日志的存储路径,如果不配置这个那么事务日志会默认存储到dataDir指定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事务日志、快照日志太多

clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

创建 myid 文件

在了解完其配置文件后,现在来创建每个集群节点的 myid ,我们上面说过,这个 myid 就是 server.1 的这个 1 ,类似的,需要为集群中的每个服务都指定标识,使用 echo 命令进行创建

# server.1
echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.2
echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
# server.3
echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
启动服务并测试

配置完成,为每个 zk 服务启动并测试,我在 windows 电脑的测试结果如下

启动服务(每台都需要执行)

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

检查服务状态

使用 ./zkServer.sh status 命令检查服务状态

192.168.1.7 — follower

img

192.168.1.8 — leader

img

192.168.1.9 — follower

img

zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

Kafka 集群搭建

准备条件

/usr/local 下新建 kafka 文件夹,然后把下载完成的 tar.gz 包移到 /usr/local/kafka 目录下,使用 tar -zxvf 压缩包 进行解压,解压完成后,进入到 kafka_2.12-2.3.0 目录下,新建 log 文件夹,进入到 config 目录下

我们可以看到有很多 properties 配置文件,这里主要关注 server.properties 这个文件即可。

img

kafka 启动方式有两种,一种是使用 kafka 自带的 zookeeper 配置文件来启动(可以按照官网来进行启动,并使用单个服务多个节点来模拟集群http://kafka.apache.org/quickstart#quickstart_multibroker),一种是通过使用独立的zk集群来启动,这里推荐使用第二种方式,使用 zk 集群来启动

修改配置项

需要为每个服务都修改一下配置项,也就是server.properties, 需要更新和添加的内容有

broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

配置项的含义

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口

启动 Kafka 集群并测试

启动服务,进入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目录下

# 启动后台进程
./kafka-server-start.sh -daemon ../config/server.properties

检查服务是否启动

# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka
  • kafka 已经启动
  • 创建 Topic 来验证是否创建成功# cd .. 往回退一层 到 /usr/local/kafka/kafka_2.12-2.3.0 目录下 bin/kafka-topics.sh –create –zookeeper 192.168.1.7:2181 –replication-factor 2 –partitions 1 –topic cxuan 对上面的解释 –replication-factor 2 复制两份 –partitions 1 创建1个分区 –topic 创建主题 查看我们的主题是否出创建成功 bin/kafka-topics.sh –list –zookeeper 192.168.1.7:2181

img

启动一个服务就能把集群启动起来 在一台机器上创建一个发布者

# 创建一个broker,发布者
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

在一台服务器上创建一个订阅者

# 创建一个consumer, 消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

注意:这里使用 –zookeeper 的话可能出现 zookeeper is not a recognized option 的错误,这是因为 kafka 版本太高,需要使用 --bootstrap-server 指令

测试结果

发布

img

消费

img

其他命令

显示 topic

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

# 显示
cxuantopic

查看 topic 状态

bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic

# 下面是显示的详细信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

# 分区为为1  复制因子为2   主题 cxuantopic 的分区为0 
# Replicas: 0,1   复制的为12

Leader 负责给定分区的所有读取和写入的节点,每个节点都会通过随机选择成为 leader。

Replicas 是为该分区复制日志的节点列表,无论它们是 Leader 还是当前处于活动状态。

Isr 是同步副本的集合。它是副本列表的子集,当前仍处于活动状态并追随Leader。

至此,kafka 集群搭建完毕。

验证多节点接收数据

刚刚我们都使用的是 相同的ip 服务,下面使用其他集群中的节点,验证是否能够接受到服务

在另外两个节点上使用

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

然后再使用 broker 进行消息发送,经测试三个节点都可以接受到消息。

配置详解

在搭建 Kafka 的时候我们简单介绍了一下 server.properties 中配置的含义,现在我们来详细介绍一下参数的配置和概念

常规配置

这些参数是 kafka 中最基本的配置

  • broker.id

每个 broker 都需要有一个标识符,使用 broker.id 来表示。它的默认值是 0,它可以被设置成其他任意整数,在集群中需要保证每个节点的 broker.id 都是唯一的。

  • port

如果使用配置样本来启动 kafka ,它会监听 9092 端口,修改 port 配置参数可以把它设置成其他任意可用的端口。

  • zookeeper.connect

用于保存 broker 元数据的地址是通过 zookeeper.connect 来指定。 localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含义如下:

hostname 是 zookeeper 服务器的服务名或 IP 地址

port 是 zookeeper 连接的端口

/path 是可选的 zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用跟路径

  • log.dirs

Kafka 把消息都保存在磁盘上,存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据 “最少使用” 原则,把同一分区的日志片段保存到同一路径下。要注意,broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。

  • num.recovery.threads.per.data.dir

对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段

服务器正常启动,用于打开每个分区的日志片段;

服务器崩溃后启动,用于检查和截断每个分区的日志片段;

服务器正常关闭,用于关闭日志片段

默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

  • auto.create.topics.enable

默认情况下,Kafka 会在如下 3 种情况下创建主题

当一个生产者开始往主题写入消息时

当一个消费者开始从主题读取消息时

当任意一个客户向主题发送元数据请求时

  • delete.topic.enable

如果你想要删除一个主题,你可以使用主题管理工具。默认情况下,是不允许删除主题的,delete.topic.enable 的默认值是 false 因此你不能随意删除主题。这是对生产环境的合理性保护,但是在开发环境和测试环境,是可以允许你删除主题的,所以,如果你想要删除主题,需要把 delete.topic.enable 设为 true。

主题默认配置

Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数

  • num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。

  • default.replication.factor

这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。

  • log.retention.ms

Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。

  • log.retention.bytes

另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

  • log.segment.bytes

上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。

  • log.segment.ms

上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。

  • message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值

这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。


文章作者: wmg
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wmg !
  目录