Java技术域异步编程总结

异步编程优势

异步编程是可以让程序并行运行的一种手段,其可以让程序中的一个工作单元与主应用程序线程分开独立运行,并且等工作单元运行结束后通知主应用程序线程它的运行结果或者失败原因。使用它有许多好处,例如改进的应用程序性能和减少用户等待时间等。异步编程充分利用单核CPU的性能,意在单个CPU上执行几个松耦合的任务。

同步阻塞对比异步非阻塞

当线程发生一次rpc调用或者http调用,又或者其他的一些耗时的IO调用,发起之后,如果是同步阻塞,这个线程就会被阻塞挂起,直到结果返回,如果IO调用很频繁的话CPU使用率其实会很低。通过异步非阻塞调用,当发生IO调用时我只需要把回调函数写入这次IO调用,我这个时候线程可以继续处理新的请求,当IO调用结束结束时,会调用回调函数,充分利用CPU资源。

异步编程场景

异步化并不是万能,异步化程序并不能缩短整个链路调用时间长的问题,而是旨在最大化提升qps,但也需要针对场景异步优化。

IO密集型:例如网络调用,文件传输,文件读取…,这个时候线程一般会挂起阻塞,异步编程可以针对这个场景进行优化。

CPU密集型:例如一些数据的聚合运算,对象的序列化,排序查找等CPU耗时任务,异步化并不能解决这个问题,需要进行一些算法的优化或者利用一些并行处理框架进行优化,充分利用多核CPU。

Java异步编程技术实现方式

线程池

具体使用方式如下代码所示:

    private final static ThreadPoolExecutor POOL_EXECUTOR =
            new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2,
                    1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 投递任务,不获取返回值
     */
    @Test
    public void threadPoolAsyncRunnable() {
        StopWatch watch = new StopWatch();
        watch.start("ThreadPoolAsyncTask");
        POOL_EXECUTOR.execute(() -> doSomeThing("task1"));
        doSomeThing("task2");
        watch.stop();
        log.info("total time :{} millisecond", watch.getTotalTimeMillis());
    }

    /**
     * 投递任务,同步获取
     */
    @Test
    public void threadPoolAsyncSubmit() throws ExecutionException, InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start("ThreadPoolAsyncTask");
        Future<String> resultFuture = POOL_EXECUTOR.submit(() -> doSomeThing("task1"));
        doSomeThing("task2");
        //同步等待结果
        String result = resultFuture.get();
        watch.stop();
        log.info("total time :{} millisecond", watch.getTotalTimeMillis());
    }

核心理念

使用线程来进行异步执行,避免阻塞当前线程,利用线程池来实现线程的复用。

线程池构造

图片

  • ctl:是Integer的原子变量,同时记录线程池状态和线程池中线程个数
- 线程池状态
RUNNING:接收新任务并且处理阻塞队列里的任务
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时中断正在处理的任务
TIDYING:所有任务都执行完(包含阻塞队列里面任务),当前线程池活动线程为0,将要调用terminated方法
TERMINATED:终止状态。terminated方法调用完成以后的状态

- 状态转换
显式调用shutdown()方法或者隐式调用了finalize():RUNNING→SHUTDOWN
显式调用shutdownNow()方法:RUNNING或者SHUTDOWN→STOP
当线程池和任务队列都为空时:SHUTDOWN→TIDYING
当线程池为空时:STOP→TIDYING
当terminated() hook方法执行完成时:TIDYING→TERMINATED
  • corePoolSize:线程池核心线程个数
  • workQueue:用于保存等待执行的任务的阻塞队列
  • maximunPoolSize:线程池最大线程数量
  • threadFactory:创建线程的工厂类
  • defaultHandler:饱和策略,当队列满了并且线程个数达到maximunPoolSize后采取的策略,比如AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用者所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)、DiscardPolicy(默默丢弃,不抛出异常)
  • keeyAliveTime:存活时间。如果当前线程池中的线程数量比核心线程数量要多,并且是闲置状态的话,这些闲置的线程能存活的最大时间。

线程池投递任务原理简述

图片

  • execute

