使用 RSocket——服务端主动调用客户端方法

article/2025/8/17 23:54:16

1. 编写客户端接收请求的逻辑

我们可以在初始化 Rsocket 实例的时候指定客户端可以被调用的方法,使用 acceptor() 指定可被调用的方法和方法使用的通信模型类型:

  • 通信类型为 RequestResponse 时:
    .acceptor(SocketAcceptor.forRequestResponse(payload -> {}))
    
  • 通信类型为 FireAndForget 时
    .acceptor(SocketAcceptor.forFireAndForget(payload -> {}))
    
  • 通信类型为 RequestStream 时
    .acceptor(SocketAcceptor.forRequestStream(payload -> {}))
    
  • 通信类型为 RequestStream 时
    .acceptor(SocketAcceptor.forRequestChannel(payloads ->Flux.from(payloads)...));
    

接下来编写客户端方法的处理逻辑,以 RequestResponse 为例

rsocket-demo/rsocket-client-raw/src/main/java/org/example/CallingTheClientSide.java at master · joexu01/rsocket-demo · GitHub

public static void main(String[] args) {final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);// 随机生成 UUID 标识客户端UUID uuid = UUID.randomUUID();logger.info("My UUID is {}", uuid);// 生成 SETUP 阶段(建立连接时) Payload 使用的 route 信息ByteBuf setupRouteMetadata = encodeRoute("connect.setup");RSocket socket = RSocketConnector.create()// 设置 metadata MIME Type,方便服务端根据 MIME 类型确定 metadata 内容.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())// SETUP 阶段的 Payload,data 里面存放 UUID.setupPayload(ByteBufPayload.create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),setupRouteMetadata))// 编写 Request&Response Acceptor.acceptor(SocketAcceptor.forRequestResponse(payload -> {String route = decodeRoute(payload.sliceMetadata());logger.info("[Client Acceptor] Received RequestResponse[route={}]", route);String metadataUtf8 = payload.getMetadataUtf8();String dataUtf8 = payload.getDataUtf8();logger.info("[Client Acceptor] This Req&Resp contains data: {}, metadata: {}", dataUtf8, metadataUtf8);payload.release();if ("request.status.callback".equals(route)) {return Mono.just(ByteBufPayload.create("Thanks for handling my task!"));} else if ("request.server.call".equals(route)) {return Mono.just(ByteBufPayload.create("You called my handler actively from server!"));}byte[] respBytes = String.format("Client received your message, but no handler matched. Your meta is %s and data is %s",metadataUtf8, dataUtf8).getBytes();return Mono.just(DefaultPayload.create(respBytes));}))// 设置重连策略.reconnect(Retry.backoff(2, Duration.ofMillis(500))).connect(TcpClientTransport.create(TcpClient.create().host("127.0.0.1").port(8099))).block();

在这里我们设置客户端能够接收 RequestResponse 类型的服务端请求,仔细观察可以看到,服务端发送的请求也是可以携带包含路由信息的 metadata 的,在客户端,我们也可以根据 Payload 中的路由信息将请求分发到不同方法中处理。

为了方便演示,如果服务端调用时指定的路由信息是 request.status.callback,那么服务端就是在完成一个由客户端发起的,异步执行的任务后调用客户端的回调函数返回任务执行结果。

如果服务端调用时指定的路由信息是 request.server.call,那么服务端就是在主动调用客户端以获取一些状态信息。

当然,使用上面的代码设置客户端可被调用的 RSocket 方法有一个局限性,那就是我们只能设置 RequestResponse FireAndForget RequestStream Channel 这四种通信模式的其中一种。也就是说,用这种方法,服务端无法同时向客户端发出 RequestResponse FireAndForget RequestStream Channel 请求。本文会在第四部分展示如何让客户端支持同时响应这四种通信模式。

2. 场景:客户端提交一个耗时任务,服务端完成任务后使用回调函数返回结果

如果客户端提交一个耗时任务,服务端可以接受这个任务然后立刻返回响应:“任务提交成功”,然后执行任务。当任务执行完,服务端再使用回调函数将结果返回给客户端。

我们不妨将执行任务的模块封装成一个 Spring Service:

@Service
public class RequestProcessor {private static final Logger logger = LoggerFactory.getLogger(RequestProcessor.class);public void processRequests(RSocketRequester rSocketRequester, UUID uuid) {logger.info("[RequestProcessor.processRequests]I'm handling this!");ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("request.status.callback"));Mono.just("Your request " + uuid + "  is completed").delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(10, 15))).flatMap(m -> rSocketRequester.rsocketClient().requestResponse(Mono.just(ByteBufPayload.create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,String.format("[TASK %s]This is a task result from server using spring.", uuid)),routeMetadata))).doOnSuccess(p -> logger.info("[RequestProcessor.processRequests]Received from client: {}", p.getDataUtf8()))).subscribe();}
}

这个 Service 中的方法接收一个 RSocketRequester 和一个 任务的 UUID,当任务完成时,这个方法会生成一个 Payload 存放任务结果,指定 metadata 中的路由信息为 request.status.callback。这样客户端在收到这个 RequestResponse 时就能知道这是一个已经提交任务的回调。在这里我们使用 delayElement 模拟处理任务时耗时的操作。

值得注意的是,RSocketRequester 参数的来源,我们在编写服务端接收任务提交的方法时可以将其作为参数,这是 Spring RSocket 的固定用法,这样就可以拿到服务端-客户端连接的 RSocketRequester 实例,然后就可以在 Service 中通过 RSocketRequester 实例调用客户端的回调函数:

@MessageMapping("handler.task")
public Mono<String> task(String request, RSocketRequester rSocketRequester) {logger.info("[handler.request]Client request: {}", request);UUID uuid = UUID.randomUUID();this.requestProcessor.processRequests(rSocketRequester, uuid);return Mono.just(uuid.toString());
}

3. 场景:服务端主动调用客户端获取信息

我们在【RSocket】使用 RSocket (一)——建立连接一文中已经在连接建立的时刻将客户端-服务端连接的 RSocketRequester 实例保存在一个 ConcurrentHashMap 中了。我们可以通过一些机制,比如定时任务,或者使用 REST API 向服务端下命令的方式,让服务端主动调用已经建立连接的客户端的 RSocket 方法。

在这个示例里,我们编写两个 REST API,一个 API 返回所有已连接到服务端的客户端信息,包括客户端 UUID、连接建立的时间等:

@ResponseBody
@GetMapping("/client/list")
public List<ConnectedClientDto> clientsInfo() {List<ConnectedClientDto> info = new ArrayList<>();RSocketController.clientsManager.clients.forEach((key, value) -> {info.add(new ConnectedClientDto(key, value.connectedTime));});return info;
}

另一个 API 用于触发服务端向客户端发送请求:

@GetMapping("/client/call")
public ServerResponse callFromServer(String clientRoute, String clientUUID) {RSocketRequester requester = RSocketController.clientsManager.getClientRequester(clientUUID);if (requester == null) {return new ServerResponse("failed: client rSocket has closed.");}ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList(clientRoute));Mono.just("Server is calling you.")
//                .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10))).flatMap(m -> requester.rsocketClient().requestResponse(Mono.just(ByteBufPayload.create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,"This is a message from server using spring-stack."),routeMetadata))).doOnSubscribe(subscription -> logger.info("subscribed.")).doOnError(throwable -> logger.error("Error when calling client: {}", throwable.toString())).doOnSuccess(p -> logger.info("[test.connect.requester]Received from client: {}.", p.getDataUtf8()))).subscribe();return new ServerResponse(String.format("request from server has sent to the client %s.", clientUUID));
}

我们首先启动服务端再启动客户端,然后测试上述两个 API:

  • 启动两个客户端和服务端后查看连接信息

  • 向其中一个客户端发送一个请求

    可以从客户端的输出看到客户端接收到了这次请求

4. 让客户端同时接收不同类型的请求

前面我们提到如果使用 .acceptor(SocketAcceptor.for...) 来添加客户端可以被调用的方法时,只能指定四种通信模式中的一种。

这时候,我们可以实现 io.rsocket.SocketAcceptor 接口,重写 accept 方法,accept 方法的返回值是 Mono<RSocket> ,我们可以实现 RSocket 接口并重写其中 fireAndForget requestResponse requestStream requestChannel 四个方法来达到让客户端同时接收四种通信模式的目的。

首先实现 RSocket 接口,并重写其中的方法:

// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/service/ClientService.java
public class ClientService implements RSocket {Logger logger = LoggerFactory.getLogger(ClientService.class);static String decodeRoute(ByteBuf metadata) {final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);return routingMetadata.iterator().next();}@Overridepublic Mono<Void> fireAndForget(Payload payload) {logger.info("Receiving: " + payload.getDataUtf8());return Mono.empty();}@Overridepublic Mono<Payload> requestResponse(Payload payload) {logger.info("Receiving: " + payload.getDataUtf8());return Mono.just(DefaultPayload.create("Client received your RequestResponse"));}@Overridepublic Flux<Payload> requestStream(Payload payload) {return Flux.range(-5, 10).delayElements(Duration.ofMillis(500)).map(obj ->ByteBufPayload.create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));}@Overridepublic Flux<Payload> requestChannel(Publisher<Payload> payloads) {return Flux.range(-5, 10).delayElements(Duration.ofMillis(500)).map(obj ->ByteBufPayload.create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));}
}

这只是一个示例,如果业务需要也可以解析 Payload 中的 metadata 来实现路由。

接下来我们实现 RSocketAcceptor 接口:

// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/SocketAcceptorImpl.java
public class SocketAcceptorImpl implements SocketAcceptor {@Overridepublic Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {return Mono.just(new ClientService());}
}

然后我们在初始化客户端的时候这样设定 Acceptor 即可:

RSocket socket = RSocketConnector.create().acceptor(new SocketAcceptorImpl())

http://chatgpt.dhexx.cn/article/IUFFwLGh.shtml

相关文章

RSocket 与 gRPC性能对比

几乎每次我向观众介绍RSocket时&#xff0c;都会有人问这个问题&#xff1a;“ RSocket与gRPC相比如何&#xff1f;” 今天我们要找出答案。 搭建平台 插座 RSocket在应用程序网络上实现反应式语义。它是一种网络协议&#xff0c;可端对端实施反压力和其他反应流概念。 gR…

浅谈RSocket与响应式编程

简介&#xff1a; RSocket是高效一个二进制的网络通讯协议&#xff0c;能够满足很多场景下使用。另外&#xff0c;RSocket也是一个激进的响应式捍卫者&#xff0c;激进到连API都跟响应式无缝集成。本文我们将和大家分享RSocket与响应式编程。 作者 | 素渡 来源 | 阿里技术公众号…

阿里雷卷:RSocket从入门到落地,RSocket让AJP换发青春

考虑很久&#xff0c;决定还是写一下这篇文章&#xff0c;主要是 AJP 技术太老&#xff0c;我只能说 Long long ago &#xff0c;估计我在用这个技术的时候&#xff0c;很多同学小学还没有毕业。但是没有问题&#xff0c;这篇文章只是一个架构启发&#xff0c;不会浪费你时间让…

云原生实践之 RSocket 从入门到落地:Servlet vs RSocket

技术实践的作用在于&#xff1a;除了用于构建业务&#xff0c;也是为了验证某项技术或框架是否值得大规模推广。 本期开始&#xff0c;我们推出《RSocket 从入门到落地》系列文章&#xff0c;通过实例和对比来介绍RSocket。主要围绕RSocket如何实现Polyglot RPC、Service Regi…

RSocket 基于消息传递的反应式应用层网络协议

下面基于RSocket的一些主要特性分别做一下介绍&#xff0c;并和HTTP之类的常见协议进行比较&#xff1a; Multiplexed, Binary Protocol 多路复用的二进制协议Bidirectional Streaming 双向流Flow Control 流控制Socket Resumption 连接恢复Message passing 消息传递模型Trans…

RSocket 从入门到落地:两种微服务对比

✏️ Pic by Alibaba Tech on Facebook 技术实践的作用在于&#xff1a;除了用于构建业务&#xff0c;也是为了验证某项技术或框架是否值得大规模推广。 这是《RSocket 从入门到落地》系列文章的第三篇&#xff0c;来一起对比下开发微服务应用和微服务之间的网络通讯。该系列文…

开源的技术底座技术中台spring cloud Rsocket 微服务

一、项目背景 企业对新技术、用户体验、需求响应、交互协作提出了新的要求 1、应用新技术&#xff1a; 物联网、人工智能、大数据挖掘和分析、机器人、自动化等 2、重用核心能力&#xff1a; 使组织能够在其ERP核心解决方案的基础上进行构建&#xff0c;并为“下一步做什么”铺…

spring响应式编程13 RSocket:一种新的高性能网络通信协议

前面几讲我们讨论了如何使用 WebFlux 构建响应式 Web 服务的实现方案。WebFlux 和 WebMVC 一样&#xff0c;都是基于 HTTP 协议实现请求-响应式的交互方式。这种交互方案很简单&#xff0c;但不够灵活&#xff0c;也无法应对所有的响应式应用场景。那么&#xff0c;有没有在网络…

