kafka 命令、API

article/2025/4/20 0:56:45

 

日萌社

人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新)


 

大数据组件使用 总文章

  • kafka 生产/消费API、offset管理/原理、kafka命令
  • kafka 命令、API
  • Kafka 安装、原理、使用
  • mapreduce 实时消费 kafka 数据

1.使用控制台运行

    1.创建一个topic主题  cd /root/kafka chmod 777 /root/kafka/bin/kafka-topics.sh格式:bin/kafka-topics.sh --create --zookeeper IP:2181 --replication-factor 副本数 --partitions 分片数/分区数 --topic 主题名例子:bin/kafka-topics.sh --create --zookeeper NODE1:2181 --replication-factor 1 --partitions 1 --topic order

    2.编写代码启动一个生产者,生产数据

        cd /root/kafka chmod 777 /root/kafka/bin/kafka-console-producer.sh格式:bin/kafka-console-producer.sh --broker-list IP:9092 --topic 主题名例子:bin/kafka-console-producer.sh --broker-list NODE1:9092 --topic order


    3.编写代码启动给一个消费者,消费数据

        cd /root/kafka chmod 777 /root/kafka/bin/kafka-console-consumer.sh格式:bin/kafka-console-consumer.sh --zookeeper IP:2181 --from-beginning --topic 主题名例子:bin/kafka-console-consumer.sh --zookeeper NODE1:2181 --from-beginning --topic order

    4.Kafka 常用操作命令

        1.查看当前服务器中的所有 topiccd /root/kafka格式:bin/kafka-topics.sh --list --zookeeper zookeeper的IP:2181例子:bin/kafka-topics.sh --list --zookeeper NODE1:21812.创建 topiccd /root/kafka格式:bin/kafka-topics.sh --create --zookeeper zookeeper的IP:2181 --replication-factor 副本数 --partitions 分片数/分区数 --topic 主题名例子:bin/kafka-topics.sh --create --zookeeper NODE1:2181 --replication-factor 1 --partitions 1 --topic test3.删除 topic需要在kafka集群中的每个kafka服务器中的 vim /root/kafka/config/server.properties 中设置 delete.topic.enable=true 否则只是标记删除或者直接重启cd /root/kafka格式:bin/kafka-topics.sh --delete --zookeeper zookeeper的IP:2181 --topic 主题名例子:bin/kafka-topics.sh --delete --zookeeper NODE1:2181 --topic test4.通过 shell 命令发送消息cd /root/kafka格式:bin/kafka-console-producer.sh --broker-list kafka的IP:9092 --topic 主题名例子:bin/kafka-console-producer.sh --broker-list NODE1:9092 --topic test5.通过 shell 消费消息cd /root/kafka格式:bin/kafka-console-consumer.sh --zookeeper zookeeper的IP:2181 --from-beginning --topic 主题名例子:bin/kafka-console-consumer.sh --zookeeper NODE1:2181 --from-beginning --topic test6.查看消费位置cd /root/kafka格式:bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zookeeper的IP:2181 --group testGroup例子:bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper NODE1:2181 --group testGroup7.查看某个 Topic 的详情cd /root/kafka格式:bin/kafka-topics.sh --topic 主题名 --describe --zookeeper zookeeper的IP:2181例子:bin/kafka-topics.sh --topic 主题名 --describe --zookeeper NODE1:21818.对分区数进行修改cd /root/kafka格式:bin/kafka-topics.sh --zookeeper zookeeper的IP --alter --partitions 2 --topic 主题名例子:bin/kafka-topics.sh --zookeeper NODE1 --alter --partitions 分片数/分区数 --topic 主题名
消费者:
旧版:kafka-console-consumer --zookeeper cdh01:2181,cdh02:2181,cdh03:2181 --topic test --from-beginning新版:新版中仍然使用旧版的消费者命令会报错:zookeeper is not a recognized optionkafka-console-consumer --bootstrap-server cdh01:9092,cdh02:9092,cdh03:9092 --topic test --from-beginning指定消费者组:kafka-console-consumer --bootstrap-server cdh01:9092,cdh02:9092,cdh03:9092 --group testGroup --topic test --from-beginning查看消费位置
指定消费者组:kafka-consumer-groups --bootstrap-server cdh01:9092,cdh02:9092,cdh03:9092 --describe --group testGroup kafka-consumer-groups --bootstrap-server cdh01:9092,cdh02:9092,cdh03:9092 --describe --group testGroup --all-topics

 


