diff --git a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java index 0d7332b0..7ceaab31 100644 --- a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java +++ b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java @@ -78,7 +78,7 @@ public class OnlineMonitor extends Thread { boolean skipCheckForMarkedAsLater = model.isMarkedForLaterRecording(); if (skipCheckForSuspended || skipCheckForMarkedAsLater) { // force set offline in case model was suspended while online - // TODO: check if this is thread safe + // this should be thread safe as we don't access the same model from multiple threads setModelStateNotified(model, Model.State.OFFLINE); } else { futures.add(updateModel(model)); diff --git a/common/src/main/java/ctbrec/recorder/RecordingPreconditions.java b/common/src/main/java/ctbrec/recorder/RecordingPreconditions.java index bffbed64..66482995 100644 --- a/common/src/main/java/ctbrec/recorder/RecordingPreconditions.java +++ b/common/src/main/java/ctbrec/recorder/RecordingPreconditions.java @@ -8,11 +8,14 @@ import ctbrec.recorder.download.RecordingProcess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Comparators; + import java.io.IOException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.time.LocalTime; +import java.util.Comparator; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -33,7 +36,7 @@ public class RecordingPreconditions { this.config = config; } - void check(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + void check(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, ExecutionException { LOG.debug("Checking preconditions for model {}", model); ensureRecorderIsActive(); ensureNotInTimeoutPeriod(); @@ -44,6 +47,7 @@ public class RecordingPreconditions { ensureNoRecordingRunningForModel(model); ensureModelShouldBeRecorded(model); ensureEnoughSpaceForRecording(); + //ensureAllModelsFromGroupAreOnlineChecked(model); ensureNoOtherFromModelGroupIsRecording(model); ensureModelIsOnline(model); ensureDownloadSlotAvailable(model); @@ -169,17 +173,39 @@ public class RecordingPreconditions { return concurrentRecordings == 0 || concurrentRecordings > 0 && recorder.getRecordingProcesses().size() < concurrentRecordings; } + private void ensureAllModelsFromGroupAreOnlineChecked(Model model) throws IOException, ExecutionException { + Optional modelGroup = recorder.getModelGroup(model); + if (modelGroup.isEmpty()) { + return; + } + + for (String modelUrl : modelGroup.get().getModelUrls()) { + Optional otherModel = getModelForUrl(modelUrl); + if (otherModel.isPresent()) { + if (otherModel.get().getOnlineState(true) == Model.State.UNCHECKED) { + throw new PreconditionNotMetException(otherModel.get() + "'s online state has not been checked yet"); + } + } else { + LOG.warn("Model for url {} was not found", modelUrl); + } + } + } + private void ensureNoOtherFromModelGroupIsRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IOException { Optional modelGroup = recorder.getModelGroup(model); if (modelGroup.isEmpty()) { return; } + // go through each model in group in descendind priority order, checking this model last amongst same-prio models + // this is to make sure that we only stop lower priority recordings in favor of this one. I.e. if same-prio model is recordnig, let it run for (var groupModel : modelGroup.get().getModelUrls().stream() .map(modelUrl -> getModelForUrl(modelUrl)) .filter(x -> x.isPresent()) .map(x -> x.get()) - .sorted((l, r) -> Integer.compare(r.getPriority(), l.getPriority())) // high to low + .sorted(Comparator + .comparing((Model m) -> m.getPriority()).reversed() // high to low + .thenComparing((Model m) -> m.getUrl().equals(model.getUrl()))) // this model last (false -> true) .toList()) { if (model.getUrl().equals(groupModel.getUrl())) { // no other model with higher prio is online, start recording @@ -187,15 +213,11 @@ public class RecordingPreconditions { stopModelsWithLowerPrio(modelGroup.get()); return; } else { - Optional otherModel = getModelForUrl(groupModel.getUrl()); - if (otherModel.isPresent()) { - if (!otherModel.get().isSuspended() && otherModelIsRecorded(otherModel.get())) { - throw new PreconditionNotMetException(otherModel.get() + " from the same group is already recorded"); - } else if (otherModelCanBeRecorded(otherModel.get())) { - throw new PreconditionNotMetException(otherModel.get() + " from the same group can be recorded"); - } - } else { - LOG.warn("Couldn't check if model from same group has higer prio for {}", groupModel.getUrl()); + Model otherModel = groupModel; + if (!otherModel.isSuspended() && otherModelIsRecorded(otherModel)) { + throw new PreconditionNotMetException(otherModel + " from the same group is already recorded"); + } else if (otherModel.getPriority() > model.getPriority() && otherModelCanBeRecorded(otherModel)) { + throw new PreconditionNotMetException(otherModel + " from the same group can be recorded"); } } } diff --git a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java index 1b1ee567..fe66bef9 100644 --- a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java @@ -747,13 +747,14 @@ public class SimplifiedLocalRecorder implements Recorder { boolean online = model.isOnline(IGNORE_CACHE); if (online) { log.info("Restarting recording for model {}", model); - - try { - recorderLock.lock(); - startRecordingProcessAsync(model); - } finally { - recorderLock.unlock(); - } + online = startRecordingProcessSync(model); + } + + // if the specific model is offline or starting it failed, try to restart it's group + // FIXME: this might spam unnecessary start attempts + if (!online) { + log.info("Restarting recording for group of offline model {}", model); + startGroupRecordingOfModel(model); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -789,7 +790,7 @@ public class SimplifiedLocalRecorder implements Recorder { }); } - private void startRecordingProcessSync(Model model) { + private boolean startRecordingProcessSync(Model model) { recorderLock.lock(); try { preconditions.check(model); @@ -800,6 +801,7 @@ public class SimplifiedLocalRecorder implements Recorder { rec.getModel().setLastRecorded(rec.getStartDate()); recordingManager.saveRecording(rec); recordingLoopPool.submit(() -> {singleRecordingLoop(rec);}); + return true; } catch (RecordUntilExpiredException e) { log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); executeRecordUntilSubsequentAction(model); @@ -810,6 +812,7 @@ public class SimplifiedLocalRecorder implements Recorder { } finally { recorderLock.unlock(); } + return false; } private void startRecordingProcessAsync(Model model) { @@ -845,30 +848,43 @@ public class SimplifiedLocalRecorder implements Recorder { log.error("Recording {} not found. Can't rerun post-processing", recording); } - private void startGroupRecordingOfModel(Model model) { + private boolean startGroupRecordingOfModel(Model model) { var opt = getModelGroup(model); - if (!opt.isPresent()) return; + if (!opt.isPresent()) + return false; var group = opt.get(); - List groupModels = group.getModelUrls().stream() - .map(url -> { - // FIXME: replace loop with hashmap lookup - for (var m : models) { - if (m.getUrl().equals(url)) - return Optional.of(m); - } - return Optional.empty(); - }) - .filter(Optional::isPresent) - .map(x -> (Model)x.get()) - .sorted((l, r) -> Integer.compare(r.getPriority(), l.getPriority())) - .toList(); recordingLoopPool.submit(() -> { + List groupModels = group.getModelUrls().stream() + .map(url -> { + // FIXME: replace loop with hashmap lookup + for (var m : models) { + if (m.getUrl().equals(url)) + return Optional.of(m); + } + return Optional.empty(); + }) + .filter(Optional::isPresent) + .map(x -> (Model)x.get()) + .filter(x -> { + try { + // this call is IO blocking + return x.isOnline(); + } catch (Exception e) { + return false; + } + }) + .sorted((l, r) -> Integer.compare(r.getPriority(), l.getPriority())) + .toList(); + for (var groupModel : groupModels) { - startRecordingProcessSync(groupModel); + if (startRecordingProcessSync(groupModel)) + return; } }); + + return true; } @Override