diff --git a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java index d5ee142a..067d9f22 100644 --- a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java @@ -1,6 +1,7 @@ package ctbrec.recorder; import com.google.common.eventbus.Subscribe; + import ctbrec.*; import ctbrec.Recording.State; import ctbrec.event.*; @@ -29,6 +30,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import com.google.common.collect.Ordering; import static ctbrec.Recording.State.WAITING; import static ctbrec.SubsequentAction.*; @@ -40,7 +42,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; @Slf4j public class SimplifiedLocalRecorder implements Recorder { public static final Statistics STATS = new Statistics(); - + public static final boolean IGNORE_CACHE = true; private final List models = Collections.synchronizedList(new ArrayList<>()); private final Config config; @@ -53,28 +55,24 @@ public class SimplifiedLocalRecorder implements Recorder { private final RecordingManager recordingManager; private final RecordingPreconditions preconditions; - private final BlockingQueue recordings = new LinkedBlockingQueue<>(); - + // thread pools for downloads and post-processing - private final ScheduledExecutorService scheduler; - private final ExecutorService playlistDownloadPool = Executors.newFixedThreadPool(10); - private final ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2)); - private final ExecutorService postProcessing; - private final ThreadPoolScaler threadPoolScaler; + private final ExecutorService segmentDownloadPool = Executors.newVirtualThreadPerTaskExecutor(); + private final ExecutorService recordingLoopPool = Executors.newVirtualThreadPerTaskExecutor(); + private final ThreadPoolExecutor postProcessing; + private final Thread maintenanceThread; private long lastSpaceCheck; public SimplifiedLocalRecorder(Config config, List sites) throws IOException { this.config = config; client = new RecorderHttpClient(config); - scheduler = Executors.newScheduledThreadPool(5, createThreadFactory("Download", MAX_PRIORITY)); - threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) scheduler, 5); recordingManager = new RecordingManager(config, sites); loadModels(sites); int ppThreads = config.getSettings().postProcessingThreads; BlockingQueue ppQueue = new LinkedBlockingQueue<>(); postProcessing = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY)); - + running = true; registerEventBusListener(); @@ -84,21 +82,20 @@ public class SimplifiedLocalRecorder implements Recorder { log.info("Models to record: {}", models); log.info("Saving recordings in {}", config.getSettings().recordingsDir); - startRecordingLoop(); + maintenanceThread = startMaintenanceLoop(); } - private void startRecordingLoop() { - new Thread(() -> { - while (running) { - Recording rec = recordings.poll(); - if (rec != null) { - processRecording(rec); - } + private Thread startMaintenanceLoop() { + var t = new Thread(() -> { + while (running && !Thread.currentThread().isInterrupted()) { checkFreeSpace(); - threadPoolScaler.tick(); - waitABit(100); + //threadPoolScaler.tick(); + waitABit(1000); } - }).start(); + }); + t.setName("Recording loop"); + t.start(); + return t; } private void checkFreeSpace() { @@ -125,33 +122,42 @@ public class SimplifiedLocalRecorder implements Recorder { } } - private void processRecording(Recording recording) { - if (recording.getCurrentIteration().isDone()) { - if (recording.getRecordingProcess().isRunning()) { - try { - Instant rescheduleAt = recording.getCurrentIteration().get().getRescheduleTime(); - Duration duration = Duration.between(Instant.now(), rescheduleAt); - long delayInMillis = Math.max(0, duration.toMillis()); - log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis); - scheduleRecording(recording, delayInMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail(recording); - } catch (ExecutionException e) { - log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e); - fail(recording); + private void singleRecordingLoop(Recording recording) { + while (recording.getRecordingProcess().isRunning()) { + try { + // run single iteration + recording.getRecordingProcess().call(); + + if (recording.getModel().isSuspended()) { + log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel()); + stopRecordingProcess(recording); + submitPostProcessingJob(recording); + break; } - } else { - removeRecordingProcess(recording); - if (deleteIfEmpty(recording)) { - return; - } - submitPostProcessingJob(recording); - tryRestartRecording(recording.getModel()); + + // wait necessary time + Instant rescheduleAt = recording.getRecordingProcess().getRescheduleTime(); + Duration duration = Duration.between(Instant.now(), rescheduleAt); + long delayInMillis = Math.max(0, duration.toMillis()); + log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis); + Thread.sleep(delayInMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail(recording); + return; + } catch (Exception e) { + log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e); + fail(recording); + return; } - } else { - recordings.add(recording); + } + + removeRecordingProcess(recording); + if (deleteIfEmpty(recording)) { + return; } + submitPostProcessingJob(recording); + tryRestartRecording(recording.getModel()); } private void removeRecordingProcess(Recording rec) { @@ -173,19 +179,6 @@ public class SimplifiedLocalRecorder implements Recorder { tryRestartRecording(recording.getModel()); } - private void scheduleRecording(Recording recording, long delayInMillis) { - if (recording.getModel().isSuspended()) { - log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel()); - stopRecordingProcess(recording); - submitPostProcessingJob(recording); - return; - } - ScheduledFuture future = scheduler.schedule(recording.getRecordingProcess(), delayInMillis, TimeUnit.MILLISECONDS); - recording.setCurrentIteration(future); - recording.getSelectedResolution(); - recordings.add(recording); - } - private void loadModels(List sites) { config.getSettings().models .stream() @@ -217,9 +210,16 @@ public class SimplifiedLocalRecorder implements Recorder { private void stopRecordings() { log.info("Stopping all recordings"); - for (Recording recording : recordings) { - recording.getRecordingProcess().stop(); - recording.getRecordingProcess().awaitEnd(); + recorderLock.lock(); + try { + for (Recording recording : recordingProcesses) { + recording.getRecordingProcess().stop(); + } + for (Recording recording : recordingProcesses) { + recording.getRecordingProcess().awaitEnd(); + } + } finally { + recorderLock.unlock(); } waitForRecordingsToTerminate(); log.info("Recordings have been stopped"); @@ -518,11 +518,11 @@ public class SimplifiedLocalRecorder implements Recorder { public void shutdown(boolean immediately) { log.info("Shutting down"); shuttingDown = true; + maintenanceThread.interrupt(); if (!immediately) { try { stopRecordings(); - shutdownPool("Scheduler", scheduler, 60); - shutdownPool("PlaylistDownloadPool", playlistDownloadPool, 60); + shutdownPool("Recording loops", recordingLoopPool, 60); shutdownPool("SegmentDownloadPool", segmentDownloadPool, 60); shutdownPool("Post-Processing", postProcessing, 600); } catch (InterruptedException e) { @@ -733,18 +733,30 @@ public class SimplifiedLocalRecorder implements Recorder { return; } - try { - boolean modelInRecordingList = isTracked(model); - boolean online = model.isOnline(IGNORE_CACHE); - if (modelInRecordingList && online) { - log.info("Restarting recording for model {}", model); - startRecordingProcess(model); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Couldn't restart recording for model {}", model); - } catch (Exception e) { - log.error("Couldn't restart recording for model {}", model); + boolean modelInRecordingList = isTracked(model); + + if (modelInRecordingList) { + // .isOnline() check does blocking http request, so do this async + recordingLoopPool.submit(() -> { + try { + boolean online = model.isOnline(IGNORE_CACHE); + if (online) { + log.info("Restarting recording for model {}", model); + + try { + recorderLock.lock(); + startRecordingProcess(model); + } finally { + recorderLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Couldn't restart recording for model {}", model); + } catch (Exception e) { + log.error("Couldn't restart recording for model {}", model); + } + }); } } @@ -772,8 +784,8 @@ public class SimplifiedLocalRecorder implements Recorder { }); } - private CompletableFuture startRecordingProcess(Model model) { - return CompletableFuture.runAsync(() -> { + private void startRecordingProcess(Model model) { + recordingLoopPool.submit(() -> { recorderLock.lock(); try { preconditions.check(model); @@ -783,7 +795,7 @@ public class SimplifiedLocalRecorder implements Recorder { setRecordingStatus(rec, State.RECORDING); rec.getModel().setLastRecorded(rec.getStartDate()); recordingManager.saveRecording(rec); - scheduleRecording(rec, 0); + recordingLoopPool.submit(() -> {singleRecordingLoop(rec);}); } catch (RecordUntilExpiredException e) { log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); executeRecordUntilSubsequentAction(model); @@ -794,12 +806,12 @@ public class SimplifiedLocalRecorder implements Recorder { } finally { recorderLock.unlock(); } - }, segmentDownloadPool); + }); } private ThreadFactory createThreadFactory(String name, int priority) { return r -> { - Thread t = new Thread(r); + Thread t = Thread.ofPlatform().unstarted(r); t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); t.setDaemon(true); t.setPriority(priority);