ReactiveX -Rx

article/2025/10/23 12:12:44

转载:https://www.open-open.com/lib/view/open1440166491833.html

ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 http://reactivex.io/

什么是ReactiveX

微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

ReactiveX的应用

很多公司都在使用ReactiveX,例如Microsoft、Netflix、Github、Trello、SoundCloud。

ReactiveX宣言

ReactiveX不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。

使用观察者模式

  • 创建:Rx可以方便的创建事件流和数据流
  • 组合:Rx使用查询式的操作符组合和变换数据流
  • 监听:Rx可以订阅任何可观察的数据流并执行操作

简化代码

  • 函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
  • 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
  • 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

使用Observable的优势

Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

 

Observable通过使用最佳的方式访问异步数据序列填补了这个间隙

单个数据                        多个数据
同步  T getData()                 Iterable<T> getData()
异步  Future<T> getData()         Observable<T> getData()

 

Rx的Observable模型让你可以像使用集合数据一样操作异步事件流,对异步事件流使用各种简单、可组合的操作。

Observable可组合

对于单层的异步操作来说,Java中Future对象的处理方式是非常简单有效的,但是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,要想实现还是可以做到的,但是非常困难,或许你可以用Future.get(),但这样做,异步执行的优势就完全没有了。从另一方面说,Rx的Observable一开始就是为组合异步数据流准备的。

Observable更灵活

Rx的Observable不仅支持处理单独的标量值(就像Future可以做的),也支持数据序列,甚至是无穷的数据流。Observable是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的全部优雅与灵活。

复制代码
Observable是异步的双向push,Iterable是同步的单向pull,对比:事件           Iterable(pull)          Observable(push)
获取数据        T next()                onNext(T)
异常处理        throws Exception        onError(Exception)
任务完成        !hasNext()              onCompleted()
复制代码

 

Observable无偏见

Rx对于对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何满足你的需求的,你擅长或偏好的方式都可以。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码将所有与Observable的交互都当做是异步的。

Observable是如何实现的?

public Observable<data> getData();

 

  • 它能与调用者在同一线程同步执行吗?
  • 它能异步地在单独的线程执行吗?
  • 它会将工作分发到多个线程,返回数据的顺序是任意的吗?
  • 它使用Actor模式而不是线程池吗?
  • 它使用NIO和事件循环执行异步网络访问吗?
  • 它使用事件循环将工作线程从回调线程分离出来吗?

从Observer的视角看,这些都无所谓,重要的是:使用Rx,你可以改变你的观念,你可以在完全不影响Observable程序库使用者的情况下,彻底的改变Observable的底层实现。

使用回调存在很多问题

回调在不阻塞任何事情的情况下,解决了Future.get()过早阻塞的问题。由于响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,就像使用Future一样,对于单层的异步执行来说,回调很容易使用,对于嵌套的异步组合,它们显得非常笨拙。

Rx是一个多语言的实现

Rx在大量的编程语言中都有实现,并尊重实现语言的风格,而且更多的实现正在飞速增加。

响应式编程

Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。

你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据可以同步或异步的到达,这种方式更灵活。

下面的例子展示了相似的高阶函数在Iterable和Observable上的应用

复制代码
// Iterable getDataFromLocalMemory().skip(10).take(5).map({ s -> return s + " transformed" }).forEach({ println "next => " + it }) // Observable getDataFromNetwork().skip(10).take(5).map({ s -> return s + " transformed" }).subscribe({ println "onNext => " + it })
复制代码

 

Observable类型给GOF的观察者模式添加了两种缺少的语义,这样就和Iterable类型中可用的操作一致了:

  1. 生产者可以发信号给消费者,通知它没有更多数据可用了(对于Iterable,一个for循环正常完成表示没有数据了;对于Observable,就是调用观察者的onCompleted方法)
  2. 生产者可以发信号给消费者,通知它遇到了一个错误(对于Iterable,迭代过程中发生错误会抛出异常;对于Observable,就是调用观察者(Observer)的onError方法)

有了这两种功能,Rx就能使Observable与Iterable保持一致了,唯一的不同是数据流的方向。任何对Iterable的操作,你都可以对Observable使用。

名词定义

这里给出一些名词的翻译

  • Reactive 直译为反应性的,有活性的,根据上下文一般反以为反应式,响应式
  • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
  • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
  • Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现 *emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射 *items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项

转载于:https://www.cnblogs.com/start-fxw/p/11226937.html


http://chatgpt.dhexx.cn/article/qDInUrjm.shtml

相关文章

Java中ReactiveX(RxJava)的使用

1.1 ReactiveX概述 ReactiveX官网&#xff1a;ReactiveX 1.1.1 Android的MVP开发模式 MVP的工作流程 Presenter负责逻辑的处理Model提供数据View负责显示 作为一种新的模式&#xff0c;在MVP中View并不直接使用Model&#xff0c;它们之间的通信是通过Presenter来进行的&am…

ReactiveX-Observable

Observable &#xff08;此篇文章翻译自ReactiveX的官方文档) 在ReactiveX中&#xff0c;Observer&#xff08;观察者&#xff09;订阅了一个Observable&#xff08;观察对象&#xff09;。那么这个观察者会对观察对象发生的任何单一的变化&#xff0c;或者是一连串的变化做出…

ReactiveX 简介

Observable 在ReactiveX中&#xff0c;观察者订阅了一个Observable。然后&#xff0c;该观察者对Observable发出的任何项目或项目序列做出反应。 这种模式有利于并发操作&#xff0c;因为它不需要在等待Observable发出对象时阻塞&#xff0c;而是以观察者的形式创建一个哨兵&…

ReactiveX简介

一、ReactiveX简介 在学习RxJava前首先需要了解ReactiveX&#xff0c;因为RxJava是ReactiveX的一种Java的实现形式。 ReactiveX的官网地址为&#xff1a; http://reactivex.io/ReactiveX官网对于自身的介绍是&#xff1a; An API for asynchronous programming with observa…

搭建物联网服务器(一):购买阿里云服务器

一、购买阿里云服务器 2019年双十一一起拼团抢购阿里云服务器&#xff0c;一年86元钱&#xff1b;打算做个物联网的服务器玩玩&#xff1b; 二、解决问题 阿里云经常出现连接中断&#xff0c;查出问题是服务器的保活机制影响链接&#xff0c;于是进行处理&#xff1b; 解决方…

物联网服务器协议命令,物联网使用HTTP协议传输数据

物联网设备使用HTTP协议传输数据也是一种常用的方式&#xff0c;而且HTTP协议是个很成熟的东西&#xff0c;在服务器上很容易部署。 HTTP协议并不是很复杂的东西。其处在互联网的应用层&#xff0c;因此这个协议只是规定了数据包的格式。具体的数据传递则是由TCP/IP来实现的。 …

物联网协议(设备到物联网服务)

物联网中的网络分为设备到设备的网络与设备到物联网服务的网络。其中&#xff0c;设备到物联网服务的网络通信协议有HTTP&#xff0c;Websocket&#xff0c;MQTT等。 HTTP协议 HTTP&#xff08;超文本传输协议&#xff09;是一个负责从万维网服务器获取超文本到本地浏览器的传…

物联网系统上位机源码,含服务器和客户端 物联网服务端程序

物联网系统上位机源码&#xff0c;含服务器和客户端 物联网服务端程序&#xff0c;可以接受市面上大多数透传数据的DTU登录&#xff0c;以及和DTU双向通讯 程序功能&#xff1a;能分组管理&#xff0c;不同的组别用户只可见自己组别的设备&#xff0c;设备和客户端登录掉线直观…

物联网python开发实践

文章目录 第1章 物联网邂逅python物联网组成架构发展现状典型应用使用python的理由python与网关/云平台 第2章 开启python之旅版本选择、搭建开发环境python语言介绍 第3章 python数据结构第4章 python高级特性第5章 物联网核心组件网络通信方案网络通信协议硬件物联网云平台 第…

物联网系统怎么部署服务器,如何搭建物联网云服务器

如何搭建物联网云服务器 内容精选 换一换 创建并登录弹性云服务器,请参见《弹性云服务器快速入门》中“购买弹性云服务器”和“登录弹性云服务器”。该弹性云服务器用于连接文档数据库实例,需要与待连接的实例处于同一虚拟私有云子网内。创建弹性云服务器时,要选择操作系统,…

