Reactor

article/2025/10/3 21:25:27

Reactor

初识Reactor

! ! Reactor讲解

Reactor:响应式编程,区别于一般:Client去询问Server是否有变化并拉取数据的模式,响应式编程是后台有数据变化时主动push给订阅者,也就是Publisher-Subscriber模型。

一、特点

  • 可组合性,即可以协调多个异步任务。

  • 提供丰富的操作符。

  • 订阅之前不会产生任何的动作。(即只有订阅的时候才执行操作)

  • 冷热响应

    • 冷响应:对于每个订阅者,包括在数据源处,数据推送都会从头开始。例如,如果源包装了HTTP调用,则会为每个订阅发出一个新的HTTP请求。
    • 热响应:并不是每个订阅者都可以从头开始接收数据。后面的订阅者会在订阅后接收已推送的信号。
  • 异步非堵塞

  • 流量控制(背压控制)

    Publisher的数据推送的过于频繁,Subscriber处理数据的速度跟不上Publisher推送的速度。这时候就用到背压控制:Subscriber向Publisher发送信号来控制Publisher push数据的速度

    可以用reactive库中的WebClient来进行实现。

二、Operators

1、filter: 过滤

对Flux流中包含的元素进行过滤,只留下满足指定条件的元素。

//只留下偶数元素
Flux.range(1,10).filter(i->i%2==0).subscribe(System.out::println);
在这里插入图片描述
2、map: 数据映射

把Flux的数据通过某些逻辑转换成另外的形式。

//结果为A B C
Flux.just("a","b","c").map(String::toUpperCase).subscribe(System.out::println);
3、index: 编号

给Flux中的每个元素加上编号。

4、window: 分组

把当前流中的元素收集到另外的 Flux 序列中,也就是分组,分组结果为多个Flux序列。

//把一个包含1-100的flux序列分组,20个为一组,每组为一个新的Flux序列
//这里window为一个包含5个Flux序列的Flux序列数组,其中每个Flux有20个元素
Flux<Flux<Integer>> window = Flux.range(1,100).window(20);
window.subscribe(System.out::println);
img
为什么输出的结果是UnicastProcessor?

其实,在调用window()方法的时候,其调用过程大致如下:

  1. 序列得到订阅,则开始首次发送元素,这时候会创建一个新的源序列UnicastProcessor。
  2. 当UnicastProcessor接受的元素数量到达20个时,执行对应的onComplete()方法。
  3. 那么当第21个元素发送的时候,又会创建一个新的源序列UnicastProcessor,并向下游传递,以此类推。
注意: window和buffer的区别

(首先有个前提,我们数据都是从上游往下游进行传输的)

  • window是先创建一个UnicastProcessor序列,然后直接向下游传递。
  • buffer是先收集满20个元素,再向下游进行传递。
5、buffer:分组

对一个Flux的元素进行分组,分组结果还是1个Flux序列。

5.1 buffer:有一个参数,指定了每组所包含的元素的最大数量
//结果:输出5组数据,每一组数据包含20个元素
Flux.range(1,100).buffer(20).subscribe(System.out::println)

5.2 bufferUntil:参数表示每个集合中的元素要满足的条件。bufferUntil会一直收集,直到条件==true
//结果:输出5组数据,每一组包含2个元素
//只有当前元素为偶数的时候,才会停止当前元素的收集(当前组包含当前元素),接下来的元素另起组
Flux.range(1,10).bufferUntil(i->i%2==0).subscribe(System.out::println);

5.3 bufferTimeout:2个参数,分别制定了每组所包含的元素的最大数量、收集的时间间隔
5.4 bufferWhile:参数表示每个集合中的元素要满足的条件。bufferWhile则只有当条件==true时才会收集。一旦为false,会立即开始下一次收集。
//结果:输出5组数据,每组包含1个元素
//只有当前元素为偶数时,才进行元素收集
Flux.range(1,10).bufferWhile(i->i%2==0).subscribe(System.out::println);

6、take: 提取数据


6.1 take
//取前三个元素
//结果为1 2 3
Flux.range(1,100).take(3).subscribe(System.out::println);
6.2 takeLast
//取最后三个元素
//结果为98 99 100
Flux.range(1,100).takeLast(3).subscribe(System.out::println);
6.3 takeWhile
//当条件为真时才进行提取
//结果为1 2
Flux.range(1,100).takeWhile(i->i<3).subscribe(System.out::println);
6.4 takeUntil
//当条件为真时停止提取
//结果为1 2
Flux.range(1,100).takeWhile(i->i==3).subscribe(System.out::println);
7、zipWith:元素合并

将两个Flux流中的元素按照一对一的方式进行合并

注意:
  1. 在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流。
  2. 若某个序列中的元素数量偏多或者偏少,那么多余的结果并不会输出
// 结果:[a,c] [b,d]
Flux.just("a", "b").zipWith(Flux.just("c", "d","e","f")).subscribe(System.out::println);// 结果:a-c  b-d
//第二个参数为逻辑处理流程
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);

在这里插入图片描述

8、reduce:累加、累积

对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列

注意:
  • 在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
  • 累积操作并不单单代表累加、累积的任意一种,而指的是对所有的元素做一个统一的操作,可能是累积、累加等一系列操作。
//1-100累加,结果为5050
Flux.range(1,100).reduce(Integer::sum).subscribe(System.out::println);
//1-100累加再加上100,结果为5150
Flux.range(1, 100).reduceWith(() -> 100, Integer::sum).subscribe(System.out::println);
9、merge:流合并

把多个流合并成一个 Flux 序列

Flux.merge(Flux.interval(Duration.ZERO, Duration.ofMillis(100)).take(9),Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(2),Flux.interval(Duration.ofMillis(100), Duration.ofMillis(100)).take(3)
).subscribe((t) -> System.out.println("Flux.merge :" + t + ",线程:" + Thread.currentThread().getName()));// 让主线程睡眠2s,保证上面的输出能够完整Thread.sleep(2000);
在这里插入图片描述
注意:
  1. merge() 会按照所有流中元素的实际产生顺序来合并,代码中定义了每个流中元素产生的时间间隔。
  2. merge()方法的兄弟方法:mergeSequential(),按照所有流被订阅的顺序,以流为单位进行合并。
  3. 代码中对Flux.interval()方法生成的流,采用了take()操作来制定获取的元素,若不采用,那么该流是一个无限序列。
