diff --git a/vripper-server/src/main/java/tn/mnlr/vripper/download/DownloadService.java b/vripper-server/src/main/java/tn/mnlr/vripper/download/DownloadService.java index 8594d8d2..4af11f76 100644 --- a/vripper-server/src/main/java/tn/mnlr/vripper/download/DownloadService.java +++ b/vripper-server/src/main/java/tn/mnlr/vripper/download/DownloadService.java @@ -15,7 +15,6 @@ import javax.annotation.PostConstruct; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -28,18 +27,17 @@ public class DownloadService { private final int MAX_POOL_SIZE = 12; - private final ConcurrentHashMap threadCount = new ConcurrentHashMap<>(); - private final ExecutorService executor = Executors.newFixedThreadPool(MAX_POOL_SIZE); - private final List running = new ArrayList<>(); - private final List pending = new ArrayList<>(); - private final SettingsService settingsService; private final DataService dataService; private final MetadataService metadataService; - private final List hosts; - private Thread pollThread; + private final Map threadCount = new HashMap<>(); + private final ExecutorService executor = Executors.newFixedThreadPool(MAX_POOL_SIZE); + private final List running = new ArrayList<>(); + private final List pending = new ArrayList<>(); + + private final Thread pollThread; @Autowired public DownloadService( @@ -51,11 +49,36 @@ public DownloadService( this.dataService = dataService; this.metadataService = metadataService; this.hosts = hosts; + pollThread = + new Thread( + () -> { + while (!Thread.interrupted()) { + try { + synchronized (this) { + List accepted = new ArrayList<>(); + List candidates = getCandidates(candidateCount()); + candidates.forEach( + c -> { + if (canRun(c.getImage().getHost())) { + accepted.add(c); + } + }); + pending.removeAll(accepted); + accepted.forEach(this::schedule); + accepted.clear(); + this.wait(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + }, + "Download scheduler thread"); } @PostConstruct private void init() { - pollThread = new Thread(this::start, "Polling thread"); pollThread.start(); } @@ -70,7 +93,10 @@ public void destroy() throws Exception { log.debug(String.format("Stopping download jobs for %s", p)); this.stopRunning(p.getPostId()); }); - executor.awaitTermination(5, TimeUnit.SECONDS); + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("Some jobs are still running!, forcing shutdown"); + executor.shutdownNow(); + } } private synchronized void stopRunning(@NonNull String postId) { @@ -186,19 +212,14 @@ private Map candidateCount() { return map; } - private List getCandidates(Map candidateCount, int max) { - int i = 0; + private List getCandidates(Map candidateCount) { List candidates = new ArrayList<>(); for (DownloadJob downloadJob : pending) { Host host = downloadJob.getImage().getHost(); Integer maxPerHost = candidateCount.get(host); - if (maxPerHost > 0 && i < max) { + if (maxPerHost > 0) { candidates.add(downloadJob); candidateCount.put(host, maxPerHost - 1); - i++; - } - if (i >= max) { - break; } } return candidates; @@ -225,31 +246,7 @@ public void enqueue(Map> images) { } } - private void start() { - while (!Thread.interrupted()) { - try { - synchronized (this) { - List accepted = new ArrayList<>(); - List candidates = getCandidates(candidateCount(), MAX_POOL_SIZE); - candidates.forEach( - c -> { - if (canRun(c.getImage().getHost())) { - accepted.add(c); - } - }); - pending.removeAll(accepted); - accepted.forEach(this::push); - accepted.clear(); - this.wait(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - - private synchronized void push(DownloadJob downloadJob) { + private synchronized void schedule(DownloadJob downloadJob) { log.debug(String.format("Scheduling a job for %s", downloadJob.getImage().getUrl())); executor.execute(new DownloadJobWrapper(downloadJob)); running.add(downloadJob);