大致流程如下:

        public void execute(Runnable command) {
...

            //1. 获取当前线程池的状态+线程个数变量的组合值
            int c = ctl.get();

            //2. 当前线程池线程个数是否小于corePoolSize,小于则开启新线程运行
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
        	//3. 如果线程池处于RUNNING状态,则添加任务到阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {

            //3.1 二次检查
            int recheck = ctl.get();
            //3.2 如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //3.3 如果当前线程池线程为空,则添加一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //4. 如果队列满了,则新增线程执行,如果当前线程池的线程个数大于maximumPoolSize执行拒绝策略
        else if (! addWorker(command, false))
            reject(command);
    }

  • submit

submit会将Runnable包装成RunnableFuture后,调用execute投递到线程池执行

  • worker

上述addWorker方法执行后,用户线程会马上返回,任务稍后再由Worker线程执行。Worker本身实现了Runnable方法,具体如下:

public void run() {
	runWorker(this); //委托给runWorker方法
}

worker的runWorker方法会获取线程进行任务执行,并且会执行调用前后的钩子方法。

线程池关闭

图片

总结

虽然线程池方式提供了线程复用可以获取任务返回值,但是获取返回值时还是需要阻塞调用线程的。

JDK的Future

JUC包中Future可以用于异步计算,Future中提供了一系列方法用来检查计算结果是否已经完成、同步等待任务执行完成、获取计算结果。

图片

FutureTask

FutureTask实现了Future接口,接受的任务可以是Callable类型,也可以是Runnable类型,一般被提交到线程池中进行异步执行,最后调用get系列方法阻塞获取。具体使用方式如下代码所示:

 		//异步执行task1
        FutureTask<String> task = new FutureTask<>(() -> doSomeThing("task1"));
        POOL_EXECUTOR.execute(task);

        //同步执行task2
        String result2 = doSomeThing("task2");
        //同步等待task1
        String result1 = task.get();

FutureTask构造

图片

  • state:用来记录任务的状态
private static final int NEW           = 0;		//新建
private static final int COMPLETING    = 1;		//完成中...
private static final int NORMAL        = 2;		//正常
private static final int EXCEPTIONAL   = 3;		//异常
private static final int CANCELLED     = 4;		//取消
private static final int INTERRUPTING  = 5;		//中断中...
private static final int INTERRUPTED   = 6;		//中断

- 状态转换
NEW→COMPLETING→NORMAL:正常终止流程转换
NEW→COMPLETING→EXCEPTIONAL:执行过程中发生异常流程转换
NEW→CANCELLED:任务还没开始就被取消
NEW→INTERRUPTING→INTERRUPTED:任务被中断
  • outcome:任务运行的结果
  • runner:记录运行该任务的线程
  • waiters:链表结构,记录等待任务结果的线程

FutureTask执行与获取

  • run(Runnable接口实现)
public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

当创建一个FutureTask时,其任务状态初始化为NEW,提交到线程或者线程池后,会有一个线程来执行该FutureTask任务,通过调用FutureTask的run()执行。run()方法判断并且设置state状态,调用call() 执行任务,当任务执行完毕后会把结果或者异常信息设置到outcome变量,最后调用LockSupport.unpark()唤醒waiters链表中所有由于等待获取结果而被阻塞的线程,并从waiters链表中移除它们。

  • get
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

在其他线程调用FutureTask的get()方法来等待获取结果,get()方法会判断任务状态是否小于等于COMPLETING,是则阻塞线程循环等待任务完成,加入waiters链表,调用LockSupport.park()挂起线程,如果任务状态为COMPLETING(正在执行),调用Thread.yield()进行线程让步。当线程被激活时,会去获取outcome变量拿到结果。

总结

FutureTask虽然提供了用来检查任务是否执行完成、等待任务执行结果、获取任务执行结果的方法,但是它并不能清楚地表达多个FutureTask之间的关系。而且从Future获取结果需要调用get()方法,该方法会在任务执行完毕前阻塞调用线程。

JDK的CompletableFuture

CompletableFuture是一个可以通过编程方式显式地设置计算结果和状态以便让任务结束的Future,并且其可以作为一个CompletionStage(计算阶段),当它的计算完成时可以触发一个函数或者行为;当多个线程企图调用同一个CompletableFuture的complete、cancel方式时只有一个线程会成功。

