Reactor
初识Reactor
! ! Reactor讲解
Reactor:响应式编程,区别于一般:Client去询问Server是否有变化并拉取数据的模式,响应式编程是后台有数据变化时主动push给订阅者,也就是Publisher-Subscriber模型。
一、特点
-
可组合性,即可以协调多个异步任务。
-
提供丰富的操作符。
-
订阅之前不会产生任何的动作。(即只有订阅的时候才执行操作)
-
冷热响应
- 冷响应:对于每个订阅者,包括在数据源处,数据推送都会从头开始。例如,如果源包装了HTTP调用,则会为每个订阅发出一个新的HTTP请求。
- 热响应:并不是每个订阅者都可以从头开始接收数据。后面的订阅者会在订阅后接收已推送的信号。
-
异步非堵塞
-
流量控制(背压控制)
Publisher的数据推送的过于频繁,Subscriber处理数据的速度跟不上Publisher推送的速度。这时候就用到背压控制:Subscriber向Publisher发送信号来控制Publisher push数据的速度。
可以用reactive库中的WebClient来进行实现。
二、Operators
1、filter: 过滤
对Flux流中包含的元素进行过滤,只留下满足指定条件的元素。
//只留下偶数元素
Flux.range(1,10).filter(i->i%2==0).subscribe(System.out::println);

2、map: 数据映射
把Flux的数据通过某些逻辑转换成另外的形式。
//结果为A B C
Flux.just("a","b","c").map(String::toUpperCase).subscribe(System.out::println);
3、index: 编号
给Flux中的每个元素加上编号。
4、window: 分组
把当前流中的元素收集到另外的 Flux 序列中,也就是分组,分组结果为多个Flux序列。
//把一个包含1-100的flux序列分组,20个为一组,每组为一个新的Flux序列
//这里window为一个包含5个Flux序列的Flux序列数组,其中每个Flux有20个元素
Flux<Flux<Integer>> window = Flux.range(1,100).window(20);
window.subscribe(System.out::println);

为什么输出的结果是UnicastProcessor?
其实,在调用window()方法的时候,其调用过程大致如下:
- 序列得到订阅,则开始首次发送元素,这时候会创建一个新的源序列UnicastProcessor。
- 当UnicastProcessor接受的元素数量到达20个时,执行对应的onComplete()方法。
- 那么当第21个元素发送的时候,又会创建一个新的源序列UnicastProcessor,并向下游传递,以此类推。
注意: window和buffer的区别
(首先有个前提,我们数据都是从上游往下游进行传输的)
- window是先创建一个UnicastProcessor序列,然后直接向下游传递。
- buffer是先收集满20个元素,再向下游进行传递。
5、buffer:分组
对一个Flux的元素进行分组,分组结果还是1个Flux序列。
5.1 buffer:有一个参数,指定了每组所包含的元素的最大数量
//结果:输出5组数据,每一组数据包含20个元素
Flux.range(1,100).buffer(20).subscribe(System.out::println)
5.2 bufferUntil:参数表示每个集合中的元素要满足的条件。bufferUntil会一直收集,直到条件==true
//结果:输出5组数据,每一组包含2个元素
//只有当前元素为偶数的时候,才会停止当前元素的收集(当前组包含当前元素),接下来的元素另起组
Flux.range(1,10).bufferUntil(i->i%2==0).subscribe(System.out::println);
5.3 bufferTimeout:2个参数,分别制定了每组所包含的元素的最大数量、收集的时间间隔
5.4 bufferWhile:参数表示每个集合中的元素要满足的条件。bufferWhile则只有当条件==true时才会收集。一旦为false,会立即开始下一次收集。
//结果:输出5组数据,每组包含1个元素
//只有当前元素为偶数时,才进行元素收集
Flux.range(1,10).bufferWhile(i->i%2==0).subscribe(System.out::println);
6、take: 提取数据
6.1 take
//取前三个元素
//结果为1 2 3
Flux.range(1,100).take(3).subscribe(System.out::println);
6.2 takeLast
//取最后三个元素
//结果为98 99 100
Flux.range(1,100).takeLast(3).subscribe(System.out::println);
6.3 takeWhile
//当条件为真时才进行提取
//结果为1 2
Flux.range(1,100).takeWhile(i->i<3).subscribe(System.out::println);
6.4 takeUntil
//当条件为真时停止提取
//结果为1 2
Flux.range(1,100).takeWhile(i->i==3).subscribe(System.out::println);
7、zipWith:元素合并
将两个Flux流中的元素按照一对一的方式进行合并
注意:
- 在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流。
- 若某个序列中的元素数量偏多或者偏少,那么多余的结果并不会输出。
// 结果:[a,c] [b,d]
Flux.just("a", "b").zipWith(Flux.just("c", "d","e","f")).subscribe(System.out::println);// 结果:a-c b-d
//第二个参数为逻辑处理流程
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);
8、reduce:累加、累积
对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列
注意:
- 在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
- 累积操作并不单单代表累加、累积的任意一种,而指的是对所有的元素做一个统一的操作,可能是累积、累加等一系列操作。
//1-100累加,结果为5050
Flux.range(1,100).reduce(Integer::sum).subscribe(System.out::println);
//1-100累加再加上100,结果为5150
Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);
9、merge:流合并
把多个流合并成一个 Flux 序列
Flux.merge(Flux.interval(Duration.ZERO, Duration.ofMillis(100)).take(9),Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(2),Flux.interval(Duration.ofMillis(100), Duration.ofMillis(100)).take(3)
).subscribe((t) -> System.out.println("Flux.merge :" + t + ",线程:" + Thread.currentThread().getName()));// 让主线程睡眠2s,保证上面的输出能够完整Thread.sleep(2000);

