java – 如何取消AsyncRestTemplate HTTP请求,如果他们花费太多时间?

前端之家收集整理的这篇文章主要介绍了java – 如何取消AsyncRestTemplate HTTP请求,如果他们花费太多时间?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
自从开始以来,我总是混淆如何处理InterruptedException以及如何正确地取消http请求,如果他们花费太多时间.我有一个图书馆,其中为我们的客户提供了两种方法,同步和异步.他们可以调用他们认为适合他们目的的方法.

> executeSync() – 等待,直到我有结果,返回结果.
> executeAsync() – 立即返回一个可以在其他事情完成之后处理的未来,如果需要的话.

他们将会传递其中具有用户ID和超时值的DataKey对象.我们将根据用户ID找出要调用哪台机器,然后使用该机器创建一个URL,我们将使用AsyncRestTemplate对该URL进行http呼叫,然后根据是否成功将响应发回给他们.

我正在使用AsyncRestTemplate的exchange方法,它返回一个ListenableFuture,并且我想要使用基于NIO的客户端连接的异步非阻塞体系结构,以便请求使用非阻塞IO,这就是为什么我使用AsyncRestTemplate.这个方法对我的问题定义是否正确?该图书馆将在非常重的负载下用于生产.

以下是我的界面:

public interface Client {
    // for synchronous
    public DataResponse executeSync(DataKey key);

    // for asynchronous
    public ListenableFuture<DataResponse> executeAsync(DataKey key);
}

下面是我实现的界面:

public class DataClient implements Client {

    // using spring 4 AsyncRestTemplate
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    // for synchronous
    @Override
    public DataResponse executeSync(DataKey keys) {
        Future<DataResponse> responseFuture = executeAsync(keys);
        DataResponse response = null;

        try {
            response = responseFuture.get(keys.getTimeout(),TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // do we need to catch InterruptedException here and interrupt the thread?
            Thread.currentThread().interrupt();
            // also do I need throw this RuntimeException at all?
            throw new RuntimeException("Interrupted",ex);
        } catch (TimeoutException ex) {
            DataLogging.logEvents(ex,DataErrorEnum.CLIENT_TIMEOUT,keys);
            response = new DataResponse(null,DataStatusEnum.ERROR);
            responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
        } catch (Exception ex) {
            DataLogging.logEvents(ex,DataErrorEnum.ERROR_CLIENT,DataStatusEnum.ERROR);
        }

        return response;
    }

    // for asynchronous     
    @Override
    public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {

        final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
        final org.springframework.util.concurrent.ListenableFuture orig = 
            restTemplate.exchange(createURL(keys),HttpMethod.GET,keys.getEntity(),String.class);

        orig.addCallback(
                new ListenableFutureCallback<ResponseEntity<String>>() {
                    @Override
                    public void onSuccess(ResponseEntity<String> result) {
                        responseFuture.set(new DataResponse(result.getBody(),DataErrorEnum.OK,DataStatusEnum.SUCCESS));
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        DataLogging.logErrors(ex,DataErrorEnum.ERROR_SERVER,keys);
                        responseFuture.set(new DataResponse(null,DataStatusEnum.ERROR));
                    }
                });

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        },MoreExecutors.directExecutor());
        return responseFuture;
    }
}

客户将从他们的代码中这样调用

// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);

// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);

现在的问题是 –

>如果http请求太长,我们可以中断AsyncRestTemplate调用吗?我实际上是在executeSync方法中的上述代码中呼吁取消我的未来,但我不知道如何验证它,以确保它正在做什么?我想将取消传播到原来的未来,以便我可以取消相应的http请求(我可能想要保存资源),这就是为什么我在我的executeAsync方法添加了一个监听器.我相信,我们不能中断RestTemplate调用,但不能确定AsyncRestTemplate是否可以做到这一点.如果说我们可以中断AsyncRestTemplate调用,那么我正在做一切正确的中断http呼叫?还是有更好的/更清洁的方法来做到这一点?或者我甚至需要担心用当前的设计以AsyncRestTemplate取消Http请求?

// propagate cancellation back to the original request
    responseFuture.addListener(new Runnable() {
      @Override public void run() {
         if (responseFuture.isCancelled()) {
           orig.cancel(false); // I am keeping this false for now
         }
      }
    },MoreExecutors.directExecutor());

使用当前设置,我可以看到它是抛出CancellationException的一些时间(不是每次) – 这是否意味着我的HTTP请求被取消了?
>还有我在executeSync方法中的InterruptedException的catch块中做正确的事情?如果没有,那么处理这个问题的正确方法是什么.在我的情况下,是否需要处理InterruptedException?
>这是真的,默认情况下AsyncRestTamplete使用阻塞调用和请求每个线程?如果是,那么有什么办法可以在我目前的设置中拥有基于NIO的客户端连接?

任何解释/代码建议将是非常有帮助的.

解决方法

首先,为什么要使用SettableFuture?为什么不能返回AsyncRestTemplate返回的ListenableFuture?
1. Can we interrupt AsyncRestTemplate call if http request is taking too long?

当然你做!你只需要调用Future.cancel方法.此方法将中断AsyncRestTemplate实际使用的内部RestTemplate的执行.

2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?

正如Phil和Danilo所说,你不需要中断InterruptedException catch块中的当前线程.只要执行请求必须取消,您需要做的任何事情.

事实上,我建议您创建一个处理此行为的方法,像handleInterrupt,并且对于TimeoutException和InterruptedException使用此方法.

3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?

是. AsyncRestTamplete的默认构造函数在内部使用SimpleClientHttpRequestFactory和SimpleAsyncTaskExecutor.

这个TaskExecutor总是为每个任务启动一个威胁,并且永远不会重用Threads,所以效率很低:

* TaskExecutor implementation that fires up a new Thread for each task,* executing it asynchronously.
 *
 * Supports limiting concurrent threads through the "concurrencyLimit"
 * bean property. By default,the number of concurrent threads is unlimited.
 *
 * NOTE: This implementation does not reuse threads! Consider a
 * thread-pooling TaskExecutor implementation instead,in particular for
 * executing a large number of short-lived tasks.
 *

我建议您使用AsyncRestTemplate的另一个配置.

您应该使用使用另一个TaskExecutor的AsyncRestTemplate的构造函数

public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)

例如:

AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));

此ExecutorService(Executors.newCachedThreadPool())根据需要创建新线程,但在可用时将重新使用以前构造的线程.

或者甚至更好,可以使用另一个RequestFactory.例如,您可以使用HttpComponentsAsyncClientHttpRequestFactory,内部使用NIO,只需调用AsyncRestTemplate的正确构造函数

new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())

不要忘记AsyncRestTemplate的内部行为将取决于您如何创建对象.

猜你在找的Java相关文章