标准 专业
多元 极客

Reactor实验室(1)——从命令式编程到响应式编程

Reactor是一种异步I/O的,基于事件驱动的模式,它可以同时接收多个服务请求,并对请求进行调度分离。

我们一般会采用开线程池,一部分人还会考虑异步的问题。

这些其实都是并行化的解决方式。

但是并行化会增加系统的复杂性,比如回调组合,会造Callback Hell

再比如Future是必须等到程序处理完毕之后才会得到结果。

这些编程模式我们称之为命令式编程

那么Reactor其实就是从另一个角度来解决这些问题。

我们先介绍一下Reactor,然后讲讲Reactor的优点。

规范

目前业界内已经制定响应式流开发规范,其中定了一些响应式流的特点:

  1. 可以无限流。
  2. 按序处理。
  3. 异步处理。
  4. 非阻塞背压。

它同时定义了四种角色,分别是:

  • Publisher,发布者,执行subscribe()方法后,发布者会回调onSubscribe()方法。
  • Subscriber,订阅者,接受元素,并处理和响应。
  • Subscription,订阅通道,onSubscribe()会传递Subscription对象,它主要负责从下游到上游进行回溯,完成背压。
  • Processor,发布者与订阅者融为一体。

BackPressure

怎么翻译都行,我习惯翻译成背压,就是给消费者带来的压力。

在传统消息推送中,如果疯狂推送消息给消费者,消费者自己就崩溃,而疯狂轮询Broker,Broker可能也崩溃了。

背压是建立一条消费者到生产者之间的一条道路,订阅者可以通过request(n)方法来告诉生产者它能消费多少,生产者就推给它多少,其实是一种推拉集合的方式。

可以联想到Kafka的消费模型。

Flux

Flux是一个至少包含0个元素的发布者,同之前一样,也是使用onNext()onComplete()onError()方法。

创建Flux花样繁多,但是创建出来的Flux可能用途不同。

首先,Flux作为一个发布者的同时,也是一个工具类,提供了多种方式创建发布序列。

Flux.just("Hello World")    

我们可以通过just创建一个简单的序列。
同时也可以使用range创建一个递增序列:

Flux.range(1, 5);

其实range()的底层还是just()

当我们调用Flux的subscribe()方法时,一个Reactor模型就完成了。

@Test
public void fluxTest() {
    try {
        Flux.just("hello world").subscribe(System.out::println);
        Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::println);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

输出:

hello world
1
2
3
4
5
6

当然subscribe()方法不能如此简单,它有四种表现形式:

  • subscribe()
  • subscribe(consumer)
  • subscribe(consumer, errorConsumer)
  • subscribe(consumer, errorConsumer, completeConsumer)

那么我们就来一个完整的subscribe()方法看一下:

@Test
public void multiFluxTest() {
    try {
        Flux.just(1, 2, 3, 4, new Exception("error"), 5).subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println("mission complete")
        );
    } catch (Exception e) {
        e.printStackTrace();
    }
}

输出:

1
2
3
4
java.lang.Exception: error
5
mission complete

Mono

Mono是一个包含至多1个元素的发布者,和Flux一样,同样兼容onNext()onComplete()onError()方法。

Flux中的操作只有一部分适用于Mono,我们也可以通过just()方法创建一个Mono序列:

@Test
public void simpleTest() {
    try {
        Mono.just(1).subscribe(
                System.out::println
        );
    } catch (Exception e) {
        e.printStackTrace();
    }
}

但是我们可以明显的看到,Fluxjust()方法支持多入参,而Monojust()只支持单入参。

两个Mono结合操作可以忽略其中一个Mono,也可以将两个Mono切换为一个Flux进行生产。

@Test
public void multiTest() {
    try {
        // 二合一为Flux
        Mono.just(1).concatWith(Mono.just(2)).subscribe(
                System.out::println
        );
        // 结果还是Mono
        Mono.just(1).then(Mono.just(2)).subscribe(
                System.out::println
        );
    } catch (Exception e) {
        e.printStackTrace();
    }
}

输出:

1
2
2

自定义序列

前面介绍的Flux序列创建比较基础,其实就类似Executor的思维方式,分别提供了懒人模式和创造模式,接下来我们就展示下序列的自定义。

generate()

generate()方式创建的序列是同步序列,可以通过SynchrounsLink对象看出来。

@Test
public void generateTest() {
    try {
        // 同步流
        Flux fluxTwo = Flux.generate(synchronousSink -> {
            var random = ThreadLocalRandom.current().nextInt(100000);
            synchronousSink.next(random);

            if (random > 50000) {
                synchronousSink.complete();
            }
        });

        fluxTwo.subscribe(
                System.out::println
        );
    } catch (Exception e) {
        e.printStackTrace();
    }
}

流的输出:

78550

create()

create()是一个可同可异的序列。

@Test
public void createMultiTest() {
    try {
        CreateEvent createEvent = new CreateEvent();
        Flux<String> eventFlux = Flux.create(sink -> {
            createEvent.register(new Listener() {
                @Override
                public void listener(Event event) {
                    sink.next(event.getMessage() + "---" + event.getTime());
                }

                @Override
                public void complete() {
                    sink.complete();
                }
            });
        });

        eventFlux.subscribe(System.out::println);

        for (int i = 0; i < 10000; i ++) {
            int random = ThreadLocalRandom.current().nextInt(10000);
            TimeUnit.MILLISECONDS.sleep(5000);
            createEvent.next(new Event("Event:" + random, new Date()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}    

背压策略

前面我们已经提到过背压,Ractor3.0根据背压提供了5种背压策略。

  • IGNORE,忽略下游服务的背压请求,不管下游服务是否能够接受并处理,上游服务保持自己的步调。
  • ERROR,当下游服务无法跟上上游发送元素的速度,上游服务将会发送一个错误信号。
  • DROP,当下游服务无法处理上游发送的元素时,上游服务抛弃这个元素。
  • LATEST,下游服务只会接受上游服务的最新元素。
  • BUFFER,缓存下游服务没有处理的元素。

默认情况下,create()采用的就是BUFFER形式的背压策略,所以一般情况下,下游服务会接收所有来自上游服务的元素。

Buffuer

那么接下来我们用一个实验对这集中情形做一下测试:

@Test
public void pressureMultiTest() {
    try {
        CreateEvent createEvent = new CreateEvent();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        BaseSubscriber slowBaseSubscriber = new BaseSubscriber<String>() {
            @Override
            protected void hookOnNext(String value) {
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                    System.out.println("Slow:" + value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            protected void hookOnComplete() {
                countDownLatch.countDown();
            }
        };

        Flux flux = Flux.create(sink -> {
            createEvent.register(new Listener() {
                @Override
                public void listener(Event event) {
                    sink.next(event.getMessage() + "---" + event.getTime());
                }

                @Override
                public void complete() {
                    sink.complete();
                }
            });
        }, FluxSink.OverflowStrategy.BUFFER)
                .publishOn(Schedulers.newSingle("new Elastic"), 2).log();

        flux.subscribe(slowBaseSubscriber);

        for (int i = 0; i < 100000; i ++) {
            int random = ThreadLocalRandom.current().nextInt(10000);
            createEvent.next(new Event("Event:" + i, new Date()));
        }
        createEvent.complete();
        countDownLatch.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
20:43:28.508 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:43:28.558 [main] INFO reactor.Flux.PublishOn.1 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:43:28.563 [main] INFO reactor.Flux.PublishOn.1 - | request(unbounded)
20:43:28.677 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:0---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:0---Fri Apr 27 20:43:28 CST 2018
20:43:29.695 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:1---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:1---Fri Apr 27 20:43:28 CST 2018
20:43:30.698 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:2---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:2---Fri Apr 27 20:43:28 CST 2018
20:43:31.703 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:3---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:3---Fri Apr 27 20:43:28 CST 2018
20:43:32.708 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:4---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:4---Fri Apr 27 20:43:28 CST 2018
20:43:33.711 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:5---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:5---Fri Apr 27 20:43:28 CST 2018
20:43:34.715 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:6---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:6---Fri Apr 27 20:43:28 CST 2018
20:43:35.719 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:7---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:7---Fri Apr 27 20:43:28 CST 2018
20:43:36.724 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:8---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:8---Fri Apr 27 20:43:28 CST 2018
20:43:37.728 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:9---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:9---Fri Apr 27 20:43:28 CST 2018
20:43:38.732 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:10---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:10---Fri Apr 27 20:43:28 CST 2018
20:43:39.737 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:11---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:11---Fri Apr 27 20:43:28 CST 2018
20:43:40.742 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:12---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:12---Fri Apr 27 20:43:28 CST 2018
20:43:41.746 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:13---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:13---Fri Apr 27 20:43:28 CST 2018
20:43:42.750 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:14---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:14---Fri Apr 27 20:43:28 CST 2018
20:43:43.755 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:15---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:15---Fri Apr 27 20:43:28 CST 2018
20:43:44.758 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:16---Fri Apr 27 20:43:28 CST 2018)
Slow:Event:16---Fri Apr 27 20:43:28 CST 2018

IGNORE

上游服务疯狂onNext(),不管订阅是否能坚持住。

日志太长了,直截出一段来看看:

21:00:31.958 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: Event:99999---Fri Apr 27 21:00:31 CST 2018

由于99999订阅者没有进行消费,所以上有服务发出了一个信号告知此元素没有进行消费。

DROP

20:50:29.560 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:50:29.596 [main] INFO reactor.Flux.PublishOn.1 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:50:29.600 [main] INFO reactor.Flux.PublishOn.1 - | request(unbounded)
20:50:29.685 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:0---Fri Apr 27 20:50:29 CST 2018)
Slow:Event:0---Fri Apr 27 20:50:29 CST 2018
20:50:30.694 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:1---Fri Apr 27 20:50:29 CST 2018)
Slow:Event:1---Fri Apr 27 20:50:29 CST 2018
20:50:31.699 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onComplete()

因为一次请求两个元素,获取元素之前线程没有睡觉,所以会获取Flux生产的0,1两个元素。

当它们睡了一觉后,由于Flux采取的是丢弃策略,所以Flux中已经没有元素了,就会执行onComplete()动作。

LATEST

20:47:33.350 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
20:47:33.378 [main] INFO reactor.Flux.PublishOn.1 - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
20:47:33.381 [main] INFO reactor.Flux.PublishOn.1 - | request(unbounded)
20:47:33.444 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:0---Fri Apr 27 20:47:33 CST 2018)
Slow:Event:0---Fri Apr 27 20:47:33 CST 2018
20:47:34.459 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:1---Fri Apr 27 20:47:33 CST 2018)
Slow:Event:1---Fri Apr 27 20:47:33 CST 2018
20:47:35.463 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onNext(Event:99999---Fri Apr 27 20:47:33 CST 2018)
Slow:Event:99999---Fri Apr 27 20:47:33 CST 2018
20:47:36.467 [new Elastic-1] INFO reactor.Flux.PublishOn.1 - | onComplete()

因为一次会请求两个元素,获取元素之前线程并没有睡觉,所以会获取Flux生产的0,1两个元素。

当它们睡了一觉之后,Flux最新的元素就是99999了,所以只有一个线程会消费这个元素。

由于Flux会缓存生产未发出去的元素,理想状态下,订阅者不会丢失任何元素。

OutOfMemory的情况请大家自行探查。

调度器

调度器相当于ExecuteService,提供执行服务,也就是说,不同的调度器定义了不同的线程执行环境。

同Executors一样,Schedulers也预定义了几种不同线程池模型的调度器:

  • immediate()。当前线程。
  • single()。可重用的单线程。
  • elastic()。弹性线程池。
  • parallel()。固定大小线程池。

如果仍不满意上面几种预设模式,还可以通过Schedulers.fromExecutorService()创建专属调度器。

在响应式链中,我们可以通过publishOn或者subscribeOn来调度Scheduler

publishOn

它和其他操作符一样,将上游元素传递给下游元素,并且执行Scheduler中Worker的回调,而我们可以看出,它改变了后续操作符执行所在的线程。

听起来有点绕口,比如我说我们有这样一个实验:

@Test
public void schedulerTest() {
    try {
        Flux.just("java", "go", "scala", "python", "c++", "rust", "kotlin")
                .log()
                .map(s -> s.toUpperCase())
                .log()
                .filter(s -> s.length() < 5)
                .log()
                .subscribe(System.out::println);

        Flux.just("java", "go", "scala", "python", "c++", "rust", "kotlin")
                .log()
                .map(s -> s.toUpperCase())
                .log()
                .publishOn(Schedulers.parallel())
                .filter(s -> s.length() < 5)
                .log()
                .subscribe(System.out::println);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

两段程序输出对比,很简单的看出,再从map链到filter链,线程由主线程切换到了parallel线程,最后订阅者输出也是parallel线程

subscribeOn

其实是面向过程的一种订阅,作用于向上的订阅链。

所以无论在何处使用subscribeOn,都会借助自下而上的订阅链,通过subscribe()方法,将线程执行环境传递到上游。

但是subscrbeOn()不会影响publishOnpublishOn仍然会影响线程的执行环境。但当多个subscribeOn出现时,只有第一个subscribeOn会生效。

我们继续做一个实验:

@Test
public void schedulerSubscribeOnTest() {
    try {
        Flux.just("java", "go", "scala", "python", "c++", "rust", "kotlin")
                .log()
                .map(s -> s.toUpperCase())
                .log()
                .publishOn(Schedulers.parallel())
                .filter(s -> s.length() < 5)
                .log()
                .subscribeOn(Schedulers.elastic())
                .blockLast();
    } catch (Exception e) {
        e.printStackTrace();
    }
}   

从实验结果我们可以看出,从最上游链路开始,到map链路,使用的都是elastic线程环境,而filter链路被publishOn修改为parallel线程环境

全局环境

总是切换线程环境怎么办?我ThreadLocal里面还一堆宝贝呢。

虽然切换了线程环境,但是Reactor给出了一个解决方案:

把需要共享的东西放在链路的Context中。

@Test
public void contextTest() {
    try {
        String key = "context";
        Mono<String> r = Mono.just("My ")
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Sunshine"))
                .flatMap( s -> Mono.subscriberContext()
                        .map( ctx -> s + " " + ctx.get(key)))
                .subscriberContext(ctx -> ctx.put(key, "Dreams"));

        StepVerifier.create(r)
                .expectNext("My Sunshine Dreams")
                .verifyComplete();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Cold or Hot

冰与火,代表两种截然不同的序列处理方式。

冷序列代表如果你已经创建了一个序列,但是却没有订阅它,那么它是不会开始发送元素的。

热序列代表如果你已经创建了一个序列,这个序列是不以他人的意志为转移,从创建始起,它就不断地发送着元素,直到收到终止信号

我们继续做实验。

冷序列

@Test
public void coldTest() {
    try {
        List<Integer> integerList = List.of(1, 2, 3, 4, 5);
        Flux coldFlux = Flux.fromIterable(integerList);
        coldFlux.subscribe(integer ->
            System.out.println("第一位订阅者:" + integer)
        );
        coldFlux.subscribe(integer ->
            System.out.println("第二位订阅者:" + integer)
        );
    } catch (Exception e) {
        e.printStackTrace();
    }
}
输出
第一位订阅者:1
第一位订阅者:2
第一位订阅者:3
第一位订阅者:4
第一位订阅者:5
第二位订阅者:1
第二位订阅者:2
第二位订阅者:3
第二位订阅者:4
第二位订阅者:5

热序列

代码部分
@Test
public void hotTest() {
    try {
        EmitterProcessor emitterProcessor = EmitterProcessor.create(100);
        Flux.range(1, 5).subscribe(integer -> emitterProcessor.onNext(integer));
        emitterProcessor.subscribe(integer ->
            System.out.println("第一位订阅者:" + integer)
        );

        Flux.range(6, 4).subscribe(integer -> emitterProcessor.onNext(integer));
        emitterProcessor.subscribe(integer ->
            System.out.println("第二位订阅者:" + integer)
        );
        Flux.range(11, 3).subscribe(integer -> emitterProcessor.onNext(integer));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
输出
第一位订阅者:1
第一位订阅者:2
第一位订阅者:3
第一位订阅者:4
第一位订阅者:5
第一位订阅者:6
第一位订阅者:7
第一位订阅者:8
第一位订阅者:9
第一位订阅者:11
第二位订阅者:11
第一位订阅者:12
第二位订阅者:12
第一位订阅者:13
第二位订阅者:13

可以看出,第二位订阅者加入进来之后,会从当前元素开始继续消费。

Processor

Processor是个万金油模式,它既可以是一个Publisher,又可以是一个Subcriber模式,这种草履虫模式带来的后果就是上手复杂度高。

它的繁殖系统也比较发达:

  • DirectProcessor。直接调动Sink方法推送数据,但是无法处理背压,就是一个简单的Processor。
  • UnicastProcessor。直接调用Sink方法推送数据,可以使用内存缓存来处理背压,但是只能使用一个内存缓存,所以无法拥有多个订阅者
  • EmitterProcessor。通过Sink方法推送数据或者是订阅一个上游发布者来同步数据均可,可以拥有多个订阅者,并且每个订阅者可以有不同的背压策略,它自带一个缓存,如果没有订阅者,上游数据会推送到缓存中,第一个订阅者会取走缓存中所有的元素,然后这个缓存就将被用来完成背压功能。
  • ReplayProccessor。通过Sink方法推送数据或者是订阅一个上游发布者来同步数据均可,它会缓存直接通过自身Sink发送的元素和上游服务元素,不仅如此,它还具有回放功能EmitterProcessor的缓存元素不是直接被拿走,后面来的订阅者看不到前面的元素了,但是ReplayProcessor的订阅者却可以获取之前的元素,但是也不是一味的缓存,它提供了缓存一个元素缓存一定的元素缓存所有历史的元素缓存基于时间段内的元素缓存一定数目和时间段内的元素的缓存策略。
  • TopicProcessor。可以订阅多个上游发布者,使用RingBuffer数据结构来缓存多个来自上游的数据,并且可以拥有多个订阅者,健壮性更强。它为每个订阅者都分配了一个线程,所以线程池的限制就是订阅者的线程,它也是基于RingBuffer存储已发送的数据,每个线程分别在RingBuffer上有自己的索引,autoCancel意味着所有的订阅者都取消了订阅,那么这个流也就消失了。
  • WorkQueueProcessor。它也是一个异步Processor,可以同时订阅多个上游发布者,使用RingBuffer数据结构来缓存多个来自上游的数据,健壮性更强,但是它不必为每个订阅者都分配一个线程,而是采用一个集中线程,相较于TopicProcessor来说更节省资源,这么做的缺点就是它每次只能向一个订阅者发送消息,所有它的发送模式是轮询方式

截稿为止,Reactor3.0不适用于Android

赞(0) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