From ff8bbeacc23f504e58235c23f7bf7b835033e073 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Wed, 14 Nov 2018 16:24:04 +0100 Subject: [PATCH] Refactor and simplify MergedHlsDownload * Break up the big downloadSegments method into smaller methods * Remove the mergeQueue, because it is not needed anymore. This was a left over from when the download used a thread pool to download the segments --- .../download/AbstractHlsDownload.java | 55 +++--- .../recorder/download/MergedHlsDownload.java | 184 ++++++++++-------- 2 files changed, 131 insertions(+), 108 deletions(-) diff --git a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java index 62a8f897..f45bb106 100644 --- a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java @@ -30,6 +30,7 @@ import com.iheartradio.m3u8.data.TrackData; import ctbrec.Config; import ctbrec.Model; import ctbrec.io.HttpClient; +import ctbrec.io.HttpException; import okhttp3.Request; import okhttp3.Response; @@ -52,33 +53,34 @@ public abstract class AbstractHlsDownload implements Download { SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { URL segmentsUrl = new URL(segments); Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build(); - Response response = client.execute(request); - try { - InputStream inputStream = response.body().byteStream(); - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); - Playlist playlist = parser.parse(); - if(playlist.hasMediaPlaylist()) { - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - SegmentPlaylist lsp = new SegmentPlaylist(); - lsp.seq = mediaPlaylist.getMediaSequenceNumber(); - lsp.targetDuration = mediaPlaylist.getTargetDuration(); - List tracks = mediaPlaylist.getTracks(); - for (TrackData trackData : tracks) { - String uri = trackData.getUri(); - if(!uri.startsWith("http")) { - String _url = segmentsUrl.toString(); - _url = _url.substring(0, _url.lastIndexOf('/') + 1); - String segmentUri = _url + uri; - lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.lastSegDuration = trackData.getTrackInfo().duration; - lsp.segments.add(segmentUri); + try(Response response = client.execute(request)) { + if(response.isSuccessful()) { + InputStream inputStream = response.body().byteStream(); + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); + Playlist playlist = parser.parse(); + if(playlist.hasMediaPlaylist()) { + MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + SegmentPlaylist lsp = new SegmentPlaylist(segments); + lsp.seq = mediaPlaylist.getMediaSequenceNumber(); + lsp.targetDuration = mediaPlaylist.getTargetDuration(); + List tracks = mediaPlaylist.getTracks(); + for (TrackData trackData : tracks) { + String uri = trackData.getUri(); + if(!uri.startsWith("http")) { + String _url = segmentsUrl.toString(); + _url = _url.substring(0, _url.lastIndexOf('/') + 1); + String segmentUri = _url + uri; + lsp.totalDuration += trackData.getTrackInfo().duration; + lsp.lastSegDuration = trackData.getTrackInfo().duration; + lsp.segments.add(segmentUri); + } } + return lsp; } - return lsp; + return null; + } else { + throw new HttpException(response.code(), response.message()); } - return null; - } finally { - response.close(); } } @@ -131,10 +133,15 @@ public abstract class AbstractHlsDownload implements Download { } public static class SegmentPlaylist { + public String url; public int seq = 0; public float totalDuration = 0; public float lastSegDuration = 0; public float targetDuration = 0; public List segments = new ArrayList<>(); + + public SegmentPlaylist(String url) { + this.url = url; + } } } diff --git a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index 59668dbd..97cc6bf1 100644 --- a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -7,6 +7,7 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URL; import java.nio.channels.FileChannel; import java.nio.file.FileSystems; @@ -19,8 +20,6 @@ import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; import java.util.Date; -import java.util.LinkedList; -import java.util.Queue; import java.util.concurrent.Callable; import org.slf4j.Logger; @@ -38,6 +37,7 @@ import ctbrec.Config; import ctbrec.Model; import ctbrec.Recording; import ctbrec.io.HttpClient; +import ctbrec.io.HttpException; import ctbrec.recorder.ProgressListener; import okhttp3.Request; import okhttp3.Response; @@ -69,9 +69,13 @@ public class MergedHlsDownload extends AbstractHlsDownload { super.startTime = Instant.now(); downloadDir = targetFile.getParentFile().toPath(); mergeThread = createMergeThread(targetFile, progressListener, false); + LOG.debug("Merge thread started"); mergeThread.start(); + LOG.debug("Downloading segments"); downloadSegments(segmentPlaylistUri, false); + LOG.debug("Waiting for merge thread to finish"); mergeThread.join(); + LOG.debug("Merge thread to finished"); } catch(ParseException e) { throw new IOException("Couldn't parse stream information", e); } catch(PlaylistException e) { @@ -138,98 +142,76 @@ public class MergedHlsDownload extends AbstractHlsDownload { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { int lastSegment = 0; int nextSegment = 0; - Queue mergeQueue = new LinkedList<>(); while(running) { - SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); - if(!livestreamDownload) { - multiSource.setTotalSegments(lsp.segments.size()); - } - if(nextSegment > 0 && lsp.seq > nextSegment) { - LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, segmentPlaylistUri); - String first = lsp.segments.get(0); - int seq = lsp.seq; - for (int i = nextSegment; i < lsp.seq; i++) { - URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); - LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri); - byte[] segmentData; - try { - segmentData = new SegmentDownload(segmentUrl, client).call(); - mergeQueue.add(segmentData); - } catch (Exception e) { - LOG.error("Error while downloading segment {}", segmentUrl, e); - } + try { + SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); + if(!livestreamDownload) { + multiSource.setTotalSegments(lsp.segments.size()); } - // TODO switch to a lower bitrate/resolution ?!? - } - int skip = nextSegment - lsp.seq; - for (String segment : lsp.segments) { - if(skip > 0) { - skip--; - } else { - URL segmentUrl = new URL(segment); - try { - byte[] segmentData = new SegmentDownload(segmentUrl, client).call(); - if(livestreamDownload) { - mergeQueue.add(segmentData); - } else { - writeSegment(segmentData); - } - } catch (Exception e) { - LOG.error("Error while downloading segment {}", segmentUrl, e); - } - } - } - if(livestreamDownload) { + // download segments, which might have been skipped + downloadMissedSegments(lsp, nextSegment); + // download new segments - while(!mergeQueue.isEmpty()) { - try { - writeSegment(mergeQueue.poll()); - } catch (InterruptedException e) { - if(running) { - LOG.error("Interrupted while waiting to add a new segment to multi source", e); - } - } - } + downloadNewSegments(lsp, nextSegment); - // split up the recording, if configured - if(config.getSettings().splitRecordings > 0) { - Duration recordingDuration = Duration.between(startTime, ZonedDateTime.now()); - long seconds = recordingDuration.getSeconds(); - if(seconds >= config.getSettings().splitRecordings) { - streamer.stop(); - File target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-"+df.format(++splitCounter)+".ts")); - mergeThread = createMergeThread(target, null, true); - mergeThread.start(); - startTime = ZonedDateTime.now(); - } - } + if(livestreamDownload) { + // split up the recording, if configured + splitRecording(); - // wait some time until requesting the segment playlist again to not hammer the server - try { - long wait = 0; - if (lastSegment == lsp.seq) { - // playlist didn't change -> wait for at least half the target duration - wait = (long) lsp.targetDuration * 1000 / 2; - LOG.trace("Playlist didn't change... waiting for {}ms", wait); - } else { - // playlist did change -> wait for at least last segment duration - wait = 1;// (long) lsp.lastSegDuration * 1000; - LOG.trace("Playlist changed... waiting for {}ms", wait); - } - Thread.sleep(wait); - } catch (InterruptedException e) { - if (running) { - LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); - } + // wait some time until requesting the segment playlist again to not hammer the server + waitForNewSegments(lsp, lastSegment); + + lastSegment = lsp.seq; + nextSegment = lastSegment + lsp.segments.size(); + } else { + break; + } + } catch(HttpException e) { + if(e.getResponseCode() == 404) { + // playlist is gone -> model probably logged out + LOG.debug("Playlist not found. Assuming model went offline"); + running = false; + } else { + throw e; } } + } + } - lastSegment = lsp.seq; - nextSegment = lastSegment + lsp.segments.size(); + private void downloadMissedSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException { + if(nextSegment > 0 && lsp.seq > nextSegment) { + LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, lsp.url); + String first = lsp.segments.get(0); + int seq = lsp.seq; + for (int i = nextSegment; i < lsp.seq; i++) { + URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i))); + LOG.debug("Loading missed segment {} for model {}", i, lsp.url); + byte[] segmentData; + try { + segmentData = new SegmentDownload(segmentUrl, client).call(); + writeSegment(segmentData); + } catch (Exception e) { + LOG.error("Error while downloading segment {}", segmentUrl, e); + } + } + // TODO switch to a lower bitrate/resolution ?!? + } + } - if(!livestreamDownload) { - break; + private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException { + int skip = nextSegment - lsp.seq; + for (String segment : lsp.segments) { + if(skip > 0) { + skip--; + } else { + URL segmentUrl = new URL(segment); + try { + byte[] segmentData = new SegmentDownload(segmentUrl, client).call(); + writeSegment(segmentData); + } catch (Exception e) { + LOG.error("Error while downloading segment {}", segmentUrl, e); + } } } } @@ -240,6 +222,40 @@ public class MergedHlsDownload extends AbstractHlsDownload { multiSource.addSource(source); } + private void splitRecording() { + if(config.getSettings().splitRecordings > 0) { + Duration recordingDuration = Duration.between(startTime, ZonedDateTime.now()); + long seconds = recordingDuration.getSeconds(); + if(seconds >= config.getSettings().splitRecordings) { + streamer.stop(); + File target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-"+df.format(++splitCounter)+".ts")); + mergeThread = createMergeThread(target, null, true); + mergeThread.start(); + startTime = ZonedDateTime.now(); + } + } + } + + private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment) { + try { + long wait = 0; + if (lastSegment == lsp.seq) { + // playlist didn't change -> wait for at least half the target duration + wait = (long) lsp.targetDuration * 1000 / 2; + LOG.trace("Playlist didn't change... waiting for {}ms", wait); + } else { + // playlist did change -> wait for at least last segment duration + wait = 1;// (long) lsp.lastSegDuration * 1000; + LOG.trace("Playlist changed... waiting for {}ms", wait); + } + Thread.sleep(wait); + } catch (InterruptedException e) { + if (running) { + LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); + } + } + } + @Override public void stop() { running = false;