异步 Servlet 原理

servlet 3.0 的规范中,有异步servlet特性,这个可以增大吞吐量。我们有必要看看 spring 是如何适配这个特性的。

实现异步 servlet

spring mvc 中,实现异步servlet有多种方式,比如 DeferredResultCallable,相关代码见末尾

DeferredResult 方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@GetMapping("/test2")
public DeferredResult<String> test2() {
    before();
    DeferredResult<String> result = new DeferredResult<>();
    executor.submit(() -> {
        process();
        result.setResult("test2");
    });
    after();
    return result;
}

相关日志:

deferredResult-log.png
deferredResult-log

Callable 方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
@GetMapping("/test4")
public Callable<String> test4() {
    before();
    Callable<String> callable = () -> {
        process();
        return "test4";
    };
    after();
    return callable;
}

相关日志:

callable-log.png
callable-log

源码解读

spring 中,有一个特殊的接口 HandlerMethodReturnValueHandler,专门来处理请求的返回值

处理 DeferredResult

源码位置: org.springframework.web.servlet.mvc.method.annotation.DeferredResultMethodReturnValueHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

	@Override
	public boolean supportsReturnType(MethodParameter returnType) {
	    // 判断类型
		Class<?> type = returnType.getParameterType();
		return (DeferredResult.class.isAssignableFrom(type) ||
				ListenableFuture.class.isAssignableFrom(type) ||
				CompletionStage.class.isAssignableFrom(type));
	}

	@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
        ...
		DeferredResult<?> result;

		if (returnValue instanceof DeferredResult) {
			result = (DeferredResult<?>) returnValue;
		}
		else if (returnValue instanceof ListenableFuture) {
			result = adaptListenableFuture((ListenableFuture<?>) returnValue);
		}
		else if (returnValue instanceof CompletionStage) {
			result = adaptCompletionStage((CompletionStage<?>) returnValue);
		}
		else {
			// Should not happen...
			throw new IllegalStateException("Unexpected return value type: " + returnValue);
		}
        // 开始异步处理
		WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
	}
}

处理 Callable

源码位置: org.springframework.web.servlet.mvc.method.annotation.CallableMethodReturnValueHandler

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

	@Override
	public boolean supportsReturnType(MethodParameter returnType) {
	    // 判断类型
		return Callable.class.isAssignableFrom(returnType.getParameterType());
	}

	@Override
	public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
			ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
        ...
		Callable<?> callable = (Callable<?>) returnValue;
		// 开始异步处理
		WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
	}
}

从上面两个类可以看出,最终都是调用了 WebAsyncManager 类的 startDeferredResultProcessing 或者 startCallableProcessing 方法, 这两个方法的内部实现都是差不多的,下面以 startCallableProcessing 为例。

WebAsyncManager

源码位置: org.springframework.web.context.request.async.WebAsyncManager#startCallableProcessing

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
        throws Exception {
    ...
    List<CallableProcessingInterceptor> interceptors = new ArrayList<>();
    interceptors.add(webAsyncTask.getInterceptor());
    interceptors.addAll(this.callableInterceptors.values());
    interceptors.add(timeoutCallableInterceptor);

    final Callable<?> callable = webAsyncTask.getCallable();
    final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);

    // 设置超时处理器
    this.asyncWebRequest.addTimeoutHandler(() -> {
        if (logger.isDebugEnabled()) {
            logger.debug("Async request timeout for " + formatRequestUri());
        }
        Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
        if (result != CallableProcessingInterceptor.RESULT_NONE) {
            setConcurrentResultAndDispatch(result);
        }
    });

    // 设置错误处理
    this.asyncWebRequest.addErrorHandler(ex -> {
        if (!this.errorHandlingInProgress) {
            if (logger.isDebugEnabled()) {
                logger.debug("Async request error for " + formatRequestUri() + ": " + ex);
            }
            Object result = interceptorChain.triggerAfterError(this.asyncWebRequest, callable, ex);
            result = (result != CallableProcessingInterceptor.RESULT_NONE ? result : ex);
            setConcurrentResultAndDispatch(result);
        }
    });

    // 设置完成处理器
    this.asyncWebRequest.addCompletionHandler(() ->
            interceptorChain.triggerAfterCompletion(this.asyncWebRequest, callable));

    // 执行钩子
    interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
    // 开启异步处理,就是 request#startAsync (servlet api)
    startAsyncProcessing(processingContext);
    try {
        Future<?> future = this.taskExecutor.submit(() -> {
            Object result = null;
            try {
                // 执行钩子 applyPreProcess
                interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
                // 处理请求
                result = callable.call();
            }
            catch (Throwable ex) {
                result = ex;
            }
            finally {
                // 执行钩子 applyPostProcess
                result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
            }
            // 设置结果, 然后 dispatch, 当前这个请求就会再次处理,会被 RequestMappingHandlerAdapter#invokeHandlerMethod 拦截
            setConcurrentResultAndDispatch(result);
        });
        interceptorChain.setTaskFuture(future);
    }
    catch (RejectedExecutionException ex) {
        Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
        setConcurrentResultAndDispatch(result);
        throw ex;
    }
}

源码位置: org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter#invokeHandlerMethod

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
        HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
        ...
        WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
        // 检查是否有异步结果
        if (asyncManager.hasConcurrentResult()) {
            Object result = asyncManager.getConcurrentResult();
            mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
            asyncManager.clearConcurrentResult();
            // 这里会返回一个新的 handlerMethod, 这个很重要
            invocableMethod = invocableMethod.wrapConcurrentResult(result);
        }

        // 返回 json
        invocableMethod.invokeAndHandle(webRequest, mavContainer);
        if (asyncManager.isConcurrentHandlingStarted()) {
            return null;
        }
        // 对于 json 请求来说,这里不会执行
        return getModelAndView(mavContainer, modelFactory, webRequest);
    }
    finally {
        webRequest.requestCompleted();
    }
}

task 线程池

Callable 的执行,需要线程池,默认配置类如下:

源码位置: org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration

1
2
3
4
5
6
7
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
        AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
    return builder.build();
}

代码

demo-spring-async-servlet

0%