Skip to content

Commit

Permalink
hotfix: follwerThread queueCount 수에 맞게 변경
Browse files Browse the repository at this point in the history
  • Loading branch information
miiiinju1 committed Aug 29, 2024
1 parent 2df3c25 commit 2fcd23a
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class AsyncMultiProcessor<E> implements EventProducer<E> {

private final List<ReentrantLogQueue<E>> queues;
private final List<ExecutorService> flatterExecutors;
private final List<ExecutorService> leaderExecutors;
private Consumer<List<E>> saveFunction;
private final int queueCount;
private final ObjectProvider<ReentrantLogQueue<E>> objectProvider;
Expand All @@ -35,6 +36,7 @@ public AsyncMultiProcessor(@Value("${queue.count:3}") int queueCount,
ObjectProvider<ReentrantLogQueue<E>> objectProvider) {
this.queueCount = queueCount;
this.queues = new ArrayList<>(queueCount);
this.leaderExecutors = new ArrayList<>(queueCount);
this.flatterExecutors = new ArrayList<>(queueCount);
this.objectProvider = objectProvider;
int poolSize = getPoolSize(jdbcTemplate);
Expand All @@ -57,10 +59,12 @@ public void produce(List<E> data) {
private void setup(int queueCount, Long timeout, Integer bulkSize, int poolSize) {
ReentrantLogQueue<E> queue = objectProvider.getObject(timeout, bulkSize);
for (int i = 0; i < queueCount; i++) {
ExecutorService leaderExecutor = Executors.newFixedThreadPool(poolSize);
queues.add(queue);
leaderExecutors.add(leaderExecutor);
flatterExecutors.add(Executors.newSingleThreadExecutor());
CompletableFuture.runAsync(() -> leaderTask(queue, leaderExecutor));
}
CompletableFuture.runAsync(() -> leaderTask(queue, Executors.newFixedThreadPool(poolSize)));
}

private void leaderTask(ReentrantLogQueue<E> queue, ExecutorService follower) {
Expand Down

0 comments on commit 2fcd23a

Please sign in to comment.