Move recording precondition checks to its own class

This commit is contained in:
0xb00bface 2020-08-18 10:44:41 +02:00
parent 1aa3ccdedf
commit ab10e9c176
4 changed files with 142 additions and 108 deletions

View File

@ -57,15 +57,15 @@ import ctbrec.sites.Site;
public class NextGenLocalRecorder implements Recorder {
private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class);
private static final boolean IGNORE_CACHE = true;
public static final boolean IGNORE_CACHE = true;
private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Config config;
private volatile boolean recording = true;
private ReentrantLock recorderLock = new ReentrantLock();
private RecorderHttpClient client = new RecorderHttpClient();
private long lastPreconditionMessage = 0;
private Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
private RecordingManager recordingManager;
private RecordingPreconditions preconditions;
// thread pools for downloads and post-processing
private BlockingQueue<Runnable> downloadQueue = new SynchronousQueue<>();
@ -98,6 +98,8 @@ public class NextGenLocalRecorder implements Recorder {
recording = true;
registerEventBusListener();
preconditions = new RecordingPreconditions(this);
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
LOG.info("Saving recordings in {}", config.getSettings().recordingsDir);
@ -201,23 +203,14 @@ public class NextGenLocalRecorder implements Recorder {
recorderLock.unlock();
}
// try to start the recording immediately
try {
if (model.isOnline()) {
startRecordingProcess(model);
}
} catch (ExecutionException e) {
// noop
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
startRecordingProcess(model);
}
}
private void startRecordingProcess(Model model) throws IOException {
recorderLock.lock();
try {
checkRecordingPreconditions(model);
preconditions.check(model);
LOG.info("Starting recording for model {}", model.getName());
Download download = createDownload(model);
Recording rec = createRecording(download);
@ -291,85 +284,6 @@ public class NextGenLocalRecorder implements Recorder {
return rec;
}
private void checkRecordingPreconditions(Model model) throws IOException {
ensureRecorderIsActive();
ensureModelIsNotSuspended(model);
ensureRecordUntilIsInFuture(model);
ensureNoRecordingRunningForModel(model);
ensureModelShouldBeRecorded(model);
ensureEnoughSpaceForRecording();
ensureDownloadSlotAvailable(model);
}
private void ensureRecordUntilIsInFuture(Model model) {
if (Instant.now().isAfter(model.getRecordUntil())) {
throw new RecordUntilExpiredException(model.getRecordUntil());
}
}
private void ensureEnoughSpaceForRecording() throws IOException {
if (!enoughSpaceForRecording()) {
throw new PreconditionNotMetException("Not enough disk space for recording");
}
}
private void ensureDownloadSlotAvailable(Model model) {
if (!downloadSlotAvailable()) {
long now = System.currentTimeMillis();
if ((now - lastPreconditionMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("The number of downloads is maxed out");
}
// check, if we can stop a recording for a model with lower priority
Optional<Recording> lowerPrioRecordingProcess = recordingProcessWithLowerPrio(model.getPriority());
if (lowerPrioRecordingProcess.isPresent()) {
Download download = lowerPrioRecordingProcess.get().getDownload();
Model lowerPrioModel = download.getModel();
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
stopRecordingProcess(lowerPrioModel);
} else {
throw new PreconditionNotMetException("Other models have higher prio, not starting recording for " + model.getName());
}
}
}
private void ensureModelShouldBeRecorded(Model model) {
if (!models.contains(model)) {
throw new PreconditionNotMetException("Model " + model + " has been removed. Restarting of recording cancelled.");
}
}
private void ensureNoRecordingRunningForModel(Model model) {
if (recordingProcesses.containsKey(model)) {
throw new PreconditionNotMetException("A recording for model " + model + " is already running");
}
}
private void ensureModelIsNotSuspended(Model model) {
if (model.isSuspended()) {
throw new PreconditionNotMetException("Recording for model " + model + " is suspended");
}
}
private void ensureRecorderIsActive() {
if (!recording) {
throw new PreconditionNotMetException("Recorder is not in recording mode");
}
}
private Optional<Recording> recordingProcessWithLowerPrio(int priority) {
Model lowest = null;
for (Model m : recordingProcesses.keySet()) {
if (lowest == null || m.getPriority() < lowest.getPriority()) {
lowest = m;
}
}
if (lowest != null && lowest.getPriority() < priority) {
return Optional.of(recordingProcesses.get(lowest));
} else {
return Optional.empty();
}
}
private boolean deleteIfEmpty(Recording rec) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
rec.refresh();
long sizeInByte = rec.getSizeInByte();
@ -442,7 +356,7 @@ public class NextGenLocalRecorder implements Recorder {
}
}
private void stopRecordingProcess(Model model) {
void stopRecordingProcess(Model model) {
recorderLock.lock();
try {
LOG.debug("Stopping recording for {}", model);
@ -658,7 +572,7 @@ public class NextGenLocalRecorder implements Recorder {
return store;
}
private boolean enoughSpaceForRecording() throws IOException {
boolean enoughSpaceForRecording() throws IOException {
long minimum = config.getSettings().minimumSpaceLeftInBytes;
if (minimum == 0) { // 0 means don't check
return true;
@ -667,11 +581,6 @@ public class NextGenLocalRecorder implements Recorder {
}
}
private boolean downloadSlotAvailable() {
int concurrentRecordings = Config.getInstance().getSettings().concurrentRecordings;
return concurrentRecordings == 0 || concurrentRecordings > 0 && recordingProcesses.size() < concurrentRecordings;
}
private void tryRestartRecording(Model model) {
if (!recording) {
// recorder is not in recording state
@ -797,4 +706,12 @@ public class NextGenLocalRecorder implements Recorder {
executeRecordUntilSubsequentAction(model);
}
}
boolean isRecording() {
return recording;
}
Map<Model, Recording> getRecordingProcesses() {
return recordingProcesses;
}
}

View File

@ -0,0 +1,126 @@
package ctbrec.recorder;
import static ctbrec.recorder.NextGenLocalRecorder.*;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.recorder.download.Download;
public class RecordingPreconditions {
private static final Logger LOG = LoggerFactory.getLogger(RecordingPreconditions.class);
private NextGenLocalRecorder recorder;
private long lastPreconditionMessage = 0;
RecordingPreconditions(NextGenLocalRecorder recorder) {
this.recorder = recorder;
}
void check(Model model) throws IOException {
ensureRecorderIsActive();
ensureModelIsNotSuspended(model);
ensureRecordUntilIsInFuture(model);
ensureNoRecordingRunningForModel(model);
ensureModelShouldBeRecorded(model);
ensureEnoughSpaceForRecording();
ensureDownloadSlotAvailable(model);
ensureModelIsOnline(model);
}
private void ensureModelIsOnline(Model model) {
try {
if (!model.isOnline(IGNORE_CACHE)) {
throw new PreconditionNotMetException(model.getName() + "'s room is not public");
}
} catch (IOException | ExecutionException e) {
throw new PreconditionNotMetException(model.getName() + "'s room is not public");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PreconditionNotMetException(model.getName() + "'s room is not public");
}
}
private void ensureRecordUntilIsInFuture(Model model) {
if (Instant.now().isAfter(model.getRecordUntil())) {
throw new RecordUntilExpiredException(model.getRecordUntil());
}
}
private void ensureEnoughSpaceForRecording() throws IOException {
if (!recorder.enoughSpaceForRecording()) {
throw new PreconditionNotMetException("Not enough disk space for recording");
}
}
private void ensureDownloadSlotAvailable(Model model) {
if (!downloadSlotAvailable()) {
long now = System.currentTimeMillis();
if ((now - lastPreconditionMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("The number of downloads is maxed out");
}
// check, if we can stop a recording for a model with lower priority
Optional<Recording> lowerPrioRecordingProcess = recordingProcessWithLowerPrio(model.getPriority());
if (lowerPrioRecordingProcess.isPresent()) {
Download download = lowerPrioRecordingProcess.get().getDownload();
Model lowerPrioModel = download.getModel();
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
recorder.stopRecordingProcess(lowerPrioModel);
} else {
throw new PreconditionNotMetException("Other models have higher prio, not starting recording for " + model.getName());
}
}
}
private Optional<Recording> recordingProcessWithLowerPrio(int priority) {
Model lowest = null;
for (Model m : recorder.getRecordingProcesses().keySet()) {
if (lowest == null || m.getPriority() < lowest.getPriority()) {
lowest = m;
}
}
if (lowest != null && lowest.getPriority() < priority) {
return Optional.of(recorder.getRecordingProcesses().get(lowest));
} else {
return Optional.empty();
}
}
private void ensureModelShouldBeRecorded(Model model) {
if (!recorder.getModels().contains(model)) {
throw new PreconditionNotMetException("Model " + model + " has been removed. Restarting of recording cancelled.");
}
}
private void ensureNoRecordingRunningForModel(Model model) {
if (recorder.getRecordingProcesses().containsKey(model)) {
throw new PreconditionNotMetException("A recording for model " + model + " is already running");
}
}
private void ensureModelIsNotSuspended(Model model) {
if (model.isSuspended()) {
throw new PreconditionNotMetException("Recording for model " + model + " is suspended");
}
}
private void ensureRecorderIsActive() {
if (!recorder.isRecording()) {
throw new PreconditionNotMetException("Recorder is not in recording mode");
}
}
private boolean downloadSlotAvailable() {
int concurrentRecordings = Config.getInstance().getSettings().concurrentRecordings;
return concurrentRecordings == 0 || concurrentRecordings > 0 && recorder.getRecordingProcesses().size() < concurrentRecordings;
}
}

View File

@ -87,11 +87,6 @@ public class HlsDownload extends AbstractHlsDownload {
running = true;
Thread.currentThread().setName("Download " + model.getName());
splitRecStartTime = ZonedDateTime.now();
if (!model.isOnline()) {
throw new IOException(model.getName() + "'s room is not public");
}
String segments = getSegmentPlaylistUrl(model);
if (segments != null) {
if (!downloadDir.toFile().exists()) {

View File

@ -83,10 +83,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
@Override
public void start() throws IOException {
try {
if (!model.isOnline(IGNORE_CACHE)) {
throw new IOException(model.getName() + "'s room is not public");
}
running = true;
super.startTime = Instant.now();
splitRecStartTime = ZonedDateTime.now();