2.使用Java api运行

    1.java工程-maven,依赖<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.1</version></dependency>2.生产者(先启动消费者等待接收数据,然后再启动生产者进行发送数据)//kafka生产者:订单的生产者代码public class OrderProducer{public static void main(String[] args) throws InterruptedException{// 通过Properties配置文件的方式 配置"连接集群"的信息Properties props = new Properties();props.put("bootstrap.servers", "NODE1:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//kafka生产者KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);for (int i = 0; i < 10000; i++){//数据分发策略:可以指定数据发往哪个partition,当ProducerRecord 的构造参数中有partition的时候,就可以发送到对应partition上// new ProducerRecord<String, String>("topic主题名",partition,"key", "value")ProducerRecord<String, String> partition = new ProducerRecord<String, String>("order", 0, "key", "订单"+i);//数据分发策略:如果生产者没有指定partition,但是发送消息中有key,根据key的hash值的方式发送数据到那个partition分区/分片中// new ProducerRecord<String, String>("topic主题名","key", "value")ProducerRecord<String, String> key = new ProducerRecord<String, String>("order", "key", "value"+i);//数据分发策略:既没有指定partition,也没有key的情况下如何发送数据。使用轮询的方式发送数据到那个partition分区/分片中// new ProducerRecord<String, String>("topic主题名", "value")ProducerRecord<String, String> value = new ProducerRecord<String, String>("order", "订单信息!"+i);//kafka生产者发送数据kafkaProducer.send(value);}}}3.消费者(先启动消费者等待接收数据,然后再启动生产者进行发送数据)    //kafka消费者:订单的消费者代码public class OrderConsumer{public static void main(String[] args){//1.通过Properties配置文件的方式 配置"连接集群"的信息Properties props = new Properties();props.put("bootstrap.servers", "NODE1:9092");//配置group.id:多个Consumer的group.id都相同的话,表示多个Consumer都在同一个消费组group中props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// Kafka消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);//2.订阅某个topic主题下的相关消息数据用于消费。可以订阅多个topic主题,封装到一个 List中,可以使用Arrays.asList进行封装kafkaConsumer.subscribe(Arrays.asList("order"));while (true){//jdk queue的操作方法:offer插入元素、poll获取元素。//blockingqueue的操作方法:put插入元素,take获取元素。ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);for (ConsumerRecord<String, String> record : consumerRecords){//此处需要等待到有数据才能消费进行获取打印System.out.println("消费的数据为:" + record.value());}}}}

 
