From f7309432454dd37b7984eb3a03e37fa414a267d0 Mon Sep 17 00:00:00 2001 From: 0xb00bface <0xboobface@gmail.com> Date: Fri, 1 Jan 2021 14:58:38 +0100 Subject: [PATCH] Optimize multi-threading - Set thread priorities for different thread pools - Run costly stuff in downloadCompletionHandler asynchronously --- .../ctbrec/recorder/NextGenLocalRecorder.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index 779ad885..9ef525ca 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -2,6 +2,7 @@ package ctbrec.recorder; import static ctbrec.SubsequentAction.*; import static ctbrec.event.Event.Type.*; +import static java.lang.Thread.*; import static java.util.concurrent.TimeUnit.*; import java.io.File; @@ -23,6 +24,7 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -72,8 +74,8 @@ public class NextGenLocalRecorder implements Recorder { // thread pools for downloads and post-processing private ScheduledExecutorService downloadPool; private ThreadPoolScaler threadPoolScaler; - private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload")); - private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker")); + private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2)); + private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1)); private BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); private Map, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); @@ -84,7 +86,7 @@ public class NextGenLocalRecorder implements Recorder { public NextGenLocalRecorder(Config config, List sites) throws IOException { this.config = config; - downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download")); + downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download", MAX_PRIORITY)); threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) downloadPool, config.getSettings().downloadThreadPoolSize); recordingManager = new RecordingManager(config, sites); config.getSettings().models.stream().forEach(m -> { @@ -100,7 +102,7 @@ public class NextGenLocalRecorder implements Recorder { }); int ppThreads = config.getSettings().postProcessingThreads; - ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP")); + ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY)); recording = true; registerEventBusListener(); @@ -157,19 +159,20 @@ public class NextGenLocalRecorder implements Recorder { ScheduledFuture rescheduledFuture = downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS); downloadFutureQueue.add(rescheduledFuture); } else { - d.finalizeDownload(); - deleteIfEmpty(rec); - removeRecordingProcess(rec); - if (rec.getStatus() == State.WAITING) { - LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); - submitPostProcessingJob(rec); + CompletableFuture.runAsync(() -> { + deleteIfEmpty(rec); + removeRecordingProcess(rec); + if (rec.getStatus() == State.WAITING) { + LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); + submitPostProcessingJob(rec); - // check, if we have to restart the recording - Model model = rec.getModel(); - tryRestartRecording(model); - } else { - setRecordingStatus(rec, State.FAILED); - } + // check, if we have to restart the recording + Model model = rec.getModel(); + tryRestartRecording(model); + } else { + setRecordingStatus(rec, State.FAILED); + } + }); } } } catch (ExecutionException | IllegalStateException e) { @@ -201,6 +204,7 @@ public class NextGenLocalRecorder implements Recorder { ppPool.submit(() -> { try { setRecordingStatus(recording, State.POST_PROCESSING); + recording.getDownload().finalizeDownload(); recording.refresh(); recordingManager.saveRecording(recording); recording.postprocess(); @@ -646,12 +650,12 @@ public class NextGenLocalRecorder implements Recorder { }); } - private ThreadFactory createThreadFactory(String name) { + private ThreadFactory createThreadFactory(String name, int priority) { return r -> { Thread t = new Thread(r); t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); t.setDaemon(true); - t.setPriority(Thread.MAX_PRIORITY); + t.setPriority(priority); return t; }; }