Rxjava3 RxAndroid

article/2025/9/29 22:27:59

文章目录

  • Rxjava && Rxandroid
    • 引用方式
  • 概念
  • 流程图
  • 代码示例
  • Observable
    • Observable#subscribeOn(@NonNull Scheduler scheduler)
    • Observable#observeOn(@NonNull Scheduler scheduler)
    • ObservableSubscribeOn
    • ObservableObserveOn
    • 本文开头的代码示例等同于如下代码
  • Schedulers调度器
    • internal.schedulers.ScheduledRunnable
  • IoScheduler
    • IoScheduler.ThreadWorker
    • IoScheduler.CachedWorkerPool
    • IoScheduler.EventLoopWorker
  • ComputationScheduler
  • AndroidSchedulers 和 HandlerScheduler
  • Flowable & Backpressure背压
  • Operators 操作符
    • 创建操作符类型
    • 变换操作
    • 过滤操作
    • 布尔变量操作
    • 算术操作
    • To系列
  • 其他
    • Java库
  • 参考资料

Rxjava && Rxandroid

引用方式

implementation "io.reactivex.rxjava3:rxjava:3.1.1"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

概念

  • Observable是被观察者,也是生产者,也是source类型。Observer是观察者,也是消费者。
  • 被观察者、生产者、source、上游、upstream 是一类概念。在subscribeOnobserveOn经常会看到source成员变量。一般是指的ObservableSource<T>接口类型,ObservableSource实现了该接口。
  • 观察者、消费者、下游、downstream 是一类概念。

流程图

请添加图片描述

代码示例

  • 代码功能:被观察者(生产者、源端)使用 Schedulers.io()线程池执行,观察者(消费者)用Android主线程AndroidSchedulers.mainThread()执行。处理String类型。
  • 在步骤 0 Observable#subscribe(Observer<? super T>) 函数执行之后,会引发后续的步骤 1, 2, 3, 4, 5执行。
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("emitter string 1"); // 步骤 3emitter.onNext("emitter string 2");emitter.onNext("emitter string 3");emitter.onComplete(); // 步骤 4}
}).subscribeOn(Schedulers.io()) // 生产者使用的线程池,多次调用该函数时,仅第一次设置有效。.observeOn(AndroidSchedulers.mainThread()) // 消费者使用线程池,每次调用,都会切换后续Observer的响应线程。.subscribe(new Observer<String>() { // 步骤 1,subscribe 触发注册动作@Override// Disposable参数实际类型 ObservableObserveOn.ObserveOnObserverpublic void onSubscribe(@NonNull Disposable d) { // 步骤 2// 在步骤 3 subscribe之前执行Log.i(TAG, "onSubscribe " + d.getClass());// d.dispose(); // 这句话可以终止后续所有步骤。}@Overridepublic void onNext(@NonNull String s) {Log.i(TAG, "onNext " + s);}@Overridepublic void onError(@NonNull Throwable e) {Log.i(TAG, "onError " + e);e.printStackTrace();}@Overridepublic void onComplete() { // 步骤 5Log.i(TAG, "onComplete");}});

Observable

  • 关键入口类。
  • 关键入口方法: Observable#create(@NonNull ObservableOnSubscribe<T> source),生成一个Observable<T>对象。实际类型是ObservableCreate<T>类型。
  • 唯一实现的接口是:ObservableSource

Observable#subscribeOn(@NonNull Scheduler scheduler)

  • subscribeOn 决定被观察者的线程环境。Task任务发生、执行的线程环境。
  • subscribeOn如果连续两次调用该函数,仅仅第一次调用配置生效。 因为被观察者(生产者)是从下游向上游触发,代码的第一次配置subscribeOn,实际对应触发流程的最后一个步骤,因此也只有第一次配置生效。
  • 每次调用,该函数都会生成一个Observable<T>对象。实际类型是ObservableSubscribeOn<T>ObservableSubscribeOn<T>持有source成员变量,source的类型是 ObservableSource<T>,实际类型是Observable

