响应式编程学习

响应式流规范

相关链接: https://github.com/reactive-streams/reactive-streams-jvm

Reactive Streams是JVM的面向流的库的标准和规范,目的是实现

  • 具有处理无限数量的元素的能力
  • 按序处理
  • 异步地传递元素
  • 必须实现非阻塞的回压(backpressure)

响应式流接口

  • Publisher(发布者,发出元素)
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
  • Subscriber(订阅者,接收元素并做出响应的)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
  • Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}
  • Processor(集合Publisher和Subscriber)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

当执行subscribe方法时,发布者会回调订阅者的onSubscribe方法,订阅者借助传入的Subscription的request方法向发布者请求n个数据。发布者通过不断调用订阅者的onNext方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete告知订阅者流已经发完;如果有错误发生,则通过onError发出错误数据,同样也会终止流。

Reactor

Reactor是完全基于Reactive Streams设计和实现的库,也是 Spring 5 中反应式编程的基础。

Flux与Mono

Flux与Mono都是发布者(Publisher),一个Flux对象代表一个包含 0-N 个元素的异步序列,而一个Mono对象代表一个异步的 0-1 个元素。

  • 创建数据流
Flux.fromArray(array);
Flux.fromIterable(list);
Flux.fromStream(stream);
Flux.just(...);
  • 订阅
// 订阅并触发数据流
subscribe(); 
// 订阅并指定对正常数据元素如何处理
subscribe(Consumer<? super T> consumer); 
// 订阅并定义对正常数据元素和错误信号的处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); 
// 订阅并定义对正常数据元素、错误信号和完成信号的处理
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); 
// 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);

//example
Flux<Integer> ints = Flux.range(1, 4); 
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> {System.out.println("Done");}); 
  • 自定义数据流-generate

generate是一种同步地,逐个地发出数据的方法。因为它提供的sink是一个SynchronousSink, 而且其next()方法在每次回调的时候最多只能被调用一次。

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator);

public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator);

public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer);

//example
//1. 初始化状态值(state)为0
//2. 基于状态值 state 来生成下一个值(state 乘以 3)
//3. 当状态值 state = 10 ,终止序列
//4. 返回一个新的状态值 state,用于下一次调用
//element "3 x 0 = 0" , "3 x 1 = 3" , "3 x 2 = 6" ... "3 x 10 = 30"
Flux<String> flux = Flux.generate(
    () -> 0, 
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); 
      if (state == 10) sink.complete(); 
      return state + 1; 
    });

  • 自定义数据流-create

更高级的创建Flux的方法,其生成数据流的方式既可以是同步的,也可以是异步的,并且还可以每次发出多个元素(create 常用的场景就是将现有的 API 转为响应式,比如监听器的异步方法)。

MyEventSource eventSource = new MyEventSource();    
Flux.create(sink -> {
				//向事件源注册用匿名内部类创建的监听器
                eventSource.register(new MyEventListener() {    
                    @Override
                    public void onNewEvent(MyEventSource.MyEvent event) {
						//监听器在收到事件回调的时候通过sink将事件再发出
                        sink.next(event);
                    }

                    @Override
                    public void onEventStopped() {
						//监听器在收到事件源停止的回调的时候通过sink发出完成信号
                        sink.complete();        
                    }
                });
            }
).subscribe(System.out::println);

调试

  • 单元测试工具StepVerifier

expectNext用于测试下一个期望的数据元素,expectErrorMessage用于校验下一个元素是否为错误信号,expectComplete用于测试下一个元素是否为完成信号。

    StepVerifier.create(...)
        .expectNext(1, 2, 3, 4, 5, 6)
        .expectComplete()
        .verify();

Operator(操作符)

操作符选择器:https://htmlpreview.github.io/?https://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html#which-operator

  • map: 元素映射为新元素
  • flatMap: 元素映射为流
  • filter: 过滤
  • zip: 一对一合并:对两个Flux/Mono流每次各取一个元素,合并为一个二元组(Tuple2)
public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1,Publisher<? extends T2> source2);
public static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
  • transform: 可以将一段操作链打包为一个函数,这个函数能在操作期(assembly time)将被封装的操作链中的操作符还原并接入到调用 transform 的位置。
//将filter和map操作符进行了打包
Function<Flux<String>, Flux<String>> filterAndMap =
            f -> f.filter(color -> !color.equals("orange"))
                    .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
        .doOnNext(System.out::println)
        //将包装的函数拼装到操作链
        .transform(filterAndMap)
        .subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));

// result:
// blue
// Subscriber to Transformed MapAndFilter: BLUE
// green
// Subscriber to Transformed MapAndFilter: GREEN
// orange
// purple
// Subscriber to Transformed MapAndFilter: PURPLE
  • compose: 与 transform类似,将几个操作符封装到一个函数式中。区别是,这个函数式是针对每一个订阅者起作用的。对每一个subscription可以生成不同的操作链(通过维护状态值)。
AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
        if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
        .map(String::toUpperCase);
        }
        return f.filter(color -> !color.equals("purple"))
                .map(String::toUpperCase);
};

Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
    .doOnNext(System.out::println)
    .compose(filterAndMap);

composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));

