forked from j62/ctbrec
1
0
Fork 0

Add mechanism to record a model only up to a certain timestamp

This commit is contained in:
0xb00bface 2020-08-08 15:28:29 +02:00
parent cbd529d001
commit 729319dfd2
8 changed files with 203 additions and 57 deletions

View File

@ -13,6 +13,7 @@ import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.Model;
import ctbrec.SubsequentAction;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.StreamSource;
@ -286,4 +287,24 @@ public class JavaFxModel implements Model {
public HttpHeaderFactory getHttpHeaderFactory() {
return delegate.getHttpHeaderFactory();
}
@Override
public Instant getRecordUntil() {
return delegate.getRecordUntil();
}
@Override
public void setRecordUntil(Instant instant) {
delegate.setRecordUntil(instant);
}
@Override
public SubsequentAction getRecordUntilSubsequentAction() {
return delegate.getRecordUntilSubsequentAction();
}
@Override
public void setRecordUntilSubsequentAction(SubsequentAction action) {
delegate.setRecordUntilSubsequentAction(action);
}
}

View File

@ -33,6 +33,8 @@ public abstract class AbstractModel implements Model {
protected State onlineState = State.UNKNOWN;
private Instant lastSeen;
private Instant lastRecorded;
private Instant recordUntil;
private SubsequentAction recordUntilSubsequentAction;
@Override
public boolean isOnline() throws IOException, ExecutionException, InterruptedException {
@ -231,6 +233,26 @@ public abstract class AbstractModel implements Model {
this.lastRecorded = lastRecorded;
}
@Override
public Instant getRecordUntil() {
return Optional.ofNullable(recordUntil).orElse(Instant.ofEpochMilli(Long.MAX_VALUE));
}
@Override
public void setRecordUntil(Instant recordUntil) {
this.recordUntil = recordUntil;
}
@Override
public SubsequentAction getRecordUntilSubsequentAction() {
return Optional.ofNullable(recordUntilSubsequentAction).orElse(SubsequentAction.PAUSE);
}
@Override
public void setRecordUntilSubsequentAction(SubsequentAction recordUntilSubsequentAction) {
this.recordUntilSubsequentAction = recordUntilSubsequentAction;
}
@Override
public Download createDownload() {
if (Config.isServerMode() && !Config.getInstance().getSettings().recordSingleFile) {

View File

@ -128,4 +128,10 @@ public interface Model extends Comparable<Model>, Serializable {
public HttpHeaderFactory getHttpHeaderFactory();
public Instant getRecordUntil();
public void setRecordUntil(Instant instant);
public SubsequentAction getRecordUntilSubsequentAction();
public void setRecordUntilSubsequentAction(SubsequentAction action);
}

View File

@ -0,0 +1,6 @@
package ctbrec;
public enum SubsequentAction {
PAUSE,
REMOVE
}

View File

@ -15,6 +15,7 @@ import com.squareup.moshi.JsonReader.Token;
import com.squareup.moshi.JsonWriter;
import ctbrec.Model;
import ctbrec.SubsequentAction;
import ctbrec.sites.Site;
import ctbrec.sites.chaturbate.ChaturbateModel;
@ -74,6 +75,10 @@ public class ModelJsonAdapter extends JsonAdapter<Model> {
model.setLastSeen(Instant.ofEpochMilli(reader.nextLong()));
} else if(key.equals("lastRecorded")) {
model.setLastRecorded(Instant.ofEpochMilli(reader.nextLong()));
} else if(key.equals("recordUntil")) {
model.setRecordUntil(Instant.ofEpochMilli(reader.nextLong()));
} else if(key.equals("recordUntilSubsequentAction")) {
model.setRecordUntilSubsequentAction(SubsequentAction.valueOf(reader.nextString()));
} else if(key.equals("siteSpecific")) {
reader.beginObject();
try {
@ -115,6 +120,8 @@ public class ModelJsonAdapter extends JsonAdapter<Model> {
writer.name("suspended").value(model.isSuspended());
writer.name("lastSeen").value(model.getLastSeen().toEpochMilli());
writer.name("lastRecorded").value(model.getLastRecorded().toEpochMilli());
writer.name("recordUntil").value(model.getRecordUntil().toEpochMilli());
writer.name("recordUntilSubsequentAction").value(model.getRecordUntilSubsequentAction().name());
writer.name("siteSpecific");
writer.beginObject();
model.writeSiteSpecificData(writer);

View File

@ -42,6 +42,7 @@ import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.SubsequentAction;
import ctbrec.event.Event;
import ctbrec.event.EventBusHolder;
import ctbrec.event.ModelIsOnlineEvent;
@ -59,7 +60,7 @@ public class NextGenLocalRecorder implements Recorder {
private volatile boolean recording = true;
private ReentrantLock recorderLock = new ReentrantLock();
private RecorderHttpClient client = new RecorderHttpClient();
private long lastSpaceMessage = 0;
private long lastPreconditionMessage = 0;
private Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
private RecordingManager recordingManager;
@ -213,54 +214,7 @@ public class NextGenLocalRecorder implements Recorder {
private void startRecordingProcess(Model model) throws IOException {
recorderLock.lock();
try {
if (!recording) {
// recorder is not in recording mode
return;
}
if (model.isSuspended()) {
LOG.info("Recording for model {} is suspended.", model);
return;
}
if (recordingProcesses.containsKey(model)) {
LOG.error("A recording for model {} is already running", model);
return;
}
if (!models.contains(model)) {
LOG.info("Model {} has been removed. Restarting of recording cancelled.", model);
return;
}
if (!enoughSpaceForRecording()) {
long now = System.currentTimeMillis();
if ((now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("Not enough space for recording, not starting recording for {}", model);
lastSpaceMessage = now;
}
return;
}
if (!downloadSlotAvailable()) {
long now = System.currentTimeMillis();
if ((now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("The number of downloads is maxed out");
}
// check, if we can stop a recording for a model with lower priority
Optional<Recording> lowerPrioRecordingProcess = recordingProcessWithLowerPrio(model.getPriority());
if (lowerPrioRecordingProcess.isPresent()) {
Download download = lowerPrioRecordingProcess.get().getDownload();
Model lowerPrioModel = download.getModel();
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
stopRecordingProcess(lowerPrioModel);
} else {
if ((now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("Other models have higher prio, not starting recording for {}", model.getName());
}
return;
}
}
checkRecordingPreconditions(model);
LOG.info("Starting recording for model {}", model.getName());
Download download = model.createDownload();
@ -269,14 +223,7 @@ public class NextGenLocalRecorder implements Recorder {
"At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()");
LOG.debug("Downloading with {}", download.getClass().getSimpleName());
Recording rec = new Recording();
rec.setDownload(download);
rec.setPath(download.getPath(model).replaceAll("\\\\", "/"));
rec.setModel(model);
rec.setStartDate(download.getStartTime());
rec.setSingleFile(download.isSingleFile());
recordingProcesses.put(model, rec);
recordingManager.add(rec);
Recording rec = createRecording(model, download);
completionService.submit(() -> {
try {
setRecordingStatus(rec, State.RECORDING);
@ -294,11 +241,112 @@ public class NextGenLocalRecorder implements Recorder {
}
return rec;
});
} catch (RecordUntilExpiredException e) {
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model);
} catch (PreconditionNotMetException e) {
// long now = System.currentTimeMillis();
// if ((now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) {
// LOG.info("Not enough space for recording, not starting recording for {}", model);
// lastSpaceMessage = now;
// }
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
return;
} finally {
recorderLock.unlock();
}
}
private void executeRecordUntilSubsequentAction(Model model) throws IOException {
if (model.getRecordUntilSubsequentAction() == SubsequentAction.PAUSE) {
model.setSuspended(true);
} else if (model.getRecordUntilSubsequentAction() == SubsequentAction.REMOVE) {
try {
LOG.info("Recording timeframe expired for model {} - {}", model, model.getRecordUntil());
stopRecording(model);
} catch (InvalidKeyException | NoSuchAlgorithmException e1) {
LOG.error("Error while stopping recording", e1);
}
}
}
private Recording createRecording(Model model, Download download) throws IOException {
Recording rec = new Recording();
rec.setDownload(download);
rec.setPath(download.getPath(model).replaceAll("\\\\", "/"));
rec.setModel(model);
rec.setStartDate(download.getStartTime());
rec.setSingleFile(download.isSingleFile());
recordingProcesses.put(model, rec);
recordingManager.add(rec);
return rec;
}
private void checkRecordingPreconditions(Model model) throws IOException {
ensureRecorderIsActive();
ensureModelIsNotSuspended(model);
ensureRecordUntilIsInFuture(model);
ensureNoRecordingRunningForModel(model);
ensureModelShouldBeRecorded(model);
ensureEnoughSpaceForRecording();
ensureDownloadSlotAvailable(model);
}
private void ensureRecordUntilIsInFuture(Model model) {
if (Instant.now().isAfter(model.getRecordUntil())) {
throw new RecordUntilExpiredException(model.getRecordUntil());
}
}
private void ensureEnoughSpaceForRecording() throws IOException {
if (!enoughSpaceForRecording()) {
throw new PreconditionNotMetException("Not enough disk space for recording");
}
}
private void ensureDownloadSlotAvailable(Model model) {
if (!downloadSlotAvailable()) {
long now = System.currentTimeMillis();
if ((now - lastPreconditionMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("The number of downloads is maxed out");
}
// check, if we can stop a recording for a model with lower priority
Optional<Recording> lowerPrioRecordingProcess = recordingProcessWithLowerPrio(model.getPriority());
if (lowerPrioRecordingProcess.isPresent()) {
Download download = lowerPrioRecordingProcess.get().getDownload();
Model lowerPrioModel = download.getModel();
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
stopRecordingProcess(lowerPrioModel);
} else {
throw new PreconditionNotMetException("Other models have higher prio, not starting recording for " + model.getName());
}
}
}
private void ensureModelShouldBeRecorded(Model model) {
if (!models.contains(model)) {
throw new PreconditionNotMetException("Model " + model + " has been removed. Restarting of recording cancelled.");
}
}
private void ensureNoRecordingRunningForModel(Model model) {
if (recordingProcesses.containsKey(model)) {
throw new PreconditionNotMetException("A recording for model " + model + " is already running");
}
}
private void ensureModelIsNotSuspended(Model model) {
if (model.isSuspended()) {
throw new PreconditionNotMetException("Recording for model " + model + " is suspended");
}
}
private void ensureRecorderIsActive() {
if (!recording) {
throw new PreconditionNotMetException("Recorder is not in recording mode");
}
}
private Optional<Recording> recordingProcessWithLowerPrio(int priority) {
Model lowest = null;
for (Model m : recordingProcesses.keySet()) {

View File

@ -0,0 +1,15 @@
package ctbrec.recorder;
public class PreconditionNotMetException extends RuntimeException {
public PreconditionNotMetException() {
super("Precondition not met");
}
public PreconditionNotMetException(String message) {
super(message);
}
public PreconditionNotMetException(String message, Throwable t) {
super(message, t);
}
}

View File

@ -0,0 +1,21 @@
package ctbrec.recorder;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class RecordUntilExpiredException extends PreconditionNotMetException {
private Instant until;
public RecordUntilExpiredException(Instant until) {
this.until = until;
}
@Override
public String getMessage() {
LocalDateTime dateTime = LocalDateTime.ofInstant(until, ZoneId.systemDefault());
String date = DateTimeFormatter.ISO_LOCAL_DATE.format(dateTime);
return "Recording expired at " + date;
}
}