Kafka分区机制介绍与示例

article/2025/10/7 6:40:28

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该文件夹下存储这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。

生产者在生产数据的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区规则选择被存储到哪一个分区中,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。另外,在消费者端,同一个消费组可以多线程并发的从多个分区中同时消费数据(后续将介绍这块)。

上面所说的分区规则,是实现了kafka.producer.Partitioner接口的类,可以自定义。比如,下面的代码SimplePartitioner中,将消息的key做了hashcode,然后和分区数(numPartitions)做模运算,使得每一个key都可以分布到一个分区中:

package com.lxw1234.kafka;import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;public class SimplePartitioner implements Partitioner {public SimplePartitioner (VerifiableProperties props) {}@Overridepublic int partition(Object key, int numPartitions) {int partition = 0;String k = (String)key;partition = Math.abs(k.hashCode()) % numPartitions;return partition;}}

在创建Topic时候可以使用–partitions <numPartitions>指定分区数。也可以在server.properties配置文件中配置参数num.partitions来指定默认的分区数。

但有一点需要注意,为Topic创建分区时,分区数最好是broker数量的整数倍,这样才能是一个Topic的分区均匀的分布在整个Kafka集群中,假设我的Kafka集群由4个broker组成,以下图为例:

kafka partition

创建带分区的Topic

现在创建一个topic “lxw1234”,为该topic指定4个分区,那么这4个分区将会在每个broker上各分布一个:

./kafka-topics.sh 
--create 
--zookeeper zk1:2181,zk2:2181,zk3:2181 
--replication-factor 1
--partitions 4 
--topic lxw1234

kafka partition

这样所有的分区就均匀分布在集群中,如果创建topic时候指定了3个分区,那么就有一个broker上没有该topic的分区。

带分区规则的生产者

现在用一个生产者示例(PartitionerProducer),向Topic lxw1234中发送消息。该生产者使用的分区规则,就是上面的SimplePartitioner。从0-10一共11条消息,每条消息的key为”key”+index,消息内容为”key”+index+”–value”+index。比如:key0–value0、key1–value1、、、key10–value10。

