From 424f9ba40c5302f7de4887c5960ebfeecfb86ad6 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 28 Jul 2021 09:15:51 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96JdbcReactiveSqlExecutor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jdbc/JdbcReactiveSqlExecutor.java | 57 ++++++++----------- .../rdb/TestJdbcReactiveSqlExecutor.java | 14 +++-- 2 files changed, 33 insertions(+), 38 deletions(-) diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java index ba521ad9..77154ce3 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java @@ -5,15 +5,13 @@ import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor; import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper; import org.reactivestreams.Publisher; -import org.slf4j.Logger; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.sql.Connection; -import java.util.stream.Collectors; -import static org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers.*; +import static org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers.consumer; @Slf4j public abstract class JdbcReactiveSqlExecutor extends JdbcSqlExecutor implements ReactiveSqlExecutor { @@ -22,48 +20,43 @@ public JdbcReactiveSqlExecutor() { super(log); } - public abstract Mono getConnection(SqlRequest sqlRequest); - - public abstract void releaseConnection(Connection connection, SqlRequest sqlRequest); + public abstract Mono getConnection(); @Override public Mono update(Publisher request) { - return Mono.defer(() -> toFlux(request) - .flatMap(sqlRequest -> getConnection(sqlRequest) - .flatMap(connection -> Mono.fromSupplier(() -> doUpdate(connection, sqlRequest)) - .doFinally((type) -> releaseConnection(connection, sqlRequest)))) - .collect(Collectors.summingInt(Integer::intValue))); + return getConnection() + .flatMap(connection -> toFlux(request) + .map(sql -> doUpdate(connection, sql)) + .reduce(Math::addExact)) + .defaultIfEmpty(0); } @Override public Mono execute(Publisher request) { - - return Mono.defer(() -> toFlux(request) - .flatMap(sqlRequest -> getConnection(sqlRequest) - .flatMap(connection -> - Mono.fromSupplier(() -> { - doExecute(connection, sqlRequest); - return null; - }).doFinally(type -> releaseConnection(connection, sqlRequest)))).then()); + return getConnection() + .flatMap(connection -> toFlux(request) + .doOnNext(sql -> doExecute(connection, sql)) + .then()); } @Override public Flux select(Publisher request, ResultWrapper wrapper) { - - return Flux.create(sink -> { - Disposable disposable = toFlux(request) - .doFinally(type -> sink.complete()) - .subscribe(sqlRequest -> getConnection(sqlRequest) - .subscribe(connection -> { - doSelect(connection, sqlRequest, consumer(wrapper, sink::next)); - releaseConnection(connection, sqlRequest); - })); - - sink.onCancel(disposable) - .onDispose(disposable); - }); + return Flux + .create(sink -> { + Disposable disposable = getConnection() + .flatMap(connection -> toFlux(request) + .doOnNext(sql -> doSelect(connection, sql, consumer(wrapper, sink::next))) + .then()) + .doOnError(sink::error) + .subscriberContext(sink.currentContext()) + .doOnSuccess(ignore -> sink.complete()) + .subscribe(); + + sink.onCancel(disposable) + .onDispose(disposable); + }); } protected Flux toFlux(Publisher request) { diff --git a/hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/TestJdbcReactiveSqlExecutor.java b/hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/TestJdbcReactiveSqlExecutor.java index 339d868f..d3191292 100644 --- a/hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/TestJdbcReactiveSqlExecutor.java +++ b/hsweb-easy-orm-rdb/src/test/java/org/hswebframework/ezorm/rdb/TestJdbcReactiveSqlExecutor.java @@ -13,12 +13,14 @@ public class TestJdbcReactiveSqlExecutor extends JdbcReactiveSqlExecutor { private final ConnectionProvider provider; @Override - public Mono getConnection(SqlRequest sqlRequest) { - return Mono.fromSupplier(provider::getConnection); + public Mono getConnection() { + return Mono + .using( + provider::getConnection, + Mono::just, + provider::releaseConnect + ); } - @Override - public void releaseConnection(Connection connection, SqlRequest sqlRequest) { - provider.releaseConnect(connection); - } + }