请求已经发送出去,我们是如何接收回调的呢?
请求响应
首先,我们在执行请求的过程中,出现了几次promise.addListener()情况,而在ProxyEndpoint中,我们还对其做一些特殊的逻辑:
@Override
public void operationComplete(final Future<PooledConnection> connectResult) {
try {
methodBinding.bind(() -> {
Integer readTimeout = null;
Server server = chosenServer.get();
if (server != null) {
// 状态上报
if (requestStat != null) {
requestStat.server(server);
}
final ExecutionContext<?> executionContext = origin.getExecutionContext(zuulRequest);
IClientConfig requestConfig = executionContext.getRequestConfig();
try {
readTimeout = setReadTimeoutOnContext(requestConfig, attemptNum);
origin.onRequestStartWithServer(zuulRequest, server, attemptNum);
Object overriddenReadTimeoutObj = requestConfig.get(IClientConfigKey.Keys.ReadTimeout);
if (overriddenReadTimeoutObj != null && overriddenReadTimeoutObj instanceof Integer) {
int overriddenReadTimeout = (Integer) overriddenReadTimeoutObj;
readTimeout = overriddenReadTimeout;
}
} catch (Throwable e) {
handleError(e);
return;
} finally {
if (originalReadTimeout == null) {
requestConfig.setProperty(ReadTimeout, null);
} else {
requestConfig.setProperty(ReadTimeout, originalReadTimeout);
}
}
}
// ② 连接建立处理
if (connectResult.isSuccess()) {
onOriginConnectSucceeded(connectResult.getNow(), readTimeout);
} else {
onOriginConnectFailed(connectResult.cause());
}
});
} catch (Throwable ex) {
...
}
}
- 一些负载均衡,或者请求的数据收集上报处理。
- 连接建立结果处理。
我们继续看ProxyEndpoint#onOriginConnectSucceeded()方法:
private void onOriginConnectSucceeded(PooledConnection conn, int readTimeout) {
passport.add(ORIGIN_CONN_ACQUIRE_END);
if (context.isCancelled()) {
conn.release();
} else {
conn.startReadTimeoutHandler(readTimeout);
currentRequestAttempt.setReadTimeout(readTimeout);
// 开始进行数据传输
writeClientRequestToOrigin(conn);
}
}
这个时候才准备开始进行数据传输,这里是通过promise异步任务的特性,并不断进行trySuccess(),从而保证在连接建立的情况下,才会开始进行数据传输。
private void writeClientRequestToOrigin(final PooledConnection conn) {
final Channel ch = conn.getChannel();
passport.setOnChannel(ch);
context.set("_origin_channel", ch);
context.set(POOLED_ORIGIN_CONNECTION_KEY, conn);
// 预写入
preWriteToOrigin(chosenServer.get(), context);
final ChannelPipeline pipeline = ch.pipeline();
originResponseReceiver = getOriginResponseReceiver();
pipeline.addBefore("connectionPoolHandler", OriginResponseReceiver.CHANNEL_HANDLER_NAME, originResponseReceiver);
// 判断是否需要再填充
repopulateRetryBody();
// 请求写入channel
ch.write(zuulRequest);
writeBufferedBodyContent(zuulRequest, ch);
ch.flush();
// 准备从channel中读取数据
ch.read();
originConn = conn;
// 继续进行数据读取
channelCtx.read();
}
- 将自定义内容写入请求。
- 判断body内容是否需要再填充,只有在SSL的请求缓存存在时,并且需要body存储数据,并且body没有实际值时,才会进行再填充。
- 请求写入Channel,并执行flush()操作。
- 切换Channel模式后,准备从Channel中读取数据。
- 当前的Channel可以继续读取接下来的数据了。
我们从Channel中读取的数据是后端服务节点提供的响应数据。
不同于请求逻辑,响应逻辑将会执行我们之前已经注册好的所有ChanneOutboundHandler,而接收请求的核心Handler是OriginResponseRecevier,在它的channelRead()方法中,进行了内容对象的解析:
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
if (edgeProxy != null) {
// 处理响应
edgeProxy.responseFromOrigin((HttpResponse) msg);
}
// 继续读取数据
ctx.channel().read();
// 消息内容,进行合并处理
} else if (msg instanceof HttpContent) {
final HttpContent chunk = (HttpContent) msg;
if (edgeProxy != null) {
edgeProxy.invokeNext(chunk);
} else {
chunk.release();
}
ctx.channel().read();
} else {
// 此处代码应该不会执行
ReferenceCountUtil.release(msg);
final Exception error = new IllegalStateException("Received invalid message from origin");
if (edgeProxy != null) {
edgeProxy.errorFromOrigin(error);
}
ctx.fireExceptionCaught(error);
}
}
- 当内容对象是HttpResponse时,我们首先会让ProxyEndpoint将它当做响应对象进行处理。
- 处理和继续读取Channel中的数据串行执行。
- 当内容对象是消息内容是,会通过ChannelOutboundHandler,对其进行合并处理。
- 异常处理,Zuul官方表示应该不会有代码会走到这里。
我们继续看ProxyEndpoint#responseFromOrigin():
public void responseFromOrigin(final HttpResponse originResponse) {
try {
// 进行响应处理
methodBinding.bind(() -> processResponseFromOrigin(originResponse));
} catch (Exception ex) {
// 异常处理
unlinkFromOrigin();
LOG.error("Error in responseFromOrigin", ex);
channelCtx.fireExceptionCaught(ex);
}
}
- 进行响应处理。
- 异常处理,例如相应的计数器减法操作,连接置空等。
我们继续看是如何处理正确的响应的:
private void processResponseFromOrigin(final HttpResponse originResponse) {
if (originResponse.status().code() >= 500) {
handleOriginNonSuccessResponse(originResponse, chosenServer.get());
} else {
handleOriginSuccessResponse(originResponse, chosenServer.get());
}
}
protected void handleOriginSuccessResponse(final HttpResponse originResponse, Server chosenServer) {
// 计数器计数
origin.recordSuccessResponse();
if (originConn != null) {
originConn.getServerStats().clearSuccessiveConnectionFailureCount();
}
final int respStatus = originResponse.status().code();
long duration = 0;
if (requestStat != null) {
requestStat.updateWithHttpStatusCode(respStatus);
requestStat.finishIfNotAlready();
duration = requestStat.duration();
}
if (currentRequestAttempt != null) {
currentRequestAttempt.complete(respStatus, duration, null);
}
final StatusCategory statusCategory = respStatus == 404 ? SUCCESS_NOT_FOUND : SUCCESS;
// 构建响应
zuulResponse = buildZuulHttpResponse(originResponse, statusCategory, context.getError());
// 进行下一步操作
invokeNext(zuulResponse);
}
- 计数器计数。
- 构建响应,同时会对响应的404响应码进行剥离,以便快速通知下游进行对应操作。
- 进行下一步操作。
响应对象的构建,是对响应头进行属性赋值,HttpContent内容进行解析。
public void invokeNext(final HttpResponseMessage zuulResponse) {
try {
methodBinding.bind(() -> filterResponse(zuulResponse));
} catch (Exception ex) {
unlinkFromOrigin();
LOG.error("Error in invokeNext resp", ex);
channelCtx.fireExceptionCaught(ex);
}
}
private void filterResponse(final HttpResponseMessage zuulResponse) {
if (responseFilters != null) {
responseFilters.filter(zuulResponse);
} else {
channelCtx.fireChannelRead(zuulResponse);
}
}
和接到请求时的Handler也会调用BaseZuulFilterRunner相同,响应时也会进行同样的调用,不同的是请求时执行的是InboundFilter,而响应时执行的是OutboundFilter。
至此,请求到Origin,再从Origin到响应的整个流程就全部解析完成了。
总结
- 对于每个请求的响应,Zuul 2进行的是异步处理,也就是在当前线程向Channel中写入数据后,会去继续处理其他线程,然后异步等待后端服务的响应。
- 收到响应后,也会进行责任链的处理模式。
- Zuul 2通过责任链对响应进行组装,返回给前端服务。