基于RSocket协议实现客户端与服务端通信

article/2025/8/18 2:13:36

RSocket基础开发demo

package com.pshdhx.rsocket;import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@Slf4j
public class MessageRSocketHandler implements RSocket {@Overridepublic Mono<Void> fireAndForget(Payload payload) { //无响应 用于日志String message = payload.getDataUtf8(); //获取数据log.info("[fireAndForget]接收请求数据:{}",message);return Mono.empty();}@Overridepublic Mono<Payload> requestResponse(Payload payload) { //传统模式 有请求有响应String message = payload.getDataUtf8(); //获取数据log.info("[RequestAndResponse]接收请求数据:{}",message);return Mono.just(DefaultPayload.create("[echo]"+message));}@Overridepublic Flux<Payload> requestStream(Payload payload) { //处理流数据String message = payload.getDataUtf8(); //获取数据log.info("[RequestStream]接收请求数据:{}",message);return Flux.fromStream(message.chars()          //将接收到的字符串转换为int型数据流.mapToObj(c->Character.toUpperCase(c)) //将里边的每一个字符编码大写.map(Object::toString)//将字符转为String.map(DefaultPayload::create)); //创建payload附加数据}@Overridepublic Flux<Payload> requestChannel(Publisher<Payload> publisher) { //双向流return Flux.from(publisher).map(Payload::getDataUtf8).map(msg->{log.info("【RequestChannel】接收请求数据:{}",msg);return msg;}).map(DefaultPayload::create);}@Overridepublic Mono<Void> metadataPush(Payload payload) {return null;}@Overridepublic Mono<Void> onClose() {return null;}@Overridepublic void dispose() {}
}

RSocket服务器开发

package com.pshdhx.controller;import com.pshdhx.message.MessageService;
import com.pshdhx.vo.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;@Controller
//不使用rest
@Slf4j
public class MessageFluxController {@Autowiredprivate MessageService messageService;@MessageMapping("message.echo")public Mono<Message> echoMessage(Mono<Message> message){return message.doOnNext(msg->this.messageService.echo(msg)) //响应处理.doOnNext(msg->log.info("消息接收{}",message));}@MessageMapping("message.delete")public void deleteMessage(Mono<String> title){title.doOnNext(msg->log.info("消息删除{}",msg)).subscribe();}@MessageMapping("message.list")public Flux<Message> listMessage(){return Flux.fromStream(this.messageService.list().stream());}@MessageMapping("message.get")public Flux<Message> getMessage(Flux<String> title){return title.doOnNext(t->log.info("消息查询{}",t)).map(titleInfo->titleInfo.toLowerCase()).map(this.messageService::get).delayElements(Duration.ofSeconds(1));}
}

RSocket客户端开发

配置策略和通信

package com.pshdhx.config;import io.rsocket.RSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;import java.time.Duration;@Configuration
public class RSocketConfig {/*** 配置策略,编码和解码*/@Beanpublic RSocketStrategies getRSocketStrategies(){return RSocketStrategies.builder().encoders(encoders -> encoders.add(new Jackson2CborEncoder())).decoders(decoders -> decoders.add(new Jackson2CborDecoder())).build();}/*** 配置RSocket连接策略*/@Beanpublic Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder){return Mono.just(builder.rsocketConnector(rSocketConnector -> rSocketConnector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2)))).dataMimeType(MediaType.APPLICATION_CBOR).transport(TcpClientTransport.create(6869)));}}

客户端模拟调用

import com.pshdhx.AppClient;
import com.pshdhx.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import reactor.core.publisher.Mono;@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = AppClient.class)
public class TestRSocketClient {@Autowiredprivate Mono<RSocketRequester> requesterMono; //来进行服务调用@Testpublic void testEchoMessage(){ //测试服务响应this.requesterMono.map(r->r.route("message.echo").data(new Message("pshdhx","fighting"))).flatMap(r->r.retrieveMono(Message.class)).doOnNext(o-> System.out.println(o)).block();}@Testpublic void testDeleteMessage(){this.requesterMono.map(r->r.route("message.delete").data("pshdhx")).flatMap(RSocketRequester.RetrieveSpec::send).block();}@Testpublic void testListMessage(){this.requesterMono.map(r->r.route("message.list")).flatMapMany(r->r.retrieveFlux(Message.class)).doOnNext(o->System.out.println(o)).blockLast();}}

RSocket实现文件上传

RSocket协议由于本身是基于二进制传输,所以也提供了方便文件上传的处理支持。而在进行文件上传时,并不是将一个文件整体上传,而是采用了文件块(chunk)的形式进行上传文件的切割,利用Flux包裹要上传的一组文件块,而在服务器接收到该文件块之后,会通过专属的通道进行保存,同时也会将上传的状态发送到客户端。

 