注意:
- merge() 会按照所有流中元素的实际产生顺序来合并,代码中定义了每个流中元素产生的时间间隔。
- merge()方法的兄弟方法:mergeSequential(),按照所有流被订阅的顺序,以流为单位进行合并。
- 代码中对Flux.interval()方法生成的流,采用了take()操作来制定获取的元素,若不采用,那么该流是一个无限序列。
10、interval:延迟
Flux.interval(xxx1,xxx2)
:时间类操作符,按照指定的参数来创建流。
- xxx1:第一次执行的延迟时间。
- xxx2:每隔多少秒发送一次事件,发送的内容是Long类型整数,从0开始。
11、flatMap:转换并合并结果
把流中的每个元素转换成一个流,再把所有流中的元素进行合并
Flux<String> stringFlux1 = Flux.just("a", "b", "c", "d", "e", "f", "g", "h", "i");// [a,b],[c,d],[e,f],[g,h],[i]
Flux<Flux<String>> stringFlux2 = stringFlux1.window(2);stringFlux2.flatMap(flux1 -> flux1.map(String::toUpperCase)).subscribe(System.out::print);
12、concatMap:转换并合并结果
concatMap 操作同样是根据源中的每个元素来获取一个子源,再把所有子源进行合并。
Flux.just(5, 10)// 将源中的每个元素来获取一个子源,那么这里的x指的是5或者10// 然后以元素10为例:x=10,通过Flux.interval来生成序列,并取出10-3=7个元素.concatMap(x -> Flux//延迟x*10ms,之后每100ms开始生成从0开始的元素:0、1、2、3、4…….interval(Duration.ofMillis(x * 10), Duration.ofMillis(100))//将原本Flux流中的两个元素和interval生成的元素通过map合并//形式为x:i.map(i -> x + ": " + i)//取最终流中的前x-3个元素.take(x - 3)).toStream().forEach(System.out::println);
注意: concatMap和flatMap有什么不同?
- 顺序:concatMap 操作会根据初始源中的元素顺序依次将获取到的子源进行合并。而flatMap是不会的。
- 订阅:concatMap 操作对所获取到的子源的订阅是动态进行的,而flatMap则是在合并开始之前就订阅了由父源下发的所有子源。(?不懂)
13、groupBy:分组
通过一个策略 key 将一个 Flux分割为多个组
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9)//分组,偶数为一组,奇数为一组。key分别为even和odd.groupBy(i -> i % 2 == 0 ? "even" : "odd")//合并两个Flux流.concatMap(i -> i//defaultIfEmpty:如果没有任何数据完成此序列,则提供默认的唯一值.defaultIfEmpty(-1)//强制转换为String.map(String::valueOf)//不确定,应该是以i的key为每个i的前缀,这里i表示两个Flux流.startWith(i.key())).subscribe(System.out::println);

