我有两个Kafka消费者ConsumerA和ConsumerB.我想在同一台机器上独立运行这两个kafka消费者.它们之间根本没有关系.这两个kafka消费者将在同一台机器上处理不同的主题.
>每个消费者都应该有一个不同的Properties对象.
>每个消费者应该具有不同的线程池配置,因为它们可以以多线程方式(消费者组)运行,如果需要独立于其他消费者.
以下是我的设计:
消费者类(摘要):
public abstract class Consumer implements Runnable { private final Properties consumerProps; private final String consumerName; public Consumer(String consumerName,Properties consumerProps) { this.consumerName = consumerName; this.consumerProps = consumerProps; } protected abstract void shutdown(); protected abstract void run(String consumerName,Properties consumerProps); @Override public final void run() { run(consumerName,consumerProps); } }
消费者类:
public class ConsumerA extends Consumer { private final AtomicBoolean closed = new AtomicBoolean(false); private KafkaConsumer<byte[],byte[]> consumer; public ConsumerA(String consumerName,Properties consumerProps) { super(consumerName,consumerProps); } @Override public void shutdown() { closed.set(true); consumer.wakeup(); } @Override protected void run(String consumerName,Properties consumerProps) { consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(getTopicsBasisOnConsumerName()); Map<String,Object> config = new HashMap<>(); config.put(Config.URLS,TEST_URL); GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config); try { while (!closed.get()) { ConsumerRecords<byte[],byte[]> records = consumer.poll(Long.MAX_VALUE); for (ConsumerRecord<byte[],byte[]> record : records) { GenericRecord payload = decoder.decode(record.value()); // extract data from payload System.out.println("topic = %s,partition = %s,offset = %d,customer = %s,country = %s\n",record.topic(),record.partition(),record.offset(),record.key(),record.value()); } consumer.commitAsync(); } } catch (WakeupException ex) { // Ignore exception if closing System.out.println("error= ",ex); if (!closed.get()) throw e; } catch (Exception ex) { System.out.println("error= ",ex); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } } }
ConsumerA B类:
// similar to `ConsumerA` but with specific details of B
ConsumerHandler类:
public final class ConsumerHandler { private final ExecutorService executorServiceConsumer; private final Consumer consumer; private final List<Consumer> consumers = new ArrayList<>(); public ConsumerHandler(Consumer consumer,int poolSize) { this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize); this.consumer = consumer; for (int i = 0; i < poolSize; i++) { this.consumers.add(consumer); executorServiceConsumer.submit(consumer); } } public void shutdown() { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (Consumer consumer : consumers) { consumer.shutdown(); } executorServiceConsumer.shutdown(); try { executorServiceConsumer.awaitTermination(1000,TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } }); } }
下面是我的一个项目中的主要课程,如果我启动我的服务器,呼叫将首先自动进入,并从这个地方开始我执行我的ConsumerA和ConsumerB的所有kafka消费者.一旦调用shutdown,我就通过在所有Kafka消费者上调用shutdown来释放所有资源.
import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Singleton; @Singleton @DependencyInjectionInitializer public class Initializer { private ConsumerHandler consumerHandlerA; private ConsumerHandler consumerHandlerB; @PostConstruct public void init() { consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA",getConsumerPropsA()),3); consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB",getConsumerPropsB()),3); } @PreDestroy public void shutdown() { consumerHandlerA.shutdown(); consumerHandlerB.shutdown(); } }
对于我想在同一个盒子上运行多个kafka消费者的这类问题,这是正确的设计吗?如果有更好更有效的方法来解决这个问题,请告诉我.一般来说,我将在同一个盒子上运行最多三个或四个Kafka消费者,并且每个消费者可以根据需要拥有自己的消费者群体.
这是我在消费者中使用的KafkaConsumer的Javadoc.基于这个article,我已经创建了我的消费者,只是我使用了抽象类来扩展它.在该链接中搜索“全部放在一起”.
在文档中提到消费者不是线程安全的,但看起来我的代码正在为池中的每个线程重用相同的消费者实例.
public ConsumerHandler(Consumer consumer,int poolSize) { this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize); this.consumer = consumer; for (int i = 0; i < poolSize; i++) { this.consumers.add(consumer); executorServiceConsumer.submit(consumer); } }