RSocket 学习(二):HTTP VS WebSocket VS RSocket

在比对 HTTP、WebSocket、RSocket 之前&#xff0c;我们先通过下面这张 OSI 七层模型的图快速梳理一下网络通信的面貌&#xff0c; 以便后续更好地理解它们。 osi model.png 一. HTTP 的特性 超文本传输协议&#xff08;英语&#xff1a;HyperText Transfer Protocol&#xff0…

基于RSocket协议实现客户端与服务端通信

RSocket基础开发demo package com.pshdhx.rsocket;import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.util.DefaultPayload; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import …

响应式编程之网络新约:RSocket

响应式reactive是Java中高效应用的下一个前沿&#xff0c;但它目前主要有两个障碍&#xff1a;数据访问和网络。RSocket是一种新的第7层语言无关的应用网络协议&#xff08;解决后者&#xff09;&#xff0c;它由Facebook&#xff0c;Netifi和Pivotal等工程师开发&#xff0c;提…

一篇文章了解RSocket协议

RSocket是一个类似于HTTP的通讯协议。在了解Rsocket协议之前&#xff0c;先简单介绍下HTTP协议。 之所以推出springboot的技术&#xff0c;一个原因是因为前后端设计的分离。因为基于HTTP协议可以直接返回REST数据内容。 REST是一个简单且容易使用的异构处理架构&#xff0c;R…

RSocket 学习(一):初探

girl.jpg 一. RSocket 介绍 RSocket 是一种二进制字节流传输协议&#xff0c;位于 OSI 七层模型中的5、6层&#xff0c;对应 TCP/IP 模型中的应用层。RSocket 并没有规定必须使用何种底层传输层协议&#xff0c;开发者可以使用不同的底层传输协议&#xff0c;包括 TCP、WebSock…

RSocket——Http协议的替代者

1. 简介 RSocket是一种二进制的点对点通信协议&#xff0c;是一种新的网络通信第七层协议。旨在用于分布式应用程序中。从这个意义上讲&#xff0c;RSocket是HTTP等其他协议的替代方案。它是一种基于Reactive Streams规范具有异步&#xff0c;背压的双向&#xff0c;多路复用&…

java-语言学习-eclipse安装java汉化包

java的汉化&#xff1a; 1.打开链接&#xff1a;https://www.eclipse.org/babel/ 2.进入网页后&#xff0c;往下翻一点&#xff0c;看到一个download&#xff08;下图红框所示&#xff09;&#xff0c;点开。 3.进入一个页面后&#xff0c;往下翻一点点&#xff0c;看到有几…

安装Eclipse的中文语言包

安装Eclipse的中文语言包 下载语言包 1、进入网址 http://www.eclipse.org/babel 2、对应版本下载中文语言包 3、替换Eclipse软件安装文件的features、plugins&#xff0c;再次启动加载 4、替换出现The Eclipse executable launcher was unable to locate its companion l…

Eclipse 官方简体中文语言包下载地址及安装方法

转自: http://www.cnblogs.com/yaotong/archive/2011/12/28/2305421.html 打开Eclipse Babel Project 主页: http://www.eclipse.org/babel/downloads.php 根据Eclipse的版本找到相应的插件地址&#xff0c;复制下来。 进入Eclipse&#xff0c;选择Help->Install New Softwa…

Eclipse-中文语言包

Eclipse-中文语言包 效果图 版本使用中文语言包 下载过程 点击如下 打开插件市场 填写链接 填写此链接地址https://download.eclipse.org/technology/babel/update-site/latest/到下图 简体中文选择 简体中文选择 选择允许协议 安装代码提示插件codota 是什么 打开插件市场 搜…

Eclipse汉化 中文语言包下载安装 Babel Language Pack

相关链接 Java & Eclipse & Maven 使用配置方法 Eclipse平台上新建Java项目使用Junit测试 如何在Eclipse平台使用git从GitHub上下载文件至本地及管理本地git项目 Eclipse汉化 中文语言包下载安装 Babel Language Pack 点击进入Eclipse Babel Project 下拉可见 选择与自…

eclipse中文语言包安装(别看网上那些乱七八糟的,我这个最简单)

一、安装好JDK和eclispe。&#xff08;这个步骤不用多说了&#xff09; 二、步骤 1、找语言包并下载&#xff1a;https://www.eclipse.org/babel/downloads.php 找到汉化文件下载备用。 2、把下载好的文件复制到 eclipse的dropins文件夹中。 3、启动eclispe&#xff0c;汉化…