ReactiveX简介

article/2025/10/23 12:08:13

一、ReactiveX简介

在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式

  • ReactiveX的官网地址为:
 http://reactivex.io/

ReactiveX官网对于自身的介绍是:

An API for asynchronous programming with observable streams

实质上我们可以对其解读为三部分:

ReactiveX的解读
①An API: 首先它是个编程接口,不同语言提供不同实现。例如Java中的RxJava。
②For asynchronous programming: 在异步编程设计中使用。例如开启子线程处理耗时的网络请求。
③With observable streams: 基于可观察的事件流。例如观察者模式中观察者对被观察者的监听。

而ReactiveX结合了如下三部分内容:

  1. 观察者模式,即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。
  2. Iterator模式,即迭代流式编程模式。
  3. 函数式编程模式,即提供一系列函数样式的方法供快速开发。

Reactive的模式图如下:

图1.1 ReactiveX的模式图

二、RxJava的使用

1、RxJava的优势

在Android的SDK中,给开发者提供的用于异步操作的原生内容有AsyncTask和Handler。对于简单的异步请求来说,使用Android原生的AsyncTask和Handler即可满足需求,但是对于复杂的业务逻辑而言,依然使用AsyncTask和Handler会导致代码结构混乱,代码的可读性非常差。
但是RxJava的异步操作是基于观察者模式实现的,在越来越复杂的业务逻辑中,RxJava依旧可以保持简洁

2、RxJava的配置

首先,在Android Studio中配置Module的build.gradle,在这里我们使用的版本是1.2版本,并且导入RxAndroid,辅助RxJava完成线程调度:

        implementation "io.reactivex:rxjava:1.2.0"implementation "io.reactivex:rxandroid:1.2.0"

然后,RxJava基于观察者设计模式,其中的关键性三个步骤如下:

(1)Observable被观察者

Observable被观察者创建的代码如下:

        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Alex");subscriber.onCompleted();}});

 

 

在这里,要强调的是Observable被观察者是类类型,其中有诸多方法,我们关注其构造函数与创建Observable对象的方法,查看如下图对应的视图结构:

图2.2.1 Observable被观察者对象的视图结构

查看源码:

        protected Observable(OnSubscribe<T> f) {this.onSubscribe = f;}public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}
        public static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {return create((OnSubscribe<T>)syncOnSubscribe);}public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {return create((OnSubscribe<T>)asyncOnSubscribe);}

通过源码分析,可知Observable提供了create()方法来获取Observable实例对象。
此外,除了基本的创建的方法,Observable还提供了便捷的创建Observable序列的两种方式,代码如下:

  • 第一种,会将参数逐个发送
        Observable<String> observable1 = Observable.just("Alex","Payne");
  • 第二种,会将数组元素逐个转换完毕后逐个发送
        String[] observableArr = {"Alex", "Payne"};Observable<String> observable2 = Observable.from(observableArr);

其中Observable.just()方法会调用from()方法,详情可查看源码。

(2)Observer观察者

Observer观察者创建的代码如下:

        Observer<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {Log.e(TAG, "onCompleted");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError,Error Info is:" + e.getMessage());}@Overridepublic void onNext(String s) {Log.e(TAG, s);}};

Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下图所示Observer的视图结构:

图2.2.2 Observer观察者对象的视图结构


那么在RxJava中,Observer有其接口实现类对象Subscriber,它们的使用onNext、onCompleted、onError方法是一样的,但是Subscriber对于Observer接口进行了拓展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,代码如下:

 

        Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onStart() {Log.e(TAG, "onStart");}@Overridepublic void onCompleted() {Log.e(TAG, "onCompleted");}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError,Error Info is:" + e.getMessage());}@Overridepublic void onNext(String s) {Log.e(TAG, s);}};

其中,onStart()方法会在事件未发送前被调用,可以用于订阅关系建立前的准备工作,例如将数据清空或者重置,在Subscriber中默认是空实现,我们可以在该方法中调用自己的业务逻辑代码。在如下的视图结构中我们可以看到Subscriber的拓展内容,重点是add()、unsubscribe()方法以及名为subscription的Subscription队列

图2.2.3 Subscriber对象视图结构

 

(3)Subscribe订阅关系

Observable与observer形成订阅关系代码如下:

            observable.subscribe(observer);//或者observable.subscribe(subscriber);

那么我们以observable.subscribe(observer)为例在这里继续查看源码,查看subscribe()方法到底做了什么:

 

图2.3.1 Observable调用Subscribe将Observer转换为Subscriber对象

Observer转换为Subscriber对象在这里得到印证。

  • 在之后的内容中统一以Subscriber作为订阅观察者对象

继续深入,我们可以看到订阅关系中的关键步骤(仅核心代码):

            subscriber.onStart();RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);

