标准 专业
多元 极客

Spring Cloud Alibaba研究院(1)——Nacos——服务注册

我们在使用Eureka时,通常会使用如下注解:

“`java
@EnableDiscoveryClient
@EnableEurekaClient
“`

> **@EnableEurekaClient**是Netfix单独为Eureka的Client端开发的注解,位于**spring-cloud-netflix-eureka-client**包中

> **@EnableDiscoveryClient**是Spring Cloud的一个原生注解,位于**spring-cloud-commons**包中,可以视为一种规范。

接下来我们将分析**Spring Cloud Alibaba**套件是如何进行服务发现的。

> **Spring Cloud Alibaba**将在本篇文章中以**SCA**代替

## 场景搭建

### 启动器

“`Java
@EnableDiscoveryClient
@SpringBootApplication
public class NacosClientThreeBoostrap {

public static void main(String[] args) {
SpringApplication.run(NacosClientThreeBoostrap.class);
}
}
“`

**application.yml**:

“`yaml
server:
port: 40010
spring:
application:
name: sunshine-taurus
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
management:
endpoints:
web:
exposure:
include: \*
“`

**maven依赖**:

“`xml


org.springframework.boot
spring-boot-starter-web


org.springframework.cloud
spring-cloud-starter-alibaba-nacos-discovery


org.springframework.boot
spring-boot-starter-actuator


“`

## @EnableDiscoveryClient注解去向分析

SCA对于这个注解执行的非常的简单粗暴:

“`properties
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
org.springframework.cloud.alibaba.nacos.NacosDiscoveryClientAutoConfiguration
“`

直接指到了核心实现类:

“`Java
@Configuration
@ConditionalOnMissingBean(DiscoveryClient.class)
@ConditionalOnNacosDiscoveryEnabled
@EnableConfigurationProperties
public class NacosDiscoveryClientAutoConfiguration {

@Bean
public DiscoveryClient nacosDiscoveryClient() {
// 实例化NacosDiscoveryClient
return new NacosDiscoveryClient();
}

@Bean
@ConditionalOnMissingBean
public NacosDiscoveryProperties nacosProperties() {
// 实例化NacosDiscoveryProperties
return new NacosDiscoveryProperties();
}
}
“`

链路解释:

1. **实例化NacosDiscoveryClient**,也就是实例化一个我们上报到Nacos服务发现的客户端实例。
2. **实例化NacosDiscoveryClient属性Bean**,实例化NacosDiscoveryClient会需要。

### Bean: NacosDiscoveryProperties

实例化的核心:

“`Java
public void init() throws SocketException {

serverAddr = Objects.toString(serverAddr, “”);
endpoint = Objects.toString(endpoint, “”);
namespace = Objects.toString(namespace, “”);
logName = Objects.toString(logName, “”);

if (StringUtils.isEmpty(ip)) {
// 探测本机的IP地址
if (StringUtils.isEmpty(networkInterface)) {
ip = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
}
else {
NetworkInterface netInterface = NetworkInterface
.getByName(networkInterface);
if (null == networkInterface) {
throw new IllegalArgumentException(
“no such interface ” + networkInterface);
}

Enumeration inetAddress = netInterface.getInetAddresses();
while (inetAddress.hasMoreElements()) {
InetAddress currentAddress = inetAddress.nextElement();
if (currentAddress instanceof Inet4Address
&& !currentAddress.isLoopbackAddress()) {
ip = currentAddress.getHostAddress();
break;
}
}

if (StringUtils.isEmpty(ip)) {
throw new RuntimeException(“cannot find available ip from”
+ ” network interface ” + networkInterface);
}

}
}
// 重写本地变量
this.overrideFromEnv(environment);
}
“`

链路解释:

1. **探测本机的IP地址**,找到Inet4Address及第一个非本地还回接口地址即可。
2. **重写本地变量**。

继续看下**NacosDiscoveryProperties#overridFromEnv()**方法:

“`Java
public void overrideFromEnv(Environment env) {

if (StringUtils.isEmpty(this.getServerAddr())) {
this.setServerAddr(env
.resolvePlaceholders(“${spring.cloud.nacos.discovery.server-addr:}”));
}
if (StringUtils.isEmpty(this.getNamespace())) {
this.setNamespace(env
.resolvePlaceholders(“${spring.cloud.nacos.discovery.namespace:}”));
}
if (StringUtils.isEmpty(this.getAccessKey())) {
this.setAccessKey(env
.resolvePlaceholders(“${spring.cloud.nacos.discovery.access-key:}”));
}
if (StringUtils.isEmpty(this.getSecretKey())) {
this.setSecretKey(env
.resolvePlaceholders(“${spring.cloud.nacos.discovery.secret-key:}”));
}
if (StringUtils.isEmpty(this.getLogName())) {
this.setLogName(
env.resolvePlaceholders(“${spring.cloud.nacos.discovery.log-name:}”));
}
if (StringUtils.isEmpty(this.getClusterName())) {
this.setClusterName(env.resolvePlaceholders(
“${spring.cloud.nacos.discovery.clusterName-name:}”));
}
if (StringUtils.isEmpty(this.getEndpoint())) {
this.setEndpoint(
env.resolvePlaceholders(“${spring.cloud.nacos.discovery.endpoint:}”));
}
}
“`

核心思想是从Environment中获取这些必须值并赋值。

### Bean: NacosDiscoveryClient

根据**@EnableDiscoveryClient**的语义,想要实现一个客户端的服务发现,需要实现**DiscoveryClient**接口:

“`Java
public interface DiscoveryClient {

/**
* 一个可读的实现描述,用于健康检查
* @return the description
*/
String description();

/**
* 获取服务serviceId下所有服务实例
* @param serviceId the serviceId to query
* @return a List of ServiceInstance
*/
List getInstances(String serviceId);

/**
* 返回所有的服务serviceId
* @return all known service ids
*/
List getServices();

}
“`

**NacosDiscoveryClient**也实现了它的接口,我们先看下对方法的具体实现。

**NacosDiscoveryClient#getInstantces()**:

“`Java
@Override
public List getInstances(String serviceId) {
try {
// 获取该服务serviceId下的实例信息
List instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, true);
// 封装为ServiceInstance实例
return hostToServiceInstanceList(instances, serviceId);
}

}
“`

链路分析:

1. **获取该服务serviceId下的实例信息**。
2. **封装为ServiceInstance实例**。

调用**NacosDiscoveryProperties#namingServiceInstance()**时:

“`Java
public NamingService namingServiceInstance() {

if (null != namingService) {
return namingService;
}

Properties properties = new Properties();
properties.put(SERVER_ADDR, serverAddr);
properties.put(NAMESPACE, namespace);
properties.put(UtilAndComs.NACOS_NAMING_LOG_NAME, logName);
properties.put(ENDPOINT, endpoint);
properties.put(ACCESS_KEY, accessKey);
properties.put(SECRET_KEY, secretKey);
properties.put(CLUSTER_NAME, clusterName);
properties.put(NAMING_LOAD_CACHE_AT_START, namingLoadCacheAtStart);

try {
// 根据服务信息构建命名服务
namingService = NacosFactory.createNamingService(properties);
return namingService;
}
catch (Exception e) {
LOGGER.error(“create naming service error!properties={},e=,”, this, e);
return null;
}
}
“`

SCA会根据我们在**application.yml**中使用的服务信息,构建当前服务的命名服务。

然后通过反射的形式,通过构造方法,实例化一个**NamingService**,**NacosNamingService#NacosNamingService()**:

“`Java
public NacosNamingService(Properties properties) {
Properties properties = new Properties();
// 设置Nacos服务节点集群属性
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
// 初始化NacosNamingService
init(properties);
}

private void init(Properties properties) {
// 解析当前service的namespace
namespace = InitUtils.initNamespaceForNaming(properties);
// 初始化Nacos服务节点的域名地址
initServerAddr(properties);
InitUtils.initWebRootContext();
// NamingService的文件缓存地址
initCacheDir();
// 初始化日志对象
initLogName(properties);
// 创建事件驱动器
eventDispatcher = new EventDispatcher();
// 创建一个命名服务的代理
serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
// 利用命名服务的代理创建对应的心跳响应
beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
// 创建一个Host响应
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}
“`

链路分析:

1. **初始化命名服务**,目的在于设置一些参数的初始值。
2. **赋值属性**,根据服务本身的启动参数,对命名服务相关参数进行设置。
3. **创建事件驱动器**,事件驱动器功能是进行通知和监听事件。
4. **创建命名服务代理**,目的在于创建和Nacos服务端建立HTTP连接的代理。
5. **创建心跳响应**,用于建立和Nacos服务端所有节点的心跳连接,心跳连接的建立依托于步骤4建立的服务代理。
6. **创建一个Host响应**,用于发起和Nacos服务端节点的注册,解绑等请求,请求的建立同样依托于步骤4建立的服务代理。

我们可以看下故障转移。

### 故障转移

故障转移其实只有两个私有的成员变量:**failoverDir**和**hostReactor**。

* failoverDir是故障转移的日志记录。
* hostReactor就是我们在上文创建的发起请求的响应式处理器。

“`java
public void init() {
// 用于切换刷新写入文件的任务调度
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
// 用于每日统计的写文件的任务调度
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);

// 后备目录和日志文件的任务调度
}
“`

链路分析:

1. 根据上一次变更的时间戳来判断是否需要写入当次刷新的日志文件。
2. 写入每天文件到磁盘中。
3. 如果当前目录不存在,创建后备日志文件和目录。

> 我们是从**NacosDiscoveryClient#getInstances()**方法到达这里的。

接下来我们继续看**NacosNamingService#selectInstances()**方法:

“`Java
@Override
public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy, boolean subscribe) throws NacosException {

ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, “,”));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, “,”));
}
return selectInstances(serviceInfo, healthy);
}
“`

这里会存在一个订阅的概念,订阅的概念其实是一个异步模型,即我们每次去自己的缓存中获取指定服务列表,而服务列表的更新,则是依靠任务调度完成的。

我们发起的请求,一般会以订阅的方式进行。

而上面的实现则是根据订阅方式,决定是从缓存中直接取出指定service的服务列表,还是去Nacos服务端获取对应service的服务列表。

接下来我们继续看**NacosNamingService#selectInstances()**方法:

“`java
private List selectInstances(ServiceInfo serviceInfo, boolean healthy) {
List list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList();
}

Iterator iterator = list.iterator();
// 判断服务实例的状态
while (iterator.hasNext()) {
Instance instance = iterator.next();
if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) {
iterator.remove();
}
}

return list;
}
“`

接下来就是判断服务实例的状态,移除处于非健康,不可用,没有分配权重的服务实例。

> **NacosDiscoverClient**我们分析完成了,但是感觉是不是跑题了,因为我们看到的是怎么获取实例,但是我们是什么时候注册上的呢?

这其实就是一个思想误区,认为DiscoveryClient既要完成服务注册的功能,还要进行服务实例获取。

接下来我将简略的讲一下,Spring Cloud处理的核心体系。

## Spring Cloud服务注册的体系

在容器初始化时,会有**onRefresh()**和**finishRefresh()**两个方法。

* **onRefresh()**是对容器进行初始化。
* **finishRefresh()**是结束容器化,比如开始WebServer,发布ApplicationEvent操作等。

注册就是通过发布ApplicationEvent进行的。

那么消费的就是AbstractAutoServiceRegistration类,**AbstractAutoServiceRegistration#bind()**:

“`Java
@EventListener(WebServerInitializedEvent.class)
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
// 如果是服务端,不进行注册
if (context instanceof ConfigurableWebServerApplicationContext) {
if (“management”.equals(
((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
return;
}
}
// 设置端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 启动服务注册
this.start();
}
“`

链路梳理:

1. 如果是服务端,不进行注册。
2. 设置端口,port端口比较重要,避免并发情况下对port写错。
3. 进行服务注册。

继续看下服务注册的步骤,**AbstractAutoServiceRegistration#start()**:

“`Java
public void start() {
// 校验服务发现状态
if (!isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug(“Discovery Lifecycle disabled. Not starting”);
}
return;
}

// 没有注册,才会注册
if (!this.running.get()) {
// 获取注册信息,并发布注册前事件
this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
// 注册
register();
// 是否应该注册服务者
if (shouldRegisterManagement()) {
registerManagement();
}
// 获取注册信息,并发布注册后事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
// 注册状态更改为已注册。
this.running.compareAndSet(false, true);
}

}
“`