10、interval:延迟

Flux.interval(xxx1,xxx2):时间类操作符,按照指定的参数来创建流。

  • xxx1:第一次执行的延迟时间。
  • xxx2:每隔多少秒发送一次事件,发送的内容是Long类型整数,从0开始。
11、flatMap:转换并合并结果

把流中的每个元素转换成一个流,再把所有流中的元素进行合并

Flux<String> stringFlux1 = Flux.just("a", "b", "c", "d", "e", "f", "g", "h", "i");// [a,b],[c,d],[e,f],[g,h],[i]
Flux<Flux<String>> stringFlux2 = stringFlux1.window(2);stringFlux2.flatMap(flux1 -> flux1.map(String::toUpperCase)).subscribe(System.out::print);

在这里插入图片描述

12、concatMap:转换并合并结果

concatMap 操作同样是根据源中的每个元素来获取一个子源,再把所有子源进行合并。

Flux.just(5, 10)// 将源中的每个元素来获取一个子源,那么这里的x指的是5或者10// 然后以元素10为例:x=10,通过Flux.interval来生成序列,并取出10-3=7个元素.concatMap(x -> Flux//延迟x*10ms,之后每100ms开始生成从0开始的元素:0、1、2、3、4…….interval(Duration.ofMillis(x * 10), Duration.ofMillis(100))//将原本Flux流中的两个元素和interval生成的元素通过map合并//形式为x:i.map(i -> x + ": " + i)//取最终流中的前x-3个元素.take(x - 3)).toStream().forEach(System.out::println);

在这里插入图片描述

注意: concatMap和flatMap有什么不同?
  • 顺序:concatMap 操作会根据初始源中的元素顺序依次将获取到的子源进行合并。而flatMap是不会的。
  • 订阅:concatMap 操作对所获取到的子源的订阅是动态进行的,而flatMap则是在合并开始之前就订阅了由父源下发的所有子源。(?不懂)
13、groupBy:分组

通过一个策略 key 将一个 Flux分割为多个组

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9)//分组,偶数为一组,奇数为一组。key分别为even和odd.groupBy(i -> i % 2 == 0 ? "even" : "odd")//合并两个Flux流.concatMap(i -> i//defaultIfEmpty:如果没有任何数据完成此序列,则提供默认的唯一值.defaultIfEmpty(-1)//强制转换为String.map(String::valueOf)//不确定,应该是以i的key为每个i的前缀,这里i表示两个Flux流.startWith(i.key())).subscribe(System.out::println);
在这里插入图片描述
注意:
  • 几个组之间的元素不会存在交集,也就是一个元素只属于其中1个组。
  • 组永远不会为空,因为如果进行分组的时候发现没有对应的组,则进行创建操作。
14、defaultIfEmpty:分配默认值

如果没有任何数据完成此序列,则提供默认的唯一值

三、流式编程Example

消息驱动模式:subcribe()之前的代码描述应该发生什么,但是都尚未发生,直到有人subscribe(代码最后一行subcribe()),才会进行处理的流程(每个.之后都是一步流程) 。也就是订阅之前不会产生任何的动作

当produces= MediaType.APPLICATION_STREAM_JSON_VALUE时,标注了stream,那么就表述在Publisher和Subcriber之间建立了一个长链接

四、Reactor Publisher

https://skyao.io/learning-reactor/docs/concept/flux/flux.html
https://segmentfault.com/a/1190000024499748

Publisher<T>是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber<? super T>的需求推送元素。一个Publisher<T>可以支持多个订阅者,并可以根据订阅者的逻辑进行推送序列元素。