3.topic主体中的分区partition 和 group.id消费者组 的关系

    Consumer Group(CG)消费者组:这是 kafka 用来实现一个 topic(主题) 消息的广播(发给所有的 consumer消费者)和 单播(发给任意一个 consumer消费者)的手段。一个 topic(主题) 可以有多个 CG消费者组。topic(主题) 的消息会复制(不是真的复制,是概念上的)到所有的 CG消费者组,但每个 partion(分区) 只会把消息发给该 CG 中的一个consumer消费者。如果需要实现广播,只要每个 consumer消费者 有一个独立的 CG 就可以了。要实现单播只要所有的 consumer 在同一个 CG。用 CG消费者组 还可以将 consumer消费者 进行自由的分组,而不需要多次发送消息到不同的 topic(主题)。Broker:一台 kafka 服务器就是一个 broker(中间人)。一个kafka集群(cluster) 由多个 broker(中间人) 组成。一个 broker 可以容纳多个 topic(主题)。Partition(分区):为了实现扩展性,一个非常大的 topic(主题) 可以分布到多个 broker(即服务器)上,一个topic(主题) 可以分为多个 partition(分区),每个 partition(分区) 是一个有序的队列(Queue)。partition(分区) 中的每条消息都会被分配一个有序的 id(offset)。kafka 只保证按一个 partition(分区) 中的顺序 将消息发给 consumer消费者,不保证一个 topic 的整体(多个 partition分区 间)的顺序

 

    情况一:同一个topic的一个partition只能被同一个customerGroup的一个customer消费;group里多于partition数量的customer会空闲;


    情况二:同一个topic的partition数量多于同一个customerGroup的customer数量时,会有一个customer消费多个partition,这样也就没法保证customer接收到的消息的顺序性,kafka只保证在一个partition上数据是有序的,无法保证topic全局数据的顺序性;


    情况三:一个topic 的partitions被多个customerGroup消费时,可以并行重复消费;

        kafka topic partition被同一个GROUPID的多个消费者消费,只有一个能收到消息的原因一般有如下:1.只有一个partition分区;同一个topic的同一个partition分区只允许同一个customerGroup的一个消费者消费信息,一个partition上不允许同一个消费者组的多个消费者并发,但同一个partition上是可以多个不同消费者组种的消费者并发消费的;2.多个partition分区,但是,消息在生产时只发往到了一个partiton上;key的hashCode%partitionNum相同导致,或者自定义了分区策略;导致这种严重的数据倾斜;

 

    1.发送到Kafka的消息会根据key,发送到对应topic的partition中,有默认的分发规则(也可以自己重写分发规则),基本上就是相同的key发送到一个partition中,不同的key有可能发送到相同的partition中。2.group是消费者中的概念,按照group(组)对消费者进行区分。对于每个group,需要先指定订阅哪个topic的消息,然后该topic下的partition会平均分配到group下面的consumer上。所以会出现以下这些情况:1.一个topic被多个group订阅,那么一条消息就会被不同group中的多个consumer处理。如果需要实现广播,只要每个 consumer消费者 有一个独立的 CG 就可以了。一个 topic(主题) 可以有多个 CG消费者组。topic(主题) 的消息会复制(不是真的复制,是概念上的)到所有的 CG消费者组,但每个 partion(分区) 只会把消息发给该 CG 中的一个consumer消费者。2.同一个group中,每个partition只会被一个consumer处理,这个consumer处理的消息不一定是同一个key的。所以需要在处理的地方判断。要实现单播只要所有的 consumer 在同一个 CG。3.例子,现在有一个用户信息修改的回调消息扔到消息队列里,有两个业务要处理,一个是更新数据库,一个是更新es索引信息topic:user_update_topickey:user_update_key_cid //cid标志公司的区分信息这样子的话,同一个公司的用户更新会被分配到一个partition中,同一个公司的用户更新能保证前后顺序不变group1中的consumer更新数据库,group:consumer1_groupgroup2中的consumer更新es索引信息,group:consumer2_groupgroup1、group2这个两个group会分别消费这个topic下的数据,对于每个group,内部的consumer会平分topic下的partition,相当于group中的每个consumer会处理多个公司的数据,但处理的公司不会有重叠。以上是topic中partition多余group中的消费者的时候,如果group下面有3个消费者,但是分区partition只有一个,那么三个消费者中只有一个会消费消息。

 

    消费者组 (Consumer Group)consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费(当然该分区还可以同时分配给其他group中的某个consumer来消费)。个人认为,理解consumer group记住下面这三个特性就好了:1.consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程2.group.id是一个字符串,唯一标识一个consumer group3.consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer,当然该分区还可以同时分配给其他group中的某个consumer来消费。kafka consumer:消费者可以从多个broker中读取数据。消费者可以消费多个topic中的数据。因为Kafka的broker是无状态的,所以consumer必须使用partition offset来记录消费了多少数据。如果一个consumer指定了一个topic的offset,意味着该consumer已经消费了该offset之前的所有数据。consumer可以通过指定offset,从topic的指定位置开始消费数据。consumer的offset存储在Zookeeper中。offset:用来保存消费进度。offset表示在当前topic,当前groupID下消费到的位置。offset为earliest并不代表offset=1。在不进行过期配置的情况下,kafka消息默认7天时间就会过期。过期后其offset也就随之发生变化,使得用数字进行配置的消费进度并不准确。1.earliest:自动重置到最早的offset。2.latest:看上去重置到最晚的offset。3.none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。groupID:一个字符串用来指示一组consumer所在的组。相同的groupID表示在一个组里。相同的groupID消费记录offset时,记录的是同一个offset。所以,此处需要注意 (1)如果多个地方都使用相同的groupid,可能造成个别消费者消费不到的情况(2)如果单个消费者消费能力不足的话,可以启动多个相同groupid的consumer消费,处理相同的逻辑。


  

    produce方面:如果有多个分区,发送的时候按照key值hashCode%partitionNum哈希取模分区数来决定该条信息发往哪个partition, 这里可以自定义partition的分发策略,只要实现Partitioner接口就好,可以自定义成随机分发或者fangwang发往指定分区;customer方面:对于topic中的partition来说,一个partition只允许一个customer来消费,同一个partition上不允许customer并发;Partition数量 > customer数量时:一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。Partition数量 < customer数量时:就会有剩余的customer闲置,造成浪费;如果一个consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同;kafka只保证在一个上数据是有序的(每个partition都有segment文件记录消息的顺序性),无法保证topic全局数据的顺序行;增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化。


