Shut down all recordings simultaneously

This commit is contained in:
0xb00bface 2020-08-15 16:07:16 +02:00
parent 6cfdb59c96
commit 5c0d841474
2 changed files with 18 additions and 5 deletions

View File

@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -505,8 +506,20 @@ public class NextGenLocalRecorder implements Recorder {
try { try {
// make a copy to avoid ConcurrentModificationException // make a copy to avoid ConcurrentModificationException
List<Recording> toStop = new ArrayList<>(recordingProcesses.values()); List<Recording> toStop = new ArrayList<>(recordingProcesses.values());
if (!toStop.isEmpty()) {
ExecutorService shutdownPool = Executors.newFixedThreadPool(toStop.size());
List<Future<?>> shutdownFutures = new ArrayList<>(toStop.size());
for (Recording rec : toStop) { for (Recording rec : toStop) {
Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop); Optional.ofNullable(rec.getDownload()).ifPresent(d -> {
shutdownFutures.add(shutdownPool.submit(() -> d.stop()));
});
}
shutdownPool.shutdown();
try {
shutdownPool.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} }
} finally { } finally {
recorderLock.unlock(); recorderLock.unlock();

View File

@ -391,13 +391,13 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
try { try {
downloadQueue.clear(); downloadQueue.clear();
for (Future<?> future : downloads) {
future.cancel(true);
}
downloadThreadPool.shutdownNow(); downloadThreadPool.shutdownNow();
LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel()); LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel());
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
LOG.debug("Segment download thread pool terminated for model {}", getModel()); LOG.debug("Segment download thread pool terminated for model {}", getModel());
for (Future<?> future : downloads) {
future.cancel(true);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("Interrupted while waiting for segment pool to shutdown"); LOG.error("Interrupted while waiting for segment pool to shutdown");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();