From 698ba72120881997ea2666f7d06eee0d6739e9d1 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Fri, 7 Sep 2018 15:16:08 +0200 Subject: [PATCH] Integrate mpegts-streamer to save a recording to a single file Integrate a modified version of mpegts-streamer (https://github.com/igilham/mpegts-streamer) Add BlockingMultiMTSSource to mpegts-streamer, which is used to add new InputStreamMTSSources online for each segment. Remove all settings and methods, which are needed for segment merging. --- pom.xml | 7 +- src/main/java/ctbrec/Settings.java | 3 - .../java/ctbrec/recorder/LocalRecorder.java | 77 +-- .../ctbrec/recorder/PlaylistGenerator.java | 4 +- src/main/java/ctbrec/recorder/Recorder.java | 3 - .../java/ctbrec/recorder/RemoteRecorder.java | 6 - .../download/AbstractHlsDownload.java | 120 ++++ .../ctbrec/recorder/download/HlsDownload.java | 108 +--- .../recorder/download/MergedHlsDownload.java | 257 ++++++++ src/main/java/ctbrec/ui/RecordingsTab.java | 61 -- src/main/java/ctbrec/ui/SettingsTab.java | 104 +-- src/main/java/org/taktik/CHANGELOG.md | 9 + .../java/org/taktik/ioutils/NIOUtils.java | 29 + .../java/org/taktik/mpegts/Constants.java | 6 + .../java/org/taktik/mpegts/MTSPacket.java | 607 ++++++++++++++++++ src/main/java/org/taktik/mpegts/Merger.java | 63 ++ .../java/org/taktik/mpegts/PATSection.java | 49 ++ .../java/org/taktik/mpegts/PMTSection.java | 133 ++++ .../java/org/taktik/mpegts/PSISection.java | 90 +++ .../java/org/taktik/mpegts/PacketSupport.java | 37 ++ src/main/java/org/taktik/mpegts/Streamer.java | 323 ++++++++++ .../java/org/taktik/mpegts/StreamerTest.java | 45 ++ .../taktik/mpegts/sinks/ByteChannelSink.java | 43 ++ .../java/org/taktik/mpegts/sinks/MTSSink.java | 7 + .../org/taktik/mpegts/sinks/UDPTransport.java | 77 +++ .../sources/AbstractByteChannelMTSSource.java | 117 ++++ .../mpegts/sources/AbstractMTSSource.java | 40 ++ .../sources/BlockingMultiMTSSource.java | 71 ++ .../mpegts/sources/ByteChannelMTSSource.java | 37 ++ .../mpegts/sources/ByteSourceMTSSource.java | 75 +++ .../sources/ConcatenatingMTSSource.java | 194 ++++++ .../mpegts/sources/ContinuityFixer.java | 210 ++++++ .../sources/ContinuityFixingMTSSource.java | 30 + .../mpegts/sources/FixedBitrateMTSSource.java | 30 + .../mpegts/sources/InputStreamMTSSource.java | 63 ++ .../mpegts/sources/LoopingMTSSource.java | 74 +++ .../org/taktik/mpegts/sources/MTSSource.java | 7 + .../org/taktik/mpegts/sources/MTSSources.java | 80 +++ .../mpegts/sources/MultiplexingMTSSource.java | 100 +++ .../mpegts/sources/NullPacketSource.java | 34 + .../mpegts/sources/ResettableMTSSource.java | 5 + .../sources/SeekableByteChannelMTSSource.java | 45 ++ src/main/resources/logback.xml | 1 + 43 files changed, 3128 insertions(+), 353 deletions(-) create mode 100644 src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java create mode 100644 src/main/java/ctbrec/recorder/download/MergedHlsDownload.java create mode 100644 src/main/java/org/taktik/CHANGELOG.md create mode 100644 src/main/java/org/taktik/ioutils/NIOUtils.java create mode 100644 src/main/java/org/taktik/mpegts/Constants.java create mode 100644 src/main/java/org/taktik/mpegts/MTSPacket.java create mode 100644 src/main/java/org/taktik/mpegts/Merger.java create mode 100644 src/main/java/org/taktik/mpegts/PATSection.java create mode 100644 src/main/java/org/taktik/mpegts/PMTSection.java create mode 100644 src/main/java/org/taktik/mpegts/PSISection.java create mode 100644 src/main/java/org/taktik/mpegts/PacketSupport.java create mode 100644 src/main/java/org/taktik/mpegts/Streamer.java create mode 100644 src/main/java/org/taktik/mpegts/StreamerTest.java create mode 100644 src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java create mode 100644 src/main/java/org/taktik/mpegts/sinks/MTSSink.java create mode 100644 src/main/java/org/taktik/mpegts/sinks/UDPTransport.java create mode 100644 src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java create mode 100644 src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/MTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/MTSSources.java create mode 100644 src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/NullPacketSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java create mode 100644 src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java diff --git a/pom.xml b/pom.xml index ee9755ea..24a7a230 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,11 @@ 4.12 test - + + + com.google.guava + guava + 17.0 + diff --git a/src/main/java/ctbrec/Settings.java b/src/main/java/ctbrec/Settings.java index a552ffab..9b247408 100644 --- a/src/main/java/ctbrec/Settings.java +++ b/src/main/java/ctbrec/Settings.java @@ -19,14 +19,11 @@ public class Settings { public int httpTimeout = 10; public String httpServer = "localhost"; public String recordingsDir = System.getProperty("user.home") + File.separator + "ctbrec"; - public String mergeDir = ""; public String mediaPlayer = "/usr/bin/mpv"; public String username = ""; public String password = ""; public String lastDownloadDir = ""; public List models = new ArrayList(); - public boolean automerge = false; - public boolean automergeKeepSegments = false; public boolean determineResolution = false; public boolean requireAuthentication = false; public boolean chooseStreamQuality = false; diff --git a/src/main/java/ctbrec/recorder/LocalRecorder.java b/src/main/java/ctbrec/recorder/LocalRecorder.java index 158a2af1..33d62dff 100644 --- a/src/main/java/ctbrec/recorder/LocalRecorder.java +++ b/src/main/java/ctbrec/recorder/LocalRecorder.java @@ -8,7 +8,6 @@ import static ctbrec.Recording.STATUS.RECORDING; import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.StandardCopyOption; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; @@ -36,7 +35,7 @@ import ctbrec.ModelParser; import ctbrec.Recording; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.download.Download; -import ctbrec.recorder.download.HlsDownload; +import ctbrec.recorder.download.MergedHlsDownload; import okhttp3.Request; import okhttp3.Response; @@ -121,7 +120,8 @@ public class LocalRecorder implements Recorder { return; } - Download download = new HlsDownload(client); + //Download download = new HlsDownload(client); + Download download = new MergedHlsDownload(client); recordingProcesses.put(model, download); new Thread() { @Override @@ -301,10 +301,8 @@ public class LocalRecorder implements Recorder { @Override public void run() { boolean local = Config.getInstance().getSettings().localRecording; - boolean automerge = Config.getInstance().getSettings().automerge; - generatePlaylist(directory); - if (local && automerge) { - merge(directory); + if(!local) { + generatePlaylist(directory); } } }; @@ -313,59 +311,6 @@ public class LocalRecorder implements Recorder { t.start(); } - private File merge(File recDir) { - // TODO idea: create a factory for segment merger, which checks for ffmpeg and - // returns the FFmpeg merger, if available and the simple one otherwise - SegmentMerger segmentMerger = new SimpleSegmentMerger(); - //SegmentMerger segmentMerger = new FFmpegSegmentMerger(); - segmentMergers.put(recDir, segmentMerger); - try { - File mergedFile = Recording.mergedFileFromDirectory(recDir); - segmentMerger.merge(recDir, mergedFile); - - if (mergedFile != null && mergedFile.exists() && mergedFile.length() > 0) { - LOG.debug("Merged file {}", mergedFile.getAbsolutePath()); - if (Config.getInstance().getSettings().mergeDir.length() > 0) { - File mergeDir = new File(Config.getInstance().getSettings().mergeDir); - if (!mergeDir.exists()) { - boolean created = mergeDir.mkdirs(); - if (!created) { - LOG.error("Couldn't create directory for merged files {}", mergeDir); - } - } - - File finalLocation = new File(mergeDir, mergedFile.getName()); - try { - Files.move(mergedFile.toPath(), finalLocation.toPath(), StandardCopyOption.ATOMIC_MOVE); - } catch (IOException e) { - LOG.error("Couldn't move merged file to merge directory {}", mergeDir, e); - } - } - - if (Config.getInstance().getSettings().automerge && !Config.getInstance().getSettings().automergeKeepSegments) { - try { - LOG.debug("Deleting directory {}", recDir); - // TODO validate the size of the merged file before deleting the segments - delete(recDir, mergedFile); - } catch (IOException e) { - LOG.error("Couldn't delete directory {}", recDir, e); - } - } - } else { - LOG.error("Merged file not found {}", mergedFile); - } - - return mergedFile; - } catch (IOException e) { - LOG.error("Couldn't merge segments", e); - } catch (ParseException | PlaylistException e) { - LOG.error("Playlist is invalid", e); - } finally { - segmentMergers.remove(recDir); - } - return null; - } - private void generatePlaylist(File recDir) { PlaylistGenerator playlistGenerator = new PlaylistGenerator(); playlistGenerators.put(recDir, playlistGenerator); @@ -605,18 +550,6 @@ public class LocalRecorder implements Recorder { } } - @Override - public File merge(Recording rec, boolean keepSegments) throws IOException { - File recordingsDir = new File(config.getSettings().recordingsDir); - File directory = new File(recordingsDir, rec.getPath()); - File mergedFile = merge(directory); - if (!keepSegments) { - // TODO validate the size of the merged file before deleting the segments - delete(directory, mergedFile); - } - return mergedFile; - } - @Override public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName()); diff --git a/src/main/java/ctbrec/recorder/PlaylistGenerator.java b/src/main/java/ctbrec/recorder/PlaylistGenerator.java index df31463b..04c136f8 100644 --- a/src/main/java/ctbrec/recorder/PlaylistGenerator.java +++ b/src/main/java/ctbrec/recorder/PlaylistGenerator.java @@ -47,7 +47,9 @@ public class PlaylistGenerator { public File generate(File directory) throws IOException, ParseException, PlaylistException { LOG.debug("Starting playlist generation for {}", directory); // get a list of all ts files and sort them by sequence - File[] files = directory.listFiles((f) -> f.getName().endsWith(".ts")); + File[] files = directory.listFiles((f) -> { + return f.getName().endsWith(".ts") && !f.getName().startsWith("record"); + }); if(files.length == 0) { LOG.debug("{} is empty. Not going to generate a playlist", directory); return null; diff --git a/src/main/java/ctbrec/recorder/Recorder.java b/src/main/java/ctbrec/recorder/Recorder.java index e94eba58..9effa208 100644 --- a/src/main/java/ctbrec/recorder/Recorder.java +++ b/src/main/java/ctbrec/recorder/Recorder.java @@ -1,6 +1,5 @@ package ctbrec.recorder; -import java.io.File; import java.io.IOException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -26,8 +25,6 @@ public interface Recorder { public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException; - public File merge(Recording recording, boolean keepSegments) throws IOException; - public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException; public void shutdown(); diff --git a/src/main/java/ctbrec/recorder/RemoteRecorder.java b/src/main/java/ctbrec/recorder/RemoteRecorder.java index 03cb8921..baf6d3f5 100644 --- a/src/main/java/ctbrec/recorder/RemoteRecorder.java +++ b/src/main/java/ctbrec/recorder/RemoteRecorder.java @@ -1,6 +1,5 @@ package ctbrec.recorder; -import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.security.InvalidKeyException; @@ -233,11 +232,6 @@ public class RemoteRecorder implements Recorder { } } - @Override - public File merge(Recording recording, boolean keepSegments) throws IOException { - throw new RuntimeException("Merging not available for remote recorder"); - } - public static class ModelRequest { private String action; private Model model; diff --git a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java new file mode 100644 index 00000000..cb4a35f4 --- /dev/null +++ b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java @@ -0,0 +1,120 @@ +package ctbrec.recorder.download; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.iheartradio.m3u8.Encoding; +import com.iheartradio.m3u8.Format; +import com.iheartradio.m3u8.ParseException; +import com.iheartradio.m3u8.PlaylistException; +import com.iheartradio.m3u8.PlaylistParser; +import com.iheartradio.m3u8.data.MasterPlaylist; +import com.iheartradio.m3u8.data.MediaPlaylist; +import com.iheartradio.m3u8.data.Playlist; +import com.iheartradio.m3u8.data.PlaylistData; +import com.iheartradio.m3u8.data.TrackData; + +import ctbrec.HttpClient; +import okhttp3.Request; +import okhttp3.Response; + +public abstract class AbstractHlsDownload implements Download { + + ExecutorService downloadThreadPool = Executors.newFixedThreadPool(5); + HttpClient client; + volatile boolean running = false; + volatile boolean alive = true; + Path downloadDir; + + public AbstractHlsDownload(HttpClient client) { + this.client = client; + } + + String parseMaster(String url, int streamUrlIndex) throws IOException, ParseException, PlaylistException { + Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); + Response response = client.execute(request); + try { + InputStream inputStream = response.body().byteStream(); + + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8); + Playlist playlist = parser.parse(); + if(playlist.hasMasterPlaylist()) { + MasterPlaylist master = playlist.getMasterPlaylist(); + PlaylistData bestQuality = null; + if(streamUrlIndex >= 0 && streamUrlIndex < master.getPlaylists().size()) { + bestQuality = master.getPlaylists().get(streamUrlIndex); + } else { + bestQuality = master.getPlaylists().get(master.getPlaylists().size()-1); + } + String uri = bestQuality.getUri(); + if(!uri.startsWith("http")) { + String masterUrl = url; + String baseUri = masterUrl.substring(0, masterUrl.lastIndexOf('/') + 1); + String segmentUri = baseUri + uri; + return segmentUri; + } + } + return null; + } finally { + response.close(); + } + } + + LiveStreamingPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { + URL segmentsUrl = new URL(segments); + Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build(); + Response response = client.execute(request); + try { + InputStream inputStream = response.body().byteStream(); + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8); + Playlist playlist = parser.parse(); + if(playlist.hasMediaPlaylist()) { + MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + LiveStreamingPlaylist lsp = new LiveStreamingPlaylist(); + lsp.seq = mediaPlaylist.getMediaSequenceNumber(); + lsp.targetDuration = mediaPlaylist.getTargetDuration(); + List tracks = mediaPlaylist.getTracks(); + for (TrackData trackData : tracks) { + String uri = trackData.getUri(); + if(!uri.startsWith("http")) { + String _url = segmentsUrl.toString(); + _url = _url.substring(0, _url.lastIndexOf('/') + 1); + String segmentUri = _url + uri; + lsp.totalDuration += trackData.getTrackInfo().duration; + lsp.lastSegDuration = trackData.getTrackInfo().duration; + lsp.segments.add(segmentUri); + } + } + return lsp; + } + return null; + } finally { + response.close(); + } + } + + @Override + public boolean isAlive() { + return alive; + } + + @Override + public File getDirectory() { + return downloadDir.toFile(); + } + + public static class LiveStreamingPlaylist { + public int seq = 0; + public float totalDuration = 0; + public float lastSegDuration = 0; + public float targetDuration = 0; + public List segments = new ArrayList<>(); + } +} diff --git a/src/main/java/ctbrec/recorder/download/HlsDownload.java b/src/main/java/ctbrec/recorder/download/HlsDownload.java index e220d64b..df7732ab 100644 --- a/src/main/java/ctbrec/recorder/download/HlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/HlsDownload.java @@ -12,27 +12,15 @@ import java.nio.file.Files; import java.nio.file.LinkOption; import java.nio.file.Path; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import java.util.Objects; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.iheartradio.m3u8.Encoding; -import com.iheartradio.m3u8.Format; import com.iheartradio.m3u8.ParseException; import com.iheartradio.m3u8.PlaylistException; -import com.iheartradio.m3u8.PlaylistParser; -import com.iheartradio.m3u8.data.MasterPlaylist; -import com.iheartradio.m3u8.data.MediaPlaylist; -import com.iheartradio.m3u8.data.Playlist; -import com.iheartradio.m3u8.data.PlaylistData; -import com.iheartradio.m3u8.data.TrackData; import ctbrec.Config; import ctbrec.HttpClient; @@ -42,17 +30,12 @@ import ctbrec.recorder.StreamInfo; import okhttp3.Request; import okhttp3.Response; -public class HlsDownload implements Download { +public class HlsDownload extends AbstractHlsDownload { private static final transient Logger LOG = LoggerFactory.getLogger(HlsDownload.class); - private HttpClient client; - private ExecutorService threadPool = Executors.newFixedThreadPool(5); - private volatile boolean running = false; - private volatile boolean alive = true; - private Path downloadDir; public HlsDownload(HttpClient client) { - this.client = client; + super(client); } @Override @@ -77,7 +60,7 @@ public class HlsDownload implements Download { int lastSegment = 0; int nextSegment = 0; while(running) { - LiveStreamingPlaylist lsp = parseSegments(segments); + LiveStreamingPlaylist lsp = getNextSegments(segments); if(nextSegment > 0 && lsp.seq > nextSegment) { LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model); String first = lsp.segments.get(0); @@ -85,7 +68,7 @@ public class HlsDownload implements Download { for (int i = nextSegment; i < lsp.seq; i++) { URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); LOG.debug("Reloading segment {} for model {}", i, model.getName()); - threadPool.submit(new SegmentDownload(segmentUrl, downloadDir, client)); + downloadThreadPool.submit(new SegmentDownload(segmentUrl, downloadDir, client)); } // TODO switch to a lower bitrate/resolution ?!? } @@ -95,7 +78,7 @@ public class HlsDownload implements Download { skip--; } else { URL segmentUrl = new URL(segment); - threadPool.submit(new SegmentDownload(segmentUrl, downloadDir, client)); + downloadThreadPool.submit(new SegmentDownload(segmentUrl, downloadDir, client)); //new SegmentDownload(segment, downloadDir).call(); } } @@ -146,77 +129,6 @@ public class HlsDownload implements Download { alive = false; } - private LiveStreamingPlaylist parseSegments(String segments) throws IOException, ParseException, PlaylistException { - URL segmentsUrl = new URL(segments); - Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build(); - Response response = client.execute(request); - try { - InputStream inputStream = response.body().byteStream(); - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8); - Playlist playlist = parser.parse(); - if(playlist.hasMediaPlaylist()) { - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - LiveStreamingPlaylist lsp = new LiveStreamingPlaylist(); - lsp.seq = mediaPlaylist.getMediaSequenceNumber(); - lsp.targetDuration = mediaPlaylist.getTargetDuration(); - List tracks = mediaPlaylist.getTracks(); - for (TrackData trackData : tracks) { - String uri = trackData.getUri(); - if(!uri.startsWith("http")) { - String _url = segmentsUrl.toString(); - _url = _url.substring(0, _url.lastIndexOf('/') + 1); - String segmentUri = _url + uri; - lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.lastSegDuration = trackData.getTrackInfo().duration; - lsp.segments.add(segmentUri); - } - } - return lsp; - } - return null; - } finally { - response.close(); - } - } - - private String parseMaster(String url, int streamUrlIndex) throws IOException, ParseException, PlaylistException { - Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); - Response response = client.execute(request); - try { - InputStream inputStream = response.body().byteStream(); - - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8); - Playlist playlist = parser.parse(); - if(playlist.hasMasterPlaylist()) { - MasterPlaylist master = playlist.getMasterPlaylist(); - PlaylistData bestQuality = null; - if(streamUrlIndex >= 0 && streamUrlIndex < master.getPlaylists().size()) { - bestQuality = master.getPlaylists().get(streamUrlIndex); - } else { - bestQuality = master.getPlaylists().get(master.getPlaylists().size()-1); - } - String uri = bestQuality.getUri(); - if(!uri.startsWith("http")) { - String masterUrl = url; - String baseUri = masterUrl.substring(0, masterUrl.lastIndexOf('/') + 1); - String segmentUri = baseUri + uri; - return segmentUri; - } - } - return null; - } finally { - response.close(); - } - } - - public static class LiveStreamingPlaylist { - public int seq = 0; - public float totalDuration = 0; - public float lastSegDuration = 0; - public float targetDuration = 0; - public List segments = new ArrayList<>(); - } - private static class SegmentDownload implements Callable { private URL url; private Path file; @@ -262,14 +174,4 @@ public class HlsDownload implements Download { return false; } } - - @Override - public boolean isAlive() { - return alive; - } - - @Override - public File getDirectory() { - return downloadDir.toFile(); - } } diff --git a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java new file mode 100644 index 00000000..6770e1c5 --- /dev/null +++ b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -0,0 +1,257 @@ +package ctbrec.recorder.download; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.WRITE; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.channels.FileChannel; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.taktik.mpegts.Streamer; +import org.taktik.mpegts.sinks.ByteChannelSink; +import org.taktik.mpegts.sinks.MTSSink; +import org.taktik.mpegts.sources.BlockingMultiMTSSource; +import org.taktik.mpegts.sources.InputStreamMTSSource; + +import com.iheartradio.m3u8.ParseException; +import com.iheartradio.m3u8.PlaylistException; + +import ctbrec.Config; +import ctbrec.HttpClient; +import ctbrec.Model; +import ctbrec.recorder.Chaturbate; +import ctbrec.recorder.StreamInfo; +import okhttp3.Request; +import okhttp3.Response; + +public class MergedHlsDownload extends AbstractHlsDownload { + + private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class); + private BlockingQueue> mergeQueue = new LinkedBlockingQueue<>(); + private BlockingMultiMTSSource multiSource; + private Thread mergeThread; + private Thread handoverThread; + private Streamer streamer; + + public MergedHlsDownload(HttpClient client) { + super(client); + } + + @Override + public void start(Model model, Config config) throws IOException { + try { + running = true; + StreamInfo streamInfo = Chaturbate.getStreamInfo(model, client); + if(!Objects.equals(streamInfo.room_status, "public")) { + throw new IOException(model.getName() +"'s room is not public"); + } + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); + String startTime = sdf.format(new Date()); + Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName()); + downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); + if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { + Files.createDirectories(downloadDir); + } + + mergeThread = createMergeThread(downloadDir); + mergeThread.start(); + handoverThread = createHandoverThread(); + handoverThread.start(); + + String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); + if(segments != null) { + int lastSegment = 0; + int nextSegment = 0; + while(running) { + LiveStreamingPlaylist lsp = getNextSegments(segments); + if(nextSegment > 0 && lsp.seq > nextSegment) { + LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, model); + String first = lsp.segments.get(0); + int seq = lsp.seq; + for (int i = nextSegment; i < lsp.seq; i++) { + URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); + LOG.debug("Reloading segment {} for model {}", i, model.getName()); + // FIXME this does not work with the current mechanism, since the InputStreams for these segments would be added + // to the mergeQueue in the wrong spot (after successors of these segments -> wrong order) + Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); + mergeQueue.add(downloadFuture); + } + // TODO switch to a lower bitrate/resolution ?!? + } + int skip = nextSegment - lsp.seq; + for (String segment : lsp.segments) { + if(skip > 0) { + skip--; + } else { + URL segmentUrl = new URL(segment); + Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); + mergeQueue.add(downloadFuture); + } + } + + long wait = 0; + if(lastSegment == lsp.seq) { + // playlist didn't change -> wait for at least half the target duration + wait = (long) lsp.targetDuration * 1000 / 2; + LOG.trace("Playlist didn't change... waiting for {}ms", wait); + } else { + // playlist did change -> wait for at least last segment duration + wait = 1;//(long) lsp.lastSegDuration * 1000; + LOG.trace("Playlist changed... waiting for {}ms", wait); + } + + try { + Thread.sleep(wait); + } catch (InterruptedException e) { + if(running) { + LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); + } + } + + lastSegment = lsp.seq; + nextSegment = lastSegment + lsp.segments.size(); + } + } else { + throw new IOException("Couldn't determine segments uri"); + } + } catch(ParseException e) { + throw new IOException("Couldn't parse stream information", e); + } catch(PlaylistException e) { + throw new IOException("Couldn't parse HLS playlist", e); + } catch(EOFException e) { + // end of playlist reached + LOG.debug("Reached end of playlist for model {}", model); + } catch(Exception e) { + throw new IOException("Couldn't download segment", e); + } finally { + alive = false; + LOG.debug("Download for {} terminated", model); + } + } + + private Thread createHandoverThread() { + Thread t = new Thread(() -> { + while(running) { + try { + Future downloadFuture = mergeQueue.take(); + InputStream tsData = downloadFuture.get(); + InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build(); + multiSource.addSource(source); + } catch (InterruptedException e) { + if(running) { + LOG.error("Error while waiting for a download future", e); + } + } catch (ExecutionException e) { + LOG.error("Error while executing download", e); + } catch (IOException e) { + LOG.error("Error while saving stream to file", e); + } + } + }); + t.setName("Segment Handover Thread"); + t.setDaemon(true); + return t; + } + + private Thread createMergeThread(Path downloadDir) { + Thread t = new Thread(() -> { + multiSource = BlockingMultiMTSSource.builder().setFixContinuity(true).build(); + + File out = new File(downloadDir.toFile(), "record.ts"); + FileChannel channel = null; + try { + channel = FileChannel.open(out.toPath(), CREATE, WRITE); + MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build(); + + streamer = Streamer.builder() + .setSource(multiSource) + .setSink(sink) + .setSleepingEnabled(false) + .setBufferSize(10) + .build(); + + // Start streaming + streamer.stream(); + } catch (InterruptedException e) { + if(running) { + LOG.error("Error while waiting for a download future", e); + } + } catch(Exception e) { + LOG.error("Error while saving stream to file", e); + } finally { + try { + channel.close(); + } catch (IOException e) { + LOG.error("Error while closing file {}", out); + } + } + }); + t.setName("Segment Merger Thread"); + t.setDaemon(true); + return t; + } + + @Override + public void stop() { + running = false; + alive = false; + LOG.debug("Stopping streamer"); + streamer.stop(); + LOG.debug("Sending interrupt to merger"); + mergeThread.interrupt(); + LOG.debug("Sending interrupt to handover thread"); + handoverThread.interrupt(); + LOG.debug("Download stopped"); + } + + private static class SegmentDownload implements Callable { + private URL url; + private HttpClient client; + + public SegmentDownload(URL url, HttpClient client) { + this.url = url; + this.client = client; + } + + @Override + public InputStream call() throws Exception { + LOG.trace("Downloading segment " + url.getFile()); + int maxTries = 3; + for (int i = 1; i <= maxTries; i++) { + Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); + Response response = client.execute(request); + try { + InputStream in = response.body().byteStream(); + return in; + } catch(Exception e) { + if (i == maxTries) { + LOG.warn("Error while downloading segment. Segment {} finally failed", url.getFile()); + } else { + LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i); + } + } /*finally { + response.close(); + }*/ + } + throw new IOException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries"); + } + } +} diff --git a/src/main/java/ctbrec/ui/RecordingsTab.java b/src/main/java/ctbrec/ui/RecordingsTab.java index fa5a1595..08017ebc 100644 --- a/src/main/java/ctbrec/ui/RecordingsTab.java +++ b/src/main/java/ctbrec/ui/RecordingsTab.java @@ -45,7 +45,6 @@ import javafx.scene.Cursor; import javafx.scene.control.Alert.AlertType; import javafx.scene.control.ButtonType; import javafx.scene.control.ContextMenu; -import javafx.scene.control.Menu; import javafx.scene.control.MenuItem; import javafx.scene.control.ScrollPane; import javafx.scene.control.Tab; @@ -260,69 +259,9 @@ public class RecordingsTab extends Tab implements TabSelectionListener { contextMenu.getItems().add(downloadRecording); } - Menu mergeRecording = new Menu("Merge segments"); - MenuItem mergeKeep = new MenuItem("… and keep segments"); - mergeKeep.setOnAction((e) -> { - try { - merge(recording, true); - } catch (IOException e1) { - showErrorDialog("Error while merging recording", "The playlist does not exist", e1); - LOG.error("Error while merging recording", e); - } - }); - MenuItem mergeDelete = new MenuItem("… and delete segments"); - mergeDelete.setOnAction((e) -> { - try { - merge(recording, false); - } catch (IOException e1) { - showErrorDialog("Error while merging recording", "The playlist does not exist", e1); - LOG.error("Error while merging recording", e); - } - }); - mergeRecording.getItems().addAll(mergeKeep, mergeDelete); - if (Config.getInstance().getSettings().localRecording && recording.getStatus() == STATUS.FINISHED) { - if(!Recording.isMergedRecording(recording)) { - contextMenu.getItems().add(mergeRecording); - } - } - return contextMenu; } - private void merge(Recording recording, boolean keepSegments) throws IOException { - File recDir = new File (Config.getInstance().getSettings().recordingsDir, recording.getPath()); - File playlistFile = new File(recDir, "playlist.m3u8"); - if(!playlistFile.exists()) { - table.setCursor(Cursor.DEFAULT); - throw new IOException("Playlist file does not exist"); - } - String filename = recording.getPath().replaceAll("/", "-") + ".ts"; - File targetFile = new File(recDir, filename); - if(targetFile.exists()) { - return; - } - - Thread t = new Thread() { - @Override - public void run() { - try { - recorder.merge(recording, keepSegments); - } catch (IOException e) { - showErrorDialog("Error while merging segments", "The merged file could not be created", e); - LOG.error("Error while merging segments", e); - } finally { - Platform.runLater(() -> { - recording.setStatus(STATUS.FINISHED); - recording.setProgress(-1); - }); - } - }; - }; - t.setDaemon(true); - t.setName("Segment Merger " + recording.getPath()); - t.start(); - } - private void download(Recording recording) throws IOException, ParseException, PlaylistException { String filename = recording.getPath().replaceAll("/", "-") + ".ts"; FileChooser chooser = new FileChooser(); diff --git a/src/main/java/ctbrec/ui/SettingsTab.java b/src/main/java/ctbrec/ui/SettingsTab.java index 0ea10374..94a8dc1a 100644 --- a/src/main/java/ctbrec/ui/SettingsTab.java +++ b/src/main/java/ctbrec/ui/SettingsTab.java @@ -45,15 +45,12 @@ public class SettingsTab extends Tab implements TabSelectionListener { private static final int CHECKBOX_MARGIN = 6; private TextField recordingsDirectory; private Button recordingsDirectoryButton; - private TextField mergeDirectory; private TextField mediaPlayer; private TextField username; private TextField server; private TextField port; private CheckBox loadResolution; private CheckBox secureCommunication = new CheckBox(); - private CheckBox automerge = new CheckBox(); - private CheckBox automergeKeepSegments = new CheckBox(); private CheckBox chooseStreamQuality = new CheckBox(); private CheckBox autoRecordFollowed = new CheckBox(); private CheckBox multiplePlayers = new CheckBox(); @@ -62,9 +59,7 @@ public class SettingsTab extends Tab implements TabSelectionListener { private RadioButton recordRemote; private ToggleGroup recordLocation; private ProxySettingsPane proxySettingsPane; - private TitledPane ctb; - private TitledPane mergePane; public SettingsTab() { setText("Settings"); @@ -191,37 +186,6 @@ public class SettingsTab extends Tab implements TabSelectionListener { quality.setCollapsible(false); mainLayout.add(quality, 0, 2); - GridPane mergeLayout = createGridLayout(); - l = new Label("Auto-merge recordings"); - mergeLayout.add(l, 0, 0); - automerge.setSelected(Config.getInstance().getSettings().automerge); - automerge.setOnAction((e) -> Config.getInstance().getSettings().automerge = automerge.isSelected()); - GridPane.setMargin(automerge, new Insets(0, 0, 0, CHECKBOX_MARGIN)); - mergeLayout.add(automerge, 1, 0); - - l = new Label("Keep segments after auto-merge"); - mergeLayout.add(l, 0, 1); - automergeKeepSegments.setSelected(Config.getInstance().getSettings().automergeKeepSegments); - automergeKeepSegments.setOnAction((e) -> Config.getInstance().getSettings().automergeKeepSegments = automergeKeepSegments.isSelected()); - GridPane.setMargin(l, new Insets(CHECKBOX_MARGIN, 0, CHECKBOX_MARGIN, 0)); - GridPane.setMargin(automergeKeepSegments, new Insets(CHECKBOX_MARGIN, 0, CHECKBOX_MARGIN, CHECKBOX_MARGIN)); - mergeLayout.add(automergeKeepSegments, 1, 1); - - l = new Label("Move merged files to"); - mergeLayout.add(l, 0, 2); - mergeDirectory = new TextField(Config.getInstance().getSettings().mergeDir); - mergeDirectory.setOnAction((e) -> Config.getInstance().getSettings().mergeDir = mergeDirectory.getText()); - mergeDirectory.focusedProperty().addListener(createMergeDirectoryFocusListener()); - GridPane.setFillWidth(mergeDirectory, true); - GridPane.setHgrow(mergeDirectory, Priority.ALWAYS); - GridPane.setMargin(mergeDirectory, new Insets(0, 0, 0, CHECKBOX_MARGIN)); - mergeLayout.add(mergeDirectory, 1, 2); - mergeLayout.add(createMergeDirButton(), 3, 2); - - mergePane = new TitledPane("Merging", mergeLayout); - mergePane.setCollapsible(false); - mainLayout.add(mergePane, 0, 3); - layout = createGridLayout(); l = new Label("Record Location"); layout.add(l, 0, 0); @@ -303,7 +267,7 @@ public class SettingsTab extends Tab implements TabSelectionListener { TitledPane recordLocation = new TitledPane("Record Location", layout); recordLocation.setCollapsible(false); - mainLayout.add(recordLocation, 0, 4); + mainLayout.add(recordLocation, 0, 3); setRecordingMode(recordLocal.isSelected()); } @@ -328,9 +292,6 @@ public class SettingsTab extends Tab implements TabSelectionListener { server.setDisable(local); port.setDisable(local); secureCommunication.setDisable(local); - automerge.setDisable(!local); - automergeKeepSegments.setDisable(!local); - mergePane.setDisable(!local); ctb.setDisable(!local); recordingsDirectory.setDisable(!local); recordingsDirectoryButton.setDisable(!local); @@ -352,26 +313,6 @@ public class SettingsTab extends Tab implements TabSelectionListener { }; } - private ChangeListener createMergeDirectoryFocusListener() { - return new ChangeListener() { - @Override - public void changed(ObservableValue arg0, Boolean oldPropertyValue, Boolean newPropertyValue) { - if (newPropertyValue) { - mergeDirectory.setBorder(Border.EMPTY); - mergeDirectory.setTooltip(null); - } else { - String input = mergeDirectory.getText(); - if(input.isEmpty()) { - Config.getInstance().getSettings().mergeDir = ""; - } else { - File newDir = new File(input); - setMergeDir(newDir); - } - } - } - }; - } - private ChangeListener createMpvFocusListener() { return new ChangeListener() { @Override @@ -423,22 +364,6 @@ public class SettingsTab extends Tab implements TabSelectionListener { return button; } - private Node createMergeDirButton() { - Button button = new Button("Select"); - button.setOnAction((e) -> { - DirectoryChooser chooser = new DirectoryChooser(); - File currentDir = new File(Config.getInstance().getSettings().mergeDir); - if (currentDir.exists() && currentDir.isDirectory()) { - chooser.setInitialDirectory(currentDir); - } - File selectedDir = chooser.showDialog(null); - if(selectedDir != null) { - setMergeDir(selectedDir); - } - }); - return button; - } - private Node createMpvBrowseButton() { Button button = new Button("Select"); button.setOnAction((e) -> { @@ -485,33 +410,6 @@ public class SettingsTab extends Tab implements TabSelectionListener { } } - private void setMergeDir(File dir) { - if (dir != null) { - if (dir.isDirectory()) { - try { - String path = dir.getCanonicalPath(); - Config.getInstance().getSettings().mergeDir = path; - mergeDirectory.setText(path); - } catch (IOException e1) { - LOG.error("Couldn't determine directory path", e1); - Alert alert = new AutosizeAlert(Alert.AlertType.ERROR); - alert.setTitle("Whoopsie"); - alert.setContentText("Couldn't determine directory path"); - alert.showAndWait(); - } - } else { - mergeDirectory.setBorder(new Border(new BorderStroke(Color.RED, BorderStrokeStyle.DASHED, new CornerRadii(2), new BorderWidths(2)))); - if (!dir.isDirectory()) { - mergeDirectory.setTooltip(new Tooltip("This is not a directory")); - } - if (!dir.exists()) { - mergeDirectory.setTooltip(new Tooltip("Directory does not exist")); - } - - } - } - } - @Override public void selected() { } diff --git a/src/main/java/org/taktik/CHANGELOG.md b/src/main/java/org/taktik/CHANGELOG.md new file mode 100644 index 00000000..0c286235 --- /dev/null +++ b/src/main/java/org/taktik/CHANGELOG.md @@ -0,0 +1,9 @@ +Changes made to mpegts-streamer for ctbrec +================= +* Remove dependency on jcodec +* Add sleepingEnabled flag in Streamer to disable sleeping completely and make processing of stream much faster +* Add sanity check in ContinuityFixer to fix avoid an IndexOutOfBoundsException +* Wait for bufferingThread and streamingThread in Streamer.stream() to make it a blocking method +* Add BlockingMultiMTSSource, which can be used to add sources, after the streaming has been started +* Don't close the stream, if a packet can't be read in one go InputStreamMTSSource. Instead read from + the stream until the packet is complete diff --git a/src/main/java/org/taktik/ioutils/NIOUtils.java b/src/main/java/org/taktik/ioutils/NIOUtils.java new file mode 100644 index 00000000..f9fbe272 --- /dev/null +++ b/src/main/java/org/taktik/ioutils/NIOUtils.java @@ -0,0 +1,29 @@ +package org.taktik.ioutils; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +public class NIOUtils { + + public static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { + int rem = buffer.position(); + while (channel.read(buffer) != -1 && buffer.hasRemaining()) { + } + return buffer.position() - rem; + } + + public static int skip(ByteBuffer buffer, int count) { + int toSkip = Math.min(buffer.remaining(), count); + buffer.position(buffer.position() + toSkip); + return toSkip; + } + + public static final ByteBuffer read(ByteBuffer buffer, int count) { + ByteBuffer slice = buffer.duplicate(); + int limit = buffer.position() + count; + slice.limit(limit); + buffer.position(limit); + return slice; + } +} \ No newline at end of file diff --git a/src/main/java/org/taktik/mpegts/Constants.java b/src/main/java/org/taktik/mpegts/Constants.java new file mode 100644 index 00000000..599ac902 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/Constants.java @@ -0,0 +1,6 @@ +package org.taktik.mpegts; + +public class Constants { + public static final int MPEGTS_PACKET_SIZE = 188; + public static final byte TS_MARKER = 0x47; +} diff --git a/src/main/java/org/taktik/mpegts/MTSPacket.java b/src/main/java/org/taktik/mpegts/MTSPacket.java new file mode 100644 index 00000000..2a6c3ea4 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/MTSPacket.java @@ -0,0 +1,607 @@ +package org.taktik.mpegts; + +import java.nio.ByteBuffer; + +import com.google.common.base.Preconditions; +import org.taktik.ioutils.NIOUtils; + +public class MTSPacket extends PacketSupport { + private boolean transportErrorIndicator; // Transport Error Indicator (TEI) + private boolean payloadUnitStartIndicator; // Payload Unit Start Indicator + private boolean transportPriority; // Transport Priority + private int pid; // Packet Identifier (PID) + private int scramblingControl; // Scrambling control + private boolean adaptationFieldExist; // Adaptation field exist + private boolean containsPayload; // Contains payload + private int continuityCounter; // Continuity counter + + private AdaptationField adaptationField; + private ByteBuffer payload; + + public static class AdaptationField { + private MTSPacket packet; + private boolean discontinuityIndicator; // Discontinuity indicator + private boolean randomAccessIndicator; // Random Access indicator + private boolean elementaryStreamPriorityIndicator; // Elementary stream priority indicator + private boolean pcrFlag; // PCR flag + private boolean opcrFlag; // OPCR flag + private boolean splicingPointFlag; // Splicing point flag + private boolean transportPrivateDataFlag; // Transport private data flag + private boolean adaptationFieldExtensionFlag; // Adaptation field extension flag + private PCR pcr; // PCR + private PCR opcr; // OPCR + private byte spliceCountdown; // Splice countdown + private byte[] privateData; // Private data + private byte[] extension; // Extension + + public static class PCR { + private AdaptationField field; + public long base; // 33 bits + public int extension; // 9 bits + public byte reserved; // 6 bits + + public PCR(AdaptationField field, long base, int extension, byte reserved) { + this.base = base; + this.extension = extension; + this.reserved = reserved; + this.field = field; + } + + public long getValue() { + return base * 300 + extension; + } + + public void setValue(long value) { + base = value / 300; + extension = (int)value % 300; + field.markDirty(); + } + + public void write(ByteBuffer buffer) { + buffer.putInt((int) ((base & 0x1FFFFFFFFL) >> 1)); + int middleByte = 0; + middleByte |= ((base & 0x1) << 7); + middleByte |= ((reserved & 0x3F) << 1); + middleByte |= ((extension & 0x1FF) >> 8); + buffer.put((byte) middleByte); + buffer.put((byte) (extension & 0xff)); + } + } + + public AdaptationField(MTSPacket packet, boolean discontinuityIndicator, boolean randomAccessIndicator, boolean elementaryStreamPriorityIndicator, boolean pcrFlag, boolean opcrFlag, boolean splicingPointFlag, boolean transportPrivateDataFlag, boolean adaptationFieldExtensionFlag, PCR pcr, PCR opcr, byte spliceCountdown, byte[] privateData, byte[] extension) { + this.packet = packet; + this.discontinuityIndicator = discontinuityIndicator; + this.randomAccessIndicator = randomAccessIndicator; + this.elementaryStreamPriorityIndicator = elementaryStreamPriorityIndicator; + this.pcrFlag = pcrFlag; + this.opcrFlag = opcrFlag; + this.splicingPointFlag = splicingPointFlag; + this.transportPrivateDataFlag = transportPrivateDataFlag; + this.adaptationFieldExtensionFlag = adaptationFieldExtensionFlag; + this.pcr = pcr; + if (this.pcr != null) { + this.pcr.field = this; + } + this.opcr = opcr; + if (this.opcr != null) { + this.opcr.field = this; + } + this.spliceCountdown = spliceCountdown; + this.privateData = privateData; + this.extension = extension; + } + + public boolean isDiscontinuityIndicator() { + return discontinuityIndicator; + } + + public void setDiscontinuityIndicator(boolean discontinuityIndicator) { + this.discontinuityIndicator = discontinuityIndicator; + markDirty(); + } + + private void markDirty() { + packet.markDirty(); + } + + public boolean isRandomAccessIndicator() { + return randomAccessIndicator; + } + + public void setRandomAccessIndicator(boolean randomAccessIndicator) { + this.randomAccessIndicator = randomAccessIndicator; + markDirty(); + } + + public boolean isElementaryStreamPriorityIndicator() { + return elementaryStreamPriorityIndicator; + } + + public void setElementaryStreamPriorityIndicator(boolean elementaryStreamPriorityIndicator) { + this.elementaryStreamPriorityIndicator = elementaryStreamPriorityIndicator; + markDirty(); + } + + public boolean isPcrFlag() { + return pcrFlag; + } + + public void setPcrFlag(boolean pcrFlag) { + this.pcrFlag = pcrFlag; + markDirty(); + } + + public boolean isOpcrFlag() { + return opcrFlag; + } + + public void setOpcrFlag(boolean opcrFlag) { + this.opcrFlag = opcrFlag; + markDirty(); + } + + public boolean isSplicingPointFlag() { + return splicingPointFlag; + } + + public void setSplicingPointFlag(boolean splicingPointFlag) { + this.splicingPointFlag = splicingPointFlag; + markDirty(); + } + + public boolean isTransportPrivateDataFlag() { + return transportPrivateDataFlag; + } + + public void setTransportPrivateDataFlag(boolean transportPrivateDataFlag) { + this.transportPrivateDataFlag = transportPrivateDataFlag; + markDirty(); + } + + public boolean isAdaptationFieldExtensionFlag() { + return adaptationFieldExtensionFlag; + } + + public void setAdaptationFieldExtensionFlag(boolean adaptationFieldExtensionFlag) { + this.adaptationFieldExtensionFlag = adaptationFieldExtensionFlag; + markDirty(); + } + + public PCR getPcr() { + return pcr; + } + + public void setPcr(PCR pcr) { + this.pcr = pcr; + markDirty(); + } + + public PCR getOpcr() { + return opcr; + } + + public void setOpcr(PCR opcr) { + this.opcr = opcr; + markDirty(); + } + + public byte getSpliceCountdown() { + return spliceCountdown; + } + + public void setSpliceCountdown(byte spliceCountdown) { + this.spliceCountdown = spliceCountdown; + markDirty(); + } + + public byte[] getPrivateData() { + return privateData; + } + + public void setPrivateData(byte[] privateData) { + this.privateData = privateData; + markDirty(); + } + + public byte[] getExtension() { + return extension; + } + + public void setExtension(byte[] extension) { + this.extension = extension; + markDirty(); + } + + public void write(ByteBuffer buffer, int payloadLength) { + int length = 183 - payloadLength; + int remaining = length; + buffer.put((byte) (length & 0xff)); + int firstByte = 0; + + // Discontinuity indicator + if (discontinuityIndicator) { + firstByte |= 0x80; + } + + // Random Access indicator + if (randomAccessIndicator) { + firstByte |= 0x40; + } + + // Elementary stream priority indicator + if (elementaryStreamPriorityIndicator) { + firstByte |= 0x20; + } + + // PCR flag + if (pcrFlag) { + firstByte |= 0x10; + } + + // OPCR flag + if (opcrFlag) { + firstByte |= 0x08; + } + + // Splicing point flag + if (splicingPointFlag) { + firstByte |= 0x04; + } + + // Transport private data flag + if (transportPrivateDataFlag) { + firstByte |= 0x2; + } + + // Adaptation field extension flag + if (adaptationFieldExtensionFlag) { + firstByte |= 0x01; + } + + buffer.put((byte) firstByte); + remaining --; + + if (pcrFlag && pcr != null) { + pcr.write(buffer); + remaining -= 6; + } + + if (opcrFlag && opcr != null) { + opcr.write(buffer); + remaining -= 6; + } + + if (splicingPointFlag) { + buffer.put(spliceCountdown); + remaining--; + } + + if (transportPrivateDataFlag && privateData != null) { + buffer.put(privateData); + remaining -= privateData.length; + } + + if (adaptationFieldExtensionFlag && extension != null) { + buffer.put(extension); + remaining -= extension.length; + } + + if (remaining < 0) { + throw new IllegalStateException("Adaptation field too big!"); + } + while (remaining-- > 0) { + buffer.put((byte)0); + } + } + } + + public MTSPacket(boolean transportErrorIndicator, boolean payloadUnitStartIndicator, boolean transportPriority, int pid, int scramblingControl, int continuityCounter) { + super(); + this.buffer = ByteBuffer.allocate(Constants.MPEGTS_PACKET_SIZE); + this.transportErrorIndicator = transportErrorIndicator; + this.payloadUnitStartIndicator = payloadUnitStartIndicator; + this.transportPriority = transportPriority; + this.pid = pid; + this.scramblingControl = scramblingControl; + this.continuityCounter = continuityCounter; + this.adaptationFieldExist = false; + this.containsPayload = false; + } + + public MTSPacket(ByteBuffer buffer) { + super(buffer); + } + + @Override + protected void write() { + // First write payload + int payloadLength = 0; + if (containsPayload && payload != null) { + payload.clear(); + payloadLength = payload.capacity(); + buffer.position(Constants.MPEGTS_PACKET_SIZE - payloadLength); + buffer.put(payload); + } + buffer.rewind(); + // First byte + buffer.put((byte) 0x47); + + // Bytes 2->3 + int secondAndThirdBytes = 0; + if (transportErrorIndicator) { + secondAndThirdBytes |= 0x8000; + } + if (payloadUnitStartIndicator) { + secondAndThirdBytes |= 0x4000; + } + if (transportPriority) { + secondAndThirdBytes |= 0x2000; + } + secondAndThirdBytes |= (pid & 0x1fff); + + buffer.putShort((short) secondAndThirdBytes); + + int fourthByte = 0; + + // Byte 4 + fourthByte |= (scramblingControl & 0xc0); + if (adaptationFieldExist) { + fourthByte |= 0x20; + } + if (containsPayload) { + fourthByte |= 0x10; + } + fourthByte |= (continuityCounter & 0x0f); + buffer.put((byte) fourthByte); + + // Adaptation field + if (adaptationFieldExist) { + if (adaptationField == null) { + buffer.put((byte) 0); + } else { + adaptationField.write(buffer, payloadLength); + } + } + + // Payload + if (containsPayload && payload != null) { + payload.rewind(); + buffer.put(payload); + } + if (buffer.remaining() != 0) { + throw new IllegalStateException("buffer.remaining() = " + buffer.remaining() + ", should be zero!"); + } + } + + protected void parse() { + // Sync byte + int marker = buffer.get() & 0xff; + Preconditions.checkArgument(Constants.TS_MARKER == marker); + + // Second/Third byte + int secondAndThirdBytes = buffer.getShort(); + + // Transport Error Indicator (TEI) + boolean transportErrorIndicator = (secondAndThirdBytes & 0x8000) != 0; + + // Payload Unit Start Indicator + boolean payloadUnitStartIndicator = (secondAndThirdBytes & 0x4000) != 0; + + // Transport Priority + boolean transportPriority = (secondAndThirdBytes & 0x2000) != 0; + + // Packet Identifier (PID) + int pid = secondAndThirdBytes & 0x1fff; + + // Fourth byte + int fourthByte = buffer.get() & 0xff; + + // Scrambling control + int scramblingControl = fourthByte & 0xc0; + + // Adaptation field exist + boolean adaptationFieldExist = (fourthByte & 0x20) != 0; + + // Contains payload + boolean containsPayload = (fourthByte & 0x10) != 0; + + // Continuity counter + int continuityCounter = fourthByte & 0x0f; + + MTSPacket.AdaptationField adaptationField = null; + if (adaptationFieldExist) { + // Adaptation Field Length + int adaptationFieldLength = buffer.get() & 0xff; + if (adaptationFieldLength != 0) { + + int remainingBytes = adaptationFieldLength; + + // Get next byte + int nextByte = buffer.get() & 0xff; + remainingBytes--; + + // Discontinuity indicator + boolean discontinuityIndicator = (nextByte & 0x80) != 0; + + // Random Access indicator + boolean randomAccessIndicator = (nextByte & 0x40) != 0; + + // Elementary stream priority indicator + boolean elementaryStreamPriorityIndicator = (nextByte & 0x20) != 0; + + // PCR flag + boolean pcrFlag = (nextByte & 0x10) != 0; + + // OPCR flag + boolean opcrFlag = (nextByte & 0x08) != 0; + + // Splicing point flag + boolean splicingPointFlag = (nextByte & 0x04) != 0; + + // Transport private data flag + boolean transportPrivateDataFlag = (nextByte & 0x2) != 0; + + // Adaptation field extension flag + boolean adaptationFieldExtensionFlag = (nextByte & 0x01) != 0; + + // PCR + MTSPacket.AdaptationField.PCR pcr = null; + if (pcrFlag) { + pcr = parsePCR(); + remainingBytes -= 6; + } + + // OPCR + MTSPacket.AdaptationField.PCR opcr = null; + if (opcrFlag) { + opcr = parsePCR(); + remainingBytes -= 6; + } + + // Splice countdown + byte spliceCountdown = 0; + if (splicingPointFlag) { + spliceCountdown = buffer.get(); + remainingBytes--; + } + + byte[] privateData = null; + if (transportPrivateDataFlag) { + int transportPrivateDataLength = buffer.get() & 0xff; + privateData = new byte[transportPrivateDataLength]; + buffer.get(privateData); + remainingBytes -= transportPrivateDataLength; + } + + byte[] extension = null; + if (adaptationFieldExtensionFlag) { + int extensionLength = buffer.get() & 0xff; + extension = new byte[extensionLength]; + buffer.get(extension); + remainingBytes -= extensionLength; + } + + // Skip remaining bytes + NIOUtils.skip(buffer, remainingBytes); + + adaptationField = new MTSPacket.AdaptationField(this, discontinuityIndicator, randomAccessIndicator, elementaryStreamPriorityIndicator, pcrFlag, opcrFlag, splicingPointFlag, transportPrivateDataFlag, adaptationFieldExtensionFlag, pcr, opcr, spliceCountdown, privateData, extension); + } + } + + this.transportErrorIndicator = transportErrorIndicator; + this.payloadUnitStartIndicator = payloadUnitStartIndicator; + this.transportPriority = transportPriority; + this.pid = pid; + this.scramblingControl = scramblingControl; + this.adaptationFieldExist = adaptationFieldExist; + this.containsPayload = containsPayload; + this.continuityCounter = continuityCounter; + this.adaptationField = adaptationField; + + // Payload + this.payload = containsPayload ? buffer.slice() : null; + } + + private AdaptationField.PCR parsePCR() { + AdaptationField.PCR pcr; + byte[] pcrBytes = new byte[6]; + buffer.get(pcrBytes); + + long pcrBits = ((pcrBytes[0] & 0xffL) << 40) | ((pcrBytes[1] & 0xffL) << 32) | ((pcrBytes[2] & 0xffL) << 24) | ((pcrBytes[3] & 0xffL) << 16) | ((pcrBytes[4] & 0xffL) << 8) | (pcrBytes[5] & 0xffL); + long base = (pcrBits & 0xFFFFFFFF8000L) >> 15; + byte reserved = (byte) ((pcrBits & 0x7E00) >> 9); + int extension = (int) (pcrBits & 0x1FFL); + pcr = new AdaptationField.PCR(null, base, extension, reserved); + return pcr; + } + + public boolean isTransportErrorIndicator() { + return transportErrorIndicator; + } + + public void setTransportErrorIndicator(boolean transportErrorIndicator) { + this.transportErrorIndicator = transportErrorIndicator; + markDirty(); + } + + public boolean isPayloadUnitStartIndicator() { + return payloadUnitStartIndicator; + } + + public void setPayloadUnitStartIndicator(boolean payloadUnitStartIndicator) { + this.payloadUnitStartIndicator = payloadUnitStartIndicator; + markDirty(); + } + + public boolean isTransportPriority() { + return transportPriority; + } + + public void setTransportPriority(boolean transportPriority) { + this.transportPriority = transportPriority; + markDirty(); + } + + public int getPid() { + return pid; + } + + public void setPid(int pid) { + this.pid = pid; + markDirty(); + } + + public int getScramblingControl() { + return scramblingControl; + } + + public void setScramblingControl(int scramblingControl) { + this.scramblingControl = scramblingControl; + markDirty(); + } + + public boolean isAdaptationFieldExist() { + return adaptationFieldExist; + } + + public void setAdaptationFieldExist(boolean adaptationFieldExist) { + this.adaptationFieldExist = adaptationFieldExist; + markDirty(); + } + + public boolean isContainsPayload() { + return containsPayload; + } + + public void setContainsPayload(boolean containsPayload) { + this.containsPayload = containsPayload; + markDirty(); + } + + public int getContinuityCounter() { + return continuityCounter; + } + + public void setContinuityCounter(int continuityCounter) { + this.continuityCounter = continuityCounter; + markDirty(); + } + + public AdaptationField getAdaptationField() { + return adaptationField; + } + + public void setAdaptationField(AdaptationField adaptationField) { + this.adaptationField = adaptationField; + markDirty(); + } + + public ByteBuffer getPayload() { + return payload; + } + + public void setPayload(ByteBuffer payload) { + this.payload = payload; + markDirty(); + } +} \ No newline at end of file diff --git a/src/main/java/org/taktik/mpegts/Merger.java b/src/main/java/org/taktik/mpegts/Merger.java new file mode 100644 index 00000000..c3d9e65e --- /dev/null +++ b/src/main/java/org/taktik/mpegts/Merger.java @@ -0,0 +1,63 @@ +package org.taktik.mpegts; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.WRITE; + +import java.io.File; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.util.Arrays; + +import org.taktik.mpegts.sinks.ByteChannelSink; +import org.taktik.mpegts.sinks.MTSSink; +import org.taktik.mpegts.sources.MTSSource; +import org.taktik.mpegts.sources.MTSSources; +import org.taktik.mpegts.sources.MultiplexingMTSSource; +import org.taktik.mpegts.sources.MultiplexingMTSSource.MultiplexingMTSSourceBuilder; + +public class Merger { + + public static void main(String[] args) throws IOException { + File[] files = new File("/home/henni/devel/ctbrec/remux-ts-mp4/src/test/resources").listFiles((f) -> f.getName().startsWith("63")); + //File[] files = new File("/home/henni/devel/ctbrec/mpegts-streamer/src/test/resources/yesikasaenz/2018-09-04_23-30").listFiles((f) -> { + // return f.getName().startsWith("media") && f.getName().endsWith(".ts"); + // }); + Arrays.sort(files); + + MultiplexingMTSSourceBuilder builder = MultiplexingMTSSource.builder() + .setFixContinuity(true); + + + for (File file : files) { + MTSSource source = MTSSources.from(file); + builder.addSource(source); + } + + File out = new File("merged.ts"); + FileChannel channel = null; + try { + channel = FileChannel.open(out.toPath(), CREATE, WRITE); + MTSSource mtsSource = builder.build(); + MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build(); + + // build streamer + Streamer streamer = Streamer.builder() + .setSource(mtsSource) + .setSink(sink) + .build(); + + // Start streaming + streamer.stream(); + + // synchronized (streamer) { + // System.out.println("Waiting for streamer to finish"); + // streamer.wait(); + // System.out.println("Streamer finished"); + // } + } catch(Exception e) { + e.printStackTrace(); + } finally { + channel.close(); + } + } + +} diff --git a/src/main/java/org/taktik/mpegts/PATSection.java b/src/main/java/org/taktik/mpegts/PATSection.java new file mode 100644 index 00000000..40e155de --- /dev/null +++ b/src/main/java/org/taktik/mpegts/PATSection.java @@ -0,0 +1,49 @@ +package org.taktik.mpegts; + + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Lists; + +public class PATSection extends PSISection { + private Integer[] networkPids; + private Map programs; + + public PATSection(PSISection psi, Integer[] networkPids, Map programs) { + super(psi); + this.networkPids = networkPids; + this.programs = programs; + } + + public Integer[] getNetworkPids() { + return networkPids; + } + + public Map getPrograms() { + return programs; + } + + public static PATSection parse(ByteBuffer data) { + PSISection psi = PSISection.parse(data); + if (psi == null) { + return null; + } + List networkPids = Lists.newArrayList(); + Map programs = new HashMap<>(); + + while (data.remaining() > 4) { + int programNum = data.getShort() & 0xffff; + int w = data.getShort(); + int pid = w & 0x1fff; + if (programNum == 0) + networkPids.add(pid); + else + programs.put(programNum, pid); + } + + return new PATSection(psi, networkPids.toArray(new Integer[networkPids.size()]), programs); + } +} \ No newline at end of file diff --git a/src/main/java/org/taktik/mpegts/PMTSection.java b/src/main/java/org/taktik/mpegts/PMTSection.java new file mode 100644 index 00000000..de261edc --- /dev/null +++ b/src/main/java/org/taktik/mpegts/PMTSection.java @@ -0,0 +1,133 @@ +package org.taktik.mpegts; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.taktik.ioutils.NIOUtils; + + + +/** + * This class is part of JCodec ( www.jcodec.org ) This software is distributed + * under FreeBSD License + * + * Represents PMT ( Program Map Table ) of the MPEG Transport stream + * + * This section contains information about streams of an individual program, a + * program usually contains two or more streams, such as video, audio, text, + * etc.. + * + * @author The JCodec project + * + */ +public class PMTSection extends PSISection { + + private int pcrPid; + // private Tag[] tags; + // private PMTStream[] streams; + + public PMTSection(PSISection psi, int pcrPid) {//, Tag[] tags, PMTStream[] streams) { + super(psi); + this.pcrPid = pcrPid; + // this.tags = tags; + // this.streams = streams; + } + + public int getPcrPid() { + return pcrPid; + } + + // public Tag[] getTags() { + // return tags; + // } + // + // public PMTStream[] getStreams() { + // return streams; + // } + + public static PMTSection parse(ByteBuffer data) { + PSISection psi = PSISection.parse(data); + + int w1 = data.getShort() & 0xffff; + int pcrPid = w1 & 0x1fff; + + int w2 = data.getShort() & 0xffff; + int programInfoLength = w2 & 0xfff; + + // List tags = parseTags(NIOUtils.read(data, programInfoLength)); + // List streams = new ArrayList(); + // while (data.remaining() > 4) { + // int streamType = data.get() & 0xff; + // int wn = data.getShort() & 0xffff; + // int elementaryPid = wn & 0x1fff; + // + // + // int wn1 = data.getShort() & 0xffff; + // int esInfoLength = wn1 & 0xfff; + // ByteBuffer read = NIOUtils.read(data, esInfoLength); + // streams.add(new PMTStream(streamType, elementaryPid, MPSUtils.parseDescriptors(read))); + // } + + return new PMTSection(psi, pcrPid); + } + + static List parseTags(ByteBuffer bb) { + List tags = new ArrayList(); + while (bb.hasRemaining()) { + int tag = bb.get(); + int tagLen = bb.get(); + tags.add(new Tag(tag, NIOUtils.read(bb, tagLen))); + } + return tags; + } + + + public static class Tag { + private int tag; + private ByteBuffer content; + + public Tag(int tag, ByteBuffer content) { + this.tag = tag; + this.content = content; + } + + public int getTag() { + return tag; + } + + public ByteBuffer getContent() { + return content; + } + } + + // public static class PMTStream { + // private int streamTypeTag; + // private int pid; + // private List descriptors; + // private StreamType streamType; + // + // public PMTStream(int streamTypeTag, int pid, List descriptors) { + // this.streamTypeTag = streamTypeTag; + // this.pid = pid; + // this.descriptors = descriptors; + // this.streamType = StreamType.fromTag(streamTypeTag); + // } + // + // public int getStreamTypeTag() { + // return streamTypeTag; + // } + // + // public StreamType getStreamType() { + // return streamType; + // } + // + // public int getPid() { + // return pid; + // } + // + // public List getDesctiptors() { + // return descriptors; + // } + // } +} \ No newline at end of file diff --git a/src/main/java/org/taktik/mpegts/PSISection.java b/src/main/java/org/taktik/mpegts/PSISection.java new file mode 100644 index 00000000..89b0215b --- /dev/null +++ b/src/main/java/org/taktik/mpegts/PSISection.java @@ -0,0 +1,90 @@ +package org.taktik.mpegts; + + +import java.nio.ByteBuffer; + +/** + * This class is part of JCodec ( www.jcodec.org ) This software is distributed + * under FreeBSD License + * + * Represents a section of PSI payload ( Program Stream Information ) MPEG + * Transport stream + * + * @author The JCodec project + * + */ +public class PSISection { + private int tableId; + private int specificId; + private int versionNumber; + private int currentNextIndicator; + private int sectionNumber; + private int lastSectionNumber; + + /** + * A copy constructor + * + * @param other + */ + public PSISection(PSISection other) { + this(other.tableId, other.specificId, other.versionNumber, other.currentNextIndicator, other.sectionNumber, + other.lastSectionNumber); + } + + public PSISection(int tableId, int specificId, int versionNumber, int currentNextIndicator, int sectionNumber, + int lastSectionNumber) { + this.tableId = tableId; + this.specificId = specificId; + this.versionNumber = versionNumber; + this.currentNextIndicator = currentNextIndicator; + this.sectionNumber = sectionNumber; + this.lastSectionNumber = lastSectionNumber; + } + + public static PSISection parse(ByteBuffer data) { + int tableId = data.get() & 0xff; + int w0 = data.getShort() & 0xffff; + if ((w0 & 0xC000) != 0x8000) { + return null; + } + + int sectionLength = w0 & 0xfff; + + data.limit(data.position() + sectionLength); + + int specificId = data.getShort() & 0xffff; + int b0 = data.get() & 0xff; + int versionNumber = (b0 >> 1) & 0x1f; + int currentNextIndicator = b0 & 1; + + int sectionNumber = data.get() & 0xff; + int lastSectionNumber = data.get() & 0xff; + + return new PSISection(tableId, specificId, versionNumber, currentNextIndicator, sectionNumber, + lastSectionNumber); + } + + public int getTableId() { + return tableId; + } + + public int getSpecificId() { + return specificId; + } + + public int getVersionNumber() { + return versionNumber; + } + + public int getCurrentNextIndicator() { + return currentNextIndicator; + } + + public int getSectionNumber() { + return sectionNumber; + } + + public int getLastSectionNumber() { + return lastSectionNumber; + } +} \ No newline at end of file diff --git a/src/main/java/org/taktik/mpegts/PacketSupport.java b/src/main/java/org/taktik/mpegts/PacketSupport.java new file mode 100644 index 00000000..93dee48b --- /dev/null +++ b/src/main/java/org/taktik/mpegts/PacketSupport.java @@ -0,0 +1,37 @@ +package org.taktik.mpegts; + + +import java.nio.ByteBuffer; + +public abstract class PacketSupport { + protected ByteBuffer buffer; + protected boolean dirty; + + public PacketSupport() { + dirty = false; + } + + public PacketSupport(ByteBuffer buffer) { + this.buffer = buffer; + buffer.rewind(); + parse(); + buffer.rewind(); + dirty = false; + } + + public ByteBuffer getBuffer() { + if (dirty) { + write(); + buffer.rewind(); + dirty = false; + } + return buffer; + } + + protected abstract void parse(); + protected abstract void write(); + + protected void markDirty() { + dirty = true; + } +} diff --git a/src/main/java/org/taktik/mpegts/Streamer.java b/src/main/java/org/taktik/mpegts/Streamer.java new file mode 100644 index 00000000..82d81ef0 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/Streamer.java @@ -0,0 +1,323 @@ +package org.taktik.mpegts; + +import java.nio.ByteBuffer; +import java.util.TreeMap; +import java.util.concurrent.ArrayBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.taktik.mpegts.sinks.MTSSink; +import org.taktik.mpegts.sources.MTSSource; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +public class Streamer { + static final Logger log = LoggerFactory.getLogger("streamer"); + + private MTSSource source; + private MTSSink sink; + + private ArrayBlockingQueue buffer; + private int bufferSize; + private boolean endOfSourceReached; + private boolean streamingShouldStop; + + private PATSection patSection; + private TreeMap pmtSection; + + private Thread bufferingThread; + private Thread streamingThread; + + private boolean sleepingEnabled; + + private Streamer(MTSSource source, MTSSink sink, int bufferSize, boolean sleepingEnabled) { + this.source = source; + this.sink = sink; + this.bufferSize = bufferSize; + this.sleepingEnabled = sleepingEnabled; + } + + public void stream() throws InterruptedException { + buffer = new ArrayBlockingQueue<>(bufferSize); + patSection = null; + pmtSection = Maps.newTreeMap(); + endOfSourceReached = false; + streamingShouldStop = false; + log.info("PreBuffering {} packets", bufferSize); + try { + preBuffer(); + } catch (Exception e) { + throw new IllegalStateException("Error while bufering", e); + } + log.info("Done PreBuffering"); + + bufferingThread = new Thread(this::fillBuffer, "buffering"); + bufferingThread.setDaemon(true); + bufferingThread.start(); + + streamingThread = new Thread(this::internalStream, "streaming"); + streamingThread.setDaemon(true); + streamingThread.start(); + + bufferingThread.join(); + streamingThread.join(); + } + + public void stop() { + try { + source.close(); + } catch (Exception e) { + log.error("Couldn't close source", e); + } + try { + sink.close(); + } catch (Exception e) { + log.error("Couldn't close sink", e); + } + streamingShouldStop = true; + buffer.clear(); + bufferingThread.interrupt(); + streamingThread.interrupt(); + } + + private void internalStream() { + boolean resetState = false; + MTSPacket packet; + long packetCount = 0; + //long pcrPidPacketCount = 0; + Long firstPcrValue = null; + Long firstPcrTime = null; + //Long firstPcrPacketCount = null; + Long lastPcrValue = null; + Long lastPcrTime = null; + //Long lastPcrPacketCount = null; + Long averageSleep = null; + while (!streamingShouldStop) { + if (resetState) { + firstPcrValue = null; + firstPcrTime = null; + lastPcrValue = null; + lastPcrTime = null; + averageSleep = null; + resetState = false; + } + + // Initialize time to sleep + long sleepNanos = 0; + + packet = buffer.poll(); + + if (packet == null) { + if (endOfSourceReached) { + packet = buffer.poll(); + if (packet == null) { + break; + } + } else { + continue; + } + } + + int pid = packet.getPid(); + + if (pid == 0 && packet.isPayloadUnitStartIndicator()) { + ByteBuffer payload = packet.getPayload(); + payload.rewind(); + int pointer = payload.get() & 0xff; + payload.position(payload.position() + pointer); + patSection = PATSection.parse(payload); + for (Integer pmtPid : pmtSection.keySet()) { + if (!patSection.getPrograms().values().contains(pmtPid)) { + pmtSection.remove(pmtPid); + } + } + } + + if (pid != 0 && patSection!=null) { + if (patSection.getPrograms().values().contains(pid)) { + if (packet.isPayloadUnitStartIndicator()) { + ByteBuffer payload = packet.getPayload(); + payload.rewind(); + int pointer = payload.get() & 0xff; + payload.position(payload.position() + pointer); + pmtSection.put(pid, PMTSection.parse(payload)); + } + } + + } + + // Check PID matches PCR PID + if (true) {//mtsPacket.pid == pmt.getPcrPid()) { + //pcrPidPacketCount++; + + if (averageSleep != null) { + sleepNanos = averageSleep; + } else { + // if (pcrPidPacketCount < 2) { + // if (pcrPidPacketCount % 10 == 0) { + // sleepNanos = 15; + // } + // } + } + } + + // Check for PCR + if (packet.getAdaptationField() != null) { + if (packet.getAdaptationField().getPcr() != null) { + if (packet.getPid() == getPCRPid()) { + if (!packet.getAdaptationField().isDiscontinuityIndicator()) { + // Get PCR and current nano time + long pcrValue = packet.getAdaptationField().getPcr().getValue(); + long pcrTime = System.nanoTime(); + + // Compute sleepNanosOrig + if (firstPcrValue == null || firstPcrTime == null) { + firstPcrValue = pcrValue; + firstPcrTime = pcrTime; + //firstPcrPacketCount = pcrPidPacketCount; + } + + // Compute sleepNanosPrevious + Long sleepNanosPrevious = null; + if (lastPcrValue != null && lastPcrTime != null) { + if (pcrValue <= lastPcrValue) { + log.error("PCR discontinuity ! " + packet.getPid()); + resetState = true; + } else { + sleepNanosPrevious = ((pcrValue - lastPcrValue) / 27 * 1000) - (pcrTime - lastPcrTime); + } + } + // System.out.println("pcrValue=" + pcrValue + ", lastPcrValue=" + lastPcrValue + ", sleepNanosPrevious=" + sleepNanosPrevious + ", sleepNanosOrig=" + sleepNanosOrig); + + // Set sleep time based on PCR if possible + if (sleepNanosPrevious != null) { + // Safety : We should never have to wait more than 100ms + if (sleepNanosPrevious > 100000000) { + log.warn("PCR sleep ignored, too high !"); + resetState = true; + } else { + sleepNanos = sleepNanosPrevious; + // averageSleep = sleepNanosPrevious / (pcrPidPacketCount - lastPcrPacketCount - 1); + } + } + + // Set lastPcrValue/lastPcrTime + lastPcrValue = pcrValue; + lastPcrTime = pcrTime + sleepNanos; + //lastPcrPacketCount = pcrPidPacketCount; + } else { + log.warn("Skipped PCR - Discontinuity indicator"); + } + } else { + log.debug("Skipped PCR - PID does not match"); + } + } + } + + // Sleep if needed + if (sleepNanos > 0 && sleepingEnabled) { + log.trace("Sleeping " + sleepNanos / 1000000 + " millis, " + sleepNanos % 1000000 + " nanos"); + try { + Thread.sleep(sleepNanos / 1000000, (int) (sleepNanos % 1000000)); + } catch (InterruptedException e) { + log.warn("Streaming sleep interrupted!"); + } + } + + // Stream packet + // System.out.println("Streaming packet #" + packetCount + ", PID=" + mtsPacket.getPid() + ", pcrCount=" + pcrCount + ", continuityCounter=" + mtsPacket.getContinuityCounter()); + + try { + sink.send(packet); + } catch (Exception e) { + log.error("Error sending packet to sink", e); + } + + packetCount++; + } + log.info("Sent {} MPEG-TS packets", packetCount); + synchronized (this) { + notifyAll(); + } + } + + private void preBuffer() throws Exception { + MTSPacket packet; + int packetNumber = 0; + while ((packetNumber < bufferSize) && (packet = source.nextPacket()) != null) { + buffer.add(packet); + packetNumber++; + } + } + + private void fillBuffer() { + try { + MTSPacket packet; + while (!streamingShouldStop && (packet = source.nextPacket()) != null) { + boolean put = false; + while (!put) { + try { + buffer.put(packet); + put = true; + } catch (InterruptedException ignored) { + + } + } + } + } catch (InterruptedException e) { + if(!streamingShouldStop) { + log.error("Error reading from source", e); + } + } catch (Exception e) { + log.error("Error reading from source", e); + } finally { + endOfSourceReached = true; + } + } + + private int getPCRPid() { + if ((!pmtSection.isEmpty())) { + // TODO change this + return pmtSection.values().iterator().next().getPcrPid(); + } + return -1; + } + + public static StreamerBuilder builder() { + return new StreamerBuilder(); + } + + public static class StreamerBuilder { + private MTSSink sink; + private MTSSource source; + private int bufferSize = 1000; + private boolean sleepingEnabled = false; + + public StreamerBuilder setSink(MTSSink sink) { + this.sink = sink; + return this; + } + + public StreamerBuilder setSource(MTSSource source) { + this.source = source; + return this; + } + + public StreamerBuilder setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public StreamerBuilder setSleepingEnabled(boolean sleepingEnabled) { + this.sleepingEnabled = sleepingEnabled; + return this; + } + + public Streamer build() { + Preconditions.checkNotNull(sink); + Preconditions.checkNotNull(source); + return new Streamer(source, sink, bufferSize, sleepingEnabled); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/taktik/mpegts/StreamerTest.java b/src/main/java/org/taktik/mpegts/StreamerTest.java new file mode 100644 index 00000000..35515732 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/StreamerTest.java @@ -0,0 +1,45 @@ +package org.taktik.mpegts; + +import org.taktik.mpegts.sinks.MTSSink; +import org.taktik.mpegts.sinks.UDPTransport; +import org.taktik.mpegts.sources.MTSSource; +import org.taktik.mpegts.sources.MTSSources; +import org.taktik.mpegts.sources.ResettableMTSSource; + +import java.io.File; + +public class StreamerTest { + public static void main(String[] args) throws Exception { + + // Set up mts sink + MTSSink transport = UDPTransport.builder() + //.setAddress("239.222.1.1") + .setAddress("127.0.0.1") + .setPort(1234) + .setSoTimeout(5000) + .setTtl(1) + .build(); + + + ResettableMTSSource ts1 = MTSSources.from(new File("/Users/abaudoux/Downloads/EBSrecording.mpg")); + + // media132, media133 --> ok + // media133, media132 --> ok + // media123, media132 --> ko + + + // Build source + MTSSource source = MTSSources.loop(ts1); + + // build streamer + Streamer streamer = Streamer.builder() + .setSource(source) + //.setSink(ByteChannelSink.builder().setByteChannel(fc).build()) + .setSink(transport) + .build(); + + // Start streaming + streamer.stream(); + + } +} diff --git a/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java b/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java new file mode 100644 index 00000000..435b1657 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java @@ -0,0 +1,43 @@ +package org.taktik.mpegts.sinks; + +import org.taktik.mpegts.MTSPacket; + +import java.nio.channels.ByteChannel; + +public class ByteChannelSink implements MTSSink { + + private ByteChannel byteChannel; + + private ByteChannelSink(ByteChannel byteChannel) { + this.byteChannel = byteChannel; + } + + public static ByteChannelSinkBuilder builder() { + return new ByteChannelSinkBuilder(); + } + + @Override + public void send(MTSPacket packet) throws Exception { + byteChannel.write(packet.getBuffer()); + } + + @Override + public void close() throws Exception { + byteChannel.close(); + } + + public static class ByteChannelSinkBuilder { + private ByteChannel byteChannel; + + private ByteChannelSinkBuilder(){} + + public ByteChannelSink build() { + return new ByteChannelSink(byteChannel); + } + + public ByteChannelSinkBuilder setByteChannel(ByteChannel byteChannel) { + this.byteChannel = byteChannel; + return this; + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sinks/MTSSink.java b/src/main/java/org/taktik/mpegts/sinks/MTSSink.java new file mode 100644 index 00000000..f84f00c2 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sinks/MTSSink.java @@ -0,0 +1,7 @@ +package org.taktik.mpegts.sinks; + +import org.taktik.mpegts.MTSPacket; + +public interface MTSSink extends AutoCloseable { + void send(MTSPacket packet) throws Exception; +} diff --git a/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java b/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java new file mode 100644 index 00000000..786148ef --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java @@ -0,0 +1,77 @@ +package org.taktik.mpegts.sinks; + +import com.google.common.base.Preconditions; +import org.taktik.mpegts.MTSPacket; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.nio.ByteBuffer; + +public class UDPTransport implements MTSSink { + + private final InetSocketAddress inetSocketAddress; + private final MulticastSocket multicastSocket; + + + private UDPTransport(String address, int port, int ttl, int soTimeout) throws IOException { + // InetSocketAddress + inetSocketAddress = new InetSocketAddress(address, port); + + // Create the socket but we don't bind it as we are only going to send data + // Note that we don't have to join the multicast group if we are only sending data and not receiving + multicastSocket = new MulticastSocket(); + multicastSocket.setReuseAddress(true); + multicastSocket.setSoTimeout(soTimeout); + multicastSocket.setTimeToLive(ttl); + } + + public static UDPTransport.UDPTransportBuilder builder() { + return new UDPTransportBuilder(); + } + + @Override + public void send(MTSPacket packet) throws IOException { + ByteBuffer buffer = packet.getBuffer(); + Preconditions.checkArgument(buffer.hasArray()); + DatagramPacket datagramPacket = new DatagramPacket(buffer.array(), buffer.arrayOffset(), buffer.limit(), inetSocketAddress); + multicastSocket.send(datagramPacket); + } + + @Override + public void close() { + multicastSocket.close(); + } + + public static class UDPTransportBuilder { + private String address; + private int port; + private int ttl; + private int soTimeout; + + public UDPTransportBuilder setAddress(String address) { + this.address = address; + return this; + } + + public UDPTransportBuilder setPort(int port) { + this.port = port; + return this; + } + + public UDPTransportBuilder setTtl(int ttl) { + this.ttl = ttl; + return this; + } + + public UDPTransportBuilder setSoTimeout(int timeout) { + this.soTimeout = timeout; + return this; + } + + public UDPTransport build() throws IOException { + return new UDPTransport(address, port, ttl, soTimeout); + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java b/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java new file mode 100644 index 00000000..5cc52fdd --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java @@ -0,0 +1,117 @@ +package org.taktik.mpegts.sources; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.taktik.ioutils.NIOUtils; +import org.taktik.mpegts.Constants; +import org.taktik.mpegts.MTSPacket; + +public abstract class AbstractByteChannelMTSSource extends AbstractMTSSource { + static final Logger log = LoggerFactory.getLogger("source"); + + private static final int BUFFER_SIZE = Constants.MPEGTS_PACKET_SIZE * 1000; + + protected ByteBuffer buffer; + protected T byteChannel; + + + protected AbstractByteChannelMTSSource(T byteChannel) throws IOException { + this.byteChannel = byteChannel; + fillBuffer(); + } + + protected void fillBuffer() throws IOException { + buffer = ByteBuffer.allocate(BUFFER_SIZE); + NIOUtils.read(byteChannel, buffer); + buffer.flip(); + } + + protected boolean lastBuffer() { + return buffer.capacity() > buffer.limit(); + } + + @Override + protected MTSPacket nextPacketInternal() throws IOException { + ByteBuffer packetBuffer = null; + while (true) { + boolean foundFirstMarker = false; + int skipped = 0; + while (!foundFirstMarker) { + if (!buffer.hasRemaining()) { + if (lastBuffer()) { + return null; + } + buffer = ByteBuffer.allocate(BUFFER_SIZE); + if (NIOUtils.read(byteChannel, buffer) <= 0) { + return null; + } + buffer.flip(); + } + if ((buffer.get(buffer.position()) & 0xff) == Constants.TS_MARKER) { + foundFirstMarker = true; + } else { + buffer.position(buffer.position() + 1); + skipped++; + } + } + if (skipped > 0) { + log.info("Skipped {} bytes looking for TS marker", skipped); + } + if (buffer.remaining() >= Constants.MPEGTS_PACKET_SIZE) { + if ((buffer.remaining() == Constants.MPEGTS_PACKET_SIZE) || + (buffer.get(buffer.position() + Constants.MPEGTS_PACKET_SIZE) & 0xff) == Constants.TS_MARKER) { + packetBuffer = buffer.slice(); + packetBuffer.limit(Constants.MPEGTS_PACKET_SIZE); + buffer.position(buffer.position() + Constants.MPEGTS_PACKET_SIZE); + } else { + log.info("no second marker found"); + buffer.position(buffer.position() + 1); + } + } else if (!lastBuffer()) { + log.info("NEW BUFFER"); + + ByteBuffer newBuffer = ByteBuffer.allocate(BUFFER_SIZE); + newBuffer.put(buffer); + buffer = newBuffer; + if (NIOUtils.read(byteChannel, buffer) <= 0) { + return null; + } + buffer.flip(); + if (buffer.remaining() >= Constants.MPEGTS_PACKET_SIZE) { + if ((buffer.remaining() == Constants.MPEGTS_PACKET_SIZE) || + (buffer.get(buffer.position() + Constants.MPEGTS_PACKET_SIZE) & 0xff) == Constants.TS_MARKER) { + packetBuffer = buffer.slice(); + packetBuffer.limit(Constants.MPEGTS_PACKET_SIZE); + buffer.position(buffer.position() + Constants.MPEGTS_PACKET_SIZE); + } else { + log.info("no second marker found"); + buffer.position(buffer.position() + 1); + } + } else { + return null; + } + } else { + return null; + } + + if (packetBuffer != null) { + // Parse the packet + try { + return new MTSPacket(packetBuffer); + } catch (Exception e) { + packetBuffer = null; + log.warn("Error parsing packet", e); + } + } + } + } + + @Override + protected void closeInternal() throws Exception { + byteChannel.close(); + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java b/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java new file mode 100644 index 00000000..0c6e6cde --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java @@ -0,0 +1,40 @@ +package org.taktik.mpegts.sources; + +import org.taktik.mpegts.MTSPacket; + +public abstract class AbstractMTSSource implements MTSSource { + private boolean closed; + + @Override + public final MTSPacket nextPacket() throws Exception { + if (isClosed()) { + throw new IllegalStateException("Source is closed"); + } + return nextPacketInternal(); + } + + @Override + public final void close() throws Exception { + try { + closeInternal(); + } finally { + closed = true; + } + } + + protected boolean isClosed() { + return closed; + } + + protected abstract MTSPacket nextPacketInternal() throws Exception; + protected abstract void closeInternal() throws Exception; + + + @Override + protected void finalize() throws Throwable { + if (!closed) { + close(); + } + super.finalize(); + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java b/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java new file mode 100644 index 00000000..bca00c7f --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java @@ -0,0 +1,71 @@ +package org.taktik.mpegts.sources; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.taktik.mpegts.MTSPacket; + +public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoCloseable { + + private final boolean fixContinuity; + private ContinuityFixer continuityFixer; + + private final BlockingQueue sources; + private MTSSource currentSource; + + private BlockingMultiMTSSource(boolean fixContinuity) { + this.fixContinuity = fixContinuity; + if (fixContinuity) { + continuityFixer = new ContinuityFixer(); + } + this.sources = new LinkedBlockingQueue<>(); + } + + public void addSource(MTSSource source) throws InterruptedException { + this.sources.put(source); + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + if(currentSource == null) { + currentSource = sources.take(); + } + + MTSPacket packet = currentSource.nextPacket(); + if(packet == null) { + // end of source has been reached, switch to the next source + currentSource.close(); + currentSource = sources.take(); + packet = currentSource.nextPacket(); + } + + if (fixContinuity) { + continuityFixer.fixContinuity(packet); + } + return packet; + } + + @Override + protected void closeInternal() throws Exception { + for (MTSSource source : sources) { + source.close(); + } + } + + public static BlockingMultiMTSSourceBuilder builder() { + return new BlockingMultiMTSSourceBuilder(); + } + + public static class BlockingMultiMTSSourceBuilder { + boolean fixContinuity = false; + + public BlockingMultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) { + this.fixContinuity = fixContinuity; + return this; + } + + public BlockingMultiMTSSource build() { + return new BlockingMultiMTSSource(fixContinuity); + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java b/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java new file mode 100644 index 00000000..534dab5a --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java @@ -0,0 +1,37 @@ +package org.taktik.mpegts.sources; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; + +import com.google.common.base.Preconditions; +import org.taktik.ioutils.NIOUtils; +import org.taktik.mpegts.Constants; +import org.taktik.mpegts.MTSPacket; + +public class ByteChannelMTSSource extends AbstractByteChannelMTSSource { + + private ByteChannelMTSSource(ByteChannel byteChannel) throws IOException { + super(byteChannel); + } + + public static ByteChannelMTSSourceBuilder builder() { + return new ByteChannelMTSSourceBuilder(); + } + + public static class ByteChannelMTSSourceBuilder { + private ByteChannel byteChannel; + + private ByteChannelMTSSourceBuilder(){} + + public ByteChannelMTSSourceBuilder setByteChannel(ByteChannel byteChannel) { + this.byteChannel = byteChannel; + return this; + } + + public ByteChannelMTSSource build() throws IOException { + Preconditions.checkNotNull(byteChannel, "byteChannel cannot be null"); + return new ByteChannelMTSSource(byteChannel); + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java b/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java new file mode 100644 index 00000000..48ac4b2e --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java @@ -0,0 +1,75 @@ +package org.taktik.mpegts.sources; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; +import org.taktik.mpegts.Constants; +import org.taktik.mpegts.MTSPacket; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ByteSourceMTSSource extends AbstractMTSSource implements ResettableMTSSource { + + private ByteSource byteSource; + + private InputStream stream; + + + private ByteSourceMTSSource(ByteSource byteSource) { + this.byteSource = byteSource; + } + + public static ByteSourceMTSSourceBuilder builder() { + return new ByteSourceMTSSourceBuilder(); + } + + @Override + public void reset() throws Exception { + if (stream != null) { + stream.close(); + } + stream = byteSource.openBufferedStream(); + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + if (stream == null) { + stream = byteSource.openBufferedStream(); + } + + byte[] barray = new byte[Constants.MPEGTS_PACKET_SIZE]; + if (stream.read(barray) != Constants.MPEGTS_PACKET_SIZE) { + stream.close(); + return null; + } + + // Parse the packet + return new MTSPacket(ByteBuffer.wrap(barray)); + } + + @Override + protected void closeInternal() throws Exception { + if (stream != null) { + try (InputStream ignored = stream){ + //close + } + } + } + + public static class ByteSourceMTSSourceBuilder { + private ByteSource byteSource; + + private ByteSourceMTSSourceBuilder() { + } + + public ByteSourceMTSSource build() { + Preconditions.checkNotNull(byteSource); + return new ByteSourceMTSSource(byteSource); + } + + public ByteSourceMTSSourceBuilder setByteSource(ByteSource byteSource) { + this.byteSource = byteSource; + return this; + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java b/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java new file mode 100644 index 00000000..5f5700a4 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java @@ -0,0 +1,194 @@ +package org.taktik.mpegts.sources; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.taktik.mpegts.MTSPacket; + +import java.util.Collection; +import java.util.List; + +public class ConcatenatingMTSSource extends AbstractMTSSource { + static final Logger log = LoggerFactory.getLogger("multisource"); + private List sources; + private MTSSource currentSource; + private int idx; + private boolean fixContinuity; + + private ContinuityFixer continuityFixer; + private int maxLoops; + private int currentLoop; + private boolean closeCurrentSource; + + protected ConcatenatingMTSSource(boolean fixContinuity, int maxloops, Collection sources) { + Preconditions.checkArgument(sources.size() > 0, "Must provide at least contain one source"); + Preconditions.checkArgument(maxloops != 0, "Cannot loop zero times"); + this.sources = Lists.newArrayList(sources); + this.fixContinuity = fixContinuity; + idx = 0; + currentSource = this.sources.get(0); + if (fixContinuity) { + continuityFixer = new ContinuityFixer(); + } + this.maxLoops = maxloops; + if (maxloops != 1) { + checkLoopingPossible(sources); + } + this.currentLoop = 1; + this.closeCurrentSource = false; + } + + public static MultiMTSSourceBuilder builder() { + return new MultiMTSSourceBuilder(); + } + + private void checkLoopingPossible(Collection sources) { + for (MTSSource source : sources) { + checkLoopingPossible(source); + } + } + + private void checkLoopingPossible(MTSSource source) { + if (!(source instanceof ResettableMTSSource)) { + throw new IllegalStateException("Sources must be resettable for looping"); + } + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + if (currentSource == null) { + return null; + } + MTSPacket tsPacket = currentSource.nextPacket(); + if (tsPacket != null) { + if (fixContinuity) { + continuityFixer.fixContinuity(tsPacket); + } + return tsPacket; + } else { + // FIXME: infinite loop + nextSource(); + return nextPacket(); + } + } + + public synchronized void updateSources(List newSources) { + checkLoopingPossible(newSources); + List oldSources = this.sources; + this.sources = newSources; + for (MTSSource oldSource : oldSources) { + if (!newSources.contains(oldSource) && oldSource != currentSource) { + try { + oldSource.close(); + } catch (Exception e) { + log.error("Error closing source", e); + + } + } + } + closeCurrentSource = !newSources.contains(currentSource); + // Force next call to nextSource() to pick source 0 (first source) + idx = -1; + } + + @Override + protected synchronized void closeInternal() throws Exception { + for (MTSSource source : sources) { + source.close(); + } + if (closeCurrentSource && currentSource != null && !sources.contains(currentSource)) { + currentSource.close(); + } + } + + private synchronized void nextSource() { + if (closeCurrentSource) { + try { + currentSource.close(); + } catch (Exception e) { + log.error("Error closing source", e); + } finally { + closeCurrentSource = false; + } + } + if (fixContinuity) { + continuityFixer.nextSource(); + } + idx++; + if (idx >= sources.size()) { + currentLoop++; + if (maxLoops > 0 && currentLoop > maxLoops) { + currentSource = null; + } else { + idx = 0; + for (MTSSource source : sources) { + try { + ((ResettableMTSSource) source).reset(); + } catch (Exception e) { + e.printStackTrace(); + } + } + currentSource = sources.get(idx); + } + } else { + currentSource = sources.get(idx); + } + if (idx < sources.size()) { + log.info("Switched to source #{}", idx); + } + } + + public static class MultiMTSSourceBuilder { + boolean fixContinuity = false; + private List sources = Lists.newArrayList(); + private int maxLoops = 1; + + private MultiMTSSourceBuilder() { + } + + public MultiMTSSourceBuilder addSource(MTSSource source) { + this.sources.add(source); + return this; + } + + public MultiMTSSourceBuilder addSources(Collection sources) { + this.sources.addAll(sources); + return this; + } + + public MultiMTSSourceBuilder setSources(MTSSource... sources) { + this.sources = Lists.newArrayList(sources); + return this; + } + + public MultiMTSSourceBuilder setSources(Collection sources) { + this.sources = Lists.newArrayList(sources); + return this; + } + + public MultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) { + this.fixContinuity = fixContinuity; + return this; + } + + public MultiMTSSourceBuilder loop() { + this.maxLoops = -1; + return this; + } + + public MultiMTSSourceBuilder loops(int count) { + this.maxLoops = count; + return this; + } + + public MultiMTSSourceBuilder noLoop() { + this.maxLoops = 1; + return this; + } + + public ConcatenatingMTSSource build() { + return new ConcatenatingMTSSource(fixContinuity, maxLoops, sources); + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java b/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java new file mode 100644 index 00000000..5fd3a6ca --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java @@ -0,0 +1,210 @@ +package org.taktik.mpegts.sources; + +import java.nio.ByteBuffer; +import java.util.Map; + +import org.taktik.mpegts.MTSPacket; + +import com.google.common.collect.Maps; + + +/** + * This class will attempt to fix timestamp discontinuities + * when switching from one source to another. + * This should allow for smoother transitions between videos.
+ * This class does 3 things: + *
    + *
  1. Rewrite the PCR to be continuous with the previous source
  2. + *
  3. Rewrite the PTS of the PES to be continuous with the previous source
  4. + *
  5. Rewrite the continuity counter to be continuous with the previous source
  6. + *
+ * + * Code using this class should call {@link #fixContinuity(org.taktik.mpegts.MTSPacket)} for each source packet, + * then {@link #nextSource()} after the last packet of the current source and before the first packet of the next source. + */ +public class ContinuityFixer { + private Map pcrPackets; + private Map allPackets; + private Map ptss; + private Map lastPTSsOfPreviousSource; + private Map lastPCRsOfPreviousSource; + private Map firstPCRsOfCurrentSource; + private Map firstPTSsOfCurrentSource; + + private Map lastPacketsOfPreviousSource = Maps.newHashMap(); + private Map firstPacketsOfCurrentSource = Maps.newHashMap(); + private Map continuityFixes = Maps.newHashMap(); + + private boolean firstSource; + + + public ContinuityFixer() { + pcrPackets = Maps.newHashMap(); + allPackets = Maps.newHashMap(); + ptss = Maps.newHashMap(); + lastPTSsOfPreviousSource = Maps.newHashMap(); + lastPCRsOfPreviousSource = Maps.newHashMap(); + firstPCRsOfCurrentSource = Maps.newHashMap(); + firstPTSsOfCurrentSource = Maps.newHashMap(); + + lastPacketsOfPreviousSource = Maps.newHashMap(); + firstPacketsOfCurrentSource = Maps.newHashMap(); + continuityFixes = Maps.newHashMap(); + firstSource = true; + } + + /** + * Signals the {@link org.taktik.mpegts.sources.ContinuityFixer} that the following + * packet will be from another source. + * + * Call this method after the last packet of the current source and before the first packet of the next source. + */ + public void nextSource() { + firstPCRsOfCurrentSource.clear(); + lastPCRsOfPreviousSource.clear(); + firstPTSsOfCurrentSource.clear(); + lastPTSsOfPreviousSource.clear(); + firstPacketsOfCurrentSource.clear(); + lastPacketsOfPreviousSource.clear(); + for (MTSPacket mtsPacket : pcrPackets.values()) { + lastPCRsOfPreviousSource.put(mtsPacket.getPid(), mtsPacket.getAdaptationField().getPcr().getValue()); + } + lastPTSsOfPreviousSource.putAll(ptss); + lastPacketsOfPreviousSource.putAll(allPackets); + pcrPackets.clear(); + ptss.clear(); + allPackets.clear(); + firstSource = false; + } + + /** + * Fix the continuity of the packet. + * + * Call this method for each source packet, in order. + * + * @param tsPacket The packet to fix. + */ + public void fixContinuity(MTSPacket tsPacket) { + int pid = tsPacket.getPid(); + allPackets.put(pid, tsPacket); + if (!firstPacketsOfCurrentSource.containsKey(pid)) { + firstPacketsOfCurrentSource.put(pid, tsPacket); + if (!firstSource) { + MTSPacket lastPacketOfPreviousSource = lastPacketsOfPreviousSource.get(pid); + int continuityFix = lastPacketOfPreviousSource == null ? 0 : lastPacketOfPreviousSource.getContinuityCounter() - tsPacket.getContinuityCounter(); + if (tsPacket.isContainsPayload()) { + continuityFix++; + } + continuityFixes.put(pid, continuityFix); + } + } + if (!firstSource) { + tsPacket.setContinuityCounter((tsPacket.getContinuityCounter() + continuityFixes.get(pid)) % 16); + } + fixPTS(tsPacket, pid); + fixPCR(tsPacket, pid); + } + + private void fixPCR(MTSPacket tsPacket, int pid) { + if (tsPacket.isAdaptationFieldExist() && tsPacket.getAdaptationField() != null) { + if (tsPacket.getAdaptationField().isPcrFlag()) { + if (!firstPCRsOfCurrentSource.containsKey(pid)) { + firstPCRsOfCurrentSource.put(pid, tsPacket.getAdaptationField().getPcr().getValue()); + } + rewritePCR(tsPacket); + pcrPackets.put(pid, tsPacket); + } + } + } + + private void fixPTS(MTSPacket tsPacket, int pid) { + if (tsPacket.isContainsPayload()) { + ByteBuffer payload = tsPacket.getPayload(); + //System.out.println("PKT RMN " + tsPacket.getPayload().remaining()); + if (/*payload.remaining() >= 3 &&*/ ((payload.get(0) & 0xff) == 0) && ((payload.get(1) & 0xff) == 0) && ((payload.get(2) & 0xff) == 1)) { + int extension = payload.getShort(6) & 0xffff; + if ((extension & 0x80) != 0) { + // PTS is present + // TODO add payload size check to avoid indexoutofboundexception + long pts = (((payload.get(9) & 0xE)) << 29) | (((payload.getShort(10) & 0xFFFE)) << 14) | ((payload.getShort(12) & 0xFFFE) >> 1); + if (!firstPTSsOfCurrentSource.containsKey(pid)) { + firstPTSsOfCurrentSource.put(pid, pts); + } + if (!firstSource) { + long newPts = Math.round(pts + (getTimeGap(pid) / 300.0) + 100 * ((27_000_000 / 300.0) / 1_000)); + + payload.put(9, (byte) (0x20 | ((newPts & 0x1C0000000l) >> 29) | 0x1)); + payload.putShort(10, (short) (0x1 | ((newPts & 0x3FFF8000) >> 14))); + payload.putShort(12, (short) (0x1 | ((newPts & 0x7FFF) << 1))); + payload.rewind(); + pts = newPts; + } + + ptss.put(pid, pts); + } + } + } + } + + private long getTimeGap(int pid) { + // Try with PCR of the same PID + Long lastPCROfPreviousSource = lastPCRsOfPreviousSource.get(pid); + if (lastPCROfPreviousSource == null) { + lastPCROfPreviousSource = 0l; + } + Long firstPCROfCurrentSource = firstPCRsOfCurrentSource.get(pid); + if (firstPCROfCurrentSource != null) { + return lastPCROfPreviousSource - firstPCROfCurrentSource; + } + + // Try with any PCR + if (!lastPCRsOfPreviousSource.isEmpty()) { + int pcrPid = lastPCRsOfPreviousSource.keySet().iterator().next(); + lastPCROfPreviousSource = lastPCRsOfPreviousSource.get(pcrPid); + if (lastPCROfPreviousSource == null) { + lastPCROfPreviousSource = 0l; + } + firstPCROfCurrentSource = firstPCRsOfCurrentSource.get(pcrPid); + if (firstPCROfCurrentSource != null) { + return lastPCROfPreviousSource - firstPCROfCurrentSource; + } + } + + // Try with PTS of the same PID + Long lastPTSOfPreviousSource = lastPTSsOfPreviousSource.get(pid); + if (lastPTSOfPreviousSource == null) { + lastPTSOfPreviousSource = 0l; + } + + Long firstPTSofCurrentSource = firstPTSsOfCurrentSource.get(pid); + if (firstPTSofCurrentSource != null) { + return (lastPTSOfPreviousSource - firstPTSofCurrentSource) * 300; + } + + // Try with any PTS + if (!lastPTSsOfPreviousSource.isEmpty()) { + int randomPid = lastPTSsOfPreviousSource.keySet().iterator().next(); + lastPTSOfPreviousSource = lastPTSsOfPreviousSource.get(randomPid); + if (lastPTSOfPreviousSource == null) { + lastPTSOfPreviousSource = 0l; + } + + firstPTSofCurrentSource = firstPTSsOfCurrentSource.get(randomPid); + if (firstPTSofCurrentSource != null) { + return (lastPTSOfPreviousSource - firstPTSofCurrentSource) * 300; + } + } + + return 0; + } + + private void rewritePCR(MTSPacket tsPacket) { + if (firstSource) { + return; + } + long timeGap = getTimeGap(tsPacket.getPid()); + long pcr = tsPacket.getAdaptationField().getPcr().getValue(); + long newPcr = pcr + timeGap + 100 * ((27_000_000) / 1_000); + tsPacket.getAdaptationField().getPcr().setValue(newPcr); + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java b/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java new file mode 100644 index 00000000..21e0ac9b --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java @@ -0,0 +1,30 @@ +package org.taktik.mpegts.sources; + +import org.taktik.mpegts.MTSPacket; + +/** + * Decorates an MTSSource with continuity fixing. Not suitable for use with multiple different + * MTSSources as it will not reset counters when switching sources. + */ +public class ContinuityFixingMTSSource extends AbstractMTSSource { + private final ContinuityFixer continuityFixer = new ContinuityFixer(); + private final MTSSource source; + + public ContinuityFixingMTSSource(MTSSource source) { + this.source = source; + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + MTSPacket packet = source.nextPacket(); + if (packet != null) { + continuityFixer.fixContinuity(packet); + } + return packet; + } + + @Override + protected void closeInternal() throws Exception { + source.close(); + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java b/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java new file mode 100644 index 00000000..ba047be9 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java @@ -0,0 +1,30 @@ +package org.taktik.mpegts.sources; + +import org.taktik.mpegts.MTSPacket; + +/** + * Decorate a source with a declared bitrate. + */ +public class FixedBitrateMTSSource extends AbstractMTSSource { + private final MTSSource source; + private final long bitrate; + + public FixedBitrateMTSSource(MTSSource source, long bitrate) { + this.source = source; + this.bitrate = bitrate; + } + + public long getBitrate() { + return bitrate; + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + return source.nextPacket(); + } + + @Override + protected void closeInternal() throws Exception { + source.close(); + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java b/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java new file mode 100644 index 00000000..d66efe3c --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java @@ -0,0 +1,63 @@ +package org.taktik.mpegts.sources; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.taktik.mpegts.Constants; +import org.taktik.mpegts.MTSPacket; + +import com.google.common.base.Preconditions; + +public class InputStreamMTSSource extends AbstractMTSSource { + + private InputStream inputStream; + + private InputStreamMTSSource(InputStream inputStream) throws IOException { + this.inputStream = inputStream; + } + + public static InputStreamMTSSourceBuilder builder() { + return new InputStreamMTSSourceBuilder(); + } + + @Override + protected MTSPacket nextPacketInternal() throws IOException { + byte[] packetData = new byte[Constants.MPEGTS_PACKET_SIZE]; + int bytesRead = 0; + while(bytesRead < Constants.MPEGTS_PACKET_SIZE) { + int bytesLeft = Constants.MPEGTS_PACKET_SIZE - bytesRead; + int length = inputStream.read(packetData, bytesRead, bytesLeft); + bytesRead += length; + if(length == -1) { + // no more bytes available + return null; + } + } + + // Parse the packet + return new MTSPacket(ByteBuffer.wrap(packetData)); + } + + @Override + protected void closeInternal() throws Exception { + inputStream.close(); + } + + public static class InputStreamMTSSourceBuilder { + private InputStream inputStream; + + private InputStreamMTSSourceBuilder() { + } + + public InputStreamMTSSourceBuilder setInputStream(InputStream inputStream) { + this.inputStream = inputStream; + return this; + } + + public InputStreamMTSSource build() throws IOException { + Preconditions.checkNotNull(inputStream, "InputStream cannot be null"); + return new InputStreamMTSSource(inputStream); + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java b/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java new file mode 100644 index 00000000..6cc23a18 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java @@ -0,0 +1,74 @@ +package org.taktik.mpegts.sources; + +import org.taktik.mpegts.MTSPacket; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +public class LoopingMTSSource extends AbstractMTSSource{ + private ResettableMTSSource source; + private Integer maxLoops; + private long currentLoop; + + public LoopingMTSSource(ResettableMTSSource source, Integer maxLoops) { + this.source = source; + this.maxLoops = maxLoops; + currentLoop = 1; + } + + public static LoopingMTSSourceBuilder builder() { + return new LoopingMTSSourceBuilder(); + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + MTSPacket packet = source.nextPacket(); + if (packet == null) { + currentLoop++; + if (maxLoops == null || (currentLoop <= maxLoops)) { + source.reset(); + packet = source.nextPacket(); + } + } + return packet; + } + + @Override + protected void closeInternal() throws Exception { + source.close(); + } + + public static class LoopingMTSSourceBuilder { + private ResettableMTSSource source; + private boolean fixContinuity; + private Integer maxLoops; + + private LoopingMTSSourceBuilder() { + } + + public MTSSource build() { + checkNotNull(source); + checkArgument(maxLoops == null || maxLoops > 0); + MTSSource result = new LoopingMTSSource(source, maxLoops); + if (fixContinuity) { + return new ContinuityFixingMTSSource(result); + } + return result; + } + + public LoopingMTSSourceBuilder setSource(ResettableMTSSource source) { + this.source = source; + return this; + } + + public LoopingMTSSourceBuilder setFixContinuity(boolean fixContinuity) { + this.fixContinuity = fixContinuity; + return this; + } + + public LoopingMTSSourceBuilder setMaxLoops(Integer maxLoops) { + this.maxLoops = maxLoops; + return this; + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/MTSSource.java b/src/main/java/org/taktik/mpegts/sources/MTSSource.java new file mode 100644 index 00000000..32c3057e --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/MTSSource.java @@ -0,0 +1,7 @@ +package org.taktik.mpegts.sources; + +import org.taktik.mpegts.MTSPacket; + +public interface MTSSource extends AutoCloseable { + MTSPacket nextPacket() throws Exception; +} diff --git a/src/main/java/org/taktik/mpegts/sources/MTSSources.java b/src/main/java/org/taktik/mpegts/sources/MTSSources.java new file mode 100644 index 00000000..5713254c --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/MTSSources.java @@ -0,0 +1,80 @@ +package org.taktik.mpegts.sources; + + +import com.google.common.io.ByteSource; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.ByteChannel; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; + +public class MTSSources { + public static MTSSource fromSources(MTSSource... sources) { + return fromSources(1, false, sources); + } + + public static MTSSource fromSources(int loops, MTSSource... sources) { + return fromSources(loops, false, sources); + } + + public static MTSSource fromSources(int loops, boolean fixContinuity, MTSSource... sources) { + return ConcatenatingMTSSource.builder() + .setFixContinuity(fixContinuity) + .setSources(sources) + .loops(loops) + .build(); + } + + public static MTSSource from(ByteChannel channel) throws IOException { + return ByteChannelMTSSource.builder() + .setByteChannel(channel) + .build(); + } + + public static ResettableMTSSource from(SeekableByteChannel channel) throws IOException { + return SeekableByteChannelMTSSource.builder() + .setByteChannel(channel) + .build(); + } + + public static ResettableMTSSource from(File file) throws IOException { + return SeekableByteChannelMTSSource.builder() + .setByteChannel(FileChannel.open(file.toPath())) + .build(); + } + + public static ResettableMTSSource from(ByteSource byteSource) throws IOException { + return ByteSourceMTSSource.builder() + .setByteSource(byteSource) + .build(); + } + + public static MTSSource from(InputStream inputStream) throws IOException { + return InputStreamMTSSource.builder() + .setInputStream(inputStream) + .build(); + } + + public static MTSSource loop(ResettableMTSSource source) { + return LoopingMTSSource.builder() + .setSource(source) + .build(); + } + + public static MTSSource loop(ResettableMTSSource source, int maxLoops) { + return LoopingMTSSource.builder() + .setSource(source) + .setMaxLoops(maxLoops) + .build(); + } + + public static MTSSource multiplexing(MTSSource... sources) { + return MultiplexingMTSSource.builder() + .setSources(sources) + .setFixContinuity(true) + .build(); + } + +} diff --git a/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java b/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java new file mode 100644 index 00000000..3952b17c --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java @@ -0,0 +1,100 @@ +package org.taktik.mpegts.sources; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.taktik.mpegts.MTSPacket; + +import java.util.Collection; +import java.util.List; + +/** + * An MTSSource using a simple round-robin packet multiplexing strategy. + */ +public class MultiplexingMTSSource extends AbstractMTSSource { + private final boolean fixContinuity; + private final List sources; + + private ContinuityFixer continuityFixer; + private int nextSource = 0; + + public MultiplexingMTSSource(boolean fixContinuity, Collection sources) { + Preconditions.checkArgument(sources.size() > 0, "Must provide at least contain one source"); + this.fixContinuity = fixContinuity; + this.sources = Lists.newArrayList(sources); + if (fixContinuity) { + continuityFixer = new ContinuityFixer(); + } + } + + public static MultiplexingMTSSourceBuilder builder() { + return new MultiplexingMTSSourceBuilder(); + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + MTSPacket packet = sources.get(nextSource).nextPacket(); + if (packet != null) { + if (fixContinuity) { + continuityFixer.fixContinuity(packet); + } + return packet; + } else { + // FIXME: infinite loop + nextSource(); + return nextPacket(); + } + } + + @Override + protected synchronized void closeInternal() throws Exception { + for (MTSSource source : sources) { + source.close(); + } + } + + private synchronized void nextSource() { + nextSource++; + if (nextSource == sources.size()) { + nextSource = 0; + } + } + + public static class MultiplexingMTSSourceBuilder { + boolean fixContinuity = false; + private List sources = Lists.newArrayList(); + + public MultiplexingMTSSourceBuilder addSource(MTSSource source) { + sources.add(source); + return this; + } + + public MultiplexingMTSSourceBuilder addSources(MTSSource... sources) { + this.sources.addAll(Lists.newArrayList(sources)); + return this; + } + + public MultiplexingMTSSourceBuilder addSources(Collection sources) { + this.sources.addAll(sources); + return this; + } + + public MultiplexingMTSSourceBuilder setSources(MTSSource... sources) { + this.sources = Lists.newArrayList(sources); + return this; + } + + public MultiplexingMTSSourceBuilder setSources(Collection sources) { + this.sources = Lists.newArrayList(sources); + return this; + } + + public MultiplexingMTSSourceBuilder setFixContinuity(boolean fixContinuity) { + this.fixContinuity = fixContinuity; + return this; + } + + public MultiplexingMTSSource build() { + return new MultiplexingMTSSource(fixContinuity, sources); + } + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java b/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java new file mode 100644 index 00000000..c873c9d9 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java @@ -0,0 +1,34 @@ +package org.taktik.mpegts.sources; + +import org.taktik.mpegts.Constants; +import org.taktik.mpegts.MTSPacket; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class NullPacketSource extends AbstractMTSSource { + + public NullPacketSource() { + } + + @Override + protected MTSPacket nextPacketInternal() throws Exception { + byte[] buf = new byte[Constants.MPEGTS_PACKET_SIZE]; + + // payload (null bytes) + Arrays.fill(buf, (byte) 0xff); + + // header + buf[0] = 0x47; // sync byte + buf[1] = 0x1f; // PID high + buf[2] = (byte) 0xff; // PID low + buf[3] = 0x10; // adaptation control and continuity + + return new MTSPacket(ByteBuffer.wrap(buf)); + } + + @Override + protected void closeInternal() throws Exception { + // does nothing + } +} diff --git a/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java b/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java new file mode 100644 index 00000000..b94df9b4 --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java @@ -0,0 +1,5 @@ +package org.taktik.mpegts.sources; + +public interface ResettableMTSSource extends MTSSource { + void reset() throws Exception; +} diff --git a/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java b/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java new file mode 100644 index 00000000..256d1d6a --- /dev/null +++ b/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java @@ -0,0 +1,45 @@ +package org.taktik.mpegts.sources; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.taktik.ioutils.NIOUtils; +import org.taktik.mpegts.Constants; +import org.taktik.mpegts.MTSPacket; + +public class SeekableByteChannelMTSSource extends AbstractByteChannelMTSSource implements ResettableMTSSource { + + private SeekableByteChannelMTSSource(SeekableByteChannel byteChannel) throws IOException { + super(byteChannel); + } + + public static SeekableByteChannelMTSSourceBuilder builder() { + return new SeekableByteChannelMTSSourceBuilder(); + } + + @Override + public void reset() throws IOException { + byteChannel.position(0); + fillBuffer(); + } + + public static class SeekableByteChannelMTSSourceBuilder { + private SeekableByteChannel byteChannel; + + private SeekableByteChannelMTSSourceBuilder(){} + + public SeekableByteChannelMTSSourceBuilder setByteChannel(SeekableByteChannel byteChannel) { + this.byteChannel = byteChannel; + return this; + } + + public SeekableByteChannelMTSSource build() throws IOException { + Preconditions.checkNotNull(byteChannel, "byteChannel cannot be null"); + return new SeekableByteChannelMTSSource(byteChannel); + } + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index be906a27..6c731cec 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -44,5 +44,6 @@ +