// result:
// blue
// Subscriber 1 to Composed MapAndFilter :BLUE
// green
// Subscriber 1 to Composed MapAndFilter :GREEN
// orange
// purple
// Subscriber 1 to Composed MapAndFilter :PURPLE
// blue
// Subscriber 2 to Composed MapAndFilter: BLUE
// green
// Subscriber 2 to Composed MapAndFilter: GREEN
// orange
// Subscriber 2 to Composed MapAndFilter: ORANGE
// purple

调度器&线程模型

Schedulers工具类

  • 当前线程(Schedulers.immediate())。
  • 可重用的单线程(Schedulers.single()),这个方法对所有调用者都提供同一个线程来使用,直到该调度器被废弃。Schedulers.newSingle()使用独占的线程。
  • 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源。
  • 固定大小线程池(Schedulers.parallel()),所创建线程池的大小与CPU个数等同,适合做并行计算。
  • 自定义线程池(Schedulers.fromExecutorService(ExecutorService))基于自定义的ExecutorService创建 Scheduler。
// 基于Callable的Mono
Mono.fromCallable(() -> getStringSync())   
// 使用subscribeOn将任务调度到Schedulers内置的弹性线程池执行
       .subscribeOn(Schedulers.elastic()) 
       .subscribe(System.out::println);

publishOn与subscribeOn区别

Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。 它们都接受一个 Scheduler 作为参数,从而可以改变调度器。

  • publishOn: publishOn 的用法和处于订阅链(subscriber chain)中的其他操作符一样。它将上游 信号传给下游,同时执行指定的调度器 Scheduler 的某个工作线程上的回调。 它会改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)
  • subscribeOn: 用于订阅(subscription)过程,作用于那个向上的订阅链(发布者在被订阅 时才激活,订阅的传递方向是向上游的)。所以,无论把 subscribeOn 至于操作链的什么位置, 它都会影响到源头的线程执行环境(context),不会影响到后续的 publishOn,后者仍能够切换其后操作符的线程执行环境。

数据流配置Context

类似于 ThreadLocal ,区别是Context作用于一个 Flux 或一个 Mono 上,而不是应用于一个线程(Thread),其生命周期伴随整个数据流。

String key = "message";
Mono<String> r = Mono.just("Hello")
                .flatMap( s -> Mono.subscriberContext() 
                                   .map( ctx -> s + " " + ctx.get(key))) 
                .subscriberContext(ctx -> ctx.put(key, "World")); 

StepVerifier.create(r)
            .expectNext("Hello World") 
            .verifyComplete();

错误处理&重试

错误处理

  • 捕获并返回一个静态的缺省值:onErrorReturn方法能够在收到错误信号的时候提供一个缺省值。
Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");
  • 捕获并执行一个异常处理方法或计算一个候补值来顶替:onErrorResume方法能够在收到错误信号的时候提供一个新的数据流。
Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k)) 
    .onErrorResume(e -> getFromCache(k));
  • 捕获并再包装为某一个业务相关的异常,然后再抛出业务异常:onErrorMap。
Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));
  • 捕获并执行操作如记录错误日志等,然后继续传递:doOnError(只读,不会对数据流造成影响)。
LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k)) 
    .doOnError(e -> {
        failureStat.increment();
        log("uh oh, falling back, service failed for key " + k); 
    })
    .onErrorResume(e -> getFromCache(k)); 
  • 使用 finally 来清理资源:doFinally方法在序列终止(onComplete、onError)时被执行,并且支持判断终止事件。
LongAdder statsCancel = new LongAdder(); 

Flux<String> flux =
Flux.just("foo", "bar")
    .doFinally(type -> {
        if (type == SignalType.CANCEL) 
          statsCancel.increment(); 
    })
    .take(1);

重试

retry可以对出现错误的序列进行重试,retry对于上游Flux是采取的重订阅(re-subscribing)的方式,出错了retry会从新订阅了原始的数据流执行n次。

Backpressure回压

前提

  1. 发布者与订阅者不在同一个线程中,因为在同一个线程中的话,通常使用传统的线性逻辑,不需要进行回压处理;
  2. 发布者发出数据的速度高于订阅者处理数据的速度,也就是处于"PUSH"状态下。

回压策略(OverflowStrategy)

  • IGNORE: 完全忽略下游回压请求
  • ERROR: 当下游跟不上上游的速率的时候发出一个错误信号。
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
  • LATEST:让下游只得到上游最新的元素。
  • BUFFER:缓存下游没有来得及处理的元素(如果缓存不限大小的可能导致OutOfMemoryError)。

声明回压策略

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) 

操作符调整回压策略

  • onBackpressureBuffer(对于来自其下游的request采取“缓存”策略)

    image

  • onBackpressureDrop(元素就绪时,根据下游是否有未满足的request来判断是否发出当前元素)

    image

  • onBackpressureLatest(当有新的request到来的时候,将最新的元素发出)

    image

  • onBackpressureError(当有多余元素就绪时,发出错误信号)

    image

通过BaseSubscriber定义自己的Subscriber进行上游流量控制

subscribe(new BaseSubscriber<Integer>() {  
                @Override
                protected void hookOnSubscribe(Subscription subscription) { 
                    //hookOnSubscribe定义在订阅的时候执行的操作,订阅时首先向上游请求1个元素
                    request(1);
                }

                @Override
                protected void hookOnNext(Integer value) {  
                    //hookOnNext定义每次在收到一个元素的时候的操作,每次处理完1个元素后再请求1个。
                    request(1);
                }