Kafka 基础
2026/1/15大约 2 分钟
Kafka 基础
核心概念
架构
┌─────────────────────────────────────────────────────────────────┐
│ Kafka 集群 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Producer │ │Producer │ │Producer │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Broker Cluster │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Broker1 │ │ Broker2 │ │ Broker3 │ │ │
│ │ │ ┌─────┐ │ │ ┌─────┐ │ │ ┌─────┐ │ │ │
│ │ │ │Part0│ │ │ │Part1│ │ │ │Part2│ │ │ │
│ │ │ │Part1│ │ │ │Part2│ │ │ │Part0│ │ │ │
│ │ │ └─────┘ │ │ └─────┘ │ │ └─────┘ │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │Consumer │ │Consumer │ │Consumer │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘核心组件
| 组件 | 说明 |
|---|---|
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Broker | Kafka 服务节点 |
| Topic | 消息主题,逻辑分类 |
| Partition | 分区,物理存储单元 |
| Replica | 副本,保证高可用 |
| Consumer Group | 消费者组 |
Topic 和 Partition
┌─────────────────────────────────────────────────────────┐
│ Topic: orders │
├─────────────────────────────────────────────────────────┤
│ │
│ Partition 0: [msg0][msg3][msg6][msg9]... │
│ Partition 1: [msg1][msg4][msg7][msg10]... │
│ Partition 2: [msg2][msg5][msg8][msg11]... │
│ │
└─────────────────────────────────────────────────────────┘- 一个 Topic 可以有多个 Partition
- 每个 Partition 是有序的消息队列
- 消息在 Partition 内有唯一的 offset
- 不同 Partition 之间无序
副本机制
┌─────────────────────────────────────────────────────────┐
│ Partition 0 (replication=3) │
├─────────────────────────────────────────────────────────┤
│ │
│ Broker1: [Leader] ──写入──> [msg0][msg1][msg2] │
│ Broker2: [Follower] ──同步──> [msg0][msg1][msg2] │
│ Broker3: [Follower] ──同步──> [msg0][msg1][msg2] │
│ │
└─────────────────────────────────────────────────────────┘- Leader 处理读写请求
- Follower 从 Leader 同步数据
- ISR(In-Sync Replicas):同步副本集合
快速开始
安装启动
# 下载 Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0
# 启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka
bin/kafka-server-start.sh config/server.properties基本操作
# 创建 Topic
bin/kafka-topics.sh --create --topic test \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1
# 查看 Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 Topic 详情
bin/kafka-topics.sh --describe --topic test \
--bootstrap-server localhost:9092
# 发送消息
bin/kafka-console-producer.sh --topic test \
--bootstrap-server localhost:9092
# 消费消息
bin/kafka-console-consumer.sh --topic test \
--bootstrap-server localhost:9092 --from-beginning
# 删除 Topic
bin/kafka-topics.sh --delete --topic test \
--bootstrap-server localhost:9092Spring Boot 集成
依赖配置
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"生产者
@Service
public class OrderProducer {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;
public void sendOrder(Order order) {
kafkaTemplate.send("orders", order.getId(), order);
}
// 带回调
public void sendOrderWithCallback(Order order) {
kafkaTemplate.send("orders", order.getId(), order)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送失败", ex);
} else {
log.info("发送成功: {}", result.getRecordMetadata());
}
});
}
}消费者
@Service
public class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-group")
public void consume(Order order) {
log.info("收到订单: {}", order);
// 处理订单
}
// 手动提交
@KafkaListener(topics = "orders", groupId = "order-group")
public void consumeManual(Order order, Acknowledgment ack) {
try {
processOrder(order);
ack.acknowledge(); // 手动提交
} catch (Exception e) {
// 处理失败,不提交
}
}
// 批量消费
@KafkaListener(topics = "orders", groupId = "order-group")
public void consumeBatch(List<Order> orders) {
orders.forEach(this::processOrder);
}
}