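What is WebFlux
Spring WebFlux is a feature added in Spring Framework 5.0. It is fully non-blocking, supports Reactive Streams and backpressure, and runs on servers such as Netty and Undertow as well as on Servlet 3.1+ containers.
WebFlux has two distinguishing strengths:
1) Non-blocking
Servlet 3.1 already provides a non-blocking I/O API, and WebFlux offers a more complete programming model for this kind of I/O. Handling requests without blocking lets an application serve concurrent load with fewer threads and less hardware, which improves scalability.
2) Functional endpoints
Spring 5 requires Java 8 or later, and functional programming is one of Java 8's key features. WebFlux builds on it: routes and request handlers can be defined as functions.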
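WebFlux vs. Spring MVC
Differences:
- WebFlux is fully asynchronous and non-blocking; Spring MVC is synchronous and blocking.
- WebFlux uses asynchronous, reactive programming; Spring MVC uses imperative programming.
- Because WebFlux is asynchronous end to end, the data-access framework and the database driver must be asynchronous as well. Frameworks built on blocking JDBC, such as MyBatis, do not fit this model, and a database is only usable if a reactive driver exists for it. At the time of writing, this ruled out MyBatis and Oracle.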
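The difference between the two styles can be sketched with the JDK alone. This is only an analogy: `CompletableFuture` stands in for a reactive type such as `Mono`, and `lookupPrice` is a hypothetical helper that plays the role of a slow remote call. Neither appears in the original article.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only: CompletableFuture is not a Reactive Streams type,
// it merely shows what "describe the pipeline, get the result later" looks like.
final class StyleComparison {

    // Hypothetical stand-in for a slow I/O call (database, HTTP, ...).
    static int lookupPrice(String product) {
        return product.length() * 10;
    }

    // Imperative style: the calling thread performs every step itself
    // and is occupied until the result is available.
    static String imperative(String product) {
        int price = lookupPrice(product);
        return product + " costs " + price;
    }

    // Asynchronous style: the method returns immediately with a handle;
    // the steps run on another thread and are chained declaratively.
    static CompletableFuture<String> async(String product) {
        return CompletableFuture
                .supplyAsync(() -> lookupPrice(product))
                .thenApply(price -> product + " costs " + price);
    }

    public static void main(String[] args) {
        System.out.println(imperative("book"));
        // join() is used here only so that the demo does not exit early.
        async("book").thenAccept(System.out::println).join();
    }
}
```

In a WebFlux handler the equivalent of `async` would return a `Mono` and the framework would subscribe to it, so application code never needs the final `join()`.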
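Learning path for WebFlux
To get productive with WebFlux quickly, you need the following background:
- Java 8 features (lambda expressions, the Stream API, functional programming)
- Java 9 (Reactive Streams)
- Project Reactor
- WebFlux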
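How to use it
Java 8 features
See this blog post: Java 8 New Features
Reactive Streams in Java 9
Reactive Streams is a standard for asynchronous stream processing whose defining feature is non-blocking backpressure. An implementation has to:
- handle a potentially unbounded number of messages
- process messages in order
- pass messages between components asynchronously
- be non-blocking and apply backpressure
To meet these four requirements, Reactive Streams defines four interfaces: Publisher, Subscriber, Subscription, and Processor. Together they are essentially an implementation of the observer pattern.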
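The backpressure requirement can be illustrated with a small sketch. It uses the JDK's `java.util.concurrent.Flow` types (JDK 9+); the class and method names (`BoundedDemandDemo`, `receiveAtMost`) are made up for this example. The subscriber asks for a fixed number of items and never asks again, so the publisher cannot push more than that, no matter how many items it has.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BoundedDemandDemo {

    // Submits `total` items to a subscriber that requests only `demand` of them
    // and returns what the subscriber actually received.
    // Assumes 0 < demand <= total and that `total` fits in the publisher's default buffer.
    static List<Integer> receiveAtMost(int demand, int total) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(demand);

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // The only request this subscriber ever makes.
                subscription.request(demand);
            }

            @Override
            public void onNext(Integer item) {
                received.add(item);
                done.countDown();
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                // Not reached here: undelivered items are still buffered.
            }
        });

        for (int i = 0; i < total; i++) {
            publisher.submit(i);
        }
        try {
            // Delivery is asynchronous, so wait until the requested items arrived.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        publisher.close();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(receiveAtMost(3, 10)); // prints [0, 1, 2]
    }
}
```

The remaining seven items stay in the publisher's buffer. A real subscriber would call `request` again once it is ready for more.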
Reactive Streams in the JDK:
java.util.concurrent.Flow is the JDK's counterpart of the Reactive Streams interfaces and has been available since JDK 9. The following test shows how it is used:
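package com.zx.webflux.reactive;

import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * @Description Using the JDK 9 Reactive Streams API
 * @Author zx
 * @Date 2021/9/13 9:03
 */
public class ReactiveDemo {

    @Test
    public void test1() throws InterruptedException {
        // Released once the subscriber has seen onComplete or onError
        CountDownLatch done = new CountDownLatch(1);

        // Create the publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Create the subscriber
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // Called once, when the subscription is established
                System.out.println("Subscription established");
                this.subscription = subscription;
                // The requested amount controls how many items may be delivered
                this.subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
                // Ask for the next item; call subscription.cancel() instead to stop early
                this.subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError: " + throwable);
                done.countDown();
            }

            @Override
            public void onComplete() {
                System.out.println("All data received");
                done.countDown();
            }
        };

        // Connect the subscriber to the publisher
        publisher.subscribe(subscriber);

        try {
            // Publish the data
            for (int i = 0; i < 10; i++) {
                String message = "hello reactive" + i;
                System.out.println("Sending: " + message);
                publisher.submit(message);
            }
        } finally {
            // Closing signals onComplete once the buffered items have been delivered
            publisher.close();
        }

        // Delivery is asynchronous, so wait for it to finish.
        // (Thread.currentThread().join() would wait on the current thread itself and never return.)
        done.await(5, TimeUnit.SECONDS);
    }
}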
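The demo above exercises Publisher, Subscriber and Subscription. The fourth interface, Processor, is both a subscriber and a publisher and sits between the two. A minimal sketch, following the pattern suggested in the `SubmissionPublisher` Javadoc; the names `MappingProcessor` and `ProcessorDemo` are made up for this example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Subscribes to an upstream of T, applies a function, republishes the results as R.
final class MappingProcessor<T, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {

    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item)); // forward the transformed item downstream
        upstream.request(1);        // then ask upstream for the next one
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }
}

final class ProcessorDemo {

    // Sends the input through source -> processor -> collecting subscriber.
    static List<String> run(List<Integer> input) {
        List<String> out = new CopyOnWriteArrayList<>();
        CountDownLatch completed = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MappingProcessor<Integer, String> processor =
                    new MappingProcessor<>(i -> "item-" + i);
            source.subscribe(processor);
            processor.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // effectively unbounded demand
                }

                @Override
                public void onNext(String item) {
                    out.add(item);
                }

                @Override
                public void onError(Throwable throwable) {
                    completed.countDown();
                }

                @Override
                public void onComplete() {
                    completed.countDown();
                }
            });
            input.forEach(source::submit);
        } // closing the source propagates onComplete through the processor

        try {
            completed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // prints [item-1, item-2, item-3]
    }
}
```

Operators such as `map` in Project Reactor play a comparable role, although they are implemented differently.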
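Project Reactor
Project Reactor is a foundational reactive programming library for the JVM. It manages data processing with backpressure and provides two composable asynchronous sequence APIs, Flux and Mono. It also implements the Reactive Streams specification.
Flux<T> is a standard Reactive Streams Publisher<T> that represents an asynchronous sequence of [0…N] elements. Under the Reactive Streams specification, a subscriber listens for three events on the stream: onNext, onComplete, and onError.
Mono<T> is a specialized Publisher<T> that represents an asynchronous sequence of at most one element. A subscriber therefore receives at most one onNext, followed by onComplete, or else an onError.
Maven dependencies: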
<dependencyManagement>
    <dependencies>
        <!-- A BOM only takes effect when it is imported here with scope "import" -->
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>Bismuth-RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- Versions declared explicitly below take precedence over the BOM -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.3.16.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.3.16.RELEASE</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
</dependencies>
package com.zx.webflux.reactor;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Arrays;

/**
 * @Description ReactorDemo
 * @Author zx
 * @Date 2021/9/13 9:56
 */
public class ReactorDemo {

    @Test
    public void test1() {
        // Publisher with a single element
        Mono<String> mono = Mono.just("hello mono");
        // Subscribe
        mono.subscribe(x -> System.out.println("One element: " + x));

        // System.out.println("------------------------------");
        // Mono<String> mono1 = Mono.empty();
        // mono1.subscribe(x -> System.out.println("Empty element: " + x));
        System.out.println("------------------------------");

        // Flux.just: a fixed set of elements
        Flux<String> flux1 = Flux.just("hello flux1", "hello flux2", "hello flux3");
        flux1.subscribe(x -> System.out.println("Several elements: " + x));
        System.out.println("------------------------------");

        // Flux.fromIterable: elements taken from a collection
        Flux<String> flux2 = Flux.fromIterable(Arrays.asList("flux1", "flux2", "flux3"));
        flux2.subscribe(x -> System.out.println("Several elements: " + x));
        System.out.println("------------------------------");

        // Flux.generate: elements produced programmatically, one per call,
        // with a state value (starting at 0) carried from call to call
        Flux<String> flux3 = Flux.generate(() -> 0, (state, sink) -> {
            sink.next("3 x " + state + " = " + 3 * state);
            if (state == 10) {
                sink.complete();
            }
            return state + 1;
        });
        flux3.subscribe(System.out::println);
    }
}
WebFlux
Maven dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <scope>runtime</scope>
</dependency>
Two ways to write WebFlux endpoints
Annotation-based
package com.zx.demo.annotate;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

/**
 * @Description Annotation-based WebFlux controller (also the application entry point)
 * @Author zx
 * @Date 2021/9/15 10:16
 */
@SpringBootApplication
@RestController
@RequestMapping("annt")
public class AnnoControllerApplication {

    public static void main(String[] args) {
        SpringApplication.run(AnnoControllerApplication.class, args);
    }

    // GET /annt/get
    @GetMapping("/get")
    public Mono<String> get() {
        return Mono.just("webflux annotation");
    }
}
Functional (programmatic)
package com.zx.demo.function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.*;

/**
 * @Description Routing (the equivalent of configuring the mapping url --> business method)
 * @Author zx
 * @Date 2021/9/15 10:26
 */
@Configuration
public class WebFluxRouting {

    @Bean
    public RouterFunction<ServerResponse> route(WebFluxHandler webFluxHandler) {
        // GET /hello with "Accept: text/plain" is handled by WebFluxHandler.hello
        return RouterFunctions.route(
                RequestPredicates.GET("/hello")
                        .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),
                webFluxHandler::hello);
    }
}
package com.zx.demo.function;

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

/**
 * @Description Handler for the functional endpoint
 * @Author zx
 * @Date 2021/9/15 10:25
 */
@Component
public class WebFluxHandler {

    public Mono<ServerResponse> hello(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromValue("hello webflux"));
    }
}
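The idea behind the functional style is that a route is just a pair of functions: a predicate that decides whether a request matches, and a handler that produces the response. The sketch below illustrates that idea with the JDK only. `MiniRouter` is a hypothetical toy, not Spring's `RouterFunction`; it works on plain path strings and answers synchronously, whereas WebFlux handlers return a `Mono<ServerResponse>`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical toy router: routes are (predicate, handler) pairs tried in registration order.
final class MiniRouter {

    private final Map<Predicate<String>, Function<String, String>> routes = new LinkedHashMap<>();

    // Registers a route and returns this router so calls can be chained.
    MiniRouter route(Predicate<String> predicate, Function<String, String> handler) {
        routes.put(predicate, handler);
        return this;
    }

    // Returns the response of the first matching route, or empty if none matches.
    Optional<String> handle(String path) {
        return routes.entrySet().stream()
                .filter(entry -> entry.getKey().test(path))
                .findFirst()
                .map(entry -> entry.getValue().apply(path));
    }

    // Mirrors the /hello route of the example above.
    static MiniRouter demo() {
        return new MiniRouter()
                .route("/hello"::equals, path -> "hello webflux");
    }

    public static void main(String[] args) {
        MiniRouter router = demo();
        System.out.println(router.handle("/hello"));   // prints Optional[hello webflux]
        System.out.println(router.handle("/missing")); // prints Optional.empty
    }
}
```

Because routes are ordinary values, they can be composed, filtered and tested without starting a server, which is the main attraction of the functional style.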
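Two ways to handle transactions
Annotation-based
Put @Transactional on a method that returns a Mono or Flux and enable it with @EnableTransactionManagement; saveMany in MyWebHandler below does this.
Programmatic
Wrap the reactive pipeline with a TransactionalOperator, for example .as(transactionalOperator::transactional); the commented-out line in saveMany below shows where it goes.
Spring Boot + WebFlux + R2DBC CRUD
Project structure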
package com.zx.demo.crud.config;

import io.r2dbc.spi.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.r2dbc.connection.R2dbcTransactionManager;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;

/**
 * @Description Transaction configuration
 * @Author zx
 * @Date 2021/9/16 15:16
 */
@Configuration
public class R2dbcConfiguration {

    // Transaction manager used by @Transactional on reactive methods
    @Bean
    public ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new R2dbcTransactionManager(connectionFactory);
    }

    // Operator used for programmatic transactions
    @Bean
    public TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
        return TransactionalOperator.create(transactionManager);
    }
}
package com.zx.demo.crud.config;

/**
 * @Description All URLs
 * @Author zx
 * @Date 2021/9/15 11:20
 */
public class RoutePath {
    public static final String root = "/rout/product";
    public static final String save = root + "/save";
    public static final String find = root + "/find/{productId}";
    public static final String delete = root + "/delete/{productId}";
    public static final String update = root + "/update/{productId}";
    public static final String findAll = root + "/findAll";
    public static final String findByPage = root + "/findByPage";
    public static final String saveMany = root + "/saveMany";
}
package com.zx.demo.crud.dao;

import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.zx.demo.crud.domain.Product;
import reactor.core.publisher.Flux;

/**
 * @Description Reactive repository for products
 * @Author zx
 * @Date 2021/9/15 10:57
 */
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

    // Paged query. Note that :page is used directly as the row offset of LIMIT.
    @Query("select * from t_product where category = :category limit :page,:pageSize")
    Flux<Product> findByPage(String category, int page, int pageSize);
}
package com.zx.demo.crud.domain;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

import java.math.BigDecimal;

/**
 * @Description Product entity, mapped to table t_product
 * @Author zx
 * @Date 2021/9/15 10:58
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table("t_product")
public class Product {
    @Id
    private Long id;
    private BigDecimal price;
    private String name;
    private Integer stock;
    private String category;
}
package com.zx.demo.crud.domain;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @Description Paging parameters
 * @Author zx
 * @Date 2021/9/15 10:58
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ProductPage {
    private String category;
    private Integer page;
    private Integer pageSize;
}
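The repository query passes `page` straight into `limit :page,:pageSize`, where MySQL treats the first value as a row offset. The article does not say whether `ProductPage.page` is meant as an offset or as a page number. If it is a 1-based page number, it would need to be converted first, roughly as follows. `PageOffset` and `offsetFor` are hypothetical names and not part of the original project.

```java
// Hypothetical helper: converts a 1-based page number into the row offset
// expected by "LIMIT offset, pageSize".
final class PageOffset {

    static long offsetFor(int page, int pageSize) {
        if (page < 1 || pageSize < 1) {
            throw new IllegalArgumentException("page and pageSize must be >= 1");
        }
        // Widen to long before multiplying to avoid int overflow.
        return (long) (page - 1) * pageSize;
    }

    public static void main(String[] args) {
        System.out.println(offsetFor(1, 20)); // prints 0
        System.out.println(offsetFor(3, 20)); // prints 40
    }
}
```

With such a helper, the service would pass `offsetFor(page, pageSize)` instead of `page` to the repository method.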
package com.zx.demo.crud.reative;

import com.zx.demo.crud.config.RoutePath;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * @Description Routing
 * @Author zx
 * @Date 2021/9/15 11:17
 */
@Configuration
public class BaseRouter {

    @Bean
    public RouterFunction<ServerResponse> route(MyWebHandler myWebHandler) {
        return RouterFunctions.route()
                .POST(RoutePath.save, myWebHandler::save)
                .GET(RoutePath.find, myWebHandler::find)
                .DELETE(RoutePath.delete, myWebHandler::delete)
                .PUT(RoutePath.update, myWebHandler::update)
                .GET(RoutePath.findAll, myWebHandler::findAll)
                .GET(RoutePath.findByPage, myWebHandler::findByPage)
                .POST(RoutePath.saveMany, myWebHandler::saveMany)
                .build();
    }
}
package com.zx.demo.crud.reative;

import com.zx.demo.crud.domain.Product;
import com.zx.demo.crud.domain.ProductPage;
import com.zx.demo.crud.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

/**
 * @Description Request handlers for the product endpoints
 * @Author zx
 * @Date 2021/9/15 11:10
 */
@Component
public class MyWebHandler {

    @Autowired
    private ProductService productService;

    @Autowired
    private TransactionalOperator transactionalOperator;

    /** Saves a product. */
    public Mono<ServerResponse> save(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(Product.class)
                .flatMap(i -> productService.save(i))
                .flatMap(p -> ServerResponse.ok().bodyValue(p));
    }

    /** Finds a product by id; responds with 404 if it does not exist. */
    public Mono<ServerResponse> find(ServerRequest serverRequest) {
        Long productId = Long.parseLong(serverRequest.pathVariable("productId"));
        return productService.find(productId)
                .flatMap(p -> ServerResponse.ok().bodyValue(p))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /** Deletes a product; responds with 404 if it does not exist. */
    public Mono<ServerResponse> delete(ServerRequest serverRequest) {
        long productId = Long.parseLong(serverRequest.pathVariable("productId"));
        return productService.find(productId)
                .flatMap(p -> productService.delete(p.getId())
                        .then(ServerResponse.ok().build()))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /** Updates a product; responds with 404 if it does not exist. */
    public Mono<ServerResponse> update(ServerRequest serverRequest) {
        long productId = Long.parseLong(serverRequest.pathVariable("productId"));
        return productService.find(productId)
                .flatMap(existing -> serverRequest.bodyToMono(Product.class)
                        .flatMap(body -> {
                            // Take the id from the path so that save() updates the
                            // existing row instead of inserting a new one
                            body.setId(existing.getId());
                            return productService.save(body);
                        }))
                .flatMap(p -> ServerResponse.ok().bodyValue(p))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    /** Lists all products. */
    public Mono<ServerResponse> findAll(ServerRequest serverRequest) {
        return ServerResponse.ok().body(productService.findAll(), Product.class);
    }

    /** Paged query; the paging parameters are read from the request body. */
    public Mono<ServerResponse> findByPage(ServerRequest serverRequest) {
        return serverRequest.bodyToMono(ProductPage.class)
                .flatMap(p -> {
                    System.out.println(p);
                    return ServerResponse.ok().body(productService.findByPage(p), Product.class);
                });
    }

    /** Saves several products in one transaction (transaction test). */
    @Transactional
    public Mono<ServerResponse> saveMany(ServerRequest serverRequest) {
        return serverRequest.bodyToFlux(Product.class)
                .flatMap(i -> productService.saveMany(i))
                .then(ServerResponse.ok().bodyValue("ok"));
                // Programmatic alternative to @Transactional:
                // .as(transactionalOperator::transactional);
    }
}
package com.zx.demo.crud.service;

import com.zx.demo.crud.dao.ProductRepository;
import com.zx.demo.crud.domain.Product;
import com.zx.demo.crud.domain.ProductPage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * @Description Product service
 * @Author zx
 * @Date 2021/9/15 11:08
 */
@Service
public class ProductService {

    @Autowired
    private ProductRepository productRepository;

    /** Saves a product. */
    public Mono<Product> save(Product product) {
        return productRepository.save(product);
    }

    /** Finds a product by id. */
    public Mono<Product> find(Long id) {
        return productRepository.findById(id);
    }

    /** Deletes a product if it exists; completes empty otherwise. */
    public Mono<Void> delete(Long productId) {
        return productRepository.existsById(productId)
                .flatMap(exists -> {
                    if (exists) {
                        return productRepository.deleteById(productId);
                    }
                    return Mono.empty();
                });
    }

    /** Returns all products. */
    public Flux<Product> findAll() {
        return productRepository.findAll();
    }

    /** Returns one page of the products in a category. */
    public Flux<Product> findByPage(ProductPage productPage) {
        return productRepository.findByPage(productPage.getCategory(),
                productPage.getPage(), productPage.getPageSize());
    }

    /** Saves one of several products; called inside the transaction of saveMany. */
    public Mono<Product> saveMany(Product product) {
        return productRepository.save(product);
    }
}
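import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.transaction.annotation.EnableTransactionManagement;

/**
 * @Description Application entry point
 * @Author zx
 * @Date 2021/9/15 10:26
 */
@SpringBootApplication
// Enables annotation-driven transactions
@EnableTransactionManagement
public class WebfluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebfluxApplication.class, args);
    }
}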