链路分析:

1. 校验服务发现状态。
2. 没有注册,才会进行注册。
3. 获取注册信息,并发布注册前事件。
4. 注册。
5. 是否应该注册服务则。
6. 获取注册信息,并发布注册后事件。
7. 注册状态更改为已注册。

从类名分析,我们不难发现,**AbstractAutoServiceRegistration**应该是一个超类,我们之前分析的是抽象模型,那么调用register的时候会先走派生类,**NacosAutoServiceRegistration#register()**:

“`Java
@Override
protected void register() {
// 根据当前服务发现的属性判断是否可以注册
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
LOGGER.debug(“Registration disabled.”);
return;
}
// 如果注册器里面的端口是不正常的,设置端口号
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
// 调用超类的注册接口
super.register();
}
“`

链路分析:

1. 根据当前服务发现的属性判断是否可以注册。
2. 如果注册器里面的端口是不正常的,设置端口。
3. 调用超类的注册接口。

继续,:

“`Java
protected void register() {
this.serviceRegistry.register(getRegistration());
}
“`

链路分析:

1. 进行服务注册。

我们看到服务注册对象ServiceRegistry是个接口,那么继续看实现,**NacosServiceRegistry#register()**:

“`java
@Override
public void register(NacosRegistration registration) {
// 再次判断是否可注册
if (!registration.isRegisterEnabled()) {
logger.info(“Nacos Registration is disabled…”);
return;
}
// 如果没有服务ServiceId可供注册,不注册
if (StringUtils.isEmpty(registration.getServiceId())) {
logger.info(“No service to register for nacos client…”);
return;
}
// 获取命名服务
NamingService namingService = registration.getNacosNamingService();
// 获取服务ServiceId
String serviceId = registration.getServiceId();

// 创建注册实例
Instance instance = new Instance();
instance.setIp(registration.getHost());
instance.setPort(registration.getPort());
instance.setWeight(registration.getRegisterWeight());
instance.setCluster(new Cluster(registration.getCluster()));
instance.setMetadata(registration.getMetadata());

try {
// 进行服务注册
namingService.registerInstance(serviceId, instance);
logger.info(“nacos registry, {} {}:{} register finished”, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {

}
}
“`

链路分析:

1. 再次判断当前服务是否可注册。
2. 如果没有服务ServiceId,就不需要注册。
3. 获取命名服务,命名服务在实例化EnableDiscoveryClient的时创建。
4. 获取服务ServiceId。
5. 创建注册实例Instance对象。
6. 进行服务注册。

看到**NacosNamingService**,从前面的链路分析来看,我们接下来应该是就是使用HostReactor进行注册了,

**NacosNamingService#registerInstance()**:

“`Java
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// 如果是临时节点,也就是我们需要注册的服务节点,则需要客户端自己负责心跳上报
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
// 添加心跳任务
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// 通过NamingProxy向Nacos服务端发起注册请求
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
“`

链路分析:

1. 如果注册的是我们的服务节点,则节点类型是临时节点,需要客户端自己负责心跳上报,心跳上报是由任务调度器来完成的。
2. 通过NamingProxy向Nacos服务端发起服务注册请求。

接下来,**NamingProxy#registerService()**:

“`Java
public void registerService(String serviceName, Instance instance) throws NacosException {
NAMING_LOGGER.info(“[REGISTER-SERVICE] {} registering service {} with instance: {}”,
namespaceId, serviceName, instance);
// 服务注册请求参数
final Map params = new HashMap(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put(“ip”, instance.getIp());
params.put(“port”, String.valueOf(instance.getPort()));
params.put(“weight”, String.valueOf(instance.getWeight()));
params.put(“enable”, String.valueOf(instance.isEnabled()));
params.put(“healthy”, String.valueOf(instance.isHealthy()));
params.put(“ephemeral”, String.valueOf(instance.isEphemeral()));
params.put(“metadata”, JSON.toJSONString(instance.getMetadata()));
// 发起HTTP请求,请求方法:/nacos/v1/ns/instance
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
“`

链路分析:

1. 将Instance实例对象转化为HashMap参数对象,调用HttpClient的请求方法,进行注册。由于是注册行为,所以Http请求方法为PUT。

这样,我们的服务发现的实例化与注册流程基本打通了。

> 聪明的你们肯定会发现,在刚刚的注册链路中,有两个Bean是不知道何时进行实例化的。

### NacosServiceRegistry

追根朔源,我们发现**NacosServiceRegistry**是在**NacosDiscoveryAutoConfiguration**类中进行初始化的。

首先,**NacosDiscoveryAutoConfiguration**类头会有一系列的校验规则:

“`java
@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnClass(name = “org.springframework.boot.web.servlet.context.ServletWebServerInitializedEvent”)
@ConditionalOnProperty(value = “spring.cloud.service-registry.auto-registration.enabled”, matchIfMissing = true)
@AutoConfigureBefore({ AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryClientAutoConfiguration.class })
“`

校验规则分析:

1. 是个Configuration配置类,意味着接下来需要注入多个Bean。
2. 允许使用配置属性信息。
3. 允许进行Nacos服务发现。
4. 在**ServletWebServerInitializedEvent**实例化之后实例化。
5. 校验自动注册规则,默认值为true。
6. 在实例化之前,先完成**AutoServiceRegistrationAutoConfiguration**和**NacosDiscoveryClientAutoConfiguration**的配置。

峰回路转,我们看到了**NacosDiscoveryClientAutoConfiguration**类。

再看**NacosDiscoveryAutoConfiguration**中具体的Bean,逐个进行分析:

“`Java
@Bean
public NacosServiceRegistry nacosServiceRegistry() {
return new NacosServiceRegistry();
}
“`

第一个是实现**ServiceRegistry**接口的**NacosServiceRegistry**,我们在注册链路中刚刚用过它的register()方法。

它共有两个核心方法,分别是**register()**和**deregister()**。

“`Java
@Override
public void deregister(NacosRegistration registration) {
if (instance.isEphemeral()) {
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
}
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}
“`

调用链路:

1. 如果需要解绑的实例是我们的服务实例,则需要移除后台正在进行调度的心跳上报任务。
2. 通过NamingProxy向Nacos服务端发起解绑服务请求。

下一个Bean:

“`Java
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration() {
return new NacosRegistration();
}
“`

这个对象也很熟悉,在注册链路和撤销注册链路中,这个对象就是入参。

我们在这里进行一些默认属性的配置:

“`Java
@PostConstruct
public void init() {

Environment env = context.getEnvironment();
Integer managementPort = ManagementServerPortUtils.getPort(context);
if (null != managementPort) {
Map metadata = nacosDiscoveryProperties.getMetadata();
metadata.put(MANAGEMENT_PORT, managementPort.toString());
String contextPath = env
.getProperty(“management.server.servlet.context-path”);
String address = env.getProperty(“management.server.address”);
if (!StringUtils.isEmpty(contextPath)) {
metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath);
}
if (!StringUtils.isEmpty(address)) {
metadata.put(MANAGEMENT_ADDRESS, address);
}
}
}
“`

因为我们需要在**NacosRegistration**中使用**NacosDiscoveryProperties**属性,所以我们就先需要进行**NacosDiscoveryClientAutoConfiguration**的注入。

下一个Bean:

“`Java
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
“`

这个Bean出现在注册链路的前端,在容器执行**finishRefresh()**并发送**ApplicationEven**t事件时,此时,**NacosAutoServiceRegistration**是**AbstractAutoServiceRegistration**的派生类。

> 至此,结束SCA的服务注册源码分析。

> Spring Cloud和Spring Boot是两条链路,开发进度不同,故不要使用最新的Spring Boot版本,应该框架建议的Spring Boot版本,当前SCA代码中Spring Boot版本为2.1.1.RELEASE。

## 总结

我们以更直观的形式进行总结:

![Nacos服务注册流程](http://pic.mysunshine.vip//Spring%20Cloud%20Alibaba/Nacos/%E5%BA%94%E7%94%A8%E6%9C%8D%E5%8A%A1%E6%B3%A8%E5%86%8C%E6%B5%81%E7%A8%8B.png/SecondSundialDreams)

赞(1) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