package com.lxw1234.kafka;import java.util.Properties;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;public class PartitionerProducer {public static void main(String[] args) {Properties props = new Properties();props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", "127.0.0.17:9091,127.0.0.17:9092,127.0.0.102:9091,127.0.0.102:9092");props.put("partitioner.class", "com.lxw1234.kafka.SimplePartitioner");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));String topic = "lxw1234";for(int i=0; i<=10; i++) {String k = "key" + i;String v = k + "--value" + i;producer.send(new KeyedMessage<String, String>(topic,k,v));}producer.close();}
}

理论上来说,生产者在发送消息的时候,会按照SimplePartitioner的规则,将key0做hashcode,然后和分区数(4)做模运算,得到分区索引:

hashcode(”key0”) % 4 = 1

hashcode(”key1”) % 4 = 2

hashcode(”key2”) % 4 = 3

hashcode(”key3”) % 4 = 0

         ……

对应的消息将会被发送至相应的分区中。

统计各分区消息的消费者

下面的消费者代码用来验证,在消费数据时,打印出消息所在的分区及消息内容:

package com.lxw1234.kafka;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;public class MyConsumer {public static void main(String[] args) {String topic = "lxw1234";ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig()); Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();while(it.hasNext()) {MessageAndMetadata<byte[], byte[]> mam = it.next();System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");}}private static ConsumerConfig createConsumerConfig() {Properties props = new Properties();props.put("group.id","group1");props.put("zookeeper.connect","127.0.0.132:2181,127.0.0.133:2182,127.0.0.134:2183");props.put("zookeeper.session.timeout.ms", "400");props.put("zookeeper.sync.time.ms", "200");props.put("auto.commit.interval.ms", "1000");props.put("auto.offset.reset", "smallest");return new ConsumerConfig(props);}
}

运行程序验证结果

先启动消费者,再运行生产者。

之后在消费者的控制台可以看到如下输出:

kafka partition

结果和正常预期一致。


http://chatgpt.dhexx.cn/article/9J3MOjNU.shtml

相关文章

Kafka介绍

1. Kafka的基本介绍 1.1 什么是Kafka&#xff1f; Kafka是最初由Linkedin公司开发&#xff0c;是一个分布式、分区的、多副本的、多订阅者&#xff0c;基于zookeeper协调的分布式日志系统&#xff08;也可以当做MQ系统&#xff09;&#xff0c;常见可以用于web/nginx日志、访…

什么是Kafka?

1 kafka 是什么   Apache kafka is a distributed streaming platform&#xff0c;即官方定义 kafka 是一个分布式流式计算平台。而在大部分企业开发人员中&#xff0c;都是把 kafka 当成消息系统使用&#xff0c;即它是一个分布式消息队列&#xff0c;很少会使用 kafka 的流…

Linux 之软连接

1.创建软连接 创建文件及文件夹 创建一个软连接 创建的语法&#xff1a;ln -s是必须的&#xff0c;然后后面跟一个目标文件夹&#xff0c;最后是一个当前目录的软连接名。 删除软连接 错误示范&#xff1a; 正确删除&#xff1a; 删除软连接时&#xff0c;要注意软连接的路…

Linux下如何创建和取消软连接

建立软连接&#xff1a; ln -s /usr/nodejs/bin/npm /usr/local/bin/ ln -s /usr/nodejs/bin/node /usr/local/bin/ 删除软连接&#xff1a; rm -rf /usr/local/bin/node注意&#xff1a;取消软连接最后没有/&#xff0c;rm -rf 软连接。加上/是删除文件夹 可以看到&#xf…

ubuntu 软连接建立

程序报错 /bin/sh: 1: /usr/bin/c: not found/usr/bin 主要放置一些应用软体工具的必备执行档例如c、g、gcc&#xff0c;一些软件的运行脚本&#xff0c;在目录中确实没有看到c,g问题应该出在这里 g -v报错Unable to exec g.real: No such file or directory 解决&#xff1a;…

windows系统下创建软连接

Windows系统创建软连接 为c:\Users\hp\.Pycharm2017.3\system\ 创建index文件软连接&#xff0c;被连接文件为D:\”deep learning”\bak\index

创建软连接和硬链接

前言 硬链接的原理&#xff1a;使链接的两个文件共享同样的文件内容&#xff0c;也就是同样的 inode。 硬链接有一个缺陷&#xff1a;只能创建指向文件的硬链接&#xff0c;不能创建指向目录的硬链接。但软链接可以指向文件或目录。 软链接的原理&#xff1a;就跟我们在windo…

软连接与硬链接

引入 1.硬链接与软连接 Linux 系统中有软链接和硬链接两种特殊的 "文件"2.inode是什么 ⛅要解释清楚两者的区别和联系需要先说清楚 linux 文件系统中的 inode 这个东西 ⛅当划分磁盘分区并格式化的时候&#xff0c;整个分区会被划分为两个部分&#xff0c;即inod…

Linux建立软链接、硬链接

软链接 说明&#xff1a;软链接仅仅包含所链接文件的路径名&#xff0c;因此能链接目录文件&#xff0c;也可以跨越文件系统进行链接。但是&#xff0c;当原始文件被删除后&#xff0c;链接文件也将失效。 1.软链接&#xff0c;以路径的形式存在。类似于Windows操作系统中的快…

软连接和硬链接

好多人对软件链接&#xff0c;硬链接不清楚&#xff0c;今天给大家介绍一下 一、软链接&#xff08;Soft Link&#xff09; 1.释义 又被叫为符号链接&#xff08;symbolic Link&#xff09;&#xff0c;它包含了到原文件的路径信息。 2.特性 &#xff08;1&#xff09;软链…

【DEBUG】phpstudy启动mysql服务时候发现3306端口被占用

点击phpstudy之后启动mysql&#xff0c;发现3306端口被占用。打开sqlectron发现并不是sqlectron占用了端口。在命令行中显示3306端口被占用&#xff0c;试图强行结束该进程&#xff0c;但是失败&#xff0c;原因是拒绝访问。 进入phpstudy的info界面需要用户名和密码&#xff0…

数据库安装处理提示3306端口被占用

数据库安装提示3306端口被占用怎么处理&#xff1f; 方法1&#xff1a; 1、windows命令窗口输入services.msc进入服务管理 2、查找mysql服务&#xff0c;右键停止&#xff0c;即可正常安装。 方法2&#xff1a; 1&#xff1a;进入cmd&#xff0c;查看计算机当前进程&#…

三步简单解决3306端口占用问题(windows)

一、查看占用3306端口的进程 其他端口参照3306netstat -aon|findstr 3306二、记住这个pid号 三、ctrl shiftesc调出任务管理器

解决mysql重装时,3306端口被占用(完整版)

解决方法&#xff1a; 1、打开终端&#xff0c;输入cmd,命令框内输入&#xff1a;netstat -ano 会显示如下图的内容&#xff0c;杀死3306对应端口的进程即可 2、杀死进程操作&#xff1a; taskkill /pid xxx -t -f xxx代表该端口的进程ID&#xff0c;也就是上图中pid一栏 …

查看3306端口被谁占用

今天安装mysql一直有问题,怀疑3306被谁占用了,排查开始: 一: 使用命令符netstat命令查看 netstat -a -n 显示各个端口占用: netstat -ano 显示各个端口占用和进程PID: 二: 使用netstat -aon|findstr "3306"命令查找"3306"端口信息 三: 使用tasklis…

安装MySQL时端口3306被占用,显示红色感叹号的解决办法(2023年,5月)

1、当我们安装MySQL时如果出现如图以下情况&#xff0c;说明端口号3306正在被其他程序占用着。 2、快捷键WinR打开运行窗口输入“cmd”点击【确定】进入命令提示符。 3 、输入命令netstat -aon查看本地地址3306对应的PID是多少。 netstat -aon 4、但是为了节省时间&#xff0c…

查看 Windows 端口被占用情况

查看 Windows 端口被占用情况 背景&#xff1a; 搞个开发配置&#xff0c;时不时就碰到端口被占用的情况windows 命令用得少&#xff0c;想又想不起来&#xff0c;搜又搜不准 查找原理&#xff1a; 通过 CMD 来输入命令优先找到监听端口的进程通过进程找到程序 查找流程&a…

安装mysql3306被占用_安装mysql提示3306端口已经被占用解决方案

今天遇到的问题是这样的,之前已经安装过mysql了,一直用的好好的,但是今天开启服务时报异常,无法启动。为了省事,于是想到卸载重装,在安装的过程中发现3306已经被占用,这也是一开始服务无法启动的原因。看到有人说用fport查看端口号,于是下载了,发现win7用不了(很尴尬啊…

3306端口被占用

winR组合键 -- 输入cmd -- 回车&#xff0c;输入以下命令 显示各个端口使用情况&#xff1a;netstat -a -n 或者 netstat -ano 查看端口被哪个程序占用&#xff1a; netstat -ano|findstr "3306" tasklist|findstr "5340"

安装php环境显示端口被占用,【亲测】启动PHPstudy提醒80、3306端口被占用的2种解决办法_全百科网...

我们在启动PHPstudy应用或者是任意Apache环境的时候经常发现我们需要使用的端口被占用的情况&#xff0c;本文分享经过全百科网实战测试的三种方法解决这个问题&#xff0c;如果通过第一种netstat -ano找不到所占用的端口软件&#xff0c;可直接使用第二、三种方法直接修改端口…