From bda992f0b3cffa3045464cc6c383e93a6c968b4c Mon Sep 17 00:00:00 2001 From: Ahoo Wang Date: Tue, 13 Aug 2024 10:26:21 +0800 Subject: [PATCH] feat(mongo): Optimize the blocking calls of the responsive API (#611) --- .../cosid/mongo/reactive/BlockingAdapter.java | 18 +---- .../reactive/BlockingAdapterSubscriber.java | 72 +++++++++++++++++++ 2 files changed, 75 insertions(+), 15 deletions(-) create mode 100644 cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapterSubscriber.java diff --git a/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapter.java b/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapter.java index 6fb1132da..1c5a7a06c 100644 --- a/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapter.java +++ b/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapter.java @@ -13,21 +13,15 @@ package me.ahoo.cosid.mongo.reactive; -import me.ahoo.cosid.CosId; - import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.time.Duration; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public final class BlockingAdapter { public static final Duration DEFAULT_TIME_OUT = Duration.ofSeconds(10); - private static final Scheduler SCHEDULER = Schedulers.newSingle(CosId.COSID_PREFIX + BlockingAdapter.class.getSimpleName()); private BlockingAdapter() { } @@ -39,17 +33,11 @@ public static R block(Publisher publisher) { public static R block(Mono mono) { try { - - return mono.subscribeOn(SCHEDULER) - .timeout(DEFAULT_TIME_OUT) - .toFuture().get(DEFAULT_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS); + BlockingAdapterSubscriber blockingAdapterSubscriber = new BlockingAdapterSubscriber<>(); + mono.subscribe(blockingAdapterSubscriber); + return blockingAdapterSubscriber.block(DEFAULT_TIME_OUT.toMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException | TimeoutException e) { throw new RuntimeException(e); - } catch (ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException) e.getCause(); - } - throw new RuntimeException(e.getCause()); } } } diff --git a/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapterSubscriber.java b/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapterSubscriber.java new file mode 100644 index 000000000..c9d328c47 --- /dev/null +++ b/cosid-mongo/src/main/java/me/ahoo/cosid/mongo/reactive/BlockingAdapterSubscriber.java @@ -0,0 +1,72 @@ +/* + * Copyright [2021-present] [ahoo wang (https://github.com/Ahoo-Wang)]. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package me.ahoo.cosid.mongo.reactive; + +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.SignalType; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class BlockingAdapterSubscriber extends BaseSubscriber { + private final CountDownLatch latch; + + private T value; + private Throwable error; + + public BlockingAdapterSubscriber() { + this.latch = new CountDownLatch(1); + } + + @Override + protected void hookOnNext(final T value) { + this.value = value; + } + + @Override + protected void hookOnError(Throwable throwable) { + this.error = throwable; + } + + public T getValue() { + return value; + } + + public T block(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException { + return await(timeout, unit).getValue(); + } + + public Throwable getError() { + return error; + } + + @Override + protected void hookFinally(final SignalType type) { + latch.countDown(); + } + + public BlockingAdapterSubscriber await(final long timeout, final TimeUnit unit) throws TimeoutException, InterruptedException { + if (!latch.await(timeout, unit)) { + throw new TimeoutException("Timeout after " + timeout + " " + unit); + } + if (getError() != null) { + if (getError() instanceof RuntimeException) { + throw (RuntimeException) getError(); + } + throw new RuntimeException(getError()); + } + return this; + } +}