RxJava和RxAndroid学习记录

article/2025/9/29 22:26:13

目录

1 概念和说明

1.1 响应式编程

1.2 RxJava

1.3 关于RxJava和RxAndroid

1.4 关于响应式编程和普通编程

2. 基本使用

2.1 基本元素关系图

2.2 代码示例:

2.3 关于subscribe()

2.4 线程调度

        2.4.1 线程调度

        2.4.2 RxJava内置的常用的线程项:

下一篇说操作符。


附:RxJava源码地址: https://github.com/ReactiveX/RxJava

1 概念和说明

1.1 响应式编程

        即rective programming, 是一种通过异步数据流来构建事物关系的编程模型,用事件来驱动事务

1.2 RxJava

        一个通过观察者模式实现响应式编程

优点:a. 代码体现为为链式调用,而非嵌套式。

           b. 方便的实现了线程调度数据转换(即中间事务的转换)

           c. android中常用来和retrofit配合

1.3 关于RxJava和RxAndroid

        两者差不多,RxAndroid是RxJava针对Android平台做了一些调整。可以理解为RxAndroid是专供 android的RxJava。

1.4 关于响应式编程和普通编程

        场景:有三个任务A、B、C,B的执行依赖于A的执行结果,同样,C的执行结果依赖于B,则两种编程的作废分别为:

        普通编程:先执行A,A执行完后执行B,B执行完执行C,即除了3个任务本身的执行过程外,3个任务之间的依赖执行逻辑也要由开发者编写。

        响应式编程:会将3个任务做依赖关系绑定:C 依赖 B 依赖 A,二这个依赖绑定的先后执行逻辑由语言或调用库来支持,开发者只需专注于3个任务本身的执行逻辑,而不必处理3个任务间的执行逻辑,只需要把它委托给语言或调用库即可。之后,B会自动响应A的执行结果,C会自动响应B的执行结果。

        总结:响应式编程优雅的处理了任务(业务或事务)间的依赖关系。

     

2. 基本使用

2.1 基本元素关系图

---->  observables 是被观察者、事件源,用来处理事件的派发。

---->  observer/subscriber 是观察者、订阅者,观察/订阅目标是observables,observables派发出来的事件由他处理。

        observer 和 subscriber 都是观察者/订阅者,后者是前者的扩展,内部增加了 onStart()方法:在时间发送之前订阅,用于做一些准备工作;还增加了 unSubscribe(),用于取消订阅。

---->  subscribe 是一个动作,将两者建立“订阅”关系,一旦订阅关系建立,observer/subscriber就可以立即接受、响应observables的变化。

2.2 代码示例:

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {observableEmitter.onNext("emit A");observableEmitter.onNext("emit B");observableEmitter.onNext("emit C");observableEmitter.onComplete();}});Observer<String> observer = new Observer<String>() {@Overridepublic void onSubscribe(Disposable disposable) {System.out.println("onSubscribe");}@Overridepublic void onNext(String s) {System.out.println("onNext");System.out.println(s);}@Overridepublic void onError(Throwable throwable) {System.out.println("onError");}@Overridepublic void onComplete() {System.out.println("onComplete");}};observable.subscribe(observer);

执行结果:

        onSubscribe                                                                                                                               
        onNext                                                                                                                                       
        emit A                                                                                                                                         
        onNext                                                                                                                                       
        emit B                                                                                                                                        
        onNext                                                                                                                                       
        emit C                                                                                                                                        
        onComplete                                                                                                                               

---->  observer 的 onSubscribe(Disposable disposable)方法在执行订阅动作后立即执行回调,在这里可以用  disposable 取消订阅或者将该对象保存起来以后再取消订阅。

---->  oberver 的 onNext()在emitter.onNext()执行后回调执行,表示响应被观察者事件,onNext()方法会执行多次,表示有多个事件,具体执行次数由被观察者决定。

---->  observer 的 onComplete()在被观察者的 onComplete()执行后回调执行,表示事件全部完成,通常在这里做一些收尾工作。