在这里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等价于OnSubscribe.call(subscriber),见下图2.3.2:

图2.3.2 RxJavaHooks.onObservableStart()转换为OnSubscribe

在return RxJavaHooks.onObservableReturn(subscriber)这里等价于return subscription,见下图2.3.3:

图2.3.3 RxJavaHooks.onObservableReturn()转换为Subscrition

 

  • 可以看到,subscriber() 做了3件事:
  1. 调用 Subscriber.onStart() 。该方法用于在订阅关系建立之前的准备。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的内部接口,而事件发送的逻辑在这里开始运行。从这也可以看出,在 RxJava 中当 subscribe() 方法执行的时候订阅关系确立,Observable 开始发送事件。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便后续调用unsubscribe()。

三、RxJava的不完整回调

1、不完整回调的代码示例

        Observable<String> observable = Observable.just("Alex", "Payne");Action1<String> onNextAction = new Action1<String>() {@Overridepublic void call(String s) {Log.e(TAG, s);}};Action1<Throwable> onErrorAction = new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {Log.e(TAG, "onError,Error Info is:" + throwable.getMessage());}};Action0 onCompletedAction = new Action0() {@Overridepublic void call() {Log.e(TAG, "onCompleted");}};// 根据onNextAction 来定义 onNext()observable.subscribe(onNextAction);// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()observable.subscribe(onNextAction, onErrorAction);// 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()、onCompletedAction 来定义 onCompleted()observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

2、不完整回调的原理分析

在这里我们可以看到:

Action0无参数泛型无返回值类型,而Subscriber中的onCompleted()方法也没有参数泛型
Action1有1个参数泛型无返回值类型 ,onNextAction设置的参数泛型为String,而Subscriber中的onNext()方法参数泛型也是String(和本文中观察者对象中的OnNext方法对比)
Action1有1个参数泛型无返回值类型,onErrorAction设置的参数泛型为Throwable,而Subscriber中的onError()方法参数泛型也是Throwable

那么,我们来查看observable.subscribe(onNextAction)的源码,在这里, Action1可以被当成一个包装对象,将onNext()方法进行包装作为不完整的回调传入到observable.subscribe()中

图3.2.1 传入的onNextAction最终被包装成ActionSubscriber

 

我们来看看Action1有何玄机,Action1的源码如下图所示:

图3.2.2 Action1接口源码

实质上,这种根据参数泛型的个数且无返回值类型的包装在RxJava中有多种如下图所示的体现,例如Action0的参数个数为0,Action1的参数个数为1以此类推:

图3.2.3 根据参数泛型的个数且无返回值类型的包装

 

四、RxJava的线程切换

1、Scheduler线程调度器

如果不指定线程,默认是在调用subscribe方法的线程上进行回调,那么如果子线程中调用subscibe方法,而想在主线程中进行UI更新,则会抛出异常。当然了RxJava已经帮我们考虑到了这一点,所以提供了Scheduler线程调度器帮助我们进行线程之间的切换。
实质上,Scheduler线程调度器和RxJava的操作符有紧密关联,我将在下一篇文章中进行详细介绍。

  • RxJava内置了如下所示几个的线程调度器:
  1. Schedulers.immediate():在当前线程中执行
  2. Schedulers.newThread():启动新线程,在新线程中进行操作
  3. Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  4. Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  5. Schedulers.trampoline():会将任务按添加顺序依次放入当前线程中等待执行。线程一次只执行一个任务,其余任务排队等待,一个任务都执行完成后再开始下一个任务的执行。
  • 此外RxJava还提供了用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form() 。

RxAndroid并且其为我们提供了AndroidSchedulers.mainThread()进行主线程的回调

2、线程控制

调用Observable对象的subscribeOn()、observeOn()方法即可完成线程控制。

  • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
  • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
            Observable.just("Alex", "Payne").subscribeOn(Schedulers.io())//指定 subscribe() 所发生的线程.unsubscribeOn(Schedulers.io())//事件发送完毕后,及时取消发送.observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所运行在的线程.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.e(TAG, s);}});

