Refactor and simplify MergedHlsDownload

* Break up the big downloadSegments method into smaller methods
* Remove the mergeQueue, because it is not needed anymore. This was
  a left over from when the download used a thread pool to download
  the segments
This commit is contained in:
0xboobface 2018-11-14 16:24:04 +01:00
parent 434e0a1f64
commit ff8bbeacc2
2 changed files with 131 additions and 108 deletions

View File

@ -30,6 +30,7 @@ import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import okhttp3.Request;
import okhttp3.Response;
@ -52,33 +53,34 @@ public abstract class AbstractHlsDownload implements Download {
SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException {
URL segmentsUrl = new URL(segments);
Request request = new Request.Builder().url(segmentsUrl).addHeader("connection", "keep-alive").build();
Response response = client.execute(request);
try {
InputStream inputStream = response.body().byteStream();
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
Playlist playlist = parser.parse();
if(playlist.hasMediaPlaylist()) {
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
SegmentPlaylist lsp = new SegmentPlaylist();
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
lsp.targetDuration = mediaPlaylist.getTargetDuration();
List<TrackData> tracks = mediaPlaylist.getTracks();
for (TrackData trackData : tracks) {
String uri = trackData.getUri();
if(!uri.startsWith("http")) {
String _url = segmentsUrl.toString();
_url = _url.substring(0, _url.lastIndexOf('/') + 1);
String segmentUri = _url + uri;
lsp.totalDuration += trackData.getTrackInfo().duration;
lsp.lastSegDuration = trackData.getTrackInfo().duration;
lsp.segments.add(segmentUri);
try(Response response = client.execute(request)) {
if(response.isSuccessful()) {
InputStream inputStream = response.body().byteStream();
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
Playlist playlist = parser.parse();
if(playlist.hasMediaPlaylist()) {
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
SegmentPlaylist lsp = new SegmentPlaylist(segments);
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
lsp.targetDuration = mediaPlaylist.getTargetDuration();
List<TrackData> tracks = mediaPlaylist.getTracks();
for (TrackData trackData : tracks) {
String uri = trackData.getUri();
if(!uri.startsWith("http")) {
String _url = segmentsUrl.toString();
_url = _url.substring(0, _url.lastIndexOf('/') + 1);
String segmentUri = _url + uri;
lsp.totalDuration += trackData.getTrackInfo().duration;
lsp.lastSegDuration = trackData.getTrackInfo().duration;
lsp.segments.add(segmentUri);
}
}
return lsp;
}
return lsp;
return null;
} else {
throw new HttpException(response.code(), response.message());
}
return null;
} finally {
response.close();
}
}
@ -131,10 +133,15 @@ public abstract class AbstractHlsDownload implements Download {
}
public static class SegmentPlaylist {
public String url;
public int seq = 0;
public float totalDuration = 0;
public float lastSegDuration = 0;
public float targetDuration = 0;
public List<String> segments = new ArrayList<>();
public SegmentPlaylist(String url) {
this.url = url;
}
}
}

View File

@ -7,6 +7,7 @@ import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystems;
@ -19,8 +20,6 @@ import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
@ -38,6 +37,7 @@ import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.ProgressListener;
import okhttp3.Request;
import okhttp3.Response;
@ -69,9 +69,13 @@ public class MergedHlsDownload extends AbstractHlsDownload {
super.startTime = Instant.now();
downloadDir = targetFile.getParentFile().toPath();
mergeThread = createMergeThread(targetFile, progressListener, false);
LOG.debug("Merge thread started");
mergeThread.start();
LOG.debug("Downloading segments");
downloadSegments(segmentPlaylistUri, false);
LOG.debug("Waiting for merge thread to finish");
mergeThread.join();
LOG.debug("Merge thread to finished");
} catch(ParseException e) {
throw new IOException("Couldn't parse stream information", e);
} catch(PlaylistException e) {
@ -138,98 +142,76 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException {
int lastSegment = 0;
int nextSegment = 0;
Queue<byte[]> mergeQueue = new LinkedList<>();
while(running) {
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
if(!livestreamDownload) {
multiSource.setTotalSegments(lsp.segments.size());
}
if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, segmentPlaylistUri);
String first = lsp.segments.get(0);
int seq = lsp.seq;
for (int i = nextSegment; i < lsp.seq; i++) {
URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i)));
LOG.debug("Loading missed segment {} for model {}", i, segmentPlaylistUri);
byte[] segmentData;
try {
segmentData = new SegmentDownload(segmentUrl, client).call();
mergeQueue.add(segmentData);
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
try {
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
if(!livestreamDownload) {
multiSource.setTotalSegments(lsp.segments.size());
}
// TODO switch to a lower bitrate/resolution ?!?
}
int skip = nextSegment - lsp.seq;
for (String segment : lsp.segments) {
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
try {
byte[] segmentData = new SegmentDownload(segmentUrl, client).call();
if(livestreamDownload) {
mergeQueue.add(segmentData);
} else {
writeSegment(segmentData);
}
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
}
}
if(livestreamDownload) {
// download segments, which might have been skipped
downloadMissedSegments(lsp, nextSegment);
// download new segments
while(!mergeQueue.isEmpty()) {
try {
writeSegment(mergeQueue.poll());
} catch (InterruptedException e) {
if(running) {
LOG.error("Interrupted while waiting to add a new segment to multi source", e);
}
}
}
downloadNewSegments(lsp, nextSegment);
// split up the recording, if configured
if(config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(startTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) {
streamer.stop();
File target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-"+df.format(++splitCounter)+".ts"));
mergeThread = createMergeThread(target, null, true);
mergeThread.start();
startTime = ZonedDateTime.now();
}
}
if(livestreamDownload) {
// split up the recording, if configured
splitRecording();
// wait some time until requesting the segment playlist again to not hammer the server
try {
long wait = 0;
if (lastSegment == lsp.seq) {
// playlist didn't change -> wait for at least half the target duration
wait = (long) lsp.targetDuration * 1000 / 2;
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;// (long) lsp.lastSegDuration * 1000;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
Thread.sleep(wait);
} catch (InterruptedException e) {
if (running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
// wait some time until requesting the segment playlist again to not hammer the server
waitForNewSegments(lsp, lastSegment);
lastSegment = lsp.seq;
nextSegment = lastSegment + lsp.segments.size();
} else {
break;
}
} catch(HttpException e) {
if(e.getResponseCode() == 404) {
// playlist is gone -> model probably logged out
LOG.debug("Playlist not found. Assuming model went offline");
running = false;
} else {
throw e;
}
}
}
}
lastSegment = lsp.seq;
nextSegment = lastSegment + lsp.segments.size();
private void downloadMissedSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException {
if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, lsp.url);
String first = lsp.segments.get(0);
int seq = lsp.seq;
for (int i = nextSegment; i < lsp.seq; i++) {
URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i)));
LOG.debug("Loading missed segment {} for model {}", i, lsp.url);
byte[] segmentData;
try {
segmentData = new SegmentDownload(segmentUrl, client).call();
writeSegment(segmentData);
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
}
// TODO switch to a lower bitrate/resolution ?!?
}
}
if(!livestreamDownload) {
break;
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException {
int skip = nextSegment - lsp.seq;
for (String segment : lsp.segments) {
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
try {
byte[] segmentData = new SegmentDownload(segmentUrl, client).call();
writeSegment(segmentData);
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
}
}
}
@ -240,6 +222,40 @@ public class MergedHlsDownload extends AbstractHlsDownload {
multiSource.addSource(source);
}
private void splitRecording() {
if(config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(startTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) {
streamer.stop();
File target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-"+df.format(++splitCounter)+".ts"));
mergeThread = createMergeThread(target, null, true);
mergeThread.start();
startTime = ZonedDateTime.now();
}
}
}
private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment) {
try {
long wait = 0;
if (lastSegment == lsp.seq) {
// playlist didn't change -> wait for at least half the target duration
wait = (long) lsp.targetDuration * 1000 / 2;
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;// (long) lsp.lastSegDuration * 1000;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
Thread.sleep(wait);
} catch (InterruptedException e) {
if (running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
}
}
@Override
public void stop() {
running = false;