CompletableFuture所有异步的方法在没有显式指定Executor参数的情形下都是复ForkJoinPool的commonPool()线程池来执行。

CompletableFuture基于栈收集任务,所以在同一个CompletableFuture对象上行为注册的顺序与行为执行的顺序是相反的。

显式设置CompletableFuture结果

通过编程显式设置结果的future(complete),阻塞获取结果(get)。

CompletableFuture<String> future = new CompletableFuture<>();
        POOL_EXECUTOR.execute(()->{
            String result = doSomeThing("task1");
            //显式设置
            future.complete(result);
        });
        //阻塞获取
log.info("result:{}",future.get());

异步计算与结果转换

方法命名规律:以run为例子,不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。

runAsync:无返回值的异步计算

static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)

supplyAsync:带有返回值的异步计算,可以通过get方法获取返回值

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

thenRunAsync:执行完成任务后,激活其他任务(其他任务拿不到之前任务的返回值)

 CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)

thenAcceptAsync:执行完成任务后,激活其他任务(其他任务可以拿到之前任务的返回值)

CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

//example
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> doSomeThing("task from thenRun1"),POOL_EXECUTOR);
CompletableFuture<Void> thenRunFuture = future.thenRunAsync(() -> doSomeThing("task from thenRun2"),POOL_EXECUTOR);

whenCompleteAsync:设置回调函数,通过回调的方式,不会阻塞调用线程

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> doSomeThing("task from supplyAsync"), POOL_EXECUTOR);
       future.whenCompleteAsync((s, throwable) -> {
           if (Objects.nonNull(s)){
               log.info("result:{}",s);
           }
       },POOL_EXECUTOR);

多CompletableFuture组合运算

thenCompose:当一个CompletableFuture执行完毕后,执行另外一个CompletableFuture

<U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn)

thenCombine:当两个并发运行的CompletableFuture任务都完成后,使用两者的结果作为参数再执行一个异步任务

<U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) 

//example
 CompletableFuture.supplyAsync(() -> doSomeThing("task from supplyAsync1"), POOL_EXECUTOR)
                .thenCombineAsync(CompletableFuture
                                .supplyAsync(() -> doSomeThing("task from supplyAsync2"), POOL_EXECUTOR)
                        , (result1, result2) -> result1 + result2)
                .whenCompleteAsync((s, throwable) -> {
                    if (Objects.nonNull(s)) {
                        log.info("result:{}", s);
                    }
                }, POOL_EXECUTOR);

allOf:等待多个并发运行的CompletableFuture任务执行完毕

 static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 

 //example
ArrayList<CompletableFuture> futures = new ArrayList<>();
        futures.add(CompletableFuture.supplyAsync(() -> doSomeThing("task from supplyAsync1"), POOL_EXECUTOR));
        futures.add(CompletableFuture.supplyAsync(() -> doSomeThing("task from supplyAsync2"), POOL_EXECUTOR));
        futures.add(CompletableFuture.supplyAsync(() -> doSomeThing("task from supplyAsync3"), POOL_EXECUTOR));

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get();

anyOf:等多个并发运行的CompletableFuture任务任意一个执行完毕

 static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

异常处理

当出现异常时,可以调用future.completeExceptionally(e) 把异常信息设置到future内部,get获取时会把异常带出来,或者future.exceptionally设置出现异常时返回的默认值。

Stream与CompletableFuture

可以使用map将一些数据传进CompletableFuture进行异步调用,并转换成CompletableFuture。

List<String> requests = Arrays.asList("request1", "request2", "request3");

        //转换成CompletableFuture,异步调用
        List<CompletableFuture<String>> futures = requests.stream()
                .map(request -> CompletableFuture.supplyAsync(() -> getRpcResult(request), POOL_EXECUTOR))
                .collect(Collectors.toList());

        //同步阻塞等待调用完毕,收集结果
        List<String> rpcResponse = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());

        log.info("rpcResponse:{}", rpcResponse);

RxJava与Reactor

