diff --git a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java index 4c063d1..c3653cd 100644 --- a/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java +++ b/logbat/src/main/java/info/logbat/domain/log/repository/AsyncMultiProcessor.java @@ -25,6 +25,7 @@ public class AsyncMultiProcessor implements EventProducer { private final List> queues; private final List flatterExecutors; + private final List leaderExecutors; private Consumer> saveFunction; private final int queueCount; private final ObjectProvider> objectProvider; @@ -35,6 +36,7 @@ public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount, ObjectProvider> objectProvider) { this.queueCount = queueCount; this.queues = new ArrayList<>(queueCount); + this.leaderExecutors = new ArrayList<>(queueCount); this.flatterExecutors = new ArrayList<>(queueCount); this.objectProvider = objectProvider; int poolSize = getPoolSize(jdbcTemplate); @@ -57,10 +59,12 @@ public void produce(List data) { private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) { ReentrantLogQueue queue = objectProvider.getObject(timeout, bulkSize); for (int i = 0; i < queueCount; i++) { + ExecutorService leaderExecutor = Executors.newFixedThreadPool(poolSize); queues.add(queue); + leaderExecutors.add(leaderExecutor); flatterExecutors.add(Executors.newSingleThreadExecutor()); + CompletableFuture.runAsync(() -> leaderTask(queue, leaderExecutor)); } - CompletableFuture.runAsync(() -> leaderTask(queue, Executors.newFixedThreadPool(poolSize))); } private void leaderTask(ReentrantLogQueue queue, ExecutorService follower) {