Kafka学习

Olivia的小跟班 Lv4

题记

本文记录对Kafka的入门学习。

Kafka介绍

Kafka是什么

1
2
3
4
5
6
7
8
kafka使用scala开发,支持多语言客户端(c++、java、python、go等)
Kafka最先由LinkedIn公司开发,之后成为Apache的顶级项目。
Kafka是一个分布式的、分区化、可复制提交的日志服务
LinkedIn使用Kafka实现了公司不同应用程序之间的松耦和,那么作为一个可扩展、高可靠的消息系统
支持高Throughput的应用
scale out:无需停机即可扩展机器
持久化:通过将数据持久化到硬盘以及replication防止数据丢失
支持online和offline的场景

Kafka的特点

Kafka是分布式的,其所有的构件borker(服务端集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。

在消息的生产时可以使用一个标识topic来区分,且可以进行分区;每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。

同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。

消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡

常用的场景

监控:主机通过Kafka发送与系统和应用程序健康相关的指标,然后这些信息会被收集和处理从而创建监控仪表盘并发送警告。

消息队列: 应用程度使用Kafka作为传统的消息系统实现标准的队列和消息的发布—订阅,例如搜索和内容提要(Content Feed)。比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统 一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ

站点的用户活动追踪: 为了更好地理解用户行为,改善用户体验,将用户查看了哪个页面、点击了哪些内容等信息发送到每个数据中心的Kafka集群上,并通过Hadoop进行分析、生成日常报告。

流处理:保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行 阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内 容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返 还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。

日志聚合:使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉 文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的 系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟

持久性日志:Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。

Kafka中包含以下基础概念

1
2
3
4
5
6
1.Topic(话题):Kafka中用于区分不同类别信息的类别名称。由producer指定
2.Producer(生产者):将消息发布到Kafka特定的Topic的对象(过程)
3.Consumers(消费者):订阅并处理特定的Topic中的消息的对象(过程)
4.Broker(Kafka服务集群):已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
5.Partition(分区):Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)
Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。

消息

消息由一个固定大小的报头和可变长度但不透明的字节阵列负载。报头包含格式版本和CRC32效验和以检测损坏或截断

消息格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1. 4 byte CRC32 of the message
2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
bit 0 ~ 2 : Compression codec
0 : no compression
1 : gzip
2 : snappy
3 : lz4
bit 3 : Timestamp type
0 : create time
1 : log append time
bit 4 ~ 7 : reserved
4. (可选) 8 byte timestamp only if "magic" identifier is greater than 0
5. 4 byte key length, containing length K
6. K byte key
7. 4 byte payload length, containing length V
8. V byte payload

Kafka深层介绍

架构介绍

架构介绍

  • Producer:Producer即生产者,消息的产生者,是消息的⼊口。
  • kafka cluster:kafka集群,一台或多台服务器组成
    • Broker:Broker是指部署了Kafka实例的服务器节点。每个服务器上有一个或多个kafka的实 例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的 编号,如图中的broker-0、broker-1等……
    • Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上 都可以创建多个topic。实际应用中通常是一个业务线建一个topic。
    • Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞 吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的⽂件夹!
    • Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的 时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10 个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机 器对同一个分区也只可能存放一个副本(包括自己)。
  • Consumer:消费者,即消息的消费方,是消息的出口。
    • Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分 区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个 topic的不同分区的数据,这也是为了提高kafka的吞吐量!

⼯作流程

我们看上⾯的架构图中,producer就是生产者,是数据的入口。Producer在写入数据的时候会把数据 写入到leader中,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我 们看下图:

⼯作流程

1
2
3
4
5
6
1.⽣产者从Kafka集群获取分区leader信息
2.⽣产者将消息发送给leader
3.leader将消息写入本地磁盘
4.follower从leader拉取消息数据
5.follower将消息写入本地磁盘后向leader发送ACK
6.leader收到所有的follower的ACK之后向生产者发送ACK

选择partition的原则

那在kafka中,如果某个topic有多个partition,producer⼜怎么知道该将数据发往哪个partition呢? kafka中有几个原则:

1
2
3
4
1.partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2.如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3.如果既没指定partition,又没有设置key,则会采用轮询⽅式,即每次取一小段时间的数据写入某
个partition,下一小段的时间写入下一个partition

