diff --git a/src/main/java/ctbrec/recorder/LocalRecorder.java b/src/main/java/ctbrec/recorder/LocalRecorder.java index 77578872..ac61ba58 100644 --- a/src/main/java/ctbrec/recorder/LocalRecorder.java +++ b/src/main/java/ctbrec/recorder/LocalRecorder.java @@ -34,6 +34,7 @@ import ctbrec.ModelParser; import ctbrec.Recording; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.download.Download; +import ctbrec.recorder.download.HlsDownload; import ctbrec.recorder.download.MergedHlsDownload; import okhttp3.Request; import okhttp3.Response; @@ -67,8 +68,11 @@ public class LocalRecorder implements Recorder { processMonitor.start(); onlineMonitor = new OnlineMonitor(); onlineMonitor.start(); + playlistGenTrigger = new PlaylistGeneratorTrigger(); - playlistGenTrigger.start(); + if(Config.getInstance().isServerMode()) { + playlistGenTrigger.start(); + } if (config.getSettings().recordFollowed) { followedMonitor = new FollowedMonitor(); @@ -118,8 +122,13 @@ public class LocalRecorder implements Recorder { return; } - //Download download = new HlsDownload(client); - Download download = new MergedHlsDownload(client); + Download download; + if (Config.getInstance().isServerMode()) { + download = new HlsDownload(client); + } else { + download = new MergedHlsDownload(client); + } + recordingProcesses.put(model, download); new Thread() { @Override @@ -298,8 +307,7 @@ public class LocalRecorder implements Recorder { Thread t = new Thread() { @Override public void run() { - boolean local = Config.getInstance().getSettings().localRecording; - if(!local) { + if(Config.getInstance().isServerMode()) { generatePlaylist(directory); } } diff --git a/src/main/java/ctbrec/recorder/PlaylistGenerator.java b/src/main/java/ctbrec/recorder/PlaylistGenerator.java index 04c136f8..df31463b 100644 --- a/src/main/java/ctbrec/recorder/PlaylistGenerator.java +++ b/src/main/java/ctbrec/recorder/PlaylistGenerator.java @@ -47,9 +47,7 @@ public class PlaylistGenerator { public File generate(File directory) throws IOException, ParseException, PlaylistException { LOG.debug("Starting playlist generation for {}", directory); // get a list of all ts files and sort them by sequence - File[] files = directory.listFiles((f) -> { - return f.getName().endsWith(".ts") && !f.getName().startsWith("record"); - }); + File[] files = directory.listFiles((f) -> f.getName().endsWith(".ts")); if(files.length == 0) { LOG.debug("{} is empty. Not going to generate a playlist", directory); return null; diff --git a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java index cb4a35f4..acfa8990 100644 --- a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java @@ -67,7 +67,7 @@ public abstract class AbstractHlsDownload implements Download { } } - LiveStreamingPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { + SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { URL segmentsUrl = new URL(segments); Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build(); Response response = client.execute(request); @@ -77,7 +77,7 @@ public abstract class AbstractHlsDownload implements Download { Playlist playlist = parser.parse(); if(playlist.hasMediaPlaylist()) { MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - LiveStreamingPlaylist lsp = new LiveStreamingPlaylist(); + SegmentPlaylist lsp = new SegmentPlaylist(); lsp.seq = mediaPlaylist.getMediaSequenceNumber(); lsp.targetDuration = mediaPlaylist.getTargetDuration(); List tracks = mediaPlaylist.getTracks(); @@ -110,7 +110,7 @@ public abstract class AbstractHlsDownload implements Download { return downloadDir.toFile(); } - public static class LiveStreamingPlaylist { + public static class SegmentPlaylist { public int seq = 0; public float totalDuration = 0; public float lastSegDuration = 0; diff --git a/src/main/java/ctbrec/recorder/download/HlsDownload.java b/src/main/java/ctbrec/recorder/download/HlsDownload.java index df7732ab..2ca9f0ee 100644 --- a/src/main/java/ctbrec/recorder/download/HlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/HlsDownload.java @@ -60,7 +60,7 @@ public class HlsDownload extends AbstractHlsDownload { int lastSegment = 0; int nextSegment = 0; while(running) { - LiveStreamingPlaylist lsp = getNextSegments(segments); + SegmentPlaylist lsp = getNextSegments(segments); if(nextSegment > 0 && lsp.seq > nextSegment) { LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model); String first = lsp.segments.get(0); diff --git a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index 9cbe5d03..337a9468 100644 --- a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -37,6 +37,7 @@ import ctbrec.Config; import ctbrec.HttpClient; import ctbrec.Model; import ctbrec.recorder.Chaturbate; +import ctbrec.recorder.ProgressListener; import ctbrec.recorder.StreamInfo; import okhttp3.Request; import okhttp3.Response; @@ -54,6 +55,30 @@ public class MergedHlsDownload extends AbstractHlsDownload { super(client); } + public void start(String segmentPlaylistUri, File target, ProgressListener progressListener) throws IOException { + try { + running = true; + mergeThread = createMergeThread(target, progressListener); + mergeThread.start(); + handoverThread = createHandoverThread(); + handoverThread.start(); + + downloadSegments(segmentPlaylistUri, false); + } catch(ParseException e) { + throw new IOException("Couldn't parse stream information", e); + } catch(PlaylistException e) { + throw new IOException("Couldn't parse HLS playlist", e); + } catch(EOFException e) { + // end of playlist reached + LOG.debug("Reached end of playlist"); + } catch(Exception e) { + throw new IOException("Couldn't download segment", e); + } finally { + alive = false; + LOG.debug("Download for terminated"); + } + } + @Override public void start(Model model, Config config) throws IOException { try { @@ -71,62 +96,15 @@ public class MergedHlsDownload extends AbstractHlsDownload { Files.createDirectories(downloadDir); } - mergeThread = createMergeThread(downloadDir); + File targetFile = new File(downloadDir.toFile(), "record.ts"); + mergeThread = createMergeThread(targetFile, null); mergeThread.start(); handoverThread = createHandoverThread(); handoverThread.start(); String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); if(segments != null) { - int lastSegment = 0; - int nextSegment = 0; - while(running) { - LiveStreamingPlaylist lsp = getNextSegments(segments); - if(nextSegment > 0 && lsp.seq > nextSegment) { - LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model); - String first = lsp.segments.get(0); - int seq = lsp.seq; - for (int i = nextSegment; i < lsp.seq; i++) { - URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); - LOG.debug("Loading missed segment {} for model {}", i, model.getName()); - Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); - mergeQueue.add(downloadFuture); - } - // TODO switch to a lower bitrate/resolution ?!? - } - int skip = nextSegment - lsp.seq; - for (String segment : lsp.segments) { - if(skip > 0) { - skip--; - } else { - URL segmentUrl = new URL(segment); - Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); - mergeQueue.add(downloadFuture); - } - } - - long wait = 0; - if(lastSegment == lsp.seq) { - // playlist didn't change -> wait for at least half the target duration - wait = (long) lsp.targetDuration * 1000 / 2; - LOG.trace("Playlist didn't change... waiting for {}ms", wait); - } else { - // playlist did change -> wait for at least last segment duration - wait = 1;//(long) lsp.lastSegDuration * 1000; - LOG.trace("Playlist changed... waiting for {}ms", wait); - } - - try { - Thread.sleep(wait); - } catch (InterruptedException e) { - if(running) { - LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); - } - } - - lastSegment = lsp.seq; - nextSegment = lastSegment + lsp.segments.size(); - } + downloadSegments(segments, true); } else { throw new IOException("Couldn't determine segments uri"); } @@ -145,36 +123,103 @@ public class MergedHlsDownload extends AbstractHlsDownload { } } - private Thread createHandoverThread() { - Thread t = new Thread(() -> { - while(running) { - try { - Future downloadFuture = mergeQueue.take(); - InputStream tsData = downloadFuture.get(); - InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build(); - multiSource.addSource(source); - } catch (InterruptedException e) { - if(running) { - LOG.error("Interrupted while waiting for a download future", e); - } - } catch (ExecutionException e) { - LOG.error("Error while opening segment stream", e); + private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { + int lastSegment = 0; + int nextSegment = 0; + while(running) { + SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); + if(!livestreamDownload) { + multiSource.setTotalSegments(lsp.segments.size()); + } + if(nextSegment > 0 && lsp.seq > nextSegment) { + LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, segmentPlaylistUri); + String first = lsp.segments.get(0); + int seq = lsp.seq; + for (int i = nextSegment; i < lsp.seq; i++) { + URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); + LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri); + Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); + mergeQueue.add(downloadFuture); + } + // TODO switch to a lower bitrate/resolution ?!? + } + int skip = nextSegment - lsp.seq; + for (String segment : lsp.segments) { + if(skip > 0) { + skip--; + } else { + URL segmentUrl = new URL(segment); + Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); + mergeQueue.add(downloadFuture); } } - }); + + if(livestreamDownload) { + long wait = 0; + if(lastSegment == lsp.seq) { + // playlist didn't change -> wait for at least half the target duration + wait = (long) lsp.targetDuration * 1000 / 2; + LOG.trace("Playlist didn't change... waiting for {}ms", wait); + } else { + // playlist did change -> wait for at least last segment duration + wait = 1;//(long) lsp.lastSegDuration * 1000; + LOG.trace("Playlist changed... waiting for {}ms", wait); + } + + try { + Thread.sleep(wait); + } catch (InterruptedException e) { + if(running) { + LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); + } + } + } + + lastSegment = lsp.seq; + nextSegment = lastSegment + lsp.segments.size(); + + if(!livestreamDownload) { + break; + } + } + } + + private Thread createHandoverThread() { + Thread t = new Thread() { + @Override + public void run() { + while(running) { + try { + Future downloadFuture = mergeQueue.take(); + InputStream tsData = downloadFuture.get(); + InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build(); + multiSource.addSource(source); + } catch (InterruptedException e) { + if(running) { + LOG.error("Interrupted while waiting for a download future", e); + } + } catch (ExecutionException e) { + LOG.error("Error while opening segment stream", e); + } + } + LOG.debug("Handover Thread finished"); + }; + }; t.setName("Segment Handover Thread"); t.setDaemon(true); return t; } - private Thread createMergeThread(Path downloadDir) { + private Thread createMergeThread(File targetFile, ProgressListener listener) { Thread t = new Thread(() -> { - multiSource = BlockingMultiMTSSource.builder().setFixContinuity(true).build(); + multiSource = BlockingMultiMTSSource.builder() + .setFixContinuity(true) + .setProgressListener(listener) + .build(); - File out = new File(downloadDir.toFile(), "record.ts"); FileChannel channel = null; try { - channel = FileChannel.open(out.toPath(), CREATE, WRITE); + channel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build(); streamer = Streamer.builder() @@ -186,6 +231,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { // Start streaming streamer.stream(); + LOG.debug("Streamer finished"); } catch (InterruptedException e) { if(running) { LOG.error("Error while waiting for a download future", e); @@ -196,7 +242,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { try { channel.close(); } catch (IOException e) { - LOG.error("Error while closing file {}", out); + LOG.error("Error while closing file {}", targetFile); } } }); diff --git a/src/main/java/ctbrec/ui/RecordingsTab.java b/src/main/java/ctbrec/ui/RecordingsTab.java index 08017ebc..f8074472 100644 --- a/src/main/java/ctbrec/ui/RecordingsTab.java +++ b/src/main/java/ctbrec/ui/RecordingsTab.java @@ -5,9 +5,7 @@ import static javafx.scene.control.ButtonType.YES; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.net.URL; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -22,19 +20,15 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.iheartradio.m3u8.Encoding; -import com.iheartradio.m3u8.Format; import com.iheartradio.m3u8.ParseException; import com.iheartradio.m3u8.PlaylistException; -import com.iheartradio.m3u8.PlaylistParser; -import com.iheartradio.m3u8.data.MediaPlaylist; -import com.iheartradio.m3u8.data.Playlist; -import com.iheartradio.m3u8.data.TrackData; import ctbrec.Config; +import ctbrec.HttpClient; import ctbrec.Recording; import ctbrec.Recording.STATUS; import ctbrec.recorder.Recorder; +import ctbrec.recorder.download.MergedHlsDownload; import javafx.application.Platform; import javafx.collections.FXCollections; import javafx.collections.ObservableList; @@ -279,39 +273,24 @@ public class RecordingsTab extends Tab implements TabSelectionListener { String hlsBase = "http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/hls"; URL url = new URL(hlsBase + "/" + recording.getPath() + "/playlist.m3u8"); LOG.info("Downloading {}", recording.getPath()); - PlaylistParser parser = new PlaylistParser(url.openStream(), Format.EXT_M3U, Encoding.UTF_8); - Playlist playlist = parser.parse(); - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - List tracks = mediaPlaylist.getTracks(); - List segmentUris = new ArrayList<>(); - for (TrackData trackData : tracks) { - String segmentUri = hlsBase + "/" + recording.getPath() + "/" + trackData.getUri(); - segmentUris.add(segmentUri); - } Thread t = new Thread() { @Override public void run() { - try(FileOutputStream fos = new FileOutputStream(target)) { - for (int i = 0; i < segmentUris.size(); i++) { - URL segment = new URL(segmentUris.get(i)); - InputStream in = segment.openStream(); - byte[] b = new byte[1024]; - int length = -1; - while( (length = in.read(b)) >= 0 ) { - fos.write(b, 0, length); - } - in.close(); - int progress = (int) (i * 100.0 / segmentUris.size()); - Platform.runLater(new Runnable() { - @Override - public void run() { + try { + MergedHlsDownload download = new MergedHlsDownload(HttpClient.getInstance()); + download.start(url.toString(), target, (progress) -> { + Platform.runLater(() -> { + if (progress == 100) { + recording.setStatus(STATUS.FINISHED); + recording.setProgress(-1); + LOG.debug("Download finished for recording {}", recording.getPath()); + } else { recording.setStatus(STATUS.DOWNLOADING); recording.setProgress(progress); } }); - } - + }); } catch (FileNotFoundException e) { showErrorDialog("Error while downloading recording", "The target file couldn't be created", e); LOG.error("Error while downloading recording", e); @@ -332,9 +311,86 @@ public class RecordingsTab extends Tab implements TabSelectionListener { t.setDaemon(true); t.setName("Download Thread " + recording.getPath()); t.start(); + + recording.setStatus(STATUS.DOWNLOADING); + recording.setProgress(0); } } + // private void download(Recording recording) throws IOException, ParseException, PlaylistException { + // String filename = recording.getPath().replaceAll("/", "-") + ".ts"; + // FileChooser chooser = new FileChooser(); + // chooser.setInitialFileName(filename); + // if(config.getSettings().lastDownloadDir != null && !config.getSettings().lastDownloadDir.equals("")) { + // File dir = new File(config.getSettings().lastDownloadDir); + // while(!dir.exists()) { + // dir = dir.getParentFile(); + // } + // chooser.setInitialDirectory(dir); + // } + // File target = chooser.showSaveDialog(null); + // if(target != null) { + // config.getSettings().lastDownloadDir = target.getParent(); + // String hlsBase = "http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/hls"; + // URL url = new URL(hlsBase + "/" + recording.getPath() + "/playlist.m3u8"); + // LOG.info("Downloading {}", recording.getPath()); + // + // PlaylistParser parser = new PlaylistParser(url.openStream(), Format.EXT_M3U, Encoding.UTF_8); + // Playlist playlist = parser.parse(); + // MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + // List tracks = mediaPlaylist.getTracks(); + // List segmentUris = new ArrayList<>(); + // for (TrackData trackData : tracks) { + // String segmentUri = hlsBase + "/" + recording.getPath() + "/" + trackData.getUri(); + // segmentUris.add(segmentUri); + // } + // + // Thread t = new Thread() { + // @Override + // public void run() { + // try(FileOutputStream fos = new FileOutputStream(target)) { + // for (int i = 0; i < segmentUris.size(); i++) { + // URL segment = new URL(segmentUris.get(i)); + // InputStream in = segment.openStream(); + // byte[] b = new byte[1024]; + // int length = -1; + // while( (length = in.read(b)) >= 0 ) { + // fos.write(b, 0, length); + // } + // in.close(); + // int progress = (int) (i * 100.0 / segmentUris.size()); + // Platform.runLater(new Runnable() { + // @Override + // public void run() { + // recording.setStatus(STATUS.DOWNLOADING); + // recording.setProgress(progress); + // } + // }); + // } + // + // } catch (FileNotFoundException e) { + // showErrorDialog("Error while downloading recording", "The target file couldn't be created", e); + // LOG.error("Error while downloading recording", e); + // } catch (IOException e) { + // showErrorDialog("Error while downloading recording", "The recording could not be downloaded", e); + // LOG.error("Error while downloading recording", e); + // } finally { + // Platform.runLater(new Runnable() { + // @Override + // public void run() { + // recording.setStatus(STATUS.FINISHED); + // recording.setProgress(-1); + // } + // }); + // } + // } + // }; + // t.setDaemon(true); + // t.setName("Download Thread " + recording.getPath()); + // t.start(); + // } + // } + private void showErrorDialog(final String title, final String msg, final Exception e) { Platform.runLater(new Runnable() { @Override diff --git a/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java b/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java index bca00c7f..1ca82c03 100644 --- a/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java +++ b/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java @@ -3,15 +3,24 @@ package org.taktik.mpegts.sources; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.taktik.mpegts.MTSPacket; +import ctbrec.recorder.ProgressListener; + public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoCloseable { + private static final transient Logger LOG = LoggerFactory.getLogger(BlockingMultiMTSSource.class); + private final boolean fixContinuity; private ContinuityFixer continuityFixer; private final BlockingQueue sources; private MTSSource currentSource; + private int downloadedSegments = 0; + private int totalSegments = -1; + private ProgressListener listener; private BlockingMultiMTSSource(boolean fixContinuity) { this.fixContinuity = fixContinuity; @@ -35,8 +44,19 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo if(packet == null) { // end of source has been reached, switch to the next source currentSource.close(); + downloadedSegments++; + if(listener != null && totalSegments > 0) { + int progress = (int)(downloadedSegments * 100.0 / totalSegments); + listener.update(progress); + } + if(downloadedSegments == totalSegments) { + LOG.debug("All segments written. Queue size {}", sources.size()); + return null; + } + currentSource = sources.take(); packet = currentSource.nextPacket(); + // } } if (fixContinuity) { @@ -45,6 +65,14 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo return packet; } + private void setProgressListener(ProgressListener listener) { + this.listener = listener; + } + + public void setTotalSegments(int total) { + this.totalSegments = total; + } + @Override protected void closeInternal() throws Exception { for (MTSSource source : sources) { @@ -58,14 +86,24 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo public static class BlockingMultiMTSSourceBuilder { boolean fixContinuity = false; + ProgressListener listener; public BlockingMultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) { this.fixContinuity = fixContinuity; return this; } + public BlockingMultiMTSSourceBuilder setProgressListener(ProgressListener listener) { + this.listener = listener; + return this; + } public BlockingMultiMTSSource build() { - return new BlockingMultiMTSSource(fixContinuity); + BlockingMultiMTSSource source = new BlockingMultiMTSSource(fixContinuity); + if(listener != null) { + source.setProgressListener(listener); + } + return source; } + } }