spring-integration-kafka配置使用者从指定分区接收消息

前端之家收集整理的这篇文章主要介绍了spring-integration-kafka配置使用者从指定分区接收消息前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我开始在我的项目中使用spring-integration-kafka,我可以生成和使用来自Kafka的消息.但是现在,我希望向特定分区生成消息,并且还消耗来自特定分区的消息.

示例我想生成到分区3的消息,而消费只会从分区3接收消息.

到目前为止,我的主题有8个分区,我可以向特定分区生成消息,但我还没有找到配置消费者的方法,只接收来自特定分区的消息.

所以关于如何使用spring-integration-kafka配置使用者的任何建议,或者其他任何需要与KafkaConsumer.java类配合使用的建议都可以从特定分区接收消息.

谢谢.

这是我的代码

kafka-producer-context.xml

Metadata.refresh.interval.ms">3600000required.acks">1

KafkaProducer.java

public class KafkaProducer {

private static final Logger logger = LoggerFactory
        .getLogger(KafkaProducer.class);

@Autowired
private MessageChannel inputToKafka;

public void sendMessage(String message) {

    try {
        inputToKafka.send(MessageBuilder.withPayload(message)
                    .setHeader(KafkaHeaders.TOPIC,"testTopic")
                    .setHeader(KafkaHeaders.PARTITION_ID,3).build());
    } catch (Exception e) {
        logger.error(String.format(
                "Failed to send [ %s ] to topic %s ",message,topic),e);
    }
}

}

kafka-consumer-context.xml

factorybean">
    

KafkaConsumer.java

public class KafkaConsumer {

private static final Logger log = LoggerFactory
        .getLogger(KafkaConsumer.class);

@Autowired
KafkaService kafkaService;

public void processMessage(MapFailed to process message %s",message));
                }
            }
        }

    }
}
}

所以我的问题就在这里.当我向分区3或任何分区生成消息时,KafkaConsumer始终收到消息.我想要的只是:KafkaConsumer只接收来自分区3的消息,而不是来自其他分区的消息.

再次感谢.

最佳答案
你需要使用message-driven-channel-adapter.

As a variant,the KafkaMessageListenerContainer can accept org.springframework.integration.kafka.core.Partition array argument to specify topics and their partitions pair.

您需要使用this constructor连接侦听器容器,并使用listener-container属性将其提供给适配器.

我们将通过示例更新自述文件.

猜你在找的Spring相关文章