五、总结

本文主要介绍了RxJava的由来、使用步骤、部分内容的原理解析。在下篇文章中我会详细介绍RxJava的操作符。希望本文对你在学习RxJava的路上有所启发。

小礼物走一走,来简书关注我



作者:Alex_Payne
链接:https://www.jianshu.com/p/2d3d7c77dc92
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。


http://chatgpt.dhexx.cn/article/1ujwJvrQ.shtml

相关文章

搭建物联网服务器(一):购买阿里云服务器

一、购买阿里云服务器 2019年双十一一起拼团抢购阿里云服务器&#xff0c;一年86元钱&#xff1b;打算做个物联网的服务器玩玩&#xff1b; 二、解决问题 阿里云经常出现连接中断&#xff0c;查出问题是服务器的保活机制影响链接&#xff0c;于是进行处理&#xff1b; 解决方…

物联网服务器协议命令,物联网使用HTTP协议传输数据

物联网设备使用HTTP协议传输数据也是一种常用的方式&#xff0c;而且HTTP协议是个很成熟的东西&#xff0c;在服务器上很容易部署。 HTTP协议并不是很复杂的东西。其处在互联网的应用层&#xff0c;因此这个协议只是规定了数据包的格式。具体的数据传递则是由TCP/IP来实现的。 …

物联网协议(设备到物联网服务)

物联网中的网络分为设备到设备的网络与设备到物联网服务的网络。其中&#xff0c;设备到物联网服务的网络通信协议有HTTP&#xff0c;Websocket&#xff0c;MQTT等。 HTTP协议 HTTP&#xff08;超文本传输协议&#xff09;是一个负责从万维网服务器获取超文本到本地浏览器的传…

物联网系统上位机源码,含服务器和客户端 物联网服务端程序

物联网系统上位机源码&#xff0c;含服务器和客户端 物联网服务端程序&#xff0c;可以接受市面上大多数透传数据的DTU登录&#xff0c;以及和DTU双向通讯 程序功能&#xff1a;能分组管理&#xff0c;不同的组别用户只可见自己组别的设备&#xff0c;设备和客户端登录掉线直观…

物联网python开发实践

文章目录 第1章 物联网邂逅python物联网组成架构发展现状典型应用使用python的理由python与网关/云平台 第2章 开启python之旅版本选择、搭建开发环境python语言介绍 第3章 python数据结构第4章 python高级特性第5章 物联网核心组件网络通信方案网络通信协议硬件物联网云平台 第…

物联网系统怎么部署服务器,如何搭建物联网云服务器

如何搭建物联网云服务器 内容精选 换一换 创建并登录弹性云服务器,请参见《弹性云服务器快速入门》中“购买弹性云服务器”和“登录弹性云服务器”。该弹性云服务器用于连接文档数据库实例,需要与待连接的实例处于同一虚拟私有云子网内。创建弹性云服务器时,要选择操作系统,…

DGIOT物联网开源平台——腾讯云轻量应用服务器部署

为了节省开发者和实施工程师的时间&#xff0c;降低部署难度&#xff0c;本文提供了一套基于linux系统的一键式部署方式&#xff0c;以便快速交付&#xff0c;提升学习和商用部署的效率。 安装部署 1.服务器&#xff08;本案例采用腾讯云服务器作为示例&#xff09; 腾讯云地…

ubuntu系统下搭建本地物联网mqtt服务器的步骤

摘要&#xff1a;mqtt broker&#xff08;服务器&#xff09;是物联网通信的核心&#xff0c;网上有很多种开源的服务器可供选择&#xff0c;本文介绍如何在ubuntu系统下安装emqx服务器&#xff0c;让大家可以在局域网环境下搭建“云服务器”&#xff0c;体验物联网的乐趣。本文…

【物联网】12.物联网服务器发送方式(HTTP,WebSocket ,MQTT )

发送服务器的目的在于向设备发送数据并控制设备。 这篇主要是利用HTTP、WebSocket、MQTT 协议来看看如何实现同步和异步传输。 HTTP 发送数据 发送服务器等待接收HTTP 请求的Web 服务器。设备向这台服务器申请发送数据&#xff0c;作为响应&#xff0c;服务器把数据发给设备。…

轻量级高并发物联网服务器接收程序源码

