Eureka 是 Netflix 公司开源的一个服务注册与发现的组件,和其他 Netflix 公司的服务组件(例如负载均衡、熔断器、网关等)一起,被 Spring Cloud 整合为 Spring Cloud Netflix 模块。不过 Eureka 2.0 开始闭源了,但 1.x 还在继续维护中,可以继续使用。这篇文章就来深入学习下 Eureka 注册中心,便于我们更好的使用和调优注册中心。
关于版本:本文章使用的 Spring cloud 版本为 Hoxton.SR8,Spring boot 版本为 2.3.3.RELEASE,依赖的 eureka 版本则为 1.9.25。
一、Eureka 初体验
Eureka 分为 Eureka Server 和 Eureka Client,Eureka Server 为 Eureka 注册中心,Eureka Client 为 Eureka 客户端。这节先通过demo把注册中心的架子搭起来,看看注册中心的基础架构。
1、Eureka Server
① 创建注册中心服务:sunny-register
首先创建一个 maven 工程,服务名称为 sunny-register,并在 pom.xml 中引入注册中心服务端的依赖。
1 <dependencies> 2 dependency3 groupId>org.springframework.cloud</4 artifactId>spring-cloud-starter-netflix-eureka-server5 6 >
在 resources 下添加 application.yml 配置文件,并添加注册中心相关配置。
1 server: 2 port: 8000 3 spring: 4 application: 5 name: sunny-register 6 7 eureka: 8 instance: 9 hostname: dev.lyyzoo.com 10 client: 11 # 是否向注册中心注册自己 12 register-with-eureka: false 13 # 是否检索服务 14 fetch-registry: 15 service-url: 16 defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
③ 添加启动类
添加启动类,并在启动类上加上 @EnableEurekaServer 注解,启用注册中心。
package com.lyyzoo.sunny.register; 2 import org.springframework.boot.SpringApplication; org.springframework.boot.autoconfigure.SpringBootApplication; 5 org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; @EnableEurekaServer @SpringBootApplication public class RegisterApplication { 10 11 static void main(String[] args) { 12 SpringApplication.run(RegisterApplication.,args); } 14 }
④ 启动注册中心
启动注册中心后,访问 http://dev.lyyzoo.com:8000/,就可以看到注册中心的页面了,现在还没有实例注册上来。(dev.lyyzoo.com 在本地 hosts 文件中映射到 127.0.0.1)
2、Eureka Client
创建两个 demo 服务,demo-producer 服务作为生产者提供一个接口,demo-consumer 服务作为消费者去调用 demo-producer 的接口。
① 创建客户端服务:demo-producer
创建maven工程,服务名称为 demo-producer,在 pom.xml 中引入注册中心客户端的依赖,并添加了 web 的依赖。
> >spring-cloud-starter-netflix-eureka-client>org.springframework.boot>spring-boot-starter-web>
在 resouces 下添加 application.yml 配置文件,添加注册中心客户端相关的配置。
2 port: 8010 5 name: demo-producer serviceUrl: 10 defaultZone: ${EUREKA_DEFAULT_ZONE:http:dev.lyyzoo.com:8000/eureka}
③ 添加启动类
添加启动类,并在启动类上加上 @EnableEurekaClient 注解,启用客户端。
@EnableEurekaClient 2 3 ProducerApplication { 4 6 SpringApplication.run(ProducerApplication.7 8 }
④ 添加一个 rest 接口
@RestController 2 DemoController { 3 4 private final Logger logger = LoggerFactory.getLogger(getClass()); 5 6 @GetMapping("/v1/uuid") 7 public ResponseEntity<String> getUUID() { 8 String uuid = UUID.randomUUID().toString(); 9 logger.info("generate uuid: {}"10 return ResponseEntity.ok(uuid); 12 }
⑤ 创建客户端服务:demo-consumer
类似的方式,再创建消费者服务:demo-producer,这个服务中添加一个消费者接口,通过 RestTemplate 负载均衡的方式来调用 demo-producer 的接口。
因此需要先配置一个带有负载均衡的 RestTemplate:
ConsumerApplication { 4 @Bean 6 @LoadBalanced public RestTemplate restTemplate() { 8 return new RestTemplate(); 12 SpringApplication.run(ConsumerApplication.14 }
添加消费者接口,注意这里 url 是写的服务名称,并不是具体的 ip 地址或端口,在微服务场景下,服务间调用也不可能写死某个具体的地址。
@Autowired private RestTemplate restTemplate; 8 9 @GetMapping("/v1/id"10 getId() { 11 ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid",String.); 12 String uuid = result.getBody(); 13 logger.info("request id: {}"14 15 16 }
⑥ 启动注册中心客户端
以两个不同的端口启动 demo-producer,可以通过环境变量的方式制定端口。然后再启动 demo-consumer。
启动完成之后,就可以在注册中心看到注册上来的两个 demo-producer 实例和一个 demo-consumer 实例,并且状态都为 UP。
⑦ 测试接口
调用消费者服务的接口,多次访问 http://dev.lyyzoo.com:8020/v1/id 接口,会发现生产者服务 demo-consumer 两个实例的控制台会交替的输出日志信息。这就说明消费者客户端通过服务名称访问到生产者了。
3、Eureka 基础架构
通过前面的体验,可以发现,服务间调用只需知道某个服务的名称就可以调用这个服务的api了,而不需要指定具体的ip地址和端口,那这是怎么做到的呢?
不难看出,Eureka 的基础架构包含三种角色:
- 服务注册中心:Eureka Server,提供服务注册和发现的功能@H_412_403@
- 服务提供者:Eureka Client,提供服务(本身也可以作为消费者)@H_412_403@
- 服务消费者:Eureka Client,消费服务(本身也可以作为提供者)@H_412_403@
首先需要一个服务注册中心,客户端则向注册中心注册,将自己的信息(比如服务名、服务的 IP 地址和端口信息等)提交给注册中心。客户端向注册中心获取一份服务注册列表的信息,该列表包含了所有向注册中心注册的服务信息。获取服务注册列表信息之后,客户端服务就可以根据服务名找到服务的所有实例,然后通过负载均衡选择其中一个实例,根据其 IP 地址和端口信息,就可以调用服务的API接口了。
这就是注册中心最基础的架构和功能了,提供服务注册和发现,为各个客户端提供服务注册列表信息。但为了完成这些工作,Eureka 有很多的机制来实现以及保证其高可用,如服务注册、服务续约、获取服务注册列表、服务下线、服务剔除等等。Eureka 也提供了很多参数让我们可以根据实际的场景来优化它的一些功能和配置,比如维持心跳的时间、拉取注册表的间隔时间、自我保护机制等等。下面我们就从 eureka 的源码层面来分析下 eureka 的这些功能以及参数,理解其原理,学习它的一些设计。
二、Eureka 源码准备
虽然我们在 pom.xml 中依赖的是 spring-cloud-starter-netflix-eureka-server 和 spring-cloud-starter-netflix-eureka-client,但 spring-cloud-starter-netflix 只是对 eureka 做了封装,使得其可以通过 springboot 的方式来启动和初始化,其底层其实是 netflix 的 eureka-core、eureka-client 等。所以我们先分析 netflix eureka 的源码,最后再看看 spring-cloud-starter-netflix 的源码。
1、源码环境准备
① 下载源码
Netflix Eureka:https://github.com/Netflix/eureka
Spring Cloud Netflix:https://github.com/spring-cloud/spring-cloud-netflix
克隆 eureka 的源码到本地:
$ git clone https:github.com/Netflix/eureka.git
由于我们依赖的是 1.9.25 版本,将代码克隆到本地后,将其切换到 1.9.25:
$ git checkout -b 1.9.25
然后到 eureka 根目录下执行构建的命令:
$ ./gradlew clean build -x test
② IDEA 打开源码
由于 eureka 使用 gradle 管理依赖,所以本地需要先安装 gradle,之后 IDEA 中也需要安装 gradle 的插件,跟 maven 都是类似的,安装教程可自行百度。
2、Eureka 工程结构
Eureka 主要包含如下模块:
- eureka-client:eureka 客户端@H_412_403@
- eureka-core:eureka 服务端,注册中心的核心功能@H_412_403@
- eureka-resources:基于jsp的eureka控制台,可以查看注册了哪些服务实例@H_412_403@
- eureka-server:注册中心,集成了 eureka-client、eureka-core、eureka-resources,因为依赖了 eureka-client,因此 eureka-server 也是一个客户端,在 eureka server 集群模式下,eureka-server 也会作为客户端注册到其它注册中心上@H_412_403@
- eureka-examples:eureka 例子@H_412_403@
- eureka-test-utils:eureka 单元测试工具@H_412_403@
- eureka-core|client-jersey2:对 jersey 框架的封装,jersey 类似于 spring mvc,支持 http restful 请求,eureka-client 和 eureka-server 之间的通信就是基于 jersey 框架来的@H_412_403@
三、Eureka Server 启动初始化
首先要看的是 eureka-server,注册中心启起来之后,客户端才能来注册服务和发现服务。
1、eureka-server 模块
① eureka-server 目录
- resources 目录中主要是 eureka client 和 server 的配置文件@H_412_403@
- webapp 下有一个 web.xml 配置文件,这里面就配置了启动初始化的入口,从这也可以看出,eureka-server 会被打包成 war 包来运行@H_412_403@
- test 下有个单元测试类 EurekaClientServerRestIntegrationTest,这里面就包含了服务注册、续约、下线等单元测试,我们就可以运行这些单元测试来调试代码@H_412_403@
② web.xml
web.xml 的内容:
<?xml version="1.0" encoding="UTF-8"?> web-app version="2.5" 3 xmlns="http://java.sun.com/xml/ns/javaee" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" 7 <!-- eureka 启动初始化类 --> 8 listener 9 listener-class>com.netflix.eureka.EurekaBootStrap10 11 12 状态过滤器 13 filter14 filter-name>statusFilter15 filter-class>com.netflix.eureka.StatusFilter16 17 18 认证过滤器 19 20 >requestAuthFilter21 >com.netflix.eureka.ServerRequestAuthFilter22 23 24 限流过滤器 25 26 >rateLimitingFilter27 >com.netflix.eureka.RateLimitingFilter28 29 30 >gzipEncodingEnforcingFilter31 >com.netflix.eureka.GzipEncodingEnforcingFilter32 33 34 jersey 容器 35 36 >jersey37 >com.sun.jersey.spi.container.servlet.ServletContainer38 init-param39 param-name>com.sun.jersey.config.property.WebPageContentRegex40 param-value>/(flex|images|js|css|jsp)/.*41 42 43 >com.sun.jersey.config.property.packages44 >com.sun.jersey;com.netflix45 46 47 GZIP content encoding/decoding 48 49 >com.sun.jersey.spi.container.ContainerRequestFilters50 >com.sun.jersey.api.container.filter.GZIPContentEncodingFilter51 52 53 >com.sun.jersey.spi.container.ContainerResponseFilters54 55 56 57 58 filter-mapping59 60 url-pattern>/*61 62 63 64 65 66 67 68 Uncomment this to enable rate limiter filter. 69 <filter-mapping> 70 <filter-name>rateLimitingFilter</filter-name> 71 <url-pattern>/v2/apps</url-pattern> 72 <url-pattern>/v2/apps/*</url-pattern> 73 </filter-mapping> 74 75 76 77 78 >/v2/apps79 >/v2/apps/*80 81 82 83 84 85 86 87 欢迎页 88 welcome-file-list89 welcome-file>jsp/status.jsp90 91 92 web-app>
web.xml 中可以得知如下信息:
- eureka server 启动时首先通过 com.netflix.eureka.EurekaBootStrap 类来进行启动初始化相关的工作@H_412_403@
- 配置了 StatusFilter(server 状态过滤器)、ServerRequestAuthFilter(认证过滤器)、RateLimitingFilter(限流过滤器) 等过滤器,但 RateLimitingFilter 默认没有启用@H_412_403@
- 配置了 jersey 的 servlet 容器,其实就跟 springframework 的 DispatcherServlet 是类似的,用来拦截处理 http restful 请求,这块我们不用过于关注@H_412_403@
- 最后还配置了 eureka server 的欢迎页为 jsp/status.jsp 页面,这个页面在 eureka-resources 模块下,也就是前面看到的 eureka 控制台页面@H_412_403@
③ 单元测试类 EurekaClientServerRestIntegrationTest
首先看 setUp 方法,每个测试用例运行之前都会先运行 setUp 方法来初始化运行环境。
@BeforeClass void setUp() throws Exception { 3 初始化 eureka 配置 injectEurekaConfiguration(); 5 启动 eureka server,会找 build/libs 目录下的 eureka-server.*.war 包来运行 6 这一步启动时,就会加载 web.xm 配置文件,然后进入 EurekaBootStrap 初始化类 startServer(); 8 eureka server 配置 createEurekaServerConfig(); 创建 jersey 客户端,使用 jersey 客户端来调用资源 12 httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder() 13 .withClientName("testEurekaClient"14 .withConnectionTimeout(100015 .withReadTimeout(100016 .withMaxConnectionsPerHost(117 .withMaxTotalConnections(118 .withConnectionIdleTimeout(100019 .build(); 20 21 jerseyEurekaClient = httpClientFactory.newClient( DefaultEndpoint(eurekaServiceUrl)); 22 23 ServerCodecs serverCodecs = DefaultServerCodecs(eurekaServerConfig); 24 jerseyReplicationClient = JerseyReplicationClient.createReplicationClient( 25 eurekaServerConfig,26 serverCodecs,1)">27 eurekaServiceUrl 28 ); 29 }
这个类提供了如下的一些测试用例,我们可以运行这些测试用例来进行调试。
2、EurekaBootStrap 初始化
EurekaBootStrap 是监听器的入口,实现了 ServletContextListener 接口,主要完成了 eureka server 的启动初始化。
从 contextInitialized 方法进去,整体上来说,分为 eureka 环境初始化和 eureka server 上下文初始化。
@Override contextInitialized(ServletContextEvent event) { try { 4 eureka 环境初始化 initEurekaEnvironment(); 6 eureka server 上下文初始化 initEurekaServerContext(); 9 ServletContext sc = event.getServletContext(); 10 sc.setAttribute(EurekaServerContext..getName(),serverContext); 11 } catch (Throwable e) { 12 logger.error("Cannot bootstrap eureka server :"13 throw new RuntimeException("Cannot bootstrap eureka server :"14 15 }
① eureka环境初始化
initEurekaEnvironment 方法内主要是设置数据中心和运行环境参数:
- archaius.deployment.datacenter = default@H_412_403@
- archaius.deployment.environment = test@H_412_403@
② eureka server 上下文初始化
initEurekaServerContext 上下文初始化则包含了很多阶段:
- 构造 eureka 注册中心配置:EurekaServerConfig@H_412_403@
- 构造 eureka 实例配置:EurekaInstanceConfig@H_412_403@
- 构造实例信息:InstanceInfo@H_412_403@
- 构造实例管理器:ApplicationInfoManager @H_412_403@
- 构造 eureka 客户端配置:EurekaClientConfig@H_412_403@
- 创建 eureka 客户端:EurekaClient(DiscoveryClient)@H_412_403@
- 创建注册表(可以感知eureka集群的注册表):PeerAwareInstanceRegistry@H_412_403@
- 创建集群:PeerEurekaNodes@H_412_403@
- 将信息封装到eureka上下文:EurekaServerContext@H_412_403@
- 将eureka上下文放到一个全局容器中:EurekaServerContextHolder@H_412_403@
- 初始化eureka上下文@H_412_403@
- 同步eureka server的注册表@H_412_403@
- 开启追踪@H_412_403@
- 注册监控统计@H_412_403@
protected void initEurekaServerContext() 2 1、eureka 注册中心配置 3 EurekaServerConfig eurekaServerConfig = DefaultEurekaServerConfig(); For backward compatibility 6 JsonXStream.getInstance().registerConverter( V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH); 7 XmlXStream.getInstance().registerConverter( 9 logger.info("Initializing the eureka client..." logger.info(eurekaServerConfig.getJsonCodecName()); 11 ServerCodecs serverCodecs = 12 13 ApplicationInfoManager applicationInfoManager = null; 14 if (eurekaClient == ) { 16 2、eureka 实例配置 17 EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) 18 ? CloudInstanceConfig() 19 : MyDataCenterInstanceConfig(); 21 3、构造 InstanceInfo 实例信息 22 4、构造 ApplicationInfoManager 应用管理器 23 applicationInfoManager = ApplicationInfoManager( 24 instanceConfig, EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); 25 26 5、eureka 客户端配置 27 EurekaClientConfig eurekaClientConfig = DefaultEurekaClientConfig(); 28 6、构造 EurekaClient,DiscoveryClient 封装了客户端相关的操作 29 eurekaClient = DiscoveryClient(applicationInfoManager,eurekaClientConfig); 30 } else31 applicationInfoManager = eurekaClient.getApplicationInfoManager(); 32 34 PeerAwareInstanceRegistry registry; 35 if (isAws(applicationInfoManager.getInfo())) { 36 registry = AwsInstanceRegistry( 37 eurekaServerConfig,1)">38 eurekaClient.getEurekaClientConfig(),1)">39 serverCodecs,1)">40 eurekaClient 41 ); 42 awsBinder = AwsBinderDelegate(eurekaServerConfig,eurekaClient.getEurekaClientConfig(),registry,applicationInfoManager); 43 awsBinder.start(); 44 } 45 7、构造感知eureka集群的注册表 46 registry = PeerAwareInstanceRegistryImpl( 47 48 49 50 51 52 53 54 8、构造eureka-server集群信息 55 PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes( 56 registry,1)">57 58 eurekaClient.getEurekaClientConfig(),1)">59 60 applicationInfoManager 61 63 9、基于前面构造的对象创建 EurekaServerContext 64 serverContext = DefaultEurekaServerContext( 65 66 67 68 peerEurekaNodes,1)">71 72 将 serverContext 放到 EurekaServerContextHolder 上下文中, 73 这样其它地方都可以通过 EurekaServerContextHolder 拿到 EurekaServerContext 74 EurekaServerContextHolder.initialize(serverContext); 76 10、初始化eureka-server上下文 77 serverContext.initialize(); 78 logger.info("Initialized server context"79 80 11、从相邻的eureka-server同步注册表 81 int registryCount = registry.syncUp(); 82 /12、启动注册表,启动一些定时任务 83 registry.openForTraffic(applicationInfoManager,registryCount); 84 85 /13、注册监控统计 86 EurekaMonitors.registerAllStats(); 87 }
3、面向接口的配置读取
初始化中有三个配置接口,EurekaServerConfig、EurekaInstanceConfig、EurekaClientConfig,分别对应了注册中心、eureka实例、eureka客户端的配置获取。
从它们默认实现类的构造方法进去可以看到,EurekaServerConfig 是读取的 eureka-server.properties 配置文件,命名前缀是 eureka.server;EurekaInstanceConfig、EurekaClientConfig 是读取的 eureka-client.properties 配置文件,命名前缀分别是 eureka.instance、eureka.client。
这里可以看到,eureka 在代码中获取配置的方式是通过接口方法的形式来获取的,在其默认的实现类里通过硬编码的方式定义了配置的编码以及默认值。这种基于接口的配置读取方式是可以借鉴的,这种方式读取配置更易于维护,不用维护一堆常量,如果配置编码变了只需更改实现类即可。
例如下面的配置:
int getExpectedClientRenewalIntervalSeconds() { final int configured = configInstance.getIntProperty( 4 namespace + "expectedClientRenewalIntervalSeconds" 5 30).get(); return configured > 0 ? configured : 30} double getRenewalPercentThreshold() { configInstance.getDoubleProperty( 12 namespace + "renewalPercentThreshold",0.8516 boolean shouldEnableReplicatedRequestCompression() { 17 configInstance.getBooleanProperty( 18 namespace + "enableReplicatedRequestCompression",1)">false19 }
4、基于建造者模式构造服务实例
看 new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get() 这段代码,在 get 方法中完成了服务实例信息的构造。它这里主要用到了建造者设计模式来构建 LeaseInfo 和 InstanceInfo,以 InstanceInfo 为例,它的内部有一个静态的 Builder 类,通过 newBuilder() 方法创建了 InstanceInfo 对象,然后可以调用 Builder 的属性设置方法来设置属性,在设置这些属性的时候,会做一些关联性的校验,在设置完成后,就调用 build() 方法返回对象,也可以在 build 方法中再做一些最终的校验。建造者模式就很适合用于构建这种复杂的对象。
synchronized InstanceInfo get() { if (instanceInfo == 3 续约信息:主要有续约间隔时间(默认30秒)和续约过期时间(默认90秒) 4 LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder() .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds()) .setDurationInSecs(config.getLeaseExpirationDurationInSeconds()); 7 if (vipAddressResolver == 9 vipAddressResolver = Archaius1VipAddressResolver(); } 12 基于建造者模式来创建 InstanceInfo 13 InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver); 15 set the appropriate id for the InstanceInfo,falling back to datacenter Id if applicable,else hostname 16 String instanceId = config.getInstanceId(); 17 if (instanceId == null || instanceId.isEmpty()) { 18 DataCenterInfo dataCenterInfo = config.getDataCenterInfo(); 19 if (dataCenterInfo instanceof UniqueIdentifier) { 20 instanceId = ((UniqueIdentifier) dataCenterInfo).getId(); 21 } 22 instanceId = config.getHostName(23 } 24 String defaultAddress; 27 if (config RefreshableInstanceConfig) { 28 Refresh AWS data center info,and return up to date address 29 defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(30 } 31 defaultAddress = config.getHostName(34 fail safe 35 if (defaultAddress == defaultAddress.isEmpty()) { 36 defaultAddress = config.getIpAddress(); 38 39 设置属性 builder.setNamespace(config.getNamespace()) .setInstanceId(instanceId) 42 .setAppName(config.getAppname()) .setAppGroupName(config.getAppGroupName()) 44 .setDataCenterInfo(config.getDataCenterInfo()) 45 .setIPAddr(config.getIpAddress()) 46 .setHostName(defaultAddress) .setPort(config.getNonSecurePort()) .enablePort(PortType.UNSECURE,config.isNonSecurePortEnabled()) .setSecurePort(config.getSecurePort()) .enablePort(PortType.SECURE,config.getSecurePortEnabled()) .setVIPAddress(config.getVirtualHostName()) .setSecureVIPAddress(config.getSecureVirtualHostName()) 53 .setHomePageUrl(config.getHomePageUrlPath(),config.getHomePageUrl()) 54 .setStatusPageUrl(config.getStatusPageUrlPath(),config.getStatusPageUrl()) 55 .setASGName(config.getASGName()) .setHealthCheckUrls(config.getHealthCheckUrlPath(),1)"> config.getHealthCheckUrl(),config.getSecureHealthCheckUrl()); 58 59 60 Start off with the STARTING state to avoid traffic 61 if (!config.isInstanceEnabledOnit()) { 62 InstanceStatus initialStatus = InstanceStatus.STARTING; 63 LOG.info("Setting initial instance status as: {}"64 builder.setStatus(initialStatus); 65 } 66 LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise " 67 + "itself as available. You would instead want to control this via a healthcheck handler." InstanceStatus.UP); 70 71 Add any user-specific Metadata information 72 for (Map.Entry<String,String> mapEntry : config.getMetadataMap().entrySet()) { 73 String key = mapEntry.getKey(); 74 String value = mapEntry.getValue(); 75 only add the Metadata if the value is present 76 if (value != null && !value.isEmpty()) { builder.add(key,value); 78 79 80 81 调用 build 方法做属性校验并创建 InstanceInfo 实例 82 instanceInfo = builder.build(); instanceInfo.setLeaseInfo(leaseInfoBuilder.build()); 84 instanceInfo; 86 }
LeaseInfo 就是续约信息,可以看到主要的两个配置就是续约间隔时间和多久未续约认为实例过期,实例过期就会被剔除。然后就是基于 config 设置 InstanceInfo,就是实例信息,包含了实例ID、主机名称、端口、LeaseInfo 等等。
5、注册中心构造客户端 DiscoveryClient
在集群模式下,eureka server 也会作为客户端注册到其它注册中心,此时,它本身就是一个 eureka client。因此会去构建 EurekaClient,其默认实现类是 DiscoveryClient。DiscoveryClient 包含了 eureka 客户端的大部分核心功能,比如服务注册、续约、维持心跳、拉取注册表等。
一步步进入到DiscoveryClient最复杂的那个构造方法,我们先整体分析下做了哪些事情,抓大放小,很多组件的细节等后面分析具体功能的时候再来看。
- 将 EurekaClientConfig、EurekaInstanceConfig、EurekaTransportConfig、InstanceInfo、ApplicationInfoManager 等保存到本地变量中@H_412_403@
- 如果要获取注册表,就创建一个注册表状态度量器@H_412_403@
- 如果要注册到注册中心,就创建一个心跳状态度量器@H_412_403@
- 如果不获取注册表且不注册到注册中心,就不会创建调度器、心跳线程池这些了,释放一些资源@H_412_403@
- 如果要注册到注册中心且要抓取注册表,就初始化一些调度的资源:
- 创建了支持调度的线程池,有两个核心线程,从后面可以看出,主要就是处理心跳和缓存刷新的任务@H_412_403@
- 创建了维持心跳的线程池,核心线程数为1,最大线程数配置默认为5@H_412_403@
- 创建了刷新缓存的线程池,核心线程数为1,最大线程数配置默认为5@H_412_403@
- 创建了eureka client 与 eureka server 进行网络通信的组件 EurekaTransport,并进行了一些初始化 。EurekaTransport 里的客户端主要就是封装了对 server 的 api 调用接口,便于调用@H_412_403@
- 接着,如果要抓取注册表,就会抓取注册表了,fetchRegistry 里面可以看到是分为全量抓取和增量抓取的,第一次启动的时候就是全量抓取注册表@H_412_403@
- 开始初始化调度任务:
- 如果要抓取注册表,就创建刷新缓存的任务,并开始调度,默认每隔30秒抓取一次注册表@H_412_403@
- 如果要注册到注册中心,就创建发送心跳的任务,并开始调度,默认每隔30秒发送一次心跳@H_412_403@
- 如果要注册到注册中心,还会创建实例副本传播器(内部也是一个定时调度任务)、实例状态变更的监听器@H_412_403@
1 DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config,AbstractDiscoveryClientOptionalArgs args,1)"> 2 Provider<BackupRegistry> backupRegistryProvider,EndpointRandomizer endpointRandomizer) { 3 if (args != 4 this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; 5 this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; 6 this.eventListeners.addAll(args.getEventListeners()); 7 this.preRegistrationHandler = args.preRegistrationHandler; 8 } 9 this.healthCheckCallbackProvider = 10 this.healthCheckHandlerProvider = 11 this.preRegistrationHandler = 12 13 14 将实例信息、配置信息保存到本地 15 this.applicationInfoManager = applicationInfoManager; 16 InstanceInfo myInfo = applicationInfoManager.getInfo(); 17 clientConfig = config; 18 staticClientConfig = clientConfig; 19 transportConfig = config.getTransportConfig(); 20 instanceInfo = myInfo; 21 if (myInfo != 22 appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); 23 } 24 logger.warn("Setting instanceInfo to a passed in null value" 25 26 27 this.backupRegistryProvider = backupRegistryProvider; 28 this.endpointRandomizer = endpointRandomizer; 29 this.urlRandomizer = EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); 30 localRegionApps.set( Applications()); 31 32 33 fetchRegistryGeneration = new AtomicLong(0 34 remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); 35 从远程拉取注册表的地址数组,使用的原子类,在运行中可能会动态更新地址 36 remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); 37 38 如果要获取注册表,就会注册状态监视器 39 (config.shouldFetchRegistry()) { 40 this.registryStalenessMonitor = new ThresholdLevelsMetric(this,METRIC_REGISTRY_PREFIX + "lastUpdateSec_",1)">new long[]{15L,30L,60L,120L,240L,480L}); 41 } 42 this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; 43 44 45 如果要注册到 eureka-server,就会创建心跳状态监视器 46 (config.shouldRegisterWithEureka()) { 47 this.heartbeatStalenessMonitor = 48 } 49 this.heartbeatStalenessMonitor = 50 51 52 logger.info("Initializing Eureka in region {}" 53 54 如果不注册到注册中心,且不拉取注册表,就不创建调度器、线程池等资源了 55 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { 56 logger.info("Client configured to neither register nor query for data." 57 scheduler = 58 heartbeatExecutor = 59 cacheRefreshExecutor = 60 eurekaTransport = 61 instanceRegionChecker = new InstanceRegionChecker( PropertyBasedAzToRegionMapper(config),1)"> 62 63 This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() 64 to work with DI'd DiscoveryClient 65 DiscoveryManager.getInstance().setDiscoveryClient( 66 DiscoveryManager.getInstance().setEurekaClientConfig(config); 67 68 initTimestampMs = System.currentTimeMillis(); 69 initRegistrySize = .getApplications().size(); 70 registrySize = initRegistrySize; 71 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}" 72 initTimestampMs,initRegistrySize); 73 74 return; no need to setup up an network tasks and we are done 75 76 77 78 创建定时调度器,默认有2个核心线程,主要处理心跳任务和缓存刷新任务 79 scheduler = Executors.newScheduledThreadPool(2 80 ThreadFactoryBuilder() 81 .setNameFormat("DiscoveryClient-%d" 82 .setDaemon(true 83 .build()); 84 85 维持心跳的线程池,一个核心线程,最大线程数默认5。 86 注意其使用的队列是 SynchronousQueue 队列,这个队列只能放一个任务,一个线程将任务取走后,才能放入下一个任务,否则只能阻塞。 87 heartbeatExecutor = ThreadPoolExecutor( 88 1,clientConfig.getHeartbeatExecutorThreadPoolSize(),0 89 new SynchronousQueue<Runnable>(),1)"> 90 91 .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d" 92 .setDaemon( 93 .build() 94 ); use direct handoff 95 96 刷新缓存的线程池,一个核心线程,最大线程数据默认为5 97 cacheRefreshExecutor = 98 1,clientConfig.getCacheRefreshExecutorThreadPoolSize(),1)"> 99 100 101 .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d"102 .setDaemon(103 104 ); 105 106 eureka http 调用客户端,支持 eureka client 与 eureka server 之间的通信 107 eurekaTransport = EurekaTransport(); 108 初始化 eurekaTransport 109 scheduleServerEndpointTask(eurekaTransport,1)">110 111 AzToRegionMapper azToRegionMapper; 112 (clientConfig.shouldUseDnsForFetchingServiceUrls()) { 113 azToRegionMapper = DNSBasedAzToRegionMapper(clientConfig); 114 } 115 azToRegionMapper = PropertyBasedAzToRegionMapper(clientConfig); 116 117 if (null != remoteRegionsToFetch.get()) { 118 azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",1)">119 120 instanceRegionChecker = InstanceRegionChecker(azToRegionMapper,1)">121 } 122 new RuntimeException("Failed to initialize DiscoveryClient!"123 124 125 (clientConfig.shouldFetchRegistry()) { 126 127 拉取注册表:全量抓取和增量抓取 128 boolean primaryFetchRegistryResult = fetchRegistry(129 primaryFetchRegistryResult) { 130 logger.info("Initial registry fetch from primary servers Failed"131 132 boolean backupFetchRegistryResult = 133 if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) { 134 backupFetchRegistryResult = 135 logger.info("Initial registry fetch from backup servers Failed"136 137 if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) { 138 new IllegalStateException("Fetch registry error at startup. Initial fetch Failed."139 140 } (Throwable th) { 141 logger.error("Fetch registry error at startup: {}"142 IllegalStateException(th); 143 144 145 146 call and execute the pre registration handler before all background tasks (inc registration) is started 147 this.preRegistrationHandler != 148 .preRegistrationHandler.beforeRegistration(); 149 150 151 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { 152 153 register() ) { 154 new IllegalStateException("Registration error at startup. Invalid server response."155 156 } 157 logger.error("Registration error at startup: {}"158 159 160 161 162 初始化一些调度任务:刷新缓存的调度任务、发送心跳的调度任务、实例副本传播器 163 initScheduledTasks(); 164 165 166 Monitors.registerObject(167 } 168 logger.warn("Cannot register timers"169 170 171 172 173 DiscoveryManager.getInstance().setDiscoveryClient(174 DiscoveryManager.getInstance().setEurekaClientConfig(config); 175 176 初始化的时间 177 initTimestampMs =178 initRegistrySize = 179 registrySize =180 logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}"181 initTimestampMs,1)">182 183 184 //////////////////////////////////////////////////////////////////// 185 186 initScheduledTasks() { 187 188 抓取注册表的间隔时间,默认30秒 189 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); 190 刷新缓存调度器延迟时间扩大倍数,在任务超时的时候,将扩大延迟时间 191 这在出现网络抖动、eureka-sever 不可用时,可以避免频繁发起无效的调度 192 int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); 193 注册表刷新的定时任务 194 cacheRefreshTask = TimedSupervisorTask( 195 "cacheRefresh"196 scheduler,1)">197 cacheRefreshExecutor,1)">198 registryFetchIntervalSeconds,1)">199 TimeUnit.SECONDS,1)">200 expBackOffBound,1)">201 new CacheRefreshThread() 刷新注册表的任务 202 203 30秒后开始调度刷新注册表的任务 204 scheduler.schedule( 205 cacheRefreshTask,1)">206 207 208 209 (clientConfig.shouldRegisterWithEureka()) { 210 续约间隔时间,默认30秒 211 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); 212 心跳调度器的延迟时间扩大倍数,默认10 213 clientConfig.getHeartbeatExecutorExponentialBackOffBound(); 214 logger.info("Starting heartbeat executor: " + "renew interval is: {}"215 216 心跳的定时任务 217 heartbeatTask = 218 "heartbeat"219 220 heartbeatExecutor,1)">221 renewalIntervalInSecs,1)">222 223 224 HeartbeatThread() 225 226 30秒后开始调度心跳的任务 227 228 heartbeatTask,1)">229 230 231 实例副本传播器,用于定时更新自己状态 232 instanceInfoReplicator = InstanceInfoReplicator( 233 234 instanceInfo,1)">235 clientConfig.getInstanceInfoReplicationIntervalSeconds(),1)">236 2); burstSize 237 238 实例状态变更的监听器 239 statusChangeListener = ApplicationInfoManager.StatusChangeListener() { 240 @Override 241 String getId() { 242 return "statusChangeListener"243 244 245 246 notify(StatusChangeEvent statusChangeEvent) { 247 if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) { 248 logger.error("Saw local status change event {}"249 } 250 logger.info("Saw local status change event {}"251 } 252 instanceInfoReplicator.onDemandUpdate(); 253 254 }; 255 256 向 ApplicationInfoManager 注册监听器 257 (clientConfig.shouldOnDemandUpdateStatusChange()) { 258 applicationInfoManager.registerStatusChangeListener(statusChangeListener); 259 260 261 启动副本传播器,默认延迟时间40秒 262 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); 263 } 264 logger.info("Not registering with Eureka server per configuration"265 266 }
6、定时任务监管器的设计
可以看到,eureka client 为了定时发送心跳以及定时抓取注册表,使用了定时任务和调度器,我觉得它这里的定时调度的设计思想是可以参考和借鉴的。
以心跳任务的这段代码为例:
8 cacheRefreshTask = 9 "cacheRefresh" scheduler,1)"> cacheRefreshExecutor,1)">12 registryFetchIntervalSeconds,1)"> TimeUnit.SECONDS,1)"> expBackOffBound,1)">15 18 scheduler.schedule( cacheRefreshTask,1)">20 21 }
上面这段代码其实并不复杂,主要就是创建了一个定时任务,然后使用调度器在一定的延迟之后开始调度。但它这里并不是直接使用调度器调度任务(CacheRefreshThread),也不是以一个固定的频率调度(每隔30秒)。它定义了一个任务的监管器 TimedSupervisorTask,在创建这个监管器的时候,传入了调度器、要执行的任务、以及间隔时间等参数,然后调度器调度 TimedSupervisorTask。
看 TimedSupervisorTask 的构造方法,主要有以下几个点:
- 任务的超时时间等于间隔时间,也就是默认30秒的超时时间,然后延迟时间默认等于超时时间 如果 eureka server down 了,或者网络问题,就有可能出现超时@H_412_403@
- 设置了最大的延迟时间,默认在超时时间的基础上扩大10倍,即300秒@H_412_403@
- 最后创建了一些计数器,分别统计成功、超时、拒绝、异常的次数,可以看到,它这里对任务的调度是有做统计的@H_412_403@
TimedSupervisorTask(String name,ScheduledExecutorService scheduler,ThreadPoolExecutor executor,1)"> 2 int timeout,TimeUnit timeUnit,1)"> expBackOffBound,Runnable task) { this.name = name; this.scheduler = scheduler; this.executor = executor; 任务超时时间就等于任务调度的间隔时间 this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; 延迟时间默认为超时时间 this.delay = AtomicLong(timeoutMillis); 最大延迟时间,默认在超时时间的基础上扩大10倍 12 this.maxDelay = timeoutMillis * expBackOffBound; 13 初始化计数器并注册 15 successCounter = Monitors.newCounter("success"16 timeoutCounter = Monitors.newCounter("timeouts"17 rejectedCounter = Monitors.newCounter("rejectedExecutions"18 throwableCounter = Monitors.newCounter("throwables"19 threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); 20 Monitors.registerObject(name,1)">21 }
再看 TimedSupervisorTask 的 run 方法:
- 1)首先将任务异步提交到线程池去执行,它这里并不是直接运行任务,而是异步提交到线程池中,这样可以实现超时等待,不影响主任务@H_412_403@
- 2)任务如果超时,比如出现网络延迟、eureka server 不可用等情况,超时了,它这个时候就会认为如果还是30秒后调度,可能 eureka server 还是不可用的状态,那么就增大延迟时间,那么第一次超时就会在300秒后再调度。如果300秒内 eureka server 可用了,然后有新的服务实例注册上去了,那这个客户端就不能及时感知到了,因此我觉得可以将 getCacheRefreshExecutorExponentialBackOffBound 对应的参数适当设置小一点(默认10倍)。@H_412_403@
- 3)如果任务没有超时,在调度成功后,就会重置延迟时间为默认的超时时间。最后在 finally 中进行下一次的调度。@H_412_403@
run() { 2 Future<?> future = 提交任务到线程池 5 future = executor.submit(task); 6 threadPoolLevelGauge.set((long) executor.getActiveCount()); 7 阻塞直到任务完成或超时 future.get(timeoutMillis,TimeUnit.MILLISECONDS); 9 任务完成后,重置延迟时间为超时时间,即30秒 delay.set(timeoutMillis); 11 threadPoolLevelGauge.set(( 成功次数+1 successCounter.increment(); 14 } (TimeoutException e) { 15 logger.warn("task supervisor timed out" 超时次数+1 17 timeoutCounter.increment(); 18 19 如果任务超时了,就会增大延迟时间,当前延迟时间*2,然后取一个最大值 20 long currentDelay = delay.get(); long newDelay = Math.min(maxDelay,currentDelay * 2 设置为最大的一个延迟时间 delay.compareAndSet(currentDelay,newDelay); 24 25 } (RejectedExecutionException e) { if (executor.isShutdown() || scheduler.isShutdown()) { 27 logger.warn("task supervisor shutting down,reject the task"28 } 29 logger.warn("task supervisor rejected the task"30 31 rejectedCounter.increment(); 33 } 35 logger.warn("task supervisor shutting down,can't accept the task"36 } 37 logger.warn("task supervisor threw an exception"39 throwableCounter.increment(); 41 } finally42 if (future != 43 future.cancel(45 46 scheduler.isShutdown()) { 47 延迟 delay 时间后,继续调度任务 48 scheduler.schedule(51 }
总结一下这块设计:
- 1)首先在远程调用的时候要考虑到网络不可用、server 端 down 了等情况导致调用超时,可以使用线程池异步提交任务,实现等待超时机制。@H_412_403@
- 2)超时之后,可以假想服务恢复可用状态可能需要一定的时间,如果还是按原来的时间间隔调度,可能还是会超时,因此增大延迟时间。如果调用成功,说明已经恢复了,则重置延迟时间。@H_412_403@
- 3)定时任务的调度以一定的延迟时间来循环调度(schedule),延迟时间可以根据实际情况变化,而不是一开始就按一个固定的频率来调度(scheduleAtFixedRate)。@H_412_403@
- 4)定时任务、线程池里的任务,最好做好任务执行状态的统计,便于观察任务的调度情况。@H_412_403@
7、构造注册表
接着构造 PeerAwareInstanceRegistry,从命名来看,这是一个可以感知 eureka 集群的注册表,就是在集群模式下,eureka server 从其它 server 节点拉取注册表。它的默认实现类是 PeerAwareInstanceRegistryImpl,继承自 AbstractInstanceRegistry,就是实例注册表。
① 构造 PeerAwareInstanceRegistry
进入 PeerAwareInstanceRegistryImpl 的构造方法:
- 首先是将前面构造的 EurekaServerConfig、EurekaClientConfig、EurekaClient 等传入构造方法来构造 PeerAwareInstanceRegistry@H_412_403@
- 调用了 super 的构造方法,主要初始化了如下几个东西:
- 保存最近下线实例的循环队列@H_412_403@
- 保存最近注册实例的循环队列@H_412_403@
- 最近一分钟续约次数的计数器@H_412_403@
- 定时任务剔除 recentlyChangedQueue 中的实例@H_412_403@
- 然后创建了一个最近一分钟集群同步次数的计数器 numberOfReplicationsLastMin。MeasuredRate 我们到后面再来分析它的设计。@H_412_403@
EurekaServerConfig serverConfig,1)"> EurekaClientConfig clientConfig,1)"> ServerCodecs serverCodecs,1)"> EurekaClient eurekaClient super(serverConfig,clientConfig,serverCodecs); this.eurekaClient = eurekaClient; 最近一分钟集群同步的次数计数器 this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1 We first check if the instance is STARTING or DOWN,then we check explicit overrides,1)"> then we check the status of a potentially existing lease. 13 this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule( DownOrStartingRule(),1)">14 new OverrideExistsRule(overriddenInstanceStatusMap),1)"> LeaseExistsRule()); 16 /////////////////////////////////////////////// 19 21 protected AbstractInstanceRegistry(EurekaServerConfig serverConfig,EurekaClientConfig clientConfig,ServerCodecs serverCodecs) { 22 this.serverConfig = serverConfig; 23 this.clientConfig =24 this.serverCodecs = serverCodecs; 25 最近下线的循环队列 this.recentCanceledQueue = new CircularQueue<Pair<Long,String>>(1000 最近注册的循环队列 28 this.recentRegisteredQueue = 29 最近一分钟续约的计数器 this.renewsLastMin = 32 一个定时调度任务,定时剔除最近改变队列中过期的实例 33 .deltaRetentionTimer.schedule(getDeltaRetentionTask(),1)"> serverConfig.getDeltaRetentionTimerIntervalInMs(),1)">35 serverConfig.getDeltaRetentionTimerIntervalInMs()); 36 }
这块的具体细节等后面分析具体功能的时候再来看,我们先知道有这些队列、计数器就行了。
② 循环队列 CircularQueue 的设计
从构造方法可以看到,它使用了循环队列来保存最近下线和最近注册的实例信息,容量固定为1000,这样就把最近的实例数量控制在1000以内。
CircularQueue 是它自定义的一个循环队列,继承自 AbstractQueue。其内部其实就是代理了 ArrayBlockingQueue,然后重写了入队的 offer 方法,当队列满了,就取出头部的一个元素,然后再放到队列尾部。
class CircularQueue<E> extends AbstractQueue<E>final ArrayBlockingQueue<E> delegate; capacity; public CircularQueue( capacity) { this.capacity =this.delegate = new ArrayBlockingQueue<>(capacity); 9 @Override public Iterator<E> iterator() { delegate.iterator(); 16 size() { delegate.size(); offer(E e) { 如果队列满了,就取出头部的一个元素,然后再放到尾部 23 while (!delegate.offer(e)) { delegate.poll(); 28 29 E poll() { 31 delegate.poll(); E peek() { 36 delegate.peek(); 38 }
8、创建 Eureka Server 上下文并初始化
接下来先是创建了 PeerEurekaNodes,应该就是代表 eureka 集群的。然后基于前面创建的一些东西创建 eureka server 上下文 EurekaServerContext,从 DefaultEurekaServerContext 构造方法进去可以看到,只是将前面构造的东西封装起来,便于全局使用。然后将 serverContext 放到 EurekaServerContextHolder 中,这样其它地方就可以通过这个 holder 获取 serverContext 了。
接着就是初始化eureka server上下文:
- 启动 eureka 集群:
- 主要是启动一个定时任务(间隔时间默认10分钟)更新 eureka 集群节点的信息,根据配置的 eureka server 地址更新 PeerEurekaNode,这样当有 eureka server 下线或上线后,就可以及时感知到其它 server 节点。PeerEurekaNode 主要就是用于集群节点间的数据同步,这块后面分析集群的时候再具体分析。@H_412_403@
- 注册表初始化:
- 首先启动了前面创建的计数器:numberOfReplicationsLastMin@H_412_403@
- 初始化响应缓存,eureka server 构造了一个多级缓存来响应客户端抓取注册表的请求,这个多级缓存的设计就是响应频繁抓取注册表请求的核心所在,等后面分析客户端抓取注册表的时候再具体分析@H_412_403@
- 定时调度任务更新续约阈值,主要就是更新 numberOfRenewsPerMinThreshold 这个值,即每分钟续约次数,等分析续约的时候再来分析@H_412_403@
- 初始化 RemoteRegionRegistry,猜测是跟 eureka 多个区域(region)部署有关的@H_412_403@
initialize() { 2 logger.info("Initializing ..." 启动eureka集群 peerEurekaNodes.start(); 注册表初始化 registry.init(peerEurekaNodes); 8 } (Exception e) { RuntimeException(e); 11 logger.info("Initialized"12 }
PeerEurekaNodes 的 start 方法:
start() { 单个线程的线程池 3 taskExecutor = Executors.newSingleThreadScheduledExecutor( 4 ThreadFactory() { @Override 6 Thread newThread(Runnable r) { 7 Thread thread = new Thread(r,"Eureka-PeerNodesUpdater" 8 thread.setDaemon( 9 thread; 根据集群地址更新 PeerEurekaNode,PeerEurekaNode 就包含了调度其它注册中心的客户端 updatePeerEurekaNodes(resolvePeerUrls()); 16 Runnable peersUpdateTask = Runnable() { 18 19 updatePeerEurekaNodes(resolvePeerUrls()); 21 } 22 logger.error("Cannot update the replica Nodes" 定时跟新集群信息 PeerEurekaNode,如果有eureka-server不可用了,就可以及时下线,或者新上线了eureka-server,可以及时感知到 taskExecutor.scheduleWithFixedDelay( peersUpdateTask,1)"> serverConfig.getPeerEurekaNodesUpdateIntervalMs(),1)">31 TimeUnit.MILLISECONDS 33 34 } IllegalStateException(e); 36 for (PeerEurekaNode node : peerEurekaNodes) { 38 logger.info("Replica node URL: {}"40 }
PeerAwareInstanceRegistryImpl 的 init 方法:
void init(PeerEurekaNodes peerEurekaNodes) 启动计数器 .numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; 初始化响应缓存,eureka server 构造了一个多级缓存来响应客户端抓取注册表的请求 initializedResponseCache(); 定时调度任务更新续约阀值,主要就是更新 numberOfRenewsPerMinThreshold 这个值,即每分钟续约次数 scheduleRenewalThresholdUpdateTask(); 初始化 RemoteRegionRegistry initRemoteRegionRegistry(); 13 Monitors.registerObject(15 logger.warn("Cannot register the JMX monitor for the InstanceRegistry :"17 }
9、完成 Eureka Server 初始化
接下来看最后几步:
- 首先调用 registry.syncUp() 将 EurekaClient 本地的实例同步到注册表,在集群模式下,eureka server 也是一个客户端,因此会获取到其它注册中心的注册表同步到当前 server 的注册表中。它默认会重试5次,每次间隔30秒。在单机模式下,应该将重试次数设置为 0。@H_412_403@
- 然后调用 registry.openForTraffic 做最后的一些初始化:
- 更新每分钟续约阈值@H_412_403@
- 设置实例状态@H_412_403@
- 启动统计最近一分钟续约次数的计数器@H_412_403@
- 启动定时任务剔除下线的实例,定时任务默认每隔60秒调度一次@H_412_403@
- 最后一步就是注册 eureka 自身的一些监控统计@H_412_403@
syncUp 方法:
syncUp() { Copy entire entry from neighboring DS node int count = 0 注册表同步重试次数,默认5次 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++if (i > 0 8 9 同步重试时间,默认30秒 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); 11 } (InterruptedException e) { 12 logger.warn("Interrupted during registry transfer.."13 break16 Applications apps = eurekaClient.getApplications(); (Application app : apps.getRegisteredApplications()) { (InstanceInfo instance : app.getInstances()) { 20 (isRegisterable(instance)) { 21 注册实例 22 register(instance,instance.getLeaseInfo().getDurationInSecs(),1)">23 count++ } 25 } (Throwable t) { 26 logger.error("During DS init copy" count; 32 }
openForTraffic 方法:
void openForTraffic(ApplicationInfoManager applicationInfoManager,1)"> count) { 期望的客户端每分钟的续约次数 this.expectedNumberOfClientsSendingRenews = 更新每分钟续约阀值 updateRenewsPerMinThreshold(); 7 logger.info("Got {} instances from neighboring DS node" 8 logger.info("Renew threshold is: {}"this.startupTime =if (count > 011 this.peerInstancesTransferEmptyOnStartup = 13 DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { 16 logger.info("Priming AWS connections for all replicas.." primeAwsReplicas(applicationInfoManager); 19 logger.info("Changing status to UP" 设置实例状态为已启动 applicationInfoManager.setInstanceStatus(InstanceStatus.UP); .postInit(); //////////////////////////////////////26 postInit() { 启动 统计最近一分钟续约次数的计数器 renewsLastMin.start(); if (evictionTaskRef.get() != evictionTaskRef.get().cancel(); 定时剔除任务 34 evictionTaskRef.set( EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(),1)"> serverConfig.getEvictionIntervalTimerInMs(),1)"> serverConfig.getEvictionIntervalTimerInMs()); 38 }
10、Eureka Server 启动流程图
下面通过一张图来展示下 eureka server 的启动初始化流程。
四、Eureka Client 启动初始化
eureka client 的启动初始化我们看 eureka-examples 模块下的 ExampleEurekaClient 这个类,它的 main 方法中就模拟了作为一个 eureka client 启动初始化,并向注册中心发送请求。
eureka server 的初始化中其实已经包含了客户端的初始化,可以看出,客户端的初始化主要有如下的一些东西:
- 读取 eureka-client.properties 配置文件,创建 EurekaInstanceConfig@H_412_403@
- 基于 InstanceConfig 创建实例信息 InstanceInfo@H_412_403@
- 基于 InstanceConfig 和 InstanceInfo 创建应用实例管理器 ApplicationInfoManager@H_412_403@
- 读取 eureka-client.properties 配置文件,创建 EurekaClientConfig@H_412_403@
- 基于应用实例管理器和 clientConfig 创建 EurekaClient(DiscoveryClient),初始化流程跟 eureka server 初始化流程中 DiscoveryClient 的创建是一样的@H_412_403@
2 ExampleEurekaClient sampleClient = ExampleEurekaClient(); 基于实例配置和实例信息创建应用实例管理器 5 ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager( MyDataCenterInstanceConfig()); 基于应用实例管理器和客户端配置创建 EurekaClient(DiscoveryClient) 7 EurekaClient client = initializeEurekaClient(applicationInfoManager,1)"> DefaultEurekaClientConfig()); use the client sampleClient.sendRequestToServiceUsingEureka(client); shutdown the client eurekaClient.shutdown(); 14 }