java – 如果没有收到确认,如何设计发送记录并重试发送的系统?

前端之家收集整理的这篇文章主要介绍了java – 如果没有收到确认,如何设计发送记录并重试发送的系统?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在开展一个项目,我需要消耗大量的记录,然后我将这些记录发送到使用ZeroMQ的其他系统.

这是流程:

>将所有传入的记录存储在多个线程的CHM中.记录将以非常高的速度进行.
>从每1分钟运行的后台线程将这些记录从CHM发送到ZeroMQ服务器.
>将每个记录发送到ZeroMQ服务器后,将它们添加到重试桶中,以便在特定时间过后,如果尚未收到此记录的确认,则可以重试.
>我们还有一个poller可运行的线程,它收到来自ZeroMQ服务器的确认,告诉这些记录已经收到,所以一旦我得到确认,我从重试桶中删除该记录,以便它不会被重试.
>即使有一些记录发送两次,没关系,但最好还是这样做.

我不知道在我的下面的情况下最好的方法是最小化.

下面是我的Processor类,其中一个.add()方法将被多个线程调用,以线程安全的方式填充dataHolderByPartitionReference CHM.然后,在Processor类的构造函数中,通过调用SendToZeroMQ类,如下所示,启动每隔30秒运行的后台线程,将记录从同一个CHM推送到一组ZeroMQ服务器.

处理器

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new AtomicReference<>(new ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>>()));
      }
    },30,TimeUnit.SECONDS);
  }

  private void validateAndSendAllPartitions(
      ConcurrentHashMap<Integer,ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
  }

  private void validateAndSend(final int partition,final ConcurrentLinkedQueue<DataHolder> dataHolders) {
    Map<byte[],byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;
    while (!dataHolders.isEmpty()) {
        .........
        .........
        SendToZeroMQ.getInstance().executeAsync(partition,clientKeyBytesAndProcessBytesHolder);
    }
    // calling again with remaining values
    SendToZeroMQ.getInstance().executeAsync(partition,clientKeyBytesAndProcessBytesHolder);
  }

  // called by multiple threads to populate dataHolderByPartitionReference CHM
  public void add(final int partition,final DataHolder holder) {
    // store records in dataHolderByPartitionReference in a thread safe way
  }
}

以下是我的SendToZeroMQ类,它将一条记录发送到一组ZeroMQ服务器,并根据确认传递相应地重试.

>首先,它将向ZeroMQ服务器发送一条记录.
>然后它将添加一个相同的记录retryBucket,这将稍后重试,取决于是否收到确认.
>在同一个类中,我启动了一个后台线程,每1分钟运行一次,再次发送记录,这些记录仍然在重试桶中.
>同样的类也启动ResponsePoller线程,它将永远保持运行,看看哪些记录已被确认(我们之前发送过的),所以一旦记录得到确认,ResponsePoller线程就会从retryBucket中删除这些记录,这样不要重试

SendToZeroMQ

public class SendToZeroMQ {
  // do I need these two ScheduledExecutorService or one is sufficient to start my both the thread?
  private final ScheduledExecutorService executorServicePoller = Executors
      .newSingleThreadScheduledExecutor();
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final Cache<Long,byte[]> retryBucket = CacheBuilder.newBuilder().maximumSize(10000000)
      .removalListener(RemovalListeners.asynchronous(new CustomListener(),executorService))
      .build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorServicePoller.submit(new ResponsePoller());
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long,byte[]> entry : retryBucket.asMap().entrySet()) {
          executeAsync(entry.getKey(),entry.getValue());
        }
      }
    },1,TimeUnit.MINUTES);
  }

  public boolean executeAsync(final long address,final byte[] encodedByteArray) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return executeAsync(address,encodedByteArray,liveSockets.get().getSocket());
  }

  public boolean executeAsync(final long address,final byte[] encodedByteArray,final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // add to retry bucket
    retryBucket.put(address,encodedByteArray);
    return sent;
  }

  public boolean executeAsync(final int partition,final Map<byte[],byte[]> clientKeyBytesAndProcessBytesHolder) {
    Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }         
    Map<Long,byte[]> addressToencodedByteArray = encode(partition,clientKeyBytesAndProcessBytesHolder);
    long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
    byte[] encodedByteArray = addressToencodedByteArray.entrySet().iterator().next().getValue();
    return executeAsync(address,liveSockets.get().getSocket());
  }

  private Map<Long,byte[]> encode(final int partition,byte[]> clientKeyBytesAndProcessBytesHolder) {

    // this address will be unique always
    long address = TestUtils.getAddress();
    Frame frame = new Frame(............);
    byte[] packedByteArray = frame.serialize();
    // this map will always have one entry in it.
    return ImmutableMap.of(address,packedByteArray);
  }

  public void removeFromRetryBucket(final long address) {
    retryBucket.invalidate(address);
  }
}

下面是我的ResponsePoller类,它等待所有这些记录的确认,这些记录已经被其他后台线程发送了.如果接收到确认,则从重试桶中删除确认信息,以免重新尝试.

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();
  private static final int listenerPort = 8076;

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);

    // Set random identity to make tracing easier
    String identity = String.format("%04X-%04X",random.nextInt(),random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);

    PollItem[] items = new PollItem[] {new PollItem(client,Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second,pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items,10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
              long address = TestUtils.getAddress(frame.getData());
              // remove from retry bucket since we got the acknowledgment for this record
              SendToZeroMQ.getInstance().removeFromRetryBucket(address);
            } catch (Exception ex) {
              // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

题:

>我从设计的角度来看,设计这个问题的最好办法是什么,所以我所有的逻辑都能无缝地运作?
>我相当肯定有一个更好的方法来设计这个问题,而不是我有什么 – 更好的方法可以是什么?

解决方法

在我看来,只要您使用TCP进行底层通信,就不用担心“应用层”中的数据接收确认.

在这种情况下 – 由于ZeroMQ建立在TCP本身之上,进一步的优化,您不必担心成功的数据传输,只要传输层没有异常(这显然会反弹到您处理案例).

我看到你的问题的方式是 – 你正在运行Kafka消费者线程,它将接收和反弹消息到另一个消息队列(在这种情况下是ZMQ,它正在使用TCP并保证成功的消息传递,或者在较低级别引发异常通信层).

我可以想到的最简单的解决方案是使用线程池,从每个消费者内部,并尝试使用ZMQ发送消息.在任何网络错误的情况下,只要您的应用程序守护程序正在运行,您可以轻松地将该消息集中在以后的消费或日志记录中.

在提出的解决方案中,我假设消息的顺序不在问题空间中.你不是在看复杂的事情.

猜你在找的Java相关文章