ACK应答机制

producer在向kafka写入消息的时候,可以设置参数来确定是否确认kafka接收到数据,这个参数可设置 的值为 0,1,all

  • 0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效 率最高。
  • 1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
  • all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保 leader发送成功和所有的副本都完成备份。安全性最⾼高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,kafka会⾃动创建topic,partition和replication的数量 默认配置都是1。

Topic和数据⽇志

topic 是同⼀类别的消息记录(record)的集合。在Kafka中,⼀个主题通常有多个订阅者。对于每个 主题,Kafka集群维护了⼀个分区数据⽇志⽂件结构如下:

Topic和数据⽇志

每个partition都是⼀个有序并且不可变的消息记录集合。当新的数据写⼊时,就被追加到partition的末 尾。在每个partition中,每条消息都会被分配⼀个顺序的唯⼀标识,这个标识被称为offset,即偏移 量。注意,Kafka只保证在同⼀个partition内部消息是有序的,在不同partition之间,并不能保证消息 有序。

Kafka可以配置⼀个保留期限,⽤来标识⽇志会在Kafka集群内保留多⻓时间。Kafka集群会保留在保留 期限内所有被发布的消息,不管这些消息是否被消费过。⽐如保留期限设置为两天,那么数据被发布到 Kafka集群的两天以内,所有的这些数据都可以被消费。当超过两天,这些数据将会被清空,以便为后 续的数据腾出空间。由于Kafka会将数据进⾏持久化存储(即写⼊到硬盘上),所以保留的数据⼤⼩可 以设置为⼀个⽐较⼤的值。

Partition结构

Partition在服务器上的表现形式就是⼀个⼀个的⽂件夹,每个partition的⽂件夹下⾯会有多组segment ⽂件,每组segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三个⽂件,其中 .log ⽂ 件就是实际存储message的地⽅,⽽ .index 和 .timeindex ⽂件为索引⽂件,⽤于检索消息。

消费数据

多个消费者实例可以组成⼀个消费者组,并⽤⼀个标签来标识这个消费者组。⼀个消费者组中的不同消 费者实例可以运⾏在不同的进程甚⾄不同的服务器上。

如果所有的消费者实例都在同⼀个消费者组中,那么消息记录会被很好的均衡的发送到每个消费者实 例。

如果所有的消费者实例都在不同的消费者组,那么每⼀条消息记录会被⼴播到每⼀个消费者实例。

消费数据

举个例⼦,如上图所示⼀个两个节点的Kafka集群上拥有⼀个四个partition(P0-P3)的topic。有两个 消费者组都在消费这个topic中的数据,消费者组A有两个消费者实例,消费者组B有四个消费者实例。 从图中我们可以看到,在同⼀个消费者组中,每个消费者实例可以消费多个分区,但是每个分区最多只 能被消费者组中的⼀个实例消费。也就是说,如果有⼀个4个分区的主题,那么消费者组中最多只能有4 个消费者实例去消费,多出来的都不会被分配到分区。其实这也很好理解,如果允许两个消费者实例同 时消费同⼀个分区,那么就⽆法记录这个分区被这个消费者组消费的offset了。如果在消费者组中动态 的上线或下线消费者,那么Kafka集群会⾃动调整分区与消费者实例间的对应关系。

Kafka安装

我个人喜欢直接将Kafka安装到docker上,所以我这里的安装教程也是docker下的Kafka安装。

第一步:使用下述命令从Docker Hub查找镜像,此处我们要选择的是zookeeper官网的镜像

1
docker search zookeeper

image-20230702030711415

第二步:拉取zookeeper镜像

1
docker pull wurstmeister/zookeeper:latest

image-20230702033317199

第三步:启动zookeeper容器

1
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

-v /etc/localtime:/etc/localtime 容器时间同步虚拟时间

image-20230702033420689

第四步:拉取Kafka镜像,此处选择的是 wurstmeister/kafka (这个镜像的受欢迎程度最高)

1
docker pull wurstmeister/kafka:latest

image-20230702031048507

第五步:如果是在Windows系统中可以使用下述命令启动kafka容器

1
docker run -d --name Kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

image-20230702033445023

第六步:使用下述命令进入都kafka容器内部,然后进入到 /opt/kafka_2.13-2.8.1/bin 目录下

1
2
docker exec -it 容器id bash
cd /opt/kafka_2.13-2.8.1/bin

image-20230702033546224

第七步:使用下述命令创建一个单分区单副本的主题 “test123”,并且创建生产者生产消息

1
2
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test123
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test123

image-20230702033657445

接着另起一个连接创建消费者接收消息:

1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test123 --from-beginning

image-20230702033715986

操作Kafka

1
go get github.com/segmentio/kafka-go

连接kafka发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"context"
kafka "github.com/segmentio/kafka-go"
"log"
)

func main() {
// make a writer that produces to test, using the least-bytes distribution
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "test",
Balancer: &kafka.LeastBytes{},
}

err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}

if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}

}

连接kafka消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"context"
"fmt"
kafka "github.com/segmentio/kafka-go"
"log"
)

func main() {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "consumer-group-id",
Topic: "test",
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})

for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}

}

输出

1
2
3
message at topic/partition/offset test/0/6: Key-A = Hello World!
message at topic/partition/offset test/0/7: Key-B = One!
message at topic/partition/offset test/0/8: Key-C = Two!

消息队列面试题

为什么要使用消息队列?

总结一下,主要三点原因:解耦、异步、削峰

1、解耦。比如,用户下单后,订单系统需要通知库存系统,假如库存系统无法访问,则订单减库存将失败,从而导致订单操作失败。订单系统与库存系统耦合,这个时候如果使用消息队列,可以返回给用户成功,先把消息持久化,等库存系统恢复后,就可以正常消费减去库存了。

2、异步。将消息写入消息队列,非必要的业务逻辑以异步的方式运行,不影响主流程业务。

3、削峰。消费端慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。比如秒杀活动,一般会因为流量过大,从而导致流量暴增,应用挂掉。这个时候加上消息队列,服务器接收到用户的请求后,首先写入消息队列,如果消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。

使用了消息队列会有什么缺点

  • 系统可用性降低。引入消息队列之后,如果消息队列挂了,可能会影响到业务系统的可用性。
  • 系统复杂性增加。加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。

如何避免消息重复消费?

在消息生产时,MQ内部针对每条生产者发送的消息生成一个唯一id,作为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列。

在消息消费时,要求消息体中也要有一全局唯一id作为去重和幂等的依据,避免同一条消息被重复消费。

多线程异步和MQ的区别

  • CPU消耗。多线程异步可能存在CPU竞争,而MQ不会消耗本机的CPU。
  • MQ 方式实现异步是完全解耦的,适合于大型互联网项目。
  • 削峰或者消息堆积能力。当业务系统处于高并发,MQ可以将消息堆积在Broker实例中,而多线程会创建大量线程,甚至触发拒绝策略。
  • 使用MQ引入了中间件,增加了项目复杂度和运维难度。

总的来说,规模比较小的项目可以使用多线程实现异步,大项目建议使用MQ实现异步。

MQ 中的消息过期失效了怎么办?

如果使用的是RabbitMQ的话,RabbtiMQ 是可以设置过期时间的(TTL)。如果消息在 Queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。这时的问题就不是数据会大量积压在 MQ 里,而是大量的数据会直接搞丢。这个情况下,就不是说要增加 Consumer 消费积压的消息,因为实际上没啥积压,而是丢了大量的消息。

我们可以采取一个方案,就是批量重导。就是大量积压的时候,直接将数据写到数据库,然后等过了高峰期以后将这批数据一点一点的查出来,然后重新灌入 MQ 里面去,把丢的数据给补回来。

大量消息在 MQ 里长时间积压,该如何解决?

一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  1. 先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉;
  2. 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量;
  3. 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue;
  4. 接着临时用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据;
  5. 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
  • 标题: Kafka学习
  • 作者: Olivia的小跟班
  • 创建于 : 2023-06-27 14:29:17
  • 更新于 : 2023-07-19 13:52:38
  • 链接: https://www.youandgentleness.cn/2023/06/27/Kafkaѧϰ/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论