下面这个Excel计算就能说明一些Publisher<T>的特点。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rtNTqK7T-1664329234290)(https://segmentfault.com/img/bVbOXFl)]

A1-A9就可以看做Publisher<T>及其提供的元素序列。A10-A13可以看作订阅者Subscriber。假如说我们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。这也是响应式的一个重要特点:当没有订阅时发布者什么也不做

FluxMono都是Publisher<T>Reactor 3**实现。

4.1 Flux(一串数据的抽象)

Flux作为Publisher,产生0-N个数据对象 ==> 数据对象交给operator()处理器进行处理 ==> 生成另外的数据对象。

Flux 是一个发出(emit)0-N个元素组成的异步序列的Publisher<T>.

在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError

  • 正常的包含元素的消息:onNext()
  • 序列结束的消息:onComplete()
  • 序列出错的消息:onError()

  • Flux序列中可以有n个元素,每个元素都会经过operator操作进行数据的转换然后输出。
  • 如果由于某些原因发生了错误,那么会终止该序列。

4.1.1 创建Flux

(1) generate
generate()方法的结构:
generate(// 用于初始化值Callable<S> stateSupplier, // 生成器,也就是逻辑处理的函数BiFunction<S, SynchronousSink<T>, S> generator,// consumer,用于在序列终止的时候调用执行,比如关闭数据源    Consumer<? super S> stateConsumer
);
/**
简单来看,该方法的调用就是三步走:
初始化。
逻辑处理。
终止处理(fiinally)
**/   
简单例子:
Flux.generate(t->{ t.next(1);t.complete();
}).subscribe(System.out::println);
//创建一个Flux,有1个元素
//注意generate中next只能调用1次
复杂例子:
public void testGenerate() {Flux.generate(// 1.生成一个对象,用来作为状态,即初始化操作。AtomicInteger::new,// 2.逻辑处理(state, sink) -> {// 改变状态值int i = state.getAndIncrement();sink.next("2*" + i + "=" + 2 * i);if (i == 5) {// sink事件结束。sink.complete();}return state;},// 3.序列终止时,输出状态值。(state) -> System.out.println("state:" + state)).subscribe(System.out::println);
}

在这里插入图片描述

在这里插入图片描述
(2) create
create()方法的结构:
// 一个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数不写时默认为Buffer(背压策略之一)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {return create(emitter, OverflowStrategy.BUFFER);
}//两个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数为背压策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}
简单例子
Flux.create((t)->{t.next(1);t.next(2);t.complete();
}).subscribe(System.out::println);
//创建一个Flux,有多个元素,也就是可以多次发射
复杂例子
 private Flux<EventSource.Event> createFlux(FluxSink.OverflowStrategy strategy) {// 事件源注册了一个监听器,负责监听新事件的创建以及事件源的停止return Flux.create(sink -> eventSource.register(new MyListener() {@Overridepublic void newEvent(EventSource.Event event) {System.out.println("上游------>数据源创建了新事件:" + event.getMsg());sink.next(event);}@Overridepublic void eventSourceStopped() {sink.complete();}}), strategy); // 别忘了这里还有个背压策略的参数}
Create() 方法跟Generate() 比较来看,有这么几个不同:
  1. Create() 可以同步、也可以异步。而Generate() 是同步的。

    //自定义一个事件源类EventSource:
    public class EventSource {private List<MyListener> listeners;public EventSource() {this.listeners = new ArrayList<>();}public void register(MyListener listener){listeners.add(listener);}public void newEvent(Event event){for (MyListener listener : listeners) {listener.newEvent(event);}}public void eventStopped(){for (MyListener listener : listeners) {listener.eventSourceStopped();}}@Data@AllArgsConstructorpublic static class Event{private Date time;private String msg;}
    }//定义一个监听器接口MyListener
    public interface MyListener<T> {// 监听新事件void newEvent(EventSource.Event event);// 监听事件源的终止void eventSourceStopped();
    }//测试方法
    // 1.构建一个事件源
    EventSource eventSource = new EventSource();
    Flux.create(sink -> {// 2.向事件源中添加一个监听器,负责监听事件的产生和事件源的停止。// 相当于将事件转换成异步的事件流eventSource.register(new MyListener() {@Overridepublic void newEvent(EventSource.Event event) {// 3.监听器收到事件回调的时候,会通过sink将事件发出sink.next(event);}@Overridepublic void eventSourceStopped() {// 4.监听器收到事件源停止的回调后,由sink发出完成信号,停止sinksink.complete();}});
    }).subscribe(System.out::println);// 5.循环产生订阅
    for (int i = 0; i < 5; i++) {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));eventSource.newEvent(new EventSource.Event(new Date(), "事件" + i + "注册!"));
    }
    // 6.停止事件源
    eventSource.eventStopped();
    在这里插入图片描述
  2. Create() 可以每次发出多个元素,而Generate() 每次发出一个。

    //create()
    Flux.create(sink -> {for (int i = 0; i < 5; i++) {sink.next(i);}sink.complete();
    }).subscribe(System.out::println);
    //generate()
    Flux.generate(t->{ t.next(1);t.complete();
    }).subscribe(System.out::println);
    
  3. Create() 不需要状态值。

  4. Create() 可以在回调中触发多个事件。

  5. Create() 是多线程的。

(3) push

Create有一个变体模式:push()方法,和create()方法不同的是,他是一个单线程的,即只有一个线程可以调用next()complete()方法。

(4) just
Flux.just("1","2").subscribe(System.out::println);
//just("1","2")可以列举生成序列的所有元素(这里是1、2),
//just()创建出来的序列,在发布元素后(subscribe())会自动结束
(5) from(通过Publisher创建一个Flux)
//Flux->Flux
Flux.from(Flux.just("just", "just1", "just2")).subscribe(System.out::println);
//Mono->Mono
Flux.from(Mono.just("just")).subscribe(System.out::println);
(6) fromArray(创建发出数组的Flux)
// fromArray()从一个数组对象中创建一个Flux对象Integer[] arr = new Integer[]{1, 2, 3};Flux<Integer> f4 = Flux.fromArray(arr);
(7) fromIterable、fromStream
// fromIterable()从一个Iterable对象中创建一个Flux对象Flux<String> f2 = Flux.fromIterable(Arrays.asList("str1", "str2", "str3"));// fromStream()从一个Stream对象中创建一个Flux对象ArrayList<Integer> numList = new ArrayList<>();numList.add(1);Flux<Integer> f3 = Flux.fromStream(numList.stream());
(8) defer(推迟发送)
Flux.defer(()->Flux.range(1,10)).subscirbe(System.out::println);
(9) range
Flux.range(1,10).subscirbe(System.out::println);
//range(int starter, int counter),创建包含从starter开始的counter个Integer对象的序列
//例如:range(1,10)创建序列,包含从1起始的10个数量的Integer对象
//Integer 类在对象中包装了一个基本类型 int 的值。该类提供了多个方法:能在 int 类型和 String 类型之间互相转换,还提供了处理 int 类型时非常有用的其他一些常量和方法
(10) interval(Duration period )、interval(Duration delay, Duration period)

创建一个包含了从0开始递增的Long对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间

Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.just(2, 4).flatMap(x -> Flux.interval(Duration.ofMillis(1000)).take(x)).toStream().forEach(System.out::println);
//流中的元素被转换成每隔 1000毫秒产生的数量不同的流,再进行合并

https://www.jianshu.com/p/8d80dcb4e7e0—>如何使用flatMap()

(11) intervalMillis(long period)、intervalMillis(long delay, long period)

与interval()方法相同,但该方法通过毫秒数来指定时间间隔和延迟时间

Flux.intervalMillis(1000).subscirbe(System.out::println);
(12) empty()、never()、error()
    // 创建一个不包含任何元素,只发布结束消息的序列。// 并且这种方式不会进行后续传递,需要switchIfEmpty()方法来进行处理。// 因为响应式编程中,流的处理是基于元素的,而empty()是没有元素的!Flux<Object> empty = Flux.empty();// 创建一个只包含错误消息的序列,里面的参数类型是ThrowableFlux<Object> error = Flux.error(new Exception("error!"));// 创建一个不包含任何消息通知的序列,注意区别empty(),empty还是会发布结束消息的。Flux<Object> never = Flux.never();