轻量级高并发物联网服务器接收程序源码 &#xff08;仅仅是接收硬件数据程序 &#xff0c;没有web端&#xff0c;不是java&#xff0c;协议自己写&#xff0c;如果问及这些问题统统不回复。&#xff09;&#xff0c;对接几万个设备没问题&#xff0c;数据库采用ef6sqlite&…

C#物联网平台服务器框架源码

C#物联网平台服务器框架源码。 这套带码是通过C#编写集成IOCP高性能高并发优势服务器服务源码。 带手机appdemo源码 具体具备功能如下&#xff1a; 1、具备EF6mssql数据库功能&#xff0c;可更改为MYSQL或SQLITe. 2、自带WEB API服务&#xff0c;抛弃IIS支持。 用户可以通…

成本360元的迷你物联网服务器有多香?

嗨&#xff0c;大家好&#xff0c;我们又见面了&#xff0c;前段时间比较忙&#xff0c;所以一直没更新。之前在自己的笔记本上搭建了blynk服务器和Domoticz服务器&#xff0c;但是一直开着笔记本插着电源对电池损害太大&#xff0c;也曾经在手机上安装了服务器&#xff0c;但是…

c#轻量级高并发物联网服务器接收程序源码

c#轻量级高并发物联网服务器接收程序源码&#xff08;仅仅是接收硬件数据程序&#xff0c;没有web端&#xff0c;不是java&#xff0c;协议自己写。 &#xff09;&#xff0c;对接几万个设备没问题&#xff0c;数据库采用ef6sqlite&#xff0c;可改efMySQL.该程序只是源码使用示…

基于ESP32搭建物联网服务器十三(自已搭建一个MQTT服务器)

在之前的文章中:ESP32搭建WEB服务器十二(使用MQTT协议与ESP32互动)_你的幻境的博客-CSDN博客 我们已经实现了ESP32通过MQTT协议连接到公共MQTT服务器上&#xff0c;但是公共服务器在稳定性或安全性上&#xff0c;很多时候无法保证。这时&#xff0c;我们除了可以购买比如: 阿…

阿里云搭建MQTT物联网服务器

一、MQTT简介 1、MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;&#xff0c;是一种基于发布/订阅&#xff08;publish/subscribe&#xff09;模式的"轻量级"通讯协议&#xff0c;该协议构建于TCP/IP协议上&…

搭建一个物联网平台

搭建一个简单的物联网平台 之所以产生自己搭建平台的想法&#xff0c;是因为本来要使用 one* 平台完成一项作业&#xff0c;但是这个平台的官方文档写的太混乱了&#xff0c;有些地方写的有很简略&#xff0c;对我这种想要入门的小白来说不太友好&#xff0c;而且网上的第三方资…

物联网服务器搭建记录,心得

前言 在庞大的物联网世界中&#xff0c;自己拥有一台物联网服务器时多么一件美好的事。如同自己做的衣服更合身&#xff0c;自己搭建的物联网服务更灵活。 2022/3/9搭建服务 首次搭建了服务&#xff0c;但是感觉在安全方面是个很大的问题。每个设备都能连接&#xff0c;每个…

超微物联网超级服务器IoT SuperServer SYS-210SE-31A 评测

SYS-210SE-31A是Supermicro在日益流行的边缘服务器领域的最新产品。有趣的是&#xff0c;这款2单元430mm短深度服务器提供了3个热插拔节点&#xff0c;每个节点都有1个CPU和8个DIMM插槽。它还具有PCIe Gen4 x16扩展插槽、免工具可维护性和高达55℃的工作温度范围。对实现高度通…

JAVA-删除文件夹下所有文件的3种方法,推荐使用JDK8 Stream流的语法

一、删除文件或文件夹的四种基础方法 下面的四个方法都可以删除文件或文件夹&#xff0c;它们的共同点是&#xff1a;当文件夹中包含子文件的时候都会删除失败&#xff0c;也就是说这四个方法只能删除空文件夹。 需要注意的是&#xff1a;传统 IO 中的 File 类和 NIO 中的 Pat…

软件测试必看的5本书

最近好多朋友加我微信,问我有没有好一点的测试相关书籍推荐看一下,现具体介绍如下几本: 1. 软件测试的艺术(第3版) 软件测试的艺术 作 者:(美)梅耶(Myers, G. J.) 等著,张晓明,黄琳 译 出 版 社:机械工业出版社 简介:本书以一次自评价测试开篇,从软件测试的心理…