标准 专业
多元 极客

Spring Cloud Alibaba研究院(4)——Sentinel——延迟加载

在进行Spring Cloud Alibaba Sentinel服务注册源码分析时,我们已经得出结论:

  • spring-cloud-alibaba-sentinel默认是懒加载的。
  • 控制懒加载的参数是spring.cloud.sentinel.eager,默认是false。
  • 立即加载是发生在SentinelAutoConfiguration中。

我们在实验过程中会发现,当应用服务启动时,我们需要触发下一下Sentinel资源,对应的应用信息,才会显示在Sentinel控制台中。

Sentinel加载之后,又做了一些什么呢?

@SentinelResource的切面

沿着实验过程顺藤摸瓜,我们可以找到@SentinelResource的切面和切点:

@Configuration
public class AopConfiguration {

    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        return new SentinelResourceAspect();
    }
}
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        // ① 解析当前方法
        Method originMethod = resolveMethod(pjp);

        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        // ② 注解校验
        if (annotation == null) {
            ...
        }
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        Entry entry = null;
        try {
            // ③ 请求上报
            entry = SphU.entry(resourceName, entryType, 1, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // ④ 异常规避处理
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            // ⑤ 异常追踪处理
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex, annotation);
                return handleFallback(pjp, annotation, ex);
            }

            // ⑥ 直接抛出异常
            throw ex;
        } finally {
            if (entry != null) {
                // ⑦请求结束,计数减1
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}
  1. 解析使用@SentinelResource注解的方法,获取相应的方法信息。
  2. 进行@SentinelResource注解校验。
  3. 使用Sentinel进行请求上报。
  4. 异常规避处理。
  5. 异常追踪处理。
  6. 直接抛出异常。
  7. 请求完成后,对应的规则计数器减一。

接下来我们看SphU#entry()静态方法:

public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
    return Env.sph.entry(name, type, count, args);
}

我们发现存在一个Env的单例对象:

public class Env {

    public static final Sph sph = new CtSph();

    static {
       	// 如果初始化失败,该进程会关闭
        InitExecutor.doInit();
    }
}

我们在Env的静态初始化方法中,看到了关键信息InitExecutor#doInit()

public static void doInit() {
    // ① 乐观锁
    if (!initialized.compareAndSet(false, true)) {
        return;
    }

    try {
        // ② 加载InitFunc接口
        ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : loader) {
            RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
            // ③ 调整初始化顺序
            insertSorted(initList, initFunc);
        }
        // ④ 按顺序进行初始化
        for (OrderWrapper w : initList) {
            w.func.init();
            RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
                w.func.getClass().getCanonicalName(), w.order));
        }
    } catch (Exception ex) {
        ...
    } catch (Error error) {
        ...
    }
}
  1. 使用乐观锁对初始化过程进行加锁,代表着一次启动后仅会加载一次。
  2. 获取InitFunc.class接口类型的SPI类。
  3. 将需要初始化的类用OrderWrapper包装一层,并且根据InitOrder调整初始化顺序。
  4. 按顺序进行初始化。

我们来看一下默认的几个初始化类。

InitExecutor#doInit()Sentinel服务注册中的立即加载,有异曲同工之妙。

Sentinel的SPI

CommandCenterInitFunc

初始化命令中心

这是上报功能的关键对象

@Override
public void init() throws Exception {
    // ① 获取CommandCenter对象
    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

    if (commandCenter == null) {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
        return;
    }
	// ② 准备执行
    commandCenter.beforeStart();
    // ③ 开始执行
    commandCenter.start();
    RecordLog.info("[CommandCenterInit] Starting command center: "
                   + commandCenter.getClass().getCanonicalName());
}
  1. CommandCenterProvider中获取一个CommandCenter对象。
  2. 进行CommandCenter的预执行工作。
  3. 执行CommandCenter

了解CommandCenter的操作步骤,我们接下来分析一下它的具体作用。

CommandCenterProvider进行初次加载时,会进行一次SPI操作,CommandCenterProvider#resolveInstance

private static void resolveInstance() {
    CommandCenter resolveCommandCenter = SpiLoader.loadHighestPriorityInstance(CommandCenter.class);

    if (resolveCommandCenter == null) {
        RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
    } else {
        commandCenter = resolveCommandCenter;
        RecordLog.info("[CommandCenterProvider] CommandCenter resolved: " + resolveCommandCenter.getClass()
                       .getCanonicalName());
    }
}

它会获取最高优先级的CommandCenter,可见CommandCenter的实现不止一个。