注意:
- 几个组之间的元素不会存在交集,也就是一个元素只属于其中1个组。
- 组永远不会为空,因为如果进行分组的时候发现没有对应的组,则进行创建操作。
14、defaultIfEmpty:分配默认值
如果没有任何数据完成此序列,则提供默认的唯一值
三、流式编程Example
消息驱动模式:subcribe()之前的代码描述应该发生什么,但是都尚未发生,直到有人subscribe(代码最后一行subcribe()),才会进行处理的流程(每个.之后都是一步流程) 。也就是订阅之前不会产生任何的动作。
当produces= MediaType.APPLICATION_STREAM_JSON_VALUE时,标注了stream,那么就表述在Publisher和Subcriber之间建立了一个长链接
四、Reactor Publisher
https://skyao.io/learning-reactor/docs/concept/flux/flux.html
https://segmentfault.com/a/1190000024499748
Publisher<T>
是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>
的需求推送元素。一个Publisher<T>
可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。
下面这个Excel计算就能说明一些Publisher<T>
的特点。
A1-A9就可以看做Publisher<T>
及其提供的元素序列。A10-A13可以看作订阅者Subscriber
。假如说我们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。这也是响应式的一个重要特点:当没有订阅时发布者什么也不做。
而Flux
和Mono
都是Publisher<T>
在Reactor 3**实现。
4.1 Flux(一串数据的抽象)
Flux作为Publisher,产生0-N个数据对象 ==> 数据对象交给operator()处理器进行处理 ==> 生成另外的数据对象。
Flux
是一个发出(emit)0-N
个元素组成的异步序列的Publisher<T>
.
在响应流规范中存在三种给下游消费者调用的方法 onNext
, onComplete
, 和onError
。
- 正常的包含元素的消息:onNext()
- 序列结束的消息:onComplete()
- 序列出错的消息:onError()
- Flux序列中可以有n个元素,每个元素都会经过operator操作进行数据的转换然后输出。
- 如果由于某些原因发生了错误,那么会终止该序列。
4.1.1 创建Flux
(1) generate
generate()方法的结构:
generate(// 用于初始化值Callable<S> stateSupplier, // 生成器,也就是逻辑处理的函数BiFunction<S, SynchronousSink<T>, S> generator,// consumer,用于在序列终止的时候调用执行,比如关闭数据源 Consumer<? super S> stateConsumer
);
/**
简单来看,该方法的调用就是三步走:
初始化。
逻辑处理。
终止处理(fiinally)
**/
简单例子:
Flux.generate(t->{ t.next(1);t.complete();
}).subscribe(System.out::println);
//创建一个Flux,有1个元素
//注意generate中next只能调用1次
复杂例子:
public void testGenerate() {Flux.generate(// 1.生成一个对象,用来作为状态,即初始化操作。AtomicInteger::new,// 2.逻辑处理(state, sink) -> {// 改变状态值int i = state.getAndIncrement();sink.next("2*" + i + "=" + 2 * i);if (i == 5) {// sink事件结束。sink.complete();}return state;},// 3.序列终止时,输出状态值。(state) -> System.out.println("state:" + state)).subscribe(System.out::println);
}

(2) create
create()方法的结构:
// 一个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数不写时默认为Buffer(背压策略之一)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {return create(emitter, OverflowStrategy.BUFFER);
}//两个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数为背压策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}
简单例子
Flux.create((t)->{t.next(1);t.next(2);t.complete();
}).subscribe(System.out::println);
//创建一个Flux,有多个元素,也就是可以多次发射
复杂例子
private Flux<EventSource.Event> createFlux(FluxSink.OverflowStrategy strategy) {// 事件源注册了一个监听器,负责监听新事件的创建以及事件源的停止return Flux.create(sink -> eventSource.register(new MyListener() {@Overridepublic void newEvent(EventSource.Event event) {System.out.println("上游------>数据源创建了新事件:" + event.getMsg());sink.next(event);}@Overridepublic void eventSourceStopped() {sink.complete();}}), strategy); // 别忘了这里还有个背压策略的参数}
Create() 方法跟Generate() 比较来看,有这么几个不同:
-
Create() 可以同步、也可以异步。而Generate() 是同步的。
//自定义一个事件源类EventSource: public class EventSource {private List<MyListener> listeners;public EventSource() {this.listeners = new ArrayList<>();}public void register(MyListener listener){listeners.add(listener);}public void newEvent(Event event){for (MyListener listener : listeners) {listener.newEvent(event);}}public void eventStopped(){for (MyListener listener : listeners) {listener.eventSourceStopped();}}@Data@AllArgsConstructorpublic static class Event{private Date time;private String msg;} }//定义一个监听器接口MyListener public interface MyListener<T> {// 监听新事件void newEvent(EventSource.Event event);// 监听事件源的终止void eventSourceStopped(); }//测试方法 // 1.构建一个事件源 EventSource eventSource = new EventSource(); Flux.create(sink -> {// 2.向事件源中添加一个监听器,负责监听事件的产生和事件源的停止。// 相当于将事件转换成异步的事件流eventSource.register(new MyListener() {@Overridepublic void newEvent(EventSource.Event event) {// 3.监听器收到事件回调的时候,会通过sink将事件发出sink.next(event);}@Overridepublic void eventSourceStopped() {// 4.监听器收到事件源停止的回调后,由sink发出完成信号,停止sinksink.complete();}}); }).subscribe(System.out::println);// 5.循环产生订阅 for (int i = 0; i < 5; i++) {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));eventSource.newEvent(new EventSource.Event(new Date(), "事件" + i + "注册!")); } // 6.停止事件源 eventSource.eventStopped();
-
Create() 可以每次发出多个元素,而Generate() 每次发出一个。
//create() Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next(i);}sink.complete(); }).subscribe(System.out::println); //generate() Flux.generate(t->{ t.next(1);t.complete(); }).subscribe(System.out::println);
-
Create() 不需要状态值。
-
Create() 可以在回调中触发多个事件。
-
Create() 是多线程的。
(3) push
Create有一个变体模式:push()
方法,和create()
方法不同的是,他是一个单线程的,即只有一个线程可以调用next()
,complete()
方法。
(4) just
Flux.just("1","2").subscribe(System.out::println);
//just("1","2")可以列举生成序列的所有元素(这里是1、2),
//just()创建出来的序列,在发布元素后(subscribe())会自动结束
(5) from(通过Publisher创建一个Flux)
//Flux->Flux
Flux.from(Flux.just("just", "just1", "just2")).subscribe(System.out::println);
//Mono->Mono
Flux.from(Mono.just("just")).subscribe(System.out::println);
(6) fromArray(创建发出数组的Flux)
// fromArray()从一个数组对象中创建一个Flux对象Integer[] arr = new Integer[]{1, 2, 3};Flux<Integer> f4 = Flux.fromArray(arr);
(7) fromIterable、fromStream
// fromIterable()从一个Iterable对象中创建一个Flux对象Flux<String> f2 = Flux.fromIterable(Arrays.asList("str1", "str2", "str3"));// fromStream()从一个Stream对象中创建一个Flux对象ArrayList<Integer> numList = new ArrayList<>();numList.add(1);Flux<Integer> f3 = Flux.fromStream(numList.stream());
(8) defer(推迟发送)
Flux.defer(()->Flux.range(1,10)).subscirbe(System.out::println);
(9) range
Flux.range(1,10).subscirbe(System.out::println);
//range(int starter, int counter),创建包含从starter开始的counter个Integer对象的序列
//例如:range(1,10)创建序列,包含从1起始的10个数量的Integer对象
//Integer 类在对象中包装了一个基本类型 int 的值。该类提供了多个方法:能在 int 类型和 String 类型之间互相转换,还提供了处理 int 类型时非常有用的其他一些常量和方法
(10) interval(Duration period )、interval(Duration delay, Duration period)
创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.just(2, 4).flatMap(x -> Flux.interval(Duration.ofMillis(1000)).take(x)).toStream().forEach(System.out::println);
//流中的元素被转换成每隔 1000毫秒产生的数量不同的流,再进行合并
https://www.jianshu.com/p/8d80dcb4e7e0—>如何使用flatMap()
(11) intervalMillis(long period)、intervalMillis(long delay, long period)
与interval()方法相同,但该方法通过毫秒数来指定时间间隔和延迟时间
Flux.intervalMillis(1000).subscirbe(System.out::println);
(12) empty()、never()、error()
// 创建一个不包含任何元素,只发布结束消息的序列。// 并且这种方式不会进行后续传递,需要switchIfEmpty()方法来进行处理。// 因为响应式编程中,流的处理是基于元素的,而empty()是没有元素的!Flux<Object> empty = Flux.empty();// 创建一个只包含错误消息的序列,里面的参数类型是ThrowableFlux<Object> error = Flux.error(new Exception("error!"));// 创建一个不包含任何消息通知的序列,注意区别empty(),empty还是会发布结束消息的。Flux<Object> never = Flux.never();
4.1.2 订阅Flux流: subscribe()
//subscribe():订阅Flux序列,只有进行订阅后才回触发数据流,不订阅就什么都不会发生Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);integerFlux.subscribe(System.out::println);// lambda表达式Flux<String> stringFlux = Flux.just("hello", "world");stringFlux.subscribe(System.out::println);
(1) 完整参数subcribe()方法
/*** consumer:即如何处理数据元素,比如可以打印输出print。* errorConsumer:若发生错误如何处理,即错误信号(同时终止序列)。* completeConsumer:即完成信号(同时终止序列)。* subscriptionConsumer:订阅发生时候的处理逻辑*/
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer)
{return (Disposable)this.subscribeWith(new LambdaSubscriber(consumer, errorConsumer, completeConsumer, subscriptionConsumer));
}
例如:
public class MySubscriber extends BaseSubscriber {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅开始!");// 订阅时首先向上游请求1个元素request(1);}@Overrideprotected void hookOnNext(Object value) {System.out.println("获取当前元素成功!" + value);// 每次处理完后再去请求1个元素request(1);}
}public void testOut() {// 1.创建一个序列Flux<Integer> flux = Flux.range(1, 6).map(i -> {if (i <= 4) {return i * 2;}throw new RuntimeException("数据超过了5!");});// 2.订阅MySubscriber ss = new MySubscriber();flux.subscribe(System.out::println,error -> System.out.println("Error:" + error),() -> System.out.println("Complete!"),s -> ss.request(2));flux.subscribe(ss);
}
(2) Note:
- subscribe()方法的第四个参数只能接收一个自定义的Subscriber,需要重写BaseSubscriber。
- BaseSubscriber是一个抽象类,定义了多种用于处理不同信号的hook方法。
- 至少要重写hookOnSubscribe()方法和hookOnNext()方法。
- 若存在第四个参数,那么在对应的自定义订阅类中,对应重写的方法代替对应位置的lambda表达式。 比如我重写了onNext()方法,替换了原有lambda表达式中单纯的输出结果(加了几个中文字)。那么再简单点,说白了,BaseSubscriber是Lambda的替代品。
4.2 Mono(1\0个数据的抽象)
Mono作为Publisher,产生0-1个数据对象 ==> 数据对象交给operator()处理器进行处理 ==> 生成另外的数据对象
Mono
是一个发出(emit)0-1
个元素的Publisher<T>
,可以被onComplete
信号或者onError
信号所终止。
Flux 和 Mono 之间可以进行转换,例如:
- 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象
- 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