4.1.2 订阅Flux流: subscribe()

//subscribe():订阅Flux序列,只有进行订阅后才回触发数据流,不订阅就什么都不会发生Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4);integerFlux.subscribe(System.out::println);// lambda表达式Flux<String> stringFlux = Flux.just("hello", "world");stringFlux.subscribe(System.out::println);  
(1) 完整参数subcribe()方法
/*** consumer:即如何处理数据元素,比如可以打印输出print。* errorConsumer:若发生错误如何处理,即错误信号(同时终止序列)。* completeConsumer:即完成信号(同时终止序列)。* subscriptionConsumer:订阅发生时候的处理逻辑*/
public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer) 
{return (Disposable)this.subscribeWith(new LambdaSubscriber(consumer, errorConsumer, completeConsumer, subscriptionConsumer));
}

例如:

public class MySubscriber extends BaseSubscriber {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("订阅开始!");// 订阅时首先向上游请求1个元素request(1);}@Overrideprotected void hookOnNext(Object value) {System.out.println("获取当前元素成功!" + value);// 每次处理完后再去请求1个元素request(1);}
}public void testOut() {// 1.创建一个序列Flux<Integer> flux = Flux.range(1, 6).map(i -> {if (i <= 4) {return i * 2;}throw new RuntimeException("数据超过了5!");});// 2.订阅MySubscriber ss = new MySubscriber();flux.subscribe(System.out::println,error -> System.out.println("Error:" + error),() -> System.out.println("Complete!"),s -> ss.request(2));flux.subscribe(ss);
}

在这里插入图片描述

(2) Note:
  1. subscribe()方法的第四个参数只能接收一个自定义的Subscriber,需要重写BaseSubscriber。
  2. BaseSubscriber是一个抽象类,定义了多种用于处理不同信号的hook方法。
  3. 至少要重写hookOnSubscribe()方法和hookOnNext()方法。
  4. 若存在第四个参数,那么在对应的自定义订阅类中,对应重写的方法代替对应位置的lambda表达式。 比如我重写了onNext()方法,替换了原有lambda表达式中单纯的输出结果(加了几个中文字)。那么再简单点,说白了,BaseSubscriber是Lambda的替代品。

4.2 Mono(1\0个数据的抽象)

Mono作为Publisher,产生0-1个数据对象 ==> 数据对象交给operator()处理器进行处理 ==> 生成另外的数据对象

Mono 是一个发出(emit)0-1个元素的Publisher<T>,可以被onComplete信号或者onError信号所终止。

Flux 和 Mono 之间可以进行转换,例如:

  1. 对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象
  2. 把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
img
  • 最多发出一项元素,并且以成功信号或者失败信号为终止信号。
//创建MONO
Mono.just("test hjf").subscribe(System.out::println);
Mono.empty().subscribe(System.out::println);
Mono.error(new Throwable()).subscribe(System.out::println);
Mono.never().subscribe(System.out::println);//fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);//justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);//delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
Mono.delay(Duration.ofSeconds(10)).subscribe(System.out::println);//ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
Mono.ignoreElements(null).subscribe(System.out::println);//还可以通过 create()方法来使用 MonoSink 来创建 Mono。
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

五、Lambda表达式

六、背压

两种支持异步的生成Flux的方法create()和push(),支持背压操作,两个方法的第二个参数用于声明背压策略。

// 一个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数不写时默认为Buffer(背压策略之一)
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {return create(emitter, OverflowStrategy.BUFFER);
}//两个参数时,第一个参数为逻辑处理函数/lambda表达式,第二个参数为背压策略
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure) {return onAssembly((Flux)(new FluxCreate(emitter, backpressure, CreateMode.PUSH_PULL)));
}

6.1 背压策略

public static enum OverflowStrategy {IGNORE,//忽略下游的背压请求,上游push消息过快时会直接报错:IllegalStateExceptionERROR,//上游push消息过快时会直接报错:IllegalStateExceptionDROP,//下游直接丢弃处理不了的eventLATEST,//让下游只从上游获取最新eventBUFFER;//(默认值)以在下游无法跟上时缓冲所有信号。(这会实现无限缓冲,并可能导致OutOfMemoryError)private OverflowStrategy() {}
}

6.2 request()

request()方法用于限制下游向上游的请求个数,避免下游压力过大