Observable#observeOn(@NonNull Scheduler scheduler)

  • observeOn 决定观察者的线程环境。通过哪个线程通知到观察者Observer
  • observeOn在调用链中的每一次调用,都会导致后续的observer操作切换线程。观察者(消费者)是由被观察者(生产者)从上游向下游触发,每一次observeOn都会导致一次线程切换。
  • 每次调用,该函数都会生成一个Observable<T>对象。实际类型是ObservableObserveOn<T>ObservableObserveOn<T>持有source成员变量,source的类型是 ObservableSource<T>,实际类型是Observable

ObservableSubscribeOn

  • 含有一个内部Observer类型,内部类:ObservableSubscribeOn.SubscribeOnObserver<T>。用于注册subscribe上游的观察者对象
  • 第一处注释实现了生产者执行线程的切换。
  • 第二处注释,通过subscribe函数,注册上游观察者对象。
源码: ObservableSubscribeOn#subscribeActual
@Override
public void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer); // 实例化内部观察者observer.onSubscribe(parent);parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); // 注释1,scheduler切换生产者的线程。
}
源码: ObservableSubscribeOn.SubscribeTask
final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}@Overridepublic void run() {source.subscribe(parent); // 第二处注释,子线程中,注册上游观察者对象}
}

ObservableObserveOn

  • 含有一个内部Observer类型,内部类:ObservableObserveOn.ObserveOnObserver<T>
  • 第一处注释,生成了消费者的子线程worker
  • 第二处注释,通过subscribe函数,注册上游观察者对象。
源码: ObservableObserveOn#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker(); //注释一,生成子线程workersource.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); // 注释二}
}

本文开头的代码示例等同于如下代码

ObservableCreate<String> observableCreate = (ObservableCreate<String>) Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {emitter.onNext("emitter string 1"); // 步骤 3emitter.onNext("emitter string 2");emitter.onNext("emitter string 3");emitter.onComplete(); // 步骤 4}
});
ObservableSubscribeOn<String> subscribeOn = (ObservableSubscribeOn<String>) observableCreate.subscribeOn(Schedulers.io());
ObservableObserveOn<String> observeOn = (ObservableObserveOn<String>) subscribeOn.observeOn(AndroidSchedulers.mainThread());
observeOn.subscribe(new Observer<String>() {  // 步骤 1,subscribe 触发注册动作@Overridepublic void onSubscribe(@NonNull Disposable d) { // 步骤 2// 在步骤 3 subscribe之前执行Log.i(TAG, "onSubscribe " + d.getClass());// d.dispose(); // 这句话可以终止后续所有步骤。}// 省略 onNext 和 onError函数 ......@Overridepublic void onComplete() { // 步骤 5Log.i(TAG, "onComplete");}
});

Schedulers调度器

Scheduler.io() // 非CPU密集型的IO工作,例如网络请求、文件系统IO,数据库等
Scheduler.computation() // CPU运算密集型
Scheduler.trampoline()
Scheduler.newThread()
Scheduler.single()
Scheduler.from(executor)
AndroidSchedulers.mainThread() // Android主线程

internal.schedulers.ScheduledRunnable

  • 实现了java.util.concurrent.Callable接口,用于线程池执行。实现了disposables.Disposable接口,用于执行中取消Futrue

IoScheduler

  • 默认的IO调度器。默认使用的是调度线程池ScheduledThreadPoolExecutor。参考函数: SchedulerPoolFactory#create
  • Runnable对象最终是在ThreadWorker中对应的线程池中执行的。
  • CachedWorkerPool作为对象池,缓存ThreadWorker

IoScheduler.ThreadWorker

  • 单线程的,定时的,顺序执行任务的worker。一个worker可以执行很多的task。
  • 继承自 NewThreadWorker。每一个ThreadWorker都对应一个ScheduledThreadPool调度线程池。
  • ScheduledThreadPool该线程池的核心线程数为1(且不会增长),所有事件都会被抛到DelayedWorkQueue等待调度执行。

