Improve group hangling

- try starting model's group after failing to restart that specific model
- fix recording hijacking of same-priority model
This commit is contained in:
reusedname 2025-02-27 21:22:47 +05:00
parent 164907b691
commit 994bb99b1f
3 changed files with 74 additions and 36 deletions

View File

@ -78,7 +78,7 @@ public class OnlineMonitor extends Thread {
boolean skipCheckForMarkedAsLater = model.isMarkedForLaterRecording();
if (skipCheckForSuspended || skipCheckForMarkedAsLater) {
// force set offline in case model was suspended while online
// TODO: check if this is thread safe
// this should be thread safe as we don't access the same model from multiple threads
setModelStateNotified(model, Model.State.OFFLINE);
} else {
futures.add(updateModel(model));

View File

@ -8,11 +8,14 @@ import ctbrec.recorder.download.RecordingProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Comparators;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.LocalTime;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@ -33,7 +36,7 @@ public class RecordingPreconditions {
this.config = config;
}
void check(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
void check(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, ExecutionException {
LOG.debug("Checking preconditions for model {}", model);
ensureRecorderIsActive();
ensureNotInTimeoutPeriod();
@ -44,6 +47,7 @@ public class RecordingPreconditions {
ensureNoRecordingRunningForModel(model);
ensureModelShouldBeRecorded(model);
ensureEnoughSpaceForRecording();
//ensureAllModelsFromGroupAreOnlineChecked(model);
ensureNoOtherFromModelGroupIsRecording(model);
ensureModelIsOnline(model);
ensureDownloadSlotAvailable(model);
@ -169,17 +173,39 @@ public class RecordingPreconditions {
return concurrentRecordings == 0 || concurrentRecordings > 0 && recorder.getRecordingProcesses().size() < concurrentRecordings;
}
private void ensureAllModelsFromGroupAreOnlineChecked(Model model) throws IOException, ExecutionException {
Optional<ModelGroup> modelGroup = recorder.getModelGroup(model);
if (modelGroup.isEmpty()) {
return;
}
for (String modelUrl : modelGroup.get().getModelUrls()) {
Optional<Model> otherModel = getModelForUrl(modelUrl);
if (otherModel.isPresent()) {
if (otherModel.get().getOnlineState(true) == Model.State.UNCHECKED) {
throw new PreconditionNotMetException(otherModel.get() + "'s online state has not been checked yet");
}
} else {
LOG.warn("Model for url {} was not found", modelUrl);
}
}
}
private void ensureNoOtherFromModelGroupIsRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IOException {
Optional<ModelGroup> modelGroup = recorder.getModelGroup(model);
if (modelGroup.isEmpty()) {
return;
}
// go through each model in group in descendind priority order, checking this model last amongst same-prio models
// this is to make sure that we only stop lower priority recordings in favor of this one. I.e. if same-prio model is recordnig, let it run
for (var groupModel : modelGroup.get().getModelUrls().stream()
.map(modelUrl -> getModelForUrl(modelUrl))
.filter(x -> x.isPresent())
.map(x -> x.get())
.sorted((l, r) -> Integer.compare(r.getPriority(), l.getPriority())) // high to low
.sorted(Comparator
.comparing((Model m) -> m.getPriority()).reversed() // high to low
.thenComparing((Model m) -> m.getUrl().equals(model.getUrl()))) // this model last (false -> true)
.toList()) {
if (model.getUrl().equals(groupModel.getUrl())) {
// no other model with higher prio is online, start recording
@ -187,15 +213,11 @@ public class RecordingPreconditions {
stopModelsWithLowerPrio(modelGroup.get());
return;
} else {
Optional<Model> otherModel = getModelForUrl(groupModel.getUrl());
if (otherModel.isPresent()) {
if (!otherModel.get().isSuspended() && otherModelIsRecorded(otherModel.get())) {
throw new PreconditionNotMetException(otherModel.get() + " from the same group is already recorded");
} else if (otherModelCanBeRecorded(otherModel.get())) {
throw new PreconditionNotMetException(otherModel.get() + " from the same group can be recorded");
}
} else {
LOG.warn("Couldn't check if model from same group has higer prio for {}", groupModel.getUrl());
Model otherModel = groupModel;
if (!otherModel.isSuspended() && otherModelIsRecorded(otherModel)) {
throw new PreconditionNotMetException(otherModel + " from the same group is already recorded");
} else if (otherModel.getPriority() > model.getPriority() && otherModelCanBeRecorded(otherModel)) {
throw new PreconditionNotMetException(otherModel + " from the same group can be recorded");
}
}
}

View File

@ -747,13 +747,14 @@ public class SimplifiedLocalRecorder implements Recorder {
boolean online = model.isOnline(IGNORE_CACHE);
if (online) {
log.info("Restarting recording for model {}", model);
try {
recorderLock.lock();
startRecordingProcessAsync(model);
} finally {
recorderLock.unlock();
}
online = startRecordingProcessSync(model);
}
// if the specific model is offline or starting it failed, try to restart it's group
// FIXME: this might spam unnecessary start attempts
if (!online) {
log.info("Restarting recording for group of offline model {}", model);
startGroupRecordingOfModel(model);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -789,7 +790,7 @@ public class SimplifiedLocalRecorder implements Recorder {
});
}
private void startRecordingProcessSync(Model model) {
private boolean startRecordingProcessSync(Model model) {
recorderLock.lock();
try {
preconditions.check(model);
@ -800,6 +801,7 @@ public class SimplifiedLocalRecorder implements Recorder {
rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec);
recordingLoopPool.submit(() -> {singleRecordingLoop(rec);});
return true;
} catch (RecordUntilExpiredException e) {
log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model);
@ -810,6 +812,7 @@ public class SimplifiedLocalRecorder implements Recorder {
} finally {
recorderLock.unlock();
}
return false;
}
private void startRecordingProcessAsync(Model model) {
@ -845,30 +848,43 @@ public class SimplifiedLocalRecorder implements Recorder {
log.error("Recording {} not found. Can't rerun post-processing", recording);
}
private void startGroupRecordingOfModel(Model model) {
private boolean startGroupRecordingOfModel(Model model) {
var opt = getModelGroup(model);
if (!opt.isPresent()) return;
if (!opt.isPresent())
return false;
var group = opt.get();
List<Model> groupModels = group.getModelUrls().stream()
.map(url -> {
// FIXME: replace loop with hashmap lookup
for (var m : models) {
if (m.getUrl().equals(url))
return Optional.of(m);
}
return Optional.empty();
})
.filter(Optional::isPresent)
.map(x -> (Model)x.get())
.sorted((l, r) -> Integer.compare(r.getPriority(), l.getPriority()))
.toList();
recordingLoopPool.submit(() -> {
List<Model> groupModels = group.getModelUrls().stream()
.map(url -> {
// FIXME: replace loop with hashmap lookup
for (var m : models) {
if (m.getUrl().equals(url))
return Optional.of(m);
}
return Optional.empty();
})
.filter(Optional::isPresent)
.map(x -> (Model)x.get())
.filter(x -> {
try {
// this call is IO blocking
return x.isOnline();
} catch (Exception e) {
return false;
}
})
.sorted((l, r) -> Integer.compare(r.getPriority(), l.getPriority()))
.toList();
for (var groupModel : groupModels) {
startRecordingProcessSync(groupModel);
if (startRecordingProcessSync(groupModel))
return;
}
});
return true;
}
@Override