forked from j62/ctbrec
1
0
Fork 0

Rewrite recording code for local recording

Remote recording is TBD
This commit is contained in:
0xboobface 2019-05-31 20:00:07 +02:00
parent f7ac97c683
commit 0f3d0b6337
34 changed files with 1568 additions and 1110 deletions

View File

@ -1,9 +1,10 @@
1.21.2
========================
* Fix: Downloads not working in client/server setup
* Fix: Playlist generation and post-processing was not executed on server
shutdown
* Fix: Post-Processing for split recordings
* Added split recordings for the server
* Fix: Downloads not working in client/server setup (regression in last version)
* Fix: post-processing for split recordings
* Fix: All recordings are finished properly on shutdown (with playlist
generation on the server and post-processing)
1.21.1
========================

View File

@ -32,7 +32,7 @@ import ctbrec.event.EventBusHolder;
import ctbrec.event.EventHandler;
import ctbrec.event.EventHandlerConfiguration;
import ctbrec.io.HttpClient;
import ctbrec.recorder.LocalRecorder;
import ctbrec.recorder.NextGenLocalRecorder;
import ctbrec.recorder.OnlineMonitor;
import ctbrec.recorder.Recorder;
import ctbrec.recorder.RemoteRecorder;
@ -174,6 +174,7 @@ public class CamrecApplication extends Application {
primaryStage.setMaximized(Config.getInstance().getSettings().windowMaximized);
primaryStage.maximizedProperty()
.addListener((observable, oldVal, newVal) -> Config.getInstance().getSettings().windowMaximized = newVal.booleanValue());
Player.scene = primaryStage.getScene();
primaryStage.setX(Config.getInstance().getSettings().windowX);
primaryStage.setY(Config.getInstance().getSettings().windowY);
primaryStage.xProperty().addListener((observable, oldVal, newVal) -> Config.getInstance().getSettings().windowX = newVal.intValue());
@ -318,7 +319,16 @@ public class CamrecApplication extends Application {
private void createRecorder() {
if (config.getSettings().localRecording) {
recorder = new LocalRecorder(config);
//recorder = new LocalRecorder(config);
try {
recorder = new NextGenLocalRecorder(config, sites);
} catch (IOException e) {
LOG.error("Couldn't initialize recorder", e);
Alert alert = new AutosizeAlert(Alert.AlertType.ERROR, primaryStage.getScene());
alert.setTitle("Whoopsie");
alert.setContentText("Couldn't initialize recorder: " + e.getLocalizedMessage());
alert.showAndWait();
}
} else {
recorder = new RemoteRecorder(config, httpClient, sites);
}

View File

@ -3,6 +3,7 @@ package ctbrec.ui;
import java.time.Instant;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import javafx.beans.property.LongProperty;
import javafx.beans.property.SimpleLongProperty;
@ -20,16 +21,19 @@ public class JavaFxRecording extends Recording {
public JavaFxRecording(Recording recording) {
this.delegate = recording;
setStatus(recording.getStatus());
setSizeInByte(recording.getSizeInByte());
setProgress(recording.getProgress());
}
@Override
public String getModelName() {
return delegate.getModelName();
public Model getModel() {
return delegate.getModel();
}
@Override
public void setModelName(String modelName) {
delegate.setModelName(modelName);
public void setModel(Model model) {
delegate.setModel(model);
}
@Override
@ -73,7 +77,20 @@ public class JavaFxRecording extends Recording {
case STOPPED:
statusProperty.set("stopped");
break;
case DELETED:
statusProperty.set("deleted");
break;
case DELETING:
statusProperty.set("deleting");
break;
case FAILED:
statusProperty.set("failed");
break;
case WAITING:
statusProperty.set("waiting");
break;
case UNKNOWN:
default:
statusProperty.set("unknown");
break;
}
@ -142,16 +159,6 @@ public class JavaFxRecording extends Recording {
delegate.setPath(path);
}
@Override
public boolean hasPlaylist() {
return delegate.hasPlaylist();
}
@Override
public void setHasPlaylist(boolean hasPlaylist) {
delegate.setHasPlaylist(hasPlaylist);
}
@Override
public long getSizeInByte() {
return delegate.getSizeInByte();
@ -161,6 +168,16 @@ public class JavaFxRecording extends Recording {
return sizeProperty;
}
@Override
public void setMetaDataFile(String metaDataFile) {
delegate.setMetaDataFile(metaDataFile);
}
@Override
public String getMetaDataFile() {
return delegate.getMetaDataFile();
}
public boolean valueChanged() {
boolean changed = getSizeInByte() != lastValue;
lastValue = getSizeInByte();

View File

@ -16,12 +16,15 @@ import ctbrec.Recording;
import ctbrec.io.DevNull;
import ctbrec.io.StreamRedirectThread;
import ctbrec.recorder.download.StreamSource;
import ctbrec.ui.controls.Dialogs;
import javafx.application.Platform;
import javafx.scene.Scene;
import javafx.scene.control.Alert;
public class Player {
private static final transient Logger LOG = LoggerFactory.getLogger(Player.class);
private static PlayerThread playerThread;
public static Scene scene;
public static boolean play(String url) {
return play(url, true);
@ -165,6 +168,7 @@ public class Player {
LOG.debug("Media player finished.");
} catch (Exception e) {
LOG.error("Error in player thread", e);
Dialogs.showError(scene, "Playback failed", "Couldn't start playback", e);
}
running = false;
}

View File

@ -455,7 +455,7 @@ public class RecordedModelsTab extends Tab implements TabSelectionListener {
.peek(fxm -> {
for (Recording recording : recordings) {
if(recording.getStatus() == Recording.State.RECORDING &&
recording.getModelName().equals(fxm.getSanitizedNamed()))
recording.getModel().getName().equals(fxm.getSanitizedNamed()))
{
fxm.getRecordingProperty().set(true);
break;
@ -616,7 +616,10 @@ public class RecordedModelsTab extends Tab implements TabSelectionListener {
private void stopAction(List<JavaFxModel> selectedModels) {
List<Model> models = selectedModels.stream().map(jfxm -> jfxm.getDelegate()).collect(Collectors.toList());
new StopRecordingAction(getTabPane(), models, recorder).execute((m) -> {
observableModels.remove(m);
Platform.runLater(() -> {
table.getSelectionModel().clearSelection(table.getItems().indexOf(m));
table.getItems().remove(m);
});
});
};

View File

@ -43,6 +43,7 @@ import ctbrec.sites.Site;
import ctbrec.ui.controls.Toast;
import javafx.application.Platform;
import javafx.beans.property.SimpleObjectProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.concurrent.ScheduledService;
@ -63,7 +64,6 @@ import javafx.scene.control.TableColumn;
import javafx.scene.control.TableColumn.SortType;
import javafx.scene.control.TableView;
import javafx.scene.control.Tooltip;
import javafx.scene.control.cell.PropertyValueFactory;
import javafx.scene.input.ContextMenuEvent;
import javafx.scene.input.KeyCode;
import javafx.scene.input.KeyEvent;
@ -123,7 +123,7 @@ public class RecordingsTab extends Tab implements TabSelectionListener {
table.getSelectionModel().setSelectionMode(SelectionMode.MULTIPLE);
TableColumn<JavaFxRecording, String> name = new TableColumn<>("Model");
name.setPrefWidth(200);
name.setCellValueFactory(new PropertyValueFactory<JavaFxRecording, String>("modelName"));
name.setCellValueFactory(cdf -> new SimpleStringProperty(cdf.getValue().getModel().getName()));
TableColumn<JavaFxRecording, Instant> date = new TableColumn<>("Date");
date.setCellValueFactory((cdf) -> {
Instant instant = cdf.getValue().getStartDate();
@ -429,23 +429,9 @@ public class RecordingsTab extends Tab implements TabSelectionListener {
contextMenu.getItems().add(downloadRecording);
}
MenuItem regenPlaylist = new MenuItem("Regenerate Playlist");
regenPlaylist.setOnAction((e) -> {
try {
recorder.regeneratePlaylist(recordings.get(0));
} catch (IOException | InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e1) {
showErrorDialog("Error while regenerating playlist", "The recording could not be regenerated", e1);
LOG.error("Error while regenerating playlist", e1);
}
});
if (!Config.getInstance().getSettings().localRecording && recordings.get(0).getStatus() == State.FINISHED) {
contextMenu.getItems().add(regenPlaylist);
}
if(recordings.size() > 1) {
openInPlayer.setDisable(true);
openDir.setDisable(true);
regenPlaylist.setDisable(true);
}
return contextMenu;
@ -561,7 +547,7 @@ public class RecordingsTab extends Tab implements TabSelectionListener {
msg = "Delete " + recordings.size() + " recordings for good?";
} else {
Recording r = recordings.get(0);
msg = "Delete " + r.getModelName() + "/" + r.getStartDate() + " for good?";
msg = "Delete " + r.getModel().getName() + "/" + r.getStartDate() + " for good?";
}
AutosizeAlert confirm = new AutosizeAlert(AlertType.CONFIRMATION, msg, getTabPane().getScene(), YES, NO);
confirm.setTitle("Delete recording?");

View File

@ -276,11 +276,11 @@ public class ThumbCell extends StackPane {
}
int[] resolution = resolutionCache.getIfPresent(model);
if(resolution != null) {
if (resolution != null) {
ThumbOverviewTab.threadPool.submit(() -> {
try {
updateResolutionTag(resolution);
} catch(Exception e) {
} catch (Exception e) {
LOG.warn("Couldn't update resolution tag for model {}", model.getName(), e);
}
});

View File

@ -19,6 +19,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@ -87,7 +88,7 @@ public class ThumbOverviewTab extends Tab implements TabSelectionListener {
private static final transient Logger LOG = LoggerFactory.getLogger(ThumbOverviewTab.class);
protected static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue);
static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue, createThreadFactory());
static Set<Model> resolutionProcessing = Collections.synchronizedSet(new HashSet<>());
protected FlowPane grid = new FlowPane();
@ -852,4 +853,15 @@ public class ThumbOverviewTab extends Tab implements TabSelectionListener {
selectedThumbCells.get(0).setSelected(false);
}
}
private static int threadCounter = 0;
private static ThreadFactory createThreadFactory() {
return r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setPriority(Thread.MIN_PRIORITY);
t.setName("ResolutionDetector-" + threadCounter++);
return t;
};
}
}

View File

@ -17,7 +17,7 @@ import javafx.scene.Node;
public class ModelMassEditAction {
static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue);
static ExecutorService threadPool = new ThreadPoolExecutor(2, 10, 10, TimeUnit.MINUTES, queue);
protected List<? extends Model> models;
protected Consumer<Model> action;
@ -39,14 +39,14 @@ public class ModelMassEditAction {
}
public void execute(Consumer<Model> callback) {
Consumer<Model> cb = Objects.requireNonNull(callback);
Consumer<Model> cb = Objects.requireNonNull(callback, "Callback is null, call execute() instead");
source.setCursor(Cursor.WAIT);
threadPool.submit(() -> {
for (Model model : models) {
for (Model model : models) {
threadPool.submit(() -> {
action.accept(model);
cb.accept(model);
}
Platform.runLater(() -> source.setCursor(Cursor.DEFAULT));
});
Platform.runLater(() -> source.setCursor(Cursor.DEFAULT));
});
}
}
}

View File

@ -21,7 +21,7 @@ public class PlayAction {
public void execute() {
source.setCursor(Cursor.WAIT);
new Thread(() -> {
Thread t = new Thread(() -> {
SiteUI siteUI = SiteUiFactory.getUi(selectedModel.getSite());
boolean started = siteUI.play(selectedModel);
Platform.runLater(() -> {
@ -30,6 +30,9 @@ public class PlayAction {
}
source.setCursor(Cursor.DEFAULT);
});
}).start();
});
t.setName("Player " + selectedModel);
t.setDaemon(true);
t.start();
}
}

View File

@ -2,9 +2,6 @@ package ctbrec.ui.sites.camsoda;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.sites.ConfigUI;
import ctbrec.sites.camsoda.Camsoda;
import ctbrec.ui.TabProvider;
@ -12,8 +9,6 @@ import ctbrec.ui.sites.AbstractSiteUi;
public class CamsodaSiteUi extends AbstractSiteUi {
private static final transient Logger LOG = LoggerFactory.getLogger(CamsodaSiteUi.class);
private CamsodaTabProvider tabProvider;
private CamsodaConfigUI configUi;
private Camsoda camsoda;

View File

@ -34,6 +34,7 @@
<logger name="ctbrec.recorder.server.RecorderServlet" level="INFO"/>
<logger name="ctbrec.io.CookieJarImpl" level="INFO"/>
<logger name="ctbrec.ui.ThumbOverviewTab" level="DEBUG"/>
<logger name="ctbrec.recorder.RecordingFileMonitor" level="TRACE"/>
<logger name="org.eclipse.jetty" level="INFO" />
<logger name="streamer" level="ERROR" />

View File

@ -1,18 +1,38 @@
package ctbrec;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import com.iheartradio.m3u8.Encoding;
import com.iheartradio.m3u8.Format;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.ParsingMode;
import com.iheartradio.m3u8.PlaylistException;
import com.iheartradio.m3u8.PlaylistParser;
import com.iheartradio.m3u8.data.MediaPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.TrackData;
import ctbrec.event.EventBusHolder;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.recorder.download.Download;
public class Recording {
private String modelName;
private Model model;
private transient Download download;
private Instant startDate;
private String path;
private boolean hasPlaylist;
private State status = State.UNKNOWN;
private int progress = -1;
private long sizeInByte;
private long sizeInByte = -1;
private String metaDataFile;
public static enum State {
RECORDING("recording"),
@ -21,7 +41,11 @@ public class Recording {
POST_PROCESSING("post-processing"),
FINISHED("finished"),
DOWNLOADING("downloading"),
UNKNOWN("unknown");
DELETING("deleting"),
DELETED("deleted"),
UNKNOWN("unknown"),
WAITING("waiting"),
FAILED("failed");
private String desc;
@ -37,21 +61,14 @@ public class Recording {
public Recording() {}
public Recording(String path) throws ParseException {
this.path = path;
this.modelName = path.substring(0, path.indexOf("/"));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm");
Date date = sdf.parse(path.substring(path.indexOf('/')+1));
startDate = Instant.ofEpochMilli(date.getTime());
}
// public Recording(String path) throws ParseException {
// this.path = path;
// this.modelName = path.substring(0, path.indexOf("/"));
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm");
// Date date = sdf.parse(path.substring(path.indexOf('/')+1));
// startDate = Instant.ofEpochMilli(date.getTime());
// }
public String getModelName() {
return modelName;
}
public void setModelName(String modelName) {
this.modelName = modelName;
}
public Instant getStartDate() {
return startDate;
@ -69,6 +86,11 @@ public class Recording {
this.status = status;
}
public void setStatusWithEvent(State status, boolean fireEvent) {
setStatus(status);
fireStatusEvent(status);
}
public int getProgress() {
return this.progress;
}
@ -85,15 +107,16 @@ public class Recording {
this.path = path;
}
public boolean hasPlaylist() {
return hasPlaylist;
}
public void setHasPlaylist(boolean hasPlaylist) {
this.hasPlaylist = hasPlaylist;
public File getAbsoluteFile() {
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
File recordingsFile = new File(recordingsDir, getPath());
return recordingsFile;
}
public long getSizeInByte() {
if (sizeInByte == -1 || getStatus() == State.RECORDING) {
this.sizeInByte = getSize();
}
return sizeInByte;
}
@ -101,13 +124,86 @@ public class Recording {
this.sizeInByte = sizeInByte;
}
public void postprocess() {
getDownload().postprocess(this);
}
private void fireStatusEvent(State status) {
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(getDownload().getTarget(), status, getModel(), getStartDate());
EventBusHolder.BUS.post(evt);
}
public Model getModel() {
return model;
}
public void setModel(Model model) {
this.model = model;
}
public Download getDownload() {
return download;
}
public void setDownload(Download download) {
this.download = download;
}
public String getMetaDataFile() {
return metaDataFile;
}
public void setMetaDataFile(String metaDataFile) {
this.metaDataFile = metaDataFile;
}
public Duration getLength() throws IOException, ParseException, PlaylistException {
// check, if the recording exists
File rec = new File(Config.getInstance().getSettings().recordingsDir, getPath());
if (!rec.exists()) {
return Duration.ofSeconds(0);
}
// check, if the recording has data at all
long size = getSizeInByte();
if (size == 0) {
return Duration.ofSeconds(0);
}
// determine the length
if (getPath().endsWith(".ts")) {
return Duration.ofSeconds((long) MpegUtil.getFileDuration(rec));
} else if (rec.isDirectory()) {
File playlist = new File(rec, "playlist.m3u8");
if (playlist.exists()) {
return Duration.ofSeconds((long) getPlaylistLength(playlist));
}
}
return Duration.ofSeconds(0);
}
private double getPlaylistLength(File playlist) throws IOException, ParseException, PlaylistException {
if (playlist.exists()) {
PlaylistParser playlistParser = new PlaylistParser(new FileInputStream(playlist), Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
Playlist m3u = playlistParser.parse();
MediaPlaylist mediaPlaylist = m3u.getMediaPlaylist();
double length = 0;
for (TrackData trackData : mediaPlaylist.getTracks()) {
length += trackData.getTrackInfo().duration;
}
return length;
} else {
throw new FileNotFoundException(playlist.getAbsolutePath() + " does not exist");
}
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((modelName == null) ? 0 : modelName.hashCode());
result = prime * result + ((path == null) ? 0 : path.hashCode());
result = prime * result + ((startDate == null) ? 0 : startDate.hashCode());
result = prime * result + ((getModel() == null) ? 0 : getModel().hashCode());
result = prime * result + ((getPath() == null) ? 0 : getPath().hashCode());
result = prime * result + ((getStartDate() == null) ? 0 : getStartDate().hashCode());
return result;
}
@ -117,22 +213,46 @@ public class Recording {
return true;
if (obj == null)
return false;
Recording other = (Recording) obj;
if (modelName == null) {
if (other.getModelName() != null)
return false;
} else if (!modelName.equals(other.getModelName()))
if(!(obj instanceof Recording)) {
return false;
if (path == null) {
}
Recording other = (Recording) obj;
if (getModel() == null) {
if (other.getModel() != null)
return false;
} else if (!getModel().equals(other.getModel()))
return false;
if (getPath() == null) {
if (other.getPath() != null)
return false;
} else if (!path.equals(other.getPath()))
} else if (!getPath().equals(other.getPath()))
return false;
if (startDate == null) {
if (getStartDate() == null) {
if (other.getStartDate() != null)
return false;
} else if (!startDate.equals(other.getStartDate()))
} else if (!getStartDate().equals(other.getStartDate()))
return false;
return true;
}
@Override
public String toString() {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT);
LocalDateTime localStartDate = LocalDateTime.ofInstant(getStartDate(), ZoneId.systemDefault());
return getModel().getSanitizedNamed() + '_' + formatter.format(localStartDate);
}
private long getSize() {
File rec = new File(Config.getInstance().getSettings().recordingsDir, getPath());
if(rec.isDirectory()) {
long size = 0;
File[] files = rec.listFiles();
for (File file : files) {
size += file.length();
}
return size;
} else {
return rec.length();
}
}
}

View File

@ -1,852 +0,0 @@
package ctbrec.recorder;
import static ctbrec.Recording.State.*;
import static ctbrec.event.Event.Type.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.Subscribe;
import com.iheartradio.m3u8.Encoding;
import com.iheartradio.m3u8.Format;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.ParsingMode;
import com.iheartradio.m3u8.PlaylistError;
import com.iheartradio.m3u8.PlaylistException;
import com.iheartradio.m3u8.PlaylistParser;
import com.iheartradio.m3u8.data.MediaPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.MpegUtil;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.event.Event;
import ctbrec.event.EventBusHolder;
import ctbrec.event.ModelIsOnlineEvent;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.io.HttpClient;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import ctbrec.recorder.download.Download;
public class LocalRecorder implements Recorder {
private static final transient Logger LOG = LoggerFactory.getLogger(LocalRecorder.class);
private static final boolean IGNORE_CACHE = true;
private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Map<Model, Download> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
private Map<File, PlaylistGenerator> playlistGenerators = new HashMap<>();
private Config config;
private ProcessMonitor processMonitor;
private volatile boolean recording = true;
private List<File> deleteInProgress = Collections.synchronizedList(new ArrayList<>());
private RecorderHttpClient client = new RecorderHttpClient();
private ReentrantLock lock = new ReentrantLock();
private long lastSpaceMessage = 0;
private ExecutorService ppThreadPool = Executors.newFixedThreadPool(2);
public LocalRecorder(Config config) {
this.config = config;
config.getSettings().models.stream().forEach((m) -> {
if(m.getSite().isEnabled()) {
models.add(m);
} else {
LOG.info("{} disabled -> ignoring {}", m.getSite().getName(), m.getName());
}
});
recording = true;
processMonitor = new ProcessMonitor();
processMonitor.start();
registerEventBusListener();
if(Config.isServerMode()) {
processUnfinishedRecordings();
}
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
LOG.info("Saving recordings in {}", config.getSettings().recordingsDir);
}
private void registerEventBusListener() {
EventBusHolder.BUS.register(new Object() {
@Subscribe
public void modelEvent(Event e) {
try {
if (e.getType() == MODEL_ONLINE) {
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
Model model = evt.getModel();
if(!isSuspended(model) && !recordingProcesses.containsKey(model)) {
startRecordingProcess(model);
}
}
} catch (Exception e1) {
LOG.error("Error while handling model state changed event", e);
}
}
});
}
@Override
public void startRecording(Model model) {
if (!models.contains(model)) {
LOG.info("Model {} added", model);
lock.lock();
try {
models.add(model);
config.getSettings().models.add(model);
config.save();
} catch (IOException e) {
LOG.error("Couldn't save config", e);
} finally {
lock.unlock();
}
}
}
@Override
public void stopRecording(Model model) throws IOException {
lock.lock();
try {
if (models.contains(model)) {
models.remove(model);
config.getSettings().models.remove(model);
if (recordingProcesses.containsKey(model)) {
stopRecordingProcess(model);
}
LOG.info("Model {} removed", model);
config.save();
} else {
throw new NoSuchElementException("Model " + model.getName() + " ["+model.getUrl()+"] not found in list of recorded models");
}
} finally {
lock.unlock();
}
}
private void startRecordingProcess(Model model) throws IOException {
if(!recording) {
// recorder is not in recording mode
return;
}
if(model.isSuspended()) {
LOG.info("Recording for model {} is suspended.", model);
return;
}
if (recordingProcesses.containsKey(model)) {
LOG.error("A recording for model {} is already running", model);
return;
}
lock.lock();
try {
if (!models.contains(model)) {
LOG.info("Model {} has been removed. Restarting of recording cancelled.", model);
return;
}
} finally {
lock.unlock();
}
if(!enoughSpaceForRecording()) {
long now = System.currentTimeMillis();
if( (now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("Not enough space for recording, not starting recording for {}", model);
lastSpaceMessage = now;
}
return;
}
if(!downloadSlotAvailable()) {
LOG.info("The number of downloads is maxed out, not starting recording for {}", model);
return;
}
LOG.debug("Starting recording for model {}", model.getName());
Download download = model.createDownload();
LOG.debug("Downloading with {}", download.getClass().getSimpleName());
recordingProcesses.put(model, download);
Thread downloadThread = new Thread(() -> {
try {
download.start(model, config);
} catch (IOException e) {
LOG.error("Download for {} failed. Download alive: {}", model.getName(), download.isAlive(), e);
}
});
downloadThread.setName("Download " + model.getName());
downloadThread.start();
}
private boolean downloadSlotAvailable() {
int concurrentRecordings = Config.getInstance().getSettings().concurrentRecordings;
return concurrentRecordings == 0 || concurrentRecordings > 0 && recordingProcesses.size() < concurrentRecordings;
}
private void stopRecordingProcess(Model model) {
LOG.debug("Stopping recording for {}", model);
Download download = recordingProcesses.get(model);
recordingProcesses.remove(model);
fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime());
Runnable stopAndThePostProcess = () -> {
LOG.debug("Stopping download for {}", model);
download.stop();
LOG.debug("Running post-processing for {}", model);
createPostProcessor(download).run();
};
ppThreadPool.submit(stopAndThePostProcess);
}
@Override
public boolean isTracked(Model model) {
lock.lock();
try {
return models.contains(model);
} finally {
lock.unlock();
}
}
@Override
public boolean isSuspended(Model model) {
lock.lock();
try {
int index = models.indexOf(model);
if(index >= 0) {
Model m = models.get(index);
return m.isSuspended();
} else {
return false;
}
} finally {
lock.unlock();
}
}
@Override
public List<Model> getModels() {
lock.lock();
try {
return Collections.unmodifiableList(new ArrayList<>(models));
} finally {
lock.unlock();
}
}
@Override
public List<Model> getOnlineModels() {
return getModels()
.stream()
.filter(m -> {
try {
return m.isOnline();
} catch (IOException | ExecutionException | InterruptedException e) {
return false;
}
})
.collect(Collectors.toList());
}
@Override
public void shutdown() {
LOG.info("Shutting down");
recording = false;
LOG.debug("Stopping monitor threads");
processMonitor.running = false;
LOG.debug("Stopping all recording processes");
stopRecordingProcesses();
ppThreadPool.shutdown();
try {
ppThreadPool.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("Couldn't wait for post-processing to finish. Some recordings might be broken!");
}
client.shutdown();
}
private void stopRecordingProcesses() {
lock.lock();
try {
for (Model model : models) {
Download recordingProcess = recordingProcesses.get(model);
if (recordingProcess != null) {
stopRecordingProcess(model);
}
}
} finally {
lock.unlock();
}
}
private void tryRestartRecording(Model model) {
if (!recording) {
// recorder is not in recording state
return;
}
try {
boolean modelInRecordingList = isTracked(model);
boolean online = model.isOnline(IGNORE_CACHE);
if (modelInRecordingList && online) {
LOG.info("Restarting recording for model {}", model);
recordingProcesses.remove(model);
startRecordingProcess(model);
}
} catch (Exception e) {
LOG.error("Couldn't restart recording for model {}", model);
}
}
private class ProcessMonitor extends Thread {
private volatile boolean running = false;
public ProcessMonitor() {
setName("ProcessMonitor");
setDaemon(true);
}
@Override
public void run() {
running = true;
while (running) {
try {
if(!enoughSpaceForRecording() && !recordingProcesses.isEmpty()) {
LOG.info("No space left -> Stopping all recordings");
stopRecordingProcesses();
}
} catch (IOException e1) {
LOG.warn("Couldn't check free space left", e1);
}
List<Model> restart = new ArrayList<>();
for (Iterator<Entry<Model, Download>> iterator = recordingProcesses.entrySet().iterator(); iterator.hasNext();) {
Entry<Model, Download> entry = iterator.next();
Model m = entry.getKey();
Download download = entry.getValue();
if (!download.isAlive()) {
LOG.debug("Recording terminated for model {}", m.getName());
iterator.remove();
restart.add(m);
fireRecordingStateChanged(download.getTarget(), STOPPED, m, download.getStartTime());
Runnable pp = createPostProcessor(download);
ppThreadPool.submit(pp);
}
}
for (Model m : restart) {
tryRestartRecording(m);
}
try {
if (running)
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.error("Couldn't sleep", e);
}
}
LOG.debug(getName() + " terminated");
}
}
private void generatePlaylist(File recDir) {
if(!config.getSettings().generatePlaylist) {
return;
}
PlaylistGenerator playlistGenerator = new PlaylistGenerator();
playlistGenerators.put(recDir, playlistGenerator);
try {
File playlist = playlistGenerator.generate(recDir);
if(playlist != null) {
playlistGenerator.validate(recDir);
}
} catch (IOException | ParseException e) {
LOG.error("Couldn't generate playlist file", e);
} catch (PlaylistException e) {
if(e.getErrors().isEmpty()) {
LOG.error("Couldn't generate playlist file", e);
} else {
LOG.error("Playlist contains errors");
for (PlaylistError error : e.getErrors()) {
LOG.error("Error: {}", error.toString());
}
}
} catch (InvalidPlaylistException e) {
LOG.error("Playlist is invalid and will be deleted", e);
File playlist = new File(recDir, "playlist.m3u8");
playlist.delete();
} finally {
playlistGenerators.remove(recDir);
}
}
private void fireRecordingStateChanged(File path, Recording.State newState, Model model, Instant startTime) {
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(path, newState, model, startTime);
EventBusHolder.BUS.post(evt);
}
/**
* This is called once at start for server mode. When the server is killed, recordings are
* left without playlist. This method creates playlists for them.
*/
private void processUnfinishedRecordings() {
try {
List<Recording> recs = getRecordings();
for (Recording rec : recs) {
if (rec.getStatus() == RECORDING) {
boolean recordingProcessFound = false;
File recordingsDir = new File(config.getSettings().recordingsDir);
File recDir = new File(recordingsDir, rec.getPath());
for (Entry<Model, Download> download : recordingProcesses.entrySet()) {
if (download.getValue().getTarget().equals(recDir)) {
recordingProcessFound = true;
}
}
if (!recordingProcessFound) {
ppThreadPool.submit(() -> {
generatePlaylist(recDir);
});
}
}
}
} catch (Exception e) {
LOG.error("Unexpected error in playlist trigger", e);
}
}
@Override
public List<Recording> getRecordings() {
if(Config.isServerMode()) {
return listSegmentedRecordings();
} else {
return listMergedRecordings();
}
}
private List<Recording> listMergedRecordings() {
File recordingsDir = new File(config.getSettings().recordingsDir);
List<File> possibleRecordings = new LinkedList<>();
listRecursively(recordingsDir, possibleRecordings, (dir, name) -> name.matches(".*?_\\d{4}-\\d{2}-\\d{2}_\\d{2}-\\d{2}-\\d{2}_\\d{3}\\.(ts|mp4)"));
SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT);
List<Recording> recordings = new ArrayList<>();
for (File ts: possibleRecordings) {
try {
String filename = ts.getName();
int extLength = filename.length() - filename.lastIndexOf('.');
String dateString = filename.substring(filename.length() - extLength - Config.RECORDING_DATE_FORMAT.length(), filename.length() - extLength);
Date startDate = sdf.parse(dateString);
Recording recording = new Recording();
recording.setModelName(filename.substring(0, filename.length() - extLength - 1 - Config.RECORDING_DATE_FORMAT.length()));
recording.setStartDate(Instant.ofEpochMilli(startDate.getTime()));
String path = ts.getAbsolutePath().replace(config.getSettings().recordingsDir, "");
if(!path.startsWith("/")) {
path = '/' + path;
}
recording.setPath(path);
recording.setSizeInByte(ts.length());
recording.setStatus(getStatus(recording));
recordings.add(recording);
} catch(Exception e) {
LOG.error("Ignoring {} - {}", ts.getAbsolutePath(), e.getMessage());
}
}
return recordings;
}
private State getStatus(Recording recording) {
File absolutePath = new File(Config.getInstance().getSettings().recordingsDir, recording.getPath());
PlaylistGenerator playlistGenerator = playlistGenerators.get(absolutePath);
if (playlistGenerator != null) {
recording.setProgress(playlistGenerator.getProgress());
return GENERATING_PLAYLIST;
}
if (Config.isServerMode()) {
if (recording.hasPlaylist()) {
return FINISHED;
} else {
return RECORDING;
}
} else {
boolean dirUsedByRecordingProcess = false;
for (Download download : recordingProcesses.values()) {
if(absolutePath.equals(download.getTarget())) {
dirUsedByRecordingProcess = true;
break;
}
}
if(dirUsedByRecordingProcess) {
return RECORDING;
} else {
return FINISHED;
}
}
}
private List<Recording> listSegmentedRecordings() {
List<Recording> recordings = new ArrayList<>();
File recordingsDir = new File(config.getSettings().recordingsDir);
File[] subdirs = recordingsDir.listFiles();
if (subdirs == null) {
return Collections.emptyList();
}
for (File subdir : subdirs) {
// ignore empty directories
File[] recordingsDirs = subdir.listFiles();
if(recordingsDirs == null || recordingsDirs.length == 0) {
continue;
}
// start going over valid directories
for (File rec : recordingsDirs) {
SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT);
if (rec.isDirectory()) {
try {
// ignore directories, which are probably not created by ctbrec
if (rec.getName().length() != Config.RECORDING_DATE_FORMAT.length()) {
continue;
}
// ignore empty directories
if (rec.listFiles().length == 0) {
continue;
}
// don't list recordings, which currently get deleted
if (deleteInProgress.contains(rec)) {
continue;
}
Date startDate = sdf.parse(rec.getName());
Recording recording = new Recording();
recording.setModelName(subdir.getName());
recording.setStartDate(Instant.ofEpochMilli(startDate.getTime()));
recording.setPath(recording.getModelName() + "/" + rec.getName());
recording.setSizeInByte(getSize(rec));
File playlist = new File(rec, "playlist.m3u8");
recording.setHasPlaylist(playlist.exists());
recording.setStatus(getStatus(recording));
recordings.add(recording);
} catch (Exception e) {
LOG.debug("Ignoring {} - {}", rec.getAbsolutePath(), e.getMessage());
}
}
}
}
return recordings;
}
private void listRecursively(File dir, List<File> result, FilenameFilter filenameFilter) {
File[] files = dir.listFiles();
if(files != null) {
for (File file : files) {
if(file.isDirectory()) {
listRecursively(file, result, filenameFilter);
}
if(filenameFilter.accept(dir, file.getName())) {
result.add(file);
}
}
}
}
private long getSize(File rec) {
long size = 0;
File[] files = rec.listFiles();
for (File file : files) {
size += file.length();
}
return size;
}
@Override
public void delete(Recording recording) throws IOException {
File recordingsDir = new File(config.getSettings().recordingsDir);
File path = new File(recordingsDir, recording.getPath());
LOG.debug("Deleting {}", path);
if(path.isFile()) {
Files.delete(path.toPath());
deleteEmptyParents(path.getParentFile());
} else {
deleteDirectory(path);
deleteEmptyParents(path);
}
}
private void deleteEmptyParents(File parent) throws IOException {
File recDir = new File(Config.getInstance().getSettings().recordingsDir);
while(parent != null && parent.list() != null && parent.list().length == 0) {
if(parent.equals(recDir)) {
return;
}
LOG.debug("Deleting empty directory {}", parent.getAbsolutePath());
Files.delete(parent.toPath());
parent = parent.getParentFile();
}
}
private void deleteDirectory(File directory) throws IOException {
if (!directory.exists()) {
throw new IOException("Recording does not exist");
}
try {
deleteInProgress.add(directory);
File[] files = directory.listFiles();
boolean deletedAllFiles = true;
for (File file : files) {
try {
LOG.trace("Deleting {}", file.getAbsolutePath());
Files.delete(file.toPath());
} catch (Exception e) {
deletedAllFiles = false;
LOG.debug("Couldn't delete {}", file, e);
}
}
if (!deletedAllFiles) {
throw new IOException("Couldn't delete all files in " + directory);
}
} finally {
deleteInProgress.remove(directory);
}
}
@Override
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
if (models.contains(model)) {
int index = models.indexOf(model);
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
config.save();
LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
Download download = recordingProcesses.get(model);
if(download != null) {
stopRecordingProcess(model);
}
tryRestartRecording(model);
} else {
LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName());
return;
}
}
@Override
public void suspendRecording(Model model) {
lock.lock();
try {
if (models.contains(model)) {
int index = models.indexOf(model);
models.get(index).setSuspended(true);
model.setSuspended(true);
config.save();
} else {
LOG.warn("Couldn't suspend model {}. Not found in list", model.getName());
return;
}
} catch (IOException e) {
LOG.error("Couldn't save config", e);
} finally {
lock.unlock();
}
Download download = recordingProcesses.get(model);
if(download != null) {
stopRecordingProcess(model);
}
}
@Override
public void resumeRecording(Model model) throws IOException {
lock.lock();
try {
if (models.contains(model)) {
int index = models.indexOf(model);
Model m = models.get(index);
m.setSuspended(false);
if(m.isOnline()) {
startRecordingProcess(m);
}
model.setSuspended(false);
config.save();
} else {
LOG.warn("Couldn't resume model {}. Not found in list", model.getName());
return;
}
} catch (ExecutionException | InterruptedException e) {
LOG.error("Couldn't check, if model {} is online", model.getName());
} finally {
lock.unlock();
}
}
@Override
public HttpClient getHttpClient() {
return client;
}
@Override
public long getTotalSpaceBytes() throws IOException {
return getRecordingsFileStore().getTotalSpace();
}
@Override
public long getFreeSpaceBytes() throws IOException {
return getRecordingsFileStore().getUsableSpace();
}
private FileStore getRecordingsFileStore() throws IOException {
File recordingsDir = new File(config.getSettings().recordingsDir);
if(!recordingsDir.exists()) {
Files.createDirectories(recordingsDir.toPath());
}
FileStore store = Files.getFileStore(recordingsDir.toPath());
return store;
}
private boolean enoughSpaceForRecording() throws IOException {
long minimum = config.getSettings().minimumSpaceLeftInBytes;
if(minimum == 0) { // 0 means don't check
return true;
} else {
return getFreeSpaceBytes() > minimum;
}
}
private Runnable createPostProcessor(Download download) {
return () -> {
LOG.debug("Starting post-processing for {}", download.getTarget());
if(Config.isServerMode()) {
fireRecordingStateChanged(download.getTarget(), GENERATING_PLAYLIST, download.getModel(), download.getStartTime());
generatePlaylist(download.getTarget());
}
boolean deleted = deleteIfTooShort(download);
if(!deleted) {
fireRecordingStateChanged(download.getTarget(), POST_PROCESSING, download.getModel(), download.getStartTime());
download.postprocess(download.getTarget());
}
fireRecordingStateChanged(download.getTarget(), FINISHED, download.getModel(), download.getStartTime());
};
}
// TODO maybe get file size and bitrate and check, if the values are plausible
// we could also compare the length with the time elapsed since starting the recording
private boolean deleteIfTooShort(Download download) {
try {
File target = download.getTarget();
if(!target.exists()) {
return true;
}
if(target.isDirectory()) {
if(target.list() == null || target.list().length == 0) {
deleteDirectory(target);
deleteEmptyParents(target);
return true;
}
} else {
if(target.length() == 0) {
Files.delete(target.toPath());
deleteEmptyParents(target.getParentFile());
return true;
}
}
long minimumLengthInSeconds = Config.getInstance().getSettings().minimumLengthInSeconds;
if(minimumLengthInSeconds <= 0) {
return false;
}
LOG.debug("Determining video length for {}", download.getTarget());
double duration = 0;
if(target.isDirectory()) {
File playlist = new File(target, "playlist.m3u8");
duration = getPlaylistLength(playlist);
} else {
duration = MpegUtil.getFileDuration(target);
}
Duration minLength = Duration.ofSeconds(minimumLengthInSeconds);
Duration videoLength = Duration.ofSeconds((long) duration);
LOG.debug("Recording started at:{}. Video length is {}", download.getStartTime(), videoLength);
if(videoLength.minus(minLength).isNegative()) {
LOG.debug("Video too short {} {}", videoLength, download.getTarget());
LOG.debug("Deleting {}", target);
if(target.isDirectory()) {
deleteDirectory(target);
deleteEmptyParents(target);
} else {
Files.delete(target.toPath());
deleteEmptyParents(target.getParentFile());
}
return true;
} else {
return false;
}
} catch (Exception e) {
LOG.error("Couldn't check video length", e);
return false;
}
}
private double getPlaylistLength(File playlist) throws IOException, ParseException, PlaylistException {
if(playlist.exists()) {
PlaylistParser playlistParser = new PlaylistParser(new FileInputStream(playlist), Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
Playlist m3u = playlistParser.parse();
MediaPlaylist mediaPlaylist = m3u.getMediaPlaylist();
double length = 0;
for (TrackData trackData : mediaPlaylist.getTracks()) {
length += trackData.getTrackInfo().duration;
}
return length;
} else {
throw new FileNotFoundException(playlist.getAbsolutePath() + " does not exist");
}
}
@Override
public void regeneratePlaylist(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
new Thread(() -> {
LOG.debug("Regenerate playlist {}", recording.getPath());
File recordingsDir = new File(config.getSettings().recordingsDir);
File path = new File(recordingsDir, recording.getPath());
generatePlaylist(path);
}).start();
}
}

View File

@ -0,0 +1,559 @@
package ctbrec.recorder;
import static ctbrec.event.Event.Type.*;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.Subscribe;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.event.Event;
import ctbrec.event.EventBusHolder;
import ctbrec.event.ModelIsOnlineEvent;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.Download;
import ctbrec.sites.Site;
public class NextGenLocalRecorder implements Recorder {
private static final transient Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class);
private static final boolean IGNORE_CACHE = true;
private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Config config;
private volatile boolean recording = true;
private ReentrantLock modelLock = new ReentrantLock();
private ReentrantLock recordingsLock = new ReentrantLock();
private RecorderHttpClient client = new RecorderHttpClient();
private long lastSpaceMessage = 0;
private Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
private RecordingManager recordingManager;
// thread pools for downloads and post-processing
private BlockingQueue<Runnable> downloadQueue = new SynchronousQueue<>();
private ThreadPoolExecutor downloadPool = new ThreadPoolExecutor(2, 100, 5, TimeUnit.MINUTES, downloadQueue, createThreadFactory("Download"));
private ExecutorCompletionService<Recording> completionService = new ExecutorCompletionService<>(downloadPool);
private BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
private ThreadPoolExecutor ppPool = new ThreadPoolExecutor(2, 2, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP"));
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config;
recordingManager = new RecordingManager(config, sites);
config.getSettings().models.stream().forEach((m) -> {
if (m.getSite().isEnabled()) {
models.add(m);
} else {
LOG.info("{} disabled -> ignoring {}", m.getSite().getName(), m.getName());
}
});
recording = true;
registerEventBusListener();
// if(Config.isServerMode()) {
// processUnfinishedRecordings();
// }
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
LOG.info("Saving recordings in {}", config.getSettings().recordingsDir);
Thread completionHandler = new Thread(() -> {
while (!Thread.interrupted()) {
try {
Future<Recording> result = completionService.take();
Recording recording = result.get();
recordingProcesses.remove(recording.getModel());
if (recording.getStatus() == State.WAITING) {
LOG.debug("Download finished for {} -> Starting post-processing", recording.getModel().getName());
ppPool.submit(() -> {
setRecordingStatus(recording, State.POST_PROCESSING);
recordingManager.saveRecording(recording);
recording.postprocess();
setRecordingStatus(recording, State.FINISHED);
recordingManager.saveRecording(recording);
return recording;
});
// check, if we have to restart the recording
Model model = recording.getModel();
tryRestartRecording(model);
} else {
// TODO is this ok?
setRecordingStatus(recording, State.FAILED);
recordingsLock.lock();
try {
recordingManager.delete(recording);
} catch (IOException e) {
LOG.error("Couldn't delete recording {}", recording, e);
} finally {
recordingsLock.unlock();
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
});
completionHandler.setName("CompletionHandler");
completionHandler.setDaemon(true);
completionHandler.start();
scheduler.scheduleWithFixedDelay(() -> {
try {
if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) {
LOG.info("No space left -> Stopping all recordings");
stopRecordingProcesses();
}
} catch (IOException e) {
LOG.error("Couldn't check space left on device", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private void setRecordingStatus(Recording recording, State status) {
recording.setStatus(status);
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(recording.getDownload().getTarget(), status, recording.getModel(),
recording.getStartDate());
EventBusHolder.BUS.post(evt);
}
@Override
public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
if (!models.contains(model)) {
LOG.info("Model {} added", model);
modelLock.lock();
try {
models.add(model);
config.getSettings().models.add(model);
config.save();
} catch (IOException e) {
LOG.error("Couldn't save config", e);
} finally {
modelLock.unlock();
}
// try to start the recording immediately
try {
if (model.isOnline()) {
startRecordingProcess(model);
}
} catch (ExecutionException | InterruptedException e) {
}
}
}
private void startRecordingProcess(Model model) throws IOException {
recordingsLock.lock();
try {
if (!recording) {
// recorder is not in recording mode
return;
}
if (model.isSuspended()) {
LOG.info("Recording for model {} is suspended.", model);
return;
}
if (recordingProcesses.containsKey(model)) {
LOG.error("A recording for model {} is already running", model);
return;
}
modelLock.lock();
try {
if (!models.contains(model)) {
LOG.info("Model {} has been removed. Restarting of recording cancelled.", model);
return;
}
} finally {
modelLock.unlock();
}
if (!enoughSpaceForRecording()) {
long now = System.currentTimeMillis();
if ((now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) {
LOG.info("Not enough space for recording, not starting recording for {}", model);
lastSpaceMessage = now;
}
return;
}
if (!downloadSlotAvailable()) {
LOG.info("The number of downloads is maxed out, not starting recording for {}", model);
return;
}
LOG.debug("Starting recording for model {}", model.getName());
Download download = model.createDownload();
download.init(config, model);
LOG.debug("Downloading with {}", download.getClass().getSimpleName());
Recording rec = new Recording();
rec.setDownload(download);
rec.setPath(download.getPath(model));
rec.setModel(model);
rec.setStartDate(Instant.now());
recordingProcesses.put(model, rec);
recordingManager.add(rec);
completionService.submit(() -> {
try {
setRecordingStatus(rec, State.RECORDING);
recordingManager.saveRecording(rec);
download.start();
boolean deleted = deleteIfTooShort(rec);
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
recordingManager.saveRecording(rec);
} catch (IOException e) {
LOG.error("Download for {} failed. Download state: {}", model.getName(), rec.getStatus(), e);
}
return rec;
});
} finally {
recordingsLock.unlock();
}
}
private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException {
Duration minimumLengthInSeconds = Duration.ofSeconds(Config.getInstance().getSettings().minimumLengthInSeconds);
if (minimumLengthInSeconds.getSeconds() <= 0) {
return false;
}
Duration recordingLength = rec.getLength();
if (recordingLength.compareTo(minimumLengthInSeconds) < 0) {
recordingManager.delete(rec);
return true;
}
return false;
}
@Override
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
modelLock.lock();
try {
if (models.contains(model)) {
models.remove(model);
config.getSettings().models.remove(model);
LOG.info("Model {} removed", model);
config.save();
} else {
throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models");
}
} finally {
modelLock.unlock();
}
if (recordingProcesses.containsKey(model)) {
Recording recording = recordingProcesses.get(model);
recording.getDownload().stop();
}
}
@Override
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
if (models.contains(model)) {
int index = models.indexOf(model);
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
config.save();
LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
Recording recording = recordingProcesses.get(model);
if (recording != null) {
stopRecordingProcess(model);
}
tryRestartRecording(model);
} else {
LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName());
return;
}
}
private void stopRecordingProcess(Model model) {
LOG.debug("Stopping recording for {}", model);
Recording recording = recordingProcesses.get(model);
LOG.debug("Stopping download for {}", model);
recording.getDownload().stop();
}
private void stopRecordingProcesses() {
recordingsLock.lock();
try {
for (Recording recording : recordingProcesses.values()) {
recording.getDownload().stop();
}
} finally {
recordingsLock.unlock();
}
}
@Override
public boolean isTracked(Model model) {
modelLock.lock();
try {
return models.contains(model);
} finally {
modelLock.unlock();
}
}
@Override
public List<Model> getModels() {
modelLock.lock();
try {
return new ArrayList<>(models);
} finally {
modelLock.unlock();
}
}
@Override
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
recordingsLock.lock();
try {
return recordingManager.getAll();
} finally {
recordingsLock.unlock();
}
}
@Override
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
recordingManager.delete(recording);
}
@Override
public void shutdown() {
// TODO add a config flag for waitign or stopping immediately
LOG.info("Shutting down");
recording = false;
LOG.debug("Stopping all recording processes");
for (Recording rec : recordingProcesses.values()) {
Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop);
}
// wait for post-processing to finish
LOG.info("Waiting for downloads to finish");
while (!recordingProcesses.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.error("Error while waiting for downloads to finish", e);
}
}
// shutdown threadpools
try {
LOG.info("Shutting down pools");
downloadPool.shutdown();
ppPool.shutdown();
client.shutdown();
downloadPool.awaitTermination(1, TimeUnit.MINUTES);
LOG.info("Waiting for post-processing to finish");
ppPool.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
LOG.error("Error while waiting for pools to finish", e);
}
}
@Override
public void suspendRecording(Model model) {
modelLock.lock();
try {
if (models.contains(model)) {
int index = models.indexOf(model);
models.get(index).setSuspended(true);
model.setSuspended(true);
config.save();
} else {
LOG.warn("Couldn't suspend model {}. Not found in list", model.getName());
return;
}
} catch (IOException e) {
LOG.error("Couldn't save config", e);
} finally {
modelLock.unlock();
}
Recording recording = recordingProcesses.get(model);
Optional.ofNullable(recording).map(Recording::getDownload).ifPresent(Download::stop);
}
@Override
public void resumeRecording(Model model) throws IOException {
modelLock.lock();
try {
if (models.contains(model)) {
int index = models.indexOf(model);
Model m = models.get(index);
m.setSuspended(false);
if (m.isOnline()) {
startRecordingProcess(m);
}
model.setSuspended(false);
config.save();
} else {
LOG.warn("Couldn't resume model {}. Not found in list", model.getName());
return;
}
} catch (ExecutionException | InterruptedException e) {
LOG.error("Couldn't check, if model {} is online", model.getName());
} finally {
modelLock.unlock();
}
}
@Override
public boolean isSuspended(Model model) {
modelLock.lock();
try {
int index = models.indexOf(model);
if (index >= 0) {
Model m = models.get(index);
return m.isSuspended();
} else {
return false;
}
} finally {
modelLock.unlock();
}
}
@Override
public List<Model> getOnlineModels() {
return getModels().stream().filter(m -> {
try {
return m.isOnline();
} catch (IOException | ExecutionException | InterruptedException e) {
return false;
}
}).collect(Collectors.toList());
}
@Override
public HttpClient getHttpClient() {
return client;
}
@Override
public long getTotalSpaceBytes() throws IOException {
return getRecordingsFileStore().getTotalSpace();
}
@Override
public long getFreeSpaceBytes() throws IOException {
return getRecordingsFileStore().getUsableSpace();
}
private FileStore getRecordingsFileStore() throws IOException {
File recordingsDir = new File(config.getSettings().recordingsDir);
if (!recordingsDir.exists()) {
Files.createDirectories(recordingsDir.toPath());
}
FileStore store = Files.getFileStore(recordingsDir.toPath());
return store;
}
private boolean enoughSpaceForRecording() throws IOException {
long minimum = config.getSettings().minimumSpaceLeftInBytes;
if (minimum == 0) { // 0 means don't check
return true;
} else {
return getFreeSpaceBytes() > minimum;
}
}
private boolean downloadSlotAvailable() {
int concurrentRecordings = Config.getInstance().getSettings().concurrentRecordings;
return concurrentRecordings == 0 || concurrentRecordings > 0 && recordingProcesses.size() < concurrentRecordings;
}
private void tryRestartRecording(Model model) {
if (!recording) {
// recorder is not in recording state
return;
}
try {
boolean modelInRecordingList = isTracked(model);
boolean online = model.isOnline(IGNORE_CACHE);
if (modelInRecordingList && online) {
LOG.info("Restarting recording for model {}", model);
startRecordingProcess(model);
}
} catch (Exception e) {
LOG.error("Couldn't restart recording for model {}", model);
}
}
private void registerEventBusListener() {
EventBusHolder.BUS.register(new Object() {
@Subscribe
public void modelEvent(Event e) {
try {
if (e.getType() == MODEL_ONLINE) {
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
Model model = evt.getModel();
if (!isSuspended(model) && !recordingProcesses.containsKey(model)) {
startRecordingProcess(model);
}
}
} catch (Exception e1) {
LOG.error("Error while handling model state changed event {}", e, e1);
}
}
});
}
private ThreadFactory createThreadFactory(String name) {
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true);
return t;
}
};
}
}

View File

@ -148,7 +148,7 @@ public class PlaylistGenerator {
if(segments.length == 0) {
throw new InvalidPlaylistException("No segments found. Playlist is empty");
} else if(segments.length != playlistSize) {
throw new InvalidPlaylistException("Playlist size and amount of segments differ");
throw new InvalidPlaylistException("Playlist size and amount of segments differ (" + segments.length + " != " + playlistSize + ")");
} else {
LOG.debug("Generated playlist looks good");
}

View File

@ -59,7 +59,7 @@ public interface Recorder {
List<Recording> recordings = getRecordings();
return getModels().stream().filter(m -> {
for (Recording recording : recordings) {
if (recording.getStatus() == Recording.State.RECORDING && recording.getModelName().equals(m.getSanitizedNamed())) {
if (recording.getStatus() == Recording.State.RECORDING && recording.getModel().equals(m)) {
return true;
}
}
@ -82,16 +82,4 @@ public interface Recorder {
* @throws IOException
*/
public long getFreeSpaceBytes() throws IOException;
/**
* Regenerate the playlist for a recording. This is helpful, if the
* playlist is corrupt or hasn't been generated for whatever reason
* @param recording
* @throws IllegalStateException
* @throws NoSuchAlgorithmException
* @throws InvalidKeyException
* @throws IOException
*/
public void regeneratePlaylist(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException;
}

View File

@ -0,0 +1,236 @@
package ctbrec.recorder;
import static java.nio.file.StandardWatchEventKinds.*;
import java.io.File;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Recording;
public class RecordingFileMonitor {
private static final transient Logger LOG = LoggerFactory.getLogger(RecordingFileMonitor.class);
private WatchService watcher;
private Map<WatchKey, Path> keys;
private boolean running = true;
private RecordingManager manager;
public RecordingFileMonitor(RecordingManager manager) throws IOException {
this.manager = manager;
this.watcher = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<>();
registerAll(new File(Config.getInstance().getSettings().recordingsDir).toPath());
}
void processEvents() {
while (running) {
// wait for key to be signalled
WatchKey key;
try {
key = watcher.take();
} catch (InterruptedException | ClosedWatchServiceException x) {
return;
}
Path dir = keys.get(key);
if (dir == null) {
LOG.error("WatchKey not recognized!!");
continue;
}
List<WatchEvent<?>> events = key.pollEvents();
LOG.debug("Size: {}", events.size());
if (isRenameProcess(events)) {
handleRename(dir, events);
} else {
for (WatchEvent<?> event : events) {
WatchEvent.Kind<?> kind = event.kind();
// TBD - provide example of how OVERFLOW event is handled
if (kind == OVERFLOW) {
continue;
}
// Context for directory entry event is the file name of entry
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
if(Files.isRegularFile(child)) {
if (kind == ENTRY_CREATE) {
handleFileCreation(child);
} else if (kind == ENTRY_DELETE) {
handleFileDeletion(child);
}
} else {
if (kind == ENTRY_CREATE) {
handleDirCreation(child);
} else if (kind == ENTRY_DELETE) {
handleDirDeletion(child);
}
}
}
}
// reset key and remove from set if directory no longer accessible
boolean valid = key.reset();
if (!valid) {
keys.remove(key);
// all directories are inaccessible
if (keys.isEmpty()) {
break;
}
}
}
}
private void handleRename(Path dir, List<WatchEvent<?>> events) {
WatchEvent<Path> deleteEvent = cast(events.get(0));
WatchEvent<Path> createEvent = cast(events.get(1));
Path from = dir.resolve(deleteEvent.context());
Path to = dir.resolve(createEvent.context());
LOG.debug("{} -> {}", from, to);
List<Recording> affectedRecordings = getAffectedRecordings(from);
adjustPaths(affectedRecordings, from, to);
if (Files.isDirectory(to, LinkOption.NOFOLLOW_LINKS)) {
unregister(from);
try {
registerAll(to);
} catch (IOException e) {
e.printStackTrace();
}
}
}
private List<Recording> getAffectedRecordings(Path from) {
String f = from.toAbsolutePath().toString();
List<Recording> affected = new ArrayList<>();
for (Recording rec : manager.getAll()) {
String r = rec.getAbsoluteFile().getAbsolutePath();
if (r.startsWith(f)) {
affected.add(rec);
}
}
return affected;
}
private void adjustPaths(List<Recording> affectedRecordings, Path from, Path to) {
for (Recording rec : affectedRecordings) {
String oldPath = rec.getAbsoluteFile().getAbsolutePath();
String newPath = oldPath.replace(from.toString(), to.toString());
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = newPath.replaceFirst(Pattern.quote(recordingsDir), "");
LOG.debug("Recording path has changed {} -> {}", rec.getPath(), relativePath);
rec.setPath(relativePath);
try {
manager.saveRecording(rec);
} catch (IOException e) {
LOG.error("Couldn't update recording path in meta data file", e);
}
}
}
private void handleFileCreation(Path child) {
LOG.trace("File created {}", child);
}
private void handleFileDeletion(Path child) {
LOG.trace("File deleted {}", child);
}
private void handleDirCreation(Path dir) {
try {
registerAll(dir);
LOG.trace("Directory added {}", dir);
} catch (IOException x) {
// ignore to keep sample readbale
}
}
private void handleDirDeletion(Path dir) {
// TODO unregister key ?!?
// only delete directories, which have actually been deleted
if(Files.notExists(dir, LinkOption.NOFOLLOW_LINKS)) {
LOG.trace("Directory Deleted {}", dir);
}
}
private boolean isRenameProcess(List<WatchEvent<?>> events) {
if(events.size() == 2) {
boolean deleteFirst = events.get(0).kind() == ENTRY_DELETE;
boolean createSecond = events.get(1).kind() == ENTRY_CREATE;
return deleteFirst && createSecond;
} else {
return false;
}
}
/**
* Register the given directory, and all its sub-directories, with the
* WatchService.
*/
private void registerAll(final Path start) throws IOException {
// register directory and sub-directories
Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
register(dir);
return FileVisitResult.CONTINUE;
}
});
}
/**
* Register the given directory with the WatchService
*/
void register(Path dir) {
try {
WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE);
keys.put(key, dir);
LOG.debug("Monitor {}", dir);
} catch(IOException e) {
LOG.warn("Couldn't register directory monitor for directory {}", dir, e);
}
}
public void unregister(Path path) {
}
@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
public void addDirectory(Path dir) throws IOException {
LOG.info("Adding monitor for {}", dir);
registerAll(dir);
}
public void stop() throws IOException {
running = false;
watcher.close();
}
}

View File

@ -0,0 +1,186 @@
package ctbrec.recorder;
import static java.nio.file.StandardOpenOption.*;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.file.Files;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.squareup.moshi.JsonAdapter;
import com.squareup.moshi.Moshi;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.io.InstantJsonAdapter;
import ctbrec.io.ModelJsonAdapter;
import ctbrec.sites.Site;
public class RecordingManager {
private static final transient Logger LOG = LoggerFactory.getLogger(RecordingManager.class);
private Config config;
private Moshi moshi;
private JsonAdapter<Recording> adapter;
private List<Recording> recordings = new ArrayList<>();
// private RecordingFileMonitor monitor = new RecordingFileMonitor(this);
public RecordingManager(Config config, List<Site> sites) throws IOException {
this.config = config;
moshi = new Moshi.Builder()
.add(Model.class, new ModelJsonAdapter(sites))
.add(Instant.class, new InstantJsonAdapter())
.build();
adapter = moshi.adapter(Recording.class).indent(" ");
loadRecordings();
// startMonitoring();
}
public void add(Recording rec) throws UnsupportedEncodingException, IOException {
saveRecording(rec);
recordings.add(rec);
// registerFileWatch(rec);
}
public void saveRecording(Recording rec) throws UnsupportedEncodingException, IOException {
String json = adapter.toJson(rec);
File recordingsMetaDir = getDir();
Files.createDirectories(recordingsMetaDir.toPath());
String filename = rec.toString() + ".json";
File recordingMetaData = new File(recordingsMetaDir, filename);
rec.setMetaDataFile(recordingMetaData.getAbsolutePath());
Files.write(recordingMetaData.toPath(), json.getBytes("utf-8"), CREATE, WRITE, TRUNCATE_EXISTING);
}
private void loadRecordings() throws IOException {
File recordingsMetaDir = getDir();
File[] metaFiles = recordingsMetaDir.listFiles((file, name) -> name.endsWith(".json"));
if (metaFiles != null) {
for (File file : metaFiles) {
String json = new String(Files.readAllBytes(file.toPath()), "utf-8");
Recording recording = adapter.fromJson(json);
if (recording.getStatus() == State.RECORDING) {
recording.setStatus(State.WAITING);
}
if (recordingExists(recording)) {
recordings.add(recording);
} else {
LOG.info("Recording {} does not exist anymore -> deleting meta data", recording);
Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath());
}
}
}
}
// private void startMonitoring() {
// for (Recording recording : recordings) {
// registerFileWatch(recording);
// }
// Thread watcher = new Thread(() -> monitor.processEvents());
// watcher.setDaemon(true);
// watcher.setPriority(Thread.MIN_PRIORITY);
// watcher.setName("RecordingFileMonitor");
// watcher.start();
// }
//
// private void registerFileWatch(Recording recording) {
// File rec = recording.getAbsoluteFile();
// if (rec.isDirectory()) {
// monitor.register(rec.toPath());
// } else {
// monitor.register(rec.getParentFile().toPath());
// }
// }
//
// private void removeFileWatch(Recording recording) {
// File rec = recording.getAbsoluteFile();
// if (rec.isDirectory()) {
// monitor.unregister(rec.toPath());
// } else {
// monitor.unregister(rec.getParentFile().toPath());
// }
// }
private boolean recordingExists(Recording recording) {
File rec = new File(config.getSettings().recordingsDir, recording.getPath());
return rec.exists();
}
private File getDir() {
File configDir = config.getConfigDir();
File recordingsMetaDir = new File(configDir, "recordings");
return recordingsMetaDir;
}
public void delete(Recording recording) throws IOException {
recording.setStatus(State.DELETING);
File recordingsDir = new File(config.getSettings().recordingsDir);
File path = new File(recordingsDir, recording.getPath());
LOG.debug("Deleting {}", path);
// delete the video files
if (path.isFile()) {
Files.delete(path.toPath());
deleteEmptyParents(path.getParentFile());
} else {
deleteDirectory(path);
deleteEmptyParents(path);
}
// delete the meta data
Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath());
// remove from data structure
recordings.remove(recording);
recording.setStatus(State.DELETED);
// removeFileWatch(recording);
}
public List<Recording> getAll() {
return new ArrayList<>(recordings);
}
private void deleteEmptyParents(File parent) throws IOException {
File recDir = new File(Config.getInstance().getSettings().recordingsDir);
while (parent != null && parent.list() != null && parent.list().length == 0) {
if (parent.equals(recDir)) {
return;
}
LOG.debug("Deleting empty directory {}", parent.getAbsolutePath());
Files.delete(parent.toPath());
parent = parent.getParentFile();
}
}
private void deleteDirectory(File directory) throws IOException {
if (!directory.exists()) {
return;
}
File[] files = directory.listFiles();
boolean deletedAllFiles = true;
for (File file : files) {
try {
LOG.trace("Deleting {}", file.getAbsolutePath());
Files.delete(file.toPath());
} catch (Exception e) {
deletedAllFiles = false;
LOG.debug("Couldn't delete {}", file, e);
}
}
if (!deletedAllFiles) {
throw new IOException("Couldn't delete all files in " + directory);
}
}
}

View File

@ -283,7 +283,7 @@ public class RemoteRecorder implements Recorder {
if(newRecording.getStatus() != recording.getStatus()) {
File file = new File(recording.getPath());
Model m = new UnknownModel();
m.setName(newRecording.getModelName());
m.setName(newRecording.getModel().getName());
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(file, newRecording.getStatus(), m, recording.getStartDate());
EventBusHolder.BUS.post(evt);
}
@ -432,26 +432,4 @@ public class RemoteRecorder implements Recorder {
public long getFreeSpaceBytes() {
return spaceFree;
}
@Override
public void regeneratePlaylist(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
String msg = "{\"action\": \"regeneratePlaylist\", \"recording\": \""+recording.getPath()+"\"}";
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url("http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/rec")
.post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try(Response response = client.execute(request)) {
String json = response.body().string();
RecordingListResponse resp = recordingListResponseAdapter.fromJson(json);
if(response.isSuccessful()) {
if(!resp.status.equals("success")) {
throw new IOException("Couldn't regenerate playlist for recording: " + resp.msg);
}
} else {
throw new IOException("Couldn't regenerate playlist recording: " + resp.msg);
}
}
}
}

View File

@ -15,6 +15,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -35,6 +36,8 @@ import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.OS;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.UnknownModel;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
@ -45,19 +48,30 @@ import okhttp3.Response;
public abstract class AbstractHlsDownload implements Download {
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
private static int threadCounter = 0;
protected HttpClient client;
protected volatile boolean running = false;
protected volatile boolean alive = true;
protected Instant startTime;
protected Model model = new UnknownModel();
protected BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50);
protected ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue);
protected ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue, createThreadFactory());
protected State state = State.UNKNOWN;
private int playlistEmptyCount = 0;
public AbstractHlsDownload(HttpClient client) {
this.client = client;
}
private ThreadFactory createThreadFactory() {
return r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("SegmentDownloadThread-" + threadCounter++);
return t;
};
}
protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException {
URL segmentsUrl = new URL(segmentsURL);
Request request = new Request.Builder()
@ -148,11 +162,24 @@ public abstract class AbstractHlsDownload implements Download {
return url;
}
@Override
public boolean isAlive() {
return alive;
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
if(playlist.segments.isEmpty()) {
playlistEmptyCount++;
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
}
} else {
playlistEmptyCount = 0;
}
if(playlistEmptyCount == 10) {
LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel());
internalStop();
}
}
abstract void internalStop();
@Override
public Instant getStartTime() {
return startTime;
@ -164,8 +191,8 @@ public abstract class AbstractHlsDownload implements Download {
}
@Override
public void postprocess(File target) {
runPostProcessingScript(target);
public void postprocess(Recording recording) {
runPostProcessingScript(recording.getAbsoluteFile());
}
private void runPostProcessingScript(File target) {

View File

@ -6,13 +6,29 @@ import java.time.Instant;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
public interface Download {
public void start(Model model, Config config) throws IOException;
public void init(Config config, Model model);
public void start() throws IOException;
public void stop();
public boolean isAlive();
public File getTarget();
public Model getModel();
public Instant getStartTime();
public void postprocess(File target);
public void postprocess(Recording recording);
/**
* Returns the path to the recording in the filesystem as file object
* @param model
* @return
* @see #getPath(Model)
*/
public File getTarget();
/**
* Returns the path to the recording starting from the configured recordings directory
* @param model
* @return
* @see #getTarget()
*/
public String getPath(Model model);
}

View File

@ -16,23 +16,33 @@ import java.nio.file.Path;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistError;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.event.EventBusHolder;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.PlaylistGenerator;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import okhttp3.Request;
import okhttp3.Response;
@ -45,21 +55,29 @@ public class HlsDownload extends AbstractHlsDownload {
private int segmentCounter = 1;
private NumberFormat nf = new DecimalFormat("000000");
private Object downloadFinished = new Object();
private ZonedDateTime splitRecStartTime;
private Config config;
public HlsDownload(HttpClient client) {
super(client);
}
@Override
public void start(Model model, Config config) throws IOException {
public void init(Config config, Model model) {
this.config = config;
super.model = model;
startTime = Instant.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT);
String startTime = formatter.format(this.startTime);
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
}
@Override
public void start() throws IOException {
try {
running = true;
startTime = Instant.now();
super.model = model;
SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT);
String startTime = sdf.format(new Date());
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
splitRecStartTime = ZonedDateTime.now();
if(!model.isOnline()) {
throw new IOException(model.getName() +"'s room is not public");
@ -74,30 +92,36 @@ public class HlsDownload extends AbstractHlsDownload {
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
Files.createDirectories(downloadDir);
}
int lastSegment = 0;
int nextSegment = 0;
int lastSegmentNumber = 0;
int nextSegmentNumber = 0;
int waitFactor = 1;
while(running) {
SegmentPlaylist playlist = getNextSegments(segments);
if(nextSegment > 0 && playlist.seq > nextSegment) {
emptyPlaylistCheck(playlist);
if(nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
// TODO switch to a lower bitrate/resolution ?!?
waitFactor *= 2;
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegment, playlist.seq, model, waitFactor);
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor);
}
int skip = nextSegment - playlist.seq;
int skip = nextSegmentNumber - playlist.seq;
Future<Boolean> lastSegmentDownload = null;
for (String segment : playlist.segments) {
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
String prefix = nf.format(segmentCounter++);
downloadThreadPool.submit(new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix));
SegmentDownload segmentDownload = new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix);
lastSegmentDownload = downloadThreadPool.submit(segmentDownload);
//new SegmentDownload(segment, downloadDir).call();
}
}
// split recordings
splitRecording(lastSegmentDownload);
long wait = 0;
if(lastSegment == playlist.seq) {
if(lastSegmentNumber == playlist.seq) {
// playlist didn't change -> wait for at least half the target duration
wait = (long) playlist.targetDuration * 1000 / waitFactor;
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
@ -117,9 +141,9 @@ public class HlsDownload extends AbstractHlsDownload {
// this if check makes sure, that we don't decrease nextSegment. for some reason
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
lastSegment = playlist.seq;
if(lastSegment + playlist.segments.size() > nextSegment) {
nextSegment = lastSegment + playlist.segments.size();
lastSegmentNumber = playlist.seq;
if(lastSegmentNumber + playlist.segments.size() > nextSegmentNumber) {
nextSegmentNumber = lastSegmentNumber + playlist.segments.size();
}
}
} else {
@ -148,7 +172,6 @@ public class HlsDownload extends AbstractHlsDownload {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
alive = false;
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
@ -156,9 +179,81 @@ public class HlsDownload extends AbstractHlsDownload {
}
}
@Override
public void postprocess(Recording recording) {
generatePlaylist(recording.getAbsoluteFile());
super.postprocess(recording);
}
private void generatePlaylist(File recDir) {
if(!config.getSettings().generatePlaylist) {
return;
}
PlaylistGenerator playlistGenerator = new PlaylistGenerator();
try {
File playlist = playlistGenerator.generate(recDir);
if(playlist != null) {
playlistGenerator.validate(recDir);
}
} catch (IOException | ParseException e) {
LOG.error("Couldn't generate playlist file", e);
} catch (PlaylistException e) {
if(e.getErrors().isEmpty()) {
LOG.error("Couldn't generate playlist file", e);
} else {
LOG.error("Playlist contains errors");
for (PlaylistError error : e.getErrors()) {
LOG.error("Error: {}", error.toString());
}
}
} catch (InvalidPlaylistException e) {
LOG.error("Playlist is invalid and will be deleted", e);
File playlist = new File(recDir, "playlist.m3u8");
playlist.delete();
}
}
private void splitRecording(Future<Boolean> lastSegmentDownload) {
if(config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) {
File lastTargetFile = downloadDir.toFile();
// switch to the next dir
SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT);
super.startTime = Instant.now();
String startTime = sdf.format(new Date());
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
LOG.debug("Switching to {}", downloadDir);
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
downloadDir.toFile().mkdirs();
splitRecStartTime = ZonedDateTime.now();
// post-process current recording
LOG.debug("Running post-processing for {}", lastTargetFile);
Thread pp = new Thread(() -> {
if(lastSegmentDownload != null) {
// wait for last segment in this directory
try {
lastSegmentDownload.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("Couldn't wait for last segment to arrive in this directory. Playlist might be inclomplete", e);
}
}
});
pp.setName("Post-Processing split recording");
pp.setPriority(Thread.MIN_PRIORITY);
pp.start();
}
}
}
@Override
public void stop() {
running = false;
internalStop();
try {
synchronized (downloadFinished) {
downloadFinished.wait();
@ -168,6 +263,11 @@ public class HlsDownload extends AbstractHlsDownload {
}
}
@Override
void internalStop() {
running = false;
}
private static class SegmentDownload implements Callable<Boolean> {
private URL url;
private Path file;
@ -225,4 +325,12 @@ public class HlsDownload extends AbstractHlsDownload {
public File getTarget() {
return downloadDir.toFile();
}
@Override
public String getPath(Model model) {
String absolutePath = getTarget().getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
}

View File

@ -1,6 +1,5 @@
package ctbrec.recorder.download;
import static ctbrec.Recording.State.*;
import static java.nio.file.StandardOpenOption.*;
import java.io.ByteArrayInputStream;
@ -26,6 +25,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +41,6 @@ import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.Hmac;
import ctbrec.Model;
import ctbrec.event.EventBusHolder;
import ctbrec.event.RecordingStateChangedEvent;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.ProgressListener;
@ -66,6 +64,13 @@ public class MergedHlsDownload extends AbstractHlsDownload {
super(client);
}
@Override
public void init(Config config, Model model) {
this.config = config;
this.model = model;
targetFile = Config.getInstance().getFileForRecording(model, "ts");
}
@Override
public File getTarget() {
return targetFile;
@ -110,7 +115,6 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
alive = false;
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
@ -119,8 +123,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
@Override
public void start(Model model, Config config) throws IOException {
this.config = config;
public void start() throws IOException {
try {
if(!model.isOnline(IGNORE_CACHE)) {
throw new IOException(model.getName() +"'s room is not public");
@ -130,20 +133,15 @@ public class MergedHlsDownload extends AbstractHlsDownload {
super.startTime = Instant.now();
splitRecStartTime = ZonedDateTime.now();
super.model = model;
targetFile = Config.getInstance().getFileForRecording(model, "ts");
// let the world know, that we are recording now
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(getTarget(), RECORDING, model, getStartTime());
EventBusHolder.BUS.post(evt);
String segments = getSegmentPlaylistUrl(model);
mergeThread = createMergeThread(targetFile, null, true);
mergeThread.start();
if(segments != null) {
downloadSegments(segments, true);
if(config.getSettings().splitRecordings > 0) {
if (segments != null) {
if (config.getSettings().splitRecordings > 0) {
LOG.debug("Splitting recordings every {} seconds", config.getSettings().splitRecordings);
}
downloadSegments(segments, true);
} else {
throw new IOException("Couldn't determine segments uri");
}
@ -157,10 +155,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} catch(Exception e) {
throw new IOException("Couldn't download segment", e);
} finally {
if(streamer != null) {
if (streamer != null) {
try {
streamer.stop();
} catch(Exception e) {
} catch (Exception e) {
LOG.error("Couldn't stop streamer", e);
}
}
@ -169,8 +167,8 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
alive = false;
synchronized (downloadFinished) {
LOG.debug("Download finished notify {}", model);
downloadFinished.notifyAll();
}
LOG.debug("Download for {} terminated", model);
@ -180,14 +178,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException {
int lastSegment = 0;
int nextSegment = 0;
long playlistNotFoundFirstEncounter = -1;
while(running) {
try {
if(playlistNotFoundFirstEncounter != -1) {
LOG.debug("Downloading playlist {}", segmentPlaylistUri);
}
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
playlistNotFoundFirstEncounter = -1;
emptyPlaylistCheck(lsp);
if(!livestreamDownload) {
multiSource.setTotalSegments(lsp.segments.size());
}
@ -209,7 +203,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
if(livestreamDownload) {
// split up the recording, if configured
splitRecording();
boolean split = splitRecording();
if (split) {
break;
}
// wait some time until requesting the segment playlist again to not hammer the server
waitForNewSegments(lsp, lastSegment, downloadTookMillis);
@ -246,9 +243,6 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, MissingSegmentException, ExecutionException, HttpException {
int skip = nextSegment - lsp.seq;
if(lsp.segments.isEmpty()) {
LOG.debug("Empty playlist: {}", lsp.url);
}
// add segments to download threadpool
Queue<Future<byte[]>> downloads = new LinkedList<>();
@ -319,34 +313,37 @@ public class MergedHlsDownload extends AbstractHlsDownload {
multiSource.addSource(source);
}
private void splitRecording() {
private boolean splitRecording() {
if(config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) {
try {
File lastTargetFile = targetFile;
// switch to the next file
targetFile = Config.getInstance().getFileForRecording(model, "ts");
LOG.debug("Switching to file {}", targetFile.getAbsolutePath());
fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build();
streamer.switchSink(sink);
super.startTime = Instant.now();
splitRecStartTime = ZonedDateTime.now();
// post-process current recording
Thread pp = new Thread(() -> postprocess(lastTargetFile));
pp.setName("Post-Processing split recording");
pp.setPriority(Thread.MIN_PRIORITY);
pp.start();
} catch (IOException e) {
LOG.error("Error while splitting recording", e);
running = false;
}
internalStop();
return true;
// try {
// File lastTargetFile = targetFile;
//
// // switch to the next file
// targetFile = Config.getInstance().getFileForRecording(model, "ts");
// LOG.debug("Switching to file {}", targetFile.getAbsolutePath());
// fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
// MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build();
// streamer.switchSink(sink);
// super.startTime = Instant.now();
// splitRecStartTime = ZonedDateTime.now();
//
// // post-process current recording
// Thread pp = new Thread(() -> postprocess(lastTargetFile));
// pp.setName("Post-Processing split recording");
// pp.setPriority(Thread.MIN_PRIORITY);
// pp.start();
// } catch (IOException e) {
// LOG.error("Error while splitting recording", e);
// running = false;
// }
}
}
return false;
}
private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment, long downloadTookMillis) {
@ -377,12 +374,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
@Override
public void stop() {
running = false;
if(streamer != null) {
streamer.stop();
}
internalStop();
try {
synchronized (downloadFinished) {
LOG.debug("Waiting for finished notify {}", model);
downloadFinished.wait();
}
} catch (InterruptedException e) {
@ -391,6 +386,15 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.debug("Download stopped");
}
@Override
void internalStop() {
running = false;
if (streamer != null) {
streamer.stop();
streamer = null;
}
}
private Thread createMergeThread(File targetFile, ProgressListener listener, boolean liveStream) {
Thread t = new Thread(() -> {
multiSource = BlockingMultiMTSSource.builder()
@ -418,10 +422,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
streamer.stream();
LOG.debug("Streamer finished");
} catch (InterruptedException e) {
if(running) {
if (running) {
LOG.error("Error while waiting for a download future", e);
}
} catch(Exception e) {
} catch (Exception e) {
LOG.error("Error while saving stream to file", e);
} finally {
deleteEmptyRecording(targetFile);
@ -504,4 +508,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
return false;
}
}
@Override
public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
}

View File

@ -355,12 +355,12 @@ public class ChaturbateModel extends AbstractModel {
}
private void acquireSlot() throws InterruptedException {
LOG.debug("Acquire: {}", requestThrottle.availablePermits());
//LOG.debug("Acquire: {}", requestThrottle.availablePermits());
requestThrottle.acquire();
long now = System.currentTimeMillis();
long millisSinceLastRequest = now - lastRequest;
if(millisSinceLastRequest < 500) {
LOG.debug("Sleeping: {}", (500-millisSinceLastRequest));
//LOG.debug("Sleeping: {}", (500-millisSinceLastRequest));
Thread.sleep(500 - millisSinceLastRequest);
}
}
@ -368,6 +368,6 @@ public class ChaturbateModel extends AbstractModel {
private void releaseSlot() {
lastRequest = System.currentTimeMillis();
requestThrottle.release();
LOG.debug("Release: {}", requestThrottle.availablePermits());
//LOG.debug("Release: {}", requestThrottle.availablePermits());
}
}

View File

@ -19,11 +19,16 @@ public class Fc2HlsDownload extends HlsDownload {
}
@Override
public void start(Model model, Config config) throws IOException {
public void init(Config config, Model model) {
super.init(config, model);
}
@Override
public void start() throws IOException {
Fc2Model fc2Model = (Fc2Model) model;
try {
fc2Model.openWebsocket();
super.start(model, config);
super.start();
} catch (InterruptedException e) {
LOG.error("Couldn't start download for {}", model, e);
} finally {

View File

@ -19,11 +19,16 @@ public class Fc2MergedHlsDownload extends MergedHlsDownload {
}
@Override
public void start(Model model, Config config) throws IOException {
public void init(Config config, Model model) {
super.init(config, model);
}
@Override
public void start() throws IOException {
Fc2Model fc2Model = (Fc2Model) model;
try {
fc2Model.openWebsocket();
super.start(model, config);
super.start();
} catch (InterruptedException e) {
LOG.error("Couldn't start download for {}", model, e);
} finally {

View File

@ -470,12 +470,12 @@ public class Flirt4FreeModel extends AbstractModel {
}
private void acquireSlot() throws InterruptedException {
LOG.debug("Acquire: {}", requestThrottle.availablePermits());
//LOG.debug("Acquire: {}", requestThrottle.availablePermits());
requestThrottle.acquire();
long now = System.currentTimeMillis();
long millisSinceLastRequest = now - lastRequest;
if(millisSinceLastRequest < 500) {
LOG.debug("Sleeping: {}", (500-millisSinceLastRequest));
//LOG.debug("Sleeping: {}", (500-millisSinceLastRequest));
Thread.sleep(500 - millisSinceLastRequest);
}
}
@ -483,6 +483,6 @@ public class Flirt4FreeModel extends AbstractModel {
private void releaseSlot() {
lastRequest = System.currentTimeMillis();
requestThrottle.release();
LOG.debug("Release: {}", requestThrottle.availablePermits());
//LOG.debug("Release: {}", requestThrottle.availablePermits());
}
}

View File

@ -8,6 +8,7 @@ import java.net.URLEncoder;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Random;
import java.util.regex.Pattern;
import org.json.JSONArray;
import org.json.JSONObject;
@ -16,7 +17,7 @@ import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording.State;
import ctbrec.Recording;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.Download;
import okhttp3.Request;
@ -55,11 +56,14 @@ public class LiveJasminChunkedHttpDownload implements Download {
}
@Override
public void start(Model model, Config config) throws IOException {
public void init(Config config, Model model) {
this.model = model;
startTime = Instant.now();
targetFile = config.getFileForRecording(model, "mp4");
this.startTime = Instant.now();
this.targetFile = config.getFileForRecording(model, "mp4");
}
@Override
public void start() throws IOException {
getPerformerDetails(model.getName());
try {
getStreamPath();
@ -287,11 +291,14 @@ public class LiveJasminChunkedHttpDownload implements Download {
}
@Override
public void postprocess(File target) {
public void postprocess(Recording recording) {
}
@Override
public State getState() {
return State.UNKNOWN;
public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
}

View File

@ -7,6 +7,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.file.Files;
import java.time.Instant;
import java.util.regex.Pattern;
import org.json.JSONArray;
import org.json.JSONObject;
@ -17,7 +18,7 @@ import com.google.common.eventbus.Subscribe;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording.State;
import ctbrec.Recording;
import ctbrec.event.Event;
import ctbrec.event.EventBusHolder;
import ctbrec.event.ModelStateChangedEvent;
@ -56,11 +57,14 @@ public class LiveJasminWebSocketDownload implements Download {
}
@Override
public void start(Model model, Config config) throws IOException {
public void init(Config config, Model model) {
this.model = model;
startTime = Instant.now();
targetFile = config.getFileForRecording(model, "mp4");
this.startTime = Instant.now();
this.targetFile = config.getFileForRecording(model, "mp4");
}
@Override
public void start() throws IOException {
getPerformerDetails(model.getName());
LOG.debug("appid: {}", applicationId);
LOG.debug("sessionid: {}",sessionId);
@ -351,11 +355,14 @@ public class LiveJasminWebSocketDownload implements Download {
}
@Override
public void postprocess(File target) {
public void postprocess(Recording recording) {
}
@Override
public State getState() {
return State.UNKNOWN;
public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath();
String recordingsDir = Config.getInstance().getSettings().recordingsDir;
String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
return relativePath;
}
}

View File

@ -1,7 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src/main/java"/>
<classpathentry kind="src" path="src/main/resources"/>
<classpathentry kind="src" output="target/classes" path="src/main/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8">
<attributes>
<attribute name="maven.pomderived" value="true"/>
@ -12,5 +21,12 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="src" output="target/test-classes" path="src/test/java">
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

View File

@ -1,2 +1,4 @@
eclipse.preferences.version=1
encoding//src/main/java=UTF-8
encoding//src/main/resources=UTF-8
encoding/<project>=UTF-8

View File

@ -24,7 +24,7 @@ import ctbrec.Version;
import ctbrec.event.EventBusHolder;
import ctbrec.event.EventHandler;
import ctbrec.event.EventHandlerConfiguration;
import ctbrec.recorder.LocalRecorder;
import ctbrec.recorder.NextGenLocalRecorder;
import ctbrec.recorder.OnlineMonitor;
import ctbrec.recorder.Recorder;
import ctbrec.sites.Site;
@ -69,7 +69,7 @@ public class HttpServer {
if(config.getSettings().key != null) {
LOG.info("HMAC authentication is enabled");
}
recorder = new LocalRecorder(config);
recorder = new NextGenLocalRecorder(config, sites);
for (Site site : sites) {
if(site.isEnabled()) {
site.init();

View File

@ -3,6 +3,8 @@ package ctbrec.recorder.server;
import static javax.servlet.http.HttpServletResponse.*;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
@ -68,8 +70,14 @@ public class RecorderServlet extends AbstractCtbrecServlet {
resp.getWriter().write(response);
break;
case "stop":
response = "{\"status\": \"success\", \"msg\": \"Recording stopped\"}";
recorder.stopRecording(request.model);
new Thread(() -> {
try {
recorder.stopRecording(request.model);
} catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException | IOException e) {
LOG.error("Couldn't stop recording for model {}", request.model, e);
}
}).start();
response = "{\"status\": \"success\", \"msg\": \"Stopping recording\"}";
resp.getWriter().write(response);
break;
case "list":
@ -113,22 +121,14 @@ public class RecorderServlet extends AbstractCtbrecServlet {
break;
case "delete":
String path = request.recording;
Recording rec = new Recording(path);
Recording rec = new Recording();
rec.setPath(path);
recorder.delete(rec);
recAdapter = moshi.adapter(Recording.class);
resp.getWriter().write("{\"status\": \"success\", \"msg\": \"List of recordings\", \"recordings\": [");
resp.getWriter().write(recAdapter.toJson(rec));
resp.getWriter().write("]}");
break;
case "regeneratePlaylist":
path = request.recording;
rec = new Recording(path);
recorder.regeneratePlaylist(rec);
recAdapter = moshi.adapter(Recording.class);
resp.getWriter().write("{\"status\": \"success\", \"msg\": \"List of recordings\", \"recordings\": [");
resp.getWriter().write(recAdapter.toJson(rec));
resp.getWriter().write("]}");
break;
case "switch":
recorder.switchStreamSource(request.model);
response = "{\"status\": \"success\", \"msg\": \"Resolution switched\"}";
@ -136,8 +136,14 @@ public class RecorderServlet extends AbstractCtbrecServlet {
break;
case "suspend":
LOG.debug("Suspend recording for model {} - {}", request.model.getName(), request.model.getUrl());
recorder.suspendRecording(request.model);
response = "{\"status\": \"success\", \"msg\": \"Recording suspended\"}";
new Thread(() -> {
try {
recorder.suspendRecording(request.model);
} catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException | IOException e) {
LOG.error("Couldn't suspend recording for model {}", request.model, e);
}
}).start();
response = "{\"status\": \"success\", \"msg\": \"Suspending recording\"}";
resp.getWriter().write(response);
break;
case "resume":