diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index c7826077..c0c93f57 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -505,8 +506,20 @@ public class NextGenLocalRecorder implements Recorder { try { // make a copy to avoid ConcurrentModificationException List toStop = new ArrayList<>(recordingProcesses.values()); - for (Recording rec : toStop) { - Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop); + if (!toStop.isEmpty()) { + ExecutorService shutdownPool = Executors.newFixedThreadPool(toStop.size()); + List> shutdownFutures = new ArrayList<>(toStop.size()); + for (Recording rec : toStop) { + Optional.ofNullable(rec.getDownload()).ifPresent(d -> { + shutdownFutures.add(shutdownPool.submit(() -> d.stop())); + }); + } + shutdownPool.shutdown(); + try { + shutdownPool.awaitTermination(10, TimeUnit.MINUTES); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } finally { recorderLock.unlock(); diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index 08d8a641..b90028fe 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -391,13 +391,13 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { try { downloadQueue.clear(); + for (Future future : downloads) { + future.cancel(true); + } downloadThreadPool.shutdownNow(); LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel()); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); LOG.debug("Segment download thread pool terminated for model {}", getModel()); - for (Future future : downloads) { - future.cancel(true); - } } catch (InterruptedException e) { LOG.error("Interrupted while waiting for segment pool to shutdown"); Thread.currentThread().interrupt();