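Consumer read flow
1. Each consumer obtains the partitions it will consume according to the assignment strategy (RangeAssignor by default).
2. The consumer looks up its offset for each assigned partition. With the Java KafkaConsumer client (Kafka 0.9 and later), the last committed offset is read from the internal __consumer_offsets topic on the brokers; only the old ZooKeeper-based consumer read it from ZK.
3. The consumer finds the leader of each partition and pulls data from it.
4. The consumer commits its offset.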
Consumer example code
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerClient {
    // SLF4J logger; kafka-clients already depends on slf4j-api.
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerClient.class);

    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (isRunning.get()) {
                // Wait up to one second for the next batch of records.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = "
                            + record.partition() + ", offset = " + record.offset());
                    System.out.println("key = " + record.key() + ", value = " + record.value());
                    // do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}
Subscribing to topics and partitions
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
The consumer record type
public class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;
}
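Automatic offset commit: the offset of a fetched batch can be committed before every record in it has been processed (for example, when an exception ends the poll loop and the consumer is closed). The unprocessed records are then skipped after a restart, so messages are lost.
Manual offset commit: if the offset is committed only after the whole batch has been processed and an exception occurs partway through, the offset is never committed. After a restart the batch is fetched again, so the records at the front that were already processed are consumed a second time.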
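The difference between the two cases comes down to whether the commit happens before or after processing. The following is a minimal simulation in plain Java, not Kafka client code: the class `CommitOrderSketch`, the method `run`, and the flag `commitBeforeProcessing` are hypothetical names introduced only for illustration, and the "crash" is modeled as simply stopping the first run at a chosen offset.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitOrderSketch {
    /**
     * Simulates one batch with offsets 0..batchSize-1. The first run stops
     * (simulated exception) just before processing offset crashAt; the second
     * run resumes from the committed offset. Returns every offset that was
     * processed, in order, across both runs.
     */
    static List<Integer> run(int batchSize, int crashAt, boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // offset the consumer resumes from after a restart

        // Commit-before-processing: the whole batch is marked as consumed up front.
        if (commitBeforeProcessing) {
            committed = batchSize;
        }

        // First run: process records until the simulated exception.
        boolean crashed = false;
        for (int offset = 0; offset < batchSize; offset++) {
            if (offset == crashAt) {
                crashed = true;
                break;
            }
            processed.add(offset);
        }
        // Commit-after-processing is only reached when the batch completed.
        if (!crashed) {
            committed = batchSize;
        }

        // Second run: restart from the committed offset.
        for (int offset = committed; offset < batchSize; offset++) {
            processed.add(offset);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2, true));  // [0, 1]: offsets 2, 3, 4 are lost
        System.out.println(run(5, 2, false)); // [0, 1, 0, 1, 2, 3, 4]: 0 and 1 repeat
    }
}
```

Committing first gives at-most-once behavior and committing last gives at-least-once behavior, so a consumer that commits manually usually needs idempotent processing to tolerate the repeats.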
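Consumer partition assignment strategies
1. Range assignment strategy:
Configuration
Set the consumer's partition.assignment.strategy to org.apache.kafka.clients.consumer.RangeAssignor.
Formula (applied to each topic separately, with the consumers sorted lexicographically):
n = number of partitions / number of consumers (integer division)
m = number of partitions % number of consumers
The first m consumers each consume n+1 partitions; the remaining consumers each consume n partitions.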
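The n and m formula can be checked with a small sketch in plain Java. This is an illustration of the arithmetic for a single topic, not the source of RangeAssignor; the class `RangeAssignmentSketch` and the consumer names `c0`, `c1`, `c2` are hypothetical, and the consumer list is assumed to be non-empty and already sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignmentSketch {
    /** Assigns partitions 0..partitionCount-1 of one topic to the consumers in list order. */
    static Map<String, List<Integer>> assign(List<String> sortedConsumers, int partitionCount) {
        int n = partitionCount / sortedConsumers.size(); // base share per consumer
        int m = partitionCount % sortedConsumers.size(); // consumers that get one extra
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0; // next unassigned partition number
        for (int i = 0; i < sortedConsumers.size(); i++) {
            int count = n + (i < m ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sortedConsumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 7 partitions, 3 consumers: n = 2, m = 1
        System.out.println(assign(Arrays.asList("c0", "c1", "c2"), 7));
        // prints {c0=[0, 1, 2], c1=[3, 4], c2=[5, 6]}
    }
}
```

Because the formula is applied per topic, the first consumers receive the extra partition for every topic whose partition count does not divide evenly, which can leave them noticeably more loaded when a group subscribes to many topics.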
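2. RoundRobin strategy
RoundRobinAssignor sorts all consumers in the consumer group and all partitions of the topics they subscribe to (consumers by member ID, partitions by topic name and partition number), then assigns the partitions to the consumers one at a time in round-robin order.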
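A minimal sketch of the round-robin idea in plain Java follows. It assumes every consumer subscribes to every topic and that the consumer list is non-empty; it is not the RoundRobinAssignor source. The class `RoundRobinAssignmentSketch`, the `topic-partition` string format, and the names `c0`, `c1`, `t0`, `t1` are hypothetical and used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RoundRobinAssignmentSketch {
    /** partitionsPerTopic maps a topic name to its partition count. */
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> sortedConsumers = new ArrayList<>(consumers);
        Collections.sort(sortedConsumers);

        // TreeMap iterates topics in lexicographic order; partitions follow by number.
        List<String> partitions = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < e.getValue(); p++) {
                partitions.add(e.getKey() + "-" + p);
            }
        }

        Map<String, List<String>> result = new TreeMap<>();
        for (String c : sortedConsumers) {
            result.put(c, new ArrayList<>());
        }
        // Hand out partitions one at a time, cycling through the consumers.
        for (int i = 0; i < partitions.size(); i++) {
            String owner = sortedConsumers.get(i % sortedConsumers.size());
            result.get(owner).add(partitions.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("t0", 3);
        topics.put("t1", 3);
        System.out.println(assign(Arrays.asList("c0", "c1"), topics));
        // prints {c0=[t0-0, t0-2, t1-1], c1=[t0-1, t1-0, t1-2]}
    }
}
```

Unlike the Range strategy, the partitions of all topics are pooled before being handed out, so the per-consumer counts differ by at most one when all consumers have the same subscription.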
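3. Sticky assignment strategy
The Sticky strategy has two goals:
1. Partition assignment is as even as possible;
2. When a rebalance happens, the assignment stays as close as possible to the previous one.
Before any rebalance has happened (the initial assignment), the Sticky strategy produces a result similar to the RoundRobin strategy. When a rebalance happens, it keeps as much of the previous assignment as it can. If the two goals conflict, the first one takes priority.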
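To make the "stay close to the previous assignment" goal concrete, here is a deliberately simplified sketch in plain Java. It covers only the case where a consumer leaves the group: surviving consumers keep what they had, and the orphaned partitions go to whichever consumer currently owns the fewest. The real StickyAssignor is considerably more involved (it also moves partitions when a consumer joins), so the class `StickyRebalanceSketch` and its method `rebalance` are hypothetical and should be read as an illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class StickyRebalanceSketch {
    static Map<String, List<Integer>> rebalance(Map<String, List<Integer>> previous,
                                                List<String> liveConsumers,
                                                int partitionCount) {
        Map<String, List<Integer>> next = new TreeMap<>();
        Set<Integer> assigned = new HashSet<>();
        // Step 1: every surviving consumer keeps its previous partitions.
        for (String c : liveConsumers) {
            List<Integer> kept = new ArrayList<>(previous.getOrDefault(c, Collections.emptyList()));
            next.put(c, kept);
            assigned.addAll(kept);
        }
        if (next.isEmpty()) {
            return next; // nobody left to own partitions
        }
        // Step 2: give each orphaned partition to the least-loaded consumer.
        for (int p = 0; p < partitionCount; p++) {
            if (assigned.contains(p)) {
                continue;
            }
            String target = null;
            for (String c : next.keySet()) {
                if (target == null || next.get(c).size() < next.get(target).size()) {
                    target = c;
                }
            }
            next.get(target).add(p);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = new TreeMap<>();
        previous.put("c0", Arrays.asList(0, 3));
        previous.put("c1", Arrays.asList(1, 4));
        previous.put("c2", Arrays.asList(2, 5));
        // c1 leaves the group; partitions 1 and 4 must find new owners.
        System.out.println(rebalance(previous, Arrays.asList("c0", "c2"), 6));
        // prints {c0=[0, 3, 1], c2=[2, 5, 4]}
    }
}
```

For comparison, rerunning a plain round-robin over the two remaining consumers would give c0 the partitions 0, 2, 4 and c2 the partitions 1, 3, 5, which moves partitions 2 and 3 even though their previous owners are still alive.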