【队列&循环队列】手动实现循环队列,掌握循环队列的每一处细节
Apache Kafka 是一个分布式流处理平台,核心能力包括发布订阅消息系统、存储记录、处理实时流,相比传统消息队列(如 RabbitMQ),Kafka 强调高吞吐、持久化、分区有序。
适用场景:日志采集、用户行为追踪、指标监控、事件溯源、实时数仓 ETL。
核心概念
| 概念 | 说明 |
|---|---|
| Producer | 消息生产者,向 Topic 发送记录 |
| Consumer | 消息消费者,从 Topic 读取记录 |
| Broker | Kafka 服务器节点,一个集群包含多个 Broker |
| Topic | 逻辑消息分类,类似数据库表 |
| Partition | Topic 的分片单元,提供顺序保证和并行能力 |
| Offset | 分区内消息的唯一递增序号 |
| Consumer Group | 一组消费者协同消费一个或多个 Topic |
安装与基础操作
1 快速启动(使用内置 ZooKeeper)
# 下载并解压(以 3.6.0 为例) wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz tar -xzf kafka_2.13-3.6.0.tgz cd kafka_2.13-3.6.0 # 启动 ZooKeeper(Kafka 3.x 依赖) bin/zookeeper-server-start.sh config/zookeeper.properties # 另开终端,启动 Kafka bin/kafka-server-start.sh config/server.properties
2 Topic 管理
# 创建 Topic(分区数 3,副本因子 1) bin/kafka-topics.sh --create --topic test-topic \ --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 # 列出 Topic bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 查看详情 bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
3 发送与消费消息
# 生产者(控制台输入) bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092 # 消费者(默认从最新 offset 开始) bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 # 从头消费 bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092 --from-beginning
核心机制
1 分区与顺序
- 同一分区内消息严格有序,全局无序。
- 生产时可指定
key,相同 key 的消息被哈希到同一分区。 - 消费者组内,每个分区只能被一个消费者实例读取。
2 持久化与保留策略
- 消息写入磁盘(默认
/tmp/kafka-logs),基于时间(log.retention.hours=168)或大小删除。 - 可配置紧凑型(compacted)Topic 保留最近的消息。
3 副本与高可用
- 每个分区有 Leader 和若干 Follower(副本)。
- Leader 处理读写,Follower 异步拉取同步。
- Leader 故障时,从 ISR(In-Sync Replica)中选举新 Leader。
客户端编程(Java 示例)
1 生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test-topic", "key-1", "message body"));
producer.close();
2 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
});
}
常用配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
acks |
all | 生产者确认等级(0、1、all) |
compression.type |
none | 压缩方式(gzip、snappy、lz4) |
batch.size |
16384 | 批次字节数上限 |
linger.ms |
0 | 发送前等待更多消息的时间 |
enable.auto.commit |
true | 消费者自动提交 offset |
max.poll.records |
500 | 单次拉取最大消息数 |
常见问题与建议
- 消息丢失:确认
acks=all、min.insync.replicas=2,并禁用自动提交。 - 重复消费:生产端启用幂等性
enable.idempotence=true,消费端设计去重逻辑。 - 积压处理:增加 Topic 分区数,同时增加消费者实例(不超过分区数)。
- 顺序保证:单分区 + 单消费者,或使用生产者 key 控制分区。
监控工具
kafka-run-class.sh kafka.tools.JmxTool(JMX 指标)- Burrow(LinkedIn 开源 lag 检查工具)
- Cruise Control(自动再均衡与运维)
- Prometheus + Grafana(结合 JMX Exporter)
本教程覆盖基础用法与核心概念,生产环境建议配置多 Broker 集群、认证授权(SASL/ACL)以及监控告警体系。