---->  observer 的 onError()在被观察者执行过程中的任何位置出现异常时回调执行,表示事件执行异常。同 onComplete()互斥。

        通过以上说明可以发现:在 observer 中,除了 onSubscribe(Disposable d)以外,其他方法都和创建 observable 时发射器 observableEmitter 的方法同名,且都在同名方法执行后回调。比如,observableEmitter 执行一次 onNext(),则 observer 中的 onNext()方法执行一次。

        onComplete()和onError()互斥,两者智能触发其一(且其中一个触发后,onNext()变不会再执行):

a. onComplete()触发后,后续 observableEmitter 调用任何方法都不在生效;

b. onError()触发后,如果 observableEmitter 再调用 onComplete()或 onError(),RxJava会抛出异常,开发者需要自行保证唯一性。

        特别需要注意的是:ObservableOnSubscribe 的 subscribe()方法在每次有新的observer加入时,都会在 observer 的 onSubscribe()回调后触发,这就保证了所有的观察者都可以接收到事件。

2.3 关于subscribe()

        即订阅动作,他将观察者和被观察者建立了订阅关系,通过该方法,observable 回调 observer 的对应方法,从而达到 观察者相应被观察者发出的事件(实际上被观察者只负责产生事件,他是事件源,真正发送事件的是他在订阅时,即subscribe()被调用时)。

        有多个重载的方法:

    // 1.观察者对被观察者发送的任何事件都响应@SchedulerSupport("none")public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");this.subscribeActual(observer);} catch (NullPointerException var4) {throw var4;} catch (Throwable var5) {Exceptions.throwIfFatal(var5);RxJavaPlugins.onError(var5);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(var5);throw npe;}}// 2.观察者对被观察者发出的事件作出响应(被观察者还可以继续发送事件)@SchedulerSupport("none")public final Disposable subscribe() {return this.subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());}// 3.观察者只对被观察者发出的onNext()事件作出响应    @SchedulerSupport("none")public final Disposable subscribe(Consumer<? super T> onNext) {return this.subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());}// 4.观察者只对被观察者发出的onNext()、onError()事件作出响应  @SchedulerSupport("none")public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());}// 5.观察者只对被观察者发出的onNext()、onError()、onComplete()事件作出响应  @SchedulerSupport("none")public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {return this.subscribe(onNext, onError, onComplete, Functions.emptyConsumer());}// 6.观察者只对被观察者发出的onNext()、onError()、onComplete()、onSubscribe()事件作出响应@SchedulerSupport("none")public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {ObjectHelper.requireNonNull(onNext, "onNext is null");ObjectHelper.requireNonNull(onError, "onError is null");ObjectHelper.requireNonNull(onComplete, "onComplete is null");ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");LambdaObserver<T> ls = new LambdaObserver(onNext, onError, onComplete, onSubscribe);this.subscribe((Observer)ls);return ls;}

        其中,2、3、4、5都this到6,而6最后又调用了1。另外,上面增加了两个新类:ConsumerAction :observer 中的 onNext()、onComplete()、onError()被 Consumer.accept()代替;observer中的 onComplete()方法被 Action.run()方法代替。因此,如果只关心observer中的一个或几个回调方法,则可以通过使用 Consumer 或 Action 来替换observer注册到被观察者中。 比如:上面的示例代码,如果只关心onNext(),则可以写为:

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {observableEmitter.onNext("emit A");observableEmitter.onNext("emit B");observableEmitter.onNext("emit C");observableEmitter.onComplete();}});Consumer<String> consumer = new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}};observable.subscribe(consumer);

2.4 线程调度

        2.4.1 线程调度

        默认情况下,被观察者和观察者处于同一线程中。而异步才是RxJava的核心,所有需要用到线程调用,其实现通过 Scheduler 来实现:

a. subscribeOn():指定被被观察者在subscribe所发生的线程(即指定事件源的线程)。

        多次指定发射事件的线程只有第一次指定有效,也就是多次调用只有第一次有效,之后的调用全被忽略。

b. observerOn():指定观察者/订阅者接受/响应事件的线程(即指定接受事件的线程)。

        多次指定接受事件的线程是可以的,每指定一次,线程就切换一次,所以以最终指定的为准,即多次指定,以最后一次为准。

        2.4.2 RxJava内置的常用的线程项:

a. Schedulers.io(): 代表执行io操作的线程,常用于网络请求、文件读写等io密集型操作的地方。行为模式和 new Thread 类似,只是其内部维护了一个无上限的线程池,更高效。

