> executeSync() – 等待,直到我有结果,返回结果.
> executeAsync() – 立即返回一个可以在其他事情完成之后处理的未来,如果需要的话.
public interface Client { // for synchronous public DataResponse executeSync(DataKey key); // for asynchronous public ListenableFuture<DataResponse> executeAsync(DataKey key); }
public class DataClient implements Client { // using spring 4 AsyncRestTemplate private final AsyncRestTemplate restTemplate = new AsyncRestTemplate(); // for synchronous @Override public DataResponse executeSync(DataKey keys) { Future<DataResponse> responseFuture = executeAsync(keys); DataResponse response = null; try { response = responseFuture.get(keys.getTimeout(),TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { // do we need to catch InterruptedException here and interrupt the thread? Thread.currentThread().interrupt(); // also do I need throw this RuntimeException at all? throw new RuntimeException("Interrupted",ex); } catch (TimeoutException ex) { DataLogging.logEvents(ex,DataErrorEnum.CLIENT_TIMEOUT,keys); response = new DataResponse(null,DataStatusEnum.ERROR); responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources? } catch (Exception ex) { DataLogging.logEvents(ex,DataErrorEnum.ERROR_CLIENT,DataStatusEnum.ERROR); } return response; } // for asynchronous @Override public ListenableFuture<DataResponse> executeAsync(final DataKey keys) { final SettableFuture<DataResponse> responseFuture = SettableFuture.create(); final org.springframework.util.concurrent.ListenableFuture orig = restTemplate.exchange(createURL(keys),HttpMethod.GET,keys.getEntity(),String.class); orig.addCallback( new ListenableFutureCallback<ResponseEntity<String>>() { @Override public void onSuccess(ResponseEntity<String> result) { responseFuture.set(new DataResponse(result.getBody(),DataErrorEnum.OK,DataStatusEnum.SUCCESS)); } @Override public void onFailure(Throwable ex) { DataLogging.logErrors(ex,DataErrorEnum.ERROR_SERVER,keys); responseFuture.set(new DataResponse(null,DataStatusEnum.ERROR)); } }); // propagate cancellation back to the original request responseFuture.addListener(new Runnable() { @Override public void run() { if (responseFuture.isCancelled()) { orig.cancel(false); // I am keeping this false for now } } },MoreExecutors.directExecutor()); return responseFuture; } }
// if they are calling executeSync() method DataResponse response = DataClientFactory.getInstance().executeSync(dataKey); // and if they want to call executeAsync() method Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);
现在的问题是 –
// propagate cancellation back to the original request responseFuture.addListener(new Runnable() { @Override public void run() { if (responseFuture.isCancelled()) { orig.cancel(false); // I am keeping this false for now } } },MoreExecutors.directExecutor());
使用当前设置,我可以看到它是抛出CancellationException的一些时间(不是每次) – 这是否意味着我的HTTP请求被取消了?
1. Can we interrupt AsyncRestTemplate call if http request is taking too long?
2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?
正如Phil和Danilo所说,你不需要中断InterruptedException catch块中的当前线程.只要执行请求必须取消,您需要做的任何事情.
3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?
是. AsyncRestTamplete的默认构造函数在内部使用SimpleClientHttpRequestFactory和SimpleAsyncTaskExecutor.
* TaskExecutor implementation that fires up a new Thread for each task,* executing it asynchronously. * * Supports limiting concurrent threads through the "concurrencyLimit" * bean property. By default,the number of concurrent threads is unlimited. * * NOTE: This implementation does not reuse threads! Consider a * thread-pooling TaskExecutor implementation instead,in particular for * executing a large number of short-lived tasks. *
public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)
AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));
new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())