Replace NextGenLocalRecorder with SimplifiedLocalRecorder

The multi-threading in SimplifiedLocalRecorder is a bit simpler. It also makes sure, that each recording is looked at on a regular basis, which should get rid of the stalled recordings problem.
This commit is contained in:
0xb00bface 2023-05-28 17:03:57 +02:00
parent c62634de92
commit fb5fef8912
45 changed files with 1256 additions and 969 deletions

View File

@ -22,10 +22,10 @@ import ctbrec.io.HttpException;
import ctbrec.notes.LocalModelNotesService;
import ctbrec.notes.ModelNotesService;
import ctbrec.notes.RemoteModelNotesService;
import ctbrec.recorder.NextGenLocalRecorder;
import ctbrec.recorder.OnlineMonitor;
import ctbrec.recorder.Recorder;
import ctbrec.recorder.RemoteRecorder;
import ctbrec.recorder.SimplifiedLocalRecorder;
import ctbrec.sites.Site;
import ctbrec.sites.amateurtv.AmateurTv;
import ctbrec.sites.bonga.BongaCams;
@ -549,7 +549,8 @@ public class CamrecApplication extends Application {
private void createRecorder() {
if (config.getSettings().localRecording) {
try {
recorder = new NextGenLocalRecorder(config, sites);
//recorder = new NextGenLocalRecorder(config, sites);
recorder = new SimplifiedLocalRecorder(config, sites);
} catch (IOException e) {
LOG.error("Couldn't initialize recorder", e);
Alert alert = new AutosizeAlert(Alert.AlertType.ERROR, primaryStage.getScene());

View File

@ -1,21 +1,13 @@
package ctbrec.ui;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.xml.bind.JAXBException;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.Model;
import ctbrec.SubsequentAction;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.sites.Site;
import javafx.beans.property.BooleanProperty;
@ -23,6 +15,12 @@ import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleIntegerProperty;
import javafx.beans.property.SimpleObjectProperty;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
* Just a wrapper for Model, which augments it with JavaFX value binding properties, so that UI widgets get updated proeprly
*/
@ -297,7 +295,7 @@ public class JavaFxModel implements Model {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
return delegate.createDownload();
}

View File

@ -1,28 +1,27 @@
package ctbrec.ui;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.recorder.download.RecordingProcess;
import javafx.beans.property.LongProperty;
import javafx.beans.property.SimpleLongProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.Set;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.recorder.download.Download;
import javafx.beans.property.LongProperty;
import javafx.beans.property.SimpleLongProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
public class JavaFxRecording extends Recording {
private transient StringProperty statusProperty = new SimpleStringProperty();
private transient StringProperty progressProperty = new SimpleStringProperty();
private transient StringProperty notesProperty = new SimpleStringProperty();
private transient LongProperty sizeProperty = new SimpleLongProperty();
private Recording delegate;
private final transient StringProperty statusProperty = new SimpleStringProperty();
private final transient StringProperty progressProperty = new SimpleStringProperty();
private final transient StringProperty notesProperty = new SimpleStringProperty();
private final transient LongProperty sizeProperty = new SimpleLongProperty();
private final Recording delegate;
private long lastValue = 0;
public JavaFxRecording(Recording recording) {
@ -31,9 +30,7 @@ public class JavaFxRecording extends Recording {
setSizeInByte(recording.getSizeInByte());
setProgress(recording.getProgress());
setNote(recording.getNote());
notesProperty.addListener((obs, oldV, newV) -> {
delegate.setNote(newV);
});
notesProperty.addListener((obs, oldV, newV) -> delegate.setNote(newV));
}
public Recording getDelegate() {
@ -72,38 +69,38 @@ public class JavaFxRecording extends Recording {
@Override
public void setStatus(State status) {
delegate.setStatus(status);
switch(status) {
case RECORDING:
statusProperty.set("recording");
break;
case GENERATING_PLAYLIST:
statusProperty.set("generating playlist");
break;
case FINISHED:
statusProperty.set("finished");
break;
case DOWNLOADING:
statusProperty.set("downloading");
break;
case POST_PROCESSING:
statusProperty.set("post-processing");
break;
case DELETED:
statusProperty.set("deleted");
break;
case DELETING:
statusProperty.set("deleting");
break;
case FAILED:
statusProperty.set("failed");
break;
case WAITING:
statusProperty.set("waiting");
break;
case UNKNOWN:
default:
statusProperty.set("unknown");
break;
switch (status) {
case RECORDING:
statusProperty.set("recording");
break;
case GENERATING_PLAYLIST:
statusProperty.set("generating playlist");
break;
case FINISHED:
statusProperty.set("finished");
break;
case DOWNLOADING:
statusProperty.set("downloading");
break;
case POST_PROCESSING:
statusProperty.set("post-processing");
break;
case DELETED:
statusProperty.set("deleted");
break;
case DELETING:
statusProperty.set("deleting");
break;
case FAILED:
statusProperty.set("failed");
break;
case WAITING:
statusProperty.set("waiting");
break;
case UNKNOWN:
default:
statusProperty.set("unknown");
break;
}
if (isPinned()) {
statusProperty.set(statusProperty.get() + " 🔒");
@ -118,8 +115,8 @@ public class JavaFxRecording extends Recording {
@Override
public void setProgress(int progress) {
delegate.setProgress(progress);
if(progress >= 0) {
progressProperty.set(progress+"%");
if (progress >= 0) {
progressProperty.set(progress + "%");
} else {
progressProperty.set("");
}
@ -151,8 +148,8 @@ public class JavaFxRecording extends Recording {
}
public void update(Recording updated) {
if(!Config.getInstance().getSettings().localRecording) {
if(getStatus() == State.DOWNLOADING && updated.getStatus() != State.DOWNLOADING) {
if (!Config.getInstance().getSettings().localRecording) {
if (getStatus() == State.DOWNLOADING && updated.getStatus() != State.DOWNLOADING) {
// ignore, because the the status coming from the server is FINISHED and we are
// overriding it with DOWNLOADING
return;
@ -267,13 +264,13 @@ public class JavaFxRecording extends Recording {
}
@Override
public Download getDownload() {
return delegate.getDownload();
public RecordingProcess getRecordingProcess() {
return delegate.getRecordingProcess();
}
@Override
public void setDownload(Download download) {
delegate.setDownload(download);
public void setRecordingProcess(RecordingProcess recordingProcess) {
delegate.setRecordingProcess(recordingProcess);
}
@Override

View File

@ -1,17 +1,5 @@
package ctbrec.ui.sites.jasmin;
import static ctbrec.io.HttpConstants.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.io.HttpException;
@ -23,6 +11,17 @@ import javafx.concurrent.Task;
import okhttp3.Cookie;
import okhttp3.HttpUrl;
import okhttp3.Request;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import static ctbrec.io.HttpConstants.*;
public class LiveJasminUpdateService extends PaginatedScheduledService {
@ -65,11 +64,11 @@ public class LiveJasminUpdateService extends PaginatedScheduledService {
var body = response.body().string();
List<Model> models = new ArrayList<>();
var json = new JSONObject(body);
if(json.optBoolean("success")) {
if (json.optBoolean("success")) {
parseModels(models, json);
} else if(json.optString("error").equals("Please login.")) {
} else if (json.optString("error").equals("Please login.")) {
var siteUI = SiteUiFactory.getUi(liveJasmin);
if(siteUI.login()) {
if (siteUI.login()) {
return call();
} else {
LOG.error("Request failed:\n{}", body);
@ -95,13 +94,14 @@ public class LiveJasminUpdateService extends PaginatedScheduledService {
for (var i = 0; i < performers.length(); i++) {
var m = performers.getJSONObject(i);
var name = m.optString("pid");
if(name.isEmpty()) {
if (name.isEmpty()) {
continue;
}
LiveJasminModel model = (LiveJasminModel) liveJasmin.createModel(name);
model.setId(m.getString("id"));
model.setPreview(m.getString("profilePictureUrl"));
model.setOnlineState(LiveJasminModel.mapStatus(m.optInt("status")));
model.setDisplayName(m.optString("display_name", null));
models.add(model);
}
}

View File

@ -1,14 +1,14 @@
package ctbrec.ui.tabs.recorded;
import java.util.Optional;
import ctbrec.Model;
import ctbrec.ModelGroup;
import ctbrec.recorder.Recorder;
import java.util.Optional;
public class ModelName {
private Model mdl;
private Recorder rec;
private final Model mdl;
private final Recorder rec;
public ModelName(Model model, Recorder recorder) {
mdl = model;
@ -22,8 +22,8 @@ public class ModelName {
if (modelGroup.isPresent()) {
s = modelGroup.get().getName() + " (aka " + mdl.getDisplayName() + ')';
} else {
return mdl.toString();
return mdl.getDisplayName();
}
return s;
}
}
}

View File

@ -1,6 +1,16 @@
package ctbrec;
import static ctbrec.io.HttpConstants.*;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.HttpHeaderFactoryImpl;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.hls.HlsDownload;
import ctbrec.recorder.download.hls.HlsdlDownload;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
import ctbrec.sites.Site;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
import java.time.Instant;
@ -10,18 +20,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.HttpHeaderFactoryImpl;
import ctbrec.recorder.download.hls.HlsDownload;
import ctbrec.recorder.download.hls.HlsdlDownload;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
import ctbrec.sites.Site;
import okhttp3.Request;
import okhttp3.Response;
import static ctbrec.io.HttpConstants.USER_AGENT;
public abstract class AbstractModel implements Model {
@ -72,7 +71,7 @@ public abstract class AbstractModel implements Model {
@Override
public String getDisplayName() {
if(displayName != null) {
if (displayName != null) {
return displayName;
} else {
return getName();
@ -290,7 +289,7 @@ public abstract class AbstractModel implements Model {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
if (Config.getInstance().getSettings().useHlsdl) {
return new HlsdlDownload();
} else {

View File

@ -1,23 +1,21 @@
package ctbrec;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.xml.bind.JAXBException;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.sites.Site;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
public interface Model extends Comparable<Model>, Serializable {
long RECORD_INDEFINITELY = 9000000000000000000L;
@ -32,6 +30,7 @@ public interface Model extends Comparable<Model>, Serializable {
UNKNOWN("unknown");
final String display;
State(String display) {
this.display = display;
}
@ -102,10 +101,8 @@ public interface Model extends Comparable<Model>, Serializable {
/**
* Determines the stream resolution for this model
*
* @param failFast
* If set to true, the method returns immediately, even if the resolution is unknown. If
* the resolution is unknown, the array contains 0,0
*
* @param failFast If set to true, the method returns immediately, even if the resolution is unknown. If
* the resolution is unknown, the array contains 0,0
* @return a tupel of width and height represented by an int[2]
* @throws ExecutionException
*/
@ -131,7 +128,7 @@ public interface Model extends Comparable<Model>, Serializable {
void setMarkedForLaterRecording(boolean marked);
Download createDownload();
RecordingProcess createDownload();
void setPriority(int priority);
@ -140,14 +137,18 @@ public interface Model extends Comparable<Model>, Serializable {
HttpHeaderFactory getHttpHeaderFactory();
boolean isRecordingTimeLimited();
Instant getRecordUntil();
void setRecordUntil(Instant instant);
SubsequentAction getRecordUntilSubsequentAction();
void setRecordUntilSubsequentAction(SubsequentAction action);
/**
* Check, if this model account exists
*
* @return true, if it exists, false otherwise
* @throws IOException
*/

View File

@ -3,7 +3,8 @@ package ctbrec;
import ctbrec.event.EventBusHolder;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.io.IoUtils;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.recorder.download.VideoLengthDetector;
import lombok.extern.slf4j.Slf4j;
@ -18,16 +19,17 @@ import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import static ctbrec.Recording.State.*;
@Slf4j
public class Recording implements Serializable, Callable<Recording> {
public class Recording implements Serializable {
private String id;
private Model model;
private transient Download download;
private transient RecordingProcess recordingProcess;
private transient Future<RecordingProcess> currentIteration;
private Instant startDate;
private String path;
private State status = State.UNKNOWN;
@ -71,15 +73,6 @@ public class Recording implements Serializable, Callable<Recording> {
}
}
@Override
public Recording call() throws Exception {
download.call();
if (selectedResolution == -1) {
selectedResolution = download.getSelectedResolution();
}
return this;
}
public String getId() {
return id;
}
@ -154,11 +147,11 @@ public class Recording implements Serializable, Callable<Recording> {
}
public void postprocess() {
getDownload().postprocess(this);
getRecordingProcess().postProcess(this);
}
private void fireStatusEvent(State status) {
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(getDownload().getTarget(), status, getModel(), getStartDate());
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(getRecordingProcess().getTarget(), status, getModel(), getStartDate());
EventBusHolder.BUS.post(evt);
}
@ -170,12 +163,12 @@ public class Recording implements Serializable, Callable<Recording> {
this.model = model;
}
public Download getDownload() {
return download;
public RecordingProcess getRecordingProcess() {
return recordingProcess;
}
public void setDownload(Download download) {
this.download = download;
public void setRecordingProcess(RecordingProcess recordingProcess) {
this.recordingProcess = recordingProcess;
}
public boolean isSingleFile() {
@ -211,6 +204,9 @@ public class Recording implements Serializable, Callable<Recording> {
}
public int getSelectedResolution() {
if ((selectedResolution == -1 || selectedResolution == StreamSource.UNKNOWN) && recordingProcess != null) {
selectedResolution = recordingProcess.getSelectedResolution();
}
return selectedResolution;
}
@ -319,4 +315,12 @@ public class Recording implements Serializable, Callable<Recording> {
public void setDirtyFlag(boolean dirtyFlag) {
this.dirtyFlag = dirtyFlag;
}
public Future<RecordingProcess> getCurrentIteration() {
return currentIteration;
}
public void setCurrentIteration(Future<RecordingProcess> currentIteration) {
this.currentIteration = currentIteration;
}
}

View File

@ -1,24 +1,22 @@
package ctbrec.io;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonReader.Token;
import com.squareup.moshi.JsonWriter;
import ctbrec.Model;
import ctbrec.SubsequentAction;
import ctbrec.sites.Site;
import ctbrec.sites.chaturbate.ChaturbateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonReader.Token;
import com.squareup.moshi.JsonWriter;
import ctbrec.Model;
import ctbrec.SubsequentAction;
import ctbrec.sites.Site;
import ctbrec.sites.chaturbate.ChaturbateModel;
public class ModelJsonAdapter extends JsonAdapter<Model> {
private static final Logger LOG = LoggerFactory.getLogger(ModelJsonAdapter.class);
@ -49,6 +47,8 @@ public class ModelJsonAdapter extends JsonAdapter<Model> {
model = (Model) modelClass.getDeclaredConstructor().newInstance();
} else if (key.equals("name")) {
model.setName(reader.nextString());
} else if (key.equals("displayName")) {
model.setName(reader.nextString());
} else if (key.equals("description")) {
model.setDescription(reader.nextString());
} else if (key.equals("url")) {
@ -85,7 +85,7 @@ public class ModelJsonAdapter extends JsonAdapter<Model> {
reader.skipValue();
}
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
| NoSuchMethodException | SecurityException e) {
| NoSuchMethodException | SecurityException e) {
throw new IOException("Couldn't instantiate model class [" + type + "]", e);
}
}
@ -106,6 +106,7 @@ public class ModelJsonAdapter extends JsonAdapter<Model> {
writer.beginObject();
writer.name("type").value(model.getClass().getName());
writeValueIfSet(writer, "name", model.getName());
writeValueIfSet(writer, "displayName", model.getDisplayName());
writeValueIfSet(writer, "description", model.getDescription());
writeValueIfSet(writer, "url", model.getUrl());
writer.name("priority").value(model.getPriority());

View File

@ -80,7 +80,9 @@ public class RemoteModelNotesService extends RemoteService implements ModelNotes
cache.put(entry.getKey(), entry.getValue());
}
} catch (Exception e) {
throw new CacheLoader.InvalidCacheLoadException("Loading of model notes from server failed");
var exception = new CacheLoader.InvalidCacheLoadException("Loading of model notes from server failed");
exception.initCause(e);
throw exception;
}
}
return Optional.ofNullable(notesCache.get(modelUrl)).orElse("");
@ -94,14 +96,20 @@ public class RemoteModelNotesService extends RemoteService implements ModelNotes
log.trace("Loading all model notes from server");
try (Response resp = httpClient.execute(builder.build())) {
if (resp.isSuccessful()) {
String body = resp.body().string();
log.trace("Model notes from server:\n{}", body);
Map<String, String> result = new HashMap<>();
JSONObject json = new JSONObject(resp.body().string());
JSONArray names = json.names();
for (int i = 0; i < names.length(); i++) {
String name = names.getString(i);
result.put(name, json.getString(name));
JSONObject json = new JSONObject(body);
if (json.names() != null) {
JSONArray names = json.names();
for (int i = 0; i < names.length(); i++) {
String name = names.getString(i);
result.put(name, json.getString(name));
}
return Collections.unmodifiableMap(result);
} else {
return Collections.emptyMap();
}
return Collections.unmodifiableMap(result);
} else {
throw new HttpException(resp.code(), resp.message());
}

View File

@ -1,5 +1,10 @@
package ctbrec.recorder;
import ctbrec.Model;
import ctbrec.ModelGroup;
import ctbrec.Recording;
import ctbrec.io.HttpClient;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
@ -8,78 +13,81 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import ctbrec.Model;
import ctbrec.ModelGroup;
import ctbrec.Recording;
import ctbrec.io.HttpClient;
public interface Recorder {
public void addModel(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void addModel(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void stopRecordingAt(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void stopRecordingAt(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
/**
* Returns true, if a model is in the list of models to record. This does not reflect, if there currently is a recording running. The model might be offline
* aswell.
*/
public boolean isTracked(Model model);
boolean isTracked(Model model);
/**
* Get the list of all models, which are tracked by ctbrec
*
* @return a List of Model objects, which might be empty
*/
public List<Model> getModels();
List<Model> getModels();
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException;
List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
/**
* Pins a recording. A pinned recording cannot be deleted.
*
* @param recording
* @throws IOException
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public void pin(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void pin(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
/**
* Unpins a previously pinned recording. A pinned recording cannot be deleted.
*
* @param recording
* @throws IOException
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public void unpin(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void unpin(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void shutdown(boolean immediately);
void shutdown(boolean immediately);
public void suspendRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void suspendRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public boolean isSuspended(Model model);
void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public boolean isMarkedForLaterRecording(Model model);
public void markForLaterRecording(Model model, boolean mark) throws InvalidKeyException, NoSuchAlgorithmException, IOException;
boolean isSuspended(Model model);
boolean isMarkedForLaterRecording(Model model);
void markForLaterRecording(Model model, boolean mark) throws InvalidKeyException, NoSuchAlgorithmException, IOException;
/**
* Returns only the models from getModels(), which are online
*
* @return
*/
public List<Model> getOnlineModels();
List<Model> getOnlineModels();
/**
* Returns only the models from getModels(), which are actually recorded right now (a recording process is currently running).
*
* @return
* @throws IOException
* @throws IllegalStateException
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public default List<Model> getCurrentlyRecording() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
default List<Model> getCurrentlyRecording() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
List<Recording> recordings = getRecordings();
return getModels().stream().filter(m -> {
for (Recording recording : recordings) {
@ -91,32 +99,35 @@ public interface Recorder {
}).collect(Collectors.toList());
}
public HttpClient getHttpClient();
HttpClient getHttpClient();
/**
* Get the total size of the filesystem we are recording to
*
* @return the total size in bytes
* @throws IOException
*/
public long getTotalSpaceBytes() throws IOException;
long getTotalSpaceBytes() throws IOException;
/**
* Get the free space left on the filesystem we are recording to
*
* @return the free space in bytes
* @throws IOException
*/
public long getFreeSpaceBytes() throws IOException;
long getFreeSpaceBytes() throws IOException;
/**
* Regenerate the playlist for a recording. This is helpful, if the
* playlist is corrupt or hasn't been generated for whatever reason
*
* @param recording
* @throws IllegalStateException
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
* @throws IOException
*/
public void rerunPostProcessing(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void rerunPostProcessing(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
/**
* Tells the recorder, that the recording priority for the given model has changed
@ -126,46 +137,50 @@ public interface Recorder {
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
*/
public void priorityChanged(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void priorityChanged(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void setNote(Recording rec, String note) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void setNote(Recording rec, String note) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
/**
* Pauses the recording of models entirely. The state of which models should be recorded and which are paused
* is kept.
*
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public void pause() throws InvalidKeyException, NoSuchAlgorithmException, IOException;
void pause() throws InvalidKeyException, NoSuchAlgorithmException, IOException;
/**
* Resumes recording
*
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public void resume() throws InvalidKeyException, NoSuchAlgorithmException, IOException;
void resume() throws InvalidKeyException, NoSuchAlgorithmException, IOException;
/**
* Returns the number of models, which are on the recording list and not marked for later recording
*
* @return
*/
public int getModelCount();
int getModelCount();
public Set<ModelGroup> getModelGroups();
Set<ModelGroup> getModelGroups();
/**
* Saves a model group. If the group already exists, it will be overwritten. Otherwise it will
* be saved as a new group.
*
* @param group
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
*/
public void saveModelGroup(ModelGroup group) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void saveModelGroup(ModelGroup group) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
public void deleteModelGroup(ModelGroup group) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
void deleteModelGroup(ModelGroup group) throws IOException, InvalidKeyException, NoSuchAlgorithmException;
default Optional<ModelGroup> getModelGroup(Model model) {
return getModelGroups().stream()

View File

@ -4,7 +4,7 @@ import ctbrec.Config;
import ctbrec.Model;
import ctbrec.ModelGroup;
import ctbrec.Recording;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -18,18 +18,17 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static ctbrec.recorder.NextGenLocalRecorder.IGNORE_CACHE;
public class RecordingPreconditions {
private static final Logger LOG = LoggerFactory.getLogger(RecordingPreconditions.class);
public static final boolean IGNORE_CACHE = true;
private final Config config;
private final NextGenLocalRecorder recorder;
private final SimplifiedLocalRecorder recorder;
private long lastPreconditionMessage = 0;
RecordingPreconditions(NextGenLocalRecorder recorder, Config config) {
RecordingPreconditions(SimplifiedLocalRecorder recorder, Config config) {
this.recorder = recorder;
this.config = config;
}
@ -84,7 +83,7 @@ public class RecordingPreconditions {
}
private void ensureEnoughSpaceForRecording() throws IOException {
if (!recorder.enoughSpaceForRecording()) {
if (recorder.notEnoughSpaceForRecording()) {
throw new PreconditionNotMetException("Not enough disk space for recording");
}
}
@ -99,7 +98,7 @@ public class RecordingPreconditions {
// check, if we can stop a recording for a model with lower priority
Optional<Recording> lowerPrioRecordingProcess = recordingProcessWithLowerPrio(model.getPriority());
if (lowerPrioRecordingProcess.isPresent()) {
Download download = lowerPrioRecordingProcess.get().getDownload();
RecordingProcess download = lowerPrioRecordingProcess.get().getRecordingProcess();
Model lowerPrioModel = download.getModel();
LOG.info("Stopping recording for {}. Prio {} < {}", lowerPrioModel.getName(), lowerPrioModel.getPriority(), model.getPriority());
recorder.stopRecordingProcess(lowerPrioModel);
@ -148,7 +147,7 @@ public class RecordingPreconditions {
}
private void ensureRecorderIsActive() {
if (!recorder.isRecording()) {
if (!recorder.isRunning()) {
throw new PreconditionNotMetException("Recorder is not in recording mode");
}
}

View File

@ -1,50 +1,30 @@
package ctbrec.recorder;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import ctbrec.Config;
import ctbrec.Hmac;
import ctbrec.Model;
import ctbrec.ModelGroup;
import ctbrec.Recording;
import ctbrec.*;
import ctbrec.event.EventBusHolder;
import ctbrec.event.NoSpaceLeftEvent;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.FileJsonAdapter;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.io.InstantJsonAdapter;
import ctbrec.io.ModelJsonAdapter;
import ctbrec.io.UuidJSonAdapter;
import ctbrec.io.*;
import ctbrec.sites.Site;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.Request.Builder;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;
public class RemoteRecorder implements Recorder {
@ -54,34 +34,35 @@ public class RemoteRecorder implements Recorder {
private static final Logger LOG = LoggerFactory.getLogger(RemoteRecorder.class);
public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private Moshi moshi = new Moshi.Builder()
private static final String LOG_MSG_SENDING_REQUERST = "Sending request to recording server: {}";
private final Moshi moshi = new Moshi.Builder()
.add(Instant.class, new InstantJsonAdapter())
.add(Model.class, new ModelJsonAdapter())
.add(File.class, new FileJsonAdapter())
.add(UUID.class, new UuidJSonAdapter())
.build();
private JsonAdapter<ModelListResponse> modelListResponseAdapter = moshi.adapter(ModelListResponse.class);
private JsonAdapter<RecordingListResponse> recordingListResponseAdapter = moshi.adapter(RecordingListResponse.class);
private JsonAdapter<ModelRequest> modelRequestAdapter = moshi.adapter(ModelRequest.class);
private JsonAdapter<ModelGroupRequest> modelGroupRequestAdapter = moshi.adapter(ModelGroupRequest.class);
private JsonAdapter<ModelGroupListResponse> modelGroupListResponseAdapter = moshi.adapter(ModelGroupListResponse.class);
private JsonAdapter<RecordingRequest> recordingRequestAdapter = moshi.adapter(RecordingRequest.class);
private JsonAdapter<SimpleResponse> simpleResponseAdapter = moshi.adapter(SimpleResponse.class);
private final JsonAdapter<ModelListResponse> modelListResponseAdapter = moshi.adapter(ModelListResponse.class);
private final JsonAdapter<RecordingListResponse> recordingListResponseAdapter = moshi.adapter(RecordingListResponse.class);
private final JsonAdapter<ModelRequest> modelRequestAdapter = moshi.adapter(ModelRequest.class);
private final JsonAdapter<ModelGroupRequest> modelGroupRequestAdapter = moshi.adapter(ModelGroupRequest.class);
private final JsonAdapter<ModelGroupListResponse> modelGroupListResponseAdapter = moshi.adapter(ModelGroupListResponse.class);
private final JsonAdapter<RecordingRequest> recordingRequestAdapter = moshi.adapter(RecordingRequest.class);
private final JsonAdapter<SimpleResponse> simpleResponseAdapter = moshi.adapter(SimpleResponse.class);
private List<Model> models = Collections.emptyList();
private List<Model> onlineModels = Collections.emptyList();
private List<Recording> recordings = Collections.emptyList();
private ReentrantLock modelGroupLock = new ReentrantLock();
private Set<ModelGroup> modelGroups = new HashSet<>();
private List<Site> sites;
private final ReentrantLock modelGroupLock = new ReentrantLock();
private final Set<ModelGroup> modelGroups = new HashSet<>();
private final List<Site> sites;
private long spaceTotal = -1;
private long spaceFree = -1;
private boolean noSpaceLeftDetected = false;
private Config config;
private HttpClient client;
private final Config config;
private final HttpClient client;
private Instant lastSync = Instant.EPOCH;
private SyncThread syncThread;
private final SyncThread syncThread;
public RemoteRecorder(Config config, HttpClient client, List<Site> sites) {
this.config = config;
@ -125,7 +106,7 @@ public class RemoteRecorder implements Recorder {
private Optional<String> sendRequest(String action) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
String msg = "{\"action\": \"" + action + "\"}";
LOG.trace("Sending request to recording server: {}", msg);
LOG.trace(LOG_MSG_SENDING_REQUERST, msg);
RequestBody requestBody = RequestBody.Companion.create(msg, JSON);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(requestBody);
addHmacIfNeeded(msg, builder);
@ -143,7 +124,7 @@ public class RemoteRecorder implements Recorder {
private void sendRequest(String action, Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
String payload = modelRequestAdapter.toJson(new ModelRequest(action, model));
LOG.trace("Sending request to recording server: {}", payload);
LOG.trace(LOG_MSG_SENDING_REQUERST, payload);
RequestBody body = RequestBody.Companion.create(payload, JSON);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(payload, builder);
@ -166,7 +147,7 @@ public class RemoteRecorder implements Recorder {
String msg = recordingRequestAdapter.toJson(recReq);
RequestBody body = RequestBody.Companion.create(msg, JSON);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
LOG.trace("Sending request to recording server: {}", msg);
LOG.trace(LOG_MSG_SENDING_REQUERST, msg);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try (Response response = client.execute(request)) {
@ -188,7 +169,7 @@ public class RemoteRecorder implements Recorder {
private void sendRequest(String action, ModelGroup model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
String payload = modelGroupRequestAdapter.toJson(new ModelGroupRequest(action, model));
LOG.trace("Sending request to recording server: {}", payload);
LOG.trace(LOG_MSG_SENDING_REQUERST, payload);
RequestBody body = RequestBody.Companion.create(payload, JSON);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(payload, builder);
@ -218,7 +199,7 @@ public class RemoteRecorder implements Recorder {
}
}
private void addHmacIfNeeded(String msg, Builder builder) throws InvalidKeyException, NoSuchAlgorithmException, UnsupportedEncodingException {
private void addHmacIfNeeded(String msg, Builder builder) throws InvalidKeyException, NoSuchAlgorithmException {
if (Config.getInstance().getSettings().requireAuthentication) {
byte[] key = Config.getInstance().getSettings().key;
String hmac = Hmac.calculate(msg, key);
@ -246,7 +227,7 @@ public class RemoteRecorder implements Recorder {
private Optional<Model> findModel(Model m) {
int index = Optional.ofNullable(models).map(list -> list.indexOf(m)).orElse(-1);
if (index >= 0) {
return Optional.of(models.get(index));
return Optional.ofNullable(models).map(mdls -> mdls.get(index));
} else {
return Optional.empty();
}
@ -418,7 +399,7 @@ public class RemoteRecorder implements Recorder {
if (resp.status.equals(SUCCESS)) {
List<Recording> newRecordings = resp.recordings;
// fire changed events
for (Iterator<Recording> iterator = recordings.iterator(); iterator.hasNext();) {
for (Iterator<Recording> iterator = recordings.iterator(); iterator.hasNext(); ) {
Recording recording = iterator.next();
if (newRecordings.contains(recording)) {
int idx = newRecordings.indexOf(recording);
@ -505,7 +486,7 @@ public class RemoteRecorder implements Recorder {
}
@Override
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException {
public List<Recording> getRecordings() {
return recordings;
}
@ -543,7 +524,7 @@ public class RemoteRecorder implements Recorder {
public static class ModelGroupRequest {
private String action;
private ModelGroup modelGroup;
private final ModelGroup modelGroup;
public ModelGroupRequest(String action, ModelGroup modelGroup) {
super();
@ -562,10 +543,6 @@ public class RemoteRecorder implements Recorder {
public ModelGroup getModelGroup() {
return modelGroup;
}
public void setModelGroup(ModelGroup model) {
this.modelGroup = model;
}
}
public static class RecordingRequest {

View File

@ -6,12 +6,11 @@ import ctbrec.Recording.State;
import ctbrec.event.*;
import ctbrec.io.HttpClient;
import ctbrec.notes.LocalModelNotesService;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.postprocessing.PostProcessingContext;
import ctbrec.recorder.postprocessing.PostProcessor;
import ctbrec.sites.Site;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
@ -25,21 +24,20 @@ import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static ctbrec.Recording.State.WAITING;
import static ctbrec.SubsequentAction.*;
import static ctbrec.event.Event.Type.MODEL_ONLINE;
import static java.lang.Thread.MAX_PRIORITY;
import static java.lang.Thread.MIN_PRIORITY;
import static java.util.concurrent.TimeUnit.SECONDS;
public class NextGenLocalRecorder implements Recorder {
@Slf4j
public class SimplifiedLocalRecorder implements Recorder {
private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class);
public static final boolean IGNORE_CACHE = true;
private final List<Model> models = Collections.synchronizedList(new ArrayList<>());
private final Config config;
private volatile boolean recording;
private volatile boolean running;
private final ReentrantLock recorderLock = new ReentrantLock();
private final ReentrantLock modelGroupLock = new ReentrantLock();
private final RecorderHttpClient client;
@ -47,127 +45,104 @@ public class NextGenLocalRecorder implements Recorder {
private final RecordingManager recordingManager;
private final RecordingPreconditions preconditions;
private final BlockingQueue<Recording> recordings = new LinkedBlockingQueue<>();
// thread pools for downloads and post-processing
private final ScheduledExecutorService downloadPool;
private final ScheduledExecutorService scheduler;
private final ExecutorService playlistDownloadPool = Executors.newFixedThreadPool(10);
private final ExecutorService segmentDownloadPool = Executors.newFixedThreadPool(10);
private final ExecutorService postProcessing;
private final ThreadPoolScaler threadPoolScaler;
private final ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2));
private final ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1));
private final BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
private final Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>());
private long lastSpaceCheck;
private final ThreadPoolExecutor ppPool;
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
public SimplifiedLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config;
client = new RecorderHttpClient(config);
downloadPool = Executors.newScheduledThreadPool(5, createThreadFactory("Download", MAX_PRIORITY));
threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) downloadPool, 5);
scheduler = Executors.newScheduledThreadPool(5, createThreadFactory("Download", MAX_PRIORITY));
threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) scheduler, 5);
recordingManager = new RecordingManager(config, sites);
loadModels();
int ppThreads = config.getSettings().postProcessingThreads;
BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY));
postProcessing = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY));
recording = true;
running = true;
registerEventBusListener();
preconditions = new RecordingPreconditions(this, config);
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
LOG.info("Saving recordings in {}", config.getSettings().recordingsDir);
log.debug("Recorder initialized");
log.info("Models to record: {}", models);
log.info("Saving recordings in {}", config.getSettings().recordingsDir);
startCompletionHandler();
startRecordingLoop();
}
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(() -> {
private void startRecordingLoop() {
new Thread(() -> {
while (running) {
Recording rec = recordings.poll();
if (rec != null) {
processRecording(rec);
}
checkFreeSpace();
threadPoolScaler.tick();
waitABit(1);
}
}).start();
}
private void checkFreeSpace() {
if ((System.currentTimeMillis() - lastSpaceCheck) > 0) {
lastSpaceCheck = System.currentTimeMillis();
try {
if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) {
LOG.info("No space left -> Stopping all recordings");
if (!recordingProcesses.isEmpty() && notEnoughSpaceForRecording()) {
log.info("No space left -> Stopping all recordings");
stopRecordingProcesses();
EventBusHolder.BUS.post(new NoSpaceLeftEvent());
}
} catch (IOException e) {
LOG.error("Couldn't check space left on device", e);
log.error("Couldn't check space left on device", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private void loadModels() {
config.getSettings().models.forEach(m -> {
if (m.getSite() != null) {
if (m.getSite().isEnabled()) {
models.add(m);
} else {
LOG.info("{} disabled -> ignoring {}", m.getSite().getName(), m.getName());
}
} else {
LOG.info("Site for model {} is unknown -> ignoring", m.getName());
}
});
}
private void startCompletionHandler() {
downloadCompletionPool.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
ScheduledFuture<Recording> future = downloadFutureQueue.take();
rescheduleRecordingTask(future);
threadPoolScaler.tick();
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while getting recording result from download queue", e);
} catch (Exception e) {
LOG.error("Error in completion handler", e);
}
}
});
}
private void rescheduleRecordingTask(ScheduledFuture<Recording> future) throws InterruptedException {
try {
if (!future.isDone()) {
downloadFutureQueue.put(future);
} else {
Recording rec = future.get();
Download d = rec.getDownload();
downloadFutureRecordingMap.remove(future);
if (d.isRunning()) {
long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis());
ScheduledFuture<Recording> rescheduledFuture = downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS);
downloadFutureQueue.add(rescheduledFuture);
} else {
segmentDownloadPool.submit(() -> {
deleteIfEmpty(rec);
removeRecordingProcess(rec);
if (rec.getStatus() == State.WAITING) {
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName());
submitPostProcessingJob(rec);
// check, if we have to restart the recording
Model model = rec.getModel();
tryRestartRecording(model);
} else {
setRecordingStatus(rec, State.FAILED);
}
});
}
}
} catch (ExecutionException | IllegalStateException e) {
fail(future, e);
}
}
private void fail(ScheduledFuture<Recording> future, Exception e) {
if (downloadFutureRecordingMap.containsKey(future)) {
Recording rec = downloadFutureRecordingMap.remove(future);
deleteIfEmpty(rec);
removeRecordingProcess(rec);
rec.getDownload().finalizeDownload();
LOG.error("Error while recording stream for model {}", rec.getModel(), e);
private void waitABit(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting in main loop. CPU usage might be high now :(");
}
}
private void processRecording(Recording recording) {
if (recording.getCurrentIteration().isDone()) {
if (recording.getRecordingProcess().isRunning()) {
try {
Instant rescheduleAt = recording.getCurrentIteration().get().getRescheduleTime();
Duration duration = Duration.between(Instant.now(), rescheduleAt);
long delayInMillis = Math.max(0, duration.toMillis());
log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis);
scheduleRecording(recording, delayInMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(recording);
} catch (ExecutionException e) {
// TODO react to different exceptions, e.g. with a retry
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
fail(recording);
}
} else {
removeRecordingProcess(recording);
if (deleteIfEmpty(recording)) {
return;
}
submitPostProcessingJob(recording);
}
} else {
LOG.error("Error while recording stream", e);
recordings.add(recording);
}
}
@ -180,12 +155,78 @@ public class NextGenLocalRecorder implements Recorder {
}
}
private void fail(Recording recording) {
stopRecordingProcess(recording.getModel());
recording.getRecordingProcess().finalizeDownload();
if (deleteIfEmpty(recording)) {
return;
}
submitPostProcessingJob(recording);
startRecordingProcess(recording.getModel());
}
private void scheduleRecording(Recording recording, long delayInMillis) {
ScheduledFuture<RecordingProcess> future = scheduler.schedule(recording.getRecordingProcess(), delayInMillis, TimeUnit.MILLISECONDS);
recording.setCurrentIteration(future);
recording.getSelectedResolution();
recordings.add(recording);
}
private void loadModels() {
config.getSettings().models.forEach(m -> {
if (m.getSite() != null) {
if (m.getSite().isEnabled()) {
models.add(m);
} else {
log.info("{} disabled -> ignoring {}", m.getSite().getName(), m.getName());
}
} else {
log.info("Site for model {} is unknown -> ignoring", m.getName());
}
});
}
private void shutdownPool(String name, ExecutorService executorService, int secondsToWaitForTermination) throws InterruptedException {
log.info("Stopping thread pool {}", name);
executorService.shutdown();
boolean terminated = executorService.awaitTermination(secondsToWaitForTermination, TimeUnit.SECONDS);
if (terminated) {
log.info("Thread pool {} terminated", name);
} else {
log.warn("{} did not terminate in time.", name);
}
}
private void stopRecordings() {
log.info("Stopping all recordings");
for (Recording recording : recordings) {
recording.getRecordingProcess().stop();
recording.getRecordingProcess().awaitEnd();
}
log.info("Recordings have been stopped");
}
// private void fail(ScheduledFuture<Recording> future, Exception e) {
// if (downloadFutureRecordingMap.containsKey(future)) {
// Recording rec = downloadFutureRecordingMap.remove(future);
// deleteIfEmpty(rec);
// removeRecordingProcess(rec);
// rec.getRecordingProcess().finalizeDownload();
// log.error("Error while recording stream for model {}", rec.getModel(), e);
// } else {
// log.error("Error while recording stream", e);
// }
// }
private void submitPostProcessingJob(Recording recording) {
ppPool.submit(() -> {
setRecordingStatus(recording, WAITING);
postProcessing.submit(() -> {
try {
recording.setDirtyFlag(true);
setRecordingStatus(recording, State.POST_PROCESSING);
recording.getDownload().finalizeDownload();
recording.getRecordingProcess().stop();
recording.getRecordingProcess().awaitEnd();
recording.setDirtyFlag(true);
recording.getRecordingProcess().finalizeDownload();
recording.refresh();
recordingManager.saveRecording(recording);
recording.postprocess();
@ -193,13 +234,13 @@ public class NextGenLocalRecorder implements Recorder {
PostProcessingContext ctx = createPostProcessingContext(recording);
for (PostProcessor postProcessor : postProcessors) {
if (postProcessor.isEnabled()) {
LOG.debug("Running post-processor: {}", postProcessor.getName());
log.debug("Running post-processor: {}", postProcessor.getName());
boolean continuePP = postProcessor.postprocess(ctx);
if (!continuePP) {
break;
}
} else {
LOG.debug("Skipping post-processor {} because it is disabled", postProcessor.getName());
log.debug("Skipping post-processor {} because it is disabled", postProcessor.getName());
}
}
recording.refresh();
@ -207,17 +248,17 @@ public class NextGenLocalRecorder implements Recorder {
setRecordingStatus(recording, State.FINISHED);
recordingManager.saveRecording(recording);
}
LOG.info("Post-processing finished for {}", recording.getModel().getName());
log.info("Post-processing finished for {}", recording.getModel().getName());
} catch (Exception e) {
if (e instanceof InterruptedException) { // NOSONAR
Thread.currentThread().interrupt();
}
LOG.error("Error while post-processing recording {}", recording, e);
log.error("Error while post-processing recording {}", recording, e);
recording.setStatus(State.FAILED);
try {
recordingManager.saveRecording(recording);
} catch (IOException e1) {
LOG.error("Couldn't update recording state for recording {}", recording, e1);
log.error("Couldn't update recording state for recording {}", recording, e1);
}
}
});
@ -229,13 +270,13 @@ public class NextGenLocalRecorder implements Recorder {
ctx.setRecorder(this);
ctx.setRecording(recording);
ctx.setRecordingManager(recordingManager);
ctx.setModelNotesService(new LocalModelNotesService(config));
ctx.setModelNotesService(new LocalModelNotesService(config)); // TODO
return ctx;
}
private void setRecordingStatus(Recording recording, State status) {
recording.setStatus(status);
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(recording.getDownload().getTarget(), status, recording.getModel(),
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(recording.getRecordingProcess().getTarget(), status, recording.getModel(),
recording.getStartDate());
EventBusHolder.BUS.post(evt);
}
@ -248,7 +289,7 @@ public class NextGenLocalRecorder implements Recorder {
throw new ModelIsIgnoredException(model);
}
LOG.info("Model {} added", model);
log.info("Model {} added", model);
recorderLock.lock();
try {
models.add(model);
@ -258,7 +299,7 @@ public class NextGenLocalRecorder implements Recorder {
config.getSettings().models.add(model);
config.save();
} catch (IOException e) {
LOG.error("Couldn't save config", e);
log.error("Couldn't save config", e);
} finally {
recorderLock.unlock();
}
@ -278,44 +319,17 @@ public class NextGenLocalRecorder implements Recorder {
existing.setRecordUntilSubsequentAction(src.getRecordUntilSubsequentAction());
}
private CompletableFuture<Void> startRecordingProcess(Model model) {
return CompletableFuture.runAsync(() -> {
recorderLock.lock();
try {
preconditions.check(model);
LOG.info("Starting recording for model {}", model.getName());
Download download = createDownload(model);
Recording rec = createRecording(download);
setRecordingStatus(rec, State.RECORDING);
rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec);
ScheduledFuture<Recording> future = downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS);
downloadFutureQueue.add(future);
downloadFutureRecordingMap.put(future, rec);
} catch (RecordUntilExpiredException e) {
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model);
} catch (PreconditionNotMetException e) {
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
} catch (Exception e) {
LOG.error("Couldn't start recording process for {}", model, e);
} finally {
recorderLock.unlock();
}
}, segmentDownloadPool);
}
private Download createDownload(Model model) throws IOException {
Download download = model.createDownload();
private RecordingProcess createDownload(Model model) throws IOException {
RecordingProcess download = model.createDownload();
download.init(config, model, Instant.now(), segmentDownloadPool);
Objects.requireNonNull(download.getStartTime(),
"At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()");
LOG.debug("Downloading with {}", download.getClass().getSimpleName());
log.debug("Downloading with {}", download.getClass().getSimpleName());
return download;
}
private void executeRecordUntilSubsequentAction(Model model) {
LOG.debug("Stopping recording {} because the recording timeframe ended at {}. Subsequent action is {}", model,
log.debug("Stopping recording {} because the recording timeframe ended at {}. Subsequent action is {}", model,
model.getRecordUntil().atZone(ZoneId.systemDefault()), model.getRecordUntilSubsequentAction());
if (model.getRecordUntilSubsequentAction() == PAUSE) {
model.setSuspended(true);
@ -323,13 +337,13 @@ public class NextGenLocalRecorder implements Recorder {
try {
stopRecording(model);
} catch (Exception e1) {
LOG.error("Error while stopping recording", e1);
log.error("Error while stopping recording", e1);
}
} else if (model.getRecordUntilSubsequentAction() == RECORD_LATER) {
try {
markForLaterRecording(model, true);
} catch (Exception e1) {
LOG.error("Error while stopping recording", e1);
log.error("Error while stopping recording", e1);
}
}
// reset values, so that model can be recorded again
@ -337,11 +351,11 @@ public class NextGenLocalRecorder implements Recorder {
model.setRecordUntilSubsequentAction(PAUSE);
}
private Recording createRecording(Download download) throws IOException {
private Recording createRecording(RecordingProcess download) throws IOException {
Model model = download.getModel();
Recording rec = new Recording();
rec.setId(UUID.randomUUID().toString());
rec.setDownload(download);
rec.setRecordingProcess(download);
String recordingFile = download.getPath(model).replace('\\', '/');
File absoluteFile = new File(config.getSettings().recordingsDir, recordingFile);
rec.setAbsoluteFile(absoluteFile);
@ -359,17 +373,17 @@ public class NextGenLocalRecorder implements Recorder {
rec.refresh();
long sizeInByte = rec.getSizeInByte();
if (sizeInByte <= 0) {
LOG.info("Deleting empty recording {}", rec);
log.info("Deleting empty recording {}", rec);
delete(rec);
deleted = true;
}
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
setRecordingStatus(rec, deleted ? State.DELETED : WAITING);
if (!deleted) {
// only save the status, if the recording has not been deleted, otherwise we recreate the metadata file
recordingManager.saveRecording(rec);
}
} catch (IOException e) {
LOG.error("Couldn't execute post-processing step \"delete if empty\"", e);
log.error("Couldn't execute post-processing step \"delete if empty\"", e);
}
return deleted;
}
@ -381,7 +395,7 @@ public class NextGenLocalRecorder implements Recorder {
if (models.contains(model)) {
models.remove(model);
config.getSettings().models.remove(model);
LOG.info("Model {} removed", model);
log.info("Model {} removed", model);
config.save();
} else {
throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models");
@ -389,7 +403,7 @@ public class NextGenLocalRecorder implements Recorder {
if (recordingProcesses.containsKey(model)) {
Recording rec = recordingProcesses.get(model);
rec.getDownload().stop();
rec.getRecordingProcess().stop();
}
} finally {
recorderLock.unlock();
@ -402,7 +416,7 @@ public class NextGenLocalRecorder implements Recorder {
int index = models.indexOf(model);
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
config.save();
LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
log.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
recorderLock.lock();
try {
Recording rec = recordingProcesses.get(model);
@ -414,17 +428,18 @@ public class NextGenLocalRecorder implements Recorder {
}
tryRestartRecording(model);
} else {
LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName());
log.warn("Couldn't switch stream source for model {}. Not found in list", model.getName());
}
}
void stopRecordingProcess(Model model) {
recorderLock.lock();
try {
LOG.debug("Stopping recording for {}", model);
log.debug("Stopping recording for {} - recording found: {}", model, recordingProcesses.get(model));
Recording rec = recordingProcesses.get(model);
LOG.debug("Stopping download for {}", model);
rec.getDownload().stop();
log.debug("Stopping download for {}", model);
rec.getRecordingProcess().stop();
recordingProcesses.remove(model);
} finally {
recorderLock.unlock();
}
@ -434,7 +449,7 @@ public class NextGenLocalRecorder implements Recorder {
recorderLock.lock();
try {
for (Recording rec : recordingProcesses.values()) {
rec.getDownload().stop();
rec.getRecordingProcess().stop();
}
} finally {
recorderLock.unlock();
@ -463,45 +478,20 @@ public class NextGenLocalRecorder implements Recorder {
@Override
public void shutdown(boolean immediately) {
LOG.info("Shutting down");
recording = false;
log.info("Shutting down");
if (!immediately) {
stopRecordingProcesses();
awaitDownloadsFinish();
shutdownThreadPools();
}
}
private void awaitDownloadsFinish() {
LOG.info("Waiting for downloads to finish");
for (int i = 0; i < 60; i++) {
if (!recordingProcesses.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while waiting for downloads to finish", e);
}
try {
stopRecordings();
shutdownPool("Scheduler", scheduler, 60);
shutdownPool("PlaylistDownloadPool", playlistDownloadPool, 60);
shutdownPool("SegmentDownloadPool", segmentDownloadPool, 60);
shutdownPool("Post-Processing", postProcessing, 600);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for recordings to finish");
Thread.currentThread().interrupt();
}
}
}
private void shutdownThreadPools() {
try {
LOG.info("Shutting down download pool");
downloadPool.shutdown();
client.shutdown();
downloadPool.awaitTermination(1, TimeUnit.MINUTES);
LOG.info("Shutting down post-processing pool");
ppPool.shutdown();
int minutesToWait = 10;
LOG.info("Waiting {} minutes (max) for post-processing to finish", minutesToWait);
ppPool.awaitTermination(minutesToWait, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while waiting for pools to finish", e);
}
running = false;
}
@Override
@ -514,14 +504,14 @@ public class NextGenLocalRecorder implements Recorder {
model.setSuspended(true);
config.save();
} else {
LOG.warn("Couldn't suspend model {}. Not found in list", model.getName());
log.warn("Couldn't suspend model {}. Not found in list", model.getName());
return;
}
Recording rec = recordingProcesses.get(model);
Optional.ofNullable(rec).map(Recording::getDownload).ifPresent(Download::stop);
Optional.ofNullable(rec).map(Recording::getRecordingProcess).ifPresent(RecordingProcess::stop);
} catch (IOException e) {
LOG.error("Couldn't save config", e);
log.error("Couldn't save config", e);
} finally {
recorderLock.unlock();
}
@ -541,7 +531,7 @@ public class NextGenLocalRecorder implements Recorder {
config.save();
startRecordingProcess(m);
} else {
LOG.warn("Couldn't resume model {}. Not found in list", model.getName());
log.warn("Couldn't resume model {}. Not found in list", model.getName());
}
} finally {
recorderLock.unlock();
@ -570,19 +560,19 @@ public class NextGenLocalRecorder implements Recorder {
Optional<Model> existingModel = findModel(model);
if (existingModel.isPresent()) {
Model m = existingModel.get();
LOG.debug("Mark for later: {}. Model found: {}", mark, m);
log.debug("Mark for later: {}. Model found: {}", mark, m);
m.setMarkedForLaterRecording(mark);
if (mark && getCurrentlyRecording().contains(m)) {
LOG.debug("Stopping recording of {}", m);
log.debug("Stopping recording of {}", m);
stopRecordingProcess(m);
}
if (!mark) {
LOG.debug("Removing model: {}", m);
log.debug("Removing model: {}", m);
stopRecording(model);
}
} else {
if (mark) {
LOG.debug("Model {} not found to mark for later recording", model);
log.debug("Model {} not found to mark for later recording", model);
model.setMarkedForLaterRecording(true);
addModel(model);
}
@ -614,7 +604,7 @@ public class NextGenLocalRecorder implements Recorder {
} catch (Exception e) {
return false;
}
}).collect(Collectors.toList());
}).toList();
}
@Override
@ -641,17 +631,17 @@ public class NextGenLocalRecorder implements Recorder {
return store;
}
boolean enoughSpaceForRecording() throws IOException {
boolean notEnoughSpaceForRecording() throws IOException {
long minimum = config.getSettings().minimumSpaceLeftInBytes;
if (minimum == 0) { // 0 means don't check
return getFreeSpaceBytes() > 100 * 1024 * 1024; // leave at least 100 MiB free
return getFreeSpaceBytes() <= 100 * 1024 * 1024; // leave at least 100 MiB free
} else {
return getFreeSpaceBytes() > minimum;
return getFreeSpaceBytes() <= minimum;
}
}
private void tryRestartRecording(Model model) {
if (!recording) {
if (!running) {
// recorder is not in recording state
return;
}
@ -660,14 +650,14 @@ public class NextGenLocalRecorder implements Recorder {
boolean modelInRecordingList = isTracked(model);
boolean online = model.isOnline(IGNORE_CACHE);
if (modelInRecordingList && online) {
LOG.info("Restarting recording for model {}", model);
log.info("Restarting recording for model {}", model);
startRecordingProcess(model);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Couldn't restart recording for model {}", model);
log.error("Couldn't restart recording for model {}", model);
} catch (Exception e) {
LOG.error("Couldn't restart recording for model {}", model);
log.error("Couldn't restart recording for model {}", model);
}
}
@ -680,12 +670,13 @@ public class NextGenLocalRecorder implements Recorder {
if (e.getType() == MODEL_ONLINE) {
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
Model model = evt.getModel();
log.trace("Model online event: {} - suspended:{} - already recording:{}", model, model.isSuspended(), recordingProcesses.containsKey(model));
if (!isSuspended(model) && !recordingProcesses.containsKey(model)) {
startRecordingProcess(model);
}
}
} catch (Exception e1) {
LOG.error("Error while handling model state changed event {}", e, e1);
log.error("Error while handling model state changed event {}", e, e1);
} finally {
recorderLock.unlock();
}
@ -693,6 +684,31 @@ public class NextGenLocalRecorder implements Recorder {
});
}
private CompletableFuture<Void> startRecordingProcess(Model model) {
return CompletableFuture.runAsync(() -> {
recorderLock.lock();
try {
preconditions.check(model);
log.info("Starting recording for model {}", model.getName());
RecordingProcess download = createDownload(model);
Recording rec = createRecording(download);
setRecordingStatus(rec, State.RECORDING);
rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec);
scheduleRecording(rec, 0);
} catch (RecordUntilExpiredException e) {
log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model);
} catch (PreconditionNotMetException e) {
log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
} catch (Exception e) {
log.error("Couldn't start recording process for {}", model, e);
} finally {
recorderLock.unlock();
}
}, segmentDownloadPool);
}
private ThreadFactory createThreadFactory(String name, int priority) {
return r -> {
Thread t = new Thread(r);
@ -706,19 +722,18 @@ public class NextGenLocalRecorder implements Recorder {
@Override
public void rerunPostProcessing(Recording recording) throws IOException {
recording.setPostProcessedFile(null);
List<Recording> recordings = recordingManager.getAll();
for (Recording other : recordings) {
for (Recording other : recordingManager.getAll()) {
if (other.equals(recording)) {
Download download = other.getModel().createDownload();
RecordingProcess download = other.getModel().createDownload();
download.init(Config.getInstance(), other.getModel(), other.getStartDate(), segmentDownloadPool);
other.setDownload(download);
other.setRecordingProcess(download);
other.setPostProcessedFile(null);
other.setStatus(State.WAITING);
other.setStatus(WAITING);
submitPostProcessingJob(other);
return;
}
}
LOG.error("Recording {} not found. Can't rerun post-processing", recording);
log.error("Recording {} not found. Can't rerun post-processing", recording);
}
@Override
@ -730,10 +745,10 @@ public class NextGenLocalRecorder implements Recorder {
models.get(index).setPriority(model.getPriority());
config.save();
} else {
LOG.warn("Couldn't change priority for model {}. Not found in list", model.getName());
log.warn("Couldn't change priority for model {}. Not found in list", model.getName());
}
} catch (IOException e) {
LOG.error("Couldn't save config", e);
log.error("Couldn't save config", e);
} finally {
recorderLock.unlock();
}
@ -763,7 +778,7 @@ public class NextGenLocalRecorder implements Recorder {
Model m = models.get(index);
m.setRecordUntil(model.getRecordUntil());
m.setRecordUntilSubsequentAction(model.getRecordUntilSubsequentAction());
LOG.debug("Stopping recording of model {} at {} and then {}", m, model.getRecordUntil(), m.getRecordUntilSubsequentAction());
log.debug("Stopping recording of model {} at {} and then {}", m, model.getRecordUntil(), m.getRecordUntilSubsequentAction());
config.save();
} else {
throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models");
@ -777,8 +792,8 @@ public class NextGenLocalRecorder implements Recorder {
}
}
boolean isRecording() {
return recording;
boolean isRunning() {
return running;
}
Map<Model, Recording> getRecordingProcesses() {
@ -787,20 +802,20 @@ public class NextGenLocalRecorder implements Recorder {
@Override
public void pause() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
LOG.info("Pausing recorder");
log.info("Pausing recorder");
try {
recording = false;
running = false;
stopRecordingProcesses();
} catch (Exception e) {
recording = true;
running = true;
throw e;
}
}
@Override
public void resume() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
LOG.info("Resuming recorder");
recording = true;
log.info("Resuming recorder");
running = true;
}
@Override

View File

@ -1,19 +1,18 @@
package ctbrec.recorder;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class ThreadPoolScaler {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolScaler.class);
private final ThreadPoolExecutor threadPool;
private final int configuredPoolSize;
private ThreadPoolExecutor threadPool;
private int configuredPoolSize;
private int[] values = new int[20];
private final int[] values = new int[20];
private int index = -1;
private Instant lastAdjustment = Instant.now();
private Instant downScaleCoolDown = Instant.EPOCH;
@ -36,29 +35,26 @@ public class ThreadPoolScaler {
if (average > 0.65 * coreSize) {
threadPool.setCorePoolSize(coreSize + 1);
downScaleCoolDown = now.plusSeconds(30);
if (LOG.isTraceEnabled()) {
LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
if (log.isTraceEnabled()) {
log.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
}
} else if (average < 0.15 * coreSize) {
int newValue = Math.max(configuredPoolSize, coreSize - 1);
if (threadPool.getCorePoolSize() != newValue && now.isAfter(downScaleCoolDown)) {
threadPool.setCorePoolSize(newValue);
downScaleCoolDown = now.plusSeconds(10);
LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
log.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
}
}
lastAdjustment = now;
if (LOG.isTraceEnabled()) {
LOG.trace("Thread pool size is {}", threadPool.getCorePoolSize());
if (log.isTraceEnabled()) {
log.trace("Thread pool size is {}", threadPool.getCorePoolSize());
}
}
}
private double calculateAverage() {
int sum = 0;
for (int i = 0; i < values.length; i++) {
sum += values[i];
}
int sum = IntStream.of(values).sum();
double average = sum / (double) values.length;
return average;
}

View File

@ -1,9 +1,5 @@
package ctbrec.recorder.download;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Settings;
@ -13,7 +9,11 @@ import ctbrec.recorder.download.hls.NoopSplittingStrategy;
import ctbrec.recorder.download.hls.SizeSplittingStrategy;
import ctbrec.recorder.download.hls.TimeSplittingStrategy;
public abstract class AbstractDownload implements Download {
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
public abstract class AbstractDownload implements RecordingProcess {
protected Instant startTime;
protected Instant rescheduleTime = Instant.now();
@ -45,21 +45,21 @@ public abstract class AbstractDownload implements Download {
protected SplittingStrategy initSplittingStrategy(Settings settings) {
SplittingStrategy strategy;
switch (settings.splitStrategy) {
case TIME:
strategy = new TimeSplittingStrategy();
break;
case SIZE:
strategy = new SizeSplittingStrategy();
break;
case TIME_OR_SIZE:
SplittingStrategy timeSplittingStrategy = new TimeSplittingStrategy();
SplittingStrategy sizeSplittingStrategy = new SizeSplittingStrategy();
strategy = new CombinedSplittingStrategy(timeSplittingStrategy, sizeSplittingStrategy);
break;
case DONT:
default:
strategy = new NoopSplittingStrategy();
break;
case TIME:
strategy = new TimeSplittingStrategy();
break;
case SIZE:
strategy = new SizeSplittingStrategy();
break;
case TIME_OR_SIZE:
SplittingStrategy timeSplittingStrategy = new TimeSplittingStrategy();
SplittingStrategy sizeSplittingStrategy = new SizeSplittingStrategy();
strategy = new CombinedSplittingStrategy(timeSplittingStrategy, sizeSplittingStrategy);
break;
case DONT:
default:
strategy = new NoopSplittingStrategy();
break;
}
strategy.init(settings);
return strategy;
@ -74,4 +74,9 @@ public abstract class AbstractDownload implements Download {
public int getSelectedResolution() {
return StreamSource.UNKNOWN;
}
@Override
public void awaitEnd() {
// do nothing per default
}
}

View File

@ -1,47 +1,59 @@
package ctbrec.recorder.download;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
public interface Download extends Callable<Download> {
public interface RecordingProcess extends Callable<RecordingProcess> {
void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException;
void stop();
void finalizeDownload();
boolean isRunning();
Model getModel();
Instant getStartTime();
Instant getRescheduleTime();
void postprocess(Recording recording);
void postProcess(Recording recording);
int getSelectedResolution();
/**
* Returns the path to the recording in the filesystem as file object
* @param model
*
* @return
* @see #getPath(Model)
*/
public File getTarget();
File getTarget();
/**
* Returns the path to the recording starting from the configured recordings directory
*
* @param model
* @return
* @see #getTarget()
*/
public String getPath(Model model);
String getPath(Model model);
/**
* Specifies, if the final result of this recording is a single file or consists of segments + playlist
*
* @return true, if the recording is only a single file
*/
public boolean isSingleFile();
boolean isSingleFile();
public long getSizeInByte();
long getSizeInByte();
void awaitEnd();
}

View File

@ -5,5 +5,6 @@ import ctbrec.Settings;
public interface SplittingStrategy {
void init(Settings settings);
boolean splitNecessary(Download download);
boolean splitNecessary(RecordingProcess download);
}

View File

@ -1,13 +1,26 @@
package ctbrec.recorder.download.dash;
import static ctbrec.Recording.State.*;
import static ctbrec.io.HttpConstants.*;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.io.IoUtils;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.dash.SegmentTimelineType.S;
import ctbrec.recorder.download.hls.NoStreamFoundException;
import ctbrec.recorder.download.hls.PostProcessingException;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.*;
import java.math.BigInteger;
import java.net.URL;
import java.nio.file.Path;
@ -22,27 +35,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import ctbrec.recorder.download.hls.NoStreamFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.io.IoUtils;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.dash.SegmentTimelineType.S;
import ctbrec.recorder.download.hls.PostProcessingException;
import okhttp3.Request;
import okhttp3.Response;
import static ctbrec.Recording.State.POST_PROCESSING;
import static ctbrec.io.HttpConstants.*;
public class DashDownload extends AbstractDownload {
private static final String CONTENT_LENGTH = "Content-Length";
@ -367,7 +361,7 @@ public class DashDownload extends AbstractDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
try {
Thread.currentThread().setName("PP " + model.getName());
recording.setStatus(POST_PROCESSING);

View File

@ -1,7 +1,7 @@
package ctbrec.recorder.download.hls;
import ctbrec.Settings;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.SplittingStrategy;
public class CombinedSplittingStrategy implements SplittingStrategy {
@ -20,7 +20,7 @@ public class CombinedSplittingStrategy implements SplittingStrategy {
}
@Override
public boolean splitNecessary(Download download) {
public boolean splitNecessary(RecordingProcess download) {
for (SplittingStrategy splittingStrategy : splittingStrategies) {
if (splittingStrategy.splitNecessary(download)) {
return true;

View File

@ -1,5 +1,19 @@
package ctbrec.recorder.download.hls;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.OS;
import ctbrec.Recording;
import ctbrec.io.HttpClient;
import ctbrec.io.StreamRedirector;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.hls.SegmentPlaylist.Segment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -12,23 +26,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import javax.xml.bind.JAXBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.OS;
import ctbrec.Recording;
import ctbrec.io.HttpClient;
import ctbrec.io.StreamRedirector;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.hls.SegmentPlaylist.Segment;
/**
* Does the whole HLS download with FFmpeg. Not used at the moment, because FFMpeg can't
* handle the HLS encryption of Flirt4Free correctly
@ -64,7 +61,7 @@ public class FFmpegDownload extends AbstractHlsDownload {
argsPlusFile[i++] = "-i";
argsPlusFile[i++] = chunkPlaylist;
System.arraycopy(args, 0, argsPlusFile, i, args.length);
argsPlusFile[argsPlusFile.length-1] = targetFile.getAbsolutePath();
argsPlusFile[argsPlusFile.length - 1] = targetFile.getAbsolutePath();
String[] cmdline = OS.getFFmpegCommand(argsPlusFile);
LOG.debug("Command line: {}", Arrays.toString(cmdline));
ffmpeg = Runtime.getRuntime().exec(cmdline, new String[0], targetFile.getParentFile());
@ -119,7 +116,7 @@ public class FFmpegDownload extends AbstractHlsDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
// nothing to here for now
}

View File

@ -155,7 +155,7 @@ public class HlsDownload extends AbstractHlsDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
// nothing to do
}

View File

@ -1,8 +1,18 @@
package ctbrec.recorder.download.hls;
import static ctbrec.recorder.download.StreamSource.*;
import static java.util.concurrent.TimeUnit.*;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.OS;
import ctbrec.Recording;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.StreamSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@ -17,26 +27,12 @@ import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.OS;
import ctbrec.Recording;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.StreamSource;
import static ctbrec.recorder.download.StreamSource.UNKNOWN;
import static java.util.concurrent.TimeUnit.SECONDS;
public class HlsdlDownload extends AbstractDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(HlsdlDownload.class);
private static final Logger LOG = LoggerFactory.getLogger(HlsdlDownload.class);
protected File targetFile;
@ -64,7 +60,7 @@ public class HlsdlDownload extends AbstractDownload {
}
@Override
public Download call() throws Exception {
public HlsdlDownload call() throws Exception {
try {
if (running && !hlsdlProcess.isAlive()) {
running = false;
@ -176,7 +172,7 @@ public class HlsdlDownload extends AbstractDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
// nothing to do
}

View File

@ -1,23 +1,5 @@
package ctbrec.recorder.download.hls;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.OS;
@ -26,6 +8,19 @@ import ctbrec.io.HttpClient;
import ctbrec.recorder.FFmpeg;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.hls.SegmentPlaylist.Segment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.time.Instant;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
@ -195,7 +190,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
// nothing to do
}

View File

@ -1,7 +1,7 @@
package ctbrec.recorder.download.hls;
import ctbrec.Settings;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.SplittingStrategy;
public class NoopSplittingStrategy implements SplittingStrategy {
@ -12,7 +12,7 @@ public class NoopSplittingStrategy implements SplittingStrategy {
}
@Override
public boolean splitNecessary(Download download) {
public boolean splitNecessary(RecordingProcess download) {
return false;
}

View File

@ -1,7 +1,7 @@
package ctbrec.recorder.download.hls;
import ctbrec.Settings;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.SplittingStrategy;
public class SizeSplittingStrategy implements SplittingStrategy {
@ -14,7 +14,7 @@ public class SizeSplittingStrategy implements SplittingStrategy {
}
@Override
public boolean splitNecessary(Download download) {
public boolean splitNecessary(RecordingProcess download) {
long sizeInByte = download.getSizeInByte();
return sizeInByte >= settings.splitRecordingsBiggerThanBytes;
}

View File

@ -1,13 +1,13 @@
package ctbrec.recorder.download.hls;
import ctbrec.Settings;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.SplittingStrategy;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import ctbrec.Settings;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.SplittingStrategy;
public class TimeSplittingStrategy implements SplittingStrategy {
private Settings settings;
@ -18,7 +18,7 @@ public class TimeSplittingStrategy implements SplittingStrategy {
}
@Override
public boolean splitNecessary(Download download) {
public boolean splitNecessary(RecordingProcess download) {
ZonedDateTime startTime = download.getStartTime().atZone(ZoneId.systemDefault());
Duration recordingDuration = Duration.between(startTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();

View File

@ -7,7 +7,7 @@ import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import okhttp3.Request;
import okhttp3.Response;
@ -29,105 +29,105 @@ import static ctbrec.io.HttpConstants.*;
public class AmateurTvDownload extends AbstractDownload {
private static final Logger LOG = LoggerFactory.getLogger(AmateurTvDownload.class);
private static final int MAX_SECONDS_WITHOUT_TRANSFER = 20;
private static final Logger LOG = LoggerFactory.getLogger(AmateurTvDownload.class);
private static final int MAX_SECONDS_WITHOUT_TRANSFER = 20;
private final HttpClient httpClient;
private FileOutputStream fout;
private Instant timeOfLastTransfer = Instant.MAX;
private final HttpClient httpClient;
private FileOutputStream fout;
private Instant timeOfLastTransfer = Instant.MAX;
private volatile boolean running;
private volatile boolean started;
private volatile boolean running;
private volatile boolean started;
private File targetFile;
private File targetFile;
public AmateurTvDownload(HttpClient httpClient) {
this.httpClient = httpClient;
}
public AmateurTvDownload(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
this.config = config;
this.model = model;
this.startTime = startTime;
this.downloadExecutor = executorService;
splittingStrategy = initSplittingStrategy(config.getSettings());
targetFile = config.getFileForRecording(model, "mp4", startTime);
timeOfLastTransfer = Instant.now();
}
@Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
this.config = config;
this.model = model;
this.startTime = startTime;
this.downloadExecutor = executorService;
splittingStrategy = initSplittingStrategy(config.getSettings());
targetFile = config.getFileForRecording(model, "mp4", startTime);
timeOfLastTransfer = Instant.now();
}
@Override
public void stop() {
running = false;
}
@Override
public void stop() {
running = false;
}
@Override
public void finalizeDownload() {
if (fout != null) {
try {
LOG.debug("Closing recording file {}", targetFile);
fout.close();
} catch (IOException e) {
LOG.error("Error while closing recording file {}", targetFile, e);
}
@Override
public void finalizeDownload() {
if (fout != null) {
try {
LOG.debug("Closing recording file {}", targetFile);
fout.close();
} catch (IOException e) {
LOG.error("Error while closing recording file {}", targetFile, e);
}
}
}
@Override
public boolean isRunning() {
return running;
@Override
public boolean isRunning() {
return running;
}
@Override
public void postProcess(Recording recording) {
// nothing to do
}
@Override
public File getTarget() {
return targetFile;
}
@Override
public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
@Override
public boolean isSingleFile() {
return true;
}
@Override
public long getSizeInByte() {
return getTarget().length();
}
@Override
public RecordingProcess call() throws Exception {
if (!started) {
started = true;
startDownload();
}
@Override
public void postprocess(Recording recording) {
// nothing to do
if (splittingStrategy.splitNecessary(this)) {
stop();
rescheduleTime = Instant.now();
} else {
rescheduleTime = Instant.now().plusSeconds(5);
}
@Override
public File getTarget() {
return targetFile;
if (!model.isOnline(true)) {
stop();
}
@Override
public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
@Override
public boolean isSingleFile() {
return true;
}
@Override
public long getSizeInByte() {
return getTarget().length();
}
@Override
public Download call() throws Exception {
if (!started) {
started = true;
startDownload();
}
if (splittingStrategy.splitNecessary(this)) {
stop();
rescheduleTime = Instant.now();
} else {
rescheduleTime = Instant.now().plusSeconds(5);
}
if (!model.isOnline(true)) {
stop();
}
if (Duration.between(timeOfLastTransfer, Instant.now()).getSeconds() > MAX_SECONDS_WITHOUT_TRANSFER) {
LOG.info("No video data received for {} seconds. Stopping recording for model {}", MAX_SECONDS_WITHOUT_TRANSFER, model);
stop();
}
return this;
if (Duration.between(timeOfLastTransfer, Instant.now()).getSeconds() > MAX_SECONDS_WITHOUT_TRANSFER) {
LOG.info("No video data received for {} seconds. Stopping recording for model {}", MAX_SECONDS_WITHOUT_TRANSFER, model);
stop();
}
return this;
}
private void startDownload() {
downloadExecutor.submit(() -> {
@ -161,7 +161,7 @@ public class AmateurTvDownload extends AbstractDownload {
}
}
} catch (Exception e) {
LOG.error("Error while downloading MP4", e);
LOG.error("Error while downloading MP4", e);
}
running = false;
});

View File

@ -7,7 +7,7 @@ import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import okhttp3.FormBody;
import okhttp3.Request;
@ -183,7 +183,7 @@ public class AmateurTvModel extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
return new AmateurTvDownload(getSite().getHttpClient());
}
}

View File

@ -1,6 +1,23 @@
package ctbrec.sites.fc2live;
import static ctbrec.io.HttpConstants.*;
import com.iheartradio.m3u8.*;
import com.iheartradio.m3u8.data.MasterPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.PlaylistData;
import com.iheartradio.m3u8.data.StreamInfo;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import okhttp3.*;
import okio.ByteString;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
@ -13,35 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.Encoding;
import com.iheartradio.m3u8.Format;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import com.iheartradio.m3u8.PlaylistParser;
import com.iheartradio.m3u8.data.MasterPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.PlaylistData;
import com.iheartradio.m3u8.data.StreamInfo;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.StreamSource;
import okhttp3.FormBody;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import static ctbrec.io.HttpConstants.*;
public class Fc2Model extends AbstractModel {
private static final Logger LOG = LoggerFactory.getLogger(Fc2Model.class);
@ -57,7 +46,7 @@ public class Fc2Model extends AbstractModel {
@Override
public boolean isOnline(boolean ignoreCache) throws IOException, ExecutionException, InterruptedException {
if(ignoreCache) {
if (ignoreCache) {
loadModelInfo();
}
return online;
@ -79,8 +68,8 @@ public class Fc2Model extends AbstractModel {
.header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent)
.header(X_REQUESTED_WITH, XML_HTTP_REQUEST)
.build();
try(Response resp = getSite().getHttpClient().execute(req)) {
if(resp.isSuccessful()) {
try (Response resp = getSite().getHttpClient().execute(req)) {
if (resp.isSuccessful()) {
String msg = resp.body().string();
JSONObject json = new JSONObject(msg);
// LOG.debug(json.toString(2));
@ -88,7 +77,7 @@ public class Fc2Model extends AbstractModel {
JSONObject channelData = data.getJSONObject("channel_data");
online = channelData.optInt("is_publish") == 1;
onlineState = online ? State.ONLINE : State.OFFLINE;
if(channelData.optInt("fee") == 1) {
if (channelData.optInt("fee") == 1) {
onlineState = State.PRIVATE;
online = false;
}
@ -105,9 +94,9 @@ public class Fc2Model extends AbstractModel {
@Override
public State getOnlineState(boolean failFast) throws IOException, ExecutionException {
if(failFast) {
if (failFast) {
return onlineState;
} else if(Objects.equals(onlineState, State.UNKNOWN)){
} else if (Objects.equals(onlineState, State.UNKNOWN)) {
loadModelInfo();
}
return onlineState;
@ -139,8 +128,8 @@ public class Fc2Model extends AbstractModel {
.header(ORIGIN, Fc2Live.BASE_URL)
.header(REFERER, getUrl())
.build();
try(Response response = site.getHttpClient().execute(req)) {
if(response.isSuccessful()) {
try (Response response = site.getHttpClient().execute(req)) {
if (response.isSuccessful()) {
InputStream inputStream = response.body().byteStream();
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8);
Playlist playlist = parser.parse();
@ -189,11 +178,11 @@ public class Fc2Model extends AbstractModel {
.header(X_REQUESTED_WITH, XML_HTTP_REQUEST)
.build();
LOG.debug("Fetching page {}", url);
try(Response resp = getSite().getHttpClient().execute(req)) {
if(resp.isSuccessful()) {
try (Response resp = getSite().getHttpClient().execute(req)) {
if (resp.isSuccessful()) {
String msg = resp.body().string();
JSONObject json = new JSONObject(msg);
if(json.has("url")) {
if (json.has("url")) {
String wssurl = json.getString("url");
String token = json.getString("control_token");
callback.accept(token, wssurl);
@ -232,7 +221,7 @@ public class Fc2Model extends AbstractModel {
}
private boolean followUnfollow(String mode) throws IOException {
if(!getSite().getHttpClient().login()) {
if (!getSite().getHttpClient().login()) {
throw new IOException("Login didn't work");
}
@ -246,8 +235,8 @@ public class Fc2Model extends AbstractModel {
.header("Content-Type", "application/x-www-form-urlencoded")
.post(body)
.build();
try(Response resp = getSite().getHttpClient().execute(req)) {
if(resp.isSuccessful()) {
try (Response resp = getSite().getHttpClient().execute(req)) {
if (resp.isSuccessful()) {
String content = resp.body().string();
JSONObject json = new JSONObject(content);
return json.optInt("status") == 1;
@ -285,7 +274,7 @@ public class Fc2Model extends AbstractModel {
messageId = 1;
int usage = websocketUsage.incrementAndGet();
LOG.debug("{} objects using the websocket for {}", usage, this);
if(ws != null) {
if (ws != null) {
return;
} else {
Object monitor = new Object();
@ -311,10 +300,10 @@ public class Fc2Model extends AbstractModel {
@Override
public void onMessage(WebSocket webSocket, String text) {
JSONObject json = new JSONObject(text);
if(json.optString("name").equals("_response_")) {
if(json.has("arguments")) {
if (json.optString("name").equals("_response_")) {
if (json.has("arguments")) {
JSONObject args = json.getJSONObject("arguments");
if(args.has("playlists_high_latency")) {
if (args.has("playlists_high_latency")) {
JSONArray playlists = args.getJSONArray("playlists_high_latency");
JSONObject playlist = playlists.getJSONObject(0);
playlistUrl = playlist.getString("url");
@ -326,7 +315,7 @@ public class Fc2Model extends AbstractModel {
LOG.trace(json.toString());
}
}
} else if(json.optString("name").equals("user_count") || json.optString("name").equals("comment")) {
} else if (json.optString("name").equals("user_count") || json.optString("name").equals("comment")) {
// ignore
} else {
LOG.trace("WS <-- {}: {}", getName(), text);
@ -334,7 +323,7 @@ public class Fc2Model extends AbstractModel {
// send heartbeat every now and again
long now = System.currentTimeMillis();
if( (now - lastHeartBeat) > TimeUnit.SECONDS.toMillis(30)) {
if ((now - lastHeartBeat) > TimeUnit.SECONDS.toMillis(30)) {
webSocket.send("{\"name\":\"heartbeat\",\"arguments\":{},\"id\":" + messageId + "}");
lastHeartBeat = now;
LOG.trace("Sending heartbeat for {} (messageId: {})", getName(), messageId);
@ -363,7 +352,7 @@ public class Fc2Model extends AbstractModel {
// wait at max 10 seconds, otherwise we can assume, that the stream is not available
monitor.wait(TimeUnit.SECONDS.toMillis(20));
}
if(playlistUrl == null) {
if (playlistUrl == null) {
throw new IOException("No playlist response for 20 seconds");
}
}
@ -372,7 +361,7 @@ public class Fc2Model extends AbstractModel {
public void closeWebsocket() {
int websocketUsers = websocketUsage.decrementAndGet();
LOG.debug("{} objects using the websocket for {}", websocketUsers, this);
if(websocketUsers == 0) {
if (websocketUsers == 0) {
LOG.debug("Closing the websocket for {}", this);
ws.close(1000, "");
ws = null;
@ -380,7 +369,7 @@ public class Fc2Model extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
if (Config.getInstance().getSettings().useHlsdl) {
return new Fc2HlsdlDownload();
} else {

View File

@ -6,8 +6,9 @@ import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.StringUtil;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import okhttp3.Request;
import okhttp3.Response;
@ -16,7 +17,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import static ctbrec.io.HttpConstants.*;
@ -41,7 +45,7 @@ public class LiveJasminModel extends AbstractModel {
}
protected void loadModelInfo() throws IOException {
String url = "https://m." + LiveJasmin.baseDomain + "/en/chat-html5/" + getName();
String url = LiveJasmin.baseUrl + "/en/flash/get-performer-details/" + getName();
Request req = new Request.Builder().url(url)
.header(USER_AGENT,
"Mozilla/5.0 (iPhone; CPU OS 10_14 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1.1 Mobile/14E304 Safari/605.1.15")
@ -54,39 +58,21 @@ public class LiveJasminModel extends AbstractModel {
if (response.isSuccessful()) {
String body = response.body().string();
JSONObject json = new JSONObject(body);
//LOG.debug(json.toString(2));
//Files.writeString(Path.of("/tmp/model.json"), json.toString(2));
if (json.optBoolean("success")) {
JSONObject data = json.getJSONObject("data");
JSONObject config = data.getJSONObject("config");
JSONObject chatRoom = config.getJSONObject("chatRoom");
JSONObject armageddonConfig = config.getJSONObject("armageddonConfig");
setId(chatRoom.getString("p_id"));
setName(chatRoom.getString("performer_id"));
setDisplayName(chatRoom.getString("display_name"));
if (chatRoom.has("profile_picture_url")) {
setPreview(chatRoom.getString("profile_picture_url"));
}
int status = chatRoom.optInt("status", -1);
onlineState = mapStatus(status);
if (chatRoom.optInt("is_on_private", 0) == 1) {
onlineState = State.PRIVATE;
}
if (chatRoom.optInt("is_video_call_enabled", 0) == 1) {
onlineState = State.PRIVATE;
}
resolution = new int[2];
resolution[0] = config.optInt("streamWidth");
resolution[1] = config.optInt("streamHeight");
modelInfo = new LiveJasminModelInfo.LiveJasminModelInfoBuilder()
.sbIp(chatRoom.getString("sb_ip"))
.sbHash(chatRoom.getString("sb_hash"))
.sessionId(armageddonConfig.getString("sessionid"))
.jsm2session(armageddonConfig.getString("jsm2session"))
.sbIp(data.optString("sb_ip", null))
.sbHash(data.optString("sb_hash", null))
.sessionId("m12345678901234567890123456789012")
.jsm2session(getSite().getHttpClient().getCookiesByName("session").get(0).value())
.performerId(getName())
.clientInstanceId(randomClientInstanceId())
.status(data.optInt("status", -1))
.build();
online = onlineState == State.ONLINE;
onlineState = mapStatus(modelInfo.getStatus());
online = onlineState == State.ONLINE
&& StringUtil.isNotBlank(modelInfo.getSbIp())
&& StringUtil.isNotBlank(modelInfo.getSbHash());
LOG.trace("{} - status:{} {} {} {} {}", getName(), online, onlineState, Arrays.toString(resolution), getUrl(), id);
} else {
throw new IOException("Response was not successful: " + body);
@ -107,15 +93,19 @@ public class LiveJasminModel extends AbstractModel {
public static State mapStatus(int status) {
switch (status) {
case 0:
case 0 -> {
return State.OFFLINE;
case 1:
}
case 1 -> {
return State.ONLINE;
case 2, 3:
}
case 2, 3 -> {
return State.PRIVATE;
default:
}
default -> {
LOG.debug("Unkown state {}", status);
return State.UNKNOWN;
}
}
}
@ -129,17 +119,16 @@ public class LiveJasminModel extends AbstractModel {
public List<StreamSource> getStreamSources() throws IOException, ExecutionException, ParseException, PlaylistException {
loadModelInfo();
String websocketUrlTemplate = "wss://dss-relay-{ipWithDashes}.dditscdn.com/?random={clientInstanceId}";
String websocketUrlTemplate = "wss://dss-relay-{ipWithDashes}.dditscdn.com/memberChat/jasmin{modelName}{sb_hash}?random={clientInstanceId}";
String websocketUrl = websocketUrlTemplate
.replace("{ipWithDashes}", modelInfo.getSbIp().replace('.', '-'))
.replace("{modelName}", getName())
.replace("{sb_hash}", modelInfo.getSbHash())
.replace("{clientInstanceId}", modelInfo.getClientInstanceId());
modelInfo.setWebsocketUrl(websocketUrl);
LiveJasminStreamRegistration liveJasminStreamRegistration = new LiveJasminStreamRegistration(site, modelInfo);
List<StreamSource> streamSources = liveJasminStreamRegistration.getStreamSources();
streamSources.stream().max(Comparator.naturalOrder()).ifPresent(ss -> {
new LiveJasminStreamStarter().start(site, modelInfo, (LiveJasminStreamSource) ss);
});
return streamSources;
}
@ -150,10 +139,6 @@ public class LiveJasminModel extends AbstractModel {
@Override
public void receiveTip(Double tokens) throws IOException {
// tips are send over the relay websocket, e.g:
// {"event":"call","funcName":"sendSurprise","data":[1,"SurpriseGirlFlower"]}
// response:
// {"event":"call","funcName":"startSurprise","userId":"xyz_hash_gibberish","data":[{"memberid":"userxyz","amount":1,"tipName":"SurpriseGirlFlower","err_desc":"OK","err_text":"OK"}]}
LiveJasminTippingWebSocket tippingSocket = new LiveJasminTippingWebSocket(site.getHttpClient());
try {
tippingSocket.sendTip(this, Config.getInstance(), tokens);
@ -252,11 +237,12 @@ public class LiveJasminModel extends AbstractModel {
}
@Override
public Download createDownload() {
if (Config.isServerMode() && !Config.getInstance().getSettings().recordSingleFile) {
return new LiveJasminHlsDownload(getSite().getHttpClient());
} else {
return new LiveJasminMergedHlsDownload(getSite().getHttpClient());
}
public RecordingProcess createDownload() {
return new LiveJasminWebrtcDownload(getSite().getHttpClient());
// if (Config.isServerMode() && !Config.getInstance().getSettings().recordSingleFile) {
// return new LiveJasminHlsDownload(getSite().getHttpClient());
// } else {
// return new LiveJasminMergedHlsDownload(getSite().getHttpClient());
// }
}
}

View File

@ -13,4 +13,5 @@ public class LiveJasminModelInfo {
private String jsm2session;
private String performerId;
private String clientInstanceId;
private int status;
}

View File

@ -3,6 +3,7 @@ package ctbrec.sites.jasmin;
import ctbrec.Config;
import ctbrec.recorder.download.StreamSource;
import ctbrec.sites.Site;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
@ -12,12 +13,9 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@ -26,9 +24,9 @@ import java.util.concurrent.TimeoutException;
import static ctbrec.io.HttpConstants.USER_AGENT;
import static java.nio.charset.StandardCharsets.UTF_8;
@Slf4j
public class LiveJasminStreamRegistration {
private static final Logger LOG = LoggerFactory.getLogger(LiveJasminStreamRegistration.class);
private static final String KEY_EVENT = "event";
private static final String KEY_FUNC_NAME = "funcName";
@ -36,53 +34,55 @@ public class LiveJasminStreamRegistration {
private final LiveJasminModelInfo modelInfo;
private final CyclicBarrier barrier = new CyclicBarrier(2);
private int streamCount = 0;
public LiveJasminStreamRegistration(Site site, LiveJasminModelInfo modelInfo) {
this.site = site;
this.modelInfo = modelInfo;
}
List<StreamSource> getStreamSources() {
var streamSources = new LinkedList<StreamSource>();
var streamSources = new LinkedList<LiveJasminStreamSource>();
try {
Request webSocketRequest = new Request.Builder()
.url(modelInfo.getWebsocketUrl())
.addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile)
.build();
LOG.debug("Websocket: {}", modelInfo.getWebsocketUrl());
log.debug("Websocket: {}", modelInfo.getWebsocketUrl());
site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
LOG.debug("onOpen");
log.debug("onOpen");
JSONObject register = new JSONObject()
.put(KEY_EVENT, "register")
.put("applicationId", "memberChat/jasmin" + modelInfo.getPerformerId() + modelInfo.getSbHash())
.put("connectionData", new JSONObject()
.put("jasmin2App", false)
.put("isMobileClient", true)
.put("platform", "mobile")
.put("chatID", "freechat")
.put("sessionID", modelInfo.getSessionId())
.put("jasmin2App", true)
.put("isMobileClient", false)
.put("platform", "desktop")
.put("chatID", "freechat")
.put("jsm2SessionId", modelInfo.getJsm2session())
.put("userType", "user")
.put("performerId", modelInfo.getPerformerId())
.put("clientRevision", "")
.put("playerVer", "nanoPlayerVersion: 4.12.1 appCodeName: Mozilla appName: Netscape appVersion: 5.0 (iPad; CPU OS 10_14 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1.1 Mobile/15E148 Safari/605.1.15 platform: iPad")
.put("livejasminTvmember", false)
.put("newApplet", true)
.put("livefeedtype", JSONObject.NULL)
.put("gravityCookieId", "")
.put("passparam", "")
.put("clientInstanceId", modelInfo.getClientInstanceId())
.put("armaVersion", "39.158.0")
.put("isPassive", false)
.put("brandID", "jasmin")
.put("cobrandId", "")
.put("cobrandId", "livejasmin")
.put("subbrand", "livejasmin")
.put("siteName", "LiveJasmin")
.put("siteUrl", "https://m." + LiveJasmin.baseDomain)
.put("chatHistoryRequired", false)
.put("siteUrl", "https://www.livejasmin.com")
.put("clientInstanceId", modelInfo.getClientInstanceId())
.put("armaVersion", "38.10.3-LIVEJASMIN-39585-1")
.put("isPassive", false)
.put("peekPatternJsm2", true)
.put("chatHistoryRequired", true)
);
log.trace("Stream registration\n{}", register.toString(2));
webSocket.send(register.toString());
webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString());
webSocket.send(new JSONObject()
@ -104,12 +104,11 @@ public class LiveJasminStreamRegistration {
.put(KEY_EVENT, "connectSharedObject")
.put("name", "data/chat_so")
.toString());
//webSocket.close(1000, "Good bye");
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
LOG.error("onFailure", t);
log.error("onFailure", t);
awaitBarrier();
webSocket.close(1000, "");
}
@ -127,7 +126,7 @@ public class LiveJasminStreamRegistration {
webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString());
}).start();
} else if (message.optString(KEY_EVENT).equals("updateSharedObject") && message.optString("name").equals("data/chat_so")) {
LOG.trace(message.toString(2));
log.trace(message.toString(2));
JSONArray list = message.getJSONArray("list");
for (int i = 0; i < list.length(); i++) {
JSONObject attribute = list.getJSONObject(i);
@ -140,48 +139,135 @@ public class LiveJasminStreamRegistration {
JSONObject stream = streams.getJSONObject(j);
addStreamSource(streamSources, freePattern, stream);
}
webSocket.close(1000, "");
Collections.sort(streamSources);
Collections.reverse(streamSources);
for (LiveJasminStreamSource src : streamSources) {
JSONObject getVideoData = new JSONObject()
.put(KEY_EVENT, "call")
.put(KEY_FUNC_NAME, "getVideoData")
.put("data", new JSONArray()
.put(new JSONObject()
.put("protocols", new JSONArray()
.put("h5live")
)
.put("streamId", src.streamId)
.put("correlationId", UUID.randomUUID().toString().replace("-", "").substring(0, 16))
)
);
streamCount++;
webSocket.send(getVideoData.toString());
}
}
}
} else if (message.optString(KEY_FUNC_NAME).equals("setVideoData")) {
JSONObject data = message.getJSONArray("data").getJSONArray(0).getJSONObject(0);
String streamId = data.getString("streamId");
String wssUrl = data.getJSONObject("protocol").getJSONObject("h5live").getString("wssUrl");
streamSources.stream().filter(src -> Objects.equals(src.streamId, streamId)).findAny().ifPresent(src -> src.mediaPlaylistUrl = wssUrl);
if (--streamCount == 0) {
awaitBarrier();
}
} else if (!message.optString(KEY_FUNC_NAME).equals("chatHistory")) {
LOG.trace("onMessageT {}", new JSONObject(text).toString(2));
log.trace("onMessageT {}", new JSONObject(text).toString(2));
}
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
LOG.trace("onMessageB");
log.trace("onMessageB");
super.onMessage(webSocket, bytes);
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
LOG.debug("onClosed {} {}", code, reason);
log.debug("onClosed {} {}", code, reason);
super.onClosed(webSocket, code, reason);
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
LOG.trace("onClosing {} {}", code, reason);
log.trace("onClosing {} {}", code, reason);
awaitBarrier();
}
});
LOG.debug("Waiting for websocket to return");
log.debug("Waiting for websocket to return");
awaitBarrier();
LOG.debug("Websocket is done. Stream sources {}", streamSources);
log.debug("Websocket is done. Stream sources {}", streamSources);
} catch (Exception e) {
LOG.error("Couldn't determine stream sources", e);
log.error("Couldn't determine stream sources", e);
}
return streamSources;
return streamSources.stream().map(StreamSource.class::cast).toList();
}
private void addStreamSource(LinkedList<StreamSource> streamSources, String pattern, JSONObject stream) {
public void keepStreamAlive() {
try {
Request webSocketRequest = new Request.Builder()
.url(modelInfo.getWebsocketUrl())
.addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile)
.build();
log.debug("Websocket: {}", modelInfo.getWebsocketUrl());
site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
log.debug("onOpen");
webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString());
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
log.error("onFailure", t);
webSocket.close(1000, "");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
JSONObject message = new JSONObject(text);
if (message.opt(KEY_EVENT).equals("pong")) {
new Thread(() -> {
try {
Thread.sleep(message.optInt("nextPing"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString());
}).start();
}
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
log.debug("onMessageB");
super.onMessage(webSocket, bytes);
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
log.debug("onClosed {} {}", code, reason);
super.onClosed(webSocket, code, reason);
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
log.debug("onClosing {} {}", code, reason);
awaitBarrier();
}
});
log.debug("Waiting for websocket to return");
awaitBarrier();
} catch (Exception e) {
log.error("Couldn't determine stream sources", e);
}
}
private void addStreamSource(LinkedList<LiveJasminStreamSource> streamSources, String pattern, JSONObject stream) {
int w = stream.getInt("width");
int h = stream.getInt("height");
int bitrate = stream.getInt("bitrate") * 1024;
String name = stream.getString("name");
String streamName = pattern.replace("{$streamname}", name);
String streamId = stream.getString("streamId");
String rtmpUrl = "rtmp://{ip}/memberChat/jasmin{modelName}{sb_hash}?sessionId-{sessionId}|clientInstanceId-{clientInstanceId}"
.replace("{ip}", modelInfo.getSbIp())
@ -202,6 +288,7 @@ public class LiveJasminStreamRegistration {
streamSource.bandwidth = bitrate;
streamSource.rtmpUrl = rtmpUrl;
streamSource.streamName = streamName;
streamSource.streamId = streamId;
streamSources.add(streamSource);
}
@ -210,9 +297,9 @@ public class LiveJasminStreamRegistration {
barrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error(e.getLocalizedMessage(), e);
log.error(e.getLocalizedMessage(), e);
} catch (TimeoutException | BrokenBarrierException e) {
LOG.error(e.getLocalizedMessage(), e);
log.error(e.getLocalizedMessage(), e);
}
}
}

View File

@ -5,4 +5,5 @@ import ctbrec.recorder.download.StreamSource;
public class LiveJasminStreamSource extends StreamSource {
public String rtmpUrl;
public String streamName;
public String streamId;
}

View File

@ -0,0 +1,221 @@
package ctbrec.sites.jasmin;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.sites.showup.Showup;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
import static ctbrec.io.HttpConstants.*;
public class LiveJasminWebrtcDownload extends AbstractDownload {
private static final Logger LOG = LoggerFactory.getLogger(LiveJasminWebrtcDownload.class);
private static final int MAX_SECONDS_WITHOUT_TRANSFER = 20;
private final HttpClient httpClient;
private WebSocket ws;
private FileOutputStream fout;
private Instant timeOfLastTransfer = Instant.MAX;
private volatile boolean running;
private volatile boolean started;
private File targetFile;
public LiveJasminWebrtcDownload(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
this.config = config;
this.model = model;
this.startTime = startTime;
this.downloadExecutor = executorService;
splittingStrategy = initSplittingStrategy(config.getSettings());
targetFile = config.getFileForRecording(model, "mp4", startTime);
timeOfLastTransfer = Instant.now();
}
@Override
public void stop() {
running = false;
if (ws != null) {
ws.close(1000, "");
ws = null;
}
}
@Override
public void finalizeDownload() {
if (fout != null) {
try {
LOG.debug("Closing recording file {}", targetFile);
fout.close();
} catch (IOException e) {
LOG.error("Error while closing recording file {}", targetFile, e);
}
}
}
@Override
public boolean isRunning() {
return running;
}
@Override
public void postProcess(Recording recording) {
// nothing to do
}
@Override
public File getTarget() {
return targetFile;
}
@Override
public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
@Override
public boolean isSingleFile() {
return true;
}
@Override
public long getSizeInByte() {
return getTarget().length();
}
@Override
public RecordingProcess call() throws Exception {
if (!started) {
started = true;
startDownload();
}
if (splittingStrategy.splitNecessary(this)) {
stop();
rescheduleTime = Instant.now();
} else {
rescheduleTime = Instant.now().plusSeconds(5);
}
if (!model.isOnline(true)) {
stop();
}
if (Duration.between(timeOfLastTransfer, Instant.now()).getSeconds() > MAX_SECONDS_WITHOUT_TRANSFER) {
LOG.info("No video data received for {} seconds. Stopping recording for model {}", MAX_SECONDS_WITHOUT_TRANSFER, model);
stop();
}
return this;
}
private void startDownload() throws IOException, PlaylistException, ParseException, ExecutionException {
LiveJasminModel liveJasminModel = (LiveJasminModel) model;
Request request = new Request.Builder()
.url(liveJasminModel.getStreamSources().get(0).getMediaPlaylistUrl())
.header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent)
.header(ACCEPT, "*/*")
.header(ACCEPT_LANGUAGE, "pl")
.header(ORIGIN, Showup.BASE_URL)
.build();
running = true;
LOG.debug("Opening webrtc connection {}", request.url());
ws = httpClient.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
LOG.trace("onOpen {} {}", webSocket, response);
response.close();
try {
LOG.trace("Recording video stream to {}", targetFile);
Files.createDirectories(targetFile.getParentFile().toPath());
fout = new FileOutputStream(targetFile);
} catch (Exception e) {
LOG.error("Couldn't open file {} to save the video stream", targetFile, e);
stop();
}
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
LOG.trace("received video data with length {}", bytes.size());
timeOfLastTransfer = Instant.now();
try {
byte[] videoData = bytes.toByteArray();
fout.write(videoData);
BandwidthMeter.add(videoData.length);
} catch (IOException e) {
if (running) {
LOG.error("Couldn't write video stream to file", e);
stop();
}
}
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
LOG.trace("onMessageT {} {}", webSocket, text);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response);
stop();
if (t instanceof EOFException) {
LOG.info("End of stream detected for model {}", model);
} else {
LOG.error("Websocket failure for model {} {}", model, response, t);
}
if (response != null) {
response.close();
}
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
LOG.trace("Websocket closing for model {} {} {}", model, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
LOG.debug("Websocket closed for model {} {} {}", model, code, reason);
stop();
}
});
}
}

View File

@ -8,7 +8,7 @@ import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.*;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.sites.ModelOfflineException;
import lombok.extern.slf4j.Slf4j;
@ -223,7 +223,7 @@ public class MVLiveModel extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
if (Config.isServerMode() && !Config.getInstance().getSettings().recordSingleFile) {
return new MVLiveHlsDownload(getHttpClient());
} else {

View File

@ -8,9 +8,9 @@ import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HtmlParser;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.HttpHeaderFactoryImpl;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import okhttp3.FormBody;
import okhttp3.Request;
@ -335,7 +335,7 @@ public class MyFreeCamsModel extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
if (streamUrl == null) {
updateStreamUrl();
}

View File

@ -6,7 +6,7 @@ import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HtmlParser;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import okhttp3.HttpUrl;
import okhttp3.Request;
@ -192,7 +192,7 @@ public class SecretFriendsModel extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
return new SecretFriendsWebrtcDownload(getSite().getHttpClient());
}
}

View File

@ -6,7 +6,7 @@ import ctbrec.Recording;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
@ -86,7 +86,7 @@ public class SecretFriendsWebrtcDownload extends AbstractDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
// nothing to do
}
@ -114,7 +114,7 @@ public class SecretFriendsWebrtcDownload extends AbstractDownload {
}
@Override
public Download call() throws Exception {
public RecordingProcess call() throws Exception {
if (!started) {
started = true;
startDownload();

View File

@ -1,31 +1,25 @@
package ctbrec.sites.showup;
import static ctbrec.Model.State.*;
import static ctbrec.io.HttpConstants.*;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import javax.xml.bind.JAXBException;
import com.google.common.base.Objects;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.HttpHeaderFactoryImpl;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ExecutionException;
import static ctbrec.Model.State.*;
import static ctbrec.io.HttpConstants.*;
public class ShowupModel extends AbstractModel {
private String uid;
@ -63,7 +57,7 @@ public class ShowupModel extends AbstractModel {
src.width = 480;
src.height = 360;
if(streamId == null || streamTranscoderAddr == null) {
if (streamId == null || streamTranscoderAddr == null) {
List<Model> modelList = getShowupSite().getModelList();
for (Model model : modelList) {
ShowupModel m = (ShowupModel) model;
@ -138,7 +132,7 @@ public class ShowupModel extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
return new ShowupWebrtcDownload(getSite().getHttpClient());
}
@ -160,7 +154,7 @@ public class ShowupModel extends AbstractModel {
}
public String getWebRtcUrl() throws IOException {
if(streamId == null || streamTranscoderAddr == null) {
if (streamId == null || streamTranscoderAddr == null) {
List<Model> modelList = getShowupSite().getModelList();
for (Model model : modelList) {
ShowupModel m = (ShowupModel) model;

View File

@ -6,7 +6,7 @@ import ctbrec.Recording;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
@ -85,7 +85,7 @@ public class ShowupWebrtcDownload extends AbstractDownload {
}
@Override
public void postprocess(Recording recording) {
public void postProcess(Recording recording) {
// nothing to do
}
@ -113,7 +113,7 @@ public class ShowupWebrtcDownload extends AbstractDownload {
}
@Override
public Download call() throws Exception {
public RecordingProcess call() throws Exception {
if (!started) {
started = true;
startDownload();
@ -192,7 +192,7 @@ public class ShowupWebrtcDownload extends AbstractDownload {
if (t instanceof EOFException) {
LOG.info("End of stream detected for model {}", model);
} else {
LOG.error("Websocket failure for model {} {} {}", model, response, t);
LOG.error("Websocket failure for model {} {}", model, response, t);
}
if (response != null) {
response.close();

View File

@ -7,7 +7,7 @@ import com.iheartradio.m3u8.data.PlaylistData;
import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.recorder.download.hls.HlsdlDownload;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
@ -264,7 +264,7 @@ public class StripchatModel extends AbstractModel {
}
@Override
public Download createDownload() {
public RecordingProcess createDownload() {
if (Config.getInstance().getSettings().useHlsdl) {
return new HlsdlDownload();
} else {

View File

@ -1,14 +1,12 @@
package ctbrec.recorder;
import ctbrec.*;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.RecordingProcess;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
@ -22,7 +20,7 @@ import static org.mockito.Mockito.*;
class RecordingPreconditionsTest {
private Config config;
private Settings settings = new Settings();
private final Settings settings = new Settings();
@BeforeEach
void setup() {
@ -34,8 +32,8 @@ class RecordingPreconditionsTest {
}
@Test
void testRecorderNotInRecordingMode() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
void testRecorderNotInRecordingMode() {
var recorder = mock(SimplifiedLocalRecorder.class);
Model model = mock(Model.class);
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
@ -44,11 +42,11 @@ class RecordingPreconditionsTest {
}
@Test
void testModelIsSuspended() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
void testModelIsSuspended() {
var recorder = mock(SimplifiedLocalRecorder.class);
Model model = mock(Model.class);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(model.isSuspended()).thenReturn(true);
when(model.toString()).thenReturn("Mockita Boobilicious");
@ -58,11 +56,11 @@ class RecordingPreconditionsTest {
}
@Test
void testModelMarkedForLaterRecording() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
void testModelMarkedForLaterRecording() {
var recorder = mock(SimplifiedLocalRecorder.class);
Model model = mock(Model.class);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(model.isMarkedForLaterRecording()).thenReturn(true);
when(model.toString()).thenReturn("Mockita Boobilicious");
@ -72,11 +70,11 @@ class RecordingPreconditionsTest {
}
@Test
void testRecordUntil() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
void testRecordUntil() {
var recorder = mock(SimplifiedLocalRecorder.class);
Model model = mock(Model.class);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(model.getRecordUntil()).thenReturn(Instant.now().minus(1, HOURS));
when(model.toString()).thenReturn("Mockita Boobilicious");
@ -86,11 +84,11 @@ class RecordingPreconditionsTest {
}
@Test
void testRecordingAlreadyRunning() throws InvalidKeyException, NoSuchAlgorithmException, IOException {
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
void testRecordingAlreadyRunning() {
var recorder = mock(SimplifiedLocalRecorder.class);
Model model = mock(Model.class);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
Map<Model, Recording> recordingProcesses = new HashMap<>();
recordingProcesses.put(model, new Recording());
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
@ -103,11 +101,11 @@ class RecordingPreconditionsTest {
}
@Test
void testModelShouldBeRecorded() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
void testModelShouldBeRecorded() throws IOException, ExecutionException, InterruptedException {
var recorder = mock(SimplifiedLocalRecorder.class);
Model model = mock(Model.class);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
List<Model> modelsToRecord = new LinkedList<>();
when(recorder.getModels()).thenReturn(modelsToRecord);
when(model.getRecordUntil()).thenReturn(Instant.MAX);
@ -120,25 +118,25 @@ class RecordingPreconditionsTest {
modelsToRecord.add(model);
reset(recorder);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
assertDoesNotThrow(() -> preconditions.check(model));
}
@Test
void testEnoughSpace() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testEnoughSpace() throws IOException, ExecutionException, InterruptedException {
Model model = mock(Model.class);
when(model.getRecordUntil()).thenReturn(Instant.MAX);
when(model.toString()).thenReturn("Mockita Boobilicious");
when(model.isOnline(true)).thenReturn(true);
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
modelsToRecord.add(model);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(false);
when(recorder.notEnoughSpaceForRecording()).thenReturn(true);
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
PreconditionNotMetException ex = assertThrows(PreconditionNotMetException.class, () -> preconditions.check(model));
@ -146,7 +144,7 @@ class RecordingPreconditionsTest {
}
@Test
void testNoOtherFromGroupIsRecording() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testNoOtherFromGroupIsRecording() throws IOException, ExecutionException, InterruptedException {
Model mockita = mock(Model.class);
when(mockita.getRecordUntil()).thenReturn(Instant.MAX);
when(mockita.toString()).thenReturn("Mockita Boobilicious");
@ -163,14 +161,14 @@ class RecordingPreconditionsTest {
group.add(theOtherOne);
group.add(mockita);
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
settings.models = modelsToRecord;
modelsToRecord.add(theOtherOne);
modelsToRecord.add(mockita);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
when(recorder.getModelGroup(theOtherOne)).thenReturn(Optional.of(group));
when(recorder.getModelGroup(mockita)).thenReturn(Optional.of(group));
Map<Model, Recording> recordingProcesses = new HashMap<>();
@ -184,20 +182,20 @@ class RecordingPreconditionsTest {
}
@Test
void testModelIsOffline() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testModelIsOffline() throws IOException, ExecutionException, InterruptedException {
Model mockita = mock(Model.class);
when(mockita.getRecordUntil()).thenReturn(Instant.MAX);
when(mockita.getName()).thenReturn("Mockita Boobilicious");
when(mockita.toString()).thenReturn("Mockita Boobilicious");
when(mockita.isOnline(true)).thenReturn(false);
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
settings.models = modelsToRecord;
modelsToRecord.add(mockita);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
PreconditionNotMetException ex = assertThrows(PreconditionNotMetException.class, () -> preconditions.check(mockita));
@ -205,19 +203,19 @@ class RecordingPreconditionsTest {
}
@Test
void testModelIsOnlineWithExpection() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testModelIsOnlineWithExpection() throws IOException, ExecutionException, InterruptedException {
Model mockita = mock(Model.class);
when(mockita.isOnline(true)).thenThrow(new IOException("Service unavailable"));
when(mockita.getRecordUntil()).thenReturn(Instant.MAX);
when(mockita.getName()).thenReturn("Mockita Boobilicious");
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
settings.models = modelsToRecord;
modelsToRecord.add(mockita);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
RecordingPreconditions preconditions = new RecordingPreconditions(recorder, config);
PreconditionNotMetException ex = assertThrows(PreconditionNotMetException.class, () -> preconditions.check(mockita));
@ -232,7 +230,7 @@ class RecordingPreconditionsTest {
}
@Test
void testDownloadSlotsExhausted() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testDownloadSlotsExhausted() throws IOException, ExecutionException, InterruptedException {
settings.concurrentRecordings = 1;
Model mockita = mock(Model.class);
@ -246,13 +244,13 @@ class RecordingPreconditionsTest {
when(theOtherOne.isOnline(true)).thenReturn(true);
when(theOtherOne.getUrl()).thenReturn("http://localhost/theOtherOne");
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
settings.models = modelsToRecord;
modelsToRecord.add(mockita);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
Map<Model, Recording> recordingProcesses = new HashMap<>();
recordingProcesses.put(theOtherOne, new Recording());
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
@ -274,7 +272,7 @@ class RecordingPreconditionsTest {
}
@Test
void testDownloadSlotFreedUp() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testDownloadSlotFreedUp() throws IOException, ExecutionException, InterruptedException {
settings.concurrentRecordings = 1;
Model mockita = mock(Model.class);
@ -283,13 +281,13 @@ class RecordingPreconditionsTest {
when(mockita.getName()).thenReturn("Mockita Boobilicious");
when(mockita.getPriority()).thenReturn(100);
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
settings.models = modelsToRecord;
modelsToRecord.add(mockita);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
Map<Model, Recording> recordingProcesses = new HashMap<>();
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
@ -315,21 +313,21 @@ class RecordingPreconditionsTest {
}
@Test
void testNotInTimeoutPeriod() throws InvalidKeyException, NoSuchAlgorithmException, IOException, ExecutionException, InterruptedException {
void testNotInTimeoutPeriod() throws IOException, ExecutionException, InterruptedException {
Model mockita = mock(Model.class);
when(mockita.isOnline(true)).thenReturn(true);
when(mockita.getRecordUntil()).thenReturn(Instant.MAX);
when(mockita.getName()).thenReturn("Mockita Boobilicious");
when(mockita.getPriority()).thenReturn(100);
NextGenLocalRecorder recorder = mock(NextGenLocalRecorder.class);
var recorder = mock(SimplifiedLocalRecorder.class);
List<Model> modelsToRecord = new LinkedList<>();
settings.models = modelsToRecord;
settings.timeoutRecordingStartingAt = LocalTime.now().minusHours(1).truncatedTo(ChronoUnit.MINUTES);
settings.timeoutRecordingEndingAt = LocalTime.now().plusHours(1).truncatedTo(ChronoUnit.MINUTES);
modelsToRecord.add(mockita);
when(recorder.isRecording()).thenReturn(true);
when(recorder.isRunning()).thenReturn(true);
when(recorder.getModels()).thenReturn(modelsToRecord);
when(recorder.enoughSpaceForRecording()).thenReturn(true);
when(recorder.notEnoughSpaceForRecording()).thenReturn(false);
Map<Model, Recording> recordingProcesses = new HashMap<>();
when(recorder.getRecordingProcesses()).thenReturn(recordingProcesses);
@ -369,10 +367,10 @@ class RecordingPreconditionsTest {
}
private Recording mockRecordingProcess(Model model) {
Download download = mock(Download.class);
RecordingProcess download = mock(RecordingProcess.class);
when(download.getModel()).thenReturn(model);
Recording runningRecording = mock(Recording.class);
when(runningRecording.getDownload()).thenReturn(download);
when(runningRecording.getRecordingProcess()).thenReturn(download);
return runningRecording;
}
}

View File

@ -1,22 +1,21 @@
package ctbrec.recorder.postprocessing;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import ctbrec.Config;
import ctbrec.Recording;
import ctbrec.recorder.RecordingManager;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.VideoLengthDetector;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import ctbrec.Config;
import ctbrec.Recording;
import ctbrec.recorder.RecordingManager;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.VideoLengthDetector;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
class DeleteTooShortTest extends AbstractPpTest {
@ -91,14 +90,14 @@ class DeleteTooShortTest extends AbstractPpTest {
}
private Recording createRec(File original) {
Download download = mock(Download.class);
RecordingProcess download = mock(RecordingProcess.class);
Recording rec = new Recording();
rec.setModel(mockModel());
rec.setAbsoluteFile(original);
rec.setPostProcessedFile(original);
rec.setStartDate(now);
rec.setSingleFile(true);
rec.setDownload(download);
rec.setRecordingProcess(download);
return rec;
}
}

View File

@ -8,9 +8,9 @@ import ctbrec.event.EventBusHolder;
import ctbrec.event.EventHandler;
import ctbrec.event.EventHandlerConfiguration;
import ctbrec.image.LocalPortraitStore;
import ctbrec.recorder.NextGenLocalRecorder;
import ctbrec.recorder.OnlineMonitor;
import ctbrec.recorder.Recorder;
import ctbrec.recorder.SimplifiedLocalRecorder;
import ctbrec.servlet.StaticFileServlet;
import ctbrec.sites.Site;
import ctbrec.sites.amateurtv.AmateurTv;
@ -89,7 +89,7 @@ public class HttpServer {
config = Config.getInstance();
LOG.info("HMAC authentication is {}", config.getSettings().key != null ? "enabled" : "disabled");
recorder = new NextGenLocalRecorder(config, sites);
recorder = new SimplifiedLocalRecorder(config, sites);
initSites();
onlineMonitor = new OnlineMonitor(recorder, config);