@Test
public void () {Flux<Integer> flux = Flux.range(1, 10).log();flux.subscribe(new BaseSubscriber<Integer>() {private int count = 0;private final int requestCount = 4;@Overrideprotected void hookOnSubscribe(Subscription subscription) {//订阅时,每次只让上游发送requestCount个请求request(requestCount);}@SneakyThrows@Overrideprotected void hookOnNext(Integer value) {count++;// 通过count控制每次request 4个元素//如果队列里已经有4个元素没处理,那么就睡眠1sif (count == requestCount) {Thread.sleep(1000);request(requestCount);count = 0;}}});
}

结果(每个输出停顿1秒):
在这里插入图片描述
如果将变量requestCount改为2:结果会是2个2个输出
在这里插入图片描述

七、打包操作

用于调用外部公共方法。

transform()

//定义一个公共方法
//筛选出不以str开头的元素,并变成大写
Function <Flux<String>, Flux<String>> filterAndMap =f->f.filter(color ->!color.startsWith("str")).map(String::toUpperCase);//定义Flux序列
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple", "yellow", "str1", "str2"))//调用外部函数.transform(filterAndMap).subscribe(System.out::println);
在这里插入图片描述

compose()

//定义一个公共方法
AtomicInteger i = new AtomicInteger();
Function <Flux<String>,Flux<String>> function = f ->{//第1个线程就抛去“str1”,并转换成大写字母if(i.incrementAndGet()==1){return f.filter(color -> !color.equals("str1")).map(String::toUpperCase);}//不是第1个线程就抛去“str2”,并转换成大写字母return f.filter(color->!color.equals("str2")).map(String::toUpperCase);
}//定义Flux序列
Flux<String> flux =Flux.fromIterable(Arrays.asList("str1", "str2", "str3", "str4"))// 调用外部定义好的function.compose(function);
//第1个线程
flux.subscribe(d -> System.out.println("订阅者1:获得数据" + d));
System.out.println("==============================================");
//第2个线程
flux.subscribe(d -> System.out.println("订阅者2:获得数据" + d));
在这里插入图片描述

如果将compose()改为transform(),结果如下:
在这里插入图片描述

结论如下:

  • compose()中打包的函数可以是有状态的,本案例中对应我们的AtomicInteger对象。
  • transform()打包的函数是无状态的。
AtomicInteger

在Java语言中,++i和i++操作并不是线程安全的而AtomicInteger是线程安全的,即使不使用synchronized关键字也能保证其是线程安全的,主要用于多线程间共享计数。

比如多个线程从列表中依次读取数据,使用AtomicInteger来计数,每次取第AtomicInteger个。而且由于AtomicInteger由硬件提供原子操作指令实现,在非激烈竞争的情况下,开销更小,速度更快。

 AtomicInteger atomicInteger = new AtomicInteger();
//将原先的值进行了一个加1,但是返回的加1之前的值
atomicInteger.getAndIncrement()
//将原先的值进行了一个加1,返回的加1之后的值
atomicInteger.incrementAndGet()

如果多线程想要借助AtomicInteger共享计数,需要在主线程(main)中创建并初始化AtomicInteger对象,再通过构造函数的引用传递才能实现多线程共享计数

public class TestThread extends Thread implements Runnable {public static void main(String[] args) {//使用此种方式,运行结果顺序递增,但是两个线程分别计数//两个线程分别新建了一个AtomicInterge实例对象,所以不会共享计数TestThread thread1 = new TestThread();TestThread thread2 = new TestThread();thread1.start();thread2.start();//使用此种方式,运行结果不一定顺序递增,但是统一计数AtomicInteger testAtomicInteger = new AtomicInteger(0);//两个线程使用同一个AtomicInterge实例对象,所以会共享计数TestThread thread1 = new TestThread(testAtomicInteger);TestThread thread2 = new TestThread(testAtomicInteger);thread1.start();thread2.start();}public TestThread() {// 构造函数,每次使用构造函数都会初始化一个AtomicInteger=0counterAtomicInteger = new AtomicInteger(0);}public TestThread(AtomicInteger counterAtomicInteger) {// 构造函数,每次使用构造函数都会初始化一个作为参数传入的AtomicIntegerthis.counterAtomicInteger = counterAtomicInteger;}
}

八、Schedule定时器

很多情况下,我们的publisher是需要定时去调用一些方法,来产生元素的。Reactor提供了一个新的Schedule类来负责定时任务的生成和管理。

Schedule是一个接口,定义了一些定时器中必须要实现的方法:

立即执行的:

Disposable schedule(Runnable task);

延时执行的:

default Disposable schedule(Runnable task, long delay, TimeUnit unit)

定期执行的:

default Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit)

8.1 Schedulers调度器

Schedule有一个工具类叫做Schedulers,它提供了多个创建Scheduler的方法,它的本质就是对ExecutorService和ScheduledExecutorService进行封装,将其做为Supplier来创建Schedule。

Scheduler提供的静态方法用于创建如下几种线程环境:

(1) 当前线程:Schedulers.immediate()

提交的Runnable将会立马在当前线程执行。

(2) 可重用的单线程:Schedulers.single()

使用同一个线程来执行所有的任务。当各个调用者调用这个方法的时候,都会重用同一个线程,直到这个Scheduler的状态被设定为disposed。

(3) 弹性线程池:Schedulers.elastic()

该方法是一种将阻塞处理放在一个单独的线程中执行的很好的方式,即可以拿来包装一个堵塞方法,将其变为异步。

1.当首次调用这个方法的时候,会创建一个新的线程池,而且这个线程池中闲置的线程可以被重用。
2.如果一个线程的闲置时间太长(默认60s),则会被销毁。

(4) 固定大小线程池:Schedulers.parallel()

创建固定个数的工作线程,个数和CPU的核数相关。

(5) Schedulers.fromExecutorService(ExecutorService)

从一个现有的线程池创建Scheduler。

(6) Schedulers.newXXX

Schedulers提供了很多new开头的方法,来创建各种各样的Scheduler。

//可以指定特定的Scheduler来产生元素
Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))

8.2 注意事项

  1. 某些Reactor的操作已经是默认使用了特定类型的调度器。例如Flux.interval()创建的源,使用了Schedulers.parallel()等。
  2. 可以通过publishOn和subscribeOn来进行切换Scheduler的执行上下文。publishOn()切换的是元素消费操作执行时所在的线程。subscribeOn()切换的是源中元素生产操作执行时所在的线程。

九、publishOn() / subscribeOn()

Flux和Mono不会创建线程,只有当触发subscribe()操作时才会执行对应的方法。

而有些操作符,例如**publishOn()subscribeOn()**方法,能够创建线程。publishOn和subscribeOn主要用来进行切换Scheduler的执行上下文。

9.1 publishOn():

publishOn() 会强制下一个Operator(或许是下一个的下一个…)运行于不同的线程上。也就相当于publishOn()后面的逻辑都执行在另外一个线程上了

例1:
Flux.range(1, 2).map(i -> {System.out.println(Thread.currentThread().getName());return i;}).publishOn(Schedulers.single()).map(i -> {System.out.println(Thread.currentThread().getName());return i;}).publishOn(Schedulers.newParallel("parallel", 4)).map(i -> {System.out.println(Thread.currentThread().getName());return i;}).subscribe();
在这里插入图片描述

流程是:

例2:
 Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i + ":"+ Thread.currentThread()).publishOn(s).map(i -> "value " + i+":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println),"ThreadA").start();
System.out.println(Thread.currentThread());
Thread.sleep(5000);
  1. 上面创建了一个名字为parallel-scheduler的scheduler。

  2. 然后创建了一个Flux,Flux先做了一个map操作,然后切换执行上下文到parallel-scheduler线程,最后右执行了一次map操作。

  3. 最后,我们采用一个新的线程来进行subscribe的输出。

输出结果:

Thread[main,5,main]
value 11:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[ThreadA,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到,主线程的名字是Thread。Subscriber线程的名字是ThreadA。

那么在publishOn之前,map使用的线程就是ThreadA。 而在publishOn之后,map使用的线程就切换到了parallel-scheduler线程池。

9.2 subscribeOn

subscribeOn是用来切换Subscriber的执行上下文,不管subscribeOn出现在调用链的哪个部分,最终都会应用到整个调用链上。

 Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux.range(1, 2).map(i -> 10 + i + ":" + Thread.currentThread()).subscribeOn(s).map(i -> "value " + i + ":"+ Thread.currentThread());
new Thread(() -> flux.subscribe(System.out::println), "ThreadA").start();
Thread.sleep(5000);

例子同样使用了两个map,然后在两个map中使用了一个subscribeOn用来切换subscribe执行上下文。输出结果:

value 11:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]
value 12:Thread[parallel-scheduler-1,5,main]:Thread[parallel-scheduler-1,5,main]

可以看到,不管哪个map,都是用的是切换过的parallel-scheduler。

9.3 publishOn和subscribeOn的区别

  1. publishOn一般使用在订阅链的中间位置,并且从下游获取信号,影响调用位置起后续运算的执行位置
  2. subscribeOn一般用于构造向后传播的订阅过程。并且无论放到什么位置,它始终会影响源发射的上下文。同时不会影响对publishOn的后续调用的行为。
  3. publishOn会强制让下一个运算符(或者下下个)运行于不同的线程上,subscribeOn会强制让上一个(或者上上个)运算符在不同的线程上执行。

十、Processor

https://blog.csdn.net/get_set/article/details/79799895

https://blog.csdn.net/Zong_0915/article/details/115048153

Processor既是一个Publisher也是一个Subscriber,是一个接口。 所以能够订阅一个Processor,也可以调用它们提供的方法来手动插入数据到序列,或终止序列。

其中的两个实现类:

  • MonoProcessor

  • FluxProcessor

而FluxProcessor又可以衍生出多种Processor,负责应对不同的场景使用:

  • UnicastProcessor
  • DirectProcessor
  • EmitterProcessor
  • ReplayProcessor
  • TopicProcessor
  • WorkQueueProcessor

十一、Context

https://blog.csdn.net/Zong_0915/article/details/115397437

https://blog.csdn.net/LCBUSHIHAHA/article/details/114837031#:~:text=Reactor%E4%B8%AD,ue%E7%9A%84%E5%BD%A2%E5%BC%8F%E5%AD%98%E5%82%A8%E3%80%82

11.1 Context用法:

  1. Mono.deferContextual(c-> c.get(key)):获得Context中指定Key对应的Value值。
  2. contextWrite(c -> c.put(key, value):往Context中塞入一个键值对。
  3. 简单用法的格式:Mono.deferContextual(c ->Mono.just(c.get(key)))
  4. contextWrite只对它上游的操作生效,对下游的操作不生效。
  5. 如果下游有多个contestWriter,上游会从邻近的contextWrite中读取上下文的信息。

11.2 Context用例

例子1
String key = "message";
Mono<String> r = Mono.just("Hello").flatMap(s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key)))).contextWrite(ctx -> ctx.put(key, "World"));
  1. 使用contextWrite()Context中写入一个键值对,key:messagevalue:World
  2. faltMap()中,通过Mono.deferContextual()获得一个Context对象,可以取出对应key的value值,并进行拼接。
  3. 结果是Hello World