b. Schedulers.newThread():总是启用新线程,在新线程中执行当前事务。

c. Schedulers.complutation():代表cpu计算密集型的操作,既不会被io操作限制性能的操作,如图形计算等。内部维护了一个固定大小(大小为cpu的核数)的线程池。不要把io操作放到该线程项中,浪费cpu的资源。

d. AndroidSchedulers.mainThread():android的主线程,用于更新UI,为android独有。

下一篇说操作符


http://chatgpt.dhexx.cn/article/hbcYYB8P.shtml

相关文章

RxAndroid的基础使用

作为一个android开发者&#xff0c;在开发应用的过程中避免不了异步这个问题。android系统为我们提供了Handler这个类帮助我们进行线程间的通信和切换&#xff0c;但是GitHub上也有很多其他非常优秀的开源框架来帮助我们进行异步处理&#xff0c;比如今天学习的RxAndroid。 简…

rxandroid 基础知识

概述 在Android 中, 使用 rxandroid , rxandroid和rxJava的关系是,rxandroid包 依赖rxJava包,在其功能上增加了一些Android特有功能,项目中如果不需要指定rxJava包的版本,只需引入rxandroid包即可,如果需要更改 rxandroid包中默认的rxJava包版本 , 在项目中引入指定的rxJava包即…

RxAndroid使用初探;简洁、优雅、高效

引言 RxAndroid是一个开发库、是一种代码风格、也是一种思维方式。 正如标题所言,RxAndroid的特点是简洁、优雅、高效,它的优点是多线程切换简单、数据变换容易、代码简洁可读性好、第三方支持丰富易于开发;缺点是学习成本较高、出错难以排查。 用途与优势 起源 RxAndroid…

RxAndroid的学习和研究

1.什么是RxAndroid RxAndroid的含义为响应式编程&#xff0c;Rx含义是响应式编程&#xff0c;其本质就是观察者模式&#xff0c;以观察者&#xff08;Observer&#xff09;和订阅者&#xff08;Subscriber&#xff09;为基础的异步响应方式。    Observables发出一系列事件&a…

linux基本功系列之dd命令实战

文章目录 前言&#x1f680;&#x1f680;&#x1f680;一. dd 命令介绍二. 语法格式及常用选项三. 参考案例3.1 创建指定大小的文件3.2 清空磁盘数据3.3 给磁盘做备份还原3.4 把光盘拷贝到root下3.5 内存不足的处理方法 四. 文中出现的概念解释swapon命令介绍4.2 /dev/zero 介…

Linux系统中dd命令用法详解

命令介绍&#xff1a; Linux dd 命令用于读取、转换并输出数据。dd 可从标准输入或文件中读取数据&#xff0c;根据指定的格式来转换数据&#xff0c;再输出到文件、设备或标准输出。 参数介绍 if 代表输入文件。如果不指定 if&#xff0c;默认就会从 stdin 中读取输入。of …

dd 命令详解

dd命令是Linux/Unix下的一个很常见的文件拷贝工具。 我们先列下dd命名的常用的参数&#xff0c;再详细分析&#xff1a; bsBYTES read and write up to BYTES bytes at a time cbsBYTES convert BYTES bytes at a time convCONVS convert the file as pe…

dd命令使用总结

dd命令介绍 dd是Linux下一个非常有用的命令&#xff0c;该命令用于读取、转换并输出数据&#xff1b;dd命令在Android shell下也支持使用。 语法格式&#xff1a; dd [option]dd指令选项详解 iffile&#xff1a;输入文件名&#xff0c;缺省为标准输入 offile&#xff1a;输…

dd命令相关整理

对于一个软件测试人员而言&#xff0c;工作开展前就是准备自己的测试环境&#xff0c;那么重装系统就是首当其冲的一个必备技能。最近因为手边工作环境没有windows的系统&#xff0c;所以没有条件利用软碟通这类刻录软件直接刻录启动盘。被逼无奈之下用命令来刻录&#xff0c;整…

Linux:shell 脚本 自动解压压缩文件tar.gz到指定目录

具体情境 Ubuntu16.04系统&#xff0c;将.tar.gz格式的文件从/home/myftp/upload/nuodongiot目录自动解压到/home/myftp/upload/backupcopy目录中&#xff0c;并将源目录/home/myftp/upload/nuodongiot中的文件移动至/home/myftp/upload/extarct目录中 该过程进行单个文件进行…

