forked from j62/ctbrec
1
0
Fork 0

Reimplement the download so that it uses the BlockingMultiMTSSource

This should create a transport stream file with proper continuity counters
and timestamps
This commit is contained in:
0xboobface 2018-09-07 23:59:37 +02:00
parent 7f881504a6
commit 8fcc8e0f91
7 changed files with 262 additions and 116 deletions

View File

@ -34,6 +34,7 @@ import ctbrec.ModelParser;
import ctbrec.Recording; import ctbrec.Recording;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import ctbrec.recorder.download.Download; import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.HlsDownload;
import ctbrec.recorder.download.MergedHlsDownload; import ctbrec.recorder.download.MergedHlsDownload;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
@ -67,8 +68,11 @@ public class LocalRecorder implements Recorder {
processMonitor.start(); processMonitor.start();
onlineMonitor = new OnlineMonitor(); onlineMonitor = new OnlineMonitor();
onlineMonitor.start(); onlineMonitor.start();
playlistGenTrigger = new PlaylistGeneratorTrigger(); playlistGenTrigger = new PlaylistGeneratorTrigger();
playlistGenTrigger.start(); if(Config.getInstance().isServerMode()) {
playlistGenTrigger.start();
}
if (config.getSettings().recordFollowed) { if (config.getSettings().recordFollowed) {
followedMonitor = new FollowedMonitor(); followedMonitor = new FollowedMonitor();
@ -118,8 +122,13 @@ public class LocalRecorder implements Recorder {
return; return;
} }
//Download download = new HlsDownload(client); Download download;
Download download = new MergedHlsDownload(client); if (Config.getInstance().isServerMode()) {
download = new HlsDownload(client);
} else {
download = new MergedHlsDownload(client);
}
recordingProcesses.put(model, download); recordingProcesses.put(model, download);
new Thread() { new Thread() {
@Override @Override
@ -298,8 +307,7 @@ public class LocalRecorder implements Recorder {
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
boolean local = Config.getInstance().getSettings().localRecording; if(Config.getInstance().isServerMode()) {
if(!local) {
generatePlaylist(directory); generatePlaylist(directory);
} }
} }

View File

@ -47,9 +47,7 @@ public class PlaylistGenerator {
public File generate(File directory) throws IOException, ParseException, PlaylistException { public File generate(File directory) throws IOException, ParseException, PlaylistException {
LOG.debug("Starting playlist generation for {}", directory); LOG.debug("Starting playlist generation for {}", directory);
// get a list of all ts files and sort them by sequence // get a list of all ts files and sort them by sequence
File[] files = directory.listFiles((f) -> { File[] files = directory.listFiles((f) -> f.getName().endsWith(".ts"));
return f.getName().endsWith(".ts") && !f.getName().startsWith("record");
});
if(files.length == 0) { if(files.length == 0) {
LOG.debug("{} is empty. Not going to generate a playlist", directory); LOG.debug("{} is empty. Not going to generate a playlist", directory);
return null; return null;

View File

@ -67,7 +67,7 @@ public abstract class AbstractHlsDownload implements Download {
} }
} }
LiveStreamingPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException {
URL segmentsUrl = new URL(segments); URL segmentsUrl = new URL(segments);
Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build(); Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build();
Response response = client.execute(request); Response response = client.execute(request);
@ -77,7 +77,7 @@ public abstract class AbstractHlsDownload implements Download {
Playlist playlist = parser.parse(); Playlist playlist = parser.parse();
if(playlist.hasMediaPlaylist()) { if(playlist.hasMediaPlaylist()) {
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
LiveStreamingPlaylist lsp = new LiveStreamingPlaylist(); SegmentPlaylist lsp = new SegmentPlaylist();
lsp.seq = mediaPlaylist.getMediaSequenceNumber(); lsp.seq = mediaPlaylist.getMediaSequenceNumber();
lsp.targetDuration = mediaPlaylist.getTargetDuration(); lsp.targetDuration = mediaPlaylist.getTargetDuration();
List<TrackData> tracks = mediaPlaylist.getTracks(); List<TrackData> tracks = mediaPlaylist.getTracks();
@ -110,7 +110,7 @@ public abstract class AbstractHlsDownload implements Download {
return downloadDir.toFile(); return downloadDir.toFile();
} }
public static class LiveStreamingPlaylist { public static class SegmentPlaylist {
public int seq = 0; public int seq = 0;
public float totalDuration = 0; public float totalDuration = 0;
public float lastSegDuration = 0; public float lastSegDuration = 0;

View File

@ -60,7 +60,7 @@ public class HlsDownload extends AbstractHlsDownload {
int lastSegment = 0; int lastSegment = 0;
int nextSegment = 0; int nextSegment = 0;
while(running) { while(running) {
LiveStreamingPlaylist lsp = getNextSegments(segments); SegmentPlaylist lsp = getNextSegments(segments);
if(nextSegment > 0 && lsp.seq > nextSegment) { if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model); LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model);
String first = lsp.segments.get(0); String first = lsp.segments.get(0);

View File

@ -37,6 +37,7 @@ import ctbrec.Config;
import ctbrec.HttpClient; import ctbrec.HttpClient;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.recorder.Chaturbate; import ctbrec.recorder.Chaturbate;
import ctbrec.recorder.ProgressListener;
import ctbrec.recorder.StreamInfo; import ctbrec.recorder.StreamInfo;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
@ -54,6 +55,30 @@ public class MergedHlsDownload extends AbstractHlsDownload {
super(client); super(client);
} }
public void start(String segmentPlaylistUri, File target, ProgressListener progressListener) throws IOException {
try {
running = true;
mergeThread = createMergeThread(target, progressListener);
mergeThread.start();
handoverThread = createHandoverThread();
handoverThread.start();
downloadSegments(segmentPlaylistUri, false);
} catch(ParseException e) {
throw new IOException("Couldn't parse stream information", e);
} catch(PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist", e);
} catch(EOFException e) {
// end of playlist reached
LOG.debug("Reached end of playlist");
} catch(Exception e) {
throw new IOException("Couldn't download segment", e);
} finally {
alive = false;
LOG.debug("Download for terminated");
}
}
@Override @Override
public void start(Model model, Config config) throws IOException { public void start(Model model, Config config) throws IOException {
try { try {
@ -71,62 +96,15 @@ public class MergedHlsDownload extends AbstractHlsDownload {
Files.createDirectories(downloadDir); Files.createDirectories(downloadDir);
} }
mergeThread = createMergeThread(downloadDir); File targetFile = new File(downloadDir.toFile(), "record.ts");
mergeThread = createMergeThread(targetFile, null);
mergeThread.start(); mergeThread.start();
handoverThread = createHandoverThread(); handoverThread = createHandoverThread();
handoverThread.start(); handoverThread.start();
String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex());
if(segments != null) { if(segments != null) {
int lastSegment = 0; downloadSegments(segments, true);
int nextSegment = 0;
while(running) {
LiveStreamingPlaylist lsp = getNextSegments(segments);
if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model);
String first = lsp.segments.get(0);
int seq = lsp.seq;
for (int i = nextSegment; i < lsp.seq; i++) {
URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i)));
LOG.debug("Loading missed segment {} for model {}", i, model.getName());
Future<InputStream> downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client));
mergeQueue.add(downloadFuture);
}
// TODO switch to a lower bitrate/resolution ?!?
}
int skip = nextSegment - lsp.seq;
for (String segment : lsp.segments) {
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
Future<InputStream> downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client));
mergeQueue.add(downloadFuture);
}
}
long wait = 0;
if(lastSegment == lsp.seq) {
// playlist didn't change -> wait for at least half the target duration
wait = (long) lsp.targetDuration * 1000 / 2;
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;//(long) lsp.lastSegDuration * 1000;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
try {
Thread.sleep(wait);
} catch (InterruptedException e) {
if(running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
}
lastSegment = lsp.seq;
nextSegment = lastSegment + lsp.segments.size();
}
} else { } else {
throw new IOException("Couldn't determine segments uri"); throw new IOException("Couldn't determine segments uri");
} }
@ -145,36 +123,103 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} }
} }
private Thread createHandoverThread() { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException {
Thread t = new Thread(() -> { int lastSegment = 0;
while(running) { int nextSegment = 0;
try { while(running) {
Future<InputStream> downloadFuture = mergeQueue.take(); SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
InputStream tsData = downloadFuture.get(); if(!livestreamDownload) {
InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build(); multiSource.setTotalSegments(lsp.segments.size());
multiSource.addSource(source); }
} catch (InterruptedException e) { if(nextSegment > 0 && lsp.seq > nextSegment) {
if(running) { LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, segmentPlaylistUri);
LOG.error("Interrupted while waiting for a download future", e); String first = lsp.segments.get(0);
} int seq = lsp.seq;
} catch (ExecutionException e) { for (int i = nextSegment; i < lsp.seq; i++) {
LOG.error("Error while opening segment stream", e); URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i)));
LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri);
Future<InputStream> downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client));
mergeQueue.add(downloadFuture);
}
// TODO switch to a lower bitrate/resolution ?!?
}
int skip = nextSegment - lsp.seq;
for (String segment : lsp.segments) {
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
Future<InputStream> downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client));
mergeQueue.add(downloadFuture);
} }
} }
});
if(livestreamDownload) {
long wait = 0;
if(lastSegment == lsp.seq) {
// playlist didn't change -> wait for at least half the target duration
wait = (long) lsp.targetDuration * 1000 / 2;
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;//(long) lsp.lastSegDuration * 1000;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
try {
Thread.sleep(wait);
} catch (InterruptedException e) {
if(running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
}
}
lastSegment = lsp.seq;
nextSegment = lastSegment + lsp.segments.size();
if(!livestreamDownload) {
break;
}
}
}
private Thread createHandoverThread() {
Thread t = new Thread() {
@Override
public void run() {
while(running) {
try {
Future<InputStream> downloadFuture = mergeQueue.take();
InputStream tsData = downloadFuture.get();
InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build();
multiSource.addSource(source);
} catch (InterruptedException e) {
if(running) {
LOG.error("Interrupted while waiting for a download future", e);
}
} catch (ExecutionException e) {
LOG.error("Error while opening segment stream", e);
}
}
LOG.debug("Handover Thread finished");
};
};
t.setName("Segment Handover Thread"); t.setName("Segment Handover Thread");
t.setDaemon(true); t.setDaemon(true);
return t; return t;
} }
private Thread createMergeThread(Path downloadDir) { private Thread createMergeThread(File targetFile, ProgressListener listener) {
Thread t = new Thread(() -> { Thread t = new Thread(() -> {
multiSource = BlockingMultiMTSSource.builder().setFixContinuity(true).build(); multiSource = BlockingMultiMTSSource.builder()
.setFixContinuity(true)
.setProgressListener(listener)
.build();
File out = new File(downloadDir.toFile(), "record.ts");
FileChannel channel = null; FileChannel channel = null;
try { try {
channel = FileChannel.open(out.toPath(), CREATE, WRITE); channel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build(); MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build();
streamer = Streamer.builder() streamer = Streamer.builder()
@ -186,6 +231,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
// Start streaming // Start streaming
streamer.stream(); streamer.stream();
LOG.debug("Streamer finished");
} catch (InterruptedException e) { } catch (InterruptedException e) {
if(running) { if(running) {
LOG.error("Error while waiting for a download future", e); LOG.error("Error while waiting for a download future", e);
@ -196,7 +242,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
try { try {
channel.close(); channel.close();
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error while closing file {}", out); LOG.error("Error while closing file {}", targetFile);
} }
} }
}); });

View File

@ -5,9 +5,7 @@ import static javafx.scene.control.ButtonType.YES;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.URL; import java.net.URL;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
@ -22,19 +20,15 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.Encoding;
import com.iheartradio.m3u8.Format;
import com.iheartradio.m3u8.ParseException; import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException; import com.iheartradio.m3u8.PlaylistException;
import com.iheartradio.m3u8.PlaylistParser;
import com.iheartradio.m3u8.data.MediaPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config; import ctbrec.Config;
import ctbrec.HttpClient;
import ctbrec.Recording; import ctbrec.Recording;
import ctbrec.Recording.STATUS; import ctbrec.Recording.STATUS;
import ctbrec.recorder.Recorder; import ctbrec.recorder.Recorder;
import ctbrec.recorder.download.MergedHlsDownload;
import javafx.application.Platform; import javafx.application.Platform;
import javafx.collections.FXCollections; import javafx.collections.FXCollections;
import javafx.collections.ObservableList; import javafx.collections.ObservableList;
@ -279,39 +273,24 @@ public class RecordingsTab extends Tab implements TabSelectionListener {
String hlsBase = "http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/hls"; String hlsBase = "http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/hls";
URL url = new URL(hlsBase + "/" + recording.getPath() + "/playlist.m3u8"); URL url = new URL(hlsBase + "/" + recording.getPath() + "/playlist.m3u8");
LOG.info("Downloading {}", recording.getPath()); LOG.info("Downloading {}", recording.getPath());
PlaylistParser parser = new PlaylistParser(url.openStream(), Format.EXT_M3U, Encoding.UTF_8);
Playlist playlist = parser.parse();
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
List<TrackData> tracks = mediaPlaylist.getTracks();
List<String> segmentUris = new ArrayList<>();
for (TrackData trackData : tracks) {
String segmentUri = hlsBase + "/" + recording.getPath() + "/" + trackData.getUri();
segmentUris.add(segmentUri);
}
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
try(FileOutputStream fos = new FileOutputStream(target)) { try {
for (int i = 0; i < segmentUris.size(); i++) { MergedHlsDownload download = new MergedHlsDownload(HttpClient.getInstance());
URL segment = new URL(segmentUris.get(i)); download.start(url.toString(), target, (progress) -> {
InputStream in = segment.openStream(); Platform.runLater(() -> {
byte[] b = new byte[1024]; if (progress == 100) {
int length = -1; recording.setStatus(STATUS.FINISHED);
while( (length = in.read(b)) >= 0 ) { recording.setProgress(-1);
fos.write(b, 0, length); LOG.debug("Download finished for recording {}", recording.getPath());
} } else {
in.close();
int progress = (int) (i * 100.0 / segmentUris.size());
Platform.runLater(new Runnable() {
@Override
public void run() {
recording.setStatus(STATUS.DOWNLOADING); recording.setStatus(STATUS.DOWNLOADING);
recording.setProgress(progress); recording.setProgress(progress);
} }
}); });
} });
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
showErrorDialog("Error while downloading recording", "The target file couldn't be created", e); showErrorDialog("Error while downloading recording", "The target file couldn't be created", e);
LOG.error("Error while downloading recording", e); LOG.error("Error while downloading recording", e);
@ -332,9 +311,86 @@ public class RecordingsTab extends Tab implements TabSelectionListener {
t.setDaemon(true); t.setDaemon(true);
t.setName("Download Thread " + recording.getPath()); t.setName("Download Thread " + recording.getPath());
t.start(); t.start();
recording.setStatus(STATUS.DOWNLOADING);
recording.setProgress(0);
} }
} }
// private void download(Recording recording) throws IOException, ParseException, PlaylistException {
// String filename = recording.getPath().replaceAll("/", "-") + ".ts";
// FileChooser chooser = new FileChooser();
// chooser.setInitialFileName(filename);
// if(config.getSettings().lastDownloadDir != null && !config.getSettings().lastDownloadDir.equals("")) {
// File dir = new File(config.getSettings().lastDownloadDir);
// while(!dir.exists()) {
// dir = dir.getParentFile();
// }
// chooser.setInitialDirectory(dir);
// }
// File target = chooser.showSaveDialog(null);
// if(target != null) {
// config.getSettings().lastDownloadDir = target.getParent();
// String hlsBase = "http://" + config.getSettings().httpServer + ":" + config.getSettings().httpPort + "/hls";
// URL url = new URL(hlsBase + "/" + recording.getPath() + "/playlist.m3u8");
// LOG.info("Downloading {}", recording.getPath());
//
// PlaylistParser parser = new PlaylistParser(url.openStream(), Format.EXT_M3U, Encoding.UTF_8);
// Playlist playlist = parser.parse();
// MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
// List<TrackData> tracks = mediaPlaylist.getTracks();
// List<String> segmentUris = new ArrayList<>();
// for (TrackData trackData : tracks) {
// String segmentUri = hlsBase + "/" + recording.getPath() + "/" + trackData.getUri();
// segmentUris.add(segmentUri);
// }
//
// Thread t = new Thread() {
// @Override
// public void run() {
// try(FileOutputStream fos = new FileOutputStream(target)) {
// for (int i = 0; i < segmentUris.size(); i++) {
// URL segment = new URL(segmentUris.get(i));
// InputStream in = segment.openStream();
// byte[] b = new byte[1024];
// int length = -1;
// while( (length = in.read(b)) >= 0 ) {
// fos.write(b, 0, length);
// }
// in.close();
// int progress = (int) (i * 100.0 / segmentUris.size());
// Platform.runLater(new Runnable() {
// @Override
// public void run() {
// recording.setStatus(STATUS.DOWNLOADING);
// recording.setProgress(progress);
// }
// });
// }
//
// } catch (FileNotFoundException e) {
// showErrorDialog("Error while downloading recording", "The target file couldn't be created", e);
// LOG.error("Error while downloading recording", e);
// } catch (IOException e) {
// showErrorDialog("Error while downloading recording", "The recording could not be downloaded", e);
// LOG.error("Error while downloading recording", e);
// } finally {
// Platform.runLater(new Runnable() {
// @Override
// public void run() {
// recording.setStatus(STATUS.FINISHED);
// recording.setProgress(-1);
// }
// });
// }
// }
// };
// t.setDaemon(true);
// t.setName("Download Thread " + recording.getPath());
// t.start();
// }
// }
private void showErrorDialog(final String title, final String msg, final Exception e) { private void showErrorDialog(final String title, final String msg, final Exception e) {
Platform.runLater(new Runnable() { Platform.runLater(new Runnable() {
@Override @Override

View File

@ -3,15 +3,24 @@ package org.taktik.mpegts.sources;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.taktik.mpegts.MTSPacket; import org.taktik.mpegts.MTSPacket;
import ctbrec.recorder.ProgressListener;
public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoCloseable { public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoCloseable {
private static final transient Logger LOG = LoggerFactory.getLogger(BlockingMultiMTSSource.class);
private final boolean fixContinuity; private final boolean fixContinuity;
private ContinuityFixer continuityFixer; private ContinuityFixer continuityFixer;
private final BlockingQueue<MTSSource> sources; private final BlockingQueue<MTSSource> sources;
private MTSSource currentSource; private MTSSource currentSource;
private int downloadedSegments = 0;
private int totalSegments = -1;
private ProgressListener listener;
private BlockingMultiMTSSource(boolean fixContinuity) { private BlockingMultiMTSSource(boolean fixContinuity) {
this.fixContinuity = fixContinuity; this.fixContinuity = fixContinuity;
@ -35,8 +44,19 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo
if(packet == null) { if(packet == null) {
// end of source has been reached, switch to the next source // end of source has been reached, switch to the next source
currentSource.close(); currentSource.close();
downloadedSegments++;
if(listener != null && totalSegments > 0) {
int progress = (int)(downloadedSegments * 100.0 / totalSegments);
listener.update(progress);
}
if(downloadedSegments == totalSegments) {
LOG.debug("All segments written. Queue size {}", sources.size());
return null;
}
currentSource = sources.take(); currentSource = sources.take();
packet = currentSource.nextPacket(); packet = currentSource.nextPacket();
// }
} }
if (fixContinuity) { if (fixContinuity) {
@ -45,6 +65,14 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo
return packet; return packet;
} }
private void setProgressListener(ProgressListener listener) {
this.listener = listener;
}
public void setTotalSegments(int total) {
this.totalSegments = total;
}
@Override @Override
protected void closeInternal() throws Exception { protected void closeInternal() throws Exception {
for (MTSSource source : sources) { for (MTSSource source : sources) {
@ -58,14 +86,24 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo
public static class BlockingMultiMTSSourceBuilder { public static class BlockingMultiMTSSourceBuilder {
boolean fixContinuity = false; boolean fixContinuity = false;
ProgressListener listener;
public BlockingMultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) { public BlockingMultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) {
this.fixContinuity = fixContinuity; this.fixContinuity = fixContinuity;
return this; return this;
} }
public BlockingMultiMTSSourceBuilder setProgressListener(ProgressListener listener) {
this.listener = listener;
return this;
}
public BlockingMultiMTSSource build() { public BlockingMultiMTSSource build() {
return new BlockingMultiMTSSource(fixContinuity); BlockingMultiMTSSource source = new BlockingMultiMTSSource(fixContinuity);
if(listener != null) {
source.setProgressListener(listener);
}
return source;
} }
} }
} }