Make sure that only one recording per model is started and that recordings terminate before shutting down the thread pools
This commit is contained in:
parent
fb5fef8912
commit
013d28c33d
|
@ -71,6 +71,11 @@
|
||||||
<artifactId>antlr4-runtime</artifactId>
|
<artifactId>antlr4-runtime</artifactId>
|
||||||
<version>${antlr.version}</version>
|
<version>${antlr.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
<artifactId>logback-classic</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -8,7 +8,7 @@ import java.util.concurrent.Future;
|
||||||
|
|
||||||
public class GlobalThreadPool {
|
public class GlobalThreadPool {
|
||||||
|
|
||||||
private static ExecutorService threadPool = Executors.newCachedThreadPool(r -> {
|
private static final ExecutorService threadPool = Executors.newCachedThreadPool(r -> {
|
||||||
Thread t = new Thread(r);
|
Thread t = new Thread(r);
|
||||||
t.setDaemon(true);
|
t.setDaemon(true);
|
||||||
t.setName("GlobalWorker-" + UUID.randomUUID().toString().substring(0, 8));
|
t.setName("GlobalWorker-" + UUID.randomUUID().toString().substring(0, 8));
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class RecordingPreconditions {
|
||||||
RecordingProcess download = lowerPrioRecordingProcess.get().getRecordingProcess();
|
RecordingProcess download = lowerPrioRecordingProcess.get().getRecordingProcess();
|
||||||
Model lowerPrioModel = download.getModel();
|
Model lowerPrioModel = download.getModel();
|
||||||
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
|
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
|
||||||
recorder.stopRecordingProcess(lowerPrioModel);
|
recorder.stopRecordingProcess(lowerPrioRecordingProcess.get());
|
||||||
} else {
|
} else {
|
||||||
throw new PreconditionNotMetException("Other models have higher prio, not starting recording for " + model.getName());
|
throw new PreconditionNotMetException("Other models have higher prio, not starting recording for " + model.getName());
|
||||||
}
|
}
|
||||||
|
@ -109,14 +109,17 @@ public class RecordingPreconditions {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Optional<Recording> recordingProcessWithLowerPrio(int priority) {
|
private Optional<Recording> recordingProcessWithLowerPrio(int priority) {
|
||||||
Model lowest = null;
|
Recording lowest = null;
|
||||||
for (Model m : recorder.getRecordingProcesses().keySet()) {
|
int lowestPrio = Integer.MAX_VALUE;
|
||||||
if (lowest == null || m.getPriority() < lowest.getPriority()) {
|
for (Recording rec : recorder.getRecordingProcesses()) {
|
||||||
lowest = m;
|
Model m = rec.getModel();
|
||||||
|
if (m.getPriority() < lowestPrio) {
|
||||||
|
lowest = rec;
|
||||||
|
lowestPrio = m.getPriority();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (lowest != null && lowest.getPriority() < priority) {
|
if (lowestPrio < priority) {
|
||||||
return Optional.of(recorder.getRecordingProcesses().get(lowest));
|
return Optional.of(lowest);
|
||||||
} else {
|
} else {
|
||||||
return Optional.empty();
|
return Optional.empty();
|
||||||
}
|
}
|
||||||
|
@ -129,7 +132,7 @@ public class RecordingPreconditions {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void ensureNoRecordingRunningForModel(Model model) {
|
private void ensureNoRecordingRunningForModel(Model model) {
|
||||||
if (recorder.getRecordingProcesses().containsKey(model)) {
|
if (recorder.isRecordingRunningForModel(model)) {
|
||||||
throw new PreconditionNotMetException("A recording for model " + model + " is already running");
|
throw new PreconditionNotMetException("A recording for model " + model + " is already running");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -189,7 +192,7 @@ public class RecordingPreconditions {
|
||||||
private void stopModelsWithLowerPrio(ModelGroup modelGroup) throws InvalidKeyException, NoSuchAlgorithmException, IOException {
|
private void stopModelsWithLowerPrio(ModelGroup modelGroup) throws InvalidKeyException, NoSuchAlgorithmException, IOException {
|
||||||
recorder.getCurrentlyRecording().stream()
|
recorder.getCurrentlyRecording().stream()
|
||||||
.filter(m -> modelGroup.getModelUrls().contains(m.getUrl()))
|
.filter(m -> modelGroup.getModelUrls().contains(m.getUrl()))
|
||||||
.forEach(recorder::stopRecordingProcess);
|
.forEach(m -> recorder.getRecordingProcessForModel(m).ifPresent(recorder::stopRecordingProcess));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
private final ReentrantLock recorderLock = new ReentrantLock();
|
private final ReentrantLock recorderLock = new ReentrantLock();
|
||||||
private final ReentrantLock modelGroupLock = new ReentrantLock();
|
private final ReentrantLock modelGroupLock = new ReentrantLock();
|
||||||
private final RecorderHttpClient client;
|
private final RecorderHttpClient client;
|
||||||
private final Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
|
private final List<Recording> recordingProcesses = Collections.synchronizedList(new ArrayList<>());
|
||||||
private final RecordingManager recordingManager;
|
private final RecordingManager recordingManager;
|
||||||
private final RecordingPreconditions preconditions;
|
private final RecordingPreconditions preconditions;
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
}
|
}
|
||||||
checkFreeSpace();
|
checkFreeSpace();
|
||||||
threadPoolScaler.tick();
|
threadPoolScaler.tick();
|
||||||
waitABit(1);
|
waitABit(100);
|
||||||
}
|
}
|
||||||
}).start();
|
}).start();
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,6 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
fail(recording);
|
fail(recording);
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
// TODO react to different exceptions, e.g. with a retry
|
|
||||||
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
|
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
|
||||||
fail(recording);
|
fail(recording);
|
||||||
}
|
}
|
||||||
|
@ -149,14 +148,14 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
private void removeRecordingProcess(Recording rec) {
|
private void removeRecordingProcess(Recording rec) {
|
||||||
recorderLock.lock();
|
recorderLock.lock();
|
||||||
try {
|
try {
|
||||||
recordingProcesses.remove(rec.getModel());
|
recordingProcesses.remove(rec);
|
||||||
} finally {
|
} finally {
|
||||||
recorderLock.unlock();
|
recorderLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fail(Recording recording) {
|
private void fail(Recording recording) {
|
||||||
stopRecordingProcess(recording.getModel());
|
stopRecordingProcess(recording);
|
||||||
recording.getRecordingProcess().finalizeDownload();
|
recording.getRecordingProcess().finalizeDownload();
|
||||||
if (deleteIfEmpty(recording)) {
|
if (deleteIfEmpty(recording)) {
|
||||||
return;
|
return;
|
||||||
|
@ -166,6 +165,12 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void scheduleRecording(Recording recording, long delayInMillis) {
|
private void scheduleRecording(Recording recording, long delayInMillis) {
|
||||||
|
if (recording.getModel().isSuspended()) {
|
||||||
|
log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel());
|
||||||
|
stopRecordingProcess(recording);
|
||||||
|
submitPostProcessingJob(recording);
|
||||||
|
return;
|
||||||
|
}
|
||||||
ScheduledFuture<RecordingProcess> future = scheduler.schedule(recording.getRecordingProcess(), delayInMillis, TimeUnit.MILLISECONDS);
|
ScheduledFuture<RecordingProcess> future = scheduler.schedule(recording.getRecordingProcess(), delayInMillis, TimeUnit.MILLISECONDS);
|
||||||
recording.setCurrentIteration(future);
|
recording.setCurrentIteration(future);
|
||||||
recording.getSelectedResolution();
|
recording.getSelectedResolution();
|
||||||
|
@ -203,65 +208,71 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
recording.getRecordingProcess().stop();
|
recording.getRecordingProcess().stop();
|
||||||
recording.getRecordingProcess().awaitEnd();
|
recording.getRecordingProcess().awaitEnd();
|
||||||
}
|
}
|
||||||
|
waitForRecordingsToTerminate();
|
||||||
log.info("Recordings have been stopped");
|
log.info("Recordings have been stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
// private void fail(ScheduledFuture<Recording> future, Exception e) {
|
private void waitForRecordingsToTerminate() {
|
||||||
// if (downloadFutureRecordingMap.containsKey(future)) {
|
long secondsToWait = 30;
|
||||||
// Recording rec = downloadFutureRecordingMap.remove(future);
|
for (int i = 0; i < secondsToWait; i++) {
|
||||||
// deleteIfEmpty(rec);
|
if (recordingProcesses.isEmpty()) {
|
||||||
// removeRecordingProcess(rec);
|
return;
|
||||||
// rec.getRecordingProcess().finalizeDownload();
|
}
|
||||||
// log.error("Error while recording stream for model {}", rec.getModel(), e);
|
|
||||||
// } else {
|
log.info("Waiting for recording processes to terminate");
|
||||||
// log.error("Error while recording stream", e);
|
waitABit(1000);
|
||||||
// }
|
}
|
||||||
// }
|
log.warn("Recording processes didn't finish in {} seconds. Continuing, but some recordings might not get post-processed", secondsToWait);
|
||||||
|
}
|
||||||
|
|
||||||
private void submitPostProcessingJob(Recording recording) {
|
private void submitPostProcessingJob(Recording recording) {
|
||||||
setRecordingStatus(recording, WAITING);
|
setRecordingStatus(recording, WAITING);
|
||||||
postProcessing.submit(() -> {
|
try {
|
||||||
try {
|
postProcessing.submit(() -> {
|
||||||
setRecordingStatus(recording, State.POST_PROCESSING);
|
try {
|
||||||
recording.getRecordingProcess().stop();
|
setRecordingStatus(recording, State.POST_PROCESSING);
|
||||||
recording.getRecordingProcess().awaitEnd();
|
recording.getRecordingProcess().stop();
|
||||||
recording.setDirtyFlag(true);
|
recording.getRecordingProcess().awaitEnd();
|
||||||
recording.getRecordingProcess().finalizeDownload();
|
recording.setDirtyFlag(true);
|
||||||
recording.refresh();
|
recording.getRecordingProcess().finalizeDownload();
|
||||||
recordingManager.saveRecording(recording);
|
recording.refresh();
|
||||||
recording.postprocess();
|
recordingManager.saveRecording(recording);
|
||||||
List<PostProcessor> postProcessors = config.getSettings().postProcessors;
|
recording.postprocess();
|
||||||
PostProcessingContext ctx = createPostProcessingContext(recording);
|
List<PostProcessor> postProcessors = config.getSettings().postProcessors;
|
||||||
for (PostProcessor postProcessor : postProcessors) {
|
PostProcessingContext ctx = createPostProcessingContext(recording);
|
||||||
if (postProcessor.isEnabled()) {
|
for (PostProcessor postProcessor : postProcessors) {
|
||||||
log.debug("Running post-processor: {}", postProcessor.getName());
|
if (postProcessor.isEnabled()) {
|
||||||
boolean continuePP = postProcessor.postprocess(ctx);
|
log.debug("Running post-processor: {}", postProcessor.getName());
|
||||||
if (!continuePP) {
|
boolean continuePP = postProcessor.postprocess(ctx);
|
||||||
break;
|
if (!continuePP) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.debug("Skipping post-processor {} because it is disabled", postProcessor.getName());
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
log.debug("Skipping post-processor {} because it is disabled", postProcessor.getName());
|
recording.refresh();
|
||||||
|
if (recording.getStatus() != State.DELETED) {
|
||||||
|
setRecordingStatus(recording, State.FINISHED);
|
||||||
|
recordingManager.saveRecording(recording);
|
||||||
|
}
|
||||||
|
log.info("Post-processing finished for {}", recording.getModel().getName());
|
||||||
|
} catch (Exception e) {
|
||||||
|
if (e instanceof InterruptedException) { // NOSONAR
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
log.error("Error while post-processing recording {}", recording, e);
|
||||||
|
recording.setStatus(State.FAILED);
|
||||||
|
try {
|
||||||
|
recordingManager.saveRecording(recording);
|
||||||
|
} catch (IOException e1) {
|
||||||
|
log.error("Couldn't update recording state for recording {}", recording, e1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
recording.refresh();
|
});
|
||||||
if (recording.getStatus() != State.DELETED) {
|
} catch (RejectedExecutionException e) {
|
||||||
setRecordingStatus(recording, State.FINISHED);
|
log.error("Could not start post-processing for {} {}:{}. Execution rejected by thread pool", recording, recording.getModel().getSite().getName(), recording.getModel().getDisplayName());
|
||||||
recordingManager.saveRecording(recording);
|
}
|
||||||
}
|
|
||||||
log.info("Post-processing finished for {}", recording.getModel().getName());
|
|
||||||
} catch (Exception e) {
|
|
||||||
if (e instanceof InterruptedException) { // NOSONAR
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
log.error("Error while post-processing recording {}", recording, e);
|
|
||||||
recording.setStatus(State.FAILED);
|
|
||||||
try {
|
|
||||||
recordingManager.saveRecording(recording);
|
|
||||||
} catch (IOException e1) {
|
|
||||||
log.error("Couldn't update recording state for recording {}", recording, e1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private PostProcessingContext createPostProcessingContext(Recording recording) {
|
private PostProcessingContext createPostProcessingContext(Recording recording) {
|
||||||
|
@ -362,7 +373,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
rec.setModel(model);
|
rec.setModel(model);
|
||||||
rec.setStartDate(download.getStartTime());
|
rec.setStartDate(download.getStartTime());
|
||||||
rec.setSingleFile(download.isSingleFile());
|
rec.setSingleFile(download.isSingleFile());
|
||||||
recordingProcesses.put(model, rec);
|
recordingProcesses.add(rec);
|
||||||
recordingManager.add(rec);
|
recordingManager.add(rec);
|
||||||
return rec;
|
return rec;
|
||||||
}
|
}
|
||||||
|
@ -401,15 +412,22 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models");
|
throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recordingProcesses.containsKey(model)) {
|
if (isRecordingRunningForModel(model)) {
|
||||||
Recording rec = recordingProcesses.get(model);
|
getRecordingProcessForModel(model).map(Recording::getRecordingProcess).ifPresent(RecordingProcess::stop);
|
||||||
rec.getRecordingProcess().stop();
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
recorderLock.unlock();
|
recorderLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isRecordingRunningForModel(Model model) {
|
||||||
|
return recordingProcesses.stream().anyMatch(rec -> Objects.equals(rec.getModel(), model));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Optional<Recording> getRecordingProcessForModel(Model model) {
|
||||||
|
return recordingProcesses.stream().filter(rec -> Objects.equals(rec.getModel(), model)).findAny();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void switchStreamSource(Model model) throws IOException {
|
public void switchStreamSource(Model model) throws IOException {
|
||||||
if (models.contains(model)) {
|
if (models.contains(model)) {
|
||||||
|
@ -419,10 +437,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
log.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
|
log.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
|
||||||
recorderLock.lock();
|
recorderLock.lock();
|
||||||
try {
|
try {
|
||||||
Recording rec = recordingProcesses.get(model);
|
getRecordingProcessForModel(model).ifPresent(this::stopRecordingProcess);
|
||||||
if (rec != null) {
|
|
||||||
stopRecordingProcess(model);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
recorderLock.unlock();
|
recorderLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -432,14 +447,14 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void stopRecordingProcess(Model model) {
|
void stopRecordingProcess(Recording rec) {
|
||||||
recorderLock.lock();
|
recorderLock.lock();
|
||||||
try {
|
try {
|
||||||
log.debug("Stopping recording for {} - recording found: {}", model, recordingProcesses.get(model));
|
var model = rec.getModel();
|
||||||
Recording rec = recordingProcesses.get(model);
|
log.debug("Stopping recording for {} - recording found: {}", model, rec);
|
||||||
log.debug("Stopping download for {}", model);
|
log.debug("Stopping download for {}", model);
|
||||||
rec.getRecordingProcess().stop();
|
rec.getRecordingProcess().stop();
|
||||||
recordingProcesses.remove(model);
|
recordingProcesses.remove(rec);
|
||||||
} finally {
|
} finally {
|
||||||
recorderLock.unlock();
|
recorderLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -448,7 +463,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
private void stopRecordingProcesses() {
|
private void stopRecordingProcesses() {
|
||||||
recorderLock.lock();
|
recorderLock.lock();
|
||||||
try {
|
try {
|
||||||
for (Recording rec : recordingProcesses.values()) {
|
for (Recording rec : recordingProcesses) {
|
||||||
rec.getRecordingProcess().stop();
|
rec.getRecordingProcess().stop();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -508,8 +523,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Recording rec = recordingProcesses.get(model);
|
getRecordingProcessForModel(model).ifPresent(this::stopRecordingProcess);
|
||||||
Optional.ofNullable(rec).map(Recording::getRecordingProcess).ifPresent(RecordingProcess::stop);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Couldn't save config", e);
|
log.error("Couldn't save config", e);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -564,7 +578,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
m.setMarkedForLaterRecording(mark);
|
m.setMarkedForLaterRecording(mark);
|
||||||
if (mark && getCurrentlyRecording().contains(m)) {
|
if (mark && getCurrentlyRecording().contains(m)) {
|
||||||
log.debug("Stopping recording of {}", m);
|
log.debug("Stopping recording of {}", m);
|
||||||
stopRecordingProcess(m);
|
getRecordingProcessForModel(m).ifPresent(this::stopRecordingProcess);
|
||||||
}
|
}
|
||||||
if (!mark) {
|
if (!mark) {
|
||||||
log.debug("Removing model: {}", m);
|
log.debug("Removing model: {}", m);
|
||||||
|
@ -670,8 +684,9 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
if (e.getType() == MODEL_ONLINE) {
|
if (e.getType() == MODEL_ONLINE) {
|
||||||
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
|
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
|
||||||
Model model = evt.getModel();
|
Model model = evt.getModel();
|
||||||
log.trace("Model online event: {} - suspended:{} - already recording:{}", model, model.isSuspended(), recordingProcesses.containsKey(model));
|
log.trace("Model online event: {} - suspended:{} - already recording:{}", model, model.isSuspended(), isRecordingRunningForModel(model));
|
||||||
if (!isSuspended(model) && !recordingProcesses.containsKey(model)) {
|
log.trace("Recording processes: {}", recordingProcesses.size());
|
||||||
|
if (!isSuspended(model) && !isRecordingRunningForModel(model)) {
|
||||||
startRecordingProcess(model);
|
startRecordingProcess(model);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -796,7 +811,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
||||||
return running;
|
return running;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<Model, Recording> getRecordingProcesses() {
|
List<Recording> getRecordingProcesses() {
|
||||||
return recordingProcesses;
|
return recordingProcesses;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,15 +4,18 @@ import ctbrec.Config;
|
||||||
import ctbrec.Model;
|
import ctbrec.Model;
|
||||||
import ctbrec.Settings;
|
import ctbrec.Settings;
|
||||||
import ctbrec.UnknownModel;
|
import ctbrec.UnknownModel;
|
||||||
import ctbrec.recorder.download.hls.CombinedSplittingStrategy;
|
import ctbrec.recorder.download.hls.*;
|
||||||
import ctbrec.recorder.download.hls.NoopSplittingStrategy;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import ctbrec.recorder.download.hls.SizeSplittingStrategy;
|
|
||||||
import ctbrec.recorder.download.hls.TimeSplittingStrategy;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import static ctbrec.recorder.download.StreamSource.UNKNOWN;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
public abstract class AbstractDownload implements RecordingProcess {
|
public abstract class AbstractDownload implements RecordingProcess {
|
||||||
|
|
||||||
protected Instant startTime;
|
protected Instant startTime;
|
||||||
|
@ -22,6 +25,7 @@ public abstract class AbstractDownload implements RecordingProcess {
|
||||||
protected Config config;
|
protected Config config;
|
||||||
protected SplittingStrategy splittingStrategy;
|
protected SplittingStrategy splittingStrategy;
|
||||||
protected ExecutorService downloadExecutor;
|
protected ExecutorService downloadExecutor;
|
||||||
|
protected int selectedResolution = UNKNOWN;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
||||||
|
@ -72,11 +76,40 @@ public abstract class AbstractDownload implements RecordingProcess {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getSelectedResolution() {
|
public int getSelectedResolution() {
|
||||||
return StreamSource.UNKNOWN;
|
return selectedResolution;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void awaitEnd() {
|
public void awaitEnd() {
|
||||||
// do nothing per default
|
// do nothing per default
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected StreamSource selectStreamSource(List<StreamSource> streamSources) throws ExecutionException {
|
||||||
|
if (model.getStreamUrlIndex() >= 0 && model.getStreamUrlIndex() < streamSources.size()) {
|
||||||
|
// TODO don't use the index, but the bandwidth. if the bandwidth does not match, take the closest one
|
||||||
|
log.debug("Model stream index: {}", model.getStreamUrlIndex());
|
||||||
|
streamSources.forEach(ss -> log.debug(ss.toString()));
|
||||||
|
StreamSource source = streamSources.get(model.getStreamUrlIndex());
|
||||||
|
log.debug("{} selected {}", model.getName(), source);
|
||||||
|
selectedResolution = source.height;
|
||||||
|
return source;
|
||||||
|
} else {
|
||||||
|
// filter out stream resolutions, which are out of range of the configured min and max
|
||||||
|
int minRes = Config.getInstance().getSettings().minimumResolution;
|
||||||
|
int maxRes = Config.getInstance().getSettings().maximumResolution;
|
||||||
|
List<StreamSource> filteredStreamSources = streamSources.stream()
|
||||||
|
.filter(src -> src.height == 0 || src.height == UNKNOWN || minRes <= src.height)
|
||||||
|
.filter(src -> src.height == 0 || src.height == UNKNOWN || maxRes >= src.height)
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
if (filteredStreamSources.isEmpty()) {
|
||||||
|
throw new ExecutionException(new NoStreamFoundException("No stream left in playlist"));
|
||||||
|
} else {
|
||||||
|
StreamSource source = filteredStreamSources.get(filteredStreamSources.size() - 1);
|
||||||
|
log.debug("{} selected {}", model.getName(), source);
|
||||||
|
selectedResolution = source.height;
|
||||||
|
return source;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,9 @@ public interface RecordingProcess extends Callable<RecordingProcess> {
|
||||||
|
|
||||||
void postProcess(Recording recording);
|
void postProcess(Recording recording);
|
||||||
|
|
||||||
int getSelectedResolution();
|
default int getSelectedResolution() {
|
||||||
|
return StreamSource.UNKNOWN;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the path to the recording in the filesystem as file object
|
* Returns the path to the recording in the filesystem as file object
|
||||||
|
|
|
@ -252,31 +252,10 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
for (StreamSource streamSource : streamSources) {
|
for (StreamSource streamSource : streamSources) {
|
||||||
LOG.debug("{}:{} src {}", model.getSite().getName(), model.getName(), streamSource);
|
LOG.debug("{}:{} src {}", model.getSite().getName(), model.getName(), streamSource);
|
||||||
}
|
}
|
||||||
String url;
|
StreamSource selectedStreamSource = selectStreamSource(streamSources);
|
||||||
if (model.getStreamUrlIndex() >= 0 && model.getStreamUrlIndex() < streamSources.size()) {
|
String url = selectedStreamSource.getMediaPlaylistUrl();
|
||||||
// TODO don't use the index, but the bandwidth. if the bandwidth does not match, take the closest one
|
selectedResolution = selectedStreamSource.height;
|
||||||
StreamSource source = streamSources.get(model.getStreamUrlIndex());
|
|
||||||
LOG.debug("{} selected {}", model.getName(), source);
|
|
||||||
url = source.getMediaPlaylistUrl();
|
|
||||||
selectedResolution = source.height;
|
|
||||||
} else {
|
|
||||||
// filter out stream resolutions, which are out of range of the configured min and max
|
|
||||||
int minRes = Config.getInstance().getSettings().minimumResolution;
|
|
||||||
int maxRes = Config.getInstance().getSettings().maximumResolution;
|
|
||||||
List<StreamSource> filteredStreamSources = streamSources.stream()
|
|
||||||
.filter(src -> src.height == 0 || src.height == UNKNOWN || minRes <= src.height)
|
|
||||||
.filter(src -> src.height == 0 || src.height == UNKNOWN || maxRes >= src.height)
|
|
||||||
.toList();
|
|
||||||
|
|
||||||
if (filteredStreamSources.isEmpty()) {
|
|
||||||
throw new ExecutionException(new NoStreamFoundException("No stream left in playlist"));
|
|
||||||
} else {
|
|
||||||
StreamSource source = filteredStreamSources.get(filteredStreamSources.size() - 1);
|
|
||||||
LOG.debug("{} selected {}", model.getName(), source);
|
|
||||||
url = source.getMediaPlaylistUrl();
|
|
||||||
selectedResolution = source.height;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.debug("Segment playlist url {}", url);
|
LOG.debug("Segment playlist url {}", url);
|
||||||
return url;
|
return url;
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,10 @@ import java.io.IOException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.LocalTime;
|
import java.time.LocalTime;
|
||||||
import java.time.temporal.ChronoUnit;
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import static java.time.temporal.ChronoUnit.HOURS;
|
import static java.time.temporal.ChronoUnit.HOURS;
|
||||||
|
@ -84,16 +87,19 @@ class RecordingPreconditionsTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void testRecordingAlreadyRunning() {
|
void testRecordingAlreadyRunning() throws IOException, ExecutionException, InterruptedException {
|
||||||
var recorder = mock(SimplifiedLocalRecorder.class);
|
var recorder = mock(SimplifiedLocalRecorder.class);
|
||||||
Model model = mock(Model.class);
|
Model model = mock(Model.class);
|
||||||
|
when(model.isOnline(true)).thenReturn(true);
|
||||||
|
|
||||||
when(recorder.isRunning()).thenReturn(true);
|
when(recorder.isRunning()).thenReturn(true);
|
||||||
Map<Model, Recording> recordingProcesses = new HashMap<>();
|
Recording rec = new Recording();
|
||||||
recordingProcesses.put(model, new Recording());
|
rec.setModel(model);
|
||||||
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
when(recorder.getRecordingProcesses()).thenReturn(List.of(rec));
|
||||||
when(model.getRecordUntil()).thenReturn(Instant.MAX);
|
when(model.getRecordUntil()).thenReturn(Instant.MAX);
|
||||||
when(model.toString()).thenReturn("Mockita Boobilicious");
|
when(model.toString()).thenReturn("Mockita Boobilicious");
|
||||||
|
when(recorder.getModels()).thenReturn(List.of(model));
|
||||||
|
when(recorder.isRecordingRunningForModel(model)).thenReturn(true);
|
||||||
|
|
||||||
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
||||||
PreconditionNotMetException ex = assertThrows(PreconditionNotMetException.class, () -> preconditions.check(model));
|
PreconditionNotMetException ex = assertThrows(PreconditionNotMetException.class, () -> preconditions.check(model));
|
||||||
|
@ -171,9 +177,9 @@ class RecordingPreconditionsTest {
|
||||||
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
||||||
when(recorder.getModelGroup(theOtherOne)).thenReturn(Optional.of(group));
|
when(recorder.getModelGroup(theOtherOne)).thenReturn(Optional.of(group));
|
||||||
when(recorder.getModelGroup(mockita)).thenReturn(Optional.of(group));
|
when(recorder.getModelGroup(mockita)).thenReturn(Optional.of(group));
|
||||||
Map<Model, Recording> recordingProcesses = new HashMap<>();
|
Recording recording = new Recording();
|
||||||
recordingProcesses.put(theOtherOne, new Recording());
|
recording.setModel(theOtherOne);
|
||||||
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
when(recorder.getRecordingProcesses()).thenReturn(List.of(recording));
|
||||||
|
|
||||||
|
|
||||||
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
||||||
|
@ -251,8 +257,10 @@ class RecordingPreconditionsTest {
|
||||||
when(recorder.isRunning()).thenReturn(true);
|
when(recorder.isRunning()).thenReturn(true);
|
||||||
when(recorder.getModels()).thenReturn(modelsToRecord);
|
when(recorder.getModels()).thenReturn(modelsToRecord);
|
||||||
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
||||||
Map<Model, Recording> recordingProcesses = new HashMap<>();
|
List<Recording> recordingProcesses = new ArrayList<>();
|
||||||
recordingProcesses.put(theOtherOne, new Recording());
|
Recording rec = new Recording();
|
||||||
|
rec.setModel(theOtherOne);
|
||||||
|
recordingProcesses.add(rec);
|
||||||
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
||||||
|
|
||||||
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
||||||
|
@ -288,7 +296,7 @@ class RecordingPreconditionsTest {
|
||||||
when(recorder.isRunning()).thenReturn(true);
|
when(recorder.isRunning()).thenReturn(true);
|
||||||
when(recorder.getModels()).thenReturn(modelsToRecord);
|
when(recorder.getModels()).thenReturn(modelsToRecord);
|
||||||
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
||||||
Map<Model, Recording> recordingProcesses = new HashMap<>();
|
List<Recording> recordingProcesses = new ArrayList<>();
|
||||||
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
||||||
|
|
||||||
Model theOtherOne = mock(Model.class);
|
Model theOtherOne = mock(Model.class);
|
||||||
|
@ -297,7 +305,7 @@ class RecordingPreconditionsTest {
|
||||||
when(theOtherOne.isOnline(true)).thenReturn(true);
|
when(theOtherOne.isOnline(true)).thenReturn(true);
|
||||||
when(theOtherOne.getUrl()).thenReturn("http://localhost/theOtherOne");
|
when(theOtherOne.getUrl()).thenReturn("http://localhost/theOtherOne");
|
||||||
when(theOtherOne.getPriority()).thenReturn(50);
|
when(theOtherOne.getPriority()).thenReturn(50);
|
||||||
recordingProcesses.put(theOtherOne, mockRecordingProcess(theOtherOne));
|
recordingProcesses.add(mockRecordingProcess(theOtherOne));
|
||||||
|
|
||||||
Model lowestPrio = mock(Model.class);
|
Model lowestPrio = mock(Model.class);
|
||||||
when(lowestPrio.getRecordUntil()).thenReturn(Instant.MAX);
|
when(lowestPrio.getRecordUntil()).thenReturn(Instant.MAX);
|
||||||
|
@ -305,11 +313,16 @@ class RecordingPreconditionsTest {
|
||||||
when(lowestPrio.isOnline(true)).thenReturn(true);
|
when(lowestPrio.isOnline(true)).thenReturn(true);
|
||||||
when(lowestPrio.getUrl()).thenReturn("http://localhost/lowest");
|
when(lowestPrio.getUrl()).thenReturn("http://localhost/lowest");
|
||||||
when(lowestPrio.getPriority()).thenReturn(1);
|
when(lowestPrio.getPriority()).thenReturn(1);
|
||||||
recordingProcesses.put(theOtherOne, mockRecordingProcess(lowestPrio));
|
Recording lowestPrioRecording = new Recording();
|
||||||
|
RecordingProcess recordingProcess = mock(RecordingProcess.class);
|
||||||
|
when(recordingProcess.getModel()).thenReturn(lowestPrio);
|
||||||
|
lowestPrioRecording.setRecordingProcess(recordingProcess);
|
||||||
|
lowestPrioRecording.setModel(lowestPrio);
|
||||||
|
recordingProcesses.add(lowestPrioRecording);
|
||||||
|
|
||||||
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
||||||
assertDoesNotThrow(() -> preconditions.check(mockita));
|
assertDoesNotThrow(() -> preconditions.check(mockita));
|
||||||
verify(recorder).stopRecordingProcess(lowestPrio);
|
verify(recorder).stopRecordingProcess(lowestPrioRecording);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -328,7 +341,7 @@ class RecordingPreconditionsTest {
|
||||||
when(recorder.isRunning()).thenReturn(true);
|
when(recorder.isRunning()).thenReturn(true);
|
||||||
when(recorder.getModels()).thenReturn(modelsToRecord);
|
when(recorder.getModels()).thenReturn(modelsToRecord);
|
||||||
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
|
||||||
Map<Model, Recording> recordingProcesses = new HashMap<>();
|
List<Recording> recordingProcesses = new ArrayList<>();
|
||||||
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
|
||||||
|
|
||||||
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
|
||||||
|
@ -371,6 +384,7 @@ class RecordingPreconditionsTest {
|
||||||
when(download.getModel()).thenReturn(model);
|
when(download.getModel()).thenReturn(model);
|
||||||
Recording runningRecording = mock(Recording.class);
|
Recording runningRecording = mock(Recording.class);
|
||||||
when(runningRecording.getRecordingProcess()).thenReturn(download);
|
when(runningRecording.getRecordingProcess()).thenReturn(download);
|
||||||
|
when(runningRecording.getModel()).thenReturn(model);
|
||||||
return runningRecording;
|
return runningRecording;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue