Simplify integration of mpegts-streamer

Get rid of unneded thread complexity.
Use Queue.take() instead of poll() to avoid unnecessary looping and
the CPU load it causes.
This commit is contained in:
0xboobface 2018-09-08 21:53:39 +02:00
parent 7efbeac565
commit 528d7336e1
3 changed files with 103 additions and 92 deletions

View File

@ -3,6 +3,7 @@ package ctbrec.recorder.download;
import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE; import static java.nio.file.StandardOpenOption.WRITE;
import java.io.ByteArrayInputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -15,12 +16,10 @@ import java.nio.file.LinkOption;
import java.nio.file.Path; import java.nio.file.Path;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.LinkedList;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.BlockingQueue; import java.util.Queue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -46,10 +45,8 @@ import okhttp3.Response;
public class MergedHlsDownload extends AbstractHlsDownload { public class MergedHlsDownload extends AbstractHlsDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class); private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class);
private BlockingQueue<Future<InputStream>> mergeQueue = new LinkedBlockingQueue<>();
private BlockingMultiMTSSource multiSource; private BlockingMultiMTSSource multiSource;
private Thread mergeThread; private Thread mergeThread;
private Thread handoverThread;
private Streamer streamer; private Streamer streamer;
public MergedHlsDownload(HttpClient client) { public MergedHlsDownload(HttpClient client) {
@ -59,21 +56,13 @@ public class MergedHlsDownload extends AbstractHlsDownload {
public void start(String segmentPlaylistUri, File target, ProgressListener progressListener) throws IOException { public void start(String segmentPlaylistUri, File target, ProgressListener progressListener) throws IOException {
try { try {
running = true; running = true;
mergeThread = createMergeThread(target, progressListener); mergeThread = createMergeThread(target, progressListener, false);
mergeThread.start(); mergeThread.start();
handoverThread = createHandoverThread();
handoverThread.start();
downloadSegments(segmentPlaylistUri, false); downloadSegments(segmentPlaylistUri, false);
} catch(ParseException e) { } catch(ParseException e) {
throw new IOException("Couldn't parse stream information", e); throw new IOException("Couldn't parse stream information", e);
} catch(PlaylistException e) { } catch(PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist", e); throw new IOException("Couldn't parse HLS playlist", e);
} catch(EOFException e) {
// end of playlist reached
LOG.debug("Reached end of playlist");
} catch(Exception e) {
throw new IOException("Couldn't download segment", e);
} finally { } finally {
alive = false; alive = false;
LOG.debug("Download for terminated"); LOG.debug("Download for terminated");
@ -98,10 +87,8 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} }
File targetFile = Recording.mergedFileFromDirectory(downloadDir.toFile()); File targetFile = Recording.mergedFileFromDirectory(downloadDir.toFile());
mergeThread = createMergeThread(targetFile, null); mergeThread = createMergeThread(targetFile, null, true);
mergeThread.start(); mergeThread.start();
handoverThread = createHandoverThread();
handoverThread.start();
String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex());
if(segments != null) { if(segments != null) {
@ -127,6 +114,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException {
int lastSegment = 0; int lastSegment = 0;
int nextSegment = 0; int nextSegment = 0;
Queue<byte[]> mergeQueue = new LinkedList<>();
while(running) { while(running) {
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
if(!livestreamDownload) { if(!livestreamDownload) {
@ -139,8 +127,13 @@ public class MergedHlsDownload extends AbstractHlsDownload {
for (int i = nextSegment; i < lsp.seq; i++) { for (int i = nextSegment; i < lsp.seq; i++) {
URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i)));
LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri); LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri);
Future<InputStream> downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); byte[] segmentData;
mergeQueue.add(downloadFuture); try {
segmentData = new SegmentDownload(segmentUrl, client).call();
mergeQueue.add(segmentData);
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
} }
// TODO switch to a lower bitrate/resolution ?!? // TODO switch to a lower bitrate/resolution ?!?
} }
@ -150,8 +143,29 @@ public class MergedHlsDownload extends AbstractHlsDownload {
skip--; skip--;
} else { } else {
URL segmentUrl = new URL(segment); URL segmentUrl = new URL(segment);
Future<InputStream> downloadFuture = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); try {
mergeQueue.add(downloadFuture); byte[] segmentData = new SegmentDownload(segmentUrl, client).call();
if(livestreamDownload) {
mergeQueue.add(segmentData);
} else {
writeSegment(segmentData);
}
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
}
}
if(livestreamDownload) {
while(!mergeQueue.isEmpty()) {
try {
LOG.debug("Writing segment");
writeSegment(mergeQueue.poll());
} catch (InterruptedException e) {
if(running) {
LOG.error("Interrupted while waiting to add a new segment to multi source", e);
}
}
} }
} }
@ -185,33 +199,20 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} }
} }
private Thread createHandoverThread() { private void writeSegment(byte[] segmentData) throws InterruptedException {
Thread t = new Thread() { InputStream in = new ByteArrayInputStream(segmentData);
@Override InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(in).build();
public void run() {
while(running) {
try {
Future<InputStream> downloadFuture = mergeQueue.take();
InputStream tsData = downloadFuture.get();
InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(tsData).build();
multiSource.addSource(source); multiSource.addSource(source);
} catch (InterruptedException e) {
if(running) {
LOG.error("Interrupted while waiting for a download future", e);
}
} catch (ExecutionException e) {
LOG.error("Error while opening segment stream", e);
}
}
LOG.debug("Handover Thread finished");
};
};
t.setName("Segment Handover Thread");
t.setDaemon(true);
return t;
} }
private Thread createMergeThread(File targetFile, ProgressListener listener) { @Override
public void stop() {
running = false;
alive = false;
LOG.debug("Download stopped");
}
private Thread createMergeThread(File targetFile, ProgressListener listener, boolean liveStream) {
Thread t = new Thread(() -> { Thread t = new Thread(() -> {
multiSource = BlockingMultiMTSSource.builder() multiSource = BlockingMultiMTSSource.builder()
.setFixContinuity(true) .setFixContinuity(true)
@ -226,7 +227,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
streamer = Streamer.builder() streamer = Streamer.builder()
.setSource(multiSource) .setSource(multiSource)
.setSink(sink) .setSink(sink)
.setSleepingEnabled(false) .setSleepingEnabled(liveStream)
.setBufferSize(10) .setBufferSize(10)
.build(); .build();
@ -252,20 +253,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
return t; return t;
} }
@Override private static class SegmentDownload implements Callable<byte[]> {
public void stop() {
running = false;
alive = false;
LOG.debug("Stopping streamer");
streamer.stop();
LOG.debug("Sending interrupt to merger");
mergeThread.interrupt();
LOG.debug("Sending interrupt to handover thread");
handoverThread.interrupt();
LOG.debug("Download stopped");
}
private static class SegmentDownload implements Callable<InputStream> {
private URL url; private URL url;
private HttpClient client; private HttpClient client;
@ -275,24 +263,22 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} }
@Override @Override
public InputStream call() throws Exception { public byte[] call() throws Exception {
LOG.trace("Downloading segment " + url.getFile()); LOG.trace("Downloading segment " + url.getFile());
int maxTries = 3; int maxTries = 3;
for (int i = 1; i <= maxTries; i++) { for (int i = 1; i <= maxTries; i++) {
try {
Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build();
Response response = client.execute(request); Response response = client.execute(request);
try { byte[] segment = response.body().bytes();
InputStream in = response.body().byteStream(); return segment;
return in;
} catch(Exception e) { } catch(Exception e) {
if (i == maxTries) { if (i == maxTries) {
LOG.warn("Error while downloading segment. Segment {} finally failed", url.getFile()); LOG.warn("Error while downloading segment. Segment {} finally failed", url.getFile());
} else { } else {
LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i); LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i);
} }
} /*finally { }
response.close();
}*/
} }
throw new IOException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries"); throw new IOException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries");
} }

View File

@ -83,7 +83,7 @@ public class Streamer {
private void internalStream() { private void internalStream() {
boolean resetState = false; boolean resetState = false;
MTSPacket packet; MTSPacket packet = null;
long packetCount = 0; long packetCount = 0;
//long pcrPidPacketCount = 0; //long pcrPidPacketCount = 0;
Long firstPcrValue = null; Long firstPcrValue = null;
@ -106,11 +106,11 @@ public class Streamer {
// Initialize time to sleep // Initialize time to sleep
long sleepNanos = 0; long sleepNanos = 0;
packet = buffer.poll(); try {
packet = buffer.take();
if (packet == null) { if (packet == null) {
if (endOfSourceReached) { if (endOfSourceReached) {
packet = buffer.poll(); packet = buffer.take();
if (packet == null) { if (packet == null) {
break; break;
} }
@ -118,6 +118,10 @@ public class Streamer {
continue; continue;
} }
} }
} catch (InterruptedException e1) {
log.error("INterrupted while eaiting for packet");
continue;
}
int pid = packet.getPid(); int pid = packet.getPid();

View File

@ -21,6 +21,7 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo
private int downloadedSegments = 0; private int downloadedSegments = 0;
private int totalSegments = -1; private int totalSegments = -1;
private ProgressListener listener; private ProgressListener listener;
private int lastProgress = 0;
private BlockingMultiMTSSource(boolean fixContinuity) { private BlockingMultiMTSSource(boolean fixContinuity) {
this.fixContinuity = fixContinuity; this.fixContinuity = fixContinuity;
@ -41,30 +42,50 @@ public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoClo
} }
MTSPacket packet = currentSource.nextPacket(); MTSPacket packet = currentSource.nextPacket();
packet = switchSourceIfNeeded(packet);
if (fixContinuity) {
continuityFixer.fixContinuity(packet);
}
return packet;
}
private MTSPacket switchSourceIfNeeded(MTSPacket packet) throws Exception {
if(packet == null) { if(packet == null) {
// end of source has been reached, switch to the next source // end of source has been reached, switch to the next source
currentSource.close(); closeCurrentSource();
downloadedSegments++; downloadedSegments++;
if(listener != null && totalSegments > 0) { if(listener != null && totalSegments > 0) {
int progress = (int)(downloadedSegments * 100.0 / totalSegments); int progress = (int)(downloadedSegments * 100.0 / totalSegments);
if(progress > lastProgress) {
listener.update(progress); listener.update(progress);
lastProgress = progress;
}
} }
if(downloadedSegments == totalSegments) { if(downloadedSegments == totalSegments) {
LOG.debug("All segments written. Queue size {}", sources.size()); LOG.debug("All segments written. Queue size {}", sources.size());
return null; return null;
} }
currentSource = sources.take(); return firstPacketFromNextSource();
packet = currentSource.nextPacket();
// }
}
if (fixContinuity) {
continuityFixer.fixContinuity(packet);
} }
return packet; return packet;
} }
private MTSPacket firstPacketFromNextSource() throws Exception {
switchSource();
return currentSource.nextPacket();
}
private void switchSource() throws InterruptedException {
currentSource = sources.take();
}
private void closeCurrentSource() throws Exception {
currentSource.close();
}
private void setProgressListener(ProgressListener listener) { private void setProgressListener(ProgressListener listener) {
this.listener = listener; this.listener = listener;
} }