java – Kafka – 使用高级消费者的延迟队列实现

前端之家收集整理的这篇文章主要介绍了java – Kafka – 使用高级消费者的延迟队列实现前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
想使用高级消费者api来实施延迟消费者

大意:

>通过键生成消息(每个msg包含创建时间戳),这样可以确保每个分区已按生成时间排序消息.
> auto.commit.enable = false(将在每个消息处理后显式提交)
>消费消息
>检查消息时间戳,并检查是否有足够的时间过去
>进程消息(此操作永远不会失败)
> commit 1 offset

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

对此实施有一些担忧:

>提交每个偏移可能会减慢ZK
可以让consumer.commitout丢弃异常吗?如果是,我将消费两次相同的消息(可以使用幂等消息解决)
>问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,进程和提交(ZK会话超时?)
> ZK会话如何在没有提交新的偏移量的情况下保持活动? (设置一个蜂巢zookeeper.session.timeout.ms可以在死消费者中解决而不识别它)
>任何其他的问题?

谢谢!

解决方法

有一种方法可以使用不同的主题来推送所有要延迟的消息.如果所有延迟的消息都应该在相同的时间延迟之后处理,这将是相当简单的:
while(it.hasNext()) {
    val message = it.next().message()

    if(shouldBeDelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message,delay,delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

所有常规邮件现在都将尽快处理,而那些需要延迟的邮件将被放在另一个主题上.

好的是,我们知道延迟主题的头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的.因此,我们可以设置另一个读取头信息的消费者,检查时间戳是否在过去,如果处理消息并提交偏移量.如果不是,它不会提交偏移量,而是直到那时睡觉:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果有不同的延迟时间,您可以对延迟进行分区(例如24小时,12小时,6小时).如果延迟时间比这更复杂,那么延迟时间就会更加动态.您可以通过引入两个延迟主题解决这个问题.读取所有消息关闭延迟主题A并处理所有延迟过去的消息.其他的你只是找到一个最接近的delayTo,然后把它们放在主题B上.睡觉,直到最近的一个应该被处理,并且完全相反,即处理来自主题B的消息,并将一次不应该被处理回主题A.

回答您的具体问题(有些已在您的问题的评论解决)

  1. commit each offset might slow ZK down

您可以考虑切换到Kafka中存储偏移量(可从0.8.2获取功能,检查消费者配置中的offsets.storage属性)

  1. can consumer.commitOffsets throw an exception? if yes i will consume the same message twice (can solve with idempotent messages)

我相信如果它不能与偏移量存储进行通信,例如.正如你所说,使用幂等信息解决了这个问题.

  1. problem waiting long time without committing the offset,for example delay period is 24 hours,will get next from iterator,sleep for 24 hours,process and commit (ZK session timeout ?)

这不会是上述概述的解决方案的问题,除非消息本身的处理超过会话超时.

  1. how can ZK session keep-alive without commit new offsets ? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognising it)

再次用上面的方法,你不需要设置一个很长的会话超时.

  1. any other problems im missing?

总是有)

猜你在找的Java相关文章