响应式编程(Reactive Programming)是一种涉及数据流和变化传播的异步编程范式,可以通过所采用的编程语言轻松地表达静态(例如阵列)或动态(例如事件发射器)数据流。RxJava与Reactor都是对响应式编程理念实现,基于Reactive Streams标准,遵循了同一个规范,可以很轻易地从一方切换到另一方 。

对比Java的异步编程模型

Java提供了两种异步编程模型

  • CallBacks:异步方法没有返回值,采用 callback 作为参数(lambda 或匿名类),当结果出来后回调 callback。如CompletableFuture.whenCompleteAsync、EventListener。
  • Futures:异步方法立即返回Future<T>,异步线程计算任务,并当结果T计算出来后设置到Future中。如ExecutorService执行Callable<T> 任务后返回Future对象。

CallBacks与Futures局限性

多个Callback组合在一起后,容易形成回调地狱(Callback Hell);多个Future难以编排,尽管使用改进后的CompletableFuture,也存在不支持延迟计算和高级错误处理的缺陷。

响应式编程库基于声明式编程,比较Callbacks更通俗易懂,同时自带很多操作符,提供高级特性,简化处理。

参考对比:Reactor 3 参考文档

响应式编程库通过如下几点弥补Java异步编程模型的不足

  • 可编排性(Composability) 以及 可读性(Readability)
  • 使用丰富的 操作符 来处理形如 的数据
  • 订阅(subscribe) 之前什么都不会发生
  • 背压(backpressure) 具体来说即消费者能够反向告知生产者生产内容的速度的能力
  • 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果

基于RxJava实现异步编程

  • observeOn(切换调度线程执行订阅者函数)

注意,observeOn切换到了其他线程异步执行,但是事件还是按发送顺序同步执行。

   Flowable.fromArray(requests.toArray(new String[0]))
                //切换到IO调度线程执行订阅者函数
                .observeOn(Schedulers.io())
                //异步调用,按发布顺序顺序执行
                .map(this::getRpcResponse)
                .subscribe(response->log.info("get Rpc Response:{}",response));
  • subscribeOn(切换到调度线程执行发布者函数)
Flowable.fromCallable(this::generateRequests)
            //切换到IO调度线程执行发布者函数
                .subscribeOn(Schedulers.io())
            //切换到Signle调度线程执行订阅者函数
                .observeOn(Schedulers.single())
                .subscribe(System.out::print);
  • flatMap与subscribeOn进行并发调用

针对上述observeOn例子顺序执行的情况,要做到并发执行,我们可以通过flatMap操作符配合subscribeOn进行操作。如下所示,使用flatMap将Request参数转换为Flowable,利用subscribeOn切换到调度线程执行,做到了并发调用。

  Flowable.fromArray(generateRequests())
                .flatMap(requests ->
                        //flatMap将Request转换为Flowable对象
                        Flowable.just(requests)
                                //切换到调度线程执行发布者函数
                                .subscribeOn(Schedulers.io())
                                //执行rpc调用转换成Response
                                .map(this::getRpcResponse))
                //阻塞等待所有的rpc调用并发执行完毕
                .blockingSubscribe(System.out::print);

基于Reactor实现异步编程

Reactor中的流操作符与RxJava基本相同,下面简单重写了上述RxJava的例子

  
  Flux.fromArray(generateRequests())
                .flatMap(requests ->
                        //flatMap将Request转换为Flowable对象
                        Flux.just(requests)
                                //切换到调度线程执行发布者函数
                                .subscribeOn(Schedulers.elastic())
                                //执行rpc调用转换成Response
                                .map(this::getRpcResponse))
                .subscribe(response -> {
                    log.info("get Rpc Response :{}", response);
                });
                
Flux.fromArray(generateRequests())
                //切换到IO调度线程执行订阅者函数
                .publishOn(Schedulers.elastic())
                .map(this::getRpcResponse)
                .subscribe(response -> log.info("get Rpc Response:{}", response));

Spring的TaskExecutor

Spring 2.0版本开始提供了一种新的处理执行器(executors)的抽象,TaskExecutor。

        public interface TaskExecutor {
            void execute(Runnable task);
        }