4.storm消费kafka时并行度设置问题

    1.首先明确的一点是,storm的并行度都是executor即线程级别的并行;包括work(进程),executor(线程)的设置,具体体现在works,spout,bolt设置上,同一个executor上设置多个task还是会串行化执行,并不能提高执行效率,这也是由于并行是线程并行,一个线程的多个task肯定是有先后执行顺序的,有顺序那就不是并行;关于node,work,executor,task关系和work,spout,bolt,并行度设置网上有很多资料,挺详细;2.这里记录下我遇到的自己关系的另外两个问题:一个是从kafka消费消息是spout并行度设置,另一个ack响应开启的是线程还是进程以及如何设置其数量;1.第一个问题:其实理解了上面kafka customer和partition的关系第一个问题也就解决了,spout的并发度实例数量设置最好和partition数量一样,这样能保证一个spout消费者实例对应一个partition,即实现了一个partition中消息消费的顺序性(有时消息的顺序性要求并不是很高)也能很好地提高整个topology的执行效率,至少对拓扑执行效率来说,瓶颈不会卡在spout(数据源)这里;2.第二个问题:通过Storm UI发现,work和spout,bolt并行度不变的情况下,多开几个acker_executors,works的数量并没有增加,反而是executors数量增加,这样就确定了acker_executors如其名一样只是线程,并不像有些网友说的ack的执行是会单独开启ack进程再在该进程里运行ack响应线程。他其实就是一个普通的ack线程,运行在已有的work进程里;3.另外通过测试发现,我设置了4个work,4个spout,4个bolt,没有设置acker_executors,Storm UI上显示Num workers是4,Num executors却是12(4个spout,4个bolt 这里一共是8个executors),所以默认情况下一个work里会有一个acker_executors。4.默认情况下 一个work会有一个executor,一个executor会有一个task,如果设置了他们的数量,就会按照设置的数量来生成对应实例;如开了4个work,2个spout,3个bolt,那spout和bolt的executors一共就会有5个(spout executors 2个,bolt executors 3个,),相当于有2个work里的每个work都有1个spout executor和1个bolt executor,另外还有1个work里只有1个bolt executor,另外还有一个work里啥也没有;其实这种配置会导致多开一个啥活也不干的work进程,有些浪费;