结果说明:订阅是从下游流向上游的,contextWrite只对它上游的操作生效,对下游的操作不生效,在上边例子中也就是先调用了contextWrite(),然后才调用了Mono.flatMap()。

例子2:
 String key = "message";
Mono<String> r = Mono.just("Hello").flatMap(s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key)))).contextWrite(ctx -> ctx.put(key, "Reactor")).contextWrite(ctx -> ctx.put(key, "World"));
在这里插入图片描述

结果是Hello Reactor

例子3 ??:
String key = "message";
Mono<String> r = Mono.deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))//3.contextWrite(ctx -> ctx.put(key, "Reactor"))//2.flatMap( s -> Mono.deferContextual(ctx ->Mono.just(s + " " + ctx.get(key))))//4.contextWrite(ctx -> ctx.put(key, "World"));//1

结果为Hello Reactor World

第一个需要获取上下文的deferContextual从邻近下游读取到Reactor,第二个flatMap邻近下游读取到的World。

例子4
String key = "message";
Mono<String> r = Mono.just("Hello").flatMap( s -> Mono.deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))).flatMap( s -> Mono.deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key))).contextWrite(ctx -> ctx.put(key, "Reactor")) ).contextWrite(ctx -> ctx.put(key, "World")); 

结果为Hello World Reactor

因为第一个contextWrite只影响内部的操作流。第二个contextWrite只会影响外部主操作流。

11.3 Spring WebFlux处理Context

思路:根据Reactor Context只对上游生效的特性,WebFlux只用在后置处理器中做contextWrite操作就可以了。遗憾的是在Spring WebFlux中没有HandlerInterceptorAdapter,但是可以使用过滤器WebFilter。

