Skip to content

Commit

Permalink
Fix query resource clear async thread stuck bug
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 authored Nov 9, 2024
1 parent 4be1020 commit f706fa9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.Boolean.TRUE;
import static org.apache.iotdb.db.queryengine.execution.operator.Operator.NOT_BLOCKED;
Expand Down Expand Up @@ -325,15 +326,16 @@ private <T> Optional<T> tryWithLock(
return Optional.empty();
}

Optional<T> result;
T result = null;
Throwable failure = null;
try {
result = Optional.of(task.get());
result = task.get();
// opportunistic check to avoid unnecessary lock reacquisition
destroyIfNecessary();
} catch (Throwable t) {
failure = t;
} finally {
try {
destroyIfNecessary();
} finally {
exclusiveLock.unlock();
}
exclusiveLock.unlock();
}

// We need to recheck whether the state is NEED_DESTRUCTION, if so, destroy the driver.
Expand All @@ -346,12 +348,25 @@ private <T> Optional<T> tryWithLock(
if (state.get() == State.NEED_DESTRUCTION && exclusiveLock.tryLock(interruptOnClose)) {
try {
destroyIfNecessary();
} catch (Throwable t) {
if (failure == null) {
failure = t;
} else if (failure != t) {
failure.addSuppressed(t);
}
} finally {
exclusiveLock.unlock();
}
}

return result;
if (failure != null) {
throwIfUnchecked(failure);
// should never happen
throw new AssertionError(failure);
}

verify(result != null, "result is null");
return Optional.of(result);
}

@SuppressWarnings({"squid:S1181", "squid:S112"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;

import java.util.NoSuchElementException;

/** calculate the exact or approximate median. */
public class UDAFMedian implements UDTF {

Expand Down Expand Up @@ -73,10 +75,14 @@ public void transform(Row row, PointCollector collector) throws Exception {

@Override
public void terminate(PointCollector collector) throws Exception {
if (exact) {
collector.putDouble(0, statistics.getMedian());
} else {
collector.putDouble(0, sketch.query(0.5));
try {
if (exact) {
collector.putDouble(0, statistics.getMedian());
} else {
collector.putDouble(0, sketch.query(0.5));
}
} catch (NoSuchElementException e) {
// just ignore it
}
}
}

0 comments on commit f706fa9

Please sign in to comment.