标准 专业
多元 极客

Zuul 2研究院(4)——响应处理

请求已经发送出去,我们是如何接收回调的呢?

请求响应

首先,我们在执行请求的过程中,出现了几次promise.addListener()情况,而在ProxyEndpoint中,我们还对其做一些特殊的逻辑:

@Override
public void operationComplete(final Future<PooledConnection> connectResult) {
	try {
		methodBinding.bind(() -> {

			Integer readTimeout = null;
			Server server = chosenServer.get();

			if (server != null) {
                // 状态上报
				if (requestStat != null) {
					requestStat.server(server);
				}

				final ExecutionContext<?> executionContext = origin.getExecutionContext(zuulRequest);
				IClientConfig requestConfig = executionContext.getRequestConfig();
				try {
					readTimeout = setReadTimeoutOnContext(requestConfig, attemptNum);

					origin.onRequestStartWithServer(zuulRequest, server, attemptNum);

					Object overriddenReadTimeoutObj = requestConfig.get(IClientConfigKey.Keys.ReadTimeout);
					if (overriddenReadTimeoutObj != null && overriddenReadTimeoutObj instanceof Integer) {
						int overriddenReadTimeout = (Integer) overriddenReadTimeoutObj;
						readTimeout = overriddenReadTimeout;
					}
				} catch (Throwable e) {
					handleError(e);
					return;
				} finally {
					if (originalReadTimeout == null) {
						requestConfig.setProperty(ReadTimeout, null);
					} else {
						requestConfig.setProperty(ReadTimeout, originalReadTimeout);
					}
				}
			}

			// ② 连接建立处理
			if (connectResult.isSuccess()) {
				onOriginConnectSucceeded(connectResult.getNow(), readTimeout);
			} else {
				onOriginConnectFailed(connectResult.cause());
			}
		});
	} catch (Throwable ex) {
		...
	}
}
  1. 一些负载均衡,或者请求的数据收集上报处理。
  2. 连接建立结果处理。

我们继续看ProxyEndpoint#onOriginConnectSucceeded()方法:

private void onOriginConnectSucceeded(PooledConnection conn, int readTimeout) {
	passport.add(ORIGIN_CONN_ACQUIRE_END);

	if (context.isCancelled()) {
		conn.release();
	} else {

		conn.startReadTimeoutHandler(readTimeout);

		currentRequestAttempt.setReadTimeout(readTimeout);

        // 开始进行数据传输
		writeClientRequestToOrigin(conn);
	}
}

这个时候才准备开始进行数据传输,这里是通过promise异步任务的特性,并不断进行trySuccess(),从而保证在连接建立的情况下,才会开始进行数据传输。

private void writeClientRequestToOrigin(final PooledConnection conn) {
	final Channel ch = conn.getChannel();
	passport.setOnChannel(ch);

	context.set("_origin_channel", ch);
	context.set(POOLED_ORIGIN_CONNECTION_KEY, conn);

    // 预写入
	preWriteToOrigin(chosenServer.get(), context);

	final ChannelPipeline pipeline = ch.pipeline();
	originResponseReceiver = getOriginResponseReceiver();
	pipeline.addBefore("connectionPoolHandler", OriginResponseReceiver.CHANNEL_HANDLER_NAME, originResponseReceiver);

	// 判断是否需要再填充
	repopulateRetryBody();

    // 请求写入channel
	ch.write(zuulRequest);
	writeBufferedBodyContent(zuulRequest, ch);
	ch.flush();

	// 准备从channel中读取数据
	ch.read();

	originConn = conn;
    // 继续进行数据读取
	channelCtx.read();
}
  1. 将自定义内容写入请求。
  2. 判断body内容是否需要再填充,只有在SSL的请求缓存存在时,并且需要body存储数据,并且body没有实际值时,才会进行再填充。
  3. 请求写入Channel,并执行flush()操作。
  4. 切换Channel模式后,准备从Channel中读取数据。
  5. 当前的Channel可以继续读取接下来的数据了。

我们从Channel中读取的数据是后端服务节点提供的响应数据。

不同于请求逻辑,响应逻辑将会执行我们之前已经注册好的所有ChanneOutboundHandler,而接收请求的核心HandlerOriginResponseRecevier,在它的channelRead()方法中,进行了内容对象的解析:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
	if (msg instanceof HttpResponse) {
		if (edgeProxy != null) {
            // 处理响应
			edgeProxy.responseFromOrigin((HttpResponse) msg);
        }
        // 继续读取数据
        ctx.channel().read();
        // 消息内容,进行合并处理
	} else if (msg instanceof HttpContent) {
		final HttpContent chunk = (HttpContent) msg;
		if (edgeProxy != null) {
			edgeProxy.invokeNext(chunk);
		} else {
			chunk.release();
		}
		ctx.channel().read();
	} else {
		// 此处代码应该不会执行
		ReferenceCountUtil.release(msg);
		final Exception error = new IllegalStateException("Received invalid message from origin");
		if (edgeProxy != null) {
			edgeProxy.errorFromOrigin(error);
		}
		ctx.fireExceptionCaught(error);
	}
}
  1. 当内容对象是HttpResponse时,我们首先会让ProxyEndpoint将它当做响应对象进行处理。
  2. 处理和继续读取Channel中的数据串行执行。
  3. 当内容对象是消息内容是,会通过ChannelOutboundHandler,对其进行合并处理。
  4. 异常处理,Zuul官方表示应该不会有代码会走到这里。

我们继续看ProxyEndpoint#responseFromOrigin()

public void responseFromOrigin(final HttpResponse originResponse) {
    try {
        // 进行响应处理
        methodBinding.bind(() -> processResponseFromOrigin(originResponse));
    } catch (Exception ex) {
        // 异常处理
        unlinkFromOrigin();
        LOG.error("Error in responseFromOrigin", ex);
        channelCtx.fireExceptionCaught(ex);
    }
}
  1. 进行响应处理。
  2. 异常处理,例如相应的计数器减法操作连接置空等。

我们继续看是如何处理正确的响应的:

private void processResponseFromOrigin(final HttpResponse originResponse) {
    if (originResponse.status().code() >= 500) {
        handleOriginNonSuccessResponse(originResponse, chosenServer.get());
    } else {
        handleOriginSuccessResponse(originResponse, chosenServer.get());
    }
}
protected void handleOriginSuccessResponse(final HttpResponse originResponse, Server chosenServer) {
    // 计数器计数
    origin.recordSuccessResponse();
    if (originConn != null) {
        originConn.getServerStats().clearSuccessiveConnectionFailureCount();
    }
    final int respStatus = originResponse.status().code();
    long duration = 0;
    if (requestStat != null) {
        requestStat.updateWithHttpStatusCode(respStatus);
        requestStat.finishIfNotAlready();
        duration = requestStat.duration();
    }
    if (currentRequestAttempt != null) {
        currentRequestAttempt.complete(respStatus, duration, null);
    }

    final StatusCategory statusCategory = respStatus == 404 ? SUCCESS_NOT_FOUND : SUCCESS;
    // 构建响应
    zuulResponse = buildZuulHttpResponse(originResponse, statusCategory, context.getError());
    // 进行下一步操作
    invokeNext(zuulResponse);
}
  1. 计数器计数。
  2. 构建响应,同时会对响应的404响应码进行剥离,以便快速通知下游进行对应操作。
  3. 进行下一步操作。

响应对象的构建,是对响应头进行属性赋值,HttpContent内容进行解析。

public void invokeNext(final HttpResponseMessage zuulResponse) {
    try {
        methodBinding.bind(() -> filterResponse(zuulResponse));
    } catch (Exception ex) {
        unlinkFromOrigin();
        LOG.error("Error in invokeNext resp", ex);
        channelCtx.fireExceptionCaught(ex);
    }
}
private void filterResponse(final HttpResponseMessage zuulResponse) {
    if (responseFilters != null) {
        responseFilters.filter(zuulResponse);
    } else {
        channelCtx.fireChannelRead(zuulResponse);
    }
}

和接到请求时的Handler也会调用BaseZuulFilterRunner相同,响应时也会进行同样的调用,不同的是请求时执行的是InboundFilter,而响应时执行的是OutboundFilter

至此,请求到Origin,再从Origin到响应的整个流程就全部解析完成了。

总结

  • 对于每个请求的响应,Zuul 2进行的是异步处理,也就是在当前线程向Channel中写入数据后,会去继续处理其他线程,然后异步等待后端服务的响应。
  • 收到响应后,也会进行责任链的处理模式。
  • Zuul 2通过责任链对响应进行组装,返回给前端服务。
赞(2) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