Kafka 消息队列教程

用户投稿头像

用户投稿

管理员

发布于:2026年06月08日

3 阅读 · 0 评论

【队列&循环队列】手动实现循环队列,掌握循环队列的每一处细节

Apache Kafka 是一个分布式流处理平台,核心能力包括发布订阅消息系统、存储记录、处理实时流,相比传统消息队列(如 RabbitMQ),Kafka 强调高吞吐、持久化、分区有序。

适用场景:日志采集、用户行为追踪、指标监控、事件溯源、实时数仓 ETL。

核心概念

概念 说明
Producer 消息生产者,向 Topic 发送记录
Consumer 消息消费者,从 Topic 读取记录
Broker Kafka 服务器节点,一个集群包含多个 Broker
Topic 逻辑消息分类,类似数据库表
Partition Topic 的分片单元,提供顺序保证和并行能力
Offset 分区内消息的唯一递增序号
Consumer Group 一组消费者协同消费一个或多个 Topic

安装与基础操作

1 快速启动(使用内置 ZooKeeper)

# 下载并解压(以 3.6.0 为例)
wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
# 启动 ZooKeeper(Kafka 3.x 依赖)
bin/zookeeper-server-start.sh config/zookeeper.properties
# 另开终端,启动 Kafka
bin/kafka-server-start.sh config/server.properties

Kafka 消息队列教程

2 Topic 管理

# 创建 Topic(分区数 3,副本因子 1)
bin/kafka-topics.sh --create --topic test-topic \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
# 列出 Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看详情
bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092

3 发送与消费消息

# 生产者(控制台输入)
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
# 消费者(默认从最新 offset 开始)
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092
# 从头消费
bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning

核心机制

1 分区与顺序

  • 同一分区内消息严格有序,全局无序。
  • 生产时可指定 key,相同 key 的消息被哈希到同一分区。
  • 消费者组内,每个分区只能被一个消费者实例读取。

Kafka 消息队列教程

2 持久化与保留策略

  • 消息写入磁盘(默认 /tmp/kafka-logs),基于时间(log.retention.hours=168)或大小删除。
  • 可配置紧凑型(compacted)Topic 保留最近的消息。

3 副本与高可用

  • 每个分区有 Leader 和若干 Follower(副本)。
  • Leader 处理读写,Follower 异步拉取同步。
  • Leader 故障时,从 ISR(In-Sync Replica)中选举新 Leader。

客户端编程(Java 示例)

1 生产者

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "key-1", "message body"));
producer.close();

2 消费者

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> {
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
    });
}

Kafka 消息队列教程

常用配置参数

参数 默认值 说明
acks all 生产者确认等级(0、1、all)
compression.type none 压缩方式(gzip、snappy、lz4)
batch.size 16384 批次字节数上限
linger.ms 0 发送前等待更多消息的时间
enable.auto.commit true 消费者自动提交 offset
max.poll.records 500 单次拉取最大消息数

常见问题与建议

  • 消息丢失:确认 acks=allmin.insync.replicas=2,并禁用自动提交。
  • 重复消费:生产端启用幂等性 enable.idempotence=true,消费端设计去重逻辑。
  • 积压处理:增加 Topic 分区数,同时增加消费者实例(不超过分区数)。
  • 顺序保证:单分区 + 单消费者,或使用生产者 key 控制分区。

监控工具

  • kafka-run-class.sh kafka.tools.JmxTool(JMX 指标)
  • Burrow(LinkedIn 开源 lag 检查工具)
  • Cruise Control(自动再均衡与运维)
  • Prometheus + Grafana(结合 JMX Exporter)

本教程覆盖基础用法与核心概念,生产环境建议配置多 Broker 集群、认证授权(SASL/ACL)以及监控告警体系。

标签:

相关阅读