5.kafka 多个消费者在同一个groupID消费者组中,只有一个消费者能收到消息,解决方案如下
 

    1.业务需求:将一个业务逻辑分拆出来单独部署多个服务器上,然后与主程序之间通过kafka队列通信,每个业务实例上都有一个消费者在监听队列,且他们的groupid相同。我们的原意是主程序 往 队列上发送的命令参数(数据值)会被其中随机的一个消费者收到,从而实现一个负载均衡的效果,但是后来发现主程序发送往队列上的控制消息(数据值)始终是被其中固定的一个服务器上的消费者收到,其他的消费者从头到尾没有收到过。这样自然就达不到我们负载均衡的效果了。2.后来发现,造成这个结果的原因可能有两个:1.分区数(Partition)设成了12.发送消息的时候指定了key值3.原因一的解决方案:1.原因:分区数(Partition)设成了12.分析:因为kafka为了保证了消息的一致性,同一个分区的消息只会被于此关联的同一个消费者接收到(大致就是这个意思)。如果只有一个分区的话,如果第一条消息被其中一个消费者收到后,后面的消息始终会被这个消费者收到。所以应保证分区的数目大于消费者的数目。3.解决方案:#查看partition数据bin/kafka-topics.sh --zookeeper zk1,zk2,zk3 --describe --partitions 20 --topic topic_test#增加partition数目#增加partition数目后,如果使用的是java的kafka-client的Producer的话,会由于producer内部的缓存机制,#导致过几分钟后才被producer感知到partition数目的变化bin/kafka-topics.sh --zookeeper zk1,zk2,zk3 --alter --partitions 20 --topic topic_test4.原因二的解决方案:1.原因:发送消息的时候指定了key值2.分析:我们改了partitions 的数目后,再次测试,还是始终被其中一个消费者收到。后来查看了producer的send方法,发现了是因为我们把key设置为了””,也就是指定了key值为空字符串。3.send方法发送 代码分析:producer.send(new ProducerRecord<String, String>("test","",""),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// TODO Auto-generated method stub}});4.send方法的实现 和 partition方法的调用 代码分析:@Overridepublic Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {try {// first make sure the metadata for the topic is availablelong waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);byte[] serializedKey;try {serializedKey = keySerializer.serialize(record.topic(), record.key());} catch (ClassCastException cce) {throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +" specified in key.serializer");}byte[] serializedValue;try {serializedValue = valueSerializer.serialize(record.topic(), record.value());} catch (ClassCastException cce) {throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +" specified in value.serializer");}//可以看出 生产者往哪个partition分区 发送消息,实际是由 partition()方法得出的int partition = partition(record, serializedKey, serializedValue, metadata.fetch());int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);ensureValidRecordSize(serializedSize);TopicPartition tp = new TopicPartition(record.topic(), partition);log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback, remainingWaitMs);if (result.batchIsFull || result.newBatchCreated) {log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);this.sender.wakeup();}return result.future;// handling exceptions and record the errors;// for API exceptions return them in the future,// for other exceptions throw directly} catch (ApiException e) {log.debug("Exception occurred during message send:", e);if (callback != null)callback.onCompletion(null, e);this.errors.record();return new FutureFailure(e);} catch (InterruptedException e) {this.errors.record();throw new InterruptException(e);} catch (BufferExhaustedException e) {this.errors.record();this.metrics.sensor("buffer-exhausted-records").record();throw e;} catch (KafkaException e) {this.errors.record();throw e;}}5.partition方法的实现 和 ProducerRecord对象 代码分析:private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int numPartitions = partitions.size();// they have given us a partition, use itif (partition < 0 || partition >= numPartitions)throw new IllegalArgumentException("Invalid partition given with record: " + partition+ " is not in the range [0..."+ numPartitions+ "].");return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);}如果发送的时候已经指定了parition的话,就会发送到指定的partition上。怎么指定呢??我们发送的是一个ProducerRecoed对象。看它的构造方法,第二个参数就是partition。public ProducerRecord(String topic, Integer partition, K key, V value) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null");this.topic = topic;this.partition = partition;this.key = key;this.value = value;}如果没有指定的parition的话,就调用:this.partitioner.partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)如果没有配置partitioner.class的话,那么partitioner默认是org.apache.kafka.clients.producer.internals.DefaultPartitioner的一个对象(参考文章末尾),查看一下它的partition()方法:public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = counter.getAndIncrement();List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partitionreturn DefaultPartitioner.toPositive(nextValue) % numPartitions;}} else {// hash the keyBytes to choose a partitionreturn DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}从上面代码就看出了如果指定了key值的话,partition的值实际上是由Utils.murmur2(keyBytes)哈希计算出来,这样自然每次都是被同一个消费者接收到。如果没有指定的话,就会通过轮询的方式逐个发送。这里有个问题就是,如果我们每次都打印出partition的值的话,可能会看到peoducer发送并不一定会按照1、2、3、4、5这样的顺序发送,这个从上面代码中能看出最终返回的不是part的值,而是availablePartitions.get(part).partition()的值,而availablePartitions里的PartitionInfo的顺序本身就不一定是严格按照顺序排列的。在org.apache.kafka.clients.producer.ProducerConfig类中可以看到kafkaproducer的配置以及默认值,其中有这么一行:.define(PARTITIONER_CLASS_CONFIG,Type.CLASS,DefaultPartitioner.class.getName(),Importance.MEDIUM, PARTITIONER_CLASS_DOC)其中静态变量 PARTITIONER_CLASS_CONFIG的值是”partitioner.class”,可以看出其默认对应的类是 DefaultPartitioner.class,当然,这里可以自定义partition的分发策略,只要实现Partitioner接口就好,例如下面代码实现了随机选择partition。public class MyPartitioner implements Partitioner{@Overridepublic void configure(Map<String, ?> configs) {// TODO Auto-generated method stub}@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();int randomNum = new Random().nextInt(numPartitions);return partitions.get(randomNum).partition();}@Overridepublic void close() {// TODO Auto-generated method stub}}

 