IoScheduler.CachedWorkerPool

  • ThreadWorker的对象池。
  • allWorkers正在运行的worker。allWorkers是CompositeDisposable类型。CompositeDisposable单词的意思是Disposable容器,这个类是一个用OpenHashSet存储的容器。
  • IoScheduler.CachedWorkerPool#get()方法会线程尝试复用expiringWorkerQueue队列中的对象。否则,创建一个新对象。
  • expiringWorkerQueue过期的ThreadWorker对象,可以复用。通过IoScheduler.CachedWorkerPool#release函数回收到expiringWorkerQueue队列中。被回收到expiringWorkerQueue 60秒之后,ThreadWorker会被彻底过期,而被移除。移除函数是CachedWorkerPool#evictExpiredWorkers
  • evictExpiredWorkers:allWorkers是CompositeDisposable类型,remove函数内部调用dispose停止线程池。
static void evictExpiredWorkers(ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue, CompositeDisposable allWorkers) {if (!expiringWorkerQueue.isEmpty()) {long currentTimestamp = now();for (ThreadWorker threadWorker : expiringWorkerQueue) {if (threadWorker.getExpirationTime() <= currentTimestamp) {if (expiringWorkerQueue.remove(threadWorker)) {// 注释1: 此处移除了threadWorker, allWorkers是CompositeDisposable类型,内部调用dispose停止线程池allWorkers.remove(threadWorker);}} else {// Queue is ordered with the worker that will expire first in the beginning, so when we// find a non-expired worker we can stop evicting.break;}}}
}

IoScheduler.EventLoopWorker

  • IoScheduler.EventLoopWorker封装了一层 ThreadWorker。IoScheduler#createWorker创建的是该对象。此处是类似于代理模式或者装饰器模式。
  • EventLoopWorker设计的目的是调用disposable函数销毁时,不同时销毁内部的ThreadWorker对象。
  • 经由EventLoopWorker执行的Runnable任务都会被存储到IoScheduler.EventLoopWorker#tasks容器中。EventLoopWorker销毁时,经由它创建的任务也一并销毁。
  • 强烈建议参考对PoolWorkerEventLoopWorker的解释。https://juejin.cn/post/6844903657792634894

ComputationScheduler

  • PoolWorker继承自 NewThreadWorker。每个PoolWorker对应一个ScheduledThreadPool调度线程池。
  • ComputationScheduler.FixedSchedulerPool是一个scheduler对象池,包含有固定数量(内核CPU数)的PoolWorker。成员变量FixedSchedulerPool#cores代表了当前线程池的大小。
io.reactivex.rxjava3.internal.schedulers.ComputationScheduler.FixedSchedulerPool#FixedSchedulerPool
final int cores; // PoolWorker数量
final PoolWorker[] eventLoops; // 数组
long n; // 内部自增的计数变量
// 构造函数
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {// initialize event loopsthis.cores = maxThreads;this.eventLoops = new PoolWorker[maxThreads]; // 固定数量的PoolWorkerfor (int i = 0; i < maxThreads; i++) {this.eventLoops[i] = new PoolWorker(threadFactory);}
}
  • 在生成PoolWorker时,会直接从eventLoops数组获取,从0依次累加。计数变量是n。数组下标通过取余数获得。
  • 根据上面的取余数的设计,一个PoolWorker可能会对应多个EventLoopWorker对象。代理模式,每个EventLoopWorker通过disposable销毁时,只销毁自己创建的任务,不影响其他的EventLoopWorker
io.reactivex.rxjava3.internal.schedulers.ComputationScheduler.FixedSchedulerPool#getEventLoop
public PoolWorker getEventLoop() {int c = cores;if (c == 0) {return SHUTDOWN_WORKER;}// simple round robin, improvements to comereturn eventLoops[(int)(n++ % c)]; // n 是一个自增的计数,对cores取余数,获取数组中的PoolWorker。}

AndroidSchedulers 和 HandlerScheduler

  • 实际就是一个采用MainLooper的Handler类,向主线程发送Runnable任务执行。
  • 可以通过AndroidSchedulers#from(Looper looper, boolean async) 定义自己的Looper线程。

Flowable & Backpressure背压

  • 当生产者事件的发送速度远大于消费者处理的速度时,事件就会在线程池的事件调度队列中积压。当积压足够多时,会造成OOM。描述这种情况,称之为Backpressure。
  • BackpressureStrategy是一种控制生产者事件发送速度的策略。默认的队列大小是128。几种策略可供选择MISSINGERRORBUFFERDROPLATEST
  • org.reactivestreams.Subscription#request(long n)请求N个数据。
  • 参考文章,讲述的比较清晰 https://juejin.cn/post/6844903454067032071
  • 通过request请求控制发送事件数量的代码示例。
Flowable.just(1, 2, 3, 4).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(2); // 背压模式,请求N个数据}@Overridepublic void onNext(Integer integer) {Log.i(TAG, "onNext " + integer);}@Overridepublic void onError(Throwable t) {}@Overridepublic void onComplete() {Log.i(TAG, "onComplete ");}});

Operators 操作符

  • 操作符分为几类,常见的有创建、变换、过滤、组合、算术和聚合、转换操作等等。

创建操作符类型

  • create常用的创建类型操作符。例如:Observable#create(ObservableOnSubscribe<T> source),Flowable#create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)
  • from支持将 Iterable、数组、Future 转换为Observable。例如:fromArrayfromFuturefromIterable
  • just支持将一些元素,依次发送。
  • interval按照固定的时间间隔发送数据。interval默认在computation调度器上执行。

变换操作

  • map。对每一个数据进行变换操作。
map(@NonNull Function<? super T, ? extends R> mapper)
  • flatMap一个 Obervable 转换成多个 Observable进行发射。flat的含义是水平的,平铺的含义。flatMap输出是无序的。内部采用merge动作合并
public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper)
  • concatMap操作符的功能和flatMap是非常相似的,区别是:concatMap 输出和原始数据序列一致。concatMap输出是有序的。内部采用concat动作合并
public final <@NonNull R> Observable<R> concatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper)

过滤操作

  • filter过滤符合条件的类型。
  • ofType过滤指定的class类型。ofType(@NonNull Class<U> clazz)
  • distinct去重操作。通过HashSet实现的去重功能
  • take(long count)takeLast(long count)take是获取前count个数据。 takeLast是获取后count个数据

布尔变量操作

  • all所有的数据都符合某个条件,返回True。
  • contains包含某个数据,返回True。
  • isEmpty没有数据要发送,返回True。

算术操作

  • AverageConcat
  • MaxMinCountSum

To系列

  • Observable#toFlowable(@NonNull BackpressureStrategy strategy)转换为Flowable。Flowable#toObservable()转换为Observable。
  • toMaptoListtoIterable转换为map、list、Iterable。

其他

  • plugins.RxJavaPlugins 插件类,用于自定义的一些接口。默认情况下,没有任何定制插件Function对象,会直接返回入参。
  • Disposable 表示可支持取消的,一次性的执行任务。扩展出了很多的子类实现。在和线程池使用是,主要是包装了Futrue<T>类型,取消动作通过Futrue<T>实现。

Java库

  • java.util.concurrent.ScheduledThreadPoolExecutor 任务定时执行线程池
  • java.util.concurrent.ThreadFactory 线程Fractory接口
  • java.util.concurrent.atomic.AtomicReference 原子化对象类
  • java.util.concurrent.atomic.AtomicReferenceArray 原子化数组类

参考资料

  • https://www.jianshu.com/p/a3f2c3ee00a3

http://chatgpt.dhexx.cn/article/HnjdOjHU.shtml

相关文章

RxJava和RxAndroid学习记录

目录 1 概念和说明 1.1 响应式编程 1.2 RxJava 1.3 关于RxJava和RxAndroid 1.4 关于响应式编程和普通编程 2. 基本使用 2.1 基本元素关系图 2.2 代码示例&#xff1a; 2.3 关于subscribe&#xff08;&#xff09; 2.4 线程调度 2.4.1 线程调度 2.4.2 RxJava内置的常用…

RxAndroid的基础使用

作为一个android开发者&#xff0c;在开发应用的过程中避免不了异步这个问题。android系统为我们提供了Handler这个类帮助我们进行线程间的通信和切换&#xff0c;但是GitHub上也有很多其他非常优秀的开源框架来帮助我们进行异步处理&#xff0c;比如今天学习的RxAndroid。 简…

rxandroid 基础知识

概述 在Android 中, 使用 rxandroid , rxandroid和rxJava的关系是,rxandroid包 依赖rxJava包,在其功能上增加了一些Android特有功能,项目中如果不需要指定rxJava包的版本,只需引入rxandroid包即可,如果需要更改 rxandroid包中默认的rxJava包版本 , 在项目中引入指定的rxJava包即…

RxAndroid使用初探;简洁、优雅、高效

引言 RxAndroid是一个开发库、是一种代码风格、也是一种思维方式。 正如标题所言,RxAndroid的特点是简洁、优雅、高效,它的优点是多线程切换简单、数据变换容易、代码简洁可读性好、第三方支持丰富易于开发;缺点是学习成本较高、出错难以排查。 用途与优势 起源 RxAndroid…

RxAndroid的学习和研究

1.什么是RxAndroid RxAndroid的含义为响应式编程&#xff0c;Rx含义是响应式编程&#xff0c;其本质就是观察者模式&#xff0c;以观察者&#xff08;Observer&#xff09;和订阅者&#xff08;Subscriber&#xff09;为基础的异步响应方式。    Observables发出一系列事件&a…

linux基本功系列之dd命令实战

文章目录 前言&#x1f680;&#x1f680;&#x1f680;一. dd 命令介绍二. 语法格式及常用选项三. 参考案例3.1 创建指定大小的文件3.2 清空磁盘数据3.3 给磁盘做备份还原3.4 把光盘拷贝到root下3.5 内存不足的处理方法 四. 文中出现的概念解释swapon命令介绍4.2 /dev/zero 介…

Linux系统中dd命令用法详解

命令介绍&#xff1a; Linux dd 命令用于读取、转换并输出数据。dd 可从标准输入或文件中读取数据&#xff0c;根据指定的格式来转换数据&#xff0c;再输出到文件、设备或标准输出。 参数介绍 if 代表输入文件。如果不指定 if&#xff0c;默认就会从 stdin 中读取输入。of …

dd 命令详解

dd命令是Linux/Unix下的一个很常见的文件拷贝工具。 我们先列下dd命名的常用的参数&#xff0c;再详细分析&#xff1a; bsBYTES read and write up to BYTES bytes at a time cbsBYTES convert BYTES bytes at a time convCONVS convert the file as pe…

dd命令使用总结

dd命令介绍 dd是Linux下一个非常有用的命令&#xff0c;该命令用于读取、转换并输出数据&#xff1b;dd命令在Android shell下也支持使用。 语法格式&#xff1a; dd [option]dd指令选项详解 iffile&#xff1a;输入文件名&#xff0c;缺省为标准输入 offile&#xff1a;输…

dd命令相关整理

对于一个软件测试人员而言&#xff0c;工作开展前就是准备自己的测试环境&#xff0c;那么重装系统就是首当其冲的一个必备技能。最近因为手边工作环境没有windows的系统&#xff0c;所以没有条件利用软碟通这类刻录软件直接刻录启动盘。被逼无奈之下用命令来刻录&#xff0c;整…

Linux:shell 脚本 自动解压压缩文件tar.gz到指定目录

具体情境 Ubuntu16.04系统&#xff0c;将.tar.gz格式的文件从/home/myftp/upload/nuodongiot目录自动解压到/home/myftp/upload/backupcopy目录中&#xff0c;并将源目录/home/myftp/upload/nuodongiot中的文件移动至/home/myftp/upload/extarct目录中 该过程进行单个文件进行…

tar解压文件至指定目录,不包含原目录

1、tar解压文件至指定目录&#xff0c;不包含原目录 要解压的压缩包原目录结构如下 tar -zxf log.tar.gz --strip-components 1 -C /opt/new_test注&#xff1a; --strip-components 1 解压至下一级目录&#xff0c;若为2则解压至下下级目录 2、压缩只指定的目录&#xff0c…

linux gz解压 指定目,linux解压tar.gz到指定文件夹或目录

1. 前言 本文主要讲解如何解压tar.gz到指定文件夹或目录,tar是Linux系统上的一种打包与压缩工具。 2. linux解压tar文件使用案例 Linux下使用tar命令把当前目录下的zcwyou.tar.gz解压到指定的目录/123/abc/ ,前提要保证存在/123/abc/这个目录。 [root@zcwyou ~]# tar -zxvf zc…

Linux tar 命令 将归档内指定文件解压到指定目录

首先介绍一下 tar 命令&#xff1a; 用途&#xff1a;打包文件&#xff08;制作归档文件&#xff09;、释放归档文件 格式&#xff1a; tar [选项]... 归档文件名 源文件或目录 tar [选项]... 归档文件名 [-C 目标目录] 常用命令选项&#xff1a; -c 创建 .tar 格式…

20191004在LINUX下如何将tar压缩文件解压到指定的目录下

百度搜索&#xff1a;tar 解压缩到指定目录 https://zhidao.baidu.com/question/9844116.html 在LINUX下如何将tar压缩文件解压到指定的目录下 各位&#xff0c;请教一下在LINUX下如何将tar压缩文件解压到指定的目录下&#xff0c;直接用tar xvf 解压出来的是放在当前目录的&am…

关于linux打包以及解压到指定目录的简单操作demo

1.打包到指定目录 命令:tar zcvf /root/test99/a.tar.gz a.txt 1.1打包到当前目录 命令:tar -zcvf a.tar.gz a.txt 2.解压到指定目录 命令: tar -zxvf a.tar.gz -C /root/test99 2.2解压到当前目录 命令:tar -zxvf a.tar.gz 打zip包: 方法如下&#xf…

linux tar解压文件至指定目录,不包含原目录

1、tar解压文件至指定目录&#xff0c;不包含原目录 要解压的压缩包原目录结构如下 通过 --strip-components 1 参数 解压到指定目录或当前目录&#xff08;不含打包前原目录&#xff09; tar zxf log.tar.gz --strip-components 1 -C /opt/new_test注&#xff1a; --strip-co…

Linux拓展之产生随机数

在 Linux 中可以通过内置变量 RANDOM 来产生随机数&#xff0c;该变量会产生一个 [0, 32767] 范围内的随机整数。如下&#xff1a; echo $RANDOM如果要产生 [0-10] 之内的随机整数&#xff1a;echo $(( $RANDOM % 10 )) 如果要产生 [1-10] 之内的随机整数&#xff1a;echo $((…

Linux生成随机数

生成随机数的方法有7种 1.通过时间获取随机数 1&#xff09;date %s &#xff08;随机生成10位数字&#xff09; 用于获得时间戳。 如果用它做随机数&#xff0c;相同一秒的数据是一样的。在做循环处理&#xff0c;多线程里面基本不能满足要求了。 2&#xff09;date…

Linux时间子系统之定时事件层(Clock Events)

几乎所有的计算机系统中都会存在一个所谓的定时设备&#xff0c;经过设置后&#xff0c;在某个固定的时间或某个相对的时间间隔后&#xff0c;达到触发条件&#xff0c;发送中断给处理器。 系统中的每一种实际的定时事件设备都由一个叫做clock_event_device的结构体变量表示&a…