截稿为止,CommandCenter的默认实现有SimpleHttpCommandCenterNettyHttpCommandCenter

在默认的Sentinel配置中,SimpleHttpCommandCenter优先级最高,看下它的执行和预执行部分。

public void beforeStart() throws Exception {
    // 注册所有的命令处理句柄
    Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
    registerCommands(handlers);
}

在预执行过程中,注册了Sentinel所需要的所有上报命令处理句柄。

大致包括:请求应用的所有规则客户端配置客户端状态Http请求信息等,以及更新规则等写操作

我们继续看执行操作:

@Override
public void start() throws Exception {
    int nThreads = Runtime.getRuntime().availableProcessors();
    // ① 创建一个线程池
    this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        new NamedThreadFactory("sentinel-command-center-service-executor"),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                CommandCenterLog.info("EventTask rejected");
                throw new RejectedExecutionException();
            }
        });

    // ② 创建一个线程任务
    Runnable serverInitTask = new Runnable() {
        int port;

        {
            try {
                port = Integer.parseInt(TransportConfig.getPort());
            } catch (Exception e) {
                port = DEFAULT_PORT;
            }
        }

        @Override
        public void run() {
            boolean success = false;
            ServerSocket serverSocket = getServerSocketFromBasePort(port);

            if (serverSocket != null) {
                CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                socketReference = serverSocket;
                executor.submit(new ServerThread(serverSocket));
                success = true;
                port = serverSocket.getLocalPort();
            } else {
                CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
            }

            if (!success) {
                port = PORT_UNINITIALIZED;
            }

            TransportConfig.setRuntimePort(port);
            executor.shutdown();
        }

    };
    // ③ 执行线程任务
    new Thread(serverInitTask).start();
}
  1. 创建一个线程池。
  2. 创建一个线程任务,并且开启了一个默认端口8719的Socket连接,这个线程任务是一个服务线程任务。
  3. 开始执行这个线程任务。

任务调度

那么这个线程池和调度任务,具体负责什么工作呢?

class ServerThread extends Thread {

    private ServerSocket serverSocket;

    ServerThread(ServerSocket s) {
        this.serverSocket = s;
        setName("sentinel-courier-server-accept-thread");
    }

    @Override
    public void run() {
        while (true) {
            Socket socket = null;
            try {
                socket = this.serverSocket.accept();
                setSocketSoTimeout(socket);
                // ① 创建一个HttpEventTask,并执行
                HttpEventTask eventTask = new HttpEventTask(socket);
                bizExecutor.submit(eventTask);
            } catch (Exception e) {
                ...
            }
        }
    }
}

这里创建并使用SimpleHttpCommandCenter#start()方法执行这个事件任务。

我们继续看HttpEventTask

@Override
public void run() {
    if (socket == null) {
        return;
    }

    BufferedReader in = null;
    PrintWriter printWriter = null;
    try {
        long start = System.currentTimeMillis();
        in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
        OutputStream outputStream = socket.getOutputStream();

        printWriter = new PrintWriter(
            new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

        String line = in.readLine();
        CommandCenterLog.info("[SimpleHttpCommandCenter] socket income: " + line
            + "," + socket.getInetAddress());
        CommandRequest request = parseRequest(line);
        // ① 解析Http请求
        if (line.length() > 4 && StringUtil.equalsIgnoreCase("POST", line.substring(0, 4))) {
            String bodyLine = null;
            boolean bodyNext = false;
            boolean supported = false;
            int maxLength = 8192;
            while (true) {
                if (bodyNext) {
                    if (!supported) {
                        break;
                    }
                    char[] bodyBytes = new char[maxLength];
                    int read = in.read(bodyBytes);
                    String postData = new String(bodyBytes, 0, read);
                    parseParams(postData, request);
                    break;
                }

                bodyLine = in.readLine();
                if (bodyLine == null) {
                    break;
                }

                if (StringUtil.isEmpty(bodyLine)) {
                    bodyNext = true;
                    continue;
                }

                int index = bodyLine.indexOf(":");
                if (index < 1) {
                    continue;
                }
                String headerName = bodyLine.substring(0, index);
                String header = bodyLine.substring(index + 1).trim();
                if (StringUtil.equalsIgnoreCase("content-type", headerName)) {
                    if (StringUtil.equals("application/x-www-form-urlencoded", header)) {
                        supported = true;
                    } else {

                        break;
                    }
                } else if (StringUtil.equalsIgnoreCase("content-length", headerName)) {
                    try {
                        int len = new Integer(header);
                        if (len > 0) {
                            maxLength = len;
                        }
                    } catch (Exception e) {
                    }
                }
            }
        }

        // ② 获取执行命令
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            badRequest(printWriter, "Invalid command");
            return;
        }

        // ③ 执行具体操作
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter, outputStream);
        } else {
            badRequest(printWriter, "Unknown command `" + commandName + '`');
        }
        // ④ 返回执行结果
        printWriter.flush();

        ...
    } catch (Throwable e) {
        ...
    } finally {
        ...
    }
}
  1. POST请求,从Body中解析参数。
  2. 获取并校验此次执行的命令。
  3. 从在与执行操作中注册的CommandHandler获取对应的命令,并执行handle()操作。
  4. 无论成功与失败,返回对应的响应。