http://chatgpt.dhexx.cn/article/kOiAcnEN.shtml

相关文章

kafka命令行使用

kafka使用 查看Kafka当前的主题列表创建一个主题查看主题信息修改分区信息删除一个主题生成者推送消息消费者接收查看分组信息查看特定consumer group 详情 基于Kafka 2.13版本的操作 查看Kafka当前的主题列表 ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list创建一个主…

简单的kafka命令行操作

目录 一、主题topic命令行操作 1.查看操作主题的命令参数 2.连接kafka地址&#xff0c;创建名为kaf的主题&#xff0c;指定分区和副本数量 3.查看所有主题的名称 4.查看主题的详细信息 5.修改主题&#xff08;修改分区数&#xff09; 二、生产者命令行操作 1.查看操作生…

【kafka】三、kafka命令行操作

kafka命令行操作 kafka的相关操作命令脚本文件在bin目录下 查看所有的topic kafka-topics.sh --zookeeper hll1:2181 --list 或 kafka-topics.sh --zookeeper 192.168.171.132:2181 --listkafka-topics.sh&#xff1a;topic执行脚本 --zookeeper hll1:2181&#xff1a;需要的…

Kafka的命令行操作

一、topic命令 下面Windows命令需要把cmd路径切换到bin/windows下。 而Linux命令只需要在控制台切换到bin目录下即可。 下面都以Windows下的操作为例&#xff0c;在Linux下也是一样的。 1.1 查看主题命令的参数 kafka-topics.bat # Windows kafka-topics.sh # Linux输…

Kafka 命令行操作

1&#xff09;查看当前服务器中的所有 topic bin/kafka-topics.sh --zookeeper backup01:2181 使用命令 bin/kafka-topics.sh --list 报异常&#xff0c;提示必须依赖zookeeper 前面我们就讲过kafka是依赖于zookeeper 连上zookeeper什么都没有输出&#xff0c;因为我们什…

kafka命令行操作大全

最近利用flink使用一个流式SQL处理平台&#xff0c;利用kafka, mysql, hive等组件比较多&#xff0c;命令行突然间需要操作一次记不住命令很麻烦&#xff0c;索性直接整理成笔记。 在 0.9.0.0 之后的 Kafka&#xff0c;出现了几个新变动&#xff0c;一个是在 Server 端增加了…

Kafka命令大全

kafka 脚本 connect-distributed.sh connect-mirror-maker.sh connect-standalone.sh kafka-acls.sh kafka-broker-api-versions.sh kafka-configs.sh kafka-console-consumer.sh kafka-console-producer.sh kafka-consumer-groups.sh kafka-consumer-perf-test.sh kafka-dele…

Kafka常用命令行命令

文章目录 Kafka常用命令kafka的基本操作&#xff08;命令行操作&#xff09;1.启动集群&#xff1a;2.查看当前服务器中的所有topic&#xff08;在kafka目录下&#xff09;3.创建主题topic&#xff08;在kafka目录下&#xff09;4.删除topic&#xff08;在kafka目录下&#xff…

美国Stripe支付Android端集成流程

上家公司想要拓展自己在新加坡的市场,打算做一个新加坡本地的生活服务应用,其中少不了的就是支付了。国外支付这块一直是个头疼的问题。想用Google Wallet吧,但它是采用NFC接触式交易,想要进行线上服务时没法进行,后来就去整个贝宝PayPal支付。在这里想吐槽一下,PayPal支付做起…

zencart1.55手把手教你开发stripe支付插件