- 最多发出一项元素,并且以成功信号或者失败信号为终止信号。
//创建MONO
Mono.just("test hjf").subscribe(System.out::println);
Mono.empty().subscribe(System.out::println);
Mono.error(new Throwable()).subscribe(System.out::println);
Mono.never().subscribe(System.out::println);//fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);//justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);//delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
Mono.delay(Duration.ofSeconds(10)).subscribe(System.out::println);//ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
Mono.ignoreElements(null).subscribe(System.out::println);//还可以通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
五、Lambda表达式
六、背压
两种支持异步的生成Flux的方法create()和push(),支持背压操作,两个方法的第二个参数用于声明背压策略。
// 一个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数不写时默认为Buffer(背压策略之一)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {return create(emitter, OverflowStrategy.BUFFER);
}//两个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数为背压策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}
6.1 背压策略
public static enum OverflowStrategy {IGNORE,//忽略下游的背压请求,上游push消息过快时会直接报错:IllegalStateExceptionERROR,//上游push消息过快时会直接报错:IllegalStateExceptionDROP,//下游直接丢弃处理不了的eventLATEST,//让下游只从上游获取最新eventBUFFER;//(默认值)以在下游无法跟上时缓冲所有信号。(这会实现无限缓冲,并可能导致OutOfMemoryError)private OverflowStrategy() {}
}
6.2 request()
request()方法用于限制下游向上游的请求个数,避免下游压力过大
@Test
public void () {Flux<Integer> flux = Flux.range(1, 10).log();flux.subscribe(new BaseSubscriber<Integer>() {private int count = 0;private final int requestCount = 4;@Overrideprotected void hookOnSubscribe(Subscription subscription) {//订阅时,每次只让上游发送requestCount个请求request(requestCount);}@SneakyThrows@Overrideprotected void hookOnNext(Integer value) {count++;// 通过count控制每次request 4个元素//如果队列里已经有4个元素没处理,那么就睡眠1sif (count == requestCount) {Thread.sleep(1000);request(requestCount);count = 0;}}});
}
结果(每个输出停顿1秒):
如果将变量requestCount
改为2:结果会是2个2个输出
七、打包操作
用于调用外部公共方法。
transform()
//定义一个公共方法
//筛选出不以str开头的元素,并变成大写
Function <Flux<String>, Flux<String>> filterAndMap =f->f.filter(color ->!color.startsWith("str")).map(String::toUpperCase);//定义Flux序列
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple", "yellow", "str1", "str2"))//调用外部函数.transform(filterAndMap).subscribe(System.out::println);

compose()
//定义一个公共方法
AtomicInteger i = new AtomicInteger();
Function <Flux<String>,Flux<String>> function = f ->{//第1个线程就抛去“str1”,并转换成大写字母if(i.incrementAndGet()==1){return f.filter(color -> !color.equals("str1")).map(String::toUpperCase);}//不是第1个线程就抛去“str2”,并转换成大写字母return f.filter(color->!color.equals("str2")).map(String::toUpperCase);
}//定义Flux序列
Flux<String> flux =Flux.fromIterable(Arrays.asList("str1", "str2", "str3", "str4"))// 调用外部定义好的function.compose(function);
//第1个线程
flux.subscribe(d -> System.out.println("订阅者1:获得数据" + d));
System.out.println("==============================================");
//第2个线程
flux.subscribe(d -> System.out.println("订阅者2:获得数据" + d));

如果将compose()改为transform(),结果如下:
结论如下:
compose()
中打包的函数可以是有状态的,本案例中对应我们的AtomicInteger
对象。transform()
打包的函数是无状态的。
AtomicInteger
在Java语言中,++i和i++操作并不是线程安全的。而AtomicInteger是线程安全的,即使不使用synchronized关键字也能保证其是线程安全的,主要用于多线程间共享计数。
比如多个线程从列表中依次读取数据,使用AtomicInteger来计数,每次取第AtomicInteger个。而且由于AtomicInteger由硬件提供原子操作指令实现,在非激烈竞争的情况下,开销更小,速度更快。
AtomicInteger atomicInteger = new AtomicInteger();
//将原先的值进行了一个加1,但是返回的加1之前的值
atomicInteger.getAndIncrement()
//将原先的值进行了一个加1,返回的加1之后的值
atomicInteger.incrementAndGet()
如果多线程想要借助AtomicInteger共享计数,需要在主线程(main)中创建并初始化AtomicInteger对象,再通过构造函数的引用传递才能实现多线程共享计数
public class TestThread extends Thread implements Runnable {public static void main(String[] args) {//使用此种方式,运行结果顺序递增,但是两个线程分别计数//两个线程分别新建了一个AtomicInterge实例对象,所以不会共享计数TestThread thread1 = new TestThread();TestThread thread2 = new TestThread();thread1.start();thread2.start();//使用此种方式,运行结果不一定顺序递增,但是统一计数AtomicInteger testAtomicInteger = new AtomicInteger(0);//两个线程使用同一个AtomicInterge实例对象,所以会共享计数TestThread thread1 = new TestThread(testAtomicInteger);TestThread thread2 = new TestThread(testAtomicInteger);thread1.start();thread2.start();}public TestThread() {// 构造函数,每次使用构造函数都会初始化一个AtomicInteger=0counterAtomicInteger = new AtomicInteger(0);}public TestThread(AtomicInteger counterAtomicInteger) {// 构造函数,每次使用构造函数都会初始化一个作为参数传入的AtomicIntegerthis.counterAtomicInteger = counterAtomicInteger;}
}
八、Schedule定时器
很多情况下,我们的publisher是需要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。
Schedule是一个接口,定义了一些定时器中必须要实现的方法:
立即执行的:
Disposable schedule(Runnable task);
延时执行的:
default Disposable schedule(Runnable task, long delay, TimeUnit unit)
定期执行的:
default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)
8.1 Schedulers调度器
Schedule有一个工具类叫做Schedulers,它提供了多个创建Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创建Schedule。
Scheduler提供的静态方法用于创建如下几种线程环境:
(1) 当前线程:Schedulers.immediate()
提交的Runnable将会立马在当前线程执行。
(2) 可重用的单线程:Schedulers.single()
使用同一个线程来执行所有的任务。当各个调用者调用这个方法的时候,都会重用同一个线程,直到这个Scheduler的状态被设定为disposed。
(3) 弹性线程池:Schedulers.elastic()
该方法是一种将阻塞处理放在一个单独的线程中执行的很好的方式,即可以拿来包装一个堵塞方法,将其变为异步。
1.当首次调用这个方法的时候,会创建一个新的线程池,而且这个线程池中闲置的线程可以被重用。
2.如果一个线程的闲置时间太长(默认60s),则会被销毁。
(4) 固定大小线程池:Schedulers.parallel()
创建固定个数的工作线程,个数和CPU的核数相关。
(5) Schedulers.fromExecutorService(ExecutorService)
从一个现有的线程池创建Scheduler。
(6) Schedulers.newXXX
Schedulers提供了很多new开头的方法,来创建各种各样的Scheduler。
//可以指定特定的Scheduler来产生元素
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))
8.2 注意事项
- 某些Reactor的操作已经是默认使用了特定类型的调度器。例如Flux.interval()创建的源,使用了Schedulers.parallel()等。
- 可以通过publishOn和subscribeOn来进行切换Scheduler的执行上下文。publishOn()切换的是元素消费操作执行时所在的线程。subscribeOn()切换的是源中元素生产操作执行时所在的线程。
九、publishOn() / subscribeOn()
Flux和Mono不会创建线程,只有当触发subscribe()操作时才会执行对应的方法。
而有些操作符,例如**publishOn()和subscribeOn()**方法,能够创建线程。publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。
9.1 publishOn():
publishOn() 会强制下一个Operator(或许是下一个的下一个…)运行于不同的线程上。也就相当于publishOn()后面的逻辑都执行在另外一个线程上了。
例1:
Flux.range(1, 2).map(i -> {System.out.println(Thread.currentThread().getName());return i;}).publishOn(Schedulers.single()).map(i -> {System.out.println(Thread.currentThread().getName());return i;}).publishOn(Schedulers.newParallel("parallel", 4)).map(i -> {System.out.println(Thread.currentThread().getName());return i;}).subscribe();

流程是:
例2:
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i + ":"+ Thread.currentThread()).publishOn(s).map(i -> "value " + i+":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
System.out.println(Thread.currentThread());
Thread.sleep(5000);
-
上面创建了一个名字为parallel-scheduler的scheduler。
-
然后创建了一个Flux,Flux先做了一个map操作,然后切换执行上下文到parallel-scheduler线程,最后右执行了一次map操作。
-
最后,我们采用一个新的线程来进行subscribe的输出。
输出结果:
Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
可以看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。
那么在publishOn之前,map使用的线程就是ThreadA。 而在publishOn之后,map使用的线程就切换到了parallel-scheduler线程池。
9.2 subscribeOn
subscribeOn是用来切换Subscriber的执行上下文,不管subscribeOn出现在调用链的哪个部分,最终都会应用到整个调用链上。
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i + ":" + Thread.currentThread()).subscribeOn(s).map(i -> "value " + i + ":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
Thread.sleep(5000);
例子同样使用了两个map,然后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。输出结果:
value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
可以看到,不管哪个map,都是用的是切换过的parallel-scheduler。
9.3 publishOn和subscribeOn的区别
- publishOn一般使用在订阅链的中间位置,并且从下游获取信号,影响调用位置起后续运算的执行位置。
- subscribeOn一般用于构造向后传播的订阅过程。并且无论放到什么位置,它始终会影响源发射的上下文。同时不会影响对publishOn的后续调用的行为。
- publishOn会强制让下一个运算符(或者下下个)运行于不同的线程上,subscribeOn会强制让上一个(或者上上个)运算符在不同的线程上执行。
十、Processor
https://blog.csdn.net/get_set/article/details/79799895
https://blog.csdn.net/Zong_0915/article/details/115048153
Processor既是一个Publisher也是一个Subscriber,是一个接口。 所以能够订阅一个Processor
,也可以调用它们提供的方法来手动插入数据到序列,或终止序列。
其中的两个实现类:
-
MonoProcessor
-
FluxProcessor
而FluxProcessor又可以衍生出多种Processor,负责应对不同的场景使用:
- UnicastProcessor
- DirectProcessor
- EmitterProcessor
- ReplayProcessor
- TopicProcessor
- WorkQueueProcessor
十一、Context
https://blog.csdn.net/Zong_0915/article/details/115397437
https://blog.csdn.net/LCBUSHIHAHA/article/details/114837031#:~:text=Reactor%E4%B8%AD,ue%E7%9A%84%E5%BD%A2%E5%BC%8F%E5%AD%98%E5%82%A8%E3%80%82
11.1 Context用法:
- Mono.deferContextual(c-> c.get(key)):获得Context中指定Key对应的Value值。
- contextWrite(c -> c.put(key, value):往Context中塞入一个键值对。
- 简单用法的格式:Mono.deferContextual(c ->Mono.just(c.get(key)))
- contextWrite只对它上游的操作生效,对下游的操作不生效。
- 如果下游有多个contestWriter,上游会从邻近的contextWrite中读取上下文的信息。
11.2 Context用例
例子1
String key = "message";
Mono<String> r = Mono.just("Hello").flatMap(s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key)))).contextWrite(ctx -> ctx.put(key, "World"));
- 使用
contextWrite()
往Context
中写入一个键值对,key:message
,value:World
。 - 在
faltMap()
中,通过Mono.deferContextual()
获得一个Context
对象,可以取出对应key的value值,并进行拼接。 - 结果是Hello World
结果说明:订阅是从下游流向上游的,contextWrite只对它上游的操作生效,对下游的操作不生效,在上边例子中也就是先调用了contextWrite(),然后才调用了Mono.flatMap()。
例子2:
String key = "message";
Mono<String> r = Mono.just("Hello").flatMap(s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key)))).contextWrite(ctx -> ctx.put(key, "Reactor")).contextWrite(ctx -> ctx.put(key, "World"));

