Skip to content

Commit

Permalink
improve of OrderedExecutor and SingleThreadExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
fanjianye committed May 17, 2024
1 parent c2e8037 commit 6814b3c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadF
ExecutorService thread = createSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d")
.setThreadFactory(threadFactory).build());
SingleThreadExecutor ste = null;
if (thread instanceof SingleThreadExecutor) {
ste = (SingleThreadExecutor) thread;
}

if (traceTaskExecution || preserveMdcForTaskExecution) {
thread = addExecutorDecorators(thread);
Expand Down Expand Up @@ -425,48 +429,8 @@ protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadF
throw new RuntimeException("Couldn't start thread " + i, e);
}

if (thread instanceof SingleThreadExecutor) {
SingleThreadExecutor ste = (SingleThreadExecutor) thread;
ste.registerMetrics(statsLogger);
} else if (thread instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) thread;
// Register gauges
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-queue", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return threadPoolExecutor.getQueue().size();
}
});
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-completed-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return threadPoolExecutor.getCompletedTaskCount();
}
});
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-total-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return threadPoolExecutor.getTaskCount();
}
});
if (ste != null) {
ste.registerMetrics(statsLogger, name, idx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ public void execute(Runnable r) {
}
}

public void registerMetrics(StatsLogger statsLogger) {
public void registerMetrics(StatsLogger statsLogger, String name, int idx) {
// Register gauges
statsLogger.scopeLabel("thread", runner.getName())
.registerGauge("thread_executor_queue", new Gauge<Number>() {
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-queue", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
Expand All @@ -239,8 +239,8 @@ public Number getSample() {
return getQueuedTasksCount();
}
});
statsLogger.scopeLabel("thread", runner.getName())
.registerGauge("thread_executor_completed", new Gauge<Number>() {
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-completed-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
Expand All @@ -251,20 +251,8 @@ public Number getSample() {
return getCompletedTasksCount();
}
});
statsLogger.scopeLabel("thread", runner.getName())
.registerGauge("thread_executor_tasks_completed", new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}

@Override
public Number getSample() {
return getCompletedTasksCount();
}
});
statsLogger.scopeLabel("thread", runner.getName())
.registerGauge("thread_executor_tasks_rejected", new Gauge<Number>() {
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-rejected-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
Expand All @@ -275,8 +263,8 @@ public Number getSample() {
return getRejectedTasksCount();
}
});
statsLogger.scopeLabel("thread", runner.getName())
.registerGauge("thread_executor_tasks_failed", new Gauge<Number>() {
statsLogger.scopeLabel("thread", String.valueOf(idx))
.registerGauge(String.format("%s-failed-tasks", name), new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
Expand Down

0 comments on commit 6814b3c

Please sign in to comment.