 common模块

package com.pshdhx.type;public enum  UploadStatus {CHUNK_COMPLETED, //文件上传处理之中COMPLETED, //文件上传完毕FAILED;//失败}package com.pshdhx.constants;public class UploadConstants {public static final String MINE_FILE_NAME = "message/x.upload.file.name";public static final String MINE_FILE_EXTENSION = "message/x.upload.file.extension";public static final String FILE_NAME = "file.name";public static final String FILE_EXT = "file.ext";
}

服务器端:

package com.pshdhx.config;import com.pshdhx.constants.UploadConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeType;@Configuration
public class RSocketServerConfig {@Beanpublic RSocketStrategies getRSocketStrategies(){return RSocketStrategies.builder().encoders(encoders -> encoders.add(new Jackson2CborEncoder())).decoders(decoders -> decoders.add(new Jackson2CborDecoder())).metadataExtractorRegistry(metadataExtractorRegistry -> {metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(UploadConstants.MINE_FILE_NAME),String.class,UploadConstants.FILE_NAME);metadataExtractorRegistry.metadataToExtract(MimeType.valueOf(UploadConstants.MINE_FILE_EXTENSION),String.class,UploadConstants.MINE_FILE_EXTENSION);}).build();}
}
@Controller
//不使用rest
@Slf4j
public class MessageFluxController {@Autowiredprivate MessageService messageService;@Value("${output.file.path.upload}")private Path outputPath;@MessageMapping("message.upload")public Flux<UploadStatus> upload(@Headers Map<String,Object> metadata,@Payload Flux<DataBuffer> content) throws  Exception{log.info("【上传后路径】outputPaht={}",this.outputPath);var fileName = metadata.get(UploadConstants.FILE_NAME);var fileExt = metadata.get(UploadConstants.MINE_FILE_EXTENSION);var path = Paths.get(fileName+"."+fileExt);log.info("【文件上传】fileName={}、fileExt = {},path={}",fileName,fileExt,path);AsynchronousFileChannel channel = AsynchronousFileChannel.open(this.outputPath.resolve(path),StandardOpenOption.CREATE, //文件创建StandardOpenOption.WRITE  //文件写入);//异步文件通道return Flux.concat(DataBufferUtils.write(content,channel).map(s->UploadStatus.CHUNK_COMPLETED),Mono.just(UploadStatus.COMPLETED)).doOnComplete(()->{try {channel.close();}catch(Exception e){e.printStackTrace();}}).onErrorReturn(UploadStatus.FAILED);}

客户端:

    @Value("classpath:/images/pic.jpg")private Resource resource;@Testpublic void testUpload(){String fileName = "pshdhx"+ UUID.randomUUID();String fileExt = this.resource.getFilename().substring(this.resource.getFilename().lastIndexOf(".")+1);Flux<DataBuffer> resourceFlux = DataBufferUtils.read(this.resource,new DefaultDataBufferFactory(),1024).doOnNext(s-> System.out.println("文件上传:"+s));Flux<UploadStatus> uploadStatusFlux = this.requesterMono.map(r->r.route("message.upload").metadata(metadataSpec -> {System.out.println("【上传测试:】文件名称"+fileName+"."+fileExt);metadataSpec.metadata(fileName, MimeType.valueOf(UploadConstants.MINE_FILE_NAME));metadataSpec.metadata(fileExt, MimeType.valueOf(UploadConstants.MINE_FILE_EXTENSION));}).data(resourceFlux)).flatMapMany(r->r.retrieveFlux(UploadStatus.class)).doOnNext(o-> System.out.println("上传进度:"+o));uploadStatusFlux.blockLast();}


http://chatgpt.dhexx.cn/article/uIaGHZIg.shtml

相关文章

响应式编程之网络新约:RSocket

响应式reactive是Java中高效应用的下一个前沿&#xff0c;但它目前主要有两个障碍&#xff1a;数据访问和网络。RSocket是一种新的第7层语言无关的应用网络协议&#xff08;解决后者&#xff09;&#xff0c;它由Facebook&#xff0c;Netifi和Pivotal等工程师开发&#xff0c;提…

一篇文章了解RSocket协议

RSocket是一个类似于HTTP的通讯协议。在了解Rsocket协议之前&#xff0c;先简单介绍下HTTP协议。 之所以推出springboot的技术&#xff0c;一个原因是因为前后端设计的分离。因为基于HTTP协议可以直接返回REST数据内容。 REST是一个简单且容易使用的异构处理架构&#xff0c;R…

RSocket 学习(一):初探

girl.jpg 一. RSocket 介绍 RSocket 是一种二进制字节流传输协议&#xff0c;位于 OSI 七层模型中的5、6层&#xff0c;对应 TCP/IP 模型中的应用层。RSocket 并没有规定必须使用何种底层传输层协议&#xff0c;开发者可以使用不同的底层传输协议&#xff0c;包括 TCP、WebSock…

RSocket——Http协议的替代者

1. 简介 RSocket是一种二进制的点对点通信协议&#xff0c;是一种新的网络通信第七层协议。旨在用于分布式应用程序中。从这个意义上讲&#xff0c;RSocket是HTTP等其他协议的替代方案。它是一种基于Reactive Streams规范具有异步&#xff0c;背压的双向&#xff0c;多路复用&…

java-语言学习-eclipse安装java汉化包

java的汉化&#xff1a; 1.打开链接&#xff1a;https://www.eclipse.org/babel/ 2.进入网页后&#xff0c;往下翻一点&#xff0c;看到一个download&#xff08;下图红框所示&#xff09;&#xff0c;点开。 3.进入一个页面后&#xff0c;往下翻一点点&#xff0c;看到有几…

安装Eclipse的中文语言包

安装Eclipse的中文语言包 下载语言包 1、进入网址 http://www.eclipse.org/babel 2、对应版本下载中文语言包 3、替换Eclipse软件安装文件的features、plugins&#xff0c;再次启动加载 4、替换出现The Eclipse executable launcher was unable to locate its companion l…

Eclipse 官方简体中文语言包下载地址及安装方法

转自: http://www.cnblogs.com/yaotong/archive/2011/12/28/2305421.html 打开Eclipse Babel Project 主页: http://www.eclipse.org/babel/downloads.php 根据Eclipse的版本找到相应的插件地址&#xff0c;复制下来。 进入Eclipse&#xff0c;选择Help->Install New Softwa…

Eclipse-中文语言包

Eclipse-中文语言包 效果图 版本使用中文语言包 下载过程 点击如下 打开插件市场 填写链接 填写此链接地址https://download.eclipse.org/technology/babel/update-site/latest/到下图 简体中文选择 简体中文选择 选择允许协议 安装代码提示插件codota 是什么 打开插件市场 搜…

Eclipse汉化 中文语言包下载安装 Babel Language Pack

相关链接 Java & Eclipse & Maven 使用配置方法 Eclipse平台上新建Java项目使用Junit测试 如何在Eclipse平台使用git从GitHub上下载文件至本地及管理本地git项目 Eclipse汉化 中文语言包下载安装 Babel Language Pack 点击进入Eclipse Babel Project 下拉可见 选择与自…

eclipse中文语言包安装(别看网上那些乱七八糟的,我这个最简单)

一、安装好JDK和eclispe。&#xff08;这个步骤不用多说了&#xff09; 二、步骤 1、找语言包并下载&#xff1a;https://www.eclipse.org/babel/downloads.php 找到汉化文件下载备用。 2、把下载好的文件复制到 eclipse的dropins文件夹中。 3、启动eclispe&#xff0c;汉化…

Eclipse语言包在官网下载不了-解决方案

可能家里的网络不好&#xff0c;在官网语言包一直出现这个问题&#xff0c;网上找了很久没找到解决方案&#xff0c;应该是家里网络和对方服务器不对付。就发现有下面这个选项&#xff0c;选了nanjing的节点就解决了。

eclipse汉化-设置语言包

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言&#xff0c;它只是一个框架和一组服务&#xff0c;用于通过插件组件构建开发环境。 一、通过插件的方式进行eclipse的汉化 1、登陆Eclipse Babel Project Downloads | The Eclipse Foundationhttps://ww…

基于遥感影像的道路提取论文、开源代码和数据集汇总

文章目录 前言2017DeepRoadMapperTopology Loss 2018RoadTraceriterative-deep-learning 2019Leveraging Crowdsourced GPS Data for Road Extraction from Aerial ImageryRoadNetRoadTaggerGenerative Graph Transformerroad_connectivityNL-LinkNet: Toward Lighter but More…

遥感影像云检测-云检测数据集信息及下载

常用云检测数据集信息及下载 1.LandSat7云量评估数据集2.LandSat8-Biome生物群落云量评估数据集3.LandSat8-38Cloud数据集4.高分系列-GF1-WHU遥感影像云数据集5.Sentinel-2 Cloud Mask Catalogue5.1.数据介绍5.2.数据集目录编排5.3.统计数据5.4.错误和不确定性 6.CESBIO数据集(…

AI实战营第二期 第七节 《语义分割与MMSegmentation》——笔记8

文章目录 摘要主要特性 案例什么是语义分割应用&#xff1a;无人驾驶汽车应用&#xff1a;人像分割应用&#xff1a;智能遥感应用 : 医疗影像分析 三种分割的区别语义分割的基本思路按颜色分割逐像素份分类全卷积网络 Fully Convolutional Network 2015存在问题 基于多层级特征…

【毕业设计_课程设计】基于 U-Net 网络的遥感图像语义分割(源码+论文)

文章目录 0 项目说明1 研究目的2 研究方法3 研究结论4 论文目录5 项目工程 0 项目说明 **基于 U-Net 网络的遥感图像语义分割 ** 提示&#xff1a;适合用于课程设计或毕业设计&#xff0c;工作量达标&#xff0c;源码开放 实验训练使用 Anaconda 版 Python 3.7 下的 TensorF…

EISeg——应用于语义分割的自动标注软件

1、基本介绍 EISeg(Efficient Interactive Segmentation)是以RITM及EdgeFlow算法为基础&#xff0c;基于飞桨开发的一个高效智能的交互式分割标注软件。涵盖了通用、人像、遥感、医疗等不同方向的高质量交互式分割模型&#xff0c;方便开发者快速实现语义及实例标签的标注&…

基于paddleSeg的自定义遥感数据语义分割——以DLRSD为例

转自AI Studio&#xff0c;原文链接&#xff1a;基于paddleSeg的自定义遥感数据语义分割——以DLRSD为例 - 飞桨AI Studio 基于paddleseg 2.1使用自定义数据集DLRSD,其他遥感数据集实现训练、测试、推理脚本版任务 数据格式准备 DLRSD数据集 基于UCMerced_LandUse数据集进行…

【计算机视觉】最全语义分割模型总结(从FCN到deeplabv3+)

文章目录 一、前言1.1 语义分割 二、FCN&#xff1a;CNN语义分割的开山之作2.1 结构2.2 特点 三、Deeplab_v13.1 前言3.2 特点 四、U-Net4.1 结构4.2 特点 五、Seg-Net5.1 结构5.2 特点 六、Deeplab_v26.1 结构6.2 特点6.3 Fcis6.3.1 特点 七、RefineNet7.1 结构7.2 特点 八、L…

语义分割网络系列2——Unet

目录 1 Unet网络介绍1.1 Unet论文1.2 简介1.3 6大特点 2 Unet网络3种不同的实现方式2.1 Unet网络的class实现&#xff08;mIou&#xff09;2.2 Unet网络的layer的实现&#xff08;mIou&#xff09;2.3 第3种实现方法&#xff08;存在问题&#xff0c;验证集准确率一直不变&…