Kafka 进阶
2026/1/15大约 2 分钟
Kafka 进阶
生产者
发送流程
┌─────────────────────────────────────────────────────────┐
│ Producer 发送流程 │
├─────────────────────────────────────────────────────────┤
│ │
│ 消息 → 序列化 → 分区器 → 缓冲区 → Sender线程 → Broker │
│ │
└─────────────────────────────────────────────────────────┘分区策略
// 自定义分区器
public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (key == null) {
// 轮询
return counter.getAndIncrement() % numPartitions;
}
// 按 key hash
return Math.abs(key.hashCode()) % numPartitions;
}
}可靠性配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// acks 配置
// 0: 不等待确认
// 1: Leader 确认
// all/-1: 所有 ISR 确认
props.put("acks", "all");
// 重试
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// 幂等性
props.put("enable.idempotence", true);
// 批量发送
props.put("batch.size", 16384);
props.put("linger.ms", 5);
// 缓冲区
props.put("buffer.memory", 33554432);事务
Properties props = new Properties();
props.put("transactional.id", "order-producer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "msg1"));
producer.send(new ProducerRecord<>("topic2", "msg2"));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}消费者
消费流程
┌─────────────────────────────────────────────────────────┐
│ Consumer 消费流程 │
├─────────────────────────────────────────────────────────┤
│ │
│ 1. 加入消费者组 │
│ 2. 分配分区(Rebalance) │
│ 3. 拉取消息(poll) │
│ 4. 处理消息 │
│ 5. 提交 offset │
│ │
└─────────────────────────────────────────────────────────┘消费者组
┌─────────────────────────────────────────────────────────┐
│ Consumer Group: order-group │
├─────────────────────────────────────────────────────────┤
│ │
│ Topic: orders (3 partitions) │
│ │
│ Consumer1 ← Partition0 │
│ Consumer2 ← Partition1 │
│ Consumer3 ← Partition2 │
│ │
│ 注:消费者数 > 分区数时,多余消费者空闲 │
│ │
└─────────────────────────────────────────────────────────┘Offset 管理
// 自动提交
props.put("enable.auto.commit", true);
props.put("auto.commit.interval.ms", 5000);
// 手动提交
props.put("enable.auto.commit", false);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
// 同步提交
consumer.commitSync();
// 或异步提交
consumer.commitAsync();
}Rebalance
触发条件:
- 消费者加入/离开
- 订阅 Topic 变化
- 分区数变化
// 监听 Rebalance
consumer.subscribe(Arrays.asList("topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 分区被撤销前,提交 offset
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 分区分配后
}
});可靠性保证
消息不丢失
Producer 端:
- acks=all
- retries > 0
- 使用回调确认
Broker 端:
- replication.factor >= 3
- min.insync.replicas >= 2
- unclean.leader.election.enable=false
Consumer 端:
- 手动提交 offset
- 处理成功后再提交
消息不重复
Producer 端:
- enable.idempotence=true
- 使用事务
Consumer 端:
- 业务幂等处理
- 使用唯一 ID 去重
消息顺序
- 单分区内有序
- 指定 key 保证相同 key 到同一分区
- max.in.flight.requests.per.connection=1
性能优化
Producer 优化
# 批量发送
batch.size=32768
linger.ms=10
# 压缩
compression.type=lz4
# 缓冲区
buffer.memory=67108864Consumer 优化
# 批量拉取
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.poll.records=500
# 并行消费
# 增加分区数和消费者数Broker 优化
# 日志配置
log.segment.bytes=1073741824
log.retention.hours=168
# 网络
num.network.threads=8
num.io.threads=16
# 副本
num.replica.fetchers=4