Skip to content

Commit

Permalink
Code enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
dev-claw committed May 31, 2021
1 parent b0f5a15 commit c018033
Showing 1 changed file with 39 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -28,18 +27,17 @@ public class DownloadService {

private final int MAX_POOL_SIZE = 12;

private final ConcurrentHashMap<Host, AtomicInteger> threadCount = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newFixedThreadPool(MAX_POOL_SIZE);
private final List<DownloadJob> running = new ArrayList<>();
private final List<DownloadJob> pending = new ArrayList<>();

private final SettingsService settingsService;
private final DataService dataService;
private final MetadataService metadataService;

private final List<Host> hosts;

private Thread pollThread;
private final Map<Host, AtomicInteger> threadCount = new HashMap<>();
private final ExecutorService executor = Executors.newFixedThreadPool(MAX_POOL_SIZE);
private final List<DownloadJob> running = new ArrayList<>();
private final List<DownloadJob> pending = new ArrayList<>();

private final Thread pollThread;

@Autowired
public DownloadService(
Expand All @@ -51,11 +49,36 @@ public DownloadService(
this.dataService = dataService;
this.metadataService = metadataService;
this.hosts = hosts;
pollThread =
new Thread(
() -> {
while (!Thread.interrupted()) {
try {
synchronized (this) {
List<DownloadJob> accepted = new ArrayList<>();
List<DownloadJob> candidates = getCandidates(candidateCount());
candidates.forEach(
c -> {
if (canRun(c.getImage().getHost())) {
accepted.add(c);
}
});
pending.removeAll(accepted);
accepted.forEach(this::schedule);
accepted.clear();
this.wait();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
},
"Download scheduler thread");
}

@PostConstruct
private void init() {
pollThread = new Thread(this::start, "Polling thread");
pollThread.start();
}

Expand All @@ -70,7 +93,10 @@ public void destroy() throws Exception {
log.debug(String.format("Stopping download jobs for %s", p));
this.stopRunning(p.getPostId());
});
executor.awaitTermination(5, TimeUnit.SECONDS);
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("Some jobs are still running!, forcing shutdown");
executor.shutdownNow();
}
}

private synchronized void stopRunning(@NonNull String postId) {
Expand Down Expand Up @@ -186,19 +212,14 @@ private Map<Host, Integer> candidateCount() {
return map;
}

private List<DownloadJob> getCandidates(Map<Host, Integer> candidateCount, int max) {
int i = 0;
private List<DownloadJob> getCandidates(Map<Host, Integer> candidateCount) {
List<DownloadJob> candidates = new ArrayList<>();
for (DownloadJob downloadJob : pending) {
Host host = downloadJob.getImage().getHost();
Integer maxPerHost = candidateCount.get(host);
if (maxPerHost > 0 && i < max) {
if (maxPerHost > 0) {
candidates.add(downloadJob);
candidateCount.put(host, maxPerHost - 1);
i++;
}
if (i >= max) {
break;
}
}
return candidates;
Expand All @@ -225,31 +246,7 @@ public void enqueue(Map<Post, Collection<Image>> images) {
}
}

private void start() {
while (!Thread.interrupted()) {
try {
synchronized (this) {
List<DownloadJob> accepted = new ArrayList<>();
List<DownloadJob> candidates = getCandidates(candidateCount(), MAX_POOL_SIZE);
candidates.forEach(
c -> {
if (canRun(c.getImage().getHost())) {
accepted.add(c);
}
});
pending.removeAll(accepted);
accepted.forEach(this::push);
accepted.clear();
this.wait();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

private synchronized void push(DownloadJob downloadJob) {
private synchronized void schedule(DownloadJob downloadJob) {
log.debug(String.format("Scheduling a job for %s", downloadJob.getImage().getUrl()));
executor.execute(new DownloadJobWrapper(downloadJob));
running.add(downloadJob);
Expand Down

0 comments on commit c018033

Please sign in to comment.