From 528d7336e187432ab95ac7771cd1b49bc2d00e9b Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Sat, 8 Sep 2018 21:53:39 +0200 Subject: [PATCH] Simplify integration of mpegts-streamer Get rid of unneded thread complexity. Use Queue.take() instead of poll() to avoid unnecessary looping and the CPU load it causes. --- .../recorder/download/MergedHlsDownload.java | 132 ++++++++---------- src/main/java/org/taktik/mpegts/Streamer.java | 24 ++-- .../sources/BlockingMultiMTSSource.java | 39 ++++-- 3 files changed, 103 insertions(+), 92 deletions(-) diff --git a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index 02130ce6..2b6f5b84 100644 --- a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -3,6 +3,7 @@ package ctbrec.recorder.download; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.WRITE; +import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.File; import java.io.IOException; @@ -15,12 +16,10 @@ import java.nio.file.LinkOption; import java.nio.file.Path; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.LinkedList; import java.util.Objects; -import java.util.concurrent.BlockingQueue; +import java.util.Queue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,10 +45,8 @@ import okhttp3.Response; public class MergedHlsDownload extends AbstractHlsDownload { private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class); - private BlockingQueue> mergeQueue = new LinkedBlockingQueue<>(); private BlockingMultiMTSSource multiSource; private Thread mergeThread; - private Thread handoverThread; private Streamer streamer; public MergedHlsDownload(HttpClient client) { @@ -59,21 +56,13 @@ public class MergedHlsDownload extends AbstractHlsDownload { public void start(String segmentPlaylistUri, File target, ProgressListener progressListener) throws IOException { try { running = true; - mergeThread = createMergeThread(target, progressListener); + mergeThread = createMergeThread(target, progressListener, false); mergeThread.start(); - handoverThread = createHandoverThread(); - handoverThread.start(); - downloadSegments(segmentPlaylistUri, false); } catch(ParseException e) { throw new IOException("Couldn't parse stream information", e); } catch(PlaylistException e) { throw new IOException("Couldn't parse HLS playlist", e); - } catch(EOFException e) { - // end of playlist reached - LOG.debug("Reached end of playlist"); - } catch(Exception e) { - throw new IOException("Couldn't download segment", e); } finally { alive = false; LOG.debug("Download for terminated"); @@ -98,10 +87,8 @@ public class MergedHlsDownload extends AbstractHlsDownload { } File targetFile = Recording.mergedFileFromDirectory(downloadDir.toFile()); - mergeThread = createMergeThread(targetFile, null); + mergeThread = createMergeThread(targetFile, null, true); mergeThread.start(); - handoverThread = createHandoverThread(); - handoverThread.start(); String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); if(segments != null) { @@ -127,6 +114,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { int lastSegment = 0; int nextSegment = 0; + Queue mergeQueue = new LinkedList<>(); while(running) { SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); if(!livestreamDownload) { @@ -139,8 +127,13 @@ public class MergedHlsDownload extends AbstractHlsDownload { for (int i = nextSegment; i < lsp.seq; i++) { URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri); - Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); - mergeQueue.add(downloadFuture); + byte[] segmentData; + try { + segmentData = new SegmentDownload(segmentUrl, client).call(); + mergeQueue.add(segmentData); + } catch (Exception e) { + LOG.error("Error while downloading segment {}", segmentUrl, e); + } } // TODO switch to a lower bitrate/resolution ?!? } @@ -150,27 +143,48 @@ public class MergedHlsDownload extends AbstractHlsDownload { skip--; } else { URL segmentUrl = new URL(segment); - Future downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); - mergeQueue.add(downloadFuture); + try { + byte[] segmentData = new SegmentDownload(segmentUrl, client).call(); + if(livestreamDownload) { + mergeQueue.add(segmentData); + } else { + writeSegment(segmentData); + } + } catch (Exception e) { + LOG.error("Error while downloading segment {}", segmentUrl, e); + } } } if(livestreamDownload) { + while(!mergeQueue.isEmpty()) { + try { + LOG.debug("Writing segment"); + writeSegment(mergeQueue.poll()); + } catch (InterruptedException e) { + if(running) { + LOG.error("Interrupted while waiting to add a new segment to multi source", e); + } + } + } + } + + if (livestreamDownload) { long wait = 0; - if(lastSegment == lsp.seq) { + if (lastSegment == lsp.seq) { // playlist didn't change -> wait for at least half the target duration wait = (long) lsp.targetDuration * 1000 / 2; LOG.trace("Playlist didn't change... waiting for {}ms", wait); } else { // playlist did change -> wait for at least last segment duration - wait = 1;//(long) lsp.lastSegDuration * 1000; + wait = 1;// (long) lsp.lastSegDuration * 1000; LOG.trace("Playlist changed... waiting for {}ms", wait); } try { Thread.sleep(wait); } catch (InterruptedException e) { - if(running) { + if (running) { LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); } } @@ -185,33 +199,20 @@ public class MergedHlsDownload extends AbstractHlsDownload { } } - private Thread createHandoverThread() { - Thread t = new Thread() { - @Override - public void run() { - while(running) { - try { - Future downloadFuture = mergeQueue.take(); - InputStream tsData = downloadFuture.get(); - InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build(); - multiSource.addSource(source); - } catch (InterruptedException e) { - if(running) { - LOG.error("Interrupted while waiting for a download future", e); - } - } catch (ExecutionException e) { - LOG.error("Error while opening segment stream", e); - } - } - LOG.debug("Handover Thread finished"); - }; - }; - t.setName("Segment Handover Thread"); - t.setDaemon(true); - return t; + private void writeSegment(byte[] segmentData) throws InterruptedException { + InputStream in = new ByteArrayInputStream(segmentData); + InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(in).build(); + multiSource.addSource(source); } - private Thread createMergeThread(File targetFile, ProgressListener listener) { + @Override + public void stop() { + running = false; + alive = false; + LOG.debug("Download stopped"); + } + + private Thread createMergeThread(File targetFile, ProgressListener listener, boolean liveStream) { Thread t = new Thread(() -> { multiSource = BlockingMultiMTSSource.builder() .setFixContinuity(true) @@ -226,7 +227,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { streamer = Streamer.builder() .setSource(multiSource) .setSink(sink) - .setSleepingEnabled(false) + .setSleepingEnabled(liveStream) .setBufferSize(10) .build(); @@ -252,20 +253,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { return t; } - @Override - public void stop() { - running = false; - alive = false; - LOG.debug("Stopping streamer"); - streamer.stop(); - LOG.debug("Sending interrupt to merger"); - mergeThread.interrupt(); - LOG.debug("Sending interrupt to handover thread"); - handoverThread.interrupt(); - LOG.debug("Download stopped"); - } - - private static class SegmentDownload implements Callable { + private static class SegmentDownload implements Callable { private URL url; private HttpClient client; @@ -275,24 +263,22 @@ public class MergedHlsDownload extends AbstractHlsDownload { } @Override - public InputStream call() throws Exception { + public byte[] call() throws Exception { LOG.trace("Downloading segment " + url.getFile()); int maxTries = 3; for (int i = 1; i <= maxTries; i++) { - Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); - Response response = client.execute(request); try { - InputStream in = response.body().byteStream(); - return in; + Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); + Response response = client.execute(request); + byte[] segment = response.body().bytes(); + return segment; } catch(Exception e) { if (i == maxTries) { LOG.warn("Error while downloading segment. Segment {} finally failed", url.getFile()); } else { LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i); } - } /*finally { - response.close(); - }*/ + } } throw new IOException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries"); } diff --git a/src/main/java/org/taktik/mpegts/Streamer.java b/src/main/java/org/taktik/mpegts/Streamer.java index 82d81ef0..c5db16ec 100644 --- a/src/main/java/org/taktik/mpegts/Streamer.java +++ b/src/main/java/org/taktik/mpegts/Streamer.java @@ -83,7 +83,7 @@ public class Streamer { private void internalStream() { boolean resetState = false; - MTSPacket packet; + MTSPacket packet = null; long packetCount = 0; //long pcrPidPacketCount = 0; Long firstPcrValue = null; @@ -106,17 +106,21 @@ public class Streamer { // Initialize time to sleep long sleepNanos = 0; - packet = buffer.poll(); - - if (packet == null) { - if (endOfSourceReached) { - packet = buffer.poll(); - if (packet == null) { - break; + try { + packet = buffer.take(); + if (packet == null) { + if (endOfSourceReached) { + packet = buffer.take(); + if (packet == null) { + break; + } + } else { + continue; } - } else { - continue; } + } catch (InterruptedException e1) { + log.error("INterrupted while eaiting for packet"); + continue; } int pid = packet.getPid(); diff --git a/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java b/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java index 1ca82c03..69360a2b 100644 --- a/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java +++ b/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java @@ -21,6 +21,7 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo private int downloadedSegments = 0; private int totalSegments = -1; private ProgressListener listener; + private int lastProgress = 0; private BlockingMultiMTSSource(boolean fixContinuity) { this.fixContinuity = fixContinuity; @@ -41,30 +42,50 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo } MTSPacket packet = currentSource.nextPacket(); + packet = switchSourceIfNeeded(packet); + + if (fixContinuity) { + continuityFixer.fixContinuity(packet); + } + return packet; + } + + private MTSPacket switchSourceIfNeeded(MTSPacket packet) throws Exception { if(packet == null) { // end of source has been reached, switch to the next source - currentSource.close(); + closeCurrentSource(); + downloadedSegments++; if(listener != null && totalSegments > 0) { int progress = (int)(downloadedSegments * 100.0 / totalSegments); - listener.update(progress); + if(progress > lastProgress) { + listener.update(progress); + lastProgress = progress; + } } if(downloadedSegments == totalSegments) { LOG.debug("All segments written. Queue size {}", sources.size()); return null; } - currentSource = sources.take(); - packet = currentSource.nextPacket(); - // } - } - - if (fixContinuity) { - continuityFixer.fixContinuity(packet); + return firstPacketFromNextSource(); } return packet; } + private MTSPacket firstPacketFromNextSource() throws Exception { + switchSource(); + return currentSource.nextPacket(); + } + + private void switchSource() throws InterruptedException { + currentSource = sources.take(); + } + + private void closeCurrentSource() throws Exception { + currentSource.close(); + } + private void setProgressListener(ProgressListener listener) { this.listener = listener; }