Fix problem, that downloads wouldn't finish properly
Some downloads couldn't be stopped properly, because they would wait for segment data to arrive to write to disk indefinitely. We now only wait for a max of 30 seconds and also cancel all futures, which are waiting for segment data.
This commit is contained in:
parent
963f0f0f5f
commit
2154aacdbe
|
@ -1,5 +1,7 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import static java.util.Optional.*;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
|
@ -15,12 +17,13 @@ import java.time.ZonedDateTime;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
@ -57,6 +60,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
private transient OutputStream ffmpegStdIn;
|
||||
protected transient Thread ffmpegThread;
|
||||
private transient Object ffmpegStartMonitor = new Object();
|
||||
private Queue<Future<byte[]>> downloads = new LinkedList<>();
|
||||
|
||||
public MergedFfmpegHlsDownload(HttpClient client) {
|
||||
super(client);
|
||||
|
@ -105,6 +109,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
LOG.debug("Starting to download segments");
|
||||
downloadSegments(segments, true);
|
||||
ffmpegThread.join();
|
||||
LOG.debug("FFmpeg thread terminated");
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
throw new IOException("Couldn't parse stream information", e);
|
||||
|
@ -179,7 +184,8 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
}
|
||||
}
|
||||
});
|
||||
ffmpegThread.setName("FFmpeg");
|
||||
String name = "FFmpeg " + ofNullable(model).map(Model::getName).orElse("").trim();
|
||||
ffmpegThread.setName(name);
|
||||
ffmpegThread.start();
|
||||
}
|
||||
|
||||
|
@ -254,7 +260,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
int skip = nextSegment - lsp.seq;
|
||||
|
||||
// add segments to download threadpool
|
||||
Queue<Future<byte[]>> downloads = new LinkedList<>();
|
||||
downloads.clear();
|
||||
if (downloadQueue.remainingCapacity() == 0) {
|
||||
LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment");
|
||||
} else {
|
||||
|
@ -278,11 +284,15 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
private void writeFinishedSegments(Queue<Future<byte[]>> downloads) throws ExecutionException, IOException {
|
||||
for (Future<byte[]> downloadFuture : downloads) {
|
||||
try {
|
||||
byte[] segmentData = downloadFuture.get();
|
||||
byte[] segmentData = downloadFuture.get(30, TimeUnit.SECONDS);
|
||||
writeSegment(segmentData);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.error("Error while downloading segment", e);
|
||||
} catch (TimeoutException e) {
|
||||
LOG.info("Segment download took too long for {}. Not waiting for it any longer", getModel());
|
||||
} catch (CancellationException e) {
|
||||
LOG.info("Segment download cancelled");
|
||||
} catch (ExecutionException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof MissingSegmentException) {
|
||||
|
@ -290,7 +300,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName());
|
||||
running = false;
|
||||
} else {
|
||||
LOG.debug("Segment not available, but model {} still online. Going on", Optional.ofNullable(model).map(Model::getName).orElse("n/a"));
|
||||
LOG.debug("Segment not available, but model {} still online. Going on", ofNullable(model).map(Model::getName).orElse("n/a"));
|
||||
}
|
||||
} else if (cause instanceof HttpException) {
|
||||
HttpException he = (HttpException) cause;
|
||||
|
@ -299,10 +309,10 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
running = false;
|
||||
} else {
|
||||
if (he.getResponseCode() == 404) {
|
||||
LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a"));
|
||||
LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a"));
|
||||
running = false;
|
||||
} else if (he.getResponseCode() == 403) {
|
||||
LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a"));
|
||||
LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a"));
|
||||
running = false;
|
||||
} else {
|
||||
throw he;
|
||||
|
@ -378,9 +388,23 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
@Override
|
||||
synchronized void internalStop() {
|
||||
running = false;
|
||||
|
||||
try {
|
||||
downloadQueue.clear();
|
||||
downloadThreadPool.shutdownNow();
|
||||
LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel());
|
||||
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
|
||||
LOG.debug("Segment download thread pool terminated for model {}", getModel());
|
||||
for (Future<?> future : downloads) {
|
||||
future.cancel(true);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.error("Interrupted while waiting for segment pool to shutdown");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
if (ffmpegStdIn != null) {
|
||||
try {
|
||||
downloadQueue.clear();
|
||||
ffmpegStdIn.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Couldn't terminate FFmpeg by closing stdin", e);
|
||||
|
@ -389,7 +413,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
|
||||
if (ffmpeg != null) {
|
||||
try {
|
||||
boolean waitFor = ffmpeg.waitFor(5, TimeUnit.MINUTES);
|
||||
boolean waitFor = ffmpeg.waitFor(45, TimeUnit.SECONDS);
|
||||
if (!waitFor && ffmpeg.isAlive()) {
|
||||
LOG.info("FFmpeg didn't terminate. Destroying the process with force!");
|
||||
ffmpeg.destroyForcibly();
|
||||
|
@ -419,7 +443,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
int maxTries = 3;
|
||||
for (int i = 1; i <= maxTries && running; i++) {
|
||||
Builder builder = new Request.Builder().url(url);
|
||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()));
|
||||
addHeaders(builder, ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()));
|
||||
Request request = builder.build();
|
||||
try (Response response = client.execute(request)) {
|
||||
if (response.isSuccessful()) {
|
||||
|
|
Loading…
Reference in New Issue