#创建Topic bin/kafka-topics.sh –create –topic test –bootstrap-server localhost:9092 kafka创建topic不是这样的吗,经常看到说根据key的hash值来指定partition,这啥意思,我看命令创建topic也没有key啊
指定key,自动根据key来计算partition id是谁的行为,是kafka自动完成的吗,还是java的api完成的
回答
- 创建 Topic (
kafka-topics.sh):是在搭建“容器”。你只需要定义这个容器有几个分区(Partitions),不需要关心具体的 Key。 - 发送消息 (Producer):是在往容器里“放东西”。这时候才涉及到 Key,以及 Key 如何决定数据落入哪个分区。
1. 创建 Topic 时发生了什么?
1bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
- 作用:告诉 Kafka 集群,“我要建一个叫
test的主题”。 - 默认行为:如果你没指定
--partitions参数,Kafka 会使用配置文件中的默认分区数(通常是 1 或根据 broker 数量决定)。 - 关键点:创建时确实不需要指定 Key。Topic 只是一个逻辑概念,它由一个或多个Partition(分区)组成。
- 你可以把 Topic 想象成一个有 N 个格子的货架。创建 Topic 就是决定这个货架有几个格子。
1# 创建一个有 3 个分区的 topic 2bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
2. “根据 Key 的 Hash 值指定 Partition”是什么意思?
send()方法时,你可以选择带 Key 发送,也可以不带 Key 发送。情况 A:发送消息时指定了 Key
Key = "user_123",Value = "..."。Kafka 的默认分区器(Default Partitioner)会执行以下算法:
- 计算 Hash:对 Key 进行哈希计算(通常是 Murmur2 算法)。
- 取模运算:用哈希值对 Topic 的分区总数取模。
- 结果:只要 Key 不变,哈希值就不变,取模结果也不变。这意味着相同的 Key 永远会被发送到同一个 Partition。
- 保证顺序性:Kafka 只能保证单个 Partition 内的消息是有序的。如果你需要保证“用户 A”的操作日志严格按时间顺序处理,就必须把该用户的所有消息发到同一个 Partition。通过固定 Key,就能实现这一点。
- 数据局部性:相同业务实体的数据聚集在一起,方便后续消费处理。
情况 B:发送消息时没有指定 Key (Key = null)
- Kafka 会使用轮询(Round-Robin)策略(在新版本客户端中)或随机策略。
- 消息会均匀地分发到所有 Partition 中。
- 结果:无法保证同一类数据的顺序性,但能最大化利用并行度。
3. 代码层面的对比
1// 没有指定 Key,Kafka 会自动轮询分配到 partition 0, 1, 2... 2ProducerRecord<String, String> record = new ProducerRecord<>("test", "message content"); 3producer.send(record);
1// 指定了 Key "order_888" 2// Kafka 计算 hash("order_888") % 3 (假设有3个分区) 3// 假设结果是 1,那这条消息一定去 partition 1 4// 下次再发 key 为 "order_888" 的消息,还会去 partition 1 5ProducerRecord<String, String> record = new ProducerRecord<>("test", "order_888", "message content"); 6producer.send(record);
总结
- 你的命令没错:
kafka-topics.sh只是负责“建房子”(定义分区数量),不需要知道将来住进来的人(Key)是谁。 - Key 的作用域:Key 是发消息时才用到的属性。
- 核心逻辑:
- 有 Key→→
Hash(Key) % 分区数→→固定落入某个分区(保证有序)。 - 无 Key→→轮询/随机→→均匀落入各个分区(保证负载均衡)。
- 有 Key→→
--partitions数量以支撑并发。详细流程拆解
1. 用户调用 API (你的代码)
1// 你传入了 key: "user_123" 2ProducerRecord<String, String> record = new ProducerRecord<>("test", "user_123", "data"); 3producer.send(record);
record对象里只有 Topic 名称和 Key,还没有 Partition ID。2. 客户端拦截器与分区器 (Java Client 内部)
kafka-clients.jar)在将消息放入发送缓冲区之前,会执行一个关键步骤:计算分区。- 核心接口:
org.apache.kafka.clients.producer.Partitioner - 默认实现:
org.apache.kafka.clients.producer.internals.DefaultPartitioner - 执行逻辑:
- 客户端检查
record是否指定了 Partition。如果没有(通常是这种情况)。 - 客户端检查是否有 Key。
- 如果有 Key:调用
partitioner.partition(topic, key, ...)。- 默认分区器会对 Key 进行 Murmur2 哈希。
- 然后对 Topic 的分区总数取模:
hash(key) % numPartitions。 - 计算出具体的 Partition ID(比如 2)。
- 如果无 Key:使用轮询策略(Sticky Round-Robin)选择一个分区。
- 如果有 Key:调用
- 此时,Partition ID 已经被“写死”在待发送的消息元数据中了。
- 客户端检查
3. 网络发送
Partition ID = 2的消息,通过网络发送给 Broker。请求包里大致长这样:
1{ 2 "topic": "test", 3 "partition": 2, // <--- 注意:这里已经是确定的数字了 4 "key": "user_123", 5 "value": "data" 6}
4. Broker 接收 (Kafka 服务端)
- 它不再计算Hash,也不决定去哪个分区。
- 它只是像个快递员一样,根据请求包里的
partition: 2,直接把消息追加到本地磁盘上对应的test-2日志文件中。 - 如果客户端发来的 Partition ID 不存在(比如 Topic 只有 3 个分区,客户端却发了 partition=5),Broker 会直接报错
UnknownTopicOrPartitionException。
为什么要设计成客户端计算?
-
性能(Performance):
- 如果让 Broker 计算,Producer 发送消息时只能发
Key,Broker 收到后需要计算 Hash、查找元数据、决定分区,然后再写入。这会增加 Broker 的 CPU 负担和网络往返的延迟。 - 让客户端计算,Broker 只需要做最简单的“追加写”操作(Append Only),吞吐量极大提升。
- 如果让 Broker 计算,Producer 发送消息时只能发
-
批处理(Batching):
- Kafka Producer 为了提高效率,会把发往同一个 Partition的多条消息打包成一个批次(Batch)一起发送。
- 只有客户端先算出每条消息属于哪个 Partition,才能把属于 Partition 0 的消息攒在一起,属于 Partition 1 的攒在一起,然后分别发送。如果等 Broker 算,就没法在发送前做这种高效的 batching 了。
特殊情况:自定义分区策略
Hash(Key)策略,你可以自己写一个类实现Partitioner接口,告诉客户端怎么算。1public class MyCustomPartitioner implements Partitioner { 2 @Override 3 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 4 // 获取该 topic 的分区总数 5 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 6 int numPartitions = partitions.size(); 7 8 if ("VIP_USER".equals(key)) { 9 return 0; // 强制发到分区 0 10 } else { 11 // 其他用户随机或轮询 12 return Math.abs(new Random().nextInt()) % numPartitions; 13 } 14 } 15 // ... 其他方法省略 16}
1props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyCustomPartitioner.class.getName());
版权:言论仅代表个人观点,不代表官方立场。转载请注明出处:https://www.stntk.com/question/2525.html
还没有评论呢,快来抢沙发~