我正在尝试将kafka作为AWS SQS的替代品.动机主要是提高性能,其中kafka将消除限制,一次性提取10条消息,上限为256kb.这是我的用例的高级场景.我有一堆爬虫正在发送索引文件.有效载荷的大小平均约为1 MB.爬虫调用SOAP端点,后者又运行生产者代码以将消息提交给kafka队列.消费者应用程序接收消息并处理它们.对于我的测试框,我已经为主题配置了30个分区和2个复制.两个kafka实例正在运行1个zookeeper实例.卡夫卡版本是0.10.0.
对于我的测试,我在队列中发布了700万条消息.我创建了一个包含30个消费者线程的消费者组,每个分区一个.我最初的印象是,与我通过SQS获得的相比,这将大大加快处理能力.不幸的是,事实并非如此.就我而言,数据处理很复杂,平均需要1-2分钟才能完成.这导致了一系列的分区重新平衡,因为线程无法按时心跳.我可以在日志引用中看到一堆消息
Auto offset commit Failed for group full_group: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
session.timeout.ms,which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in the poll() with max.poll.records.
这导致多次处理相同的消息.我尝试使用会话超时,max.poll.records和轮询时间来避免这种情况,但这会减慢整个处理时间.这是一些配置参数.
Metadata.max.age.ms = 300000 max.partition.fetch.bytes = 1048576 bootstrap.servers = [kafkahost1:9092,kafkahost2:9092] enable.auto.commit = true max.poll.records = 10000 request.timeout.ms = 310000 heartbeat.interval.ms = 100000 auto.commit.interval.ms = 1000 receive.buffer.bytes = 65536 fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer group.id = full_group retry.backoff.ms = 100 fetch.max.wait.ms = 500 connections.max.idle.ms = 540000 session.timeout.ms = 300000 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer metrics.sample.window.ms = 30000 auto.offset.reset = latest
我将消费者轮询时间减少到100毫秒.它减少了重新平衡问题,消除了重复处理,但显着减慢了整个过程.与使用基于SQS的解决方案的25小时相比,最终花了35个小时来完成所有600万条消息的处理.每个消费者线程平均每次轮询检索50-60条消息,尽管其中一些有时会轮询0条记录.当分区中有大量消息时,我不确定这种行为.同一个线程能够在后续迭代期间获取消息.这可能是由于重新平衡?
这是我的消费者代码
while (true) { try{ ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if(record.value()!=null){ TextAnalysisRequest textAnalysisObj = record.value(); if(textAnalysisObj!=null){ // Process record PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); } } } }catch(Exception ex){ LOGGER.error("Error in Full Consumer group worker",ex); }
我理解记录处理部分是我的一个瓶颈.但我相信这里的一些人有类似处理大处理时间的用例.我想通过旋转其专用线程中的每个处理器或使用大容量的线程池来进行异步处理,但不确定它是否会在系统中产生很大的负载.与此同时,我看到过一些人们使用暂停和恢复API来执行处理以避免重新平衡问题的情况.
在这种情况下,我真的在寻找一些建议/最佳实践.特别是,推荐的配置设置围绕听力,请求超时,最大轮询记录,自动提交间隔,轮询间隔等,如果kafka不是我的用例的正确工具,请让我知道.
private final BlockingQueue
在阅读帖子中:
while (true) {
try{
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
if(record.value()!=null){
TextAnalysisRequest textAnalysisObj = record.value();
if(textAnalysisObj!=null){
// Process record
requests.offer(textAnalysisObj);
}
}
}
}
catch(Exception ex){
LOGGER.error("Error in Full Consumer group worker",ex);
}
在处理线程中:
while (!Thread.currentThread().isInterrupted()) {
try {
TextAnalysisRequest textAnalysisObj = requests.take();
PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
} catch (InterruptedException e) {
LOGGER.info("Process thread interrupted",e);
Thread.currentThread().interrupt();
} catch (Throwable t) {
LOGGER.warn("Unexpected throwable while processing.",t);
}
}
另请参阅此文档,了解通过Kafka发送大型邮件的策略:http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/
简而言之,它表示Kafka在大约10K的小尺寸消息上表现最佳,如果您需要发送更大的消息,最好将它们放在网络存储上,然后通过Kafka发送它们的位置,或者拆分它们.