Fix graceful shutdown

This commit is contained in:
0xb00bface 2023-11-11 11:58:42 +01:00
parent 797c69c06b
commit ef2e354d65
2 changed files with 72 additions and 55 deletions

View File

@ -65,15 +65,16 @@ public class RecordingPreconditions {
}
private void ensureModelIsOnline(Model model) {
String msg = model.getName() + "'s room is not public";
try {
if (!model.isOnline(IGNORE_CACHE)) {
throw new PreconditionNotMetException(model.getName() + "'s room is not public");
throw new PreconditionNotMetException(msg);
}
} catch (IOException | ExecutionException e) {
throw new PreconditionNotMetException(model.getName() + "'s room is not public");
throw new PreconditionNotMetException(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PreconditionNotMetException(model.getName() + "'s room is not public");
throw new PreconditionNotMetException(msg);
}
}
@ -158,7 +159,7 @@ public class RecordingPreconditions {
}
private void ensureRecorderIsActive() {
if (!recorder.isRunning()) {
if (!recorder.isRunning() || recorder.isShuttingDown()) {
throw new PreconditionNotMetException("Recorder is not in recording mode");
}
}
@ -170,24 +171,26 @@ public class RecordingPreconditions {
private void ensureNoOtherFromModelGroupIsRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IOException {
Optional<ModelGroup> modelGroup = recorder.getModelGroup(model);
if (modelGroup.isPresent()) {
for (String modelUrl : modelGroup.get().getModelUrls()) {
if (modelUrl.equals(model.getUrl())) {
// no other model with higher prio is online, start recording
// but before that stop all recordings of models with lower prio
stopModelsWithLowerPrio(modelGroup.get());
return;
} else {
Optional<Model> otherModel = getModelForUrl(modelUrl);
if (otherModel.isPresent()) {
if (otherModelIsRecorded(otherModel.get())) {
throw new PreconditionNotMetException(otherModel.get() + " from the same group is already recorded");
} else if (otherModelCanBeRecorded(otherModel.get())) {
throw new PreconditionNotMetException(otherModel.get() + " from the same group can be recorded");
}
} else {
LOG.warn("Couldn't check if model from same group has higer prio for {}", modelUrl);
if (modelGroup.isEmpty()) {
return;
}
for (String modelUrl : modelGroup.get().getModelUrls()) {
if (modelUrl.equals(model.getUrl())) {
// no other model with higher prio is online, start recording
// but before that stop all recordings of models with lower prio
stopModelsWithLowerPrio(modelGroup.get());
return;
} else {
Optional<Model> otherModel = getModelForUrl(modelUrl);
if (otherModel.isPresent()) {
if (otherModelIsRecorded(otherModel.get())) {
throw new PreconditionNotMetException(otherModel.get() + " from the same group is already recorded");
} else if (otherModelCanBeRecorded(otherModel.get())) {
throw new PreconditionNotMetException(otherModel.get() + " from the same group can be recorded");
}
} else {
LOG.warn("Couldn't check if model from same group has higer prio for {}", modelUrl);
}
}
}

View File

@ -40,9 +40,10 @@ import static java.lang.Thread.MIN_PRIORITY;
public class SimplifiedLocalRecorder implements Recorder {
public static final boolean IGNORE_CACHE = true;
private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private final List<Model> models = Collections.synchronizedList(new ArrayList<>());
private final Config config;
private volatile boolean running;
private volatile boolean shuttingDown = false;
private final ReentrantLock recorderLock = new ReentrantLock();
private final ReentrantLock modelGroupLock = new ReentrantLock();
private final RecorderHttpClient client;
@ -223,7 +224,7 @@ public class SimplifiedLocalRecorder implements Recorder {
}
private void waitForRecordingsToTerminate() {
long secondsToWait = 30;
long secondsToWait = TimeUnit.MINUTES.toSeconds(10);
for (int i = 0; i < secondsToWait; i++) {
if (recordingProcesses.isEmpty()) {
return;
@ -240,35 +241,7 @@ public class SimplifiedLocalRecorder implements Recorder {
try {
postProcessing.submit(() -> {
try {
setRecordingStatus(recording, State.POST_PROCESSING);
recording.getRecordingProcess().stop();
recording.getRecordingProcess().awaitEnd();
recording.getRecordingProcess().finalizeDownload();
recording.refresh();
recordingManager.saveRecording(recording);
recording.postprocess();
List<PostProcessor> postProcessors = config.getSettings().postProcessors
.stream()
.map(Mappers.getMapper(PostProcessorMapper.class)::toPostProcessor)
.toList();
PostProcessingContext ctx = createPostProcessingContext(recording);
for (PostProcessor postProcessor : postProcessors) {
if (postProcessor.isEnabled()) {
log.debug("Running post-processor: {}", postProcessor.getName());
boolean continuePP = postProcessor.postprocess(ctx);
if (!continuePP) {
break;
}
} else {
log.debug("Skipping post-processor {} because it is disabled", postProcessor.getName());
}
}
recording.refresh();
if (recording.getStatus() != State.DELETED) {
setRecordingStatus(recording, State.FINISHED);
recordingManager.saveRecording(recording);
}
log.info("Post-processing finished for {}", recording.getModel().getName());
runPostProcessing(recording);
} catch (Exception e) {
if (e instanceof InterruptedException) { // NOSONAR
Thread.currentThread().interrupt();
@ -287,6 +260,38 @@ public class SimplifiedLocalRecorder implements Recorder {
}
}
private void runPostProcessing(Recording recording) throws IOException, InterruptedException {
setRecordingStatus(recording, State.POST_PROCESSING);
recording.getRecordingProcess().stop();
recording.getRecordingProcess().awaitEnd();
recording.getRecordingProcess().finalizeDownload();
recording.refresh();
recordingManager.saveRecording(recording);
recording.postprocess();
List<PostProcessor> postProcessors = config.getSettings().postProcessors
.stream()
.map(Mappers.getMapper(PostProcessorMapper.class)::toPostProcessor)
.toList();
PostProcessingContext ctx = createPostProcessingContext(recording);
for (PostProcessor postProcessor : postProcessors) {
if (postProcessor.isEnabled()) {
log.debug("Running post-processor: {}", postProcessor.getName());
boolean continuePP = postProcessor.postprocess(ctx);
if (!continuePP) {
break;
}
} else {
log.debug("Skipping post-processor {} because it is disabled", postProcessor.getName());
}
}
recording.refresh();
if (recording.getStatus() != State.DELETED) {
setRecordingStatus(recording, State.FINISHED);
recordingManager.saveRecording(recording);
}
log.info("Post-processing finished for {}", recording.getModel().getName());
}
private PostProcessingContext createPostProcessingContext(Recording recording) {
PostProcessingContext ctx = new PostProcessingContext();
ctx.setConfig(config);
@ -322,7 +327,7 @@ public class SimplifiedLocalRecorder implements Recorder {
config.getSettings().models.add(Mappers.getMapper(ModelMapper.class).toDto(model));
config.save();
} catch (IOException e) {
log.error("Couldn't save config", e);
errorSavingConfig(e);
} finally {
recorderLock.unlock();
}
@ -334,6 +339,10 @@ public class SimplifiedLocalRecorder implements Recorder {
}
}
private void errorSavingConfig(IOException e) {
log.error("Couldn't save config", e);
}
private void copyModelProperties(Model src, Model existing) {
existing.setSuspended(src.isSuspended());
existing.setMarkedForLaterRecording(src.isMarkedForLaterRecording());
@ -505,6 +514,7 @@ public class SimplifiedLocalRecorder implements Recorder {
@Override
public void shutdown(boolean immediately) {
log.info("Shutting down");
shuttingDown = true;
if (!immediately) {
try {
stopRecordings();
@ -536,7 +546,7 @@ public class SimplifiedLocalRecorder implements Recorder {
getRecordingProcessForModel(model).ifPresent(this::stopRecordingProcess);
} catch (IOException e) {
log.error("Couldn't save config", e);
errorSavingConfig(e);
} finally {
recorderLock.unlock();
}
@ -780,7 +790,7 @@ public class SimplifiedLocalRecorder implements Recorder {
log.warn("Couldn't change priority for model {}. Not found in list", model.getName());
}
} catch (IOException e) {
log.error("Couldn't save config", e);
errorSavingConfig(e);
} finally {
recorderLock.unlock();
}
@ -828,6 +838,10 @@ public class SimplifiedLocalRecorder implements Recorder {
return running;
}
boolean isShuttingDown() {
return shuttingDown;
}
List<Recording> getRecordingProcesses() {
return recordingProcesses;
}