virtual thread-based recording loops (one per recording)

(cherry picked from commit a4c44a4bc0a25e76715354bfa6e71681c0a8a50b)
This commit is contained in:
reusedname 2024-12-17 21:31:07 +05:00
parent 435cbbdb95
commit 74852692d0
1 changed files with 93 additions and 81 deletions

View File

@ -1,6 +1,7 @@
package ctbrec.recorder; package ctbrec.recorder;
import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.Subscribe;
import ctbrec.*; import ctbrec.*;
import ctbrec.Recording.State; import ctbrec.Recording.State;
import ctbrec.event.*; import ctbrec.event.*;
@ -29,6 +30,7 @@ import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import com.google.common.collect.Ordering;
import static ctbrec.Recording.State.WAITING; import static ctbrec.Recording.State.WAITING;
import static ctbrec.SubsequentAction.*; import static ctbrec.SubsequentAction.*;
@ -40,7 +42,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
@Slf4j @Slf4j
public class SimplifiedLocalRecorder implements Recorder { public class SimplifiedLocalRecorder implements Recorder {
public static final Statistics STATS = new Statistics(); public static final Statistics STATS = new Statistics();
public static final boolean IGNORE_CACHE = true; public static final boolean IGNORE_CACHE = true;
private final List<Model> models = Collections.synchronizedList(new ArrayList<>()); private final List<Model> models = Collections.synchronizedList(new ArrayList<>());
private final Config config; private final Config config;
@ -53,28 +55,24 @@ public class SimplifiedLocalRecorder implements Recorder {
private final RecordingManager recordingManager; private final RecordingManager recordingManager;
private final RecordingPreconditions preconditions; private final RecordingPreconditions preconditions;
private final BlockingQueue<Recording> recordings = new LinkedBlockingQueue<>();
// thread pools for downloads and post-processing // thread pools for downloads and post-processing
private final ScheduledExecutorService scheduler; private final ExecutorService segmentDownloadPool = Executors.newVirtualThreadPerTaskExecutor();
private final ExecutorService playlistDownloadPool = Executors.newFixedThreadPool(10); private final ExecutorService recordingLoopPool = Executors.newVirtualThreadPerTaskExecutor();
private final ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2)); private final ThreadPoolExecutor postProcessing;
private final ExecutorService postProcessing; private final Thread maintenanceThread;
private final ThreadPoolScaler threadPoolScaler;
private long lastSpaceCheck; private long lastSpaceCheck;
public SimplifiedLocalRecorder(Config config, List<Site> sites) throws IOException { public SimplifiedLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config; this.config = config;
client = new RecorderHttpClient(config); client = new RecorderHttpClient(config);
scheduler = Executors.newScheduledThreadPool(5, createThreadFactory("Download", MAX_PRIORITY));
threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) scheduler, 5);
recordingManager = new RecordingManager(config, sites); recordingManager = new RecordingManager(config, sites);
loadModels(sites); loadModels(sites);
int ppThreads = config.getSettings().postProcessingThreads; int ppThreads = config.getSettings().postProcessingThreads;
BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>(); BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
postProcessing = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY)); postProcessing = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY));
running = true; running = true;
registerEventBusListener(); registerEventBusListener();
@ -84,21 +82,20 @@ public class SimplifiedLocalRecorder implements Recorder {
log.info("Models to record: {}", models); log.info("Models to record: {}", models);
log.info("Saving recordings in {}", config.getSettings().recordingsDir); log.info("Saving recordings in {}", config.getSettings().recordingsDir);
startRecordingLoop(); maintenanceThread = startMaintenanceLoop();
} }
private void startRecordingLoop() { private Thread startMaintenanceLoop() {
new Thread(() -> { var t = new Thread(() -> {
while (running) { while (running && !Thread.currentThread().isInterrupted()) {
Recording rec = recordings.poll();
if (rec != null) {
processRecording(rec);
}
checkFreeSpace(); checkFreeSpace();
threadPoolScaler.tick(); //threadPoolScaler.tick();
waitABit(100); waitABit(1000);
} }
}).start(); });
t.setName("Recording loop");
t.start();
return t;
} }
private void checkFreeSpace() { private void checkFreeSpace() {
@ -125,33 +122,42 @@ public class SimplifiedLocalRecorder implements Recorder {
} }
} }
private void processRecording(Recording recording) { private void singleRecordingLoop(Recording recording) {
if (recording.getCurrentIteration().isDone()) { while (recording.getRecordingProcess().isRunning()) {
if (recording.getRecordingProcess().isRunning()) { try {
try { // run single iteration
Instant rescheduleAt = recording.getCurrentIteration().get().getRescheduleTime(); recording.getRecordingProcess().call();
Duration duration = Duration.between(Instant.now(), rescheduleAt);
long delayInMillis = Math.max(0, duration.toMillis()); if (recording.getModel().isSuspended()) {
log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis); log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel());
scheduleRecording(recording, delayInMillis); stopRecordingProcess(recording);
} catch (InterruptedException e) { submitPostProcessingJob(recording);
Thread.currentThread().interrupt(); break;
fail(recording);
} catch (ExecutionException e) {
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
fail(recording);
} }
} else {
removeRecordingProcess(recording); // wait necessary time
if (deleteIfEmpty(recording)) { Instant rescheduleAt = recording.getRecordingProcess().getRescheduleTime();
return; Duration duration = Duration.between(Instant.now(), rescheduleAt);
} long delayInMillis = Math.max(0, duration.toMillis());
submitPostProcessingJob(recording); log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis);
tryRestartRecording(recording.getModel()); Thread.sleep(delayInMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(recording);
return;
} catch (Exception e) {
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
fail(recording);
return;
} }
} else { }
recordings.add(recording);
removeRecordingProcess(recording);
if (deleteIfEmpty(recording)) {
return;
} }
submitPostProcessingJob(recording);
tryRestartRecording(recording.getModel());
} }
private void removeRecordingProcess(Recording rec) { private void removeRecordingProcess(Recording rec) {
@ -173,19 +179,6 @@ public class SimplifiedLocalRecorder implements Recorder {
tryRestartRecording(recording.getModel()); tryRestartRecording(recording.getModel());
} }
private void scheduleRecording(Recording recording, long delayInMillis) {
if (recording.getModel().isSuspended()) {
log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel());
stopRecordingProcess(recording);
submitPostProcessingJob(recording);
return;
}
ScheduledFuture<RecordingProcess> future = scheduler.schedule(recording.getRecordingProcess(), delayInMillis, TimeUnit.MILLISECONDS);
recording.setCurrentIteration(future);
recording.getSelectedResolution();
recordings.add(recording);
}
private void loadModels(List<Site> sites) { private void loadModels(List<Site> sites) {
config.getSettings().models config.getSettings().models
.stream() .stream()
@ -217,9 +210,16 @@ public class SimplifiedLocalRecorder implements Recorder {
private void stopRecordings() { private void stopRecordings() {
log.info("Stopping all recordings"); log.info("Stopping all recordings");
for (Recording recording : recordings) { recorderLock.lock();
recording.getRecordingProcess().stop(); try {
recording.getRecordingProcess().awaitEnd(); for (Recording recording : recordingProcesses) {
recording.getRecordingProcess().stop();
}
for (Recording recording : recordingProcesses) {
recording.getRecordingProcess().awaitEnd();
}
} finally {
recorderLock.unlock();
} }
waitForRecordingsToTerminate(); waitForRecordingsToTerminate();
log.info("Recordings have been stopped"); log.info("Recordings have been stopped");
@ -518,11 +518,11 @@ public class SimplifiedLocalRecorder implements Recorder {
public void shutdown(boolean immediately) { public void shutdown(boolean immediately) {
log.info("Shutting down"); log.info("Shutting down");
shuttingDown = true; shuttingDown = true;
maintenanceThread.interrupt();
if (!immediately) { if (!immediately) {
try { try {
stopRecordings(); stopRecordings();
shutdownPool("Scheduler", scheduler, 60); shutdownPool("Recording loops", recordingLoopPool, 60);
shutdownPool("PlaylistDownloadPool", playlistDownloadPool, 60);
shutdownPool("SegmentDownloadPool", segmentDownloadPool, 60); shutdownPool("SegmentDownloadPool", segmentDownloadPool, 60);
shutdownPool("Post-Processing", postProcessing, 600); shutdownPool("Post-Processing", postProcessing, 600);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -733,18 +733,30 @@ public class SimplifiedLocalRecorder implements Recorder {
return; return;
} }
try { boolean modelInRecordingList = isTracked(model);
boolean modelInRecordingList = isTracked(model);
boolean online = model.isOnline(IGNORE_CACHE); if (modelInRecordingList) {
if (modelInRecordingList && online) { // .isOnline() check does blocking http request, so do this async
log.info("Restarting recording for model {}", model); recordingLoopPool.submit(() -> {
startRecordingProcess(model); try {
} boolean online = model.isOnline(IGNORE_CACHE);
} catch (InterruptedException e) { if (online) {
Thread.currentThread().interrupt(); log.info("Restarting recording for model {}", model);
log.error("Couldn't restart recording for model {}", model);
} catch (Exception e) { try {
log.error("Couldn't restart recording for model {}", model); recorderLock.lock();
startRecordingProcess(model);
} finally {
recorderLock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Couldn't restart recording for model {}", model);
} catch (Exception e) {
log.error("Couldn't restart recording for model {}", model);
}
});
} }
} }
@ -772,8 +784,8 @@ public class SimplifiedLocalRecorder implements Recorder {
}); });
} }
private CompletableFuture<Void> startRecordingProcess(Model model) { private void startRecordingProcess(Model model) {
return CompletableFuture.runAsync(() -> { recordingLoopPool.submit(() -> {
recorderLock.lock(); recorderLock.lock();
try { try {
preconditions.check(model); preconditions.check(model);
@ -783,7 +795,7 @@ public class SimplifiedLocalRecorder implements Recorder {
setRecordingStatus(rec, State.RECORDING); setRecordingStatus(rec, State.RECORDING);
rec.getModel().setLastRecorded(rec.getStartDate()); rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec); recordingManager.saveRecording(rec);
scheduleRecording(rec, 0); recordingLoopPool.submit(() -> {singleRecordingLoop(rec);});
} catch (RecordUntilExpiredException e) { } catch (RecordUntilExpiredException e) {
log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model); executeRecordUntilSubsequentAction(model);
@ -794,12 +806,12 @@ public class SimplifiedLocalRecorder implements Recorder {
} finally { } finally {
recorderLock.unlock(); recorderLock.unlock();
} }
}, segmentDownloadPool); });
} }
private ThreadFactory createThreadFactory(String name, int priority) { private ThreadFactory createThreadFactory(String name, int priority) {
return r -> { return r -> {
Thread t = new Thread(r); Thread t = Thread.ofPlatform().unstarted(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true); t.setDaemon(true);
t.setPriority(priority); t.setPriority(priority);