From c4c8fe83fa44b9ccdb3506fdf3f1faeab15ee22a Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Wed, 28 Nov 2018 15:22:42 +0100 Subject: [PATCH] Improve MergedHlsDownload * Add better exception handling * Check, if the model is still online, when an error occurs * Download segments in parallel, so that less segments are missed --- .../recorder/download/MergedHlsDownload.java | 212 ++++++++++++------ .../download/MissingSegmentException.java | 11 + .../main/java/org/taktik/mpegts/Streamer.java | 31 ++- 3 files changed, 179 insertions(+), 75 deletions(-) create mode 100644 common/src/main/java/ctbrec/recorder/download/MissingSegmentException.java diff --git a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index 84c4c7d6..d4a935e3 100644 --- a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -19,7 +19,16 @@ import java.text.DecimalFormat; import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +62,8 @@ public class MergedHlsDownload extends AbstractHlsDownload { private File targetFile; private DecimalFormat df = new DecimalFormat("00000"); private int splitCounter = 0; + private BlockingQueue downloadQueue = new LinkedBlockingQueue<>(50); + private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue); public MergedHlsDownload(HttpClient client) { super(client); @@ -81,7 +92,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { downloadSegments(segmentPlaylistUri, false); LOG.debug("Waiting for merge thread to finish"); mergeThread.join(); - LOG.debug("Merge thread to finished"); + LOG.debug("Merge thread finished"); } catch(ParseException e) { throw new IOException("Couldn't parse stream information", e); } catch(PlaylistException e) { @@ -92,7 +103,12 @@ public class MergedHlsDownload extends AbstractHlsDownload { throw new IOException("Couldn't add HMAC to playlist url", e); } finally { alive = false; - streamer.stop(); + try { + streamer.stop(); + } catch(Exception e) { + LOG.error("Couldn't stop streamer", e); + } + downloadThreadPool.shutdown(); LOG.debug("Download terminated for {}", segmentPlaylistUri); } } @@ -129,7 +145,11 @@ public class MergedHlsDownload extends AbstractHlsDownload { } finally { alive = false; if(streamer != null) { - streamer.stop(); + try { + streamer.stop(); + } catch(Exception e) { + LOG.error("Couldn't stop streamer", e); + } } LOG.debug("Download for {} terminated", model); } @@ -138,36 +158,109 @@ public class MergedHlsDownload extends AbstractHlsDownload { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { int lastSegment = 0; int nextSegment = 0; + long playlistNotFoundFirstEncounter = -1; while(running) { try { + if(playlistNotFoundFirstEncounter != -1) { + LOG.debug("Downloading playlist {}", segmentPlaylistUri); + } SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); + playlistNotFoundFirstEncounter = -1; if(!livestreamDownload) { multiSource.setTotalSegments(lsp.segments.size()); } - // download segments, which might have been skipped - downloadMissedSegments(lsp, nextSegment); - // download new segments + long downloadStart = System.currentTimeMillis(); downloadNewSegments(lsp, nextSegment); + long downloadTookMillis = System.currentTimeMillis() - downloadStart; + + // download segments, which might have been skipped + //downloadMissedSegments(lsp, nextSegment); + if(nextSegment > 0 && lsp.seq > nextSegment) { + LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url, downloadTookMillis, lsp.totalDuration); + } if(livestreamDownload) { // split up the recording, if configured splitRecording(); // wait some time until requesting the segment playlist again to not hammer the server - waitForNewSegments(lsp, lastSegment); + waitForNewSegments(lsp, lastSegment, downloadTookMillis); lastSegment = lsp.seq; nextSegment = lastSegment + lsp.segments.size(); } else { break; } - } catch(HttpException e) { - if(e.getResponseCode() == 404) { - // playlist is gone -> model probably logged out - LOG.debug("Playlist not found. Assuming model went offline"); - running = false; + } catch(Exception e) { + LOG.info("Unexpected error while downloading ", model.getName()); + running = false; + } + } + } + + private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, MissingSegmentException, ExecutionException, HttpException { + int skip = nextSegment - lsp.seq; + if(lsp.segments.isEmpty()) { + LOG.debug("Empty playlist: {}", lsp.url); + } + + // add segments to download threadpool + Queue> downloads = new LinkedList<>(); + if(downloadQueue.remainingCapacity() == 0) { + LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment"); + } else { + for (String segment : lsp.segments) { + if(!running) { + break; + } + if(skip > 0) { + skip--; + } else { + URL segmentUrl = new URL(segment); + Future download = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client)); + downloads.add(download); + } + } + } + + // get completed downloads and write them to the file + writeFinishedSegments(downloads); + } + + private void writeFinishedSegments(Queue> downloads) throws ExecutionException, HttpException { + for (Future downloadFuture : downloads) { + try { + byte[] segmentData = downloadFuture.get(); + writeSegment(segmentData); + } catch (InterruptedException e) { + LOG.error("Error while downloading segment", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if(cause instanceof MissingSegmentException) { + if(model != null && !isModelOnline()) { + LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName()); + running = false; + } else { + LOG.debug("Segment not available, but model {} still online. Going on", model.getName()); + } + } else if(cause instanceof HttpException) { + HttpException he = (HttpException) cause; + if(model != null && !isModelOnline()) { + LOG.debug("Error {} while downloading segment, because model {} is offline. Stopping now", he.getResponseCode(), model.getName()); + running = false; + } else { + if(he.getResponseCode() == 404) { + LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", model.getName()); + running = false; + } else if(he.getResponseCode() == 403) { + LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", model.getName()); + running = false; + } else { + throw he; + } + } } else { throw e; } @@ -175,43 +268,6 @@ public class MergedHlsDownload extends AbstractHlsDownload { } } - private void downloadMissedSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException { - if(nextSegment > 0 && lsp.seq > nextSegment) { - LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, lsp.url); - String first = lsp.segments.get(0); - int seq = lsp.seq; - for (int i = nextSegment; i < lsp.seq; i++) { - URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); - LOG.debug("Loading missed segment {} for model {}", i, lsp.url); - byte[] segmentData; - try { - segmentData = new SegmentDownload(segmentUrl, client).call(); - writeSegment(segmentData); - } catch (Exception e) { - LOG.error("Error while downloading segment {}", segmentUrl, e); - } - } - // TODO switch to a lower bitrate/resolution ?!? - } - } - - private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException { - int skip = nextSegment - lsp.seq; - for (String segment : lsp.segments) { - if(skip > 0) { - skip--; - } else { - URL segmentUrl = new URL(segment); - try { - byte[] segmentData = new SegmentDownload(segmentUrl, client).call(); - writeSegment(segmentData); - } catch (Exception e) { - LOG.error("Error while downloading segment {}", segmentUrl, e); - } - } - } - } - private void writeSegment(byte[] segmentData) throws InterruptedException { InputStream in = new ByteArrayInputStream(segmentData); InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(in).build(); @@ -232,12 +288,18 @@ public class MergedHlsDownload extends AbstractHlsDownload { } } - private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment) { + private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment, long downloadTookMillis) { try { long wait = 0; if (lastSegment == lsp.seq) { - // playlist didn't change -> wait for at least half the target duration - wait = (long) lsp.targetDuration * 1000 / 2; + int timeLeftMillis = (int)(lsp.totalDuration * 1000 - downloadTookMillis); + if(timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it + wait = 1; + } else { + // wait a second to be nice to the server (don't hammer it with requests) + // 1 second seems to be a good compromise. every other calculation resulted in more missing segments + wait = 1000; + } LOG.trace("Playlist didn't change... waiting for {}ms", wait); } else { // playlist did change -> wait for at least last segment duration @@ -256,7 +318,9 @@ public class MergedHlsDownload extends AbstractHlsDownload { public void stop() { running = false; alive = false; - streamer.stop(); + if(streamer != null) { + streamer.stop(); + } LOG.debug("Download stopped"); } @@ -281,6 +345,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { .setSink(sink) .setSleepingEnabled(liveStream) .setBufferSize(10) + .setName(model.getName()) .build(); // Start streaming @@ -295,9 +360,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { } finally { closeFile(channel); deleteEmptyRecording(targetFile); + running = false; } }); - t.setName("Segment Merger Thread"); + t.setName("Segment Merger Thread [" + model.getName() + "]"); t.setDaemon(true); return t; } @@ -308,7 +374,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { Files.delete(targetFile.toPath()); Files.delete(targetFile.getParentFile().toPath()); } - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error while deleting empty recording {}", targetFile); } } @@ -318,12 +384,12 @@ public class MergedHlsDownload extends AbstractHlsDownload { if (channel != null) { channel.close(); } - } catch (IOException e) { + } catch (Exception e) { LOG.error("Error while closing file channel", e); } } - private static class SegmentDownload implements Callable { + private class SegmentDownload implements Callable { private URL url; private HttpClient client; @@ -333,24 +399,38 @@ public class MergedHlsDownload extends AbstractHlsDownload { } @Override - public byte[] call() throws Exception { + public byte[] call() throws IOException { LOG.trace("Downloading segment " + url.getFile()); int maxTries = 3; - for (int i = 1; i <= maxTries; i++) { - try { - Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); - Response response = client.execute(request); - byte[] segment = response.body().bytes(); - return segment; + for (int i = 1; i <= maxTries && running; i++) { + Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); + try (Response response = client.execute(request)) { + if(response.isSuccessful()) { + byte[] segment = response.body().bytes(); + return segment; + } else { + throw new HttpException(response.code(), response.message()); + } } catch(Exception e) { if (i == maxTries) { LOG.warn("Error while downloading segment. Segment {} finally failed", url.getFile()); } else { - LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i); + LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i, e); + } + if(model != null && !isModelOnline()) { + break; } } } - throw new IOException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries"); + throw new MissingSegmentException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries"); + } + } + + public boolean isModelOnline() { + try { + return model.isOnline(IGNORE_CACHE); + } catch (IOException | ExecutionException | InterruptedException e) { + return false; } } } diff --git a/common/src/main/java/ctbrec/recorder/download/MissingSegmentException.java b/common/src/main/java/ctbrec/recorder/download/MissingSegmentException.java new file mode 100644 index 00000000..d6971aab --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/MissingSegmentException.java @@ -0,0 +1,11 @@ +package ctbrec.recorder.download; + +import java.io.IOException; + +public class MissingSegmentException extends IOException { + + public MissingSegmentException(String msg) { + super(msg); + } + +} diff --git a/common/src/main/java/org/taktik/mpegts/Streamer.java b/common/src/main/java/org/taktik/mpegts/Streamer.java index d844da92..3ae5312d 100644 --- a/common/src/main/java/org/taktik/mpegts/Streamer.java +++ b/common/src/main/java/org/taktik/mpegts/Streamer.java @@ -30,12 +30,14 @@ public class Streamer { private Thread streamingThread; private boolean sleepingEnabled; + private String name; - private Streamer(MTSSource source, MTSSink sink, int bufferSize, boolean sleepingEnabled) { + private Streamer(MTSSource source, MTSSink sink, int bufferSize, boolean sleepingEnabled, String name) { this.source = source; this.sink = sink; this.bufferSize = bufferSize; this.sleepingEnabled = sleepingEnabled; + this.name = name; } public void stream() throws InterruptedException { @@ -48,15 +50,15 @@ public class Streamer { try { preBuffer(); } catch (Exception e) { - throw new IllegalStateException("Error while bufering", e); + throw new IllegalStateException("Error while buffering", e); } log.info("Done PreBuffering"); - bufferingThread = new Thread(this::fillBuffer, "buffering"); + bufferingThread = new Thread(this::fillBuffer, "Buffering ["+name+"]"); bufferingThread.setDaemon(true); bufferingThread.start(); - streamingThread = new Thread(this::internalStream, "streaming"); + streamingThread = new Thread(this::internalStream, "Streaming ["+name+"]"); streamingThread.setDaemon(true); streamingThread.start(); @@ -123,7 +125,7 @@ public class Streamer { } } } catch (InterruptedException e1) { - if(!endOfSourceReached) { + if(!endOfSourceReached && !streamingShouldStop) { log.error("Interrupted while waiting for packet"); continue; } else { @@ -240,7 +242,7 @@ public class Streamer { // Stream packet // System.out.println("Streaming packet #" + packetCount + ", PID=" + mtsPacket.getPid() + ", pcrCount=" + pcrCount + ", continuityCounter=" + mtsPacket.getContinuityCounter()); - if(!streamingShouldStop) { + if(!streamingShouldStop && !Thread.interrupted()) { try { sink.send(packet); } catch (Exception e) { @@ -275,7 +277,7 @@ public class Streamer { buffer.put(packet); put = true; } catch (InterruptedException ignored) { - + log.error("Error adding packet to buffer", ignored); } } } @@ -287,7 +289,11 @@ public class Streamer { log.error("Error reading from source", e); } finally { endOfSourceReached = true; - streamingThread.interrupt(); + try { + streamingThread.interrupt(); + } catch(Exception e) { + log.error("Couldn't interrupt streaming thread", e); + } } } @@ -308,6 +314,7 @@ public class Streamer { private MTSSource source; private int bufferSize = 1000; private boolean sleepingEnabled = false; + private String name; public StreamerBuilder setSink(MTSSink sink) { this.sink = sink; @@ -329,10 +336,16 @@ public class Streamer { return this; } + public StreamerBuilder setName(String name) { + this.name = name; + return this; + } + public Streamer build() { Preconditions.checkNotNull(sink); Preconditions.checkNotNull(source); - return new Streamer(source, sink, bufferSize, sleepingEnabled); + return new Streamer(source, sink, bufferSize, sleepingEnabled, name); } + } } \ No newline at end of file