@Component
@Slf4j
public class AuthFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {return chain.filter(exchange).contextWrite(ctx -> {log.info("设置context内容。");return  ctx.put("token", "xx");//注意这里一定要return ctx.put//put操作是产生一个新的context。如果是return ctx;会导致值没有设置进去});}}

按照上面的处理方式,在我们业务开发中就能取到上下文的信息了。

@PostMapping(name = "测试", value = "/save")public Mono<Test> save(@RequestBody Mono<Test> testMono) {return testService.add(testMono).log("", Level.INFO, true).flatMap(test -> {return Mono.contextWrite().map(ctx -> {log.info("context:" + ctx.get("token"));return test;});});}

如果想设置多个值可以像下面这样操作。

@Component
public class WebFluxFilter implements WebFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {return chain.filter(exchange).contextWrite(ctx -> {return ctx.put("token", "123123123123");}).contextWrite(ctx -> {return ctx.put("liu", "hah");});}
}

十二、Reactor doOn系列函数

在Publisher使用subscribe()方法的时候,Subscriber触发回触发一系列的on方法,如onSubscribe();为了更好的监控以及观测异步序列的传递情况,设置了一系列的doOn方法,在触发onxxx方法的时候作为其副作\附加行为,用于监控行为的运行情况。

12.1 官网doOn方法解释

  • Without modifying the final sequence, I want to:

    • get notified of / execute additional behavior (sometimes referred to as “side-effects”) on:

      (也就是在不改变Flux或Mono的情况下,以下情况发生时想被通知/想执行额外的行为)

      • emissions: doOnNext (Flux|Mono)
      • completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)
      • error termination: doOnError (Flux|Mono)
      • cancellation: doOnCancel (Flux|Mono)
      • “start” of the sequence: doFirst (Flux|Mono)
        • this is tied to Publisher#subscribe(Subscriber)
      • post-subscription : doOnSubscribe (Flux|Mono)
        • Subscription acknowledgment after subscribe
        • this is tied to Subscriber#onSubscribe(Subscription)
      • request: doOnRequest (Flux|Mono)
      • completion or error: doOnTerminate (Flux|Mono)
        • but after it has been propagated downstream: doAfterTerminate (Flux|Mono)
      • any type of signal, represented as a Signal: doOnEach (Flux|Mono)
      • any terminating condition (complete, error, cancel): doFinally (Flux|Mono)
    • log what happens internally: log (Flux|Mono)

  • I want to know of all events:

    • each represented as Signal object:
      • in a callback outside the sequence: doOnEach (Flux|Mono)
      • instead of the original onNext emissions: materialize (Flux|Mono)
        • …and get back to the onNexts: dematerialize (Flux|Mono)
    • as a line in a log: log (Flux|Mono)

12.1 常见doOn方法

  • doOnEach(): 对每一个元素对应的single对象进行监控
在这里插入图片描述
  • doOnSubscribe(): 用以监控onSubscribe()方法的执行

    /**
    Add behavior (side-effect) triggered when the Flux is subscribed.
    param onSubscribe the callback to call on onSubscribe
    */
    public final Flux<T> doOnSubscribe(Consumer<? super Subscription> onSubscribe) {Objects.requireNonNull(onSubscribe, "onSubscribe");return doOnSignal(this, onSubscribe, null, null, null, null, null, null);}
    
  • doOnRequest:对request行为监控产生副作用

  • doOnNext:onNext副作用

在这里插入图片描述
  • doOnError:出现error时的副作用,用于监控报错,可以通过错误类型进行筛选

  • doOnComplete:完成时触犯

  • doOnCancel:取消时触发