第一步&#xff1a;在includes/modules/payment目录下创建名称为c_stripe的文件夹&#xff0c;用于存放stripe支付logo 第二步&#xff1a;在同includes/modules/payment目录下创建c_stripe.php文件&#xff0c;这个文件就是用于编写zencart支付插件,代码如下 <?php // /…

JAVA接入STRIPE支付教程(测试环境),STRIPE支付的调用以及STRIPE WEBHOOK回调

一、环境准备 1.注册 2.密钥 3.WEBHOOK回调 二、核心代码 1.配置API.key以及webhook.key 2.支付demo 3.WEBHOOK回调 一、环境准备 1.注册 STRIPE官网自行注册账号 2.两个重要的密钥 首先在STRIPE官网注册账号之后进入首页&#xff0c;点击API密钥&#xff0c;查看账号对应的…

前端对接stripe支付,创建测试session_is

第一次搞 stripe支付&#xff0c;国外的文档全英文 接stripe支付&#xff0c;根据官方文档&#xff0c;首先就是先跟服务端交互&#xff0c;创建session会话&#xff0c;获取id&#xff0c;当服务端不做这个功能时&#xff0c;就需要前端去掉stripe最底层的api&#xff0c;拿到…

Stripe支付简介和前端js调用

最近公司正在做一个国际版APP&#xff0c;涉及到海外支付&#xff0c;调研过Paypal、Skrill、BrainTree、Stripe&#xff08;可参考海外移动支付方案对比&#xff09;&#xff0c;最终 选择了Stripe支付。Stripe特点如下&#xff1a; 收费规则简单透明&#xff0c;手续费就是收…

laravel 对接stripe支付

参考文档 &#xff1a; stripe文档 stripe/stripe-php stripe api 文档 目录 一 获取关键参数二 安装Stripe库三 代码示例 一 获取关键参数 SCRIPE_SECRET_KEY &#xff08;调用api秘钥&#xff09; NOTIFY_SIGN (签名 支付回调使用) 二 安装Stripe库 # Install the PHP libr…

Stripe支付配置

开通支付 首先&#xff0c;你需要在 Stripe 官网开通你自己的支付账号信息&#xff1a; https://stripe.com/ 注册好以后&#xff0c;你即可获取 Stripe 的密钥信息&#xff1a; 密钥主要包含两部分&#xff0c;可发布的密钥 密钥 同时&#xff0c;你需要找到你交易的对应的货…

php实现Stripe支付 | ecshop stripe支付

Stripe支付 &#xff1a;Stripe Login | Sign in to the Stripe Dashboard 1. 安装Stripe&#xff1a; composer require stripe/stripe-php 2. 获取密钥&#xff1a;https://dashboard.stripe.com/test/apikeys 3. 创建产品&#xff1a;Stripe Login | Sign in to the Stri…

php实现Stripe支付

Stripe支付 &#xff1a;https://dashboard.stripe.com/dashboard 1. 安装Stripe&#xff1a; composer require stripe/stripe-php 2. 获取密钥&#xff1a; https://dashboard.stripe.com/test/apikeys 3. 创建产品&#xff1a; https://dashboard.stripe.com/test/product…

stripe 支付

stripe 支付整理 1、创建账号 官方网址 中文版 https://stripe.com/zh-cn-us/payments 2、激活你的账号 填写信息只支持国外与香港的哦 3、开发者秘钥 如果不激活的话&#xff0c;只能用测试api秘钥 4、配置你的回调地址 配置秘钥&#xff0c;选择webhook事件 事件一定要选择…

java 对接 stripe支付

stripe 支付跟国内的 支付宝 、微信、等第三方支付平台不一样 码字不易&#xff0c;开源更不易&#xff0c;点赞收藏关注&#xff0c;多多支持 开源地址 https://gitee.com/J-LJJ/stripe-demo 支付方式一 先看效果 支付方式2&#xff08;需要配合回调&#xff09; 2023-04…

Stripe支付流程简要描述

在国外&#xff0c;除了Paypal支付之外&#xff0c;Stripe支付也占有很大一部分市场份额&#xff0c;Stripe支付官网 https://stripe.com/ 下面简单介绍一下Stripe的支付流程。 1、用户页面输入充值金额&#xff0c;点击确定跳转到支付页面&#xff08;页面的样式由stripe提供…