Optimize multi-threading

- Set thread priorities for different thread pools
- Run costly stuff in downloadCompletionHandler asynchronously
This commit is contained in:
0xb00bface 2021-01-01 14:58:38 +01:00
parent a1492927e6
commit f730943245
1 changed files with 22 additions and 18 deletions

View File

@ -2,6 +2,7 @@ package ctbrec.recorder;
import static ctbrec.SubsequentAction.*; import static ctbrec.SubsequentAction.*;
import static ctbrec.event.Event.Type.*; import static ctbrec.event.Event.Type.*;
import static java.lang.Thread.*;
import static java.util.concurrent.TimeUnit.*; import static java.util.concurrent.TimeUnit.*;
import java.io.File; import java.io.File;
@ -23,6 +24,7 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -72,8 +74,8 @@ public class NextGenLocalRecorder implements Recorder {
// thread pools for downloads and post-processing // thread pools for downloads and post-processing
private ScheduledExecutorService downloadPool; private ScheduledExecutorService downloadPool;
private ThreadPoolScaler threadPoolScaler; private ThreadPoolScaler threadPoolScaler;
private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload")); private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2));
private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker")); private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1));
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>(); private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
private Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); private Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>());
@ -84,7 +86,7 @@ public class NextGenLocalRecorder implements Recorder {
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException { public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config; this.config = config;
downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download")); downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download", MAX_PRIORITY));
threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) downloadPool, config.getSettings().downloadThreadPoolSize); threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) downloadPool, config.getSettings().downloadThreadPoolSize);
recordingManager = new RecordingManager(config, sites); recordingManager = new RecordingManager(config, sites);
config.getSettings().models.stream().forEach(m -> { config.getSettings().models.stream().forEach(m -> {
@ -100,7 +102,7 @@ public class NextGenLocalRecorder implements Recorder {
}); });
int ppThreads = config.getSettings().postProcessingThreads; int ppThreads = config.getSettings().postProcessingThreads;
ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP")); ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY));
recording = true; recording = true;
registerEventBusListener(); registerEventBusListener();
@ -157,19 +159,20 @@ public class NextGenLocalRecorder implements Recorder {
ScheduledFuture<Recording> rescheduledFuture = downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS); ScheduledFuture<Recording> rescheduledFuture = downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS);
downloadFutureQueue.add(rescheduledFuture); downloadFutureQueue.add(rescheduledFuture);
} else { } else {
d.finalizeDownload(); CompletableFuture.runAsync(() -> {
deleteIfEmpty(rec); deleteIfEmpty(rec);
removeRecordingProcess(rec); removeRecordingProcess(rec);
if (rec.getStatus() == State.WAITING) { if (rec.getStatus() == State.WAITING) {
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName());
submitPostProcessingJob(rec); submitPostProcessingJob(rec);
// check, if we have to restart the recording // check, if we have to restart the recording
Model model = rec.getModel(); Model model = rec.getModel();
tryRestartRecording(model); tryRestartRecording(model);
} else { } else {
setRecordingStatus(rec, State.FAILED); setRecordingStatus(rec, State.FAILED);
} }
});
} }
} }
} catch (ExecutionException | IllegalStateException e) { } catch (ExecutionException | IllegalStateException e) {
@ -201,6 +204,7 @@ public class NextGenLocalRecorder implements Recorder {
ppPool.submit(() -> { ppPool.submit(() -> {
try { try {
setRecordingStatus(recording, State.POST_PROCESSING); setRecordingStatus(recording, State.POST_PROCESSING);
recording.getDownload().finalizeDownload();
recording.refresh(); recording.refresh();
recordingManager.saveRecording(recording); recordingManager.saveRecording(recording);
recording.postprocess(); recording.postprocess();
@ -646,12 +650,12 @@ public class NextGenLocalRecorder implements Recorder {
}); });
} }
private ThreadFactory createThreadFactory(String name) { private ThreadFactory createThreadFactory(String name, int priority) {
return r -> { return r -> {
Thread t = new Thread(r); Thread t = new Thread(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true); t.setDaemon(true);
t.setPriority(Thread.MAX_PRIORITY); t.setPriority(priority);
return t; return t;
}; };
} }