diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index da63dd00..08d8a641 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -1,5 +1,7 @@ package ctbrec.recorder.download.hls; +import static java.util.Optional.*; + import java.io.EOFException; import java.io.File; import java.io.FileOutputStream; @@ -15,12 +17,13 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; -import java.util.Optional; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.slf4j.Logger; @@ -57,6 +60,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { private transient OutputStream ffmpegStdIn; protected transient Thread ffmpegThread; private transient Object ffmpegStartMonitor = new Object(); + private Queue> downloads = new LinkedList<>(); public MergedFfmpegHlsDownload(HttpClient client) { super(client); @@ -105,6 +109,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { LOG.debug("Starting to download segments"); downloadSegments(segments, true); ffmpegThread.join(); + LOG.debug("FFmpeg thread terminated"); } } catch (ParseException e) { throw new IOException("Couldn't parse stream information", e); @@ -179,7 +184,8 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { } } }); - ffmpegThread.setName("FFmpeg"); + String name = "FFmpeg " + ofNullable(model).map(Model::getName).orElse("").trim(); + ffmpegThread.setName(name); ffmpegThread.start(); } @@ -254,7 +260,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { int skip = nextSegment - lsp.seq; // add segments to download threadpool - Queue> downloads = new LinkedList<>(); + downloads.clear(); if (downloadQueue.remainingCapacity() == 0) { LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment"); } else { @@ -278,11 +284,15 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { private void writeFinishedSegments(Queue> downloads) throws ExecutionException, IOException { for (Future downloadFuture : downloads) { try { - byte[] segmentData = downloadFuture.get(); + byte[] segmentData = downloadFuture.get(30, TimeUnit.SECONDS); writeSegment(segmentData); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Error while downloading segment", e); + } catch (TimeoutException e) { + LOG.info("Segment download took too long for {}. Not waiting for it any longer", getModel()); + } catch (CancellationException e) { + LOG.info("Segment download cancelled"); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof MissingSegmentException) { @@ -290,7 +300,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName()); running = false; } else { - LOG.debug("Segment not available, but model {} still online. Going on", Optional.ofNullable(model).map(Model::getName).orElse("n/a")); + LOG.debug("Segment not available, but model {} still online. Going on", ofNullable(model).map(Model::getName).orElse("n/a")); } } else if (cause instanceof HttpException) { HttpException he = (HttpException) cause; @@ -299,10 +309,10 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { running = false; } else { if (he.getResponseCode() == 404) { - LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a")); + LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a")); running = false; } else if (he.getResponseCode() == 403) { - LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a")); + LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a")); running = false; } else { throw he; @@ -378,9 +388,23 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { @Override synchronized void internalStop() { running = false; + + try { + downloadQueue.clear(); + downloadThreadPool.shutdownNow(); + LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel()); + downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); + LOG.debug("Segment download thread pool terminated for model {}", getModel()); + for (Future future : downloads) { + future.cancel(true); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for segment pool to shutdown"); + Thread.currentThread().interrupt(); + } + if (ffmpegStdIn != null) { try { - downloadQueue.clear(); ffmpegStdIn.close(); } catch (IOException e) { LOG.error("Couldn't terminate FFmpeg by closing stdin", e); @@ -389,7 +413,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { if (ffmpeg != null) { try { - boolean waitFor = ffmpeg.waitFor(5, TimeUnit.MINUTES); + boolean waitFor = ffmpeg.waitFor(45, TimeUnit.SECONDS); if (!waitFor && ffmpeg.isAlive()) { LOG.info("FFmpeg didn't terminate. Destroying the process with force!"); ffmpeg.destroyForcibly(); @@ -419,7 +443,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { int maxTries = 3; for (int i = 1; i <= maxTries && running; i++) { Builder builder = new Request.Builder().url(url); - addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>())); + addHeaders(builder, ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>())); Request request = builder.build(); try (Response response = client.execute(request)) { if (response.isSuccessful()) {