Skip to content

Commit

Permalink
change: remove old worker pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Nov 7, 2023
1 parent 8e782c9 commit ba31ea5
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ishland.c2me.base.ModuleEntryPoint;
import com.ishland.c2me.base.common.util.C2MENormalWorkerThreadFactory;
import com.ishland.flowsched.executor.ExecutorManager;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -16,27 +13,12 @@
public class GlobalExecutors {

// private static final C2MEForkJoinWorkerThreadFactory factory = new C2MEForkJoinWorkerThreadFactory("c2me", "C2ME worker #%d", Thread.NORM_PRIORITY - 1);
private static final C2MENormalWorkerThreadFactory factory = new C2MENormalWorkerThreadFactory("c2me", "C2ME worker #%d", Thread.NORM_PRIORITY - 1);
public static final int GLOBAL_EXECUTOR_PARALLELISM = (int) ModuleEntryPoint.globalExecutorParallelism;
// public static final ForkJoinPool executor = new ForkJoinPool(
// GLOBAL_EXECUTOR_PARALLELISM,
// factory,
// null,
// true
// );
public static final ExecutorService executor = Executors.newFixedThreadPool(GLOBAL_EXECUTOR_PARALLELISM, factory);
private static final AtomicInteger prioritizedSchedulerCounter = new AtomicInteger(0);
public static final ExecutorManager prioritizedScheduler = new ExecutorManager(GlobalExecutors.GLOBAL_EXECUTOR_PARALLELISM, thread -> {
thread.setDaemon(true);
thread.setName("c2me-prioritized-%d".formatted(prioritizedSchedulerCounter.getAndIncrement()));
});
public static final Executor invokingExecutor = r -> {
if (Thread.currentThread().getThreadGroup() == factory.getThreadGroup()) {
r.run();
} else {
executor.execute(r);
}
};

public static final ExecutorService asyncScheduler = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.ishland.c2me.base.common.scheduler;

import com.ishland.flowsched.executor.Task;
import it.unimi.dsi.fastutil.objects.ReferenceArrayList;

import java.util.Objects;

public abstract class AbstractPosAwarePrioritizedTask implements Task {

protected final ReferenceArrayList<Runnable> postExec = new ReferenceArrayList<>(4);
private final long pos;
private int priority = Integer.MAX_VALUE;

public AbstractPosAwarePrioritizedTask(long pos) {
this.pos = pos;
}

@Override
public int priority() {
return this.priority;
}

public void setPriority(int priority) {
this.priority = priority;
}

public long getPos() {
return this.pos;
}

public void addPostExec(Runnable runnable) {
synchronized (this.postExec) {
postExec.add(Objects.requireNonNull(runnable));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
package com.ishland.c2me.base.common.scheduler;

import com.ishland.flowsched.executor.LockToken;
import com.ishland.flowsched.executor.Task;
import it.unimi.dsi.fastutil.objects.ReferenceArrayList;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ScheduledTask<T> implements Task {
public class ScheduledTask<T> extends AbstractPosAwarePrioritizedTask {

private final long pos;
private final Supplier<CompletableFuture<T>> action;
private final LockToken[] lockTokens;
private final CompletableFuture<T> future = new CompletableFuture<>();
private final ReferenceArrayList<Runnable> postExec = new ReferenceArrayList<>(4);
private int priority = Integer.MAX_VALUE;

public ScheduledTask(long pos, Supplier<CompletableFuture<T>> action, LockToken[] lockTokens) {
this.pos = pos;
super(pos);
this.action = action;
this.lockTokens = lockTokens;
}
Expand Down Expand Up @@ -46,30 +40,11 @@ public void propagateException(Throwable t) {
future.completeExceptionally(t);
}

public void addPostExec(Runnable runnable) {
synchronized (this.postExec) {
postExec.add(Objects.requireNonNull(runnable));
}
}

@Override
public LockToken[] lockTokens() {
return this.lockTokens;
}

@Override
public int priority() {
return this.priority;
}

public void setPriority(int priority) {
this.priority = priority;
}

public long getPos() {
return this.pos;
}

public CompletableFuture<T> getFuture() {
return this.future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ public class SchedulingManager {
private static final AtomicInteger COUNTER = new AtomicInteger(0);

public static final int MAX_LEVEL = ChunkLevels.INACCESSIBLE + 1;
private final Long2ReferenceOpenHashMap<ObjectArraySet<ScheduledTask<?>>> pos2Tasks = new Long2ReferenceOpenHashMap<>();
private final SimpleObjectPool<ObjectArraySet<ScheduledTask<?>>> pos2TasksPool = new SimpleObjectPool<>(unused -> new ObjectArraySet<>(), ObjectArraySet::clear, ObjectArraySet::clear, 2048);
private final Long2ReferenceOpenHashMap<ObjectArraySet<AbstractPosAwarePrioritizedTask>> pos2Tasks = new Long2ReferenceOpenHashMap<>();
private final SimpleObjectPool<ObjectArraySet<AbstractPosAwarePrioritizedTask>> pos2TasksPool = new SimpleObjectPool<>(unused -> new ObjectArraySet<>(), ObjectArraySet::clear, ObjectArraySet::clear, 2048);
private final Long2IntOpenHashMap prioritiesFromLevel = new Long2IntOpenHashMap();
private final Object schedulingMutex = new Object();
private final int id = COUNTER.getAndIncrement();
Expand All @@ -33,16 +33,16 @@ public SchedulingManager(Executor executor) {
this.executor = executor;
}

public void enqueue(ScheduledTask<?> task) {
public void enqueue(AbstractPosAwarePrioritizedTask task) {
synchronized (this.schedulingMutex) {
final long pos = task.getPos();
final ObjectArraySet<ScheduledTask<?>> locks = this.pos2Tasks.computeIfAbsent(pos, unused -> this.pos2TasksPool.alloc());
final ObjectArraySet<AbstractPosAwarePrioritizedTask> locks = this.pos2Tasks.computeIfAbsent(pos, unused -> this.pos2TasksPool.alloc());
locks.add(task);
updatePriorityInternal(pos);
}
task.addPostExec(() -> {
synchronized (this.schedulingMutex) {
final ObjectArraySet<ScheduledTask<?>> tasks = this.pos2Tasks.get(task.getPos());
final ObjectArraySet<AbstractPosAwarePrioritizedTask> tasks = this.pos2Tasks.get(task.getPos());
if (tasks != null) {
tasks.remove(task);
if (tasks.isEmpty()) {
Expand All @@ -55,6 +55,14 @@ public void enqueue(ScheduledTask<?> task) {
GlobalExecutors.prioritizedScheduler.schedule(task);
}

public void enqueue(long pos, Runnable command) {
this.enqueue(new WrappingTask(pos, command));
}

public Executor positionedExecutor(long pos) {
return command -> this.enqueue(pos, command);
}

public void updatePriorityFromLevel(long pos, int level) {
this.executor.execute(() -> {
synchronized (this.schedulingMutex) {
Expand Down Expand Up @@ -84,9 +92,9 @@ private void updatePriorityInternal(long pos) {
fromSyncLoad = MAX_LEVEL;
}
int priority = Math.min(fromLevel, fromSyncLoad);
final ObjectArraySet<ScheduledTask<?>> locks = this.pos2Tasks.get(pos);
final ObjectArraySet<AbstractPosAwarePrioritizedTask> locks = this.pos2Tasks.get(pos);
if (locks != null) {
for (ScheduledTask<?> lock : locks) {
for (AbstractPosAwarePrioritizedTask lock : locks) {
lock.setPriority(priority);
GlobalExecutors.prioritizedScheduler.notifyPriorityChange(lock);
}
Expand All @@ -109,10 +117,6 @@ public void setCurrentSyncLoad(ChunkPos pos) {
});
}

public Executor getExecutor() {
return executor;
}

public int getId() {
return this.id;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.ishland.c2me.base.common.scheduler;

import com.ishland.flowsched.executor.LockToken;

import java.util.Objects;

public class WrappingTask extends AbstractPosAwarePrioritizedTask {

private static final LockToken[] EMPTY_LOCK_TOKENS = new LockToken[0];

private final Runnable wrapped;

public WrappingTask(long pos, Runnable wrapped) {
super(pos);
this.wrapped = Objects.requireNonNull(wrapped);
}

@Override
public void run() {
try {
wrapped.run();
} finally {
for (Runnable runnable : this.postExec) {
try {
runnable.run();
} catch (Throwable t1) {
t1.printStackTrace();
}
}
}
}

@Override
public void propagateException(Throwable t) {
t.printStackTrace();
}

@Override
public LockToken[] lockTokens() {
return EMPTY_LOCK_TOKENS;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ private boolean handlePendingReads() {
future.complete(null);
} else if (cached.left().isPresent()) {
if (scanner != null) {
GlobalExecutors.executor.execute(() -> {
GlobalExecutors.prioritizedScheduler.schedule(() -> {
try {
cached.left().get().accept(scanner);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
});
}, 16);
} else {
future.complete(cached.left().get());
}
Expand All @@ -227,7 +227,7 @@ private boolean handlePendingReads() {
SneakyThrow.sneaky(e);
return null; // unreachable
}
}, GlobalExecutors.executor)
}, GlobalExecutors.prioritizedScheduler.executor(16))
.thenAccept(future::complete)
.exceptionally(throwable -> {
future.completeExceptionally(throwable);
Expand Down Expand Up @@ -293,7 +293,7 @@ private void scheduleChunkRead(long pos, CompletableFuture<NbtCompound> future,
SneakyThrow.sneaky(t);
return null; // Unreachable anyway
}
}, GlobalExecutors.executor).handle((compound, throwable) -> {
}, GlobalExecutors.prioritizedScheduler.executor(16)).handle((compound, throwable) -> {
if (throwable != null) future.completeExceptionally(throwable);
else future.complete(compound);
return null;
Expand Down Expand Up @@ -337,7 +337,7 @@ private void writeChunk(long pos, Either<NbtCompound, byte[]> nbt) {
SneakyThrow.sneaky(t);
return null; // Unreachable anyway
}
}, GlobalExecutors.executor).thenAcceptAsync(bytes -> {
}, GlobalExecutors.prioritizedScheduler.executor(16)).thenAcceptAsync(bytes -> {
if (nbt == this.cache.get(pos)) { // only write if match to avoid overwrites
try {
final ChunkPos pos1 = new ChunkPos(pos);
Expand Down
Loading

0 comments on commit ba31ea5

Please sign in to comment.