在上一节中提到@SentinelResource注解,在注解中,是通过切面的形式进行上报,上报引擎是通过Env类型进行初始化的。
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
return Env.sph.entry(name, type, count, args);
}
Sph接口
Sph是一个用于进行resource保护的接口,如果触发了阻塞标准,会抛出BlockException异常,而它的返回值是一个Entry,如果请求获取到了这个Entry,证明允许调用通过。
Sph的实现类是CtSph,用于实现resource使用情况的所有上报逻辑,我们来看核心方法#entry():
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
// 由于传入的是资源名称,使用StringResourceWrapper
StringResourceWrapper resource = new StringResourceWrapper(name, type);
// 进行资源上报
return entry(resource, count, args);
}
ResourceWrapper
ResourceWrapper是Sentinel对我们请求resource的内部通用包装类,用于判断两种resource是否相等,总共具有两个参数:
/**
* resource名称
*/
protected String name;
/**
* 请求类型
*/
protected EntryType type = EntryType.OUT;
- name,名称,即为我们请求的资源名称。
- type,请求的类型,总体分为两种,入口请求和出口请求(可以理解为Inbound和Outbound)。
它的核心方法是#equals():
/**
* 只考虑{@link #getName()}
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof ResourceWrapper) {
ResourceWrapper rw = (ResourceWrapper)obj;
return rw.getName().equals(getName());
}
return false;
}
比较的两个类型必须同是ResourceWrapper类型,但只比较两个请求资源的名称。
通过这个方法,可以实现通过资源名称、资源方法进行请求,使用的还是同一个资源。
ResourceWrapper目前为止共有两个实现类:
- StringResourceWrapper:通用的ResourceWrapper,入参是resource name。
- MethodResourceWrapper:方法调用的ResourceWrapper,入参是method。
在进入上报流程后,我们对资源的使用都会通过ResourceWrapper这个包装类进行
我们继续看CtSph#entry()方法。
CtSph#entry()
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
// 如果当前线程使用的是NullContext,证明已经不会被规则控制
if (context instanceof NullContext) {
// 只实例化entry即可,无需校验规则
return new CtEntry(resourceWrapper, null, context);
}
// 当前线程还没有上线文,使用默认的上下文
if (context == null) {
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
}
// Sentinel的全局开关关闭,也不会进行规则校验
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 此时已经证明了有Context
// 创建ProcessorSlot
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// chain为null意味着已经超过slot chain的总数上限,不会进行规则校验
// 创建Entry之后直接返回
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 在链上申请,相当于过一遍Sentinel
// 由于链上插满了可扩展的槽,比如流控,降级都是可扩展的槽,此方法相当于按照槽的顺序过滤了一遍
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
// 触发规则限制,抛出异常
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// Sentinel内部错误,比较少见,记录下来,用于反馈
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
- 首先,需要创建当前外部请求的上下文。
- 其次,Sentinel具有默认开启功能的全局开关,校验此开关是否关闭。
- 然后,我们会根据上线文获取进行网关操作的责任链。
- 接着,我们会将当前请求的resource,依次通过责任链进行处理。
- 最后,我们会对不同的返回结果进行处理。
Context
Context存储了请求调用的上下文信息,包括上下文名称、调用树的根节点、当前正在处理的节点、当前上下文的Origin名称等信息。
所有的外部请求,都离不开上下文。
CtSph#lookProcessChain()
CtSph#lookProcessChain()方法用于获取resource的ProcessorSlotChain:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 从缓存中获取当前resource关联的ProcessorSlotChain
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
// 如果当前resource没有关联,同步下创建一个新的处理链
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
// 双重校验
if (chain == null) {
// entry大小限制,不能超过SLOT_CHAIN的阈值
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 使用SlotChainProvider创建一个新的slot chain
chain = SlotChainProvider.newSlotChain();
// 迁移并缓存新创建的slot chain
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
通过ResourceWrapper#equals()方法判断相等的resource会全局共用ProcessorSlotChain,并且与上下文无关。
- 首先,并不是每次请求都会重新创建一条处理链,针对resource,Sentinel会对其进行缓存。
- 其次,首次请求对应,会在双重校验锁的同步状态下,创建处理链。
- 接着,使用SlotChainProvider创建新的slot chain。
- 最后,迁移缓存集合。
由于每个resource的slot chain可以全局复用,所以Sentinel要求,每个节点上的处理链个数不能超过6000个。
SlotChainProvider#newSlotChain()
SlotChainProvider#newSlotChain()用于创建slot chain以及chain上的一系列配件。
public static ProcessorSlotChain newSlotChain() {
// 可以使用建造器构建的情况下,使用建造器构造
if (builder != null) {
return builder.build();
}
// 没有建造器,会创建一个建造器
// 使用自定义建造器或者默认建造器
resolveSlotChainBuilder();
// 如果仍为空,可能在加载builder中出现异常状态,使用DefaultSlotChainBuilder
if (builder == null) {
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
builder = new DefaultSlotChainBuilder();
}
// 返回建造器构建的slot chain
return builder.build();
}
- 首先,slot chain是通过建造者模式,以固定模型进行构建的。
- 其次,如果当前没有slot chain的建造器,就需要找到对应的建造器。
- 然后,如果在源码中没有找到任何建造器,则使用DefaultSlotChainBuilder建造器。
- 最后,使用计算出的建造器构建slot chain。
#resolveSlotChainBuilder()
SlotChainProvider#resolveSlotChainBuilder()用于加载指定的slot chain建造器,出现多个时,仅会添加一个,和指定的顺序有关。
private static void resolveSlotChainBuilder() {
List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
boolean hasOther = false;
// 遍历通过SPI加载的slot chain
for (SlotChainBuilder builder : LOADER) {
if (builder.getClass() != DefaultSlotChainBuilder.class) {
hasOther = true;
list.add(builder);
}
}
// 获取自定义的DefaultSlotChainBuilder建造者
if (hasOther) {
builder = list.get(0);
} else {
// 没有自定义的builder,使用默认的DefaultSlotChainBuilder进行构造
builder = new DefaultSlotChainBuilder();
}
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ builder.getClass().getCanonicalName());
}
- 首先,我们可以通过SPI指定SlotChainBuilder接口的方式,自定义slot chain的建造器。
- 其次,Sentinel会根据ServiceLoader对SlotChainBuilder进行加载。
- 然后,仅会从多个自定义SlotChainBuilder中获取第一个建造器。
- 最后,如果没有找到任何自定义的SlotChainBuilder,会使用DefaultSlotChainBuilder作为slot chain的默认建造器。
DefaultSlotChainBuilder#build()
DefaultSlotChainBuilder#build()使用默认的建造器建造slot chain。
@Override
public ProcessorSlotChain build() {
// 一个默认的{@link ProcessorSlotChain},相当于链
// Slot相当于链上的槽,各种类型的Slot相当于插入槽的组件
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
ProcessorSlotChain提供了需要进行处理的链,而责任链上的slot则相当于各种可插拔的槽,而NodeSelectorSlot、ClusterBuilderSlot等相当于热插拔的插件,提供实际的处理逻辑。
我们接着回到CtSph#entryWithPriority()方法。
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 在链上申请,相当于过一遍Sentinel
// 由于链上插满了可扩展的槽,比如流控,降级都是可扩展的槽,此方法相当于按照槽的顺序过滤了一遍
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
// 触发规则限制,抛出异常
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// Sentinel内部错误,比较少见,记录下来,用于反馈
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
在获取到对应resource的slot chain后,Sentinel会直接创建Entry,Entry包含了并发调用时的信息,当前处理的节点,后端服务节点,申请Entry时间等信息。
- chain.entry()相当于从责任链的header开始,进行处理。
- 如果正常通过责任链,则证明成功通过各种网关拦截,可以执行请求。
- 如果捕获到BlockException,证明在进行slot的处理过程中,被规则限制住,不能执行本次请求。
- 其他异常属于Sentinel的内部错误,比较少见,记录下来,同时判定通过本次责任链的处理。
ProcessorSlotChain
如果说Sph属于Sentinel的工具类,ProcessorSlotChain就属于Sentinel网关实现的核心。
/**
* 一些存储过程的容器,以及完成过程时的通知方式
*/
public interface ProcessorSlot<T> {
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
- #entry()与#exit(),用于进入和退出当前Slot的处理。
- #fireEntry()与#fireExit(),用于开启和关闭下一个Slot,以便调用#entry()和#exit()进行处理。
AbstractLinkedProcessorSlot
AbstractLinkedProcessorSlot是ProcessorSlot的抽闲实现类,用于定义处理链的运转:
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
AbstractLinkedProcessorSlot私有了一个next指针,指向下一个需要处理的slot。
那么这个next指针什么时候链起整个slot的呢?
我们在DefaultSlotChainBuilder#build()方法中链通过addLast()的方式添加的,DefaultProcessorSlotChain#addLast():
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
// end节点的下一个节点设为给定节点,同时将end节点设置为给定节点
// 这么写是避免链断掉
end.setNext(protocolProcessor);
end = protocolProcessor;
}
每次写入都会将end节点的下一个节点设为给定的ProcessorSlot节点,同时将给定的ProcessorSlot更新为end节点。
AbstractLinkedProcessorSlot#fireEntry()
AbstractLinkedProcessorSlot#fireEntry()用于移动处理链上的指针,相当于:
ProcessorSlotChain#entry()
前文提到,ProcessorSlot#entry()用于请求处理链中的header slot,在创建DefaultProcessorSlotChain时,会默认创建一个空操作的header slot:
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
可以理解为它是一个仅用于起始的header slot。
紧接着,DefaultProcessorSlotChaint提供了如下的调用链: