WebFlux 之 Mono API 教程

JAVA herman 3548浏览
公告:“业余草”微信公众号提供免费CSDN下载服务(只下Java资源),关注业余草微信公众号,添加作者微信:xttblog,发送下载链接帮助你免费下载!
本博客日IP超过1800,PV 2600 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog,之前的微信号好友位已满,备注:返现
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
视频教程免费领

前面写了一些 WebFlux 方面的教程,发现很多人搜索到了我的文章,希望我能继续写一些教程。于是便有了本文。

关于 WebFlux 前面我已经反复强调过,它是一种新的编程趋势,未来会越来越流行。所以,很有必要认真的学习学习!

Fulx 和 Mono

Reactive Streams 是一个标准或者说是规范,由 Netflix、TypeSafe、Pivotal 等公司发起的。Reactor 是 Spring 中的一个子项目,一个基于 java 的响应式编程框架,该框架实现了 Reactive Programming(反应式编程即响应式编程) 思想,符合 Reactive Streams 规范。Mono 和 Flux 是 Reactor 中非常重要的两个类。

Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

Mono和Flux都是Publisher(发布者)。

其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 Mono 和 Flux。对于 Subscriber 和 Subcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。

比如,在 Web Flux,你的方法只需返回 Mono 或 Flux 即可。你的代码基本也只和 Mono 或 Flux 打交道。而 Web Flux 则会实现 Subscriber ,onNext 时将业务开发人员编写的 Mono 或 Flux 转换为 HTTP Response 返回给客户端。

Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者(Publisher)。

根据上面的介绍,可以得出 Mono  相当于 Optional,或和 Optional 的作用类似。

@RestController
public class MonoController {

    @RequestMapping("/hello")
    public Optional<String> hello(@RequestParam(required = false) String str){
        if(StringUtils.isEmpty(str)){
            return Optional.empty();
        }
        return Optional.of("业余草:www.xttblog.com");
    }

    @RequestMapping("/mono")
    public Mono<String> mono(@RequestParam(required = false) String str){
        if(StringUtils.isEmpty(str)){
            return Mono.empty();
        }
        return Mono.just("业余草:www.xttblog.com");
    }
}

反应式编程框架主要采用了观察者模式,而 Spring Reactor的核心则是对观察者模式的一种衍伸。关于观察者模式的架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这时就出现了两种情况:

  • 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件好比观察者在等米下锅,程序等待)。
  • 被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。(好比被观察者生产的大米没人吃,堆积最后就会烂掉)。为了方便下面理解Mono和Flux,也可以理解为Publisher(发布者也可以理解为被观察者)主动推送数据给Subscriber(订阅者也可以叫观察者),如果Publisher发布消息太快,超过了Subscriber的处理速度,如何处理。这时就出现了Backpressure(背压—–指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略)

Mono 实现了 Publisher 接口,但是通过查看源码,发现它是一个抽象类。Mono 里面有很多 API,关于这些 API 的解释如下:

  • empty():创建一个不包含任何元素,只发布结束消息的序列。
  • just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
  • justOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
  • error(Throwable error):创建一个只包含错误消息的序列。
  • never():创建一个不包含任何消息通知的序列。
  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
  • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
  • create():通过 create()方法来使用 MonoSink 来创建 Mono。

API 使用案例如下所示。

//empty():创建一个不包含任何元素,只发布结束消息的序列
Mono.empty().subscribe(System.out::println);
//just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
Mono.just("www.xttblog.com").subscribe(System.out::println);
//ustOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。
//只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(null).subscribe(System.out::println);
Mono.justOrEmpty("业余草").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("业余草")).subscribe(System.out::println);
//error(Throwable error):创建一个只包含错误消息的序列。
Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println);
//never():创建一个不包含任何消息通知的序列。
Mono.never().subscribe(System.out::println);
//通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.create(sink -> sink.success("业余草")).subscribe(System.out::println);

//通过fromRunnable创建,并实现异常处理
Mono.fromRunnable(() -> {
    System.out.println("thread run");
    throw new RuntimeException("thread run error");
}).subscribe(System.out::println, System.err::println);
//通过fromCallable创建
Mono.fromCallable(() -> "callable run ").subscribe(System.out::println);
//通过fromSupplier创建
Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println);

//delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
long start = System.currentTimeMillis();
Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> {
    System.out.println("生产数据源:"+ n);
    System.out.println("当前线程ID:"+ Thread.currentThread().getId() + ",生产到消费耗时:"+ (System.currentTimeMillis() - start));
});
System.out.println("主线程"+ Thread.currentThread().getId() + "耗时:"+ (System.currentTimeMillis() - start));
while(!disposable.isDisposed()) { }

跟我学 WebFlux

本文主要是介绍了一下 Mono 类中相关 API 的用法,关于 Flux 类,我们下章继续。

业余草公众号

最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加QQ1群:135430763(2000人群已满),QQ2群:454796847(已满),QQ3群:187424846(已满)。QQ群进群密码:xttblog,想加微信群的朋友,之前的微信号好友已满,请加博主新的微信号:xttblog,备注:“xttblog”,添加博主微信拉你进群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作可添加助理微信进行沟通!

本文原文出处:业余草: » WebFlux 之 Mono API 教程