随意给大家看一下请求样式:

GET /setRules?type=flow&data=%5B%7B%22clusterMode%22%3Afalse%2C%22controlBehavior%22%3A0%2C%22count%22%3A2.0%2C%22grade%22%3A1%2C%22limitApp%22%3A%22default%22%2C%22maxQueueingTimeMs%22%3A500%2C%22resource%22%3A%22sentinelTest%22%2C%22strategy%22%3A0%2C%22warmUpPeriodSec%22%3A10%7D%5D HTTP/1.1, address: /172.19.164.140, time cost: 3188 ms

HeartbeatSenderInitFunc

HeartbeatSenderInitFunc用于和Sentinel服务端进行心跳机制,核心实现init():。

@Override
public void init() {
    HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
    if (sender == null) {
        RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
        return;
    }
	// ① 创建调度器
    initSchedulerIfNeeded();
    // ② 获取并设置默认的心跳间隔
    long interval = retrieveInterval(sender);
    setIntervalIfNotExists(interval);
    // ③ 执行心跳任务
    scheduleHeartbeatTask(sender, interval);
}

HeartbeatSenderProvider获取HeartbeatSender的策略和CommandCenter类似,均是通过SPI获取对应最高优先级的执行对象,我们仍然以SimpleHttpHeartbeatSender进行举例。

  1. 创建一个coolPoolSize为2,拒绝策略为丢弃最久任务的任务调度线程池。
  2. 获取并设置默认的心跳间隔时间,默认是5000ms。
  3. 执行心跳任务。

我们继续看如何执行心跳任务,HeartbeatSenderInitFunc#scheduleHeartbeatTask

private void scheduleHeartbeatTask(final HeartbeatSender sender, /*@Valid*/ long interval) {
    pool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                sender.sendHeartbeat();
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
            }
        }
    }, 5000, interval, TimeUnit.MILLISECONDS);
    RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
                   + sender.getClass().getCanonicalName());
}

我们可以看出这个任务是延迟5000ms执行,每次任务的执行间隔为刚才我们计算出心跳间隔时间。

我们继续看心跳的具体实现,SimpleHttpHeartbeatSender#sendHeartbeat()

@Override
public boolean sendHeartbeat() throws Exception {
    if (TransportConfig.getRuntimePort() <= 0) {
        ...
        return false;
    }
    // ① 获取服务端请求地址
    InetSocketAddress addr = getAvailableAddress();
    if (addr == null) {
        return false;
    }

    // ② 组装Http请求
    SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
    request.setParams(heartBeat.generateCurrentMessage());
    try {
        // ③ 执行Http请求
        SimpleHttpResponse response = httpClient.post(request);
        if (response.getStatusCode() == OK_STATUS) {
            return true;
        }
    } catch (Exception e) {
        ...
    }
    return false;
}
  1. 从系统配置中,获取服务端的请求地址。
  2. 组装Http请求。
  3. 执行Http请求。

总结

从上面的源码分析过程中,我们可以大致总结一下我们的服务第一次上报的整体流程:

  1. 使用@SentinelResource的服务,触发第一次执行后,会执行一次AOP的环绕增强。
  2. 环绕增强中,会进行Sentinel资源的上报以及对资源的释放。
  3. SphU#Env对象的首次调用将会触发Sentinel一系列功能的初始化,也就是Sentinel的延迟加载。
  4. Sentinel的延迟加载,通过SPI模式默认加载了命令处理心跳处理两类InitFunc
  5. 命令处理用于处理所有命令,比如规则上报,或者是来自Sentinel规则平台操作的规则更新。
  6. 心跳处理用于维持和Sentinel控制台的心跳,心跳时间有特殊的计算逻辑,并非固定时间间隔触发。

 

赞(1) 投币

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