tar解压文件至指定目录,不包含原目录

1、tar解压文件至指定目录&#xff0c;不包含原目录 要解压的压缩包原目录结构如下 tar -zxf log.tar.gz --strip-components 1 -C /opt/new_test注&#xff1a; --strip-components 1 解压至下一级目录&#xff0c;若为2则解压至下下级目录 2、压缩只指定的目录&#xff0c…

linux gz解压 指定目,linux解压tar.gz到指定文件夹或目录

1. 前言 本文主要讲解如何解压tar.gz到指定文件夹或目录,tar是Linux系统上的一种打包与压缩工具。 2. linux解压tar文件使用案例 Linux下使用tar命令把当前目录下的zcwyou.tar.gz解压到指定的目录/123/abc/ ,前提要保证存在/123/abc/这个目录。 [root@zcwyou ~]# tar -zxvf zc…

Linux tar 命令 将归档内指定文件解压到指定目录

首先介绍一下 tar 命令&#xff1a; 用途&#xff1a;打包文件&#xff08;制作归档文件&#xff09;、释放归档文件 格式&#xff1a; tar [选项]... 归档文件名 源文件或目录 tar [选项]... 归档文件名 [-C 目标目录] 常用命令选项&#xff1a; -c 创建 .tar 格式…

20191004在LINUX下如何将tar压缩文件解压到指定的目录下

百度搜索&#xff1a;tar 解压缩到指定目录 https://zhidao.baidu.com/question/9844116.html 在LINUX下如何将tar压缩文件解压到指定的目录下 各位&#xff0c;请教一下在LINUX下如何将tar压缩文件解压到指定的目录下&#xff0c;直接用tar xvf 解压出来的是放在当前目录的&am…

关于linux打包以及解压到指定目录的简单操作demo

1.打包到指定目录 命令:tar zcvf /root/test99/a.tar.gz a.txt 1.1打包到当前目录 命令:tar -zcvf a.tar.gz a.txt 2.解压到指定目录 命令: tar -zxvf a.tar.gz -C /root/test99 2.2解压到当前目录 命令:tar -zxvf a.tar.gz 打zip包: 方法如下&#xf…

linux tar解压文件至指定目录,不包含原目录

1、tar解压文件至指定目录&#xff0c;不包含原目录 要解压的压缩包原目录结构如下 通过 --strip-components 1 参数 解压到指定目录或当前目录&#xff08;不含打包前原目录&#xff09; tar zxf log.tar.gz --strip-components 1 -C /opt/new_test注&#xff1a; --strip-co…

Linux拓展之产生随机数

在 Linux 中可以通过内置变量 RANDOM 来产生随机数&#xff0c;该变量会产生一个 [0, 32767] 范围内的随机整数。如下&#xff1a; echo $RANDOM如果要产生 [0-10] 之内的随机整数&#xff1a;echo $(( $RANDOM % 10 )) 如果要产生 [1-10] 之内的随机整数&#xff1a;echo $((…

Linux生成随机数

生成随机数的方法有7种 1.通过时间获取随机数 1&#xff09;date %s &#xff08;随机生成10位数字&#xff09; 用于获得时间戳。 如果用它做随机数&#xff0c;相同一秒的数据是一样的。在做循环处理&#xff0c;多线程里面基本不能满足要求了。 2&#xff09;date…

Linux时间子系统之定时事件层(Clock Events)

几乎所有的计算机系统中都会存在一个所谓的定时设备&#xff0c;经过设置后&#xff0c;在某个固定的时间或某个相对的时间间隔后&#xff0c;达到触发条件&#xff0c;发送中断给处理器。 系统中的每一种实际的定时事件设备都由一个叫做clock_event_device的结构体变量表示&a…

Linux随机数发生器

Linux随机数发生器 日期&#xff1a;2017-11-29 01:42:10 星期三 Linux随机数发生器 一、源代码的基本情况 Linux内核版本涉及文件功能概述 二、外部访问接口 内核层输出接口用户层输出接口环境噪音输入接口 三、核心源码分析 随机数发生器理论熵池结构熵的加入随机数的生成启动…