DGIOT物联网开源平台——腾讯云轻量应用服务器部署

为了节省开发者和实施工程师的时间&#xff0c;降低部署难度&#xff0c;本文提供了一套基于linux系统的一键式部署方式&#xff0c;以便快速交付&#xff0c;提升学习和商用部署的效率。 安装部署 1.服务器&#xff08;本案例采用腾讯云服务器作为示例&#xff09; 腾讯云地…

ubuntu系统下搭建本地物联网mqtt服务器的步骤

摘要&#xff1a;mqtt broker&#xff08;服务器&#xff09;是物联网通信的核心&#xff0c;网上有很多种开源的服务器可供选择&#xff0c;本文介绍如何在ubuntu系统下安装emqx服务器&#xff0c;让大家可以在局域网环境下搭建“云服务器”&#xff0c;体验物联网的乐趣。本文…

【物联网】12.物联网服务器发送方式(HTTP,WebSocket ,MQTT )

发送服务器的目的在于向设备发送数据并控制设备。 这篇主要是利用HTTP、WebSocket、MQTT 协议来看看如何实现同步和异步传输。 HTTP 发送数据 发送服务器等待接收HTTP 请求的Web 服务器。设备向这台服务器申请发送数据&#xff0c;作为响应&#xff0c;服务器把数据发给设备。…

轻量级高并发物联网服务器接收程序源码

轻量级高并发物联网服务器接收程序源码 &#xff08;仅仅是接收硬件数据程序 &#xff0c;没有web端&#xff0c;不是java&#xff0c;协议自己写&#xff0c;如果问及这些问题统统不回复。&#xff09;&#xff0c;对接几万个设备没问题&#xff0c;数据库采用ef6sqlite&…

C#物联网平台服务器框架源码

C#物联网平台服务器框架源码。 这套带码是通过C#编写集成IOCP高性能高并发优势服务器服务源码。 带手机appdemo源码 具体具备功能如下&#xff1a; 1、具备EF6mssql数据库功能&#xff0c;可更改为MYSQL或SQLITe. 2、自带WEB API服务&#xff0c;抛弃IIS支持。 用户可以通…

成本360元的迷你物联网服务器有多香?

嗨&#xff0c;大家好&#xff0c;我们又见面了&#xff0c;前段时间比较忙&#xff0c;所以一直没更新。之前在自己的笔记本上搭建了blynk服务器和Domoticz服务器&#xff0c;但是一直开着笔记本插着电源对电池损害太大&#xff0c;也曾经在手机上安装了服务器&#xff0c;但是…

c#轻量级高并发物联网服务器接收程序源码

c#轻量级高并发物联网服务器接收程序源码&#xff08;仅仅是接收硬件数据程序&#xff0c;没有web端&#xff0c;不是java&#xff0c;协议自己写。 &#xff09;&#xff0c;对接几万个设备没问题&#xff0c;数据库采用ef6sqlite&#xff0c;可改efMySQL.该程序只是源码使用示…

基于ESP32搭建物联网服务器十三(自已搭建一个MQTT服务器)

在之前的文章中:ESP32搭建WEB服务器十二(使用MQTT协议与ESP32互动)_你的幻境的博客-CSDN博客 我们已经实现了ESP32通过MQTT协议连接到公共MQTT服务器上&#xff0c;但是公共服务器在稳定性或安全性上&#xff0c;很多时候无法保证。这时&#xff0c;我们除了可以购买比如: 阿…

阿里云搭建MQTT物联网服务器

一、MQTT简介 1、MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;&#xff0c;是一种基于发布/订阅&#xff08;publish/subscribe&#xff09;模式的"轻量级"通讯协议&#xff0c;该协议构建于TCP/IP协议上&…

搭建一个物联网平台

搭建一个简单的物联网平台 之所以产生自己搭建平台的想法&#xff0c;是因为本来要使用 one* 平台完成一项作业&#xff0c;但是这个平台的官方文档写的太混乱了&#xff0c;有些地方写的有很简略&#xff0c;对我这种想要入门的小白来说不太友好&#xff0c;而且网上的第三方资…