ReactiveX 简介

article/2025/10/23 12:06:05

Observable

在ReactiveX中,观察者订阅了一个Observable。然后,该观察者对Observable发出的任何项目或项目序列做出反应。

这种模式有利于并发操作,因为它不需要在等待Observable发出对象时阻塞,而是以观察者的形式创建一个哨兵,随时准备在Observable所做的任何时候做出适当的反应。这个页面解释了反应模式是什么以及Observables和观察者是什么(以及观察者如何订阅Observables)。

其他页面显示了如何使用各种Observable运算符将Observable链接在一起并更改其行为。本文档附有“大理石图表”的说明。以下是大理石图表如何表示Observables的Observables和转换:

Background

在许多软件编程任务中,您或多或少地期望您编写的指令将按照您编写的顺序逐个执行并逐步完成。但是在ReactiveX中,许多指令可以并行执行,其结果随后由“观察者”按任意顺序捕获。而不是调用方法,而是以“Observable”的形式定义检索和转换数据的机制。,“然后订阅一个观察者,此时,先前定义的机制开始行动,观察员站立哨兵,以便在准备好时捕获并响应其排放。

这种方法的一个优点是,当你有一堆不依赖于彼此的任务时,你可以同时启动所有任务,而不是等到每个任务完成后再开始下一个 - 这样,你的整个捆绑任务只需要与捆绑中最长的任务一样长。

有许多术语用于描述这种异步编程和设计模型。本文档将使用以下术语:观察者订阅Observable。Observable通过调用观察者的方法发出项目或向其观察者发送通知。

在其他文件和其他背景下,我们所谓的“观察者”有时被称为“用户”,“观察者”或“反应堆”。这种模型通常被称为“反应堆模式”。

Establishing Observers

此页面使用类似Groovy的伪代码作为示例,但在许多语言中都有ReactiveX实现。

在普通的方法调用中 - 也就是说,不是ReactiveX中典型的异步并行调用 - 流程是这样的:

  1. 调用方法。
  2. 将该方法的返回值存储在变量中。
  3. 使用该变量及其新值来做一些有用的事情。
// make the call, assign its return value to `returnVal`
returnVal = someMethod(itsParameters);
// do something useful with returnVal

在异步模型中,流程更像是:

  1. 定义一个方法,该方法对异步调用的返回值执行一些有用的操作;这种方法是观察者的一部分。
  2. 将异步调用本身定义为Observable。
  3. 通过订阅将观察者附加到该Observable(这也启动了Observable的操作)。
  4. 继续你的事业;每当调用返回时,观察者的方法将开始对其返回值或值进行操作 -  Observable发出的项。

看起来像这样:

// defines, but does not invoke, the Subscriber's onNext handler
// (in this example, the observer is very simple and has only an onNext handler)
def myOnNext = { it -> do something useful with it };
// defines, but does not invoke, the Observable
def myObservable = someObservable(itsParameters);
// subscribes the Subscriber to the Observable, and invokes the Observable
myObservable.subscribe(myOnNext);
// go on about my business

onNext, onCompleted, and onError

Subscribe方法是将观察者连接到Observable的方法。您的观察者实现以下方法的某些子集:

  • onNext

只要Observable发出一个项目,Observable就会调用此方法。此方法将Observable发出的项作为参数。

  • onError

Observable调用此方法以指示它无法生成预期数据或遇到其他一些错误。它不会进一步调用onNext或onCompleted。onError方法将参数指示导致错误的原因。

  • onCompleted

如果Observable在最后一次调用onNext,如果它没有遇到任何错误,则调用此方法。

根据Observable合约的条款,它可以调用onNext零次或多次,然后可以通过调用onCompleted或onError而不是两者来跟随这些调用,这将是它的最后一次调用。按照惯例,在本文档中,对onNext的调用通常称为项目的“发射”,而对onCompleted或onError的调用则称为“通知”。

更完整的订阅调用示例如下所示:

def myOnNext     = { item -> /* do something useful with item */ };
def myError      = { throwable -> /* react sensibly to a failed call */ };
def myComplete   = { /* clean up after the final response */ };
def myObservable = someMethod(itsParameters);
myObservable.subscribe(myOnNext, myError, myComplete);
// go on about my business

Unsubscribing

在一些ReactiveX实现中,有一个专门的观察​​者接口Subscriber,它实现了一个unsubscribe方法。您可以调用此方法来指示订阅服务器不再对其当前订阅的任何Observable感兴趣。那些Observable可以(如果他们没有其他感兴趣的观察者)选择停止生成要发出的新项目。

此取消订阅的结果将通过适用于观察者订阅的Observable的运算符链级联,这将导致链中的每个链接停止发出项目。但是,这并不能保证立即发生,即使在没有观察者观察这些发射之后,Observable也有可能产生并尝试发射物品一段时间。

Some Notes on Naming Conventions

ReactiveX的每种语言特定实现都有自己的命名怪癖。虽然实现之间存在许多共性,但没有规范的命名标准。

此外,这些名称中的一些在其他情境中具有不同的含义,或者在特定实现语言的习语中看起来很尴尬。例如,有onEvent命名模式(例如onNext,onCompleted,onError)。

在某些上下文中,这些名称将指示通过哪些方法注册事件处理程序。但是,在ReactiveX中,它们自己命名事件处理程序。

“Hot” and “Cold” Observables

Observable何时开始发出其物品序列?这取决于Observable。“热”Observable可以在创建项目后立即开始发出项目,因此任何后来订阅该Observable的观察者都可以开始在中间某处观察序列。另一方面,“冷”Observable等待观察者在开始发射物品之前订阅它,因此这样的观察者保证从一开始就看到整个序列。

在ReactiveX的一些实现中,还存在称为“可连接”可观察的东西。这样的Observable在调用Connect方法之前不会开始发出项目,无论是否有任何观察者都订阅了它。

Composition via Observable Operators

Observable和观察者只是ReactiveX的开始。

它们本身只不过是标准观察者模式的轻微扩展,更适合处理一系列事件而不是单个回调。

真正的力量来自“反应式扩展”(因此是“ReactiveX”) - 允许您转换,组合,操作和处理Observable发出的项目序列的运算符。

这些Rx运算符允许您以声明方式组合异步序列,同时具有回调的所有效率优势,但没有嵌套通常与异步系统相关联的回调处理程序的缺点。

本文档将有关各种运算符的信息及其用法示例分组到以下页面中:

Creating Observables

CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStart, and Timer

Transforming Observable Items

BufferFlatMapGroupByMapScan, and Window

Filtering Observables

DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTake, and TakeLast

Combining Observables

And/Then/WhenCombineLatestJoinMergeStartWithSwitch, and Zip

Error Handling Operators

Catch and Retry

Utility Operators

DelayDoMaterialize/DematerializeObserveOnSerializeSubscribeSubscribeOn,TimeIntervalTimeoutTimestamp, and Using

Conditional and Boolean Operators

AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntil, and TakeWhile

Mathematical and Aggregate Operators

AverageConcatCountMaxMinReduce, and Sum

Converting Observables

To

Connectable Observable Operators

ConnectPublishRefCount, and Replay

Backpressure Operators

a variety of operators that enforce particular flow-control policies

这些页面包含一些运算符的信息,这些运算符不属于ReactiveX的核心,而是在一个或多个特定于语言的实现和/或可选模块中实现。

Chaining Operators

大多数运算符都在Observable上运行并返回一个Observable。这允许您在链中一个接一个地应用这些运算符。链中的每个运算符都会修改由前一个运算符的运算产生的Observable。

还有其他模式,如Builder模式,其中特定类的各种方法通过方法的操作修改该对象,对同一类的项进行操作。这些模式还允许您以类似的方式链接方法。

但是在Builder模式中,方法在链中出现的顺序通常并不重要,因为Observable运算符的顺序很重要。一组Observable运算符不能在原始的Observable上独立运行,它发起链,但它们依次运行,每个运算符都由运算符在链中的前一个运算符生成。


http://chatgpt.dhexx.cn/article/a4UWh2S4.shtml

相关文章

ReactiveX简介

一、ReactiveX简介 在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式。 ReactiveX的官网地址为: http://reactivex.io/ReactiveX官网对于自身的介绍是: An API for asynchronous programming with observa…

搭建物联网服务器(一):购买阿里云服务器

一、购买阿里云服务器 2019年双十一一起拼团抢购阿里云服务器,一年86元钱;打算做个物联网的服务器玩玩; 二、解决问题 阿里云经常出现连接中断,查出问题是服务器的保活机制影响链接,于是进行处理; 解决方…

物联网服务器协议命令,物联网使用HTTP协议传输数据

物联网设备使用HTTP协议传输数据也是一种常用的方式,而且HTTP协议是个很成熟的东西,在服务器上很容易部署。 HTTP协议并不是很复杂的东西。其处在互联网的应用层,因此这个协议只是规定了数据包的格式。具体的数据传递则是由TCP/IP来实现的。 …

物联网协议(设备到物联网服务)

物联网中的网络分为设备到设备的网络与设备到物联网服务的网络。其中,设备到物联网服务的网络通信协议有HTTP,Websocket,MQTT等。 HTTP协议 HTTP(超文本传输协议)是一个负责从万维网服务器获取超文本到本地浏览器的传…

物联网系统上位机源码,含服务器和客户端 物联网服务端程序

物联网系统上位机源码,含服务器和客户端 物联网服务端程序,可以接受市面上大多数透传数据的DTU登录,以及和DTU双向通讯 程序功能:能分组管理,不同的组别用户只可见自己组别的设备,设备和客户端登录掉线直观…

物联网python开发实践

文章目录 第1章 物联网邂逅python物联网组成架构发展现状典型应用使用python的理由python与网关/云平台 第2章 开启python之旅版本选择、搭建开发环境python语言介绍 第3章 python数据结构第4章 python高级特性第5章 物联网核心组件网络通信方案网络通信协议硬件物联网云平台 第…

物联网系统怎么部署服务器,如何搭建物联网云服务器

如何搭建物联网云服务器 内容精选 换一换 创建并登录弹性云服务器,请参见《弹性云服务器快速入门》中“购买弹性云服务器”和“登录弹性云服务器”。该弹性云服务器用于连接文档数据库实例,需要与待连接的实例处于同一虚拟私有云子网内。创建弹性云服务器时,要选择操作系统,…

DGIOT物联网开源平台——腾讯云轻量应用服务器部署

为了节省开发者和实施工程师的时间,降低部署难度,本文提供了一套基于linux系统的一键式部署方式,以便快速交付,提升学习和商用部署的效率。 安装部署 1.服务器(本案例采用腾讯云服务器作为示例) 腾讯云地…

ubuntu系统下搭建本地物联网mqtt服务器的步骤

摘要:mqtt broker(服务器)是物联网通信的核心,网上有很多种开源的服务器可供选择,本文介绍如何在ubuntu系统下安装emqx服务器,让大家可以在局域网环境下搭建“云服务器”,体验物联网的乐趣。本文…

【物联网】12.物联网服务器发送方式(HTTP,WebSocket ,MQTT )

发送服务器的目的在于向设备发送数据并控制设备。 这篇主要是利用HTTP、WebSocket、MQTT 协议来看看如何实现同步和异步传输。 HTTP 发送数据 发送服务器等待接收HTTP 请求的Web 服务器。设备向这台服务器申请发送数据,作为响应,服务器把数据发给设备。…

轻量级高并发物联网服务器接收程序源码

轻量级高并发物联网服务器接收程序源码 (仅仅是接收硬件数据程序 ,没有web端,不是java,协议自己写,如果问及这些问题统统不回复。),对接几万个设备没问题,数据库采用ef6sqlite&…

C#物联网平台服务器框架源码

C#物联网平台服务器框架源码。 这套带码是通过C#编写集成IOCP高性能高并发优势服务器服务源码。 带手机appdemo源码 具体具备功能如下: 1、具备EF6mssql数据库功能,可更改为MYSQL或SQLITe. 2、自带WEB API服务,抛弃IIS支持。 用户可以通…

成本360元的迷你物联网服务器有多香?

嗨,大家好,我们又见面了,前段时间比较忙,所以一直没更新。之前在自己的笔记本上搭建了blynk服务器和Domoticz服务器,但是一直开着笔记本插着电源对电池损害太大,也曾经在手机上安装了服务器,但是…

c#轻量级高并发物联网服务器接收程序源码

c#轻量级高并发物联网服务器接收程序源码(仅仅是接收硬件数据程序,没有web端,不是java,协议自己写。 ),对接几万个设备没问题,数据库采用ef6sqlite,可改efMySQL.该程序只是源码使用示…

基于ESP32搭建物联网服务器十三(自已搭建一个MQTT服务器)

在之前的文章中:ESP32搭建WEB服务器十二(使用MQTT协议与ESP32互动)_你的幻境的博客-CSDN博客 我们已经实现了ESP32通过MQTT协议连接到公共MQTT服务器上,但是公共服务器在稳定性或安全性上,很多时候无法保证。这时,我们除了可以购买比如: 阿…

阿里云搭建MQTT物联网服务器

一、MQTT简介 1、MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上&…

搭建一个物联网平台

搭建一个简单的物联网平台 之所以产生自己搭建平台的想法,是因为本来要使用 one* 平台完成一项作业,但是这个平台的官方文档写的太混乱了,有些地方写的有很简略,对我这种想要入门的小白来说不太友好,而且网上的第三方资…

物联网服务器搭建记录,心得

前言 在庞大的物联网世界中,自己拥有一台物联网服务器时多么一件美好的事。如同自己做的衣服更合身,自己搭建的物联网服务更灵活。 2022/3/9搭建服务 首次搭建了服务,但是感觉在安全方面是个很大的问题。每个设备都能连接,每个…

超微物联网超级服务器IoT SuperServer SYS-210SE-31A 评测

SYS-210SE-31A是Supermicro在日益流行的边缘服务器领域的最新产品。有趣的是,这款2单元430mm短深度服务器提供了3个热插拔节点,每个节点都有1个CPU和8个DIMM插槽。它还具有PCIe Gen4 x16扩展插槽、免工具可维护性和高达55℃的工作温度范围。对实现高度通…

JAVA-删除文件夹下所有文件的3种方法,推荐使用JDK8 Stream流的语法

一、删除文件或文件夹的四种基础方法 下面的四个方法都可以删除文件或文件夹,它们的共同点是:当文件夹中包含子文件的时候都会删除失败,也就是说这四个方法只能删除空文件夹。 需要注意的是:传统 IO 中的 File 类和 NIO 中的 Pat…