Skip to content

Commit

Permalink
优化JdbcReactiveSqlExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Jul 28, 2021
1 parent 94b9fb8 commit 424f9ba
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.sql.Connection;
import java.util.stream.Collectors;

import static org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers.*;
import static org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers.consumer;

@Slf4j
public abstract class JdbcReactiveSqlExecutor extends JdbcSqlExecutor implements ReactiveSqlExecutor {
Expand All @@ -22,48 +20,43 @@ public JdbcReactiveSqlExecutor() {
super(log);
}

public abstract Mono<Connection> getConnection(SqlRequest sqlRequest);

public abstract void releaseConnection(Connection connection, SqlRequest sqlRequest);
public abstract Mono<Connection> getConnection();

@Override
public Mono<Integer> update(Publisher<SqlRequest> request) {

return Mono.defer(() -> toFlux(request)
.flatMap(sqlRequest -> getConnection(sqlRequest)
.flatMap(connection -> Mono.fromSupplier(() -> doUpdate(connection, sqlRequest))
.doFinally((type) -> releaseConnection(connection, sqlRequest))))
.collect(Collectors.summingInt(Integer::intValue)));
return getConnection()
.flatMap(connection -> toFlux(request)
.map(sql -> doUpdate(connection, sql))
.reduce(Math::addExact))
.defaultIfEmpty(0);

}

@Override
public Mono<Void> execute(Publisher<SqlRequest> request) {

return Mono.defer(() -> toFlux(request)
.flatMap(sqlRequest -> getConnection(sqlRequest)
.flatMap(connection ->
Mono.<Void>fromSupplier(() -> {
doExecute(connection, sqlRequest);
return null;
}).doFinally(type -> releaseConnection(connection, sqlRequest)))).then());
return getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> doExecute(connection, sql))
.then());
}

@Override
public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {

return Flux.create(sink -> {
Disposable disposable = toFlux(request)
.doFinally(type -> sink.complete())
.subscribe(sqlRequest -> getConnection(sqlRequest)
.subscribe(connection -> {
doSelect(connection, sqlRequest, consumer(wrapper, sink::next));
releaseConnection(connection, sqlRequest);
}));

sink.onCancel(disposable)
.onDispose(disposable);
});
return Flux
.create(sink -> {
Disposable disposable = getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> doSelect(connection, sql, consumer(wrapper, sink::next)))
.then())
.doOnError(sink::error)
.subscriberContext(sink.currentContext())
.doOnSuccess(ignore -> sink.complete())
.subscribe();

sink.onCancel(disposable)
.onDispose(disposable);
});
}

protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ public class TestJdbcReactiveSqlExecutor extends JdbcReactiveSqlExecutor {
private final ConnectionProvider provider;

@Override
public Mono<Connection> getConnection(SqlRequest sqlRequest) {
return Mono.fromSupplier(provider::getConnection);
public Mono<Connection> getConnection() {
return Mono
.using(
provider::getConnection,
Mono::just,
provider::releaseConnect
);
}

@Override
public void releaseConnection(Connection connection, SqlRequest sqlRequest) {
provider.releaseConnect(connection);
}

}

0 comments on commit 424f9ba

Please sign in to comment.