forked from j62/ctbrec
1
0
Fork 0

Improve MergedHlsDownload

* Add better exception handling
* Check, if the model is still online, when an error occurs
* Download segments in parallel, so that less segments are missed
This commit is contained in:
0xboobface 2018-11-28 15:22:42 +01:00
parent 7edc79b0e3
commit c4c8fe83fa
3 changed files with 179 additions and 75 deletions

View File

@ -19,7 +19,16 @@ import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +62,8 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private File targetFile;
private DecimalFormat df = new DecimalFormat("00000");
private int splitCounter = 0;
private BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50);
private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue);
public MergedHlsDownload(HttpClient client) {
super(client);
@ -81,7 +92,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
downloadSegments(segmentPlaylistUri, false);
LOG.debug("Waiting for merge thread to finish");
mergeThread.join();
LOG.debug("Merge thread to finished");
LOG.debug("Merge thread finished");
} catch(ParseException e) {
throw new IOException("Couldn't parse stream information", e);
} catch(PlaylistException e) {
@ -92,7 +103,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
throw new IOException("Couldn't add HMAC to playlist url", e);
} finally {
alive = false;
streamer.stop();
try {
streamer.stop();
} catch(Exception e) {
LOG.error("Couldn't stop streamer", e);
}
downloadThreadPool.shutdown();
LOG.debug("Download terminated for {}", segmentPlaylistUri);
}
}
@ -129,7 +145,11 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} finally {
alive = false;
if(streamer != null) {
streamer.stop();
try {
streamer.stop();
} catch(Exception e) {
LOG.error("Couldn't stop streamer", e);
}
}
LOG.debug("Download for {} terminated", model);
}
@ -138,36 +158,109 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException {
int lastSegment = 0;
int nextSegment = 0;
long playlistNotFoundFirstEncounter = -1;
while(running) {
try {
if(playlistNotFoundFirstEncounter != -1) {
LOG.debug("Downloading playlist {}", segmentPlaylistUri);
}
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
playlistNotFoundFirstEncounter = -1;
if(!livestreamDownload) {
multiSource.setTotalSegments(lsp.segments.size());
}
// download segments, which might have been skipped
downloadMissedSegments(lsp, nextSegment);
// download new segments
long downloadStart = System.currentTimeMillis();
downloadNewSegments(lsp, nextSegment);
long downloadTookMillis = System.currentTimeMillis() - downloadStart;
// download segments, which might have been skipped
//downloadMissedSegments(lsp, nextSegment);
if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url, downloadTookMillis, lsp.totalDuration);
}
if(livestreamDownload) {
// split up the recording, if configured
splitRecording();
// wait some time until requesting the segment playlist again to not hammer the server
waitForNewSegments(lsp, lastSegment);
waitForNewSegments(lsp, lastSegment, downloadTookMillis);
lastSegment = lsp.seq;
nextSegment = lastSegment + lsp.segments.size();
} else {
break;
}
} catch(HttpException e) {
if(e.getResponseCode() == 404) {
// playlist is gone -> model probably logged out
LOG.debug("Playlist not found. Assuming model went offline");
running = false;
} catch(Exception e) {
LOG.info("Unexpected error while downloading ", model.getName());
running = false;
}
}
}
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, MissingSegmentException, ExecutionException, HttpException {
int skip = nextSegment - lsp.seq;
if(lsp.segments.isEmpty()) {
LOG.debug("Empty playlist: {}", lsp.url);
}
// add segments to download threadpool
Queue<Future<byte[]>> downloads = new LinkedList<>();
if(downloadQueue.remainingCapacity() == 0) {
LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment");
} else {
for (String segment : lsp.segments) {
if(!running) {
break;
}
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
Future<byte[]> download = downloadThreadPool.submit(new SegmentDownload(segmentUrl, client));
downloads.add(download);
}
}
}
// get completed downloads and write them to the file
writeFinishedSegments(downloads);
}
private void writeFinishedSegments(Queue<Future<byte[]>> downloads) throws ExecutionException, HttpException {
for (Future<byte[]> downloadFuture : downloads) {
try {
byte[] segmentData = downloadFuture.get();
writeSegment(segmentData);
} catch (InterruptedException e) {
LOG.error("Error while downloading segment", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if(cause instanceof MissingSegmentException) {
if(model != null && !isModelOnline()) {
LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName());
running = false;
} else {
LOG.debug("Segment not available, but model {} still online. Going on", model.getName());
}
} else if(cause instanceof HttpException) {
HttpException he = (HttpException) cause;
if(model != null && !isModelOnline()) {
LOG.debug("Error {} while downloading segment, because model {} is offline. Stopping now", he.getResponseCode(), model.getName());
running = false;
} else {
if(he.getResponseCode() == 404) {
LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", model.getName());
running = false;
} else if(he.getResponseCode() == 403) {
LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", model.getName());
running = false;
} else {
throw he;
}
}
} else {
throw e;
}
@ -175,43 +268,6 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
}
private void downloadMissedSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException {
if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}", nextSegment, lsp.seq, lsp.url);
String first = lsp.segments.get(0);
int seq = lsp.seq;
for (int i = nextSegment; i < lsp.seq; i++) {
URL segmentUrl = new URL(first.replaceAll(Integer.toString(seq), Integer.toString(i)));
LOG.debug("Loading missed segment {} for model {}", i, lsp.url);
byte[] segmentData;
try {
segmentData = new SegmentDownload(segmentUrl, client).call();
writeSegment(segmentData);
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
}
// TODO switch to a lower bitrate/resolution ?!?
}
}
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException {
int skip = nextSegment - lsp.seq;
for (String segment : lsp.segments) {
if(skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
try {
byte[] segmentData = new SegmentDownload(segmentUrl, client).call();
writeSegment(segmentData);
} catch (Exception e) {
LOG.error("Error while downloading segment {}", segmentUrl, e);
}
}
}
}
private void writeSegment(byte[] segmentData) throws InterruptedException {
InputStream in = new ByteArrayInputStream(segmentData);
InputStreamMTSSource source = InputStreamMTSSource.builder().setInputStream(in).build();
@ -232,12 +288,18 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
}
private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment) {
private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment, long downloadTookMillis) {
try {
long wait = 0;
if (lastSegment == lsp.seq) {
// playlist didn't change -> wait for at least half the target duration
wait = (long) lsp.targetDuration * 1000 / 2;
int timeLeftMillis = (int)(lsp.totalDuration * 1000 - downloadTookMillis);
if(timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it
wait = 1;
} else {
// wait a second to be nice to the server (don't hammer it with requests)
// 1 second seems to be a good compromise. every other calculation resulted in more missing segments
wait = 1000;
}
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
@ -256,7 +318,9 @@ public class MergedHlsDownload extends AbstractHlsDownload {
public void stop() {
running = false;
alive = false;
streamer.stop();
if(streamer != null) {
streamer.stop();
}
LOG.debug("Download stopped");
}
@ -281,6 +345,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
.setSink(sink)
.setSleepingEnabled(liveStream)
.setBufferSize(10)
.setName(model.getName())
.build();
// Start streaming
@ -295,9 +360,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} finally {
closeFile(channel);
deleteEmptyRecording(targetFile);
running = false;
}
});
t.setName("Segment Merger Thread");
t.setName("Segment Merger Thread [" + model.getName() + "]");
t.setDaemon(true);
return t;
}
@ -308,7 +374,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
Files.delete(targetFile.toPath());
Files.delete(targetFile.getParentFile().toPath());
}
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Error while deleting empty recording {}", targetFile);
}
}
@ -318,12 +384,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Error while closing file channel", e);
}
}
private static class SegmentDownload implements Callable<byte[]> {
private class SegmentDownload implements Callable<byte[]> {
private URL url;
private HttpClient client;
@ -333,24 +399,38 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
@Override
public byte[] call() throws Exception {
public byte[] call() throws IOException {
LOG.trace("Downloading segment " + url.getFile());
int maxTries = 3;
for (int i = 1; i <= maxTries; i++) {
try {
Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build();
Response response = client.execute(request);
byte[] segment = response.body().bytes();
return segment;
for (int i = 1; i <= maxTries && running; i++) {
Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build();
try (Response response = client.execute(request)) {
if(response.isSuccessful()) {
byte[] segment = response.body().bytes();
return segment;
} else {
throw new HttpException(response.code(), response.message());
}
} catch(Exception e) {
if (i == maxTries) {
LOG.warn("Error while downloading segment. Segment {} finally failed", url.getFile());
} else {
LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i);
LOG.warn("Error while downloading segment {} on try {}", url.getFile(), i, e);
}
if(model != null && !isModelOnline()) {
break;
}
}
}
throw new IOException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries");
throw new MissingSegmentException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries");
}
}
public boolean isModelOnline() {
try {
return model.isOnline(IGNORE_CACHE);
} catch (IOException | ExecutionException | InterruptedException e) {
return false;
}
}
}

View File

@ -0,0 +1,11 @@
package ctbrec.recorder.download;
import java.io.IOException;
public class MissingSegmentException extends IOException {
public MissingSegmentException(String msg) {
super(msg);
}
}

View File

@ -30,12 +30,14 @@ public class Streamer {
private Thread streamingThread;
private boolean sleepingEnabled;
private String name;
private Streamer(MTSSource source, MTSSink sink, int bufferSize, boolean sleepingEnabled) {
private Streamer(MTSSource source, MTSSink sink, int bufferSize, boolean sleepingEnabled, String name) {
this.source = source;
this.sink = sink;
this.bufferSize = bufferSize;
this.sleepingEnabled = sleepingEnabled;
this.name = name;
}
public void stream() throws InterruptedException {
@ -48,15 +50,15 @@ public class Streamer {
try {
preBuffer();
} catch (Exception e) {
throw new IllegalStateException("Error while bufering", e);
throw new IllegalStateException("Error while buffering", e);
}
log.info("Done PreBuffering");
bufferingThread = new Thread(this::fillBuffer, "buffering");
bufferingThread = new Thread(this::fillBuffer, "Buffering ["+name+"]");
bufferingThread.setDaemon(true);
bufferingThread.start();
streamingThread = new Thread(this::internalStream, "streaming");
streamingThread = new Thread(this::internalStream, "Streaming ["+name+"]");
streamingThread.setDaemon(true);
streamingThread.start();
@ -123,7 +125,7 @@ public class Streamer {
}
}
} catch (InterruptedException e1) {
if(!endOfSourceReached) {
if(!endOfSourceReached && !streamingShouldStop) {
log.error("Interrupted while waiting for packet");
continue;
} else {
@ -240,7 +242,7 @@ public class Streamer {
// Stream packet
// System.out.println("Streaming packet #" + packetCount + ", PID=" + mtsPacket.getPid() + ", pcrCount=" + pcrCount + ", continuityCounter=" + mtsPacket.getContinuityCounter());
if(!streamingShouldStop) {
if(!streamingShouldStop && !Thread.interrupted()) {
try {
sink.send(packet);
} catch (Exception e) {
@ -275,7 +277,7 @@ public class Streamer {
buffer.put(packet);
put = true;
} catch (InterruptedException ignored) {
log.error("Error adding packet to buffer", ignored);
}
}
}
@ -287,7 +289,11 @@ public class Streamer {
log.error("Error reading from source", e);
} finally {
endOfSourceReached = true;
streamingThread.interrupt();
try {
streamingThread.interrupt();
} catch(Exception e) {
log.error("Couldn't interrupt streaming thread", e);
}
}
}
@ -308,6 +314,7 @@ public class Streamer {
private MTSSource source;
private int bufferSize = 1000;
private boolean sleepingEnabled = false;
private String name;
public StreamerBuilder setSink(MTSSink sink) {
this.sink = sink;
@ -329,10 +336,16 @@ public class Streamer {
return this;
}
public StreamerBuilder setName(String name) {
this.name = name;
return this;
}
public Streamer build() {
Preconditions.checkNotNull(sink);
Preconditions.checkNotNull(source);
return new Streamer(source, sink, bufferSize, sleepingEnabled);
return new Streamer(source, sink, bufferSize, sleepingEnabled, name);
}
}
}