RSocket基础开发demo
package com.pshdhx.rsocket;import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.util.DefaultPayload;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@Slf4j
public class MessageRSocketHandler implements RSocket {@Overridepublic Mono<Void> fireAndForget(Payload payload) { //无响应 用于日志String message = payload.getDataUtf8(); //获取数据log.info("[fireAndForget]接收请求数据:{}",message);return Mono.empty();}@Overridepublic Mono<Payload> requestResponse(Payload payload) { //传统模式 有请求有响应String message = payload.getDataUtf8(); //获取数据log.info("[RequestAndResponse]接收请求数据:{}",message);return Mono.just(DefaultPayload.create("[echo]"+message));}@Overridepublic Flux<Payload> requestStream(Payload payload) { //处理流数据String message = payload.getDataUtf8(); //获取数据log.info("[RequestStream]接收请求数据:{}",message);return Flux.fromStream(message.chars() //将接收到的字符串转换为int型数据流.mapToObj(c->Character.toUpperCase(c)) //将里边的每一个字符编码大写.map(Object::toString)//将字符转为String.map(DefaultPayload::create)); //创建payload附加数据}@Overridepublic Flux<Payload> requestChannel(Publisher<Payload> publisher) { //双向流return Flux.from(publisher).map(Payload::getDataUtf8).map(msg->{log.info("【RequestChannel】接收请求数据:{}",msg);return msg;}).map(DefaultPayload::create);}@Overridepublic Mono<Void> metadataPush(Payload payload) {return null;}@Overridepublic Mono<Void> onClose() {return null;}@Overridepublic void dispose() {}
}
RSocket服务器开发
package com.pshdhx.controller;import com.pshdhx.message.MessageService;
import com.pshdhx.vo.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;@Controller
//不使用rest
@Slf4j
public class MessageFluxController {@Autowiredprivate MessageService messageService;@MessageMapping("message.echo")public Mono<Message> echoMessage(Mono<Message> message){return message.doOnNext(msg->this.messageService.echo(msg)) //响应处理.doOnNext(msg->log.info("消息接收{}",message));}@MessageMapping("message.delete")public void deleteMessage(Mono<String> title){title.doOnNext(msg->log.info("消息删除{}",msg)).subscribe();}@MessageMapping("message.list")public Flux<Message> listMessage(){return Flux.fromStream(this.messageService.list().stream());}@MessageMapping("message.get")public Flux<Message> getMessage(Flux<String> title){return title.doOnNext(t->log.info("消息查询{}",t)).map(titleInfo->titleInfo.toLowerCase()).map(this.messageService::get).delayElements(Duration.ofSeconds(1));}
}
RSocket客户端开发
配置策略和通信
package com.pshdhx.config;import io.rsocket.RSocket;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;import java.time.Duration;@Configuration
// Client-side RSocket configuration: codec strategies and the requester
// used to talk to the server.
public class RSocketConfig {

    /**
     * Codec strategies: CBOR encoding and decoding through Jackson.
     *
     * @return strategies with one CBOR encoder and one CBOR decoder added
     */
    @Bean
    public RSocketStrategies getRSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoderList -> {
                    encoderList.add(new Jackson2CborEncoder());
                })
                .decoders(decoderList -> {
                    decoderList.add(new Jackson2CborDecoder());
                })
                .build();
    }

    /**
     * Connection setup: TCP transport on port 6869, CBOR as the data MIME type,
     * and a reconnect policy of 2 retries with a fixed 2-second delay.
     *
     * @param builder requester builder supplied by the container
     * @return the configured requester wrapped in a {@link Mono}
     */
    @Bean
    public Mono<RSocketRequester> getRSocketRequester(RSocketRequester.Builder builder) {
        Retry reconnectPolicy = Retry.fixedDelay(2, Duration.ofSeconds(2));
        RSocketRequester requester = builder
                .rsocketConnector(connector -> connector.reconnect(reconnectPolicy))
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .transport(TcpClientTransport.create(6869));
        return Mono.just(requester);
    }
}
客户端模拟调用
import com.pshdhx.AppClient;
import com.pshdhx.vo.Message;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.web.WebAppConfiguration;
import reactor.core.publisher.Mono;@ExtendWith(SpringExtension.class)
@WebAppConfiguration
@SpringBootTest(classes = AppClient.class)
public class TestRSocketClient {@Autowiredprivate Mono<RSocketRequester> requesterMono; //来进行服务调用@Testpublic void testEchoMessage(){ //测试服务响应this.requesterMono.map(r->r.route("message.echo").data(new Message("pshdhx","fighting"))).flatMap(r->r.retrieveMono(Message.class)).doOnNext(o-> System.out.println(o)).block();}@Testpublic void testDeleteMessage(){this.requesterMono.map(r->r.route("message.delete").data("pshdhx")).flatMap(RSocketRequester.RetrieveSpec::send).block();}@Testpublic void testListMessage(){this.requesterMono.map(r->r.route("message.list")).flatMapMany(r->r.retrieveFlux(Message.class)).doOnNext(o->System.out.println(o)).blockLast();}}
RSocket实现文件上传
RSocket协议本身基于二进制传输,因此也为文件上传提供了方便的处理支持。进行文件上传时,并不是把整个文件一次性发送,而是先将文件切割为若干文件块(chunk),再利用Flux包裹这一组文件块依次上传;服务器每接收到一个文件块,就通过专属的文件通道(channel)将其保存,同时把当前的上传状态发送给客户端。
common模块
package com.pshdhx.type;public enum UploadStatus {CHUNK_COMPLETED, //文件上传处理之中COMPLETED, //文件上传完毕FAILED;//失败}package com.pshdhx.constants;public class UploadConstants {public static final String MINE_FILE_NAME = "message/x.upload.file.name";public static final String MINE_FILE_EXTENSION = "message/x.upload.file.extension";public static final String FILE_NAME = "file.name";public static final String FILE_EXT = "file.ext";
}
服务器端:
package com.pshdhx.config;import com.pshdhx.constants.UploadConstants;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeType;@Configuration
// Server-side RSocket configuration: CBOR codecs plus extraction of the
// upload metadata (file name and extension) into message headers.
public class RSocketServerConfig {

    /**
     * Builds the strategies with a CBOR encoder/decoder and registers the two
     * upload metadata entries so they are exposed as {@code String} values.
     *
     * @return the configured strategies
     */
    @Bean
    public RSocketStrategies getRSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoderList -> {
                    encoderList.add(new Jackson2CborEncoder());
                })
                .decoders(decoderList -> {
                    decoderList.add(new Jackson2CborDecoder());
                })
                .metadataExtractorRegistry(registry -> {
                    MimeType nameType = MimeType.valueOf(UploadConstants.MINE_FILE_NAME);
                    registry.metadataToExtract(nameType, String.class, UploadConstants.FILE_NAME);

                    // NOTE(review): the extension is keyed by MINE_FILE_EXTENSION rather
                    // than FILE_EXT. The upload handler reads the same key, so the two
                    // must be changed together if this is ever cleaned up.
                    MimeType extensionType = MimeType.valueOf(UploadConstants.MINE_FILE_EXTENSION);
                    registry.metadataToExtract(extensionType, String.class,
                            UploadConstants.MINE_FILE_EXTENSION);
                })
                .build();
    }
}
@Controller
//不使用rest
@Slf4j
public class MessageFluxController {@Autowiredprivate MessageService messageService;@Value("${output.file.path.upload}")private Path outputPath;@MessageMapping("message.upload")public Flux<UploadStatus> upload(@Headers Map<String,Object> metadata,@Payload Flux<DataBuffer> content) throws Exception{log.info("【上传后路径】outputPaht={}",this.outputPath);var fileName = metadata.get(UploadConstants.FILE_NAME);var fileExt = metadata.get(UploadConstants.MINE_FILE_EXTENSION);var path = Paths.get(fileName+"."+fileExt);log.info("【文件上传】fileName={}、fileExt = {},path={}",fileName,fileExt,path);AsynchronousFileChannel channel = AsynchronousFileChannel.open(this.outputPath.resolve(path),StandardOpenOption.CREATE, //文件创建StandardOpenOption.WRITE //文件写入);//异步文件通道return Flux.concat(DataBufferUtils.write(content,channel).map(s->UploadStatus.CHUNK_COMPLETED),Mono.just(UploadStatus.COMPLETED)).doOnComplete(()->{try {channel.close();}catch(Exception e){e.printStackTrace();}}).onErrorReturn(UploadStatus.FAILED);}
客户端:
/** Image on the classpath used as the upload payload. */
@Value("classpath:/images/pic.jpg")
private Resource resource;

/**
 * Uploads the classpath image in 1024-byte buffers to the {@code message.upload}
 * route and prints every status the server reports.
 */
@Test
public void testUpload() {
    String fileName = "pshdhx" + UUID.randomUUID();
    // Extension = text after the last dot of the resource's file name.
    String sourceName = this.resource.getFilename();
    String fileExt = sourceName.substring(sourceName.lastIndexOf(".") + 1);

    Flux<DataBuffer> resourceFlux = DataBufferUtils
            .read(this.resource, new DefaultDataBufferFactory(), 1024)
            .doOnNext(buffer -> System.out.println("文件上传:" + buffer));

    Flux<UploadStatus> uploadStatusFlux = this.requesterMono
            .map(requester -> requester.route("message.upload")
                    .metadata(spec -> {
                        System.out.println("【上传测试:】文件名称" + fileName + "." + fileExt);
                        // The file name and extension travel as separate metadata entries.
                        spec.metadata(fileName, MimeType.valueOf(UploadConstants.MINE_FILE_NAME));
                        spec.metadata(fileExt, MimeType.valueOf(UploadConstants.MINE_FILE_EXTENSION));
                    })
                    .data(resourceFlux))
            .flatMapMany(call -> call.retrieveFlux(UploadStatus.class))
            .doOnNext(status -> System.out.println("上传进度:" + status));

    uploadStatusFlux.blockLast();
}