问题描述
我认为您的问题出在auto.offset.reset属性上。当新使用者从分区读取并且没有先前的已提交偏移量时,将使用auto.offset.reset属性来确定起始偏移量。如果将其设置为“最大”(默认),则从最新(最后)消息开始阅读。如果将其设置为“最小”,则会收到第一条可用消息。
因此添加:
properties.put("auto.offset.reset", "smallest");
然后再试一次。
“最小”和“最大”在不久前已被弃用。您现在应该使用“最早”或“最新”。有任何疑问,请检查文档
解决方法
我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。
我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。
据我了解,对于每个主题,您可以拥有来自不同消费者组的消费者,并且这些消费者组中的每一个都将获得针对某个主题生成的消息的完整副本。这样对吗?如果没有,对我来说建立多个消费者的正确方法是什么?到目前为止,这是我编写的消费者类:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer,String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic,String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092");
properties.put("group.id",consumerGroup);
properties.put("partition.assignment.strategy","roundrobin");
properties.put("enable.auto.commit","true");
properties.put("auto.commit.interval.ms","1000");
properties.put("session.timeout.ms","30000");
properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer,String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer,String> records = consumer.poll(0);
for (ConsumerRecord<Integer,String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}
}
}
此外,我注意到最初我只是在单个分区上测试主题“ test”的上述消耗。当我将另一个消费者添加到现有的消费者组(例如“
testGroup”)时,这触发了Kafka重新平衡,从而使我的消费延迟显着降低了几秒钟。我以为这是重新平衡的问题,因为我只有一个分区,但是当我创建一个新的主题“
multipartpartitions”(例如6个分区)时,出现了类似的问题,即向同一消费者组添加更多消费者会导致延迟问题。我环顾四周,人们告诉我,我应该使用多线程使用者-
有人可以阐明这一点吗?