From 0f3d0b6337103e95b55e309e389ee57571e6bc03 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Fri, 31 May 2019 20:00:07 +0200 Subject: [PATCH] Rewrite recording code for local recording Remote recording is TBD --- CHANGELOG.md | 9 +- .../java/ctbrec/ui/CamrecApplication.java | 14 +- .../main/java/ctbrec/ui/JavaFxRecording.java | 45 +- client/src/main/java/ctbrec/ui/Player.java | 4 + .../java/ctbrec/ui/RecordedModelsTab.java | 7 +- .../main/java/ctbrec/ui/RecordingsTab.java | 20 +- client/src/main/java/ctbrec/ui/ThumbCell.java | 4 +- .../main/java/ctbrec/ui/ThumbOverviewTab.java | 14 +- .../ctbrec/ui/action/ModelMassEditAction.java | 14 +- .../java/ctbrec/ui/action/PlayAction.java | 7 +- .../ui/sites/camsoda/CamsodaSiteUi.java | 5 - client/src/main/resources/logback.xml | 1 + common/src/main/java/ctbrec/Recording.java | 198 +++- .../java/ctbrec/recorder/LocalRecorder.java | 852 ------------------ .../ctbrec/recorder/NextGenLocalRecorder.java | 559 ++++++++++++ .../ctbrec/recorder/PlaylistGenerator.java | 2 +- .../main/java/ctbrec/recorder/Recorder.java | 14 +- .../ctbrec/recorder/RecordingFileMonitor.java | 236 +++++ .../ctbrec/recorder/RecordingManager.java | 186 ++++ .../java/ctbrec/recorder/RemoteRecorder.java | 24 +- .../download/AbstractHlsDownload.java | 41 +- .../ctbrec/recorder/download/Download.java | 24 +- .../ctbrec/recorder/download/HlsDownload.java | 146 ++- .../recorder/download/MergedHlsDownload.java | 120 +-- .../sites/chaturbate/ChaturbateModel.java | 6 +- .../ctbrec/sites/fc2live/Fc2HlsDownload.java | 9 +- .../sites/fc2live/Fc2MergedHlsDownload.java | 9 +- .../sites/flirt4free/Flirt4FreeModel.java | 6 +- .../jasmin/LiveJasminChunkedHttpDownload.java | 21 +- .../jasmin/LiveJasminWebSocketDownload.java | 21 +- server/.classpath | 20 +- .../org.eclipse.core.resources.prefs | 2 + .../ctbrec/recorder/server/HttpServer.java | 4 +- .../recorder/server/RecorderServlet.java | 34 +- 34 files changed, 1568 insertions(+), 1110 deletions(-) delete mode 100644 common/src/main/java/ctbrec/recorder/LocalRecorder.java create mode 100644 common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java create mode 100644 common/src/main/java/ctbrec/recorder/RecordingFileMonitor.java create mode 100644 common/src/main/java/ctbrec/recorder/RecordingManager.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 92bd4d3b..594e32c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,10 @@ 1.21.2 ======================== -* Fix: Downloads not working in client/server setup -* Fix: Playlist generation and post-processing was not executed on server - shutdown -* Fix: Post-Processing for split recordings +* Added split recordings for the server +* Fix: Downloads not working in client/server setup (regression in last version) +* Fix: post-processing for split recordings +* Fix: All recordings are finished properly on shutdown (with playlist + generation on the server and post-processing) 1.21.1 ======================== diff --git a/client/src/main/java/ctbrec/ui/CamrecApplication.java b/client/src/main/java/ctbrec/ui/CamrecApplication.java index 36f6e466..fa489ae0 100644 --- a/client/src/main/java/ctbrec/ui/CamrecApplication.java +++ b/client/src/main/java/ctbrec/ui/CamrecApplication.java @@ -32,7 +32,7 @@ import ctbrec.event.EventBusHolder; import ctbrec.event.EventHandler; import ctbrec.event.EventHandlerConfiguration; import ctbrec.io.HttpClient; -import ctbrec.recorder.LocalRecorder; +import ctbrec.recorder.NextGenLocalRecorder; import ctbrec.recorder.OnlineMonitor; import ctbrec.recorder.Recorder; import ctbrec.recorder.RemoteRecorder; @@ -174,6 +174,7 @@ public class CamrecApplication extends Application { primaryStage.setMaximized(Config.getInstance().getSettings().windowMaximized); primaryStage.maximizedProperty() .addListener((observable, oldVal, newVal) -> Config.getInstance().getSettings().windowMaximized = newVal.booleanValue()); + Player.scene = primaryStage.getScene(); primaryStage.setX(Config.getInstance().getSettings().windowX); primaryStage.setY(Config.getInstance().getSettings().windowY); primaryStage.xProperty().addListener((observable, oldVal, newVal) -> Config.getInstance().getSettings().windowX = newVal.intValue()); @@ -318,7 +319,16 @@ public class CamrecApplication extends Application { private void createRecorder() { if (config.getSettings().localRecording) { - recorder = new LocalRecorder(config); + //recorder = new LocalRecorder(config); + try { + recorder = new NextGenLocalRecorder(config, sites); + } catch (IOException e) { + LOG.error("Couldn't initialize recorder", e); + Alert alert = new AutosizeAlert(Alert.AlertType.ERROR, primaryStage.getScene()); + alert.setTitle("Whoopsie"); + alert.setContentText("Couldn't initialize recorder: " + e.getLocalizedMessage()); + alert.showAndWait(); + } } else { recorder = new RemoteRecorder(config, httpClient, sites); } diff --git a/client/src/main/java/ctbrec/ui/JavaFxRecording.java b/client/src/main/java/ctbrec/ui/JavaFxRecording.java index e737ce8e..b9265034 100644 --- a/client/src/main/java/ctbrec/ui/JavaFxRecording.java +++ b/client/src/main/java/ctbrec/ui/JavaFxRecording.java @@ -3,6 +3,7 @@ package ctbrec.ui; import java.time.Instant; import ctbrec.Config; +import ctbrec.Model; import ctbrec.Recording; import javafx.beans.property.LongProperty; import javafx.beans.property.SimpleLongProperty; @@ -20,16 +21,19 @@ public class JavaFxRecording extends Recording { public JavaFxRecording(Recording recording) { this.delegate = recording; + setStatus(recording.getStatus()); + setSizeInByte(recording.getSizeInByte()); + setProgress(recording.getProgress()); } @Override - public String getModelName() { - return delegate.getModelName(); + public Model getModel() { + return delegate.getModel(); } @Override - public void setModelName(String modelName) { - delegate.setModelName(modelName); + public void setModel(Model model) { + delegate.setModel(model); } @Override @@ -73,7 +77,20 @@ public class JavaFxRecording extends Recording { case STOPPED: statusProperty.set("stopped"); break; + case DELETED: + statusProperty.set("deleted"); + break; + case DELETING: + statusProperty.set("deleting"); + break; + case FAILED: + statusProperty.set("failed"); + break; + case WAITING: + statusProperty.set("waiting"); + break; case UNKNOWN: + default: statusProperty.set("unknown"); break; } @@ -142,16 +159,6 @@ public class JavaFxRecording extends Recording { delegate.setPath(path); } - @Override - public boolean hasPlaylist() { - return delegate.hasPlaylist(); - } - - @Override - public void setHasPlaylist(boolean hasPlaylist) { - delegate.setHasPlaylist(hasPlaylist); - } - @Override public long getSizeInByte() { return delegate.getSizeInByte(); @@ -161,6 +168,16 @@ public class JavaFxRecording extends Recording { return sizeProperty; } + @Override + public void setMetaDataFile(String metaDataFile) { + delegate.setMetaDataFile(metaDataFile); + } + + @Override + public String getMetaDataFile() { + return delegate.getMetaDataFile(); + } + public boolean valueChanged() { boolean changed = getSizeInByte() != lastValue; lastValue = getSizeInByte(); diff --git a/client/src/main/java/ctbrec/ui/Player.java b/client/src/main/java/ctbrec/ui/Player.java index 516a5ac4..d5c4fec7 100644 --- a/client/src/main/java/ctbrec/ui/Player.java +++ b/client/src/main/java/ctbrec/ui/Player.java @@ -16,12 +16,15 @@ import ctbrec.Recording; import ctbrec.io.DevNull; import ctbrec.io.StreamRedirectThread; import ctbrec.recorder.download.StreamSource; +import ctbrec.ui.controls.Dialogs; import javafx.application.Platform; +import javafx.scene.Scene; import javafx.scene.control.Alert; public class Player { private static final transient Logger LOG = LoggerFactory.getLogger(Player.class); private static PlayerThread playerThread; + public static Scene scene; public static boolean play(String url) { return play(url, true); @@ -165,6 +168,7 @@ public class Player { LOG.debug("Media player finished."); } catch (Exception e) { LOG.error("Error in player thread", e); + Dialogs.showError(scene, "Playback failed", "Couldn't start playback", e); } running = false; } diff --git a/client/src/main/java/ctbrec/ui/RecordedModelsTab.java b/client/src/main/java/ctbrec/ui/RecordedModelsTab.java index 54df4fe4..45f41c43 100644 --- a/client/src/main/java/ctbrec/ui/RecordedModelsTab.java +++ b/client/src/main/java/ctbrec/ui/RecordedModelsTab.java @@ -455,7 +455,7 @@ public class RecordedModelsTab extends Tab implements TabSelectionListener { .peek(fxm -> { for (Recording recording : recordings) { if(recording.getStatus() == Recording.State.RECORDING && - recording.getModelName().equals(fxm.getSanitizedNamed())) + recording.getModel().getName().equals(fxm.getSanitizedNamed())) { fxm.getRecordingProperty().set(true); break; @@ -616,7 +616,10 @@ public class RecordedModelsTab extends Tab implements TabSelectionListener { private void stopAction(List selectedModels) { List models = selectedModels.stream().map(jfxm -> jfxm.getDelegate()).collect(Collectors.toList()); new StopRecordingAction(getTabPane(), models, recorder).execute((m) -> { - observableModels.remove(m); + Platform.runLater(() -> { + table.getSelectionModel().clearSelection(table.getItems().indexOf(m)); + table.getItems().remove(m); + }); }); }; diff --git a/client/src/main/java/ctbrec/ui/RecordingsTab.java b/client/src/main/java/ctbrec/ui/RecordingsTab.java index 98cbddfd..68d3b54a 100644 --- a/client/src/main/java/ctbrec/ui/RecordingsTab.java +++ b/client/src/main/java/ctbrec/ui/RecordingsTab.java @@ -43,6 +43,7 @@ import ctbrec.sites.Site; import ctbrec.ui.controls.Toast; import javafx.application.Platform; import javafx.beans.property.SimpleObjectProperty; +import javafx.beans.property.SimpleStringProperty; import javafx.collections.FXCollections; import javafx.collections.ObservableList; import javafx.concurrent.ScheduledService; @@ -63,7 +64,6 @@ import javafx.scene.control.TableColumn; import javafx.scene.control.TableColumn.SortType; import javafx.scene.control.TableView; import javafx.scene.control.Tooltip; -import javafx.scene.control.cell.PropertyValueFactory; import javafx.scene.input.ContextMenuEvent; import javafx.scene.input.KeyCode; import javafx.scene.input.KeyEvent; @@ -123,7 +123,7 @@ public class RecordingsTab extends Tab implements TabSelectionListener { table.getSelectionModel().setSelectionMode(SelectionMode.MULTIPLE); TableColumn name = new TableColumn<>("Model"); name.setPrefWidth(200); - name.setCellValueFactory(new PropertyValueFactory("modelName")); + name.setCellValueFactory(cdf -> new SimpleStringProperty(cdf.getValue().getModel().getName())); TableColumn date = new TableColumn<>("Date"); date.setCellValueFactory((cdf) -> { Instant instant = cdf.getValue().getStartDate(); @@ -429,23 +429,9 @@ public class RecordingsTab extends Tab implements TabSelectionListener { contextMenu.getItems().add(downloadRecording); } - MenuItem regenPlaylist = new MenuItem("Regenerate Playlist"); - regenPlaylist.setOnAction((e) -> { - try { - recorder.regeneratePlaylist(recordings.get(0)); - } catch (IOException | InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e1) { - showErrorDialog("Error while regenerating playlist", "The recording could not be regenerated", e1); - LOG.error("Error while regenerating playlist", e1); - } - }); - if (!Config.getInstance().getSettings().localRecording && recordings.get(0).getStatus() == State.FINISHED) { - contextMenu.getItems().add(regenPlaylist); - } - if(recordings.size() > 1) { openInPlayer.setDisable(true); openDir.setDisable(true); - regenPlaylist.setDisable(true); } return contextMenu; @@ -561,7 +547,7 @@ public class RecordingsTab extends Tab implements TabSelectionListener { msg = "Delete " + recordings.size() + " recordings for good?"; } else { Recording r = recordings.get(0); - msg = "Delete " + r.getModelName() + "/" + r.getStartDate() + " for good?"; + msg = "Delete " + r.getModel().getName() + "/" + r.getStartDate() + " for good?"; } AutosizeAlert confirm = new AutosizeAlert(AlertType.CONFIRMATION, msg, getTabPane().getScene(), YES, NO); confirm.setTitle("Delete recording?"); diff --git a/client/src/main/java/ctbrec/ui/ThumbCell.java b/client/src/main/java/ctbrec/ui/ThumbCell.java index ad16ba1d..8129cda2 100644 --- a/client/src/main/java/ctbrec/ui/ThumbCell.java +++ b/client/src/main/java/ctbrec/ui/ThumbCell.java @@ -276,11 +276,11 @@ public class ThumbCell extends StackPane { } int[] resolution = resolutionCache.getIfPresent(model); - if(resolution != null) { + if (resolution != null) { ThumbOverviewTab.threadPool.submit(() -> { try { updateResolutionTag(resolution); - } catch(Exception e) { + } catch (Exception e) { LOG.warn("Couldn't update resolution tag for model {}", model.getName(), e); } }); diff --git a/client/src/main/java/ctbrec/ui/ThumbOverviewTab.java b/client/src/main/java/ctbrec/ui/ThumbOverviewTab.java index 02349f7b..a9819225 100644 --- a/client/src/main/java/ctbrec/ui/ThumbOverviewTab.java +++ b/client/src/main/java/ctbrec/ui/ThumbOverviewTab.java @@ -19,6 +19,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -87,7 +88,7 @@ public class ThumbOverviewTab extends Tab implements TabSelectionListener { private static final transient Logger LOG = LoggerFactory.getLogger(ThumbOverviewTab.class); protected static BlockingQueue queue = new LinkedBlockingQueue<>(); - static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue); + static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue, createThreadFactory()); static Set resolutionProcessing = Collections.synchronizedSet(new HashSet<>()); protected FlowPane grid = new FlowPane(); @@ -852,4 +853,15 @@ public class ThumbOverviewTab extends Tab implements TabSelectionListener { selectedThumbCells.get(0).setSelected(false); } } + + private static int threadCounter = 0; + private static ThreadFactory createThreadFactory() { + return r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + t.setName("ResolutionDetector-" + threadCounter++); + return t; + }; + } } diff --git a/client/src/main/java/ctbrec/ui/action/ModelMassEditAction.java b/client/src/main/java/ctbrec/ui/action/ModelMassEditAction.java index 7da778be..e4ff44b5 100644 --- a/client/src/main/java/ctbrec/ui/action/ModelMassEditAction.java +++ b/client/src/main/java/ctbrec/ui/action/ModelMassEditAction.java @@ -17,7 +17,7 @@ import javafx.scene.Node; public class ModelMassEditAction { static BlockingQueue queue = new LinkedBlockingQueue<>(); - static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue); + static ExecutorService threadPool = new ThreadPoolExecutor(2, 10, 10, TimeUnit.MINUTES, queue); protected List models; protected Consumer action; @@ -39,14 +39,14 @@ public class ModelMassEditAction { } public void execute(Consumer callback) { - Consumer cb = Objects.requireNonNull(callback); + Consumer cb = Objects.requireNonNull(callback, "Callback is null, call execute() instead"); source.setCursor(Cursor.WAIT); - threadPool.submit(() -> { - for (Model model : models) { + for (Model model : models) { + threadPool.submit(() -> { action.accept(model); cb.accept(model); - } - Platform.runLater(() -> source.setCursor(Cursor.DEFAULT)); - }); + Platform.runLater(() -> source.setCursor(Cursor.DEFAULT)); + }); + } } } diff --git a/client/src/main/java/ctbrec/ui/action/PlayAction.java b/client/src/main/java/ctbrec/ui/action/PlayAction.java index a667a04e..544ba35a 100644 --- a/client/src/main/java/ctbrec/ui/action/PlayAction.java +++ b/client/src/main/java/ctbrec/ui/action/PlayAction.java @@ -21,7 +21,7 @@ public class PlayAction { public void execute() { source.setCursor(Cursor.WAIT); - new Thread(() -> { + Thread t = new Thread(() -> { SiteUI siteUI = SiteUiFactory.getUi(selectedModel.getSite()); boolean started = siteUI.play(selectedModel); Platform.runLater(() -> { @@ -30,6 +30,9 @@ public class PlayAction { } source.setCursor(Cursor.DEFAULT); }); - }).start(); + }); + t.setName("Player " + selectedModel); + t.setDaemon(true); + t.start(); } } diff --git a/client/src/main/java/ctbrec/ui/sites/camsoda/CamsodaSiteUi.java b/client/src/main/java/ctbrec/ui/sites/camsoda/CamsodaSiteUi.java index 5bb5a62c..e987b0b8 100644 --- a/client/src/main/java/ctbrec/ui/sites/camsoda/CamsodaSiteUi.java +++ b/client/src/main/java/ctbrec/ui/sites/camsoda/CamsodaSiteUi.java @@ -2,9 +2,6 @@ package ctbrec.ui.sites.camsoda; import java.io.IOException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import ctbrec.sites.ConfigUI; import ctbrec.sites.camsoda.Camsoda; import ctbrec.ui.TabProvider; @@ -12,8 +9,6 @@ import ctbrec.ui.sites.AbstractSiteUi; public class CamsodaSiteUi extends AbstractSiteUi { - private static final transient Logger LOG = LoggerFactory.getLogger(CamsodaSiteUi.class); - private CamsodaTabProvider tabProvider; private CamsodaConfigUI configUi; private Camsoda camsoda; diff --git a/client/src/main/resources/logback.xml b/client/src/main/resources/logback.xml index b6629bea..c299d670 100644 --- a/client/src/main/resources/logback.xml +++ b/client/src/main/resources/logback.xml @@ -34,6 +34,7 @@ + diff --git a/common/src/main/java/ctbrec/Recording.java b/common/src/main/java/ctbrec/Recording.java index dadbe3c7..83525d04 100644 --- a/common/src/main/java/ctbrec/Recording.java +++ b/common/src/main/java/ctbrec/Recording.java @@ -1,18 +1,38 @@ package ctbrec; -import java.text.ParseException; -import java.text.SimpleDateFormat; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; import java.time.Instant; -import java.util.Date; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +import com.iheartradio.m3u8.Encoding; +import com.iheartradio.m3u8.Format; +import com.iheartradio.m3u8.ParseException; +import com.iheartradio.m3u8.ParsingMode; +import com.iheartradio.m3u8.PlaylistException; +import com.iheartradio.m3u8.PlaylistParser; +import com.iheartradio.m3u8.data.MediaPlaylist; +import com.iheartradio.m3u8.data.Playlist; +import com.iheartradio.m3u8.data.TrackData; + +import ctbrec.event.EventBusHolder; +import ctbrec.event.RecordingStateChangedEvent; +import ctbrec.recorder.download.Download; public class Recording { - private String modelName; + private Model model; + private transient Download download; private Instant startDate; private String path; - private boolean hasPlaylist; private State status = State.UNKNOWN; private int progress = -1; - private long sizeInByte; + private long sizeInByte = -1; + private String metaDataFile; public static enum State { RECORDING("recording"), @@ -21,7 +41,11 @@ public class Recording { POST_PROCESSING("post-processing"), FINISHED("finished"), DOWNLOADING("downloading"), - UNKNOWN("unknown"); + DELETING("deleting"), + DELETED("deleted"), + UNKNOWN("unknown"), + WAITING("waiting"), + FAILED("failed"); private String desc; @@ -37,21 +61,14 @@ public class Recording { public Recording() {} - public Recording(String path) throws ParseException { - this.path = path; - this.modelName = path.substring(0, path.indexOf("/")); - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); - Date date = sdf.parse(path.substring(path.indexOf('/')+1)); - startDate = Instant.ofEpochMilli(date.getTime()); - } + // public Recording(String path) throws ParseException { + // this.path = path; + // this.modelName = path.substring(0, path.indexOf("/")); + // SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); + // Date date = sdf.parse(path.substring(path.indexOf('/')+1)); + // startDate = Instant.ofEpochMilli(date.getTime()); + // } - public String getModelName() { - return modelName; - } - - public void setModelName(String modelName) { - this.modelName = modelName; - } public Instant getStartDate() { return startDate; @@ -69,6 +86,11 @@ public class Recording { this.status = status; } + public void setStatusWithEvent(State status, boolean fireEvent) { + setStatus(status); + fireStatusEvent(status); + } + public int getProgress() { return this.progress; } @@ -85,15 +107,16 @@ public class Recording { this.path = path; } - public boolean hasPlaylist() { - return hasPlaylist; - } - - public void setHasPlaylist(boolean hasPlaylist) { - this.hasPlaylist = hasPlaylist; + public File getAbsoluteFile() { + String recordingsDir = Config.getInstance().getSettings().recordingsDir; + File recordingsFile = new File(recordingsDir, getPath()); + return recordingsFile; } public long getSizeInByte() { + if (sizeInByte == -1 || getStatus() == State.RECORDING) { + this.sizeInByte = getSize(); + } return sizeInByte; } @@ -101,13 +124,86 @@ public class Recording { this.sizeInByte = sizeInByte; } + public void postprocess() { + getDownload().postprocess(this); + } + + private void fireStatusEvent(State status) { + RecordingStateChangedEvent evt = new RecordingStateChangedEvent(getDownload().getTarget(), status, getModel(), getStartDate()); + EventBusHolder.BUS.post(evt); + } + + public Model getModel() { + return model; + } + + public void setModel(Model model) { + this.model = model; + } + + public Download getDownload() { + return download; + } + + public void setDownload(Download download) { + this.download = download; + } + + public String getMetaDataFile() { + return metaDataFile; + } + + public void setMetaDataFile(String metaDataFile) { + this.metaDataFile = metaDataFile; + } + + public Duration getLength() throws IOException, ParseException, PlaylistException { + // check, if the recording exists + File rec = new File(Config.getInstance().getSettings().recordingsDir, getPath()); + if (!rec.exists()) { + return Duration.ofSeconds(0); + } + + // check, if the recording has data at all + long size = getSizeInByte(); + if (size == 0) { + return Duration.ofSeconds(0); + } + + // determine the length + if (getPath().endsWith(".ts")) { + return Duration.ofSeconds((long) MpegUtil.getFileDuration(rec)); + } else if (rec.isDirectory()) { + File playlist = new File(rec, "playlist.m3u8"); + if (playlist.exists()) { + return Duration.ofSeconds((long) getPlaylistLength(playlist)); + } + } + return Duration.ofSeconds(0); + } + + private double getPlaylistLength(File playlist) throws IOException, ParseException, PlaylistException { + if (playlist.exists()) { + PlaylistParser playlistParser = new PlaylistParser(new FileInputStream(playlist), Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); + Playlist m3u = playlistParser.parse(); + MediaPlaylist mediaPlaylist = m3u.getMediaPlaylist(); + double length = 0; + for (TrackData trackData : mediaPlaylist.getTracks()) { + length += trackData.getTrackInfo().duration; + } + return length; + } else { + throw new FileNotFoundException(playlist.getAbsolutePath() + " does not exist"); + } + } + @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((modelName == null) ? 0 : modelName.hashCode()); - result = prime * result + ((path == null) ? 0 : path.hashCode()); - result = prime * result + ((startDate == null) ? 0 : startDate.hashCode()); + result = prime * result + ((getModel() == null) ? 0 : getModel().hashCode()); + result = prime * result + ((getPath() == null) ? 0 : getPath().hashCode()); + result = prime * result + ((getStartDate() == null) ? 0 : getStartDate().hashCode()); return result; } @@ -117,22 +213,46 @@ public class Recording { return true; if (obj == null) return false; - Recording other = (Recording) obj; - if (modelName == null) { - if (other.getModelName() != null) - return false; - } else if (!modelName.equals(other.getModelName())) + if(!(obj instanceof Recording)) { return false; - if (path == null) { + } + Recording other = (Recording) obj; + if (getModel() == null) { + if (other.getModel() != null) + return false; + } else if (!getModel().equals(other.getModel())) + return false; + if (getPath() == null) { if (other.getPath() != null) return false; - } else if (!path.equals(other.getPath())) + } else if (!getPath().equals(other.getPath())) return false; - if (startDate == null) { + if (getStartDate() == null) { if (other.getStartDate() != null) return false; - } else if (!startDate.equals(other.getStartDate())) + } else if (!getStartDate().equals(other.getStartDate())) return false; return true; } + + @Override + public String toString() { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); + LocalDateTime localStartDate = LocalDateTime.ofInstant(getStartDate(), ZoneId.systemDefault()); + return getModel().getSanitizedNamed() + '_' + formatter.format(localStartDate); + } + + private long getSize() { + File rec = new File(Config.getInstance().getSettings().recordingsDir, getPath()); + if(rec.isDirectory()) { + long size = 0; + File[] files = rec.listFiles(); + for (File file : files) { + size += file.length(); + } + return size; + } else { + return rec.length(); + } + } } diff --git a/common/src/main/java/ctbrec/recorder/LocalRecorder.java b/common/src/main/java/ctbrec/recorder/LocalRecorder.java deleted file mode 100644 index f4ecccf0..00000000 --- a/common/src/main/java/ctbrec/recorder/LocalRecorder.java +++ /dev/null @@ -1,852 +0,0 @@ -package ctbrec.recorder; - -import static ctbrec.Recording.State.*; -import static ctbrec.event.Event.Type.*; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FilenameFilter; -import java.io.IOException; -import java.nio.file.FileStore; -import java.nio.file.Files; -import java.security.InvalidKeyException; -import java.security.NoSuchAlgorithmException; -import java.text.SimpleDateFormat; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.eventbus.Subscribe; -import com.iheartradio.m3u8.Encoding; -import com.iheartradio.m3u8.Format; -import com.iheartradio.m3u8.ParseException; -import com.iheartradio.m3u8.ParsingMode; -import com.iheartradio.m3u8.PlaylistError; -import com.iheartradio.m3u8.PlaylistException; -import com.iheartradio.m3u8.PlaylistParser; -import com.iheartradio.m3u8.data.MediaPlaylist; -import com.iheartradio.m3u8.data.Playlist; -import com.iheartradio.m3u8.data.TrackData; - -import ctbrec.Config; -import ctbrec.Model; -import ctbrec.MpegUtil; -import ctbrec.Recording; -import ctbrec.Recording.State; -import ctbrec.event.Event; -import ctbrec.event.EventBusHolder; -import ctbrec.event.ModelIsOnlineEvent; -import ctbrec.event.RecordingStateChangedEvent; -import ctbrec.io.HttpClient; -import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; -import ctbrec.recorder.download.Download; - -public class LocalRecorder implements Recorder { - - private static final transient Logger LOG = LoggerFactory.getLogger(LocalRecorder.class); - private static final boolean IGNORE_CACHE = true; - - private List models = Collections.synchronizedList(new ArrayList<>()); - private Map recordingProcesses = Collections.synchronizedMap(new HashMap<>()); - private Map playlistGenerators = new HashMap<>(); - private Config config; - private ProcessMonitor processMonitor; - private volatile boolean recording = true; - private List deleteInProgress = Collections.synchronizedList(new ArrayList<>()); - private RecorderHttpClient client = new RecorderHttpClient(); - private ReentrantLock lock = new ReentrantLock(); - private long lastSpaceMessage = 0; - - private ExecutorService ppThreadPool = Executors.newFixedThreadPool(2); - - public LocalRecorder(Config config) { - this.config = config; - config.getSettings().models.stream().forEach((m) -> { - if(m.getSite().isEnabled()) { - models.add(m); - } else { - LOG.info("{} disabled -> ignoring {}", m.getSite().getName(), m.getName()); - } - }); - - recording = true; - processMonitor = new ProcessMonitor(); - processMonitor.start(); - - registerEventBusListener(); - if(Config.isServerMode()) { - processUnfinishedRecordings(); - } - - LOG.debug("Recorder initialized"); - LOG.info("Models to record: {}", models); - LOG.info("Saving recordings in {}", config.getSettings().recordingsDir); - } - - private void registerEventBusListener() { - EventBusHolder.BUS.register(new Object() { - @Subscribe - public void modelEvent(Event e) { - try { - if (e.getType() == MODEL_ONLINE) { - ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e; - Model model = evt.getModel(); - if(!isSuspended(model) && !recordingProcesses.containsKey(model)) { - startRecordingProcess(model); - } - } - } catch (Exception e1) { - LOG.error("Error while handling model state changed event", e); - } - } - }); - } - - @Override - public void startRecording(Model model) { - if (!models.contains(model)) { - LOG.info("Model {} added", model); - lock.lock(); - try { - models.add(model); - config.getSettings().models.add(model); - config.save(); - } catch (IOException e) { - LOG.error("Couldn't save config", e); - } finally { - lock.unlock(); - } - } - } - - @Override - public void stopRecording(Model model) throws IOException { - lock.lock(); - try { - if (models.contains(model)) { - models.remove(model); - config.getSettings().models.remove(model); - if (recordingProcesses.containsKey(model)) { - stopRecordingProcess(model); - } - LOG.info("Model {} removed", model); - config.save(); - } else { - throw new NoSuchElementException("Model " + model.getName() + " ["+model.getUrl()+"] not found in list of recorded models"); - } - } finally { - lock.unlock(); - } - } - - private void startRecordingProcess(Model model) throws IOException { - if(!recording) { - // recorder is not in recording mode - return; - } - - if(model.isSuspended()) { - LOG.info("Recording for model {} is suspended.", model); - return; - } - - if (recordingProcesses.containsKey(model)) { - LOG.error("A recording for model {} is already running", model); - return; - } - - lock.lock(); - try { - if (!models.contains(model)) { - LOG.info("Model {} has been removed. Restarting of recording cancelled.", model); - return; - } - } finally { - lock.unlock(); - } - - if(!enoughSpaceForRecording()) { - long now = System.currentTimeMillis(); - if( (now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) { - LOG.info("Not enough space for recording, not starting recording for {}", model); - lastSpaceMessage = now; - } - return; - } - - if(!downloadSlotAvailable()) { - LOG.info("The number of downloads is maxed out, not starting recording for {}", model); - return; - } - - LOG.debug("Starting recording for model {}", model.getName()); - Download download = model.createDownload(); - LOG.debug("Downloading with {}", download.getClass().getSimpleName()); - recordingProcesses.put(model, download); - Thread downloadThread = new Thread(() -> { - try { - download.start(model, config); - } catch (IOException e) { - LOG.error("Download for {} failed. Download alive: {}", model.getName(), download.isAlive(), e); - } - }); - downloadThread.setName("Download " + model.getName()); - downloadThread.start(); - } - - private boolean downloadSlotAvailable() { - int concurrentRecordings = Config.getInstance().getSettings().concurrentRecordings; - return concurrentRecordings == 0 || concurrentRecordings > 0 && recordingProcesses.size() < concurrentRecordings; - } - - private void stopRecordingProcess(Model model) { - LOG.debug("Stopping recording for {}", model); - Download download = recordingProcesses.get(model); - recordingProcesses.remove(model); - fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime()); - - Runnable stopAndThePostProcess = () -> { - LOG.debug("Stopping download for {}", model); - download.stop(); - LOG.debug("Running post-processing for {}", model); - createPostProcessor(download).run(); - }; - ppThreadPool.submit(stopAndThePostProcess); - } - - @Override - public boolean isTracked(Model model) { - lock.lock(); - try { - return models.contains(model); - } finally { - lock.unlock(); - } - } - - @Override - public boolean isSuspended(Model model) { - lock.lock(); - try { - int index = models.indexOf(model); - if(index >= 0) { - Model m = models.get(index); - return m.isSuspended(); - } else { - return false; - } - } finally { - lock.unlock(); - } - } - - @Override - public List getModels() { - lock.lock(); - try { - return Collections.unmodifiableList(new ArrayList<>(models)); - } finally { - lock.unlock(); - } - } - - @Override - public List getOnlineModels() { - return getModels() - .stream() - .filter(m -> { - try { - return m.isOnline(); - } catch (IOException | ExecutionException | InterruptedException e) { - return false; - } - }) - .collect(Collectors.toList()); - } - - @Override - public void shutdown() { - LOG.info("Shutting down"); - recording = false; - LOG.debug("Stopping monitor threads"); - processMonitor.running = false; - LOG.debug("Stopping all recording processes"); - stopRecordingProcesses(); - ppThreadPool.shutdown(); - try { - ppThreadPool.awaitTermination(5, TimeUnit.MINUTES); - } catch (InterruptedException e) { - LOG.error("Couldn't wait for post-processing to finish. Some recordings might be broken!"); - } - client.shutdown(); - } - - private void stopRecordingProcesses() { - lock.lock(); - try { - for (Model model : models) { - Download recordingProcess = recordingProcesses.get(model); - if (recordingProcess != null) { - stopRecordingProcess(model); - } - } - } finally { - lock.unlock(); - } - } - - private void tryRestartRecording(Model model) { - if (!recording) { - // recorder is not in recording state - return; - } - - try { - boolean modelInRecordingList = isTracked(model); - boolean online = model.isOnline(IGNORE_CACHE); - if (modelInRecordingList && online) { - LOG.info("Restarting recording for model {}", model); - recordingProcesses.remove(model); - startRecordingProcess(model); - } - } catch (Exception e) { - LOG.error("Couldn't restart recording for model {}", model); - } - } - - private class ProcessMonitor extends Thread { - private volatile boolean running = false; - - public ProcessMonitor() { - setName("ProcessMonitor"); - setDaemon(true); - } - - @Override - public void run() { - running = true; - while (running) { - try { - if(!enoughSpaceForRecording() && !recordingProcesses.isEmpty()) { - LOG.info("No space left -> Stopping all recordings"); - stopRecordingProcesses(); - } - } catch (IOException e1) { - LOG.warn("Couldn't check free space left", e1); - } - - List restart = new ArrayList<>(); - for (Iterator> iterator = recordingProcesses.entrySet().iterator(); iterator.hasNext();) { - Entry entry = iterator.next(); - Model m = entry.getKey(); - Download download = entry.getValue(); - if (!download.isAlive()) { - LOG.debug("Recording terminated for model {}", m.getName()); - iterator.remove(); - restart.add(m); - fireRecordingStateChanged(download.getTarget(), STOPPED, m, download.getStartTime()); - Runnable pp = createPostProcessor(download); - ppThreadPool.submit(pp); - } - } - for (Model m : restart) { - tryRestartRecording(m); - } - - try { - if (running) - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.error("Couldn't sleep", e); - } - } - LOG.debug(getName() + " terminated"); - } - } - - private void generatePlaylist(File recDir) { - if(!config.getSettings().generatePlaylist) { - return; - } - - PlaylistGenerator playlistGenerator = new PlaylistGenerator(); - playlistGenerators.put(recDir, playlistGenerator); - try { - File playlist = playlistGenerator.generate(recDir); - if(playlist != null) { - playlistGenerator.validate(recDir); - } - } catch (IOException | ParseException e) { - LOG.error("Couldn't generate playlist file", e); - } catch (PlaylistException e) { - if(e.getErrors().isEmpty()) { - LOG.error("Couldn't generate playlist file", e); - } else { - LOG.error("Playlist contains errors"); - for (PlaylistError error : e.getErrors()) { - LOG.error("Error: {}", error.toString()); - } - } - } catch (InvalidPlaylistException e) { - LOG.error("Playlist is invalid and will be deleted", e); - File playlist = new File(recDir, "playlist.m3u8"); - playlist.delete(); - } finally { - playlistGenerators.remove(recDir); - } - } - - private void fireRecordingStateChanged(File path, Recording.State newState, Model model, Instant startTime) { - RecordingStateChangedEvent evt = new RecordingStateChangedEvent(path, newState, model, startTime); - EventBusHolder.BUS.post(evt); - } - - /** - * This is called once at start for server mode. When the server is killed, recordings are - * left without playlist. This method creates playlists for them. - */ - private void processUnfinishedRecordings() { - try { - List recs = getRecordings(); - for (Recording rec : recs) { - if (rec.getStatus() == RECORDING) { - boolean recordingProcessFound = false; - File recordingsDir = new File(config.getSettings().recordingsDir); - File recDir = new File(recordingsDir, rec.getPath()); - for (Entry download : recordingProcesses.entrySet()) { - if (download.getValue().getTarget().equals(recDir)) { - recordingProcessFound = true; - } - } - if (!recordingProcessFound) { - ppThreadPool.submit(() -> { - generatePlaylist(recDir); - }); - } - } - } - } catch (Exception e) { - LOG.error("Unexpected error in playlist trigger", e); - } - } - - @Override - public List getRecordings() { - if(Config.isServerMode()) { - return listSegmentedRecordings(); - } else { - return listMergedRecordings(); - } - } - - private List listMergedRecordings() { - File recordingsDir = new File(config.getSettings().recordingsDir); - List possibleRecordings = new LinkedList<>(); - listRecursively(recordingsDir, possibleRecordings, (dir, name) -> name.matches(".*?_\\d{4}-\\d{2}-\\d{2}_\\d{2}-\\d{2}-\\d{2}_\\d{3}\\.(ts|mp4)")); - SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT); - List recordings = new ArrayList<>(); - for (File ts: possibleRecordings) { - try { - String filename = ts.getName(); - int extLength = filename.length() - filename.lastIndexOf('.'); - String dateString = filename.substring(filename.length() - extLength - Config.RECORDING_DATE_FORMAT.length(), filename.length() - extLength); - Date startDate = sdf.parse(dateString); - Recording recording = new Recording(); - recording.setModelName(filename.substring(0, filename.length() - extLength - 1 - Config.RECORDING_DATE_FORMAT.length())); - recording.setStartDate(Instant.ofEpochMilli(startDate.getTime())); - String path = ts.getAbsolutePath().replace(config.getSettings().recordingsDir, ""); - if(!path.startsWith("/")) { - path = '/' + path; - } - recording.setPath(path); - recording.setSizeInByte(ts.length()); - recording.setStatus(getStatus(recording)); - recordings.add(recording); - } catch(Exception e) { - LOG.error("Ignoring {} - {}", ts.getAbsolutePath(), e.getMessage()); - } - } - return recordings; - } - - private State getStatus(Recording recording) { - File absolutePath = new File(Config.getInstance().getSettings().recordingsDir, recording.getPath()); - - PlaylistGenerator playlistGenerator = playlistGenerators.get(absolutePath); - if (playlistGenerator != null) { - recording.setProgress(playlistGenerator.getProgress()); - return GENERATING_PLAYLIST; - } - - if (Config.isServerMode()) { - if (recording.hasPlaylist()) { - return FINISHED; - } else { - return RECORDING; - } - } else { - boolean dirUsedByRecordingProcess = false; - for (Download download : recordingProcesses.values()) { - if(absolutePath.equals(download.getTarget())) { - dirUsedByRecordingProcess = true; - break; - } - } - if(dirUsedByRecordingProcess) { - return RECORDING; - } else { - return FINISHED; - } - } - } - - private List listSegmentedRecordings() { - List recordings = new ArrayList<>(); - File recordingsDir = new File(config.getSettings().recordingsDir); - File[] subdirs = recordingsDir.listFiles(); - if (subdirs == null) { - return Collections.emptyList(); - } - - for (File subdir : subdirs) { - // ignore empty directories - File[] recordingsDirs = subdir.listFiles(); - if(recordingsDirs == null || recordingsDirs.length == 0) { - continue; - } - - // start going over valid directories - for (File rec : recordingsDirs) { - SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT); - if (rec.isDirectory()) { - try { - // ignore directories, which are probably not created by ctbrec - if (rec.getName().length() != Config.RECORDING_DATE_FORMAT.length()) { - continue; - } - // ignore empty directories - if (rec.listFiles().length == 0) { - continue; - } - // don't list recordings, which currently get deleted - if (deleteInProgress.contains(rec)) { - continue; - } - - Date startDate = sdf.parse(rec.getName()); - Recording recording = new Recording(); - recording.setModelName(subdir.getName()); - recording.setStartDate(Instant.ofEpochMilli(startDate.getTime())); - recording.setPath(recording.getModelName() + "/" + rec.getName()); - recording.setSizeInByte(getSize(rec)); - File playlist = new File(rec, "playlist.m3u8"); - recording.setHasPlaylist(playlist.exists()); - recording.setStatus(getStatus(recording)); - recordings.add(recording); - } catch (Exception e) { - LOG.debug("Ignoring {} - {}", rec.getAbsolutePath(), e.getMessage()); - } - } - } - } - return recordings; - } - - private void listRecursively(File dir, List result, FilenameFilter filenameFilter) { - File[] files = dir.listFiles(); - if(files != null) { - for (File file : files) { - if(file.isDirectory()) { - listRecursively(file, result, filenameFilter); - } - if(filenameFilter.accept(dir, file.getName())) { - result.add(file); - } - } - } - } - - private long getSize(File rec) { - long size = 0; - File[] files = rec.listFiles(); - for (File file : files) { - size += file.length(); - } - return size; - } - - @Override - public void delete(Recording recording) throws IOException { - File recordingsDir = new File(config.getSettings().recordingsDir); - File path = new File(recordingsDir, recording.getPath()); - LOG.debug("Deleting {}", path); - - if(path.isFile()) { - Files.delete(path.toPath()); - deleteEmptyParents(path.getParentFile()); - } else { - deleteDirectory(path); - deleteEmptyParents(path); - } - } - - private void deleteEmptyParents(File parent) throws IOException { - File recDir = new File(Config.getInstance().getSettings().recordingsDir); - while(parent != null && parent.list() != null && parent.list().length == 0) { - if(parent.equals(recDir)) { - return; - } - LOG.debug("Deleting empty directory {}", parent.getAbsolutePath()); - Files.delete(parent.toPath()); - parent = parent.getParentFile(); - } - } - - private void deleteDirectory(File directory) throws IOException { - if (!directory.exists()) { - throw new IOException("Recording does not exist"); - } - - try { - deleteInProgress.add(directory); - File[] files = directory.listFiles(); - boolean deletedAllFiles = true; - for (File file : files) { - try { - LOG.trace("Deleting {}", file.getAbsolutePath()); - Files.delete(file.toPath()); - } catch (Exception e) { - deletedAllFiles = false; - LOG.debug("Couldn't delete {}", file, e); - } - } - - if (!deletedAllFiles) { - throw new IOException("Couldn't delete all files in " + directory); - } - } finally { - deleteInProgress.remove(directory); - } - } - - @Override - public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { - if (models.contains(model)) { - int index = models.indexOf(model); - models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); - config.save(); - LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName()); - Download download = recordingProcesses.get(model); - if(download != null) { - stopRecordingProcess(model); - } - tryRestartRecording(model); - } else { - LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName()); - return; - } - } - - @Override - public void suspendRecording(Model model) { - lock.lock(); - try { - if (models.contains(model)) { - int index = models.indexOf(model); - models.get(index).setSuspended(true); - model.setSuspended(true); - config.save(); - } else { - LOG.warn("Couldn't suspend model {}. Not found in list", model.getName()); - return; - } - } catch (IOException e) { - LOG.error("Couldn't save config", e); - } finally { - lock.unlock(); - } - - Download download = recordingProcesses.get(model); - if(download != null) { - stopRecordingProcess(model); - } - } - - @Override - public void resumeRecording(Model model) throws IOException { - lock.lock(); - try { - if (models.contains(model)) { - int index = models.indexOf(model); - Model m = models.get(index); - m.setSuspended(false); - if(m.isOnline()) { - startRecordingProcess(m); - } - model.setSuspended(false); - config.save(); - } else { - LOG.warn("Couldn't resume model {}. Not found in list", model.getName()); - return; - } - } catch (ExecutionException | InterruptedException e) { - LOG.error("Couldn't check, if model {} is online", model.getName()); - } finally { - lock.unlock(); - } - } - - @Override - public HttpClient getHttpClient() { - return client; - } - - @Override - public long getTotalSpaceBytes() throws IOException { - return getRecordingsFileStore().getTotalSpace(); - } - - @Override - public long getFreeSpaceBytes() throws IOException { - return getRecordingsFileStore().getUsableSpace(); - } - - private FileStore getRecordingsFileStore() throws IOException { - File recordingsDir = new File(config.getSettings().recordingsDir); - if(!recordingsDir.exists()) { - Files.createDirectories(recordingsDir.toPath()); - } - FileStore store = Files.getFileStore(recordingsDir.toPath()); - return store; - } - - private boolean enoughSpaceForRecording() throws IOException { - long minimum = config.getSettings().minimumSpaceLeftInBytes; - if(minimum == 0) { // 0 means don't check - return true; - } else { - return getFreeSpaceBytes() > minimum; - } - } - - private Runnable createPostProcessor(Download download) { - return () -> { - LOG.debug("Starting post-processing for {}", download.getTarget()); - if(Config.isServerMode()) { - fireRecordingStateChanged(download.getTarget(), GENERATING_PLAYLIST, download.getModel(), download.getStartTime()); - generatePlaylist(download.getTarget()); - } - boolean deleted = deleteIfTooShort(download); - if(!deleted) { - fireRecordingStateChanged(download.getTarget(), POST_PROCESSING, download.getModel(), download.getStartTime()); - download.postprocess(download.getTarget()); - } - fireRecordingStateChanged(download.getTarget(), FINISHED, download.getModel(), download.getStartTime()); - }; - } - - - // TODO maybe get file size and bitrate and check, if the values are plausible - // we could also compare the length with the time elapsed since starting the recording - private boolean deleteIfTooShort(Download download) { - try { - File target = download.getTarget(); - if(!target.exists()) { - return true; - } - - if(target.isDirectory()) { - if(target.list() == null || target.list().length == 0) { - deleteDirectory(target); - deleteEmptyParents(target); - return true; - } - } else { - if(target.length() == 0) { - Files.delete(target.toPath()); - deleteEmptyParents(target.getParentFile()); - return true; - } - } - - long minimumLengthInSeconds = Config.getInstance().getSettings().minimumLengthInSeconds; - if(minimumLengthInSeconds <= 0) { - return false; - } - - LOG.debug("Determining video length for {}", download.getTarget()); - - double duration = 0; - if(target.isDirectory()) { - File playlist = new File(target, "playlist.m3u8"); - duration = getPlaylistLength(playlist); - } else { - duration = MpegUtil.getFileDuration(target); - } - Duration minLength = Duration.ofSeconds(minimumLengthInSeconds); - Duration videoLength = Duration.ofSeconds((long) duration); - LOG.debug("Recording started at:{}. Video length is {}", download.getStartTime(), videoLength); - if(videoLength.minus(minLength).isNegative()) { - LOG.debug("Video too short {} {}", videoLength, download.getTarget()); - LOG.debug("Deleting {}", target); - if(target.isDirectory()) { - deleteDirectory(target); - deleteEmptyParents(target); - } else { - Files.delete(target.toPath()); - deleteEmptyParents(target.getParentFile()); - } - return true; - } else { - return false; - } - } catch (Exception e) { - LOG.error("Couldn't check video length", e); - return false; - } - } - - private double getPlaylistLength(File playlist) throws IOException, ParseException, PlaylistException { - if(playlist.exists()) { - PlaylistParser playlistParser = new PlaylistParser(new FileInputStream(playlist), Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); - Playlist m3u = playlistParser.parse(); - MediaPlaylist mediaPlaylist = m3u.getMediaPlaylist(); - double length = 0; - for (TrackData trackData : mediaPlaylist.getTracks()) { - length += trackData.getTrackInfo().duration; - } - return length; - } else { - throw new FileNotFoundException(playlist.getAbsolutePath() + " does not exist"); - } - } - - @Override - public void regeneratePlaylist(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { - new Thread(() -> { - LOG.debug("Regenerate playlist {}", recording.getPath()); - File recordingsDir = new File(config.getSettings().recordingsDir); - File path = new File(recordingsDir, recording.getPath()); - generatePlaylist(path); - }).start(); - } -} diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java new file mode 100644 index 00000000..a91ea2e2 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -0,0 +1,559 @@ +package ctbrec.recorder; + +import static ctbrec.event.Event.Type.*; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.Subscribe; +import com.iheartradio.m3u8.ParseException; +import com.iheartradio.m3u8.PlaylistException; + +import ctbrec.Config; +import ctbrec.Model; +import ctbrec.Recording; +import ctbrec.Recording.State; +import ctbrec.event.Event; +import ctbrec.event.EventBusHolder; +import ctbrec.event.ModelIsOnlineEvent; +import ctbrec.event.RecordingStateChangedEvent; +import ctbrec.io.HttpClient; +import ctbrec.recorder.download.Download; +import ctbrec.sites.Site; + +public class NextGenLocalRecorder implements Recorder { + + private static final transient Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class); + private static final boolean IGNORE_CACHE = true; + private List models = Collections.synchronizedList(new ArrayList<>()); + private Config config; + private volatile boolean recording = true; + private ReentrantLock modelLock = new ReentrantLock(); + private ReentrantLock recordingsLock = new ReentrantLock(); + private RecorderHttpClient client = new RecorderHttpClient(); + private long lastSpaceMessage = 0; + private Map recordingProcesses = Collections.synchronizedMap(new HashMap<>()); + private RecordingManager recordingManager; + + // thread pools for downloads and post-processing + private BlockingQueue downloadQueue = new SynchronousQueue<>(); + private ThreadPoolExecutor downloadPool = new ThreadPoolExecutor(2, 100, 5, TimeUnit.MINUTES, downloadQueue, createThreadFactory("Download")); + + private ExecutorCompletionService completionService = new ExecutorCompletionService<>(downloadPool); + private BlockingQueue ppQueue = new LinkedBlockingQueue<>(); + private ThreadPoolExecutor ppPool = new ThreadPoolExecutor(2, 2, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP")); + + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + public NextGenLocalRecorder(Config config, List sites) throws IOException { + this.config = config; + recordingManager = new RecordingManager(config, sites); + config.getSettings().models.stream().forEach((m) -> { + if (m.getSite().isEnabled()) { + models.add(m); + } else { + LOG.info("{} disabled -> ignoring {}", m.getSite().getName(), m.getName()); + } + }); + + recording = true; + registerEventBusListener(); + // if(Config.isServerMode()) { + // processUnfinishedRecordings(); + // } + + LOG.debug("Recorder initialized"); + LOG.info("Models to record: {}", models); + LOG.info("Saving recordings in {}", config.getSettings().recordingsDir); + + Thread completionHandler = new Thread(() -> { + while (!Thread.interrupted()) { + try { + Future result = completionService.take(); + Recording recording = result.get(); + recordingProcesses.remove(recording.getModel()); + if (recording.getStatus() == State.WAITING) { + LOG.debug("Download finished for {} -> Starting post-processing", recording.getModel().getName()); + ppPool.submit(() -> { + setRecordingStatus(recording, State.POST_PROCESSING); + recordingManager.saveRecording(recording); + recording.postprocess(); + setRecordingStatus(recording, State.FINISHED); + recordingManager.saveRecording(recording); + return recording; + }); + + // check, if we have to restart the recording + Model model = recording.getModel(); + tryRestartRecording(model); + } else { + // TODO is this ok? + setRecordingStatus(recording, State.FAILED); + recordingsLock.lock(); + try { + recordingManager.delete(recording); + } catch (IOException e) { + LOG.error("Couldn't delete recording {}", recording, e); + } finally { + recordingsLock.unlock(); + } + } + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + }); + completionHandler.setName("CompletionHandler"); + completionHandler.setDaemon(true); + completionHandler.start(); + + scheduler.scheduleWithFixedDelay(() -> { + try { + if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) { + LOG.info("No space left -> Stopping all recordings"); + stopRecordingProcesses(); + } + } catch (IOException e) { + LOG.error("Couldn't check space left on device", e); + } + }, 1, 1, TimeUnit.SECONDS); + } + + private void setRecordingStatus(Recording recording, State status) { + recording.setStatus(status); + RecordingStateChangedEvent evt = new RecordingStateChangedEvent(recording.getDownload().getTarget(), status, recording.getModel(), + recording.getStartDate()); + EventBusHolder.BUS.post(evt); + } + + @Override + public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + if (!models.contains(model)) { + LOG.info("Model {} added", model); + modelLock.lock(); + try { + models.add(model); + config.getSettings().models.add(model); + config.save(); + } catch (IOException e) { + LOG.error("Couldn't save config", e); + } finally { + modelLock.unlock(); + } + + // try to start the recording immediately + try { + if (model.isOnline()) { + startRecordingProcess(model); + } + } catch (ExecutionException | InterruptedException e) { + } + } + } + + private void startRecordingProcess(Model model) throws IOException { + recordingsLock.lock(); + try { + if (!recording) { + // recorder is not in recording mode + return; + } + + if (model.isSuspended()) { + LOG.info("Recording for model {} is suspended.", model); + return; + } + + if (recordingProcesses.containsKey(model)) { + LOG.error("A recording for model {} is already running", model); + return; + } + + modelLock.lock(); + try { + if (!models.contains(model)) { + LOG.info("Model {} has been removed. Restarting of recording cancelled.", model); + return; + } + } finally { + modelLock.unlock(); + } + + if (!enoughSpaceForRecording()) { + long now = System.currentTimeMillis(); + if ((now - lastSpaceMessage) > TimeUnit.MINUTES.toMillis(1)) { + LOG.info("Not enough space for recording, not starting recording for {}", model); + lastSpaceMessage = now; + } + return; + } + + if (!downloadSlotAvailable()) { + LOG.info("The number of downloads is maxed out, not starting recording for {}", model); + return; + } + + LOG.debug("Starting recording for model {}", model.getName()); + Download download = model.createDownload(); + download.init(config, model); + LOG.debug("Downloading with {}", download.getClass().getSimpleName()); + + Recording rec = new Recording(); + rec.setDownload(download); + rec.setPath(download.getPath(model)); + rec.setModel(model); + rec.setStartDate(Instant.now()); + recordingProcesses.put(model, rec); + recordingManager.add(rec); + completionService.submit(() -> { + try { + setRecordingStatus(rec, State.RECORDING); + recordingManager.saveRecording(rec); + download.start(); + boolean deleted = deleteIfTooShort(rec); + setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING); + recordingManager.saveRecording(rec); + } catch (IOException e) { + LOG.error("Download for {} failed. Download state: {}", model.getName(), rec.getStatus(), e); + } + return rec; + }); + } finally { + recordingsLock.unlock(); + } + } + + private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException { + Duration minimumLengthInSeconds = Duration.ofSeconds(Config.getInstance().getSettings().minimumLengthInSeconds); + if (minimumLengthInSeconds.getSeconds() <= 0) { + return false; + } + + Duration recordingLength = rec.getLength(); + if (recordingLength.compareTo(minimumLengthInSeconds) < 0) { + recordingManager.delete(rec); + return true; + } + + return false; + } + + @Override + public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + modelLock.lock(); + try { + if (models.contains(model)) { + models.remove(model); + config.getSettings().models.remove(model); + LOG.info("Model {} removed", model); + config.save(); + } else { + throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models"); + } + } finally { + modelLock.unlock(); + } + + if (recordingProcesses.containsKey(model)) { + Recording recording = recordingProcesses.get(model); + recording.getDownload().stop(); + } + } + + @Override + public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + if (models.contains(model)) { + int index = models.indexOf(model); + models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); + config.save(); + LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName()); + Recording recording = recordingProcesses.get(model); + if (recording != null) { + stopRecordingProcess(model); + } + tryRestartRecording(model); + } else { + LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName()); + return; + } + } + + private void stopRecordingProcess(Model model) { + LOG.debug("Stopping recording for {}", model); + Recording recording = recordingProcesses.get(model); + LOG.debug("Stopping download for {}", model); + recording.getDownload().stop(); + } + + private void stopRecordingProcesses() { + recordingsLock.lock(); + try { + for (Recording recording : recordingProcesses.values()) { + recording.getDownload().stop(); + } + } finally { + recordingsLock.unlock(); + } + } + + @Override + public boolean isTracked(Model model) { + modelLock.lock(); + try { + return models.contains(model); + } finally { + modelLock.unlock(); + } + } + + @Override + public List getModels() { + modelLock.lock(); + try { + return new ArrayList<>(models); + } finally { + modelLock.unlock(); + } + } + + @Override + public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + recordingsLock.lock(); + try { + return recordingManager.getAll(); + } finally { + recordingsLock.unlock(); + } + } + + @Override + public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + recordingManager.delete(recording); + } + + @Override + public void shutdown() { + // TODO add a config flag for waitign or stopping immediately + LOG.info("Shutting down"); + recording = false; + + LOG.debug("Stopping all recording processes"); + for (Recording rec : recordingProcesses.values()) { + Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop); + } + + // wait for post-processing to finish + LOG.info("Waiting for downloads to finish"); + while (!recordingProcesses.isEmpty()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.error("Error while waiting for downloads to finish", e); + } + } + + // shutdown threadpools + try { + LOG.info("Shutting down pools"); + downloadPool.shutdown(); + ppPool.shutdown(); + client.shutdown(); + downloadPool.awaitTermination(1, TimeUnit.MINUTES); + LOG.info("Waiting for post-processing to finish"); + ppPool.awaitTermination(10, TimeUnit.MINUTES); + } catch (InterruptedException e) { + LOG.error("Error while waiting for pools to finish", e); + } + } + + @Override + public void suspendRecording(Model model) { + modelLock.lock(); + try { + if (models.contains(model)) { + int index = models.indexOf(model); + models.get(index).setSuspended(true); + model.setSuspended(true); + config.save(); + } else { + LOG.warn("Couldn't suspend model {}. Not found in list", model.getName()); + return; + } + } catch (IOException e) { + LOG.error("Couldn't save config", e); + } finally { + modelLock.unlock(); + } + + Recording recording = recordingProcesses.get(model); + Optional.ofNullable(recording).map(Recording::getDownload).ifPresent(Download::stop); + } + + @Override + public void resumeRecording(Model model) throws IOException { + modelLock.lock(); + try { + if (models.contains(model)) { + int index = models.indexOf(model); + Model m = models.get(index); + m.setSuspended(false); + if (m.isOnline()) { + startRecordingProcess(m); + } + model.setSuspended(false); + config.save(); + } else { + LOG.warn("Couldn't resume model {}. Not found in list", model.getName()); + return; + } + } catch (ExecutionException | InterruptedException e) { + LOG.error("Couldn't check, if model {} is online", model.getName()); + } finally { + modelLock.unlock(); + } + } + + @Override + public boolean isSuspended(Model model) { + modelLock.lock(); + try { + int index = models.indexOf(model); + if (index >= 0) { + Model m = models.get(index); + return m.isSuspended(); + } else { + return false; + } + } finally { + modelLock.unlock(); + } + } + + @Override + public List getOnlineModels() { + return getModels().stream().filter(m -> { + try { + return m.isOnline(); + } catch (IOException | ExecutionException | InterruptedException e) { + return false; + } + }).collect(Collectors.toList()); + } + + @Override + public HttpClient getHttpClient() { + return client; + } + + @Override + public long getTotalSpaceBytes() throws IOException { + return getRecordingsFileStore().getTotalSpace(); + } + + @Override + public long getFreeSpaceBytes() throws IOException { + return getRecordingsFileStore().getUsableSpace(); + } + + private FileStore getRecordingsFileStore() throws IOException { + File recordingsDir = new File(config.getSettings().recordingsDir); + if (!recordingsDir.exists()) { + Files.createDirectories(recordingsDir.toPath()); + } + FileStore store = Files.getFileStore(recordingsDir.toPath()); + return store; + } + + private boolean enoughSpaceForRecording() throws IOException { + long minimum = config.getSettings().minimumSpaceLeftInBytes; + if (minimum == 0) { // 0 means don't check + return true; + } else { + return getFreeSpaceBytes() > minimum; + } + } + + private boolean downloadSlotAvailable() { + int concurrentRecordings = Config.getInstance().getSettings().concurrentRecordings; + return concurrentRecordings == 0 || concurrentRecordings > 0 && recordingProcesses.size() < concurrentRecordings; + } + + private void tryRestartRecording(Model model) { + if (!recording) { + // recorder is not in recording state + return; + } + + try { + boolean modelInRecordingList = isTracked(model); + boolean online = model.isOnline(IGNORE_CACHE); + if (modelInRecordingList && online) { + LOG.info("Restarting recording for model {}", model); + startRecordingProcess(model); + } + } catch (Exception e) { + LOG.error("Couldn't restart recording for model {}", model); + } + } + + private void registerEventBusListener() { + EventBusHolder.BUS.register(new Object() { + @Subscribe + public void modelEvent(Event e) { + try { + if (e.getType() == MODEL_ONLINE) { + ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e; + Model model = evt.getModel(); + if (!isSuspended(model) && !recordingProcesses.containsKey(model)) { + startRecordingProcess(model); + } + } + } catch (Exception e1) { + LOG.error("Error while handling model state changed event {}", e, e1); + } + } + }); + } + + private ThreadFactory createThreadFactory(String name) { + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); + t.setDaemon(true); + return t; + } + }; + } +} diff --git a/common/src/main/java/ctbrec/recorder/PlaylistGenerator.java b/common/src/main/java/ctbrec/recorder/PlaylistGenerator.java index c55db001..7656df2d 100644 --- a/common/src/main/java/ctbrec/recorder/PlaylistGenerator.java +++ b/common/src/main/java/ctbrec/recorder/PlaylistGenerator.java @@ -148,7 +148,7 @@ public class PlaylistGenerator { if(segments.length == 0) { throw new InvalidPlaylistException("No segments found. Playlist is empty"); } else if(segments.length != playlistSize) { - throw new InvalidPlaylistException("Playlist size and amount of segments differ"); + throw new InvalidPlaylistException("Playlist size and amount of segments differ (" + segments.length + " != " + playlistSize + ")"); } else { LOG.debug("Generated playlist looks good"); } diff --git a/common/src/main/java/ctbrec/recorder/Recorder.java b/common/src/main/java/ctbrec/recorder/Recorder.java index 0a578ce1..1306a6c6 100644 --- a/common/src/main/java/ctbrec/recorder/Recorder.java +++ b/common/src/main/java/ctbrec/recorder/Recorder.java @@ -59,7 +59,7 @@ public interface Recorder { List recordings = getRecordings(); return getModels().stream().filter(m -> { for (Recording recording : recordings) { - if (recording.getStatus() == Recording.State.RECORDING && recording.getModelName().equals(m.getSanitizedNamed())) { + if (recording.getStatus() == Recording.State.RECORDING && recording.getModel().equals(m)) { return true; } } @@ -82,16 +82,4 @@ public interface Recorder { * @throws IOException */ public long getFreeSpaceBytes() throws IOException; - - /** - * Regenerate the playlist for a recording. This is helpful, if the - * playlist is corrupt or hasn't been generated for whatever reason - * @param recording - * @throws IllegalStateException - * @throws NoSuchAlgorithmException - * @throws InvalidKeyException - * @throws IOException - */ - public void regeneratePlaylist(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException; - } diff --git a/common/src/main/java/ctbrec/recorder/RecordingFileMonitor.java b/common/src/main/java/ctbrec/recorder/RecordingFileMonitor.java new file mode 100644 index 00000000..0a5fe253 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/RecordingFileMonitor.java @@ -0,0 +1,236 @@ +package ctbrec.recorder; + +import static java.nio.file.StandardWatchEventKinds.*; + +import java.io.File; +import java.io.IOException; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ctbrec.Config; +import ctbrec.Recording; + +public class RecordingFileMonitor { + + private static final transient Logger LOG = LoggerFactory.getLogger(RecordingFileMonitor.class); + private WatchService watcher; + private Map keys; + private boolean running = true; + private RecordingManager manager; + + public RecordingFileMonitor(RecordingManager manager) throws IOException { + this.manager = manager; + this.watcher = FileSystems.getDefault().newWatchService(); + this.keys = new HashMap<>(); + registerAll(new File(Config.getInstance().getSettings().recordingsDir).toPath()); + } + + void processEvents() { + while (running) { + // wait for key to be signalled + WatchKey key; + try { + key = watcher.take(); + } catch (InterruptedException | ClosedWatchServiceException x) { + return; + } + + Path dir = keys.get(key); + if (dir == null) { + LOG.error("WatchKey not recognized!!"); + continue; + } + + List> events = key.pollEvents(); + LOG.debug("Size: {}", events.size()); + if (isRenameProcess(events)) { + handleRename(dir, events); + } else { + for (WatchEvent event : events) { + WatchEvent.Kind kind = event.kind(); + + // TBD - provide example of how OVERFLOW event is handled + if (kind == OVERFLOW) { + continue; + } + + // Context for directory entry event is the file name of entry + WatchEvent ev = cast(event); + Path name = ev.context(); + Path child = dir.resolve(name); + + if(Files.isRegularFile(child)) { + if (kind == ENTRY_CREATE) { + handleFileCreation(child); + } else if (kind == ENTRY_DELETE) { + handleFileDeletion(child); + } + } else { + if (kind == ENTRY_CREATE) { + handleDirCreation(child); + } else if (kind == ENTRY_DELETE) { + handleDirDeletion(child); + } + } + } + } + + // reset key and remove from set if directory no longer accessible + boolean valid = key.reset(); + if (!valid) { + keys.remove(key); + + // all directories are inaccessible + if (keys.isEmpty()) { + break; + } + } + } + } + + private void handleRename(Path dir, List> events) { + WatchEvent deleteEvent = cast(events.get(0)); + WatchEvent createEvent = cast(events.get(1)); + Path from = dir.resolve(deleteEvent.context()); + Path to = dir.resolve(createEvent.context()); + LOG.debug("{} -> {}", from, to); + List affectedRecordings = getAffectedRecordings(from); + adjustPaths(affectedRecordings, from, to); + if (Files.isDirectory(to, LinkOption.NOFOLLOW_LINKS)) { + unregister(from); + try { + registerAll(to); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private List getAffectedRecordings(Path from) { + String f = from.toAbsolutePath().toString(); + List affected = new ArrayList<>(); + for (Recording rec : manager.getAll()) { + String r = rec.getAbsoluteFile().getAbsolutePath(); + if (r.startsWith(f)) { + affected.add(rec); + } + } + return affected; + } + + private void adjustPaths(List affectedRecordings, Path from, Path to) { + for (Recording rec : affectedRecordings) { + String oldPath = rec.getAbsoluteFile().getAbsolutePath(); + String newPath = oldPath.replace(from.toString(), to.toString()); + String recordingsDir = Config.getInstance().getSettings().recordingsDir; + String relativePath = newPath.replaceFirst(Pattern.quote(recordingsDir), ""); + LOG.debug("Recording path has changed {} -> {}", rec.getPath(), relativePath); + rec.setPath(relativePath); + try { + manager.saveRecording(rec); + } catch (IOException e) { + LOG.error("Couldn't update recording path in meta data file", e); + } + } + } + + private void handleFileCreation(Path child) { + LOG.trace("File created {}", child); + } + + private void handleFileDeletion(Path child) { + LOG.trace("File deleted {}", child); + } + + private void handleDirCreation(Path dir) { + try { + registerAll(dir); + LOG.trace("Directory added {}", dir); + } catch (IOException x) { + // ignore to keep sample readbale + } + } + + private void handleDirDeletion(Path dir) { + // TODO unregister key ?!? + + // only delete directories, which have actually been deleted + if(Files.notExists(dir, LinkOption.NOFOLLOW_LINKS)) { + LOG.trace("Directory Deleted {}", dir); + } + } + + private boolean isRenameProcess(List> events) { + if(events.size() == 2) { + boolean deleteFirst = events.get(0).kind() == ENTRY_DELETE; + boolean createSecond = events.get(1).kind() == ENTRY_CREATE; + return deleteFirst && createSecond; + } else { + return false; + } + } + + /** + * Register the given directory, and all its sub-directories, with the + * WatchService. + */ + private void registerAll(final Path start) throws IOException { + // register directory and sub-directories + Files.walkFileTree(start, new SimpleFileVisitor() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) { + register(dir); + return FileVisitResult.CONTINUE; + } + }); + } + + /** + * Register the given directory with the WatchService + */ + void register(Path dir) { + try { + WatchKey key = dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE); + keys.put(key, dir); + LOG.debug("Monitor {}", dir); + } catch(IOException e) { + LOG.warn("Couldn't register directory monitor for directory {}", dir, e); + } + } + + public void unregister(Path path) { + + } + + @SuppressWarnings("unchecked") + static WatchEvent cast(WatchEvent event) { + return (WatchEvent) event; + } + + public void addDirectory(Path dir) throws IOException { + LOG.info("Adding monitor for {}", dir); + registerAll(dir); + } + + public void stop() throws IOException { + running = false; + watcher.close(); + } +} diff --git a/common/src/main/java/ctbrec/recorder/RecordingManager.java b/common/src/main/java/ctbrec/recorder/RecordingManager.java new file mode 100644 index 00000000..fc89380e --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/RecordingManager.java @@ -0,0 +1,186 @@ +package ctbrec.recorder; + +import static java.nio.file.StandardOpenOption.*; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.file.Files; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.squareup.moshi.JsonAdapter; +import com.squareup.moshi.Moshi; + +import ctbrec.Config; +import ctbrec.Model; +import ctbrec.Recording; +import ctbrec.Recording.State; +import ctbrec.io.InstantJsonAdapter; +import ctbrec.io.ModelJsonAdapter; +import ctbrec.sites.Site; + +public class RecordingManager { + private static final transient Logger LOG = LoggerFactory.getLogger(RecordingManager.class); + + private Config config; + private Moshi moshi; + private JsonAdapter adapter; + private List recordings = new ArrayList<>(); + // private RecordingFileMonitor monitor = new RecordingFileMonitor(this); + + public RecordingManager(Config config, List sites) throws IOException { + this.config = config; + moshi = new Moshi.Builder() + .add(Model.class, new ModelJsonAdapter(sites)) + .add(Instant.class, new InstantJsonAdapter()) + .build(); + adapter = moshi.adapter(Recording.class).indent(" "); + + loadRecordings(); + // startMonitoring(); + } + + public void add(Recording rec) throws UnsupportedEncodingException, IOException { + saveRecording(rec); + recordings.add(rec); + // registerFileWatch(rec); + } + + public void saveRecording(Recording rec) throws UnsupportedEncodingException, IOException { + String json = adapter.toJson(rec); + File recordingsMetaDir = getDir(); + Files.createDirectories(recordingsMetaDir.toPath()); + String filename = rec.toString() + ".json"; + File recordingMetaData = new File(recordingsMetaDir, filename); + rec.setMetaDataFile(recordingMetaData.getAbsolutePath()); + Files.write(recordingMetaData.toPath(), json.getBytes("utf-8"), CREATE, WRITE, TRUNCATE_EXISTING); + } + + private void loadRecordings() throws IOException { + File recordingsMetaDir = getDir(); + File[] metaFiles = recordingsMetaDir.listFiles((file, name) -> name.endsWith(".json")); + if (metaFiles != null) { + for (File file : metaFiles) { + String json = new String(Files.readAllBytes(file.toPath()), "utf-8"); + Recording recording = adapter.fromJson(json); + if (recording.getStatus() == State.RECORDING) { + recording.setStatus(State.WAITING); + } + if (recordingExists(recording)) { + recordings.add(recording); + } else { + LOG.info("Recording {} does not exist anymore -> deleting meta data", recording); + Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath()); + } + } + } + } + + // private void startMonitoring() { + // for (Recording recording : recordings) { + // registerFileWatch(recording); + // } + // Thread watcher = new Thread(() -> monitor.processEvents()); + // watcher.setDaemon(true); + // watcher.setPriority(Thread.MIN_PRIORITY); + // watcher.setName("RecordingFileMonitor"); + // watcher.start(); + // } + // + // private void registerFileWatch(Recording recording) { + // File rec = recording.getAbsoluteFile(); + // if (rec.isDirectory()) { + // monitor.register(rec.toPath()); + // } else { + // monitor.register(rec.getParentFile().toPath()); + // } + // } + // + // private void removeFileWatch(Recording recording) { + // File rec = recording.getAbsoluteFile(); + // if (rec.isDirectory()) { + // monitor.unregister(rec.toPath()); + // } else { + // monitor.unregister(rec.getParentFile().toPath()); + // } + // } + + private boolean recordingExists(Recording recording) { + File rec = new File(config.getSettings().recordingsDir, recording.getPath()); + return rec.exists(); + } + + private File getDir() { + File configDir = config.getConfigDir(); + File recordingsMetaDir = new File(configDir, "recordings"); + return recordingsMetaDir; + } + + public void delete(Recording recording) throws IOException { + recording.setStatus(State.DELETING); + File recordingsDir = new File(config.getSettings().recordingsDir); + File path = new File(recordingsDir, recording.getPath()); + LOG.debug("Deleting {}", path); + + // delete the video files + if (path.isFile()) { + Files.delete(path.toPath()); + deleteEmptyParents(path.getParentFile()); + } else { + deleteDirectory(path); + deleteEmptyParents(path); + } + + // delete the meta data + Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath()); + + // remove from data structure + recordings.remove(recording); + recording.setStatus(State.DELETED); + + // removeFileWatch(recording); + } + + public List getAll() { + return new ArrayList<>(recordings); + } + + private void deleteEmptyParents(File parent) throws IOException { + File recDir = new File(Config.getInstance().getSettings().recordingsDir); + while (parent != null && parent.list() != null && parent.list().length == 0) { + if (parent.equals(recDir)) { + return; + } + LOG.debug("Deleting empty directory {}", parent.getAbsolutePath()); + Files.delete(parent.toPath()); + parent = parent.getParentFile(); + } + } + + private void deleteDirectory(File directory) throws IOException { + if (!directory.exists()) { + return; + } + + File[] files = directory.listFiles(); + boolean deletedAllFiles = true; + for (File file : files) { + try { + LOG.trace("Deleting {}", file.getAbsolutePath()); + Files.delete(file.toPath()); + } catch (Exception e) { + deletedAllFiles = false; + LOG.debug("Couldn't delete {}", file, e); + } + } + + if (!deletedAllFiles) { + throw new IOException("Couldn't delete all files in " + directory); + } + } +} diff --git a/common/src/main/java/ctbrec/recorder/RemoteRecorder.java b/common/src/main/java/ctbrec/recorder/RemoteRecorder.java index 498e1578..8fe3b2bb 100644 --- a/common/src/main/java/ctbrec/recorder/RemoteRecorder.java +++ b/common/src/main/java/ctbrec/recorder/RemoteRecorder.java @@ -283,7 +283,7 @@ public class RemoteRecorder implements Recorder { if(newRecording.getStatus() != recording.getStatus()) { File file = new File(recording.getPath()); Model m = new UnknownModel(); - m.setName(newRecording.getModelName()); + m.setName(newRecording.getModel().getName()); RecordingStateChangedEvent evt = new RecordingStateChangedEvent(file, newRecording.getStatus(), m, recording.getStartDate()); EventBusHolder.BUS.post(evt); } @@ -432,26 +432,4 @@ public class RemoteRecorder implements Recorder { public long getFreeSpaceBytes() { return spaceFree; } - - @Override - public void regeneratePlaylist(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { - String msg = "{\"action\": \"regeneratePlaylist\", \"recording\": \""+recording.getPath()+"\"}"; - RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url("http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/rec") - .post(body); - addHmacIfNeeded(msg, builder); - Request request = builder.build(); - try(Response response = client.execute(request)) { - String json = response.body().string(); - RecordingListResponse resp = recordingListResponseAdapter.fromJson(json); - if(response.isSuccessful()) { - if(!resp.status.equals("success")) { - throw new IOException("Couldn't regenerate playlist for recording: " + resp.msg); - } - } else { - throw new IOException("Couldn't regenerate playlist recording: " + resp.msg); - } - } - } } diff --git a/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java index 88bf03aa..8e006798 100644 --- a/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java @@ -15,6 +15,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,6 +36,8 @@ import com.iheartradio.m3u8.data.TrackData; import ctbrec.Config; import ctbrec.Model; import ctbrec.OS; +import ctbrec.Recording; +import ctbrec.Recording.State; import ctbrec.UnknownModel; import ctbrec.io.HttpClient; import ctbrec.io.HttpException; @@ -45,19 +48,30 @@ import okhttp3.Response; public abstract class AbstractHlsDownload implements Download { private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); + private static int threadCounter = 0; protected HttpClient client; protected volatile boolean running = false; - protected volatile boolean alive = true; protected Instant startTime; protected Model model = new UnknownModel(); protected BlockingQueue downloadQueue = new LinkedBlockingQueue<>(50); - protected ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue); + protected ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue, createThreadFactory()); + protected State state = State.UNKNOWN; + private int playlistEmptyCount = 0; public AbstractHlsDownload(HttpClient client) { this.client = client; } + private ThreadFactory createThreadFactory() { + return r -> { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("SegmentDownloadThread-" + threadCounter++); + return t; + }; + } + protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException { URL segmentsUrl = new URL(segmentsURL); Request request = new Request.Builder() @@ -148,11 +162,24 @@ public abstract class AbstractHlsDownload implements Download { return url; } - @Override - public boolean isAlive() { - return alive; + protected void emptyPlaylistCheck(SegmentPlaylist playlist) { + if(playlist.segments.isEmpty()) { + playlistEmptyCount++; + try { + Thread.sleep(6000); + } catch (InterruptedException e) { + } + } else { + playlistEmptyCount = 0; + } + if(playlistEmptyCount == 10) { + LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel()); + internalStop(); + } } + abstract void internalStop(); + @Override public Instant getStartTime() { return startTime; @@ -164,8 +191,8 @@ public abstract class AbstractHlsDownload implements Download { } @Override - public void postprocess(File target) { - runPostProcessingScript(target); + public void postprocess(Recording recording) { + runPostProcessingScript(recording.getAbsoluteFile()); } private void runPostProcessingScript(File target) { diff --git a/common/src/main/java/ctbrec/recorder/download/Download.java b/common/src/main/java/ctbrec/recorder/download/Download.java index cd0d02d7..074d903c 100644 --- a/common/src/main/java/ctbrec/recorder/download/Download.java +++ b/common/src/main/java/ctbrec/recorder/download/Download.java @@ -6,13 +6,29 @@ import java.time.Instant; import ctbrec.Config; import ctbrec.Model; +import ctbrec.Recording; public interface Download { - public void start(Model model, Config config) throws IOException; + public void init(Config config, Model model); + public void start() throws IOException; public void stop(); - public boolean isAlive(); - public File getTarget(); public Model getModel(); public Instant getStartTime(); - public void postprocess(File target); + public void postprocess(Recording recording); + + /** + * Returns the path to the recording in the filesystem as file object + * @param model + * @return + * @see #getPath(Model) + */ + public File getTarget(); + + /** + * Returns the path to the recording starting from the configured recordings directory + * @param model + * @return + * @see #getTarget() + */ + public String getPath(Model model); } diff --git a/common/src/main/java/ctbrec/recorder/download/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/HlsDownload.java index c3e105c1..8a9500cd 100644 --- a/common/src/main/java/ctbrec/recorder/download/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/HlsDownload.java @@ -16,23 +16,33 @@ import java.nio.file.Path; import java.text.DecimalFormat; import java.text.NumberFormat; import java.text.SimpleDateFormat; +import java.time.Duration; import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.iheartradio.m3u8.ParseException; +import com.iheartradio.m3u8.PlaylistError; import com.iheartradio.m3u8.PlaylistException; import ctbrec.Config; import ctbrec.Model; +import ctbrec.Recording; import ctbrec.event.EventBusHolder; import ctbrec.event.RecordingStateChangedEvent; import ctbrec.io.HttpClient; import ctbrec.io.HttpException; +import ctbrec.recorder.PlaylistGenerator; +import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import okhttp3.Request; import okhttp3.Response; @@ -45,21 +55,29 @@ public class HlsDownload extends AbstractHlsDownload { private int segmentCounter = 1; private NumberFormat nf = new DecimalFormat("000000"); private Object downloadFinished = new Object(); + private ZonedDateTime splitRecStartTime; + private Config config; public HlsDownload(HttpClient client) { super(client); } @Override - public void start(Model model, Config config) throws IOException { + public void init(Config config, Model model) { + this.config = config; + super.model = model; + startTime = Instant.now(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); + String startTime = formatter.format(this.startTime); + Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); + downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); + } + + @Override + public void start() throws IOException { try { running = true; - startTime = Instant.now(); - super.model = model; - SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT); - String startTime = sdf.format(new Date()); - Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); - downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); + splitRecStartTime = ZonedDateTime.now(); if(!model.isOnline()) { throw new IOException(model.getName() +"'s room is not public"); @@ -74,30 +92,36 @@ public class HlsDownload extends AbstractHlsDownload { if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { Files.createDirectories(downloadDir); } - int lastSegment = 0; - int nextSegment = 0; + int lastSegmentNumber = 0; + int nextSegmentNumber = 0; int waitFactor = 1; while(running) { SegmentPlaylist playlist = getNextSegments(segments); - if(nextSegment > 0 && playlist.seq > nextSegment) { + emptyPlaylistCheck(playlist); + if(nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { // TODO switch to a lower bitrate/resolution ?!? waitFactor *= 2; - LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegment, playlist.seq, model, waitFactor); + LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor); } - int skip = nextSegment - playlist.seq; + int skip = nextSegmentNumber - playlist.seq; + Future lastSegmentDownload = null; for (String segment : playlist.segments) { if(skip > 0) { skip--; } else { URL segmentUrl = new URL(segment); String prefix = nf.format(segmentCounter++); - downloadThreadPool.submit(new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix)); + SegmentDownload segmentDownload = new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix); + lastSegmentDownload = downloadThreadPool.submit(segmentDownload); //new SegmentDownload(segment, downloadDir).call(); } } + // split recordings + splitRecording(lastSegmentDownload); + long wait = 0; - if(lastSegment == playlist.seq) { + if(lastSegmentNumber == playlist.seq) { // playlist didn't change -> wait for at least half the target duration wait = (long) playlist.targetDuration * 1000 / waitFactor; LOG.trace("Playlist didn't change... waiting for {}ms", wait); @@ -117,9 +141,9 @@ public class HlsDownload extends AbstractHlsDownload { // this if check makes sure, that we don't decrease nextSegment. for some reason // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 - lastSegment = playlist.seq; - if(lastSegment + playlist.segments.size() > nextSegment) { - nextSegment = lastSegment + playlist.segments.size(); + lastSegmentNumber = playlist.seq; + if(lastSegmentNumber + playlist.segments.size() > nextSegmentNumber) { + nextSegmentNumber = lastSegmentNumber + playlist.segments.size(); } } } else { @@ -148,7 +172,6 @@ public class HlsDownload extends AbstractHlsDownload { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) {} - alive = false; synchronized (downloadFinished) { downloadFinished.notifyAll(); } @@ -156,9 +179,81 @@ public class HlsDownload extends AbstractHlsDownload { } } + @Override + public void postprocess(Recording recording) { + generatePlaylist(recording.getAbsoluteFile()); + super.postprocess(recording); + } + + private void generatePlaylist(File recDir) { + if(!config.getSettings().generatePlaylist) { + return; + } + + PlaylistGenerator playlistGenerator = new PlaylistGenerator(); + + try { + File playlist = playlistGenerator.generate(recDir); + if(playlist != null) { + playlistGenerator.validate(recDir); + } + } catch (IOException | ParseException e) { + LOG.error("Couldn't generate playlist file", e); + } catch (PlaylistException e) { + if(e.getErrors().isEmpty()) { + LOG.error("Couldn't generate playlist file", e); + } else { + LOG.error("Playlist contains errors"); + for (PlaylistError error : e.getErrors()) { + LOG.error("Error: {}", error.toString()); + } + } + } catch (InvalidPlaylistException e) { + LOG.error("Playlist is invalid and will be deleted", e); + File playlist = new File(recDir, "playlist.m3u8"); + playlist.delete(); + } + } + + private void splitRecording(Future lastSegmentDownload) { + if(config.getSettings().splitRecordings > 0) { + Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now()); + long seconds = recordingDuration.getSeconds(); + if(seconds >= config.getSettings().splitRecordings) { + File lastTargetFile = downloadDir.toFile(); + + // switch to the next dir + SimpleDateFormat sdf = new SimpleDateFormat(Config.RECORDING_DATE_FORMAT); + super.startTime = Instant.now(); + String startTime = sdf.format(new Date()); + Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); + LOG.debug("Switching to {}", downloadDir); + downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); + downloadDir.toFile().mkdirs(); + splitRecStartTime = ZonedDateTime.now(); + + // post-process current recording + LOG.debug("Running post-processing for {}", lastTargetFile); + Thread pp = new Thread(() -> { + if(lastSegmentDownload != null) { + // wait for last segment in this directory + try { + lastSegmentDownload.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Couldn't wait for last segment to arrive in this directory. Playlist might be inclomplete", e); + } + } + }); + pp.setName("Post-Processing split recording"); + pp.setPriority(Thread.MIN_PRIORITY); + pp.start(); + } + } + } + @Override public void stop() { - running = false; + internalStop(); try { synchronized (downloadFinished) { downloadFinished.wait(); @@ -168,6 +263,11 @@ public class HlsDownload extends AbstractHlsDownload { } } + @Override + void internalStop() { + running = false; + } + private static class SegmentDownload implements Callable { private URL url; private Path file; @@ -225,4 +325,12 @@ public class HlsDownload extends AbstractHlsDownload { public File getTarget() { return downloadDir.toFile(); } + + @Override + public String getPath(Model model) { + String absolutePath = getTarget().getAbsolutePath(); + String recordingsDir = Config.getInstance().getSettings().recordingsDir; + String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), ""); + return relativePath; + } } diff --git a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index 0e468cb0..8d6cd53c 100644 --- a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -1,6 +1,5 @@ package ctbrec.recorder.download; -import static ctbrec.Recording.State.*; import static java.nio.file.StandardOpenOption.*; import java.io.ByteArrayInputStream; @@ -26,6 +25,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +41,6 @@ import com.iheartradio.m3u8.PlaylistException; import ctbrec.Config; import ctbrec.Hmac; import ctbrec.Model; -import ctbrec.event.EventBusHolder; -import ctbrec.event.RecordingStateChangedEvent; import ctbrec.io.HttpClient; import ctbrec.io.HttpException; import ctbrec.recorder.ProgressListener; @@ -66,6 +64,13 @@ public class MergedHlsDownload extends AbstractHlsDownload { super(client); } + @Override + public void init(Config config, Model model) { + this.config = config; + this.model = model; + targetFile = Config.getInstance().getFileForRecording(model, "ts"); + } + @Override public File getTarget() { return targetFile; @@ -110,7 +115,6 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) {} - alive = false; synchronized (downloadFinished) { downloadFinished.notifyAll(); } @@ -119,8 +123,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { } @Override - public void start(Model model, Config config) throws IOException { - this.config = config; + public void start() throws IOException { try { if(!model.isOnline(IGNORE_CACHE)) { throw new IOException(model.getName() +"'s room is not public"); @@ -130,20 +133,15 @@ public class MergedHlsDownload extends AbstractHlsDownload { super.startTime = Instant.now(); splitRecStartTime = ZonedDateTime.now(); super.model = model; - targetFile = Config.getInstance().getFileForRecording(model, "ts"); - - // let the world know, that we are recording now - RecordingStateChangedEvent evt = new RecordingStateChangedEvent(getTarget(), RECORDING, model, getStartTime()); - EventBusHolder.BUS.post(evt); String segments = getSegmentPlaylistUrl(model); mergeThread = createMergeThread(targetFile, null, true); mergeThread.start(); - if(segments != null) { - downloadSegments(segments, true); - if(config.getSettings().splitRecordings > 0) { + if (segments != null) { + if (config.getSettings().splitRecordings > 0) { LOG.debug("Splitting recordings every {} seconds", config.getSettings().splitRecordings); } + downloadSegments(segments, true); } else { throw new IOException("Couldn't determine segments uri"); } @@ -157,10 +155,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { } catch(Exception e) { throw new IOException("Couldn't download segment", e); } finally { - if(streamer != null) { + if (streamer != null) { try { streamer.stop(); - } catch(Exception e) { + } catch (Exception e) { LOG.error("Couldn't stop streamer", e); } } @@ -169,8 +167,8 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) {} - alive = false; synchronized (downloadFinished) { + LOG.debug("Download finished notify {}", model); downloadFinished.notifyAll(); } LOG.debug("Download for {} terminated", model); @@ -180,14 +178,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { int lastSegment = 0; int nextSegment = 0; - long playlistNotFoundFirstEncounter = -1; while(running) { try { - if(playlistNotFoundFirstEncounter != -1) { - LOG.debug("Downloading playlist {}", segmentPlaylistUri); - } SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); - playlistNotFoundFirstEncounter = -1; + emptyPlaylistCheck(lsp); if(!livestreamDownload) { multiSource.setTotalSegments(lsp.segments.size()); } @@ -209,7 +203,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { if(livestreamDownload) { // split up the recording, if configured - splitRecording(); + boolean split = splitRecording(); + if (split) { + break; + } // wait some time until requesting the segment playlist again to not hammer the server waitForNewSegments(lsp, lastSegment, downloadTookMillis); @@ -246,9 +243,6 @@ public class MergedHlsDownload extends AbstractHlsDownload { private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, MissingSegmentException, ExecutionException, HttpException { int skip = nextSegment - lsp.seq; - if(lsp.segments.isEmpty()) { - LOG.debug("Empty playlist: {}", lsp.url); - } // add segments to download threadpool Queue> downloads = new LinkedList<>(); @@ -319,34 +313,37 @@ public class MergedHlsDownload extends AbstractHlsDownload { multiSource.addSource(source); } - private void splitRecording() { + private boolean splitRecording() { if(config.getSettings().splitRecordings > 0) { Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now()); long seconds = recordingDuration.getSeconds(); if(seconds >= config.getSettings().splitRecordings) { - try { - File lastTargetFile = targetFile; - - // switch to the next file - targetFile = Config.getInstance().getFileForRecording(model, "ts"); - LOG.debug("Switching to file {}", targetFile.getAbsolutePath()); - fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); - MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build(); - streamer.switchSink(sink); - super.startTime = Instant.now(); - splitRecStartTime = ZonedDateTime.now(); - - // post-process current recording - Thread pp = new Thread(() -> postprocess(lastTargetFile)); - pp.setName("Post-Processing split recording"); - pp.setPriority(Thread.MIN_PRIORITY); - pp.start(); - } catch (IOException e) { - LOG.error("Error while splitting recording", e); - running = false; - } + internalStop(); + return true; + // try { + // File lastTargetFile = targetFile; + // + // // switch to the next file + // targetFile = Config.getInstance().getFileForRecording(model, "ts"); + // LOG.debug("Switching to file {}", targetFile.getAbsolutePath()); + // fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); + // MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build(); + // streamer.switchSink(sink); + // super.startTime = Instant.now(); + // splitRecStartTime = ZonedDateTime.now(); + // + // // post-process current recording + // Thread pp = new Thread(() -> postprocess(lastTargetFile)); + // pp.setName("Post-Processing split recording"); + // pp.setPriority(Thread.MIN_PRIORITY); + // pp.start(); + // } catch (IOException e) { + // LOG.error("Error while splitting recording", e); + // running = false; + // } } } + return false; } private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment, long downloadTookMillis) { @@ -377,12 +374,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { @Override public void stop() { - running = false; - if(streamer != null) { - streamer.stop(); - } + internalStop(); try { synchronized (downloadFinished) { + LOG.debug("Waiting for finished notify {}", model); downloadFinished.wait(); } } catch (InterruptedException e) { @@ -391,6 +386,15 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.debug("Download stopped"); } + @Override + void internalStop() { + running = false; + if (streamer != null) { + streamer.stop(); + streamer = null; + } + } + private Thread createMergeThread(File targetFile, ProgressListener listener, boolean liveStream) { Thread t = new Thread(() -> { multiSource = BlockingMultiMTSSource.builder() @@ -418,10 +422,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { streamer.stream(); LOG.debug("Streamer finished"); } catch (InterruptedException e) { - if(running) { + if (running) { LOG.error("Error while waiting for a download future", e); } - } catch(Exception e) { + } catch (Exception e) { LOG.error("Error while saving stream to file", e); } finally { deleteEmptyRecording(targetFile); @@ -504,4 +508,12 @@ public class MergedHlsDownload extends AbstractHlsDownload { return false; } } + + @Override + public String getPath(Model model) { + String absolutePath = targetFile.getAbsolutePath(); + String recordingsDir = Config.getInstance().getSettings().recordingsDir; + String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), ""); + return relativePath; + } } diff --git a/common/src/main/java/ctbrec/sites/chaturbate/ChaturbateModel.java b/common/src/main/java/ctbrec/sites/chaturbate/ChaturbateModel.java index 338571ea..9921faa4 100644 --- a/common/src/main/java/ctbrec/sites/chaturbate/ChaturbateModel.java +++ b/common/src/main/java/ctbrec/sites/chaturbate/ChaturbateModel.java @@ -355,12 +355,12 @@ public class ChaturbateModel extends AbstractModel { } private void acquireSlot() throws InterruptedException { - LOG.debug("Acquire: {}", requestThrottle.availablePermits()); + //LOG.debug("Acquire: {}", requestThrottle.availablePermits()); requestThrottle.acquire(); long now = System.currentTimeMillis(); long millisSinceLastRequest = now - lastRequest; if(millisSinceLastRequest < 500) { - LOG.debug("Sleeping: {}", (500-millisSinceLastRequest)); + //LOG.debug("Sleeping: {}", (500-millisSinceLastRequest)); Thread.sleep(500 - millisSinceLastRequest); } } @@ -368,6 +368,6 @@ public class ChaturbateModel extends AbstractModel { private void releaseSlot() { lastRequest = System.currentTimeMillis(); requestThrottle.release(); - LOG.debug("Release: {}", requestThrottle.availablePermits()); + //LOG.debug("Release: {}", requestThrottle.availablePermits()); } } diff --git a/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java b/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java index 1515593f..d847c7c4 100644 --- a/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java +++ b/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java @@ -19,11 +19,16 @@ public class Fc2HlsDownload extends HlsDownload { } @Override - public void start(Model model, Config config) throws IOException { + public void init(Config config, Model model) { + super.init(config, model); + } + + @Override + public void start() throws IOException { Fc2Model fc2Model = (Fc2Model) model; try { fc2Model.openWebsocket(); - super.start(model, config); + super.start(); } catch (InterruptedException e) { LOG.error("Couldn't start download for {}", model, e); } finally { diff --git a/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java b/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java index 9b6d1235..7483bbe1 100644 --- a/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java @@ -19,11 +19,16 @@ public class Fc2MergedHlsDownload extends MergedHlsDownload { } @Override - public void start(Model model, Config config) throws IOException { + public void init(Config config, Model model) { + super.init(config, model); + } + + @Override + public void start() throws IOException { Fc2Model fc2Model = (Fc2Model) model; try { fc2Model.openWebsocket(); - super.start(model, config); + super.start(); } catch (InterruptedException e) { LOG.error("Couldn't start download for {}", model, e); } finally { diff --git a/common/src/main/java/ctbrec/sites/flirt4free/Flirt4FreeModel.java b/common/src/main/java/ctbrec/sites/flirt4free/Flirt4FreeModel.java index 5d56fa24..96c85e95 100644 --- a/common/src/main/java/ctbrec/sites/flirt4free/Flirt4FreeModel.java +++ b/common/src/main/java/ctbrec/sites/flirt4free/Flirt4FreeModel.java @@ -470,12 +470,12 @@ public class Flirt4FreeModel extends AbstractModel { } private void acquireSlot() throws InterruptedException { - LOG.debug("Acquire: {}", requestThrottle.availablePermits()); + //LOG.debug("Acquire: {}", requestThrottle.availablePermits()); requestThrottle.acquire(); long now = System.currentTimeMillis(); long millisSinceLastRequest = now - lastRequest; if(millisSinceLastRequest < 500) { - LOG.debug("Sleeping: {}", (500-millisSinceLastRequest)); + //LOG.debug("Sleeping: {}", (500-millisSinceLastRequest)); Thread.sleep(500 - millisSinceLastRequest); } } @@ -483,6 +483,6 @@ public class Flirt4FreeModel extends AbstractModel { private void releaseSlot() { lastRequest = System.currentTimeMillis(); requestThrottle.release(); - LOG.debug("Release: {}", requestThrottle.availablePermits()); + //LOG.debug("Release: {}", requestThrottle.availablePermits()); } } diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminChunkedHttpDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminChunkedHttpDownload.java index 6dca1b99..440e63fe 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminChunkedHttpDownload.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminChunkedHttpDownload.java @@ -8,6 +8,7 @@ import java.net.URLEncoder; import java.nio.file.Files; import java.time.Instant; import java.util.Random; +import java.util.regex.Pattern; import org.json.JSONArray; import org.json.JSONObject; @@ -16,7 +17,7 @@ import org.slf4j.LoggerFactory; import ctbrec.Config; import ctbrec.Model; -import ctbrec.Recording.State; +import ctbrec.Recording; import ctbrec.io.HttpClient; import ctbrec.recorder.download.Download; import okhttp3.Request; @@ -55,11 +56,14 @@ public class LiveJasminChunkedHttpDownload implements Download { } @Override - public void start(Model model, Config config) throws IOException { + public void init(Config config, Model model) { this.model = model; - startTime = Instant.now(); - targetFile = config.getFileForRecording(model, "mp4"); + this.startTime = Instant.now(); + this.targetFile = config.getFileForRecording(model, "mp4"); + } + @Override + public void start() throws IOException { getPerformerDetails(model.getName()); try { getStreamPath(); @@ -287,11 +291,14 @@ public class LiveJasminChunkedHttpDownload implements Download { } @Override - public void postprocess(File target) { + public void postprocess(Recording recording) { } @Override - public State getState() { - return State.UNKNOWN; + public String getPath(Model model) { + String absolutePath = targetFile.getAbsolutePath(); + String recordingsDir = Config.getInstance().getSettings().recordingsDir; + String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), ""); + return relativePath; } } diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebSocketDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebSocketDownload.java index 3ad4beeb..51790670 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebSocketDownload.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebSocketDownload.java @@ -7,6 +7,7 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.file.Files; import java.time.Instant; +import java.util.regex.Pattern; import org.json.JSONArray; import org.json.JSONObject; @@ -17,7 +18,7 @@ import com.google.common.eventbus.Subscribe; import ctbrec.Config; import ctbrec.Model; -import ctbrec.Recording.State; +import ctbrec.Recording; import ctbrec.event.Event; import ctbrec.event.EventBusHolder; import ctbrec.event.ModelStateChangedEvent; @@ -56,11 +57,14 @@ public class LiveJasminWebSocketDownload implements Download { } @Override - public void start(Model model, Config config) throws IOException { + public void init(Config config, Model model) { this.model = model; - startTime = Instant.now(); - targetFile = config.getFileForRecording(model, "mp4"); + this.startTime = Instant.now(); + this.targetFile = config.getFileForRecording(model, "mp4"); + } + @Override + public void start() throws IOException { getPerformerDetails(model.getName()); LOG.debug("appid: {}", applicationId); LOG.debug("sessionid: {}",sessionId); @@ -351,11 +355,14 @@ public class LiveJasminWebSocketDownload implements Download { } @Override - public void postprocess(File target) { + public void postprocess(Recording recording) { } @Override - public State getState() { - return State.UNKNOWN; + public String getPath(Model model) { + String absolutePath = targetFile.getAbsolutePath(); + String recordingsDir = Config.getInstance().getSettings().recordingsDir; + String relativePath = absolutePath.replaceFirst(Pattern.quote(recordingsDir), ""); + return relativePath; } } diff --git a/server/.classpath b/server/.classpath index cf2b0ce9..cd04a794 100644 --- a/server/.classpath +++ b/server/.classpath @@ -1,7 +1,16 @@ - - + + + + + + + + + + + @@ -12,5 +21,12 @@ + + + + + + + diff --git a/server/.settings/org.eclipse.core.resources.prefs b/server/.settings/org.eclipse.core.resources.prefs index 99f26c02..abdea9ac 100644 --- a/server/.settings/org.eclipse.core.resources.prefs +++ b/server/.settings/org.eclipse.core.resources.prefs @@ -1,2 +1,4 @@ eclipse.preferences.version=1 +encoding//src/main/java=UTF-8 +encoding//src/main/resources=UTF-8 encoding/=UTF-8 diff --git a/server/src/main/java/ctbrec/recorder/server/HttpServer.java b/server/src/main/java/ctbrec/recorder/server/HttpServer.java index e2451224..d2bb6d13 100644 --- a/server/src/main/java/ctbrec/recorder/server/HttpServer.java +++ b/server/src/main/java/ctbrec/recorder/server/HttpServer.java @@ -24,7 +24,7 @@ import ctbrec.Version; import ctbrec.event.EventBusHolder; import ctbrec.event.EventHandler; import ctbrec.event.EventHandlerConfiguration; -import ctbrec.recorder.LocalRecorder; +import ctbrec.recorder.NextGenLocalRecorder; import ctbrec.recorder.OnlineMonitor; import ctbrec.recorder.Recorder; import ctbrec.sites.Site; @@ -69,7 +69,7 @@ public class HttpServer { if(config.getSettings().key != null) { LOG.info("HMAC authentication is enabled"); } - recorder = new LocalRecorder(config); + recorder = new NextGenLocalRecorder(config, sites); for (Site site : sites) { if(site.isEnabled()) { site.init(); diff --git a/server/src/main/java/ctbrec/recorder/server/RecorderServlet.java b/server/src/main/java/ctbrec/recorder/server/RecorderServlet.java index 44a894ec..e75efc05 100644 --- a/server/src/main/java/ctbrec/recorder/server/RecorderServlet.java +++ b/server/src/main/java/ctbrec/recorder/server/RecorderServlet.java @@ -3,6 +3,8 @@ package ctbrec.recorder.server; import static javax.servlet.http.HttpServletResponse.*; import java.io.IOException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; import java.time.Instant; import java.util.Iterator; import java.util.List; @@ -68,8 +70,14 @@ public class RecorderServlet extends AbstractCtbrecServlet { resp.getWriter().write(response); break; case "stop": - response = "{\"status\": \"success\", \"msg\": \"Recording stopped\"}"; - recorder.stopRecording(request.model); + new Thread(() -> { + try { + recorder.stopRecording(request.model); + } catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException | IOException e) { + LOG.error("Couldn't stop recording for model {}", request.model, e); + } + }).start(); + response = "{\"status\": \"success\", \"msg\": \"Stopping recording\"}"; resp.getWriter().write(response); break; case "list": @@ -113,22 +121,14 @@ public class RecorderServlet extends AbstractCtbrecServlet { break; case "delete": String path = request.recording; - Recording rec = new Recording(path); + Recording rec = new Recording(); + rec.setPath(path); recorder.delete(rec); recAdapter = moshi.adapter(Recording.class); resp.getWriter().write("{\"status\": \"success\", \"msg\": \"List of recordings\", \"recordings\": ["); resp.getWriter().write(recAdapter.toJson(rec)); resp.getWriter().write("]}"); break; - case "regeneratePlaylist": - path = request.recording; - rec = new Recording(path); - recorder.regeneratePlaylist(rec); - recAdapter = moshi.adapter(Recording.class); - resp.getWriter().write("{\"status\": \"success\", \"msg\": \"List of recordings\", \"recordings\": ["); - resp.getWriter().write(recAdapter.toJson(rec)); - resp.getWriter().write("]}"); - break; case "switch": recorder.switchStreamSource(request.model); response = "{\"status\": \"success\", \"msg\": \"Resolution switched\"}"; @@ -136,8 +136,14 @@ public class RecorderServlet extends AbstractCtbrecServlet { break; case "suspend": LOG.debug("Suspend recording for model {} - {}", request.model.getName(), request.model.getUrl()); - recorder.suspendRecording(request.model); - response = "{\"status\": \"success\", \"msg\": \"Recording suspended\"}"; + new Thread(() -> { + try { + recorder.suspendRecording(request.model); + } catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException | IOException e) { + LOG.error("Couldn't suspend recording for model {}", request.model, e); + } + }).start(); + response = "{\"status\": \"success\", \"msg\": \"Suspending recording\"}"; resp.getWriter().write(response); break; case "resume":