Skip to content

Commit

Permalink
perf: reduce executor locking contention
Browse files Browse the repository at this point in the history
  • Loading branch information
ishland committed Feb 5, 2024
1 parent a0c7df2 commit bf8183b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 17 deletions.
18 changes: 6 additions & 12 deletions src/main/java/com/ishland/flowsched/executor/ExecutorManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,10 @@ void releaseLocks(Task task) {
* @return the task, or {@code null} if no task is executable.
*/
Task pollExecutableTask() {
synchronized (this.schedulingMutex) {
Task task;
while ((task = this.globalWorkQueue.dequeue()) != null) {
if (this.tryLock(task)) {
return task;
}
Task task;
while ((task = this.globalWorkQueue.dequeue()) != null) {
if (this.tryLock(task)) {
return task;
}
}
return null;
Expand All @@ -121,9 +119,7 @@ public void shutdown() {
* @param task the task.
*/
public void schedule(Task task) {
synchronized (this.schedulingMutex) {
this.globalWorkQueue.enqueue(task, task.priority());
}
this.globalWorkQueue.enqueue(task, task.priority());
synchronized (this.workerMonitor) {
this.workerMonitor.notify();
}
Expand Down Expand Up @@ -155,9 +151,7 @@ public Executor executor(int priority) {
* @param task the task.
*/
public void notifyPriorityChange(Task task) {
synchronized (this.schedulingMutex) {
this.globalWorkQueue.changePriority(task, task.priority());
}
this.globalWorkQueue.changePriority(task, task.priority());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public DynamicPriorityQueue(int priorityCount) {
}
}

public void enqueue(E element, int priority) {
public synchronized void enqueue(E element, int priority) {
if (priority < 0 || priority >= priorities.length)
throw new IllegalArgumentException("Priority out of range");
if (priorityMap.containsKey(element))
Expand All @@ -37,7 +37,7 @@ public void enqueue(E element, int priority) {
currentMinPriority = priority;
}

public boolean changePriority(E element, int priority) {
public synchronized boolean changePriority(E element, int priority) {
if (priority < 0 || priority >= priorities.length)
throw new IllegalArgumentException("Priority out of range");
if (!priorityMap.containsKey(element)) return false; // ignored
Expand All @@ -53,7 +53,7 @@ public boolean changePriority(E element, int priority) {
return true;
}

public E dequeue() {
public synchronized E dequeue() {
while (currentMinPriority < priorities.length) {
ObjectLinkedOpenHashSet<E> priority = this.priorities[currentMinPriority];
if (priority.isEmpty()) {
Expand All @@ -67,11 +67,11 @@ public E dequeue() {
return null;
}

public boolean contains(E element) {
public synchronized boolean contains(E element) {
return priorityMap.containsKey(element);
}

public void remove(E element) {
public synchronized void remove(E element) {
if (!priorityMap.containsKey(element))
return; // ignore
int priority = priorityMap.removeInt(element);
Expand Down

0 comments on commit bf8183b

Please sign in to comment.