响应式编程之网络新约:RSocket

article/2025/8/18 2:15:23

响应式reactive是Java中高效应用的下一个前沿,但它目前主要有两个障碍:数据访问和网络。RSocket是一种新的第7层语言无关的应用网络协议(解决后者),它由Facebook,Netifi和Pivotal等工程师开发,提供Java,JavaScript,C ++和Kotlin等实现,RSocket与Servlet并不是同类的产品。

RSocket

RSocket RSocket是一个二进制的协议,以异步消息的方式提供4种对等的交互模型,以字节流的方式运行在TCP, WebSockets, Aeron等传输层之上。RSocket专门设计用于与Reactive风格应用配合使用,这些应用程序基本上是非阻塞的,并且通常(但不总是)与异步行为配对。它是传输无关的,支持 TCP、WebSocket和Aeron UDP协议,并支持无语义损失的混合传输协议——回压和流量控制仍然有效。

它还支持连接恢复。当你建立 RSocket 连接时,你可以指定前一个连接的 ID,如果流仍然在服务器的内存中,则你可以继续消费你的流。

Payload就是前面说到基于消息通讯,那就是拿到消息返回消息。对于一个消息来说,由两部分组成,原信息(metadata)和数据(data)。Mono和Flux是用来处理异步的关键字,这是Reactive编程要求。

public interface RSocket extends Availability, Closeable {/** * 推送元信息,数据可以自己定*/Mono<Void> metadataPush(Payload payload);/**请求/响应* 当你发送一个请求并接收一个响应时,该协议也比 HTTP 更具优势,因为它是异步且多路复用的*/Mono<Payload> requestResponse(Payload payload);/**即发即忘* 请求/响应的优化,在不需要响应时非常有用,比如用于非关键事件的日志记录*/Mono<Void> fireAndForget(Payload payload);/**请求/流* 类似于返回集合的请求/响应,集合将以流的方式返回,而不是等到查询完成,例如,发送一个银行帐号,使用一个实时的帐户事务流进行响应*/Flux<Payload> requestStream(Payload payload);/**通道* 允许任意交互模型的双向消息流*/Flux<Payload> requestChannel(Publisher<Payload> payloads);/**健康度检查* double值可以作为权重,如1.0表示处理能力非常好,0.8一般*/default double availability() {return isDisposed() ? 0.0 : 1.0;}
}

RSocket vs Servlet

Servlet是一套Java的API规范,基于HTTP协议之上。主要功能提供HTTP服务的class,就是通过HTTP Request,处理后,最终调用HTTP Response完成输出!

public abstract class HttpServlet extends Servlet {protected abstract void doGet(HttpServletRequest request,HttpServletResponse response)  throws ServletException, IOException;protected abstract void doPost(HttpServletRequest request,HttpServletResponse response)  throws ServletException, IOException;
}

协议层

Servlet基于HTTP协议的,HTTP并非非常简单,1.1,2.0版本开始是有点复杂的
RSocket自定义二进制协议,RSocket定位高性能通讯,比HTTP高非常多(号称10倍)

通讯模式

Servlet都是request/response模式,所以也叫做 request command,其他例如流式推送、fireAndForget和双向通讯,Servlet2.0都不支持,但是这些指令都是为浏览器设计的,并非为服务通讯设计的
RSocket对等通讯,不再介于传统的理解是Client -> Server模式,RSocket没有这个概念,大家的地位是对等的,都可以在server端,我调用你的服务,你也可以调用我的服务

message

ServletHTTP1.1是基于文本的通讯,2.0是基于message的(二进制),基于message的好处是异步化。message都必须有一个ID,这个消息发送出去后,就不用等立即返回,可以继续发其他message,收到message后,再根据返回的message ID和之前的发出去的message ID进行匹配。
RSocket基于message

RSocket && dubbo

Dubbo 在 3.0.0-SNAPSHOT 版本里基于 RSocket 对响应式编程提供了支持,用户可以非常方便的使用RSocket的语法。使用实例可以参阅官方,待正式版发布后,接触RSocket的机会也会越来越多。

RSocket &&  Spring

随着Spring Cloud的推出,Spring Framework 5.2 即将要把RSocket作为缺省的通讯协议,springBoot中提供相应支持。

RSocket && 微服务

RSocket的主要障碍是应用程序之间必须要用RSocket通讯。微服务普及后,其为了“简化”微服务之间的通讯,引入了很多层的技术栈。这当然是好事,但是很多的决定是由于收到上一代的通讯协议的技术所限制。

示例

spring-boot-starter-rsocket其实也已经封装好了,使用起来比下面例子更加简单方便,感觉离rpc更近了一步

public final class ChannelEchoClient {static final Payload payload1 = ByteBufPayload.create("Hello ");public static void main(String[] args) {RSocketFactory.receive().frameDecoder(PayloadDecoder.ZERO_COPY).acceptor(new SocketAcceptorImpl()).transport(LocalServerTransport.create("localhost")).start().subscribe();RSocket socket =RSocketFactory.connect().keepAliveAckTimeout(Duration.ofMinutes(10)).frameDecoder(PayloadDecoder.ZERO_COPY).transport(LocalClientTransport.create("localhost")).start().block();Flux.range(0, 100000000).concatMap(i -> socket.fireAndForget(payload1.retain())).blockLast();}private static class SocketAcceptorImpl implements SocketAcceptor {@Overridepublic Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {return Mono.just(new AbstractRSocket() {@Overridepublic Mono<Void> fireAndForget(Payload payload) {//System.out.println(payload.getDataUtf8());payload.release();return Mono.empty();}@Overridepublic Mono<Payload> requestResponse(Payload payload) {return Mono.just(payload);}@Overridepublic Flux<Payload> requestChannel(Publisher<Payload> payloads) {return Flux.from(payloads).subscribeOn(Schedulers.single());}});}}
}
//request/response
public final class HelloWorldClient {public static void main(String[] args) {RSocketFactory.receive().acceptor((setupPayload, reactiveSocket) ->Mono.just(new AbstractRSocket() {boolean fail = true;@Overridepublic Mono<Payload> requestResponse(Payload p) {if (fail) {fail = false;return Mono.error(new Throwable());} else {return Mono.just(p);}}})).transport(TcpServerTransport.create("localhost", 7000)).start().subscribe();RSocket socket =RSocketFactory.connect().transport(TcpClientTransport.create("localhost", 7000)).start().block();socket.requestResponse(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).onErrorReturn("error").doOnNext(System.out::println).block();socket.requestResponse(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).onErrorReturn("error").doOnNext(System.out::println).block();socket.requestResponse(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).onErrorReturn("error").doOnNext(System.out::println).block();socket.dispose();}
}
//request/stream
public final class StreamingClient {public static void main(String[] args) {RSocketFactory.receive().acceptor(new SocketAcceptorImpl()).transport(TcpServerTransport.create("localhost", 7000)).start().subscribe();RSocket socket =RSocketFactory.connect().transport(TcpClientTransport.create("localhost", 7000)).start().block();socket.requestStream(DefaultPayload.create("Hello")).map(Payload::getDataUtf8).doOnNext(System.out::println).take(10).then().doFinally(signalType -> socket.dispose()).then().block();}private static class SocketAcceptorImpl implements SocketAcceptor {@Overridepublic Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {return Mono.just(new AbstractRSocket() {@Overridepublic Flux<Payload> requestStream(Payload payload) {return Flux.interval(Duration.ofMillis(100)).map(aLong -> DefaultPayload.create("Interval: " + aLong));}});}}
}

参阅资料

  • Introduction to RSocket

REST的最大限制是它与HTTP相关联,经常使用REST的原因是它易于调试,因为它是“人类可读”。开源RSocket专为服务而设计。它是一种面向连接的消息驱动协议,在应用程序级别具有内置流控制。它既可以在浏览器中同样使用,也可以在服务器上使用。这意味着您可以流式传输数据或执行Pub / Sub而无需设置应用程序队列。在Facebook,RSocket用于名为LiveServer的服务,该服务负责响应可被视为GraphQL订阅的实时查询。


http://chatgpt.dhexx.cn/article/tWFZwXb4.shtml

相关文章

一篇文章了解RSocket协议

RSocket是一个类似于HTTP的通讯协议。在了解Rsocket协议之前&#xff0c;先简单介绍下HTTP协议。 之所以推出springboot的技术&#xff0c;一个原因是因为前后端设计的分离。因为基于HTTP协议可以直接返回REST数据内容。 REST是一个简单且容易使用的异构处理架构&#xff0c;R…

RSocket 学习(一):初探

girl.jpg 一. RSocket 介绍 RSocket 是一种二进制字节流传输协议&#xff0c;位于 OSI 七层模型中的5、6层&#xff0c;对应 TCP/IP 模型中的应用层。RSocket 并没有规定必须使用何种底层传输层协议&#xff0c;开发者可以使用不同的底层传输协议&#xff0c;包括 TCP、WebSock…

RSocket——Http协议的替代者

1. 简介 RSocket是一种二进制的点对点通信协议&#xff0c;是一种新的网络通信第七层协议。旨在用于分布式应用程序中。从这个意义上讲&#xff0c;RSocket是HTTP等其他协议的替代方案。它是一种基于Reactive Streams规范具有异步&#xff0c;背压的双向&#xff0c;多路复用&…

java-语言学习-eclipse安装java汉化包

java的汉化&#xff1a; 1.打开链接&#xff1a;https://www.eclipse.org/babel/ 2.进入网页后&#xff0c;往下翻一点&#xff0c;看到一个download&#xff08;下图红框所示&#xff09;&#xff0c;点开。 3.进入一个页面后&#xff0c;往下翻一点点&#xff0c;看到有几…

安装Eclipse的中文语言包

安装Eclipse的中文语言包 下载语言包 1、进入网址 http://www.eclipse.org/babel 2、对应版本下载中文语言包 3、替换Eclipse软件安装文件的features、plugins&#xff0c;再次启动加载 4、替换出现The Eclipse executable launcher was unable to locate its companion l…

Eclipse 官方简体中文语言包下载地址及安装方法

转自: http://www.cnblogs.com/yaotong/archive/2011/12/28/2305421.html 打开Eclipse Babel Project 主页: http://www.eclipse.org/babel/downloads.php 根据Eclipse的版本找到相应的插件地址&#xff0c;复制下来。 进入Eclipse&#xff0c;选择Help->Install New Softwa…

Eclipse-中文语言包

Eclipse-中文语言包 效果图 版本使用中文语言包 下载过程 点击如下 打开插件市场 填写链接 填写此链接地址https://download.eclipse.org/technology/babel/update-site/latest/到下图 简体中文选择 简体中文选择 选择允许协议 安装代码提示插件codota 是什么 打开插件市场 搜…

Eclipse汉化 中文语言包下载安装 Babel Language Pack

相关链接 Java & Eclipse & Maven 使用配置方法 Eclipse平台上新建Java项目使用Junit测试 如何在Eclipse平台使用git从GitHub上下载文件至本地及管理本地git项目 Eclipse汉化 中文语言包下载安装 Babel Language Pack 点击进入Eclipse Babel Project 下拉可见 选择与自…

eclipse中文语言包安装(别看网上那些乱七八糟的,我这个最简单)

一、安装好JDK和eclispe。&#xff08;这个步骤不用多说了&#xff09; 二、步骤 1、找语言包并下载&#xff1a;https://www.eclipse.org/babel/downloads.php 找到汉化文件下载备用。 2、把下载好的文件复制到 eclipse的dropins文件夹中。 3、启动eclispe&#xff0c;汉化…

Eclipse语言包在官网下载不了-解决方案

可能家里的网络不好&#xff0c;在官网语言包一直出现这个问题&#xff0c;网上找了很久没找到解决方案&#xff0c;应该是家里网络和对方服务器不对付。就发现有下面这个选项&#xff0c;选了nanjing的节点就解决了。

eclipse汉化-设置语言包

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言&#xff0c;它只是一个框架和一组服务&#xff0c;用于通过插件组件构建开发环境。 一、通过插件的方式进行eclipse的汉化 1、登陆Eclipse Babel Project Downloads | The Eclipse Foundationhttps://ww…

基于遥感影像的道路提取论文、开源代码和数据集汇总

文章目录 前言2017DeepRoadMapperTopology Loss 2018RoadTraceriterative-deep-learning 2019Leveraging Crowdsourced GPS Data for Road Extraction from Aerial ImageryRoadNetRoadTaggerGenerative Graph Transformerroad_connectivityNL-LinkNet: Toward Lighter but More…

遥感影像云检测-云检测数据集信息及下载

常用云检测数据集信息及下载 1.LandSat7云量评估数据集2.LandSat8-Biome生物群落云量评估数据集3.LandSat8-38Cloud数据集4.高分系列-GF1-WHU遥感影像云数据集5.Sentinel-2 Cloud Mask Catalogue5.1.数据介绍5.2.数据集目录编排5.3.统计数据5.4.错误和不确定性 6.CESBIO数据集(…

AI实战营第二期 第七节 《语义分割与MMSegmentation》——笔记8

文章目录 摘要主要特性 案例什么是语义分割应用&#xff1a;无人驾驶汽车应用&#xff1a;人像分割应用&#xff1a;智能遥感应用 : 医疗影像分析 三种分割的区别语义分割的基本思路按颜色分割逐像素份分类全卷积网络 Fully Convolutional Network 2015存在问题 基于多层级特征…

【毕业设计_课程设计】基于 U-Net 网络的遥感图像语义分割(源码+论文)

文章目录 0 项目说明1 研究目的2 研究方法3 研究结论4 论文目录5 项目工程 0 项目说明 **基于 U-Net 网络的遥感图像语义分割 ** 提示&#xff1a;适合用于课程设计或毕业设计&#xff0c;工作量达标&#xff0c;源码开放 实验训练使用 Anaconda 版 Python 3.7 下的 TensorF…

EISeg——应用于语义分割的自动标注软件

1、基本介绍 EISeg(Efficient Interactive Segmentation)是以RITM及EdgeFlow算法为基础&#xff0c;基于飞桨开发的一个高效智能的交互式分割标注软件。涵盖了通用、人像、遥感、医疗等不同方向的高质量交互式分割模型&#xff0c;方便开发者快速实现语义及实例标签的标注&…

基于paddleSeg的自定义遥感数据语义分割——以DLRSD为例

转自AI Studio&#xff0c;原文链接&#xff1a;基于paddleSeg的自定义遥感数据语义分割——以DLRSD为例 - 飞桨AI Studio 基于paddleseg 2.1使用自定义数据集DLRSD,其他遥感数据集实现训练、测试、推理脚本版任务 数据格式准备 DLRSD数据集 基于UCMerced_LandUse数据集进行…

【计算机视觉】最全语义分割模型总结(从FCN到deeplabv3+)

文章目录 一、前言1.1 语义分割 二、FCN&#xff1a;CNN语义分割的开山之作2.1 结构2.2 特点 三、Deeplab_v13.1 前言3.2 特点 四、U-Net4.1 结构4.2 特点 五、Seg-Net5.1 结构5.2 特点 六、Deeplab_v26.1 结构6.2 特点6.3 Fcis6.3.1 特点 七、RefineNet7.1 结构7.2 特点 八、L…

语义分割网络系列2——Unet

目录 1 Unet网络介绍1.1 Unet论文1.2 简介1.3 6大特点 2 Unet网络3种不同的实现方式2.1 Unet网络的class实现&#xff08;mIou&#xff09;2.2 Unet网络的layer的实现&#xff08;mIou&#xff09;2.3 第3种实现方法&#xff08;存在问题&#xff0c;验证集准确率一直不变&…

毕业设计 U-Net遥感图像语义分割(源码+论文)

文章目录 0 项目说明1 研究目的2 研究方法3 研究结论4 论文目录5 项目源码6 最后 0 项目说明 **基于 U-Net 网络的遥感图像语义分割 ** 提示&#xff1a;适合用于课程设计或毕业设计&#xff0c;工作量达标&#xff0c;源码开放 实验训练使用 Anaconda 版 Python 3.7 下的 T…