我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.
示例我想生成到分区3的消息,而消费只会从分区3接收消息.
到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.
所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.
谢谢.
这是我的代码:
kafka-producer-context.xml
Metadata.refresh.interval.ms">3600000required.acks">1
KafkaProducer.java
public class KafkaProducer {
private static final Logger logger = LoggerFactory
.getLogger(KafkaProducer.class);
@Autowired
private MessageChannel inputToKafka;
public void sendMessage(String message) {
try {
inputToKafka.send(MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.TOPIC,"testTopic")
.setHeader(KafkaHeaders.PARTITION_ID,3).build());
} catch (Exception e) {
logger.error(String.format(
"Failed to send [ %s ] to topic %s ",message,topic),e);
}
}
}
kafka-consumer-context.xml
factorybean">
KafkaConsumer.java
public class KafkaConsumer {
private static final Logger log = LoggerFactory
.getLogger(KafkaConsumer.class);
@Autowired
KafkaService kafkaService;
public void processMessage(MapFailed to process message %s",message));
}
}
}
}
}
}
所以我的问题就在这里.当我向分区3或任何分区生成消息时,KafkaConsumer始终收到消息.我想要的只是:KafkaConsumer只接收来自分区3的消息,而不是来自其他分区的消息.
再次感谢.
最佳答案
你需要使用message-driven-channel-adapter.
As a variant,the KafkaMessageListenerContainer can accept org.springframework.integration.kafka.core.Partition array argument to specify topics and their partitions pair.
您需要使用this constructor连接侦听器容器,并使用listener-container属性将其提供给适配器.
我们将通过示例更新自述文件.