五、服务注册
1、实例信息注册器初始化
服务注册的代码位置不容易发现,我们看 DiscoveryClient 初始化调度任务的这个方法,这段代码会去初始化一个实例信息复制器 InstanceInfoReplicator,这个复制器就包含了实例的注册(明明是注册却叫 Replicator 感觉怪怪的)。
① DiscoveryClient 初始化调度器的流程
- 先基于 DiscoveryClient、InstanceInfo 构造 InstanceInfoReplicator,然后还有两个参数为实例信息复制间隔时间(默认30秒)、并发的数量(默认为2)。
- 创建了一个实例状态变更监听器,并注册到 ApplicationInfoManager。当实例状态变更时,就会触发这个监听器,并调用 InstanceInfoReplicator 的 onDemandUpdate 方法。
- 启动 InstanceInfoReplicator,默认延迟40秒,也就是说服务启动可能40秒之后才会注册到注册中心。
1 private void initScheduledTasks() { 2 // 省略定时刷新注册表的任务... 3 4 if (clientConfig.shouldRegisterWithEureka()) { 5 省略定时心跳的任务... 6 7 实例信息复制器,用于定时更新自己状态,并向注册中心注册 8 instanceInfoReplicator = new InstanceInfoReplicator( 9 this,10 instanceInfo,1)">11 clientConfig.getInstanceInfoReplicationIntervalSeconds(),1)">12 2); burstSize 13 14 实例状态变更的监听器 15 statusChangeListener = ApplicationInfoManager.StatusChangeListener() { 16 @Override 17 public String getId() { 18 return "statusChangeListener"; 19 } 20 21 22 public notify(StatusChangeEvent statusChangeEvent) { 23 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) { 24 logger.error("Saw local status change event {}"25 } else { 26 logger.info("Saw local status change event {}"27 } 28 instanceInfoReplicator.onDemandUpdate(); 29 30 }; 31 32 向 ApplicationInfoManager 注册状态变更监听器 33 (clientConfig.shouldOnDemandUpdateStatusChange()) { 34 applicationInfoManager.registerStatusChangeListener(statusChangeListener); 35 } 36 37 启动实例信息复制器,默认延迟时间40秒 38 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); 39 } 40 logger.info("Not registering with Eureka server per configuration"); 41 } 42 }
② InstanceInfoReplicator 的构造方法
- 创建了一个单线程的调度器
- 设置 started 为 false
- 创建了以分钟为单位的限流器,每分钟默认最多只能调度4次
1 InstanceInfoReplicator(DiscoveryClient discoveryClient,InstanceInfo instanceInfo,int replicationIntervalSeconds,1)">int burstSize) { this.discoveryClient = discoveryClient; 3 this.instanceInfo = instanceInfo; 单线程的调度器 5 this.scheduler = Executors.newScheduledThreadPool(1 6 ThreadFactoryBuilder() 7 .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d") 8 .setDaemon(true 9 .build()); 10 11 this.scheduledPeriodicRef = new AtomicReference<Future>(); 12 started 设置为 false 13 this.started = new AtomicBoolean(false14 以分钟为单位的限流器 15 this.rateLimiter = RateLimiter(TimeUnit.MINUTES); 16 间隔时间,默认为30秒 17 this.replicationIntervalSeconds = replicationIntervalSeconds; 18 this.burstSize = burstSize; 19 允许每分钟更新的频率 60 * 2 / 30 = 4 20 this.allowedRatePerMinute = 60 * this.burstSize / .replicationIntervalSeconds; 21 logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}"22 }
③ 启动 InstanceInfoReplicator
- 将 started 设置为 true,代表已经启动了
- 调用 instanceInfo.setIsDirty() 方法,将实例设置为 dirty=true,并更新了最后一次设置 dirty 的时间戳
- InstanceInfoReplicator 实现了 Runnable,它本身被当成任务来调度,然后延迟40秒开始调度当前任务,并将 Future 放到本地变量中
void start( initialDelayMs) { 启动时 started 设置为 true if (started.compareAndSet(false,1)">)) { 4 设置为 dirty,便于下一次心跳时同步到 eureka server 5 instanceInfo.setIsDirty(); 6 延迟40秒后开始调度当前任务 7 Future next = scheduler.schedule( 8 将 Future 放到本地变量中 scheduledPeriodicRef.set(next); } 12 13 /////// 14 15 synchronized setIsDirty() { 16 isInstanceInfoDirty = 17 lastDirtyTimestamp = System.currentTimeMillis(); 18 }
2、客户端实例注册
接着看 InstanceInfoReplicator 的 run 方法,这个方法就是完成注册的核心位置。
- 首先会更新实例的信息,如果有变更就会设置 dirty=true
- 如过是 dirty 的,就会调用 DiscoveryClient 的 register 方法注册实例
- 实例注册后,就把 dirty 设置为 false
- 最后在 finally 中继续下一次的调度,默认是每隔30秒调度一次,注意他这里是把调度结果 Future 放到本地变量中
run() { try 3 更新本地实例信息,如果实例信息有变更,则 dirty=true 4 discoveryClient.refreshInstanceInfo(); 5 设置为 dirty 时的时间戳 7 Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { 9 注册实例 discoveryClient.register(); 11 设置 dirty=false 12 instanceInfo.unsetIsDirty(dirtyTimestamp); 14 } catch (Throwable t) { 15 logger.warn("There was a problem with the instance info replicator"16 } finally17 30秒之后再调度 18 Future next = scheduler.schedule(20 21 }
② 实例信息刷新
再来细看下 refreshInstanceInfo 刷新实例信息的方法:
- 首先刷新了数据中心的信息
- 然后刷新续约信息,主要就是将 EurekaClientConfig 的续约配置与本地的续约配置做对比,如果变更了就重新创建续约信息,并设置实例为dirty。这种情况一般就是运行期间动态更新实例的配置,然后重新注册实例信息。
- 接着使用健康检查器检查实例健康状况,从 getHealthCheckHandler 这段代码进去不难发现,我们可以自定义健康检查器,例如当本地的一些资源未创建成功、某些核心线程池down了就认为实例不可用,这个时候就可以自定义健康检查器。如果没有自定义健康检查器,那就直接返回实例当前的状态。我们可以实现 HealthCheckHandler 接口自定义健康检查器。
- 最后就会调用 ApplicationInfoManager 的 setInstanceStatus 设置实例状态,会判断如果状态发生变更,就会发出状态变更的通知,这样就会触发前面定义的状态变更监听器,然后调用 InstanceInfoReplicator 的 onDemandUpdate 方法。
refreshInstanceInfo() { 如果有必要,就更新数据中心的信息 3 applicationInfoManager.refreshDataCenterInfoIfrequired(); 如果有必要,就更新续约信息,比如动态更新了配置文件,这时就更新续约信息 LeaseInfo,并将实例设置为 dirty applicationInfoManager.refreshLeaseInfoIfrequired(); 7 InstanceStatus status; 8 9 用监控检查器检查实例的状态 10 status = getHealthCheckHandler().getStatus(instanceInfo.getStatus()); 11 } (Exception e) { 12 logger.warn("Exception from healthcheckHandler.getStatus,setting status to DOWN"13 status = InstanceStatus.DOWN; 14 15 if (null != status) { 设置实例状态,实例状态变了会触发状态变更的监听器 18 applicationInfoManager.setInstanceStatus(status); 21 22 ////////////////////////////////23 24 refreshLeaseInfoIfrequired() { 25 当前实例续约信息 26 LeaseInfo leaseInfo = instanceInfo.getLeaseInfo(); 27 if (leaseInfo == 28 return30 从配置中获取续约信息 31 int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds(); 32 int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds(); 33 如果续约信息变了,就重新创建续约信息,并设置实例为 dirty 34 if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { 35 LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder() 36 .setRenewalIntervalInSecs(currentLeaseRenewal) 37 .setDurationInSecs(currentLeaseDuration) .build(); 39 instanceInfo.setLeaseInfo(newLeaseInfo); 40 42 43 44 45 46 HealthCheckHandler getHealthCheckHandler() { 47 HealthCheckHandler healthCheckHandler = .healthCheckHandlerRef.get(); 48 if (healthCheckHandler == 49 可以自定义 HealthCheckHandler 实现健康检查 50 healthCheckHandlerProvider) { 51 healthCheckHandler = healthCheckHandlerProvider.get(); 52 } else healthCheckCallbackProvider) { 53 可以自定义 HealthCheckCallback 实现健康检查,HealthCheckCallback 已过期,建议使用 HealthCheckHandler 54 healthCheckHandler = HealthCheckCallbackToHandlerBridge(healthCheckCallbackProvider.get()); 55 56 57 null == healthCheckHandler) { 58 没有自定义的就是用默认的桥接类 59 healthCheckHandler = new HealthCheckCallbackToHandlerBridge(60 61 this.healthCheckHandlerRef.compareAndSet(62 63 64 return 65 66 67 ////////////////////////////////////// 68 69 setInstanceStatus(InstanceStatus status) { 70 InstanceStatus next = instanceStatusMapper.map(status); 71 if (next == 72 73 74 75 如果状态变更了,才会返回之前的状态,然后触发状态变更监听器 76 InstanceStatus prev = instanceInfo.setStatus(next); 77 if (prev != 78 for (StatusChangeListener listener : listeners.values()) { 79 80 listener.notify( StatusChangeEvent(prev,next)); 81 } 82 logger.warn("Failed to notify listener: {}"83 84 85 86 }
③ 向 eureka server 注册
在 run 方法里调用了 discoveryClient.register() 方法实现了客户端实例向注册中心的注册,进入到 register 方法可以看到,他就是使用前面构造的 EurekaTransport 来发起远程调用。
一层层进去,很容易发现就是调用了 eureka-server 的 POST /apps/{appName} 接口,后面我们就从 eureka-core 中找这个接口就可以找到注册中心实现服务注册的入口了。
boolean register() throws Throwable { 2 logger.info(PREFIX + "{}: registering service..." 3 EurekaHttpResponse<Void> httpResponse; registrationClient => JerseyReplicationClient 6 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); 7 } 8 logger.warn(PREFIX + "{} - registration Failed {}"throw e; (logger.isInfoEnabled()) { 12 logger.info(PREFIX + "{} - registration status: {}"return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); 16 17 18 public EurekaHttpResponse<Void> register(InstanceInfo info) { 调用的是 POST apps/{appName} 接口 21 String urlPath = "apps/" + info.getAppName(); 22 ClientResponse response = 23 24 Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); 25 addExtraHeaders(resourceBuilder); 26 response = resourceBuilder 27 .header("Accept-Encoding","gzip" .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) 30 post 方法 31 .post(ClientResponse.class anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); 33 } 34 (logger.isDebugEnabled()) { 35 logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}"36 response == null ? "N/A" : response.getStatus()); 38 if (response != response.close(); 42 }
④ 注册中心设置实例状态为已启动
再回想下注册中心的初始化流程,在最后调用 openForTraffic 方法时,最后也会调用 ApplicationInfoManager 的 setInstanceStatus 方法,将实例状态设置为已启动,这个时候就会触发客户端注册到注册中心的动作。
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
⑤ 完成监听实例变更的方法
状态变更器会调用 onDemandUpdate 方法来完成实例状态变更后的逻辑。
- 它这里一个是用到了限流器来限制每分钟这个方法只能被调用4次,即避免了频繁的注册行为
- 然后在调度时,它会从本地变量中取出上一次调度的 Future,如果任务还没执行完,它会直接取消掉
- 最后就是调用 run 方法,完成服务的注册
boolean onDemandUpdate() { 限流控制 (rateLimiter.acquire(burstSize,allowedRatePerMinute)) { if (!scheduler.isShutdown()) { 5 scheduler.submit( Runnable() { 6 @Override 7 8 logger.debug("Executing on-demand update of local InstanceInfo" 9 10 如果上一次的任务还没有执行完,直接取消掉,然后执行注册的任务 11 Future latestPeriodic = scheduledPeriodicRef.get(); 12 if (latestPeriodic != null && !latestPeriodic.isDone()) { 13 logger.debug("Canceling the latest scheduled update,it will be rescheduled at the end of on demand update"14 latestPeriodic.cancel( } 17 InstanceInfoReplicator..run(); }); 20 21 } 22 logger.warn("Ignoring onDemand update due to stopped scheduler"23 25 } 26 logger.warn("Ignoring onDemand update due to rate limiter"27 29 }
⑥ 限流器
最后简单看下限流器 RateLimiter 的设计:
- 从它的注释中可以看出,eureka 的 RateLimiter 是基于令牌桶算法实现的限流器
- acquire 方法有两个参数:
- burstSize:允许以突发方式进入系统的最大请求数
- averageRate:设置的时间窗口内允许进入的请求数
/** 2 * Rate limiter implementation is based on token bucket algorithm. There are two parameters: * <ul> * <li> * burst size - maximum number of requests allowed into the system as a burst * </li> 8 * average rate - expected number of requests per second (RateLimiters using MINUTES is also supported) * </ul> * * @author Tomasz Bak 13 */ RateLimiter { final long rateToMsConversion; 17 final AtomicInteger consumedTokens = AtomicInteger(); final AtomicLong lastRefillTime = new AtomicLong(0 @Deprecated 22 RateLimiter() { 23 (TimeUnit.SECONDS); 25 26 RateLimiter(TimeUnit averageRateUnit) { switch (averageRateUnit) { 28 case SECONDS: 29 rateToMsConversion = 1000break31 MINUTES: 32 rateToMsConversion = 60 * 100033 34 default: 35 throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported"38 39 boolean acquire(int burstSize,1)"> averageRate) { 40 acquire(burstSize,averageRate,System.currentTimeMillis()); 42 43 long averageRate,1)"> currentTimeMillis) { 44 if (burstSize <= 0 || averageRate <= 0) { Instead of throwing exception,we just let all the traffic go 45 47 48 refillToken(burstSize,currentTimeMillis); consumeToken(burstSize); 50 51 52 void refillToken(53 上一次填充 token 的时间 54 long refillTime = lastRefillTime.get(); 55 时间差 56 long timeDelta = currentTimeMillis - refillTime; 固定生成令牌的速率,即每分钟4次 58 例如刚好间隔15秒进来一个请求,就是 15000 * 4 / 60000 = 1,newTokens 代表间隔了多少次,如果等于0,说明间隔不足15秒 59 long newTokens = timeDelta * averageRate /60 if (newTokens > 061 long newRefillTime = refillTime == 0 62 ? currentTimeMillis 63 注意这里不是直接设置的当前时间戳,而是根据 newTokens 重新计算的,因为有可能同一周期内同时有多个请求进来,这样可以保持一个固定的周期 64 : refillTime + newTokens * rateToMsConversion / averageRate; 65 (lastRefillTime.compareAndSet(refillTime,newRefillTime)) { 66 while (67 调整令牌的数量 68 int currentLevel = consumedTokens.get(); 69 int adjustedLevel = Math.min(currentLevel,burstSize); 70 currentLevel 可能为2,重置为了 0 或 1 71 int newLevel = (int) Math.max(0,adjustedLevel - newTokens); 72 (consumedTokens.compareAndSet(currentLevel,newLevel)) { 73 74 75 76 77 78 79 80 boolean consumeToken(81 82 83 突发数量为2,也就是允许15秒内最多有两次请求进来 84 if (currentLevel >=85 86 87 if (consumedTokens.compareAndSet(currentLevel,currentLevel + 188 89 90 91 92 93 reset() { 94 consumedTokens.set(095 lastRefillTime.set(096 97 }
3、Eureka Server 接收注册请求
① 找到实例注册的API入口
从前面的分析中,我们知道服务端注册的API是 POST /apps/{appName},由于 eureka 是基于 jersey 来通信的,想找到API入口还是有点费劲的,至少没有 springmvc 那么容易。
先看 ApplicationsResource 这个类,可以找到 getApplicationResource 这个方法的路径是符合 /apps/{appName} 这个规则的。然后可以看到它里面创建了 ApplicationResource,再进入到这个类里面,就可以找到 @Post 标注的 addInstance 方法,这就是注册的入口了。可以看到它是调用了注册表的 register 方法来注册实例的。
1 @Path("/{version}/apps" 2 @Produces({"application/xml","application/json"}) ApplicationsResource { final EurekaServerConfig serverConfig; PeerAwareInstanceRegistry registry; 6 ResponseCache responseCache; 7 符合规则 /apps/{appName} 9 @Path("{appId}"10 ApplicationResource getApplicationResource( 11 @PathParam("version") String version,1)">12 @PathParam("appId") String appId) { CurrentRequestVersion.set(Version.toEnum(version)); 15 真正的入口 16 ApplicationResource(appId,serverConfig,registry); 17 } CurrentRequestVersion.remove(); 22 23 24 25 @Produces({"application/xml",1)">26 ApplicationResource { 27 28 29 @POST 31 @Consumes({"application/json","application/xml" Response addInstance(InstanceInfo info,1)">33 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { 34 logger.debug("Registering instance {} (replication={})"35 36 registry.register(info,"true".equals(isReplication)); return Response.status(204).build(); 204 to be backwards compatible 39 }
addInstance 接口有两个参数:
- InstanceInfo:服务实例,主要有两块数据:
- isReplication:这个参数是从请求头中取的,表示是否是在同步 server 节点的实例。在集群模式下,因为客户端实例注册到注册中心后,会同步到其它 server节点,所以如果是eureka-server之间同步信息,这个参数就为 true,避免循环同步。
② 实例注册
进入到注册表的 register 方法,可以看到主要就是调用父类的 register 方法注册实例,然后同步到 eureka server 集群中的其它 server 节点。集群同步放到后面来看,现在只需要知道注册实例时会同步到其它server节点即可。
@Override void register(final InstanceInfo info,1)"> isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; 如果实例中没有周期的配置,就设置为默认的 90 秒 if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0 6 leaseDuration = info.getLeaseInfo().getDurationInSecs(); 9 super.register(info,leaseDuration,1)"> 复制到集群其它 server 节点 11 replicateToPeers(Action.Register,info.getAppName(),info,1)">12 }
- 首先可以看到eureka server保存注册表(registry)的数据结构是 ConcurrentHashMap<String,Map<String,Lease<InstanceInfo>>>,key 就是服务名称,value 就是对应的实例,因为一个服务可能会部署多个实例。
- 根据服务名称从注册表拿到实例表,然后根据实例ID拿到实例的租约信息 Lease<InstanceInfo>
- 如果租约信息存在,说明已经注册过相同的实例了,然后就对比已存在实例和新注册实例的最后更新时间,如果新注册的是旧的,就替换为已存在的实例来完成注册
- 如果租约信息不存在,说明是一个新注册的实例,这时会更新两个阈值:
- 然后就根据注册的实例信息和续约周期创建新的租约,并放入注册表中去
- 接着根据当前时间戳、服务名称、实例ID封装一个 Pair,然后放入到最近注册的队列中 recentRegisteredQueue,先记住这个队列就行了
- 根据实例的 overriddenStatus 判断,不为空的话,可能就只是要更新实例的状态,这个时候就会只变更实例的状态,而不会改变 dirty
- 然后是设置了实例的启动时间戳,设置了实例的 ActionType 为 ADDED
- 将租约加入到最近变更的队列 recentlyChangedQueue,先记住这个队列
- 最后一步失效缓存,一步步进去可以发现,主要就是将读写缓存 readWriteCacheMap 中与这个实例相关的缓存失效掉,这个缓存后面分析抓取注册表的时候再来细看
void register(InstanceInfo registrant,1)">int leaseDuration,1)"> read.lock(); registry => ConcurrentHashMap<String,Lease<InstanceInfo>>> 5 Map<String,Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == 8 初次注册时,创建一个 ConcurrentHashMap,key 为 appName final ConcurrentHashMap<String,Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String,Lease<InstanceInfo>>10 gMap = registry.putIfAbsent(registrant.getAppName(),gNewMap); 12 gMap = gNewMap; 15 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); 16 Retain the last dirty timestamp without overwriting it,if there is already a lease if (existingLease != null && (existingLease.getHolder() != 18 已存在的实例的最后更新时间 19 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); 新注册的实例的最后更新时间 21 Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); 22 logger.debug("Existing lease found (existing={},provided={}"24 如果存在的实例比新注册尽量的实例后更新,就直接把新注册的实例设置为已存在的实例 25 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 26 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + 27 " than the one that is being registered {}"28 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"29 registrant = existingLease.getHolder(); 31 } 32 新注册时,续约信息不存在 33 synchronized (lock) { 34 this.expectedNumberOfClientsSendingRenews > 035 Since the client wants to register it,increase the number of clients sending renews 36 期望续约的客户端数量 + 1 37 this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 138 更新每分钟续约请求次数的阀值,这个阀值在后面很多地方都会用到 updateRenewsPerMinThreshold(); 42 logger.debug("No prevIoUs lease information found; it is new registration"43 创建新的续约 45 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant,leaseDuration); 46 47 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 49 gMap.put(registrant.getId(),lease); 放入最近注册的队列 51 recentRegisteredQueue.add(new Pair<Long,String>( 52 System.currentTimeMillis(),1)">53 registrant.getAppName() + "(" + registrant.getId() + ")")); 覆盖状态 InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 56 logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " 57 + "overrides"overriddenInstanceStatusMap.containsKey(registrant.getId())) { 59 logger.info("Not found overridden id {} and hence adding it" overriddenInstanceStatusMap.put(registrant.getId(),registrant.getOverriddenStatus()); 61 63 InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); 64 if (overriddenStatusFromMap != 65 logger.info("Storing overridden status {} from map"66 registrant.setOverriddenStatus(overriddenStatusFromMap); 69 Set the status based on the overridden status rules 70 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant,existingLease,1)">71 仅仅是变更实例状态,不会设置为 dirty 72 registrant.setStatusWithoutDirty(overriddenInstanceStatus); 73 74 If the lease is registered with UP status,set lease service up timestamp 75 (InstanceStatus.UP.equals(registrant.getStatus())) { 76 UP 时设置 Lease 的时间戳 lease.serviceUp(); 79 设置动作是 ADDED,这个在后面会做 switch 判断 80 registrant.setActionType(ActionType.ADDED); 添加到最近变更的队列 82 recentlyChangedQueue.add( RecentlyChangedItem(lease)); 83 设置最后更新时间 registrant.setLastUpdatedTimestamp(); 85 失效缓存 invalidateCache(registrant.getAppName(),registrant.getVIPAddress(),registrant.getSecureVipAddress()); 87 logger.info("Registered instance {}/{} with status {} (replication={})"88 registrant.getAppName(),registrant.getId(),registrant.getStatus(),1)">89 } read.unlock(); 92 }
更新每分钟续约次数的阈值:
1 protected updateRenewsPerMinThreshold() { 2 每分钟续约阈值 = 期望续约的客户端数量 * (60 / 续约间隔时间) * 续约百分比 3 例如,一共注册了 10 个实例,那么期望续约的客户端数量为 10,间隔时间默认为 30秒,就是每个客户端应该每30秒发送一次心跳,续约百分比默认为 0.85 4 每分钟续约次数阈值 = 10 * (60.0 / 30) * 0.85 = 17,也就是说每分钟至少要接收到 17 此续约请求 5 this.numberOfRenewsPerMinThreshold = (int) (.expectedNumberOfClientsSendingRenews 6 * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) 7 * serverConfig.getRenewalPercentThreshold()); 8 }
这就是注册表 registry 缓存服务实例信息的结构,可以看出 eureka 是基于内存来组织注册表的,使用的是 ConcurrentHashMap 来保证多线程并发安全。
4、Eureka Server 控制台
前面已经将服务实例注册上去了,现在来看下 eureka server 的控制台页面是怎么获取这些数据的。
前面已经分析过 eureka-server 的 web.xml 中配置了欢迎页为 status.jsp ,这就是控制台的页面。
从 status.jsp 可以看出,其实就是从 EurekaServerContext 上下文获取注册表,然后读取注册表注册的服务实例,然后遍历展示到表格中。
1 <%@ page language="java" import="java.util.*,java.util.Map.Entry,com.netflix.discovery.shared.Pair,1)"> 2 com.netflix.discovery.shared.*,com.netflix.eureka.util.*,com.netflix.appinfo.InstanceInfo.* 3 com.netflix.appinfo.DataCenterInfo.*,com.netflix.appinfo.AmazonInfo.MetaDataKey,com.netflix.eureka.resources.* 4 com.netflix.eureka.*,com.netflix.appinfo.*,com.netflix.eureka.util.StatusUtil" pageEncoding="UTF-8" %> 5 <% 6 String path = request.getContextPath(); 7 String basePath = request.getScheme()+"://"+request.getServerName()+":"+request.getServerPort()+path+"/" 8 %> 9 10 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> 11 12 <html> 13 <head> 14 <base href="<%=basePath%>"> 15 16 <title>Eureka</title> 17 <link rel="stylesheet" type="text/css" href="./css/main.css"> 18 <script type="text/javascript" src="./js/jquery-1.11.1.js" ></script> 19 <script type="text/javascript" src="./js/jquery.dataTables.js" ></script> 20 <script type="text/javascript" > 21 $(document).ready(function() { 22 $('table.stripeable tr:odd').addClass('odd' 23 $('table.stripeable tr:even').addClass('even' 24 $('#instances thead th').each(function () { 25 var title = $('#instances thead th').eq($().index()).text(); 26 $(this).html(title + '</br><input type="text" placeholder="Search ' + title + '" />' 27 }); 28 DataTable 29 var table = $('#instances').DataTable({"paging": }); 30 Apply the search 31 table.columns().eq(0).each(function (colIdx) { 32 $('input',table.column(colIdx).header()).on('keyup change' 33 table.column(colIdx).search(.value).draw(); 34 }); 35 36 }); 37 </script> 38 </head> 39 40 <body id="one"> 41 <jsp:include page="header.jsp" /> 42 <jsp:include page="navbar.jsp" /> 43 <div id="content"> 44 <div class="sectionTitle">Instances currently registered with Eureka</div> 45 <table id='instances' class="stripeable"> 46 <thead><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></thead> 47 <tfoot><tr><th>Application</th><th>AMIs</th><th>Availability Zones</th><th>Status</th></tr></tfoot> 48 <tbody> 49 <% 50 获取 eureka server 上下文 EurekaServerContext 51 EurekaServerContext serverContext = (EurekaServerContext) pageContext.getServletContext() 52 .getAttribute(EurekaServerContext..getName()); 53 从上下文中取出注册表, 54 (Application app : serverContext.getRegistry().getSortedApplications()) { 55 out.print("<tr><td><b>" + app.getName() + "</b></td>" 56 Map<String,Integer> amiCounts = new HashMap<String,Integer> 57 Map<InstanceStatus,List<Pair<String,String>>> instancesByStatus = 58 new HashMap<InstanceStatus,String>>> 59 Map<String,Integer> zoneCounts = 60 61 (InstanceInfo info : app.getInstances()){ 62 String id = info.getId(); 63 String url = info.getStatusPageUrl(); 64 InstanceStatus status = info.getStatus(); 65 String ami = "n/a" 66 String zone = "" 67 if(info.getDataCenterInfo().getName() == Name.Amazon){ 68 AmazonInfo dcInfo = (AmazonInfo)info.getDataCenterInfo(); 69 ami = dcInfo.get(MetaDataKey.amiId); 70 zone = dcInfo.get(MetaDataKey.availabilityZone); 71 } 72 73 Integer count = amiCounts.get(ami); 74 if(count != ){ 75 amiCounts.put(ami,Integer.valueOf(count.intValue()+1 76 } 77 amiCounts.put(ami,Integer.valueOf(1 78 79 80 count = zoneCounts.get(zone); 81 82 zoneCounts.put(zone,1)"> 83 } 84 zoneCounts.put(zone,1)"> 85 86 List<Pair<String,String>> list = instancesByStatus.get(status); 87 88 if(list == 89 list = new ArrayList<Pair<String,String>> 90 instancesByStatus.put(status,list); 91 92 list.add(new Pair<String,1)">(id,url)); 93 } 94 StringBuilder buf = StringBuilder(); 95 for (Iterator<Entry<String,Integer>> iter = 96 amiCounts.entrySet().iterator(); iter.hasNext();) { 97 Entry<String,Integer> entry = iter.next(); 98 buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append(")," 99 100 out.println("<td>" + buf.toString() + "</td>"101 buf = 102 103 zoneCounts.entrySet().iterator(); iter.hasNext();) { 104 Entry<String,1)">105 buf.append("<b>").append(entry.getKey()).append("</b> (").append(entry.getValue()).append("),1)">106 107 out.println("<td>" + buf.toString() + "</td>"108 buf = 109 for (Iterator<Entry<InstanceStatus,String>>>> iter = 110 instancesByStatus.entrySet().iterator(); iter.hasNext();) { 111 Entry<InstanceStatus,String>>> entry =112 List<Pair<String,String>> value = entry.getValue(); 113 InstanceStatus status = entry.getKey(); 114 if(status != InstanceStatus.UP){ 115 buf.append("<font color=red size=+1><b>"116 117 buf.append("<b>").append(status.name()).append("</b> (").append(value.size()).append(") - "118 119 buf.append("</font></b>"120 121 122 for(Pair<String,1)"> p : value) { 123 String id = p.first(); 124 String url = p.second(); 125 if(url != null && url.startsWith("http")){ 126 buf.append("<a href=\"").append(url).append("\">"127 }128 url = 129 } 130 buf.append(id); 131 132 buf.append("</a>"133 134 buf.append(",1)">135 136 137 out.println("<td>" + buf.toString() + "</td></tr>"138 } 139 %> 140 </tbody> 141 </table> 142 </div> 143 <div> 144 <div class="sectionTitle">General Info</div> 145 <table id='generalInfo' 146 <tr><th>Name</th><th>Value</th></tr> 147 <% 148 StatusInfo statusInfo = ( StatusUtil(serverContext)).getStatusInfo(); 149 Map<String,String> genMap = statusInfo.getGeneralStats(); 150 for (Map.Entry<String,1)"> entry : genMap.entrySet()) { 151 out.print("<tr>"152 out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>"153 out.print("</tr>"154 155 Map<String,String> appMap = statusInfo.getApplicationStats(); 156 entry : appMap.entrySet()) { 157 out.print("<tr>"158 out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>"159 out.print("</tr>"160 161 %> 162 </table> 163 </div> 164 <div> 165 <div class="sectionTitle">Instance Info</div> 166 <table id='instanceInfo' 167 <tr><th>Name</th><th>Value</th></tr> 168 <% 169 InstanceInfo instanceInfo = statusInfo.getInstanceInfo(); 170 Map<String,String> instanceMap = 171 instanceMap.put("ipAddr"172 instanceMap.put("status"173 if(instanceInfo.getDataCenterInfo().getName() == DataCenterInfo.Name.Amazon) { 174 AmazonInfo info = (AmazonInfo) instanceInfo.getDataCenterInfo(); 175 instanceMap.put("availability-zone"MetaDataKey.availabilityZone)); 176 instanceMap.put("public-ipv4"MetaDataKey.publicIpv4)); 177 instanceMap.put("instance-id"MetaDataKey.instanceId)); 178 instanceMap.put("public-hostname"MetaDataKey.publicHostname)); 179 instanceMap.put("ami-id"MetaDataKey.amiId)); 180 instanceMap.put("instance-type"MetaDataKey.instanceType)); 181 182 entry : instanceMap.entrySet()) { 183 out.print("<tr>"184 out.print("<td>" + entry.getKey() + "</td><td>" + entry.getValue() + "</td>"185 out.print("</tr>"186 187 %> 188 </table> 189 </div> 190 191 </body> 192 </html>
5、服务注册的整体流程图
下面通过一张图来看看服务实例注册的整个流程。
六、抓取注册表
1、Eureka Client 启动时全量抓取注册表
客户端启动初始化 DiscoveryClient 时,其中有段代码如下:这一步调用 fetchRegistry 就是在启动时全量抓取注册表缓存到本地中。
(clientConfig.shouldFetchRegistry()) { 拉取注册表:全量抓取和增量抓取 boolean primaryFetchRegistryResult = fetchRegistry(primaryFetchRegistryResult) { 6 logger.info("Initial registry fetch from primary servers Failed"boolean backupFetchRegistryResult = if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { 10 backupFetchRegistryResult = 11 logger.info("Initial registry fetch from backup servers Failed"13 if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { 14 new IllegalStateException("Fetch registry error at startup. Initial fetch Failed." (Throwable th) { 17 logger.error("Fetch registry error at startup: {}"18 IllegalStateException(th); 20 }
进入 fetchRegistry 方法,可以看到,首先获取本地的 Applications,如果为空就会调用 getAndStoreFullRegistry 方法全量抓取注册表并缓存到本地。
boolean fetchRegistry( forceFullRegistryFetch) { 2 Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); 获取本地的应用实例 6 Applications applications = getApplications(); (clientConfig.shouldDisableDelta() 9 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) 10 || forceFullRegistryFetch 11 || (applications == 12 || (applications.getRegisteredApplications().size() == 013 || (applications.getVersion() == -1)) Client application does not have latest library supporting delta { 全量抓取注册表 getAndStoreFullRegistry(); 增量更新注册表 getAndUpdateDelta(applications); applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); 23 } (Throwable e) { 24 logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}" appPathIdentifier,clientConfig.getRegistryFetchIntervalSeconds(),ExceptionUtils.getStackTrace(e)); 26 27 } if (tracer != tracer.stop(); 31 32 发出缓存刷新的通知 onCacheRefreshed(); 36 Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus(); registry was fetched successfully,so return true 40 41 }
进入 getAndStoreFullRegistry 方法可以发现,就是调用 GET /apps 接口抓取全量注册表,因此等会服务端就从这个入口进去看抓取全量注册表的逻辑。注册表抓取回来之后,就放到本地变量 localRegionApps 中。
41 }
2、Eureka Server 注册表多级缓存机制
① 全量抓取注册表的接口
全量抓取注册表的接口是 GET /apps,跟找注册接口是类似的,最终可以找到 ApplicationsResource 的 getContainers 方法就是全量抓取注册表的入口。
- 可以看出,我们可以通过请求头来指定返回 xml 格式还是 json 格式,可以指定是否要压缩返回等。
- 然后创建了全量缓存的 Key
- 接着根据缓存的 key 从 responseCache 中全量抓取注册表
@GET public Response getContainers(@PathParam("version" @HeaderParam(HEADER_ACCEPT) String acceptHeader,1)"> @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,1)"> @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,1)"> @Context UriInfo uriInfo,1)"> 7 @Nullable @QueryParam("regions") String regionsStr) { 省略部分代码... JSON 类型 11 KeyType keyType = Key.KeyType.JSON; 12 String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { 14 keyType = Key.KeyType.XML; 15 returnMediaType = MediaType.APPLICATION_XML; 全量注册表的缓存key 19 Key cacheKey = Key(Key.EntityType.Application,1)"> ResponseCacheImpl.ALL_APPS,1)"> keyType,CurrentRequestVersion.get(),EurekaAccept.fromString(eurekaAccept),regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { 压缩返回 27 response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING,HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE,returnMediaType) 31 } 根据缓存 key 从 responseCache 获取全量注册表 33 response = Response.ok(responseCache.get(cacheKey)) CurrentRequestVersion.remove(); 37 response; 38 }
② ResponseCache 多级缓存读取
ResponseCache 就是 eureka server 读取注册表的核心组件,它的内部采用了多级缓存的机制来快速响应客户端抓取注册表的请求,下面就来看看 ResponseCache。
缓存读取的流程:
- 如果设置了使用只读缓存(默认true),就先从只读缓存 readOnlyCacheMap 中读取;readOnlyCacheMap 使用 ConcurrentHashMap 实现,ConcurrentHashMap 支持并发访问,读取速度很快。
- 如果读写缓存中没有,就从读写缓存 readWriteCacheMap 中读取,读取出来后并写入到只读缓存中;readWriteCacheMap 使用 google guava 的 LoadingCache 实现,LoadingCache 支持在没有元素的时候使用 CacheLoader 加载元素。
- 如果没有开启使用只读缓存,就直接从读写缓存中获取。
public String get( Key key) { get(key,shouldUseReadOnlyResponseCache); 4 //////////////////////////////////////////////////// 7 String get(final Key key,1)"> useReadOnlyCache) { => getValue 9 Value payload = getValue(key,useReadOnlyCache); if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { 11 12 } payload.getPayload(); 19 Value getValue(20 Value payload = 21 22 (useReadOnlyCache) { 开启使用只读缓存,则先从只读缓存读取 readOnlyCacheMap => ConcurrentHashMap<Key,Value> final Value currentPayload = readOnlyCacheMap.get(key); 26 if (currentPayload != 27 payload = currentPayload; 28 } 29 只读缓存中没有,则从读写缓存中读取,然后放入只读缓存中 readWriteCacheMap => LoadingCache<Key,1)">31 payload = readWriteCacheMap.get(key); 32 readOnlyCacheMap.put(key,payload); 34 } 35 未开启只读缓存,就从读写缓存中读取 36 payload =38 } 39 logger.error("Cannot get value for key : {}"41 payload; 42 }
③ ResponseCache 初始化
分析 eureka server EurekaBootStrap 启动初始化时,最后有一步去初始化 eureka server 上下文,它里面就会去初始化注册表,初始化注册表的时候就会初始化 ResponseCache,这里就来分析下这个初始化干了什么。
- 主要就是使用 google guava cache 构造了一个读写缓存 readWriteCacheMap,初始容量为 1000。注意这个读写缓存的特性:每隔 180 秒定时过期,然后元素不存在的时候就会使用 CacheLoader 从注册表中读取。
- 接着如果配置了使用只读缓存,还会开启一个定时任务,每隔30秒将读写缓存 readWriteCacheMap 的数据同步到只读缓存 readOnlyCacheMap。
ResponseCacheImpl(EurekaServerConfig serverConfig,ServerCodecs serverCodecs,AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; 是否使用只读缓存,默认为 true this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); 保存注册表 7 this.registry = registry; 缓存更新间隔时间,默认30秒 long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); 使用 google guava cache 构造一个读写缓存 this.readWriteCacheMap = 12 初始容量为1000 CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) 14 缓存的数据在写入多久后过期,默认180秒,也就是说 readWriteCacheMap 会定时过期 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(),TimeUnit.SECONDS) 16 .removalListener(new RemovalListener<Key,Value>() { @Override 18 void onRemoval(RemovalNotification<Key,1)"> notification) { 19 Key removedKey = notification.getKey(); 20 (removedKey.hasRegions()) { 21 Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions,removedKey); } } }) 26 当key对应的元素不存在时,使用定义 CacheLoader 加载元素 27 .build(new CacheLoader<Key,1)">29 public Value load(Key key) Exception { 30 (key.hasRegions()) { 31 Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions,key); 34 获取元素 35 Value value = generatePayload(key); 36 value; }); 39 (shouldUseReadOnlyResponseCache) { 41 如果配置了使用只读缓存,就开启一个定时任务,定期将 readWriteCacheMap 的数据同步到 readOnlyCacheMap 中 42 默认间隔时间是 30 秒 timer.schedule(getCacheUpdateTask(),1)">44 new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) 45 + responseCacheUpdateIntervalMs),1)"> responseCacheUpdateIntervalMs); 48 49 50 Monitors.registerObject(51 } 52 logger.warn("Cannot register the JMX monitor for the InstanceRegistry"53 54 }
generatePayload 方法:
private Value generatePayload(Key key) { 2 Stopwatch tracer = String payload; (key.getEntityType()) { Application: boolean isRemoteRegionRequested = key.hasRegions(); 8 获取所有应用 10 (ALL_APPS.equals(key.getName())) { 11 (isRemoteRegionRequested) { 12 tracer = serializeAllAppsWithRemoteRegionTimer.start(); 13 payload = getPayLoad(key,registry.getApplicationsFromMultipleRegions(key.getRegions())); 14 } 15 tracer = serializeAllAppsTimer.start(); 16 从注册表读取所有服务实例 17 payload =20 增量获取应用 21 (ALL_APPS_DELTA.equals(key.getName())) { 22 23 tracer = serializeDeltaAppsWithRemoteRegionTimer.start(); versionDeltaWithRegions.incrementAndGet(); versionDeltaWithRegionsLegacy.incrementAndGet(); 26 payload = registry.getApplicationDeltasFromMultipleRegions(key.getRegions())); 28 } 29 tracer = serializeDeltaAppsTimer.start(); versionDelta.incrementAndGet(); versionDeltaLegacy.incrementAndGet(); 32 payload =34 } 35 tracer = serializeOneApptimer.start(); 36 payload =38 39 VIP: 40 SVIP: 41 tracer = serializeViptimer.start(); 42 payload =43 44 45 logger.error("Unidentified entity type: {} found in the cache key."46 payload = ""47 Value(payload); 50 } 51 54 55 }
3、Eureka Server 注册表多级缓存过期机制
这节来总结下 eureka server 注册表多级缓存的过期时机,其实前面都已经分析过了。
① 主动过期
分析服务注册时已经说过,服务注册完成后,调用了 invalidateCache 来失效缓存,进去可以看到就是将读写缓存 readWriteCacheMap 中的服务、所有服务、增量服务的缓存失效掉。
那这里就要注意了,如果服务注册、下线、故障之类的,这里只是失效了读写缓存,然后可能要间隔30秒才能同步到只读缓存 readOnlyCacheMap,那么其它客户端可能要隔30秒后才能感知到。
invalidateCache(String appName,@Nullable String vipAddress,@Nullable String secureVipAddress) { invalidate cache 3 responseCache.invalidate(appName,vipAddress,secureVipAddress); 4 }
缓存失效:
invalidate(String appName,1)"> (Key.KeyType type : Key.KeyType.values()) { (Version v : Version.values()) { invalidate( 6 失效服务的缓存 7 8 9 失效所有 APP 的缓存 失效增量 APP 的缓存 13 ); vipAddress) { 17 invalidate( Key(Key.EntityType.VIP,EurekaAccept.full)); 19 secureVipAddress) { 20 invalidate( Key(Key.EntityType.SVIP,secureVipAddress,1)"> invalidate(Key... keys) { (Key key : keys) { 28 logger.debug("Invalidating the response cache key : {} {} {} {},{}" key.getEntityType(),key.getName(),key.getVersion(),key.getType(),key.getEurekaAccept()); 30 31 失效读写缓存 readWriteCacheMap.invalidate(key); 33 Collection<Key> keysWithRegions = regionSpecificKeys.get(key); null != keysWithRegions && !keysWithRegions.isEmpty()) { (Key keysWithRegion : keysWithRegions) { 36 logger.debug("Invalidating the response cache key : {} {} {} {} {}" key.getEntityType(),1)"> readWriteCacheMap.invalidate(keysWithRegion); 42 }
② 定时过期
读写缓存 readWriteCacheMap 在构建的时候,指定了一个自动过期的时间,默认值是180秒,所以往 readWriteCacheMap 中放入一个数据过后,等180秒过后,它就自动过期了。然后下次读取的时候发现缓存中没有这个 key,就会使用 CacheLoader 重新加载到这个缓存中。
这种定时过期机制就是每隔一段时间来同步注册表与缓存的数据。
③ 被动过期
初始化 ResponseCache 时,如果启用了只读缓存,就会创建一个定时任务(每隔30秒运行一次)来同步 readWriteCacheMap 与 readOnlyCacheMap 中的数据,对于 readOnlyCacheMap 来说这就是一种被动过期。
@H_404_3188@
TimerTask getCacheUpdateTask() { TimerTask() { @Override 5 logger.debug("Updating the client cache from response cache" (Key key : readOnlyCacheMap.keySet()) { 8 logger.debug("Updating the client cache from response cache for key : {} {} {} {}" key.getEntityType(),key.getType()); 11 CurrentRequestVersion.set(key.getVersion()); 获取读写缓存中的数据 14 Value cacheValue =15 获取只读缓存中的数据 16 Value currentCacheValue =17 如果 readOnlyCacheMap 中缓存的值与 readWriteCacheMap 缓存的值不同,就用 readWriteCacheMap 的值覆盖 readOnlyCacheMap 的值 18 if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key,cacheValue); 21 } 22 logger.error("Error while updating the client cache from response cache for key {}"23 } CurrentRequestVersion.remove(); }; 29 }