  • doOnTerminate:终止时触发,无论是成功还是出现异常

例子:

@Test
public void doOnWithMono () {
Mono.just("ffzs").map(String::toUpperCase).doOnSubscribe(subscription -> log.info("test do on subscribe")).doOnRequest(longNumber -> log.info("test do on request")).doOnNext(next -> log.info("test do on next1, value is {}", next)).map(String::toLowerCase).doOnNext(next -> log.info("test do on next2, value is {}", next)).doOnSuccess(success -> log.info("test do on success: {}", success)).subscribe();}@Test
public void doOnWithFlux () {Flux.range(1,10).map(i -> {if (i == 3) throw new RuntimeException("fake a mistake");else return String.valueOf(i);}).doOnError(error -> log.error("test do on error, error msg is: {}", error.getMessage())).doOnEach(info -> log.info("do on Each: {}", info.get())).doOnComplete(() -> log.info("test do on complete"))  // 因为error没有完成时不触发.doOnTerminate(() -> log.info("test do on terminate")) // 无论完成与否,只要终止就触发.subscribe();
}

12.2 log()

reactor提供了一个很便利的监控方法:log()。在编写publisher的时候加上log(),在subscriber调用的时候会将触发的每一个behavior以日志的形式打印出来:

@Test
public void logTest () {Flux.range(1,5).map(i -> {if (i == 3) throw new RuntimeException("fake a mistake");else return String.valueOf(i);}).onErrorContinue((e, val) -> log.error("error type is: {}, msg is : {}", e.getClass(), e.getMessage())).log().subscribe();
}

效果如下,日志内容很详细,线程使用,onNext,request这些都会标明

在这里插入图片描述

http://chatgpt.dhexx.cn/article/IGXxJfHC.shtml

相关文章

得到c++程序Process ID [getpid()], 调高CPU优先级 [renice]

1. 得到进程ID - pid #include <unistd.h> int pid (int)getpid();std::cout<<"pid: "<<getpid()<<"\n"; C 打印pid和tid_shuai_wow的博客-CSDN博客_c 打印pid获取pid1. 包含头文件#include <unistd.h>2. 在需要打印pid的…

RN(React Native)

RN是React的原生组件库 网址:React Native 中文网 使用React来编写原生应用的框架 一、ReactNative开发环境的搭建 在本机搭建RN项目开发运行环境 —— 3GB ①安装必需的编译软件&#xff1a;Node.js JDK ②安装Android原生App开发软件 —— Android Studio ③使用Android开…

优化Linux系统性能的必杀技:调整进程优先级!学会使用nice命令,让你的系统飞一般的顺畅!

文章目录 前言一. nice命令介绍1.1 nice的介绍1.2 cpu资源的调度1.3 nice是控制内核吗&#xff1f; 二. 语法格式及常用选项三. 参考案例3.1 将ls命令的优先级调整为最高3.2 将 find 命令的优先级调整为最低3.3 如何查看nice值 四. nice和renice的区别总结 前言 大家好&#x…

Linux nice及renice命令使用

Linux nice及renice命令使用 引言: linux内核是一个了不起的马戏表演者,它在进程和系统资源之间小心地玩着杂耍,并保持系统的正常运转。同时,内核也很公平,它将资源公平地分配给各个进程。但是,如果用户需要给一个重要进程提高优先级、或是降低优先级,该怎么办呢?答案是…

Linux进程监测和操控之top、ps、kill、nice、renice命令

文章目录 1、概述2、动态监测进程信息&#xff1a;top命令2.1、Linux系统的整体情况2.2、Linux进程的情况2.3、监控指定进程 3、静态查看进程信息&#xff1a;ps命令3.1、查看进程的资源占用3.2、查看父进程ID3.3、查看指定进程的内存占用 4、结束进程5、进程优先级5.1、启动程…

linux的nice 命令和renice命令

二者没有太大的区别&#xff0c;都是用于改变进程的优先级&#xff0c;所谓的优先级就是谁有更大的机会先执行&#xff0c;那么为什么要有nice这个命令呢&#xff1f;nice”是指“niceness”&#xff0c;即友善度&#xff0c;所以这个值越小的话&#xff0c;比如-10&#xff0c…

Linux 中的 renice 命令详解及C/C++代码实现

Linux 中的 nice 命令有助于执行具有修改的调度优先级的程序/进程。 它启动具有用户定义的调度优先级的进程。 如果我们给一个进程更高的优先级&#xff0c;那么内核会为这个进程分配更多的 CPU 时间。 而 renice 命令允许您更改和修改已运行进程的调度优先级。 什么是进程 …

Java之throw和throws的区别

throw和throws的区别 共同点区别总结 共同点 两者在抛出异常时&#xff0c;抛出异常的方法并不负责处理&#xff0c;顾名思义&#xff0c;只管抛出&#xff0c;由调用者负责处理。 区别 &#xff08;1&#xff09;throws用于方法头&#xff0c;表示的只是异常的申明&#xf…

Throw和Throws详解

Throw和Throws详解 ​ 1.throw是什么&#xff1f;有什么用&#xff1f; ​ throw是java中关于异常的一种操作&#xff0c;如果在try{}catch{}中使用了throw&#xff0c;就代表自己书写的这个方法&#xff0c;可以自己 处理异常了&#xff0c;就是抛出的一个动作&#xff0c;可…

Java异常:throw 和 throws的区别

throws&#xff1a;用来声明一个方法可能抛出和所有异常的信息&#xff0c;表现出现异常的可能性&#xff0c;但并不一定会发生异常。&#xff08;可以理解为为了预防异常的出现&#xff0c;而用throws提前声明一下&#xff09; throw&#xff1a;指抛出的一个具体的异常类型&…

Throws和Throw的区别?

在初学Java异常时&#xff0c;总是被throws和throw搞晕&#xff0c;今天来理一下它们之间的关系。 1、throws 方法名后面跟上 throws Exception 证明这个方法里的语句可能会发生异常&#xff0c;注意是可能&#xff01;在别处如果调用这个方法时&#xff0c;就必须也抛出异常…

再探java基础——throw与throws

再探java基础——throw与throws 以前虽然知道一些异常的处理&#xff0c;也用过一些&#xff0c;但是对throw和throws区别还是有不太清楚。今天用实例测试一下。 异常处理机制 异常处理是对可能出现的异常进行处理&#xff0c;以防止程序遇到异常时被卡死&#xff0c;处于一…

try catch 和throw以及throws的使用

转载&#xff1a;https://www.cnblogs.com/zhangdaicong/p/6127984.html try-catch和throw,throws的区别 java里的异常多种多样&#xff0c;这是一种非常有用的机制&#xff0c;它能帮助我们处理那些我们未知的错误&#xff0c;在java里&#xff0c;关于异常的有throw throws…

Java异常之throw,throws和Throwable

Java异常处理中有throw开头的三兄弟&#xff0c;分别是throw,throws以及Throwable&#xff0c;那么他们之间到底怎么区分呢&#xff0c;且听我慢慢道来。 Throwable Throwable是一个类&#xff0c;该类被异常类Exception以及错误类Error继承&#xff0c;其主要结构如下&#…

Java throws 使用

在开发中&#xff0c;如果去调用别人写的方法时&#xff0c;是否能知道别人写的方法是否会发生异常&#xff1f;这是很难判断的。针对这种情况&#xff0c;Java总允许在方法的后面使用throws关键字对外声明该方法有可能发生异常&#xff0c;这样调用者在调用方法时&#xff0c;…

【JavaSE】throws 异常处理

文章目录 1. 基本介绍2. 快速入门案例3. 注意事项和使用细节 1. 基本介绍 如果一个方法(中的语句执行时)可能生成某种异常&#xff0c;但是并不能确定如何处理这种异常&#xff0c;则此方法应显示地声明抛出异常&#xff0c;表明该方法将不对这些异常进行处理&#xff0c;而由…

throws异常

在程序开发过程中&#xff0c;程序员会尽量避免错误的发生&#xff0c;但是总会发生一些不可预测的事情&#xff0c;例如除法运算时除数为0 、内存不足、栈溢出等&#xff0c;这些就是异常&#xff0c;java语言提供了异常的处理机制&#xff0c;处理一些不可预期的事情。 使用t…

Elasticsearch之利用bulk提高写入性能(含源码分析)

什么是bulk操作 bulk是批量的意思&#xff0c;也就是把原来单个的操作打包好&#xff0c;通过批量的api提交到ES集群。下面是个示例&#xff1a; 单个操作&#xff1a; PUT my-index-000001/_doc/1 {"timestamp": "2099-11-15T13:12:00","message&…

.NET5必备工具——EF大数据批量处理----Bulk系列

之前做项目的时候&#xff0c;如果在大批量操作数据时,性能不好.因为在框架中使用了EntityFramework&#xff0c;于是就搜索到某著名插件,Z系列。 1.安装 然后在类里面添加引用之后,使用DbContext就可以直接点出来。 2.博客园有人做过对比试验 我们直接拿数据说话吧. EF传统…

初识ElasticSearch(5) -批量操作之bulk | 条件查询 | 其它查询

1. bulk&#xff1a;1.1. bulk语法&#xff1a;1.2. bulk行为-增删改&#xff1a; 2. bulk-index批量插入&#xff1a;3. bulk-update批量修改&#xff1a;4. bulk-delete批量删除&#xff1a;5. _update_by_query条件更新&#xff1a;6. _delete_by_query条件删除&#xff1a;…