Spring Boot Callable 异步接口服务端超时后是否会继续执行业务逻辑

Spring Boot juc About 5,283 words

Controller 代码

使用了Callable作为返回值,Spring Boot会当作异步接口来处理。

@GetMapping("/timeout")
public Callable<String> timeout() {
    System.out.println(LocalDateTime.now() + ": 准备超长处理, threadId: " + Thread.currentThread().getId());
    return () -> {
        System.out.println(LocalDateTime.now() + ": 超长处理中, threadId: " + Thread.currentThread().getId());
        LockSupport.park();
        System.out.println(LocalDateTime.now() + ": 继续执行, threadId: " + Thread.currentThread().getId());
        return "timeout";
    };
}

客户端请求

超时后客户端收到503错误。

❯ curl -vv http://localhost:8081/timeout
*   Trying 127.0.0.1:8081...
* Connected to localhost (127.0.0.1) port 8081 (#0)
> GET /timeout HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.88.1
> Accept: */*
>
< HTTP/1.1 503
< Cache-Control: private
< Content-Type: text/html;charset=utf-8
< Content-Language: en
< Content-Length: 451
< Date: Fri, 14 Jul 2023 04:40:55 GMT
< Connection: close
<
* Closing connection 0
<!doctype html><html lang="en"><head><title>HTTP Status 503 – Service Unavailable</title><style type="text/css">body {font-family:Tahoma,Arial,sans-serif;} h1, h2, h3, b {color:white;background-color:#525D76;} h1 {font-size:22px;} h2 {font-size:16px;} h3 {font-size:14px;} p {font-size:12px;} a {color:black;} .line {height:1px;background-color:#525D76;border:none;}</style></head><body><h1>HTTP Status 503 – Service Unavailable</h1></body></html>%

服务端日志

可以发现30秒后,中断了sleep,直接继续执行sleep后代码。

没有休眠60秒,直接被唤醒,执行后续代码。

2023-07-14T10:18:36.543868: 超长处理, threadId: 28
java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at TestController.lambda$timeout$0(AuthController.java:39)
    at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$4(WebAsyncManager.java:337)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
2023-07-14T10:19:07.341425: 继续执行, threadId: 51
2023-07-14 10:19:07.357  WARN 61586 --- [nio-8081-exec-2] .w.s.m.s.DefaultHandlerExceptionResolver : Resolved [org.springframework.web.context.request.async.AsyncRequestTimeoutException]

原理

实际调用是走到WebAsyncManagerstartCallableProcessing方法。

调用taskExecutorsubmit,里面调用Callablecall方法运行run方法体。

// org.springframework.web.context.request.async.WebAsyncManager#startCallableProcessing(org.springframework.web.context.request.async.WebAsyncTask<?>, java.lang.Object...)
public final class WebAsyncManager {
    public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
        // ...
        try {
            Future<?> future = this.taskExecutor.submit(() -> {
                Object result = null;
                try {
                    result = callable.call();
                }
                catch (Throwable ex) {
                    result = ex;
                }
            });
        }
        catch (RejectedExecutionException ex) {
            Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
            setConcurrentResultAndDispatch(result);
            throw ex;
        }
    }
}

FutureTaskrun方法内会捕获Throwable级别的异常,且不对外抛出。

所以Callablerun方法体内无法感知是否有异常发生。对于Callable中的业务逻辑排查增加了难度。

// java.util.concurrent.FutureTask#run
public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !RUNNER.compareAndSet(this, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // ...
        }
    }
}

最后会调用FutureTaskcancel(boolean mayInterruptIfRunning)方法,且参数mayInterruptIfRunningtrue,意味着会调用Threadinterrupt方法,唤醒可能有阻塞线程的操作。

// java.util.concurrent.FutureTask#cancel
public class FutureTask<V> implements RunnableFuture<V> {
    public boolean cancel(boolean mayInterruptIfRunning) {
        // ...
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
}
Views: 792 · Posted: 2023-07-18

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb/LiteNote

扫描下方二维码关注公众号和小程序↓↓↓

扫描下方二维码关注公众号和小程序↓↓↓


Today On History
Browsing Refresh