Spring内置TaskExecutor实现

  • SimpleAsyncTaskExecutor(每个请求会新创建一个对应的线程来执行)
  • SyncTaskExecutor(同步使用调用线程来执行任务)
  • ConcurrentTaskExecutor(对JDK5中的java.util.concurrent.Executor的一个包装,通过setConcurrentExecutor设置一个JUC中的线程池到其内部来做适配)
  • SimpleThreadPoolTaskExecutor(Quartz的SimpleThreadPool的子类,会监听Spring的生命周期回调)
  • ThreadPoolTaskExecutor(比较常用,用于配置java.util.concurrent.ThreadPoolExecutor并将其包装在TaskExecutor中。如果需要一些高级的接口,例如ScheduledThreadPoolExecutor,可以使用Concurrent TaskExecutor)
  • TimerTaskExecutor(对所有提交的任务都在Timer内的单独线程中执行)

在SpringBoot中使用TaskExecutor进行异步处理

  1. 配置Executor参数
@Slf4j
@Configuration
public class AsyncTaskConfig implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("AsyncThread-");
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                // 打印线程池异常信息...
                super.rejectedExecution(r, e);
            }
        });
        //关闭执行器时不等待正在执行的任务执行完毕就中断执行任务的线程
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }
	
	//拒绝策略
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

  1. 在SpringBootApplication类上添加@EnableAsync注解。
  2. 在方法上添加@Async,异步执行该方法,该方法的实际执行将发生在Spring的TaskExecutor异步处理器线程中。基于@Async注解的异步处理是支持返回值的,但是返回值类型必须是Future或者其子类类型的,如JDK的Future类型,Spring框架的ListenableFuture类型,或者JDK8中的 CompletableFuture类型,又或者Spring中的AsyncResult类型等。

@Async注解执行原理

@EnableAsync开启后会把ProxyAsyncConfiguration的实例注入Spring容器。默认情况下,Spring框架是使用Cglib对标注@Async注解的方法进行代理的,具体拦截器是AnnotationAsyncExecutionInterceptor作为切面逻辑,当我们调用含有@Async注解的Bean的方法时候,实际调用的是被代理后的Bean,AnnotationAsyncExecutionInterceptor的invoke方法如下:

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //1.被代理的目标对象
        Class<?> targetClass = (invocation.getThis() ! = null ? AopUtils.
                getTargetClass(invocation.getThis()) : null);
        //2. 获取被代理的方法
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.
                getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(
                specificMethod);
        //3. 判断使用哪个执行器执行被代理的方法
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on
                    AsyncExecutionInterceptor either");
        }
        //4. 使用Callable包装要执行的方法
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            } catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.
                        getArguments());
            } catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };
        //5. 提交包装的Callable任务到指定执行器执行
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

doSubmit会判断方法的返回值类型进行包装,然后提交到线程池中运行。

    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor,
                              Class<?> returnType) {
        //5.1判断方法返回值是否为CompletableFuture类型或者是其子类
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                } catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        //5.2判断返回值类型是否为ListenableFuture类型或者是其子类
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).
                    submitListenable(task);
        }
        //5.3判断返回值类型是否为ListenableFuture类型或者是其子类
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        //5.4其他情况下没有返回值
        else {
            executor.submit(task);
            return null;
        }
    }

Java Servlet异步编程

Servlet3.0规范前,Servlet容器的线程模型如下:

图片

每个请求对应一个线程这种1 : 1的模式进行处理的同步线程模型,线程数是有限的,当线程池资源耗尽后就不能接收处理新的请求了,限制了服务器的并发请求数。

Servlet3.0 提供的异步处理

Servlet 3.0规范中引入了异步处理请求的能力,相对3.0之前的同步线程模型,Servlet内开启异步处理后会立刻释放Servlet容器线程,具体对请求进行处理与响应的是业务线程池中的线程。

图片

官方例子:

@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
   /* ... Same variables and init method as in SyncServlet ... */

   @Override
   public void doGet(HttpServletRequest request, 
                     HttpServletResponse response) {
      response.setContentType("text/html;charset=UTF-8");
      final AsyncContext acontext = request.startAsync();
      acontext.start(new Runnable() {
         public void run() {
            String param = acontext.getRequest().getParameter("param");
            String result = resource.process(param);
            HttpServletResponse response = acontext.getResponse();
            /* ... print to the response ... */
            acontext.complete();
            }
      });
   }
}

Servlet3.1 提供的非阻塞IO处理

Servlet 3.0规范让Servlet的执行变为了异步,但是其IO还是阻塞式的(从ServletInputStream中读取请求体时是阻塞的)。

为此在Servlet3.1规范中提供了非阻塞IO处理方式,(当内核支持)Servlet3.1允许我们在ServletInputStream上通过函数setReadListener注册一个监听器,该监听器在发现内核有数据时才会进行回调处理函数。

图片

示例:

    final AsyncContext asyncContext = req.startAsync();

    //设置数据就绪监听器
    final ServletInputStream inputStream = req.getInputStream();
    inputStream.setReadListener(new ReadListener() {

        @Override
        public void onError (Throwable throwable){
            //异常处理
        }

        @Override
        public void onDataAvailable ()throws IOException {
            //数据就绪时回调,获取数据流
            final ServletInputStream inputStream = asyncContext.
                    getRequest().getInputStream();
        }

        @Override
        public void onAllDataRead ()throws IOException {
            //请求体的数据全部被读取完毕后,进行业务处理
        }
    }

Netty

Netty是一个异步、基于事件驱动的网络应用程序框架,其对Java NIO进行了封装,简化了TCP/UDP服务器的网络编程开发。

Netty框架将网络编程逻辑与业务逻辑处理分离开来,其内部会自动处理好网络与异步处理逻辑,使用者只需要关注逻辑处理。Netty的异步非阻塞能力与CompletableFuture结合可以让我们轻松实现网络请求的异步调用。

很多现代化,高性能的Web框架,RPC框架底层都是用来Netty来实现,例如WebFlux,Vert.x,Dubbo,RocketMq…。

线程模型

图片

以Netty Server端为例子,NettyServer启动时会创建两个NioEventLoop Group线程池组,其中Boss Group用来接收客户端发来的连接,Worker Group 则负责对完成TCP三次握手的连接进行处理。图中每个NioEventLoopGroup里面包含了多个Nio EventLoop,每个NioEventLoop中包含了一个NIO Selector、一个队列、一个线程;其中线程用来做轮询注册到Selector上的Channel的读写事件和对投递到队列里面的事件进行处理。

当客户端发来一个连接请求时,Boss线程池组中注册了监听套接字的NioEventLoop中的Selector会读取TCP三次握手的请求,然后创建对应的连接套接字通道NioSocketChannel,接着把其注册到Worker线程池组的某一个NioEventLoop中管理的一个NIO Selector上,该连接套接字通道NioSocketChannel上的所有读写事件都由该NioEventLoop管理。

  • 非阻塞write

NioSocketChannel的write系列方法向连接里面写入数据时是非阻塞的,具体实现是执行write方法时判断是否是IO线程调用,不是则把写入请求封装为WriteTask并投递到与其对应的NioEventLoop中的队列里面。

        private void write(Object msg, boolean flush, ChannelPromise promise) {
            ...
            //1.如果调用线程是IO线程,直接执行
            EventExecutor executor = next.executor();
              if (executor.inEventLoop()) {
                  if (flush) {
                      next.invokeWriteAndFlush(m, promise);
                  } else {
                      next.invokeWrite(m, promise);
                  }
              } else {//2.如果调用线程不是IO线程,封装为WriteTask投递到对应的NioEventLoop中的队列,NioEventLoop的线程轮询队列处理
                  AbstractWriteTask task;
                  if (flush) {
                      task = WriteAndFlushTask.newInstance(next, m, promise);
                  }   else {
                      task = WriteTask.newInstance(next, m, promise);
                  }
                  safeExecute(executor, task, promise, m);
              }
          }

  • 非阻塞read

NioSocketChannel中读取数据时,等NioEventLoop中的IO轮询线程发现Selector上有数据就绪时,通过事件通知方式来通知我们业务数据已经就绪,是非阻塞的。