结果是Hello Reactor
例子3 ??:
String key = "message";
Mono<String> r = Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))//3.contextWrite(ctx -> ctx.put(key, "Reactor"))//2.flatMap( s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key))))//4.contextWrite(ctx -> ctx.put(key, "World"));//1
结果为Hello Reactor World
第一个需要获取上下文的deferContextual从邻近下游读取到Reactor,第二个flatMap邻近下游读取到的World。
例子4
String key = "message";
Mono<String> r = Mono.just("Hello").flatMap( s -> Mono.deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))).flatMap( s -> Mono.deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key))).contextWrite(ctx -> ctx.put(key, "Reactor")) ).contextWrite(ctx -> ctx.put(key, "World"));
结果为Hello World Reactor
因为第一个contextWrite只影响内部的操作流。第二个contextWrite只会影响外部主操作流。
11.3 Spring WebFlux处理Context
思路:根据Reactor Context只对上游生效的特性,WebFlux只用在后置处理器中做contextWrite操作就可以了。遗憾的是在Spring WebFlux中没有HandlerInterceptorAdapter,但是可以使用过滤器WebFilter。
@Component
@Slf4j
public class AuthFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {return chain.filter(exchange).contextWrite(ctx -> {log.info("设置context内容。");return ctx.put("token", "xx");//注意这里一定要return ctx.put//put操作是产生一个新的context。如果是return ctx;会导致值没有设置进去});}}
按照上面的处理方式,在我们业务开发中就能取到上下文的信息了。
@PostMapping(name = "测试", value = "/save")public Mono<Test> save(@RequestBody Mono<Test> testMono) {return testService.add(testMono).log("", Level.INFO, true).flatMap(test -> {return Mono.contextWrite().map(ctx -> {log.info("context:" + ctx.get("token"));return test;});});}
如果想设置多个值可以像下面这样操作。
@Component
public class WebFluxFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {return chain.filter(exchange).contextWrite(ctx -> {return ctx.put("token", "123123123123");}).contextWrite(ctx -> {return ctx.put("liu", "hah");});}
}
十二、Reactor doOn系列函数
在Publisher使用subscribe()方法的时候,Subscriber触发回触发一系列的on方法,如onSubscribe();为了更好的监控以及观测异步序列的传递情况,设置了一系列的doOn方法,在触发onxxx方法的时候作为其副作\附加行为,用于监控行为的运行情况。
12.1 官网doOn方法解释
-
Without modifying the final sequence, I want to:
-
get notified of / execute additional behavior (sometimes referred to as “side-effects”) on:
(也就是在不改变Flux或Mono的情况下,以下情况发生时想被通知/想执行额外的行为)
- emissions:
doOnNext
(Flux|Mono) - completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)
- error termination:
doOnError
(Flux|Mono) - cancellation:
doOnCancel
(Flux|Mono) - “start” of the sequence:
doFirst
(Flux|Mono)- this is tied to Publisher#subscribe(Subscriber)
- post-subscription :
doOnSubscribe
(Flux|Mono)Subscription
acknowledgment aftersubscribe
- this is tied to Subscriber#onSubscribe(Subscription)
- request:
doOnRequest
(Flux|Mono) - completion or error:
doOnTerminate
(Flux|Mono)- but after it has been propagated downstream:
doAfterTerminate
(Flux|Mono)
- but after it has been propagated downstream:
- any type of signal, represented as a Signal:
doOnEach
(Flux|Mono) - any terminating condition (complete, error, cancel):
doFinally
(Flux|Mono)
- emissions:
-
log what happens internally:
log
(Flux|Mono)
-
-
I want to know of all events:
- each represented as Signal object:
- in a callback outside the sequence:
doOnEach
(Flux|Mono) - instead of the original onNext emissions:
materialize
(Flux|Mono)- …and get back to the onNexts:
dematerialize
(Flux|Mono)
- …and get back to the onNexts:
- in a callback outside the sequence:
- as a line in a log:
log
(Flux|Mono)
- each represented as Signal object:
12.1 常见doOn方法
- doOnEach(): 对每一个元素对应的single对象进行监控

-
doOnSubscribe(): 用以监控onSubscribe()方法的执行
/** Add behavior (side-effect) triggered when the Flux is subscribed. param onSubscribe the callback to call on onSubscribe */ public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {Objects.requireNonNull(onSubscribe, "onSubscribe");return doOnSignal(this, onSubscribe, null, null, null, null, null, null);}
-
doOnRequest:对request行为监控产生副作用
-
doOnNext:onNext副作用

-
doOnError:出现error时的副作用,用于监控报错,可以通过错误类型进行筛选
-
doOnComplete:完成时触犯
-
doOnCancel:取消时触发
-
doOnTerminate:终止时触发,无论是成功还是出现异常
例子:
@Test
public void doOnWithMono () {
Mono.just("ffzs").map(String::toUpperCase).doOnSubscribe(subscription -> log.info("test do on subscribe")).doOnRequest(longNumber -> log.info("test do on request")).doOnNext(next -> log.info("test do on next1, value is {}", next)).map(String::toLowerCase).doOnNext(next -> log.info("test do on next2, value is {}", next)).doOnSuccess(success -> log.info("test do on success: {}", success)).subscribe();}@Test
public void doOnWithFlux () {Flux.range(1,10).map(i -> {if (i == 3) throw new RuntimeException("fake a mistake");else return String.valueOf(i);}).doOnError(error -> log.error("test do on error, error msg is: {}", error.getMessage())).doOnEach(info -> log.info("do on Each: {}", info.get())).doOnComplete(() -> log.info("test do on complete")) // 因为error没有完成时不触发.doOnTerminate(() -> log.info("test do on terminate")) // 无论完成与否,只要终止就触发.subscribe();
}
12.2 log()
reactor提供了一个很便利的监控方法:log()。在编写publisher的时候加上log(),在subscriber调用的时候会将触发的每一个behavior以日志的形式打印出来:
@Test
public void logTest () {Flux.range(1,5).map(i -> {if (i == 3) throw new RuntimeException("fake a mistake");else return String.valueOf(i);}).onErrorContinue((e, val) -> log.error("error type is: {}, msg is : {}", e.getClass(), e.getMessage())).log().subscribe();
}
效果如下,日志内容很详细,线程使用,onNext,request这些都会标明
