diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index 47cbdc3f..7f07e325 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -65,28 +65,25 @@ public class NextGenLocalRecorder implements Recorder { private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class); public static final boolean IGNORE_CACHE = true; - private List models = Collections.synchronizedList(new ArrayList<>()); - private Config config; + private final List models = Collections.synchronizedList(new ArrayList<>()); + private final Config config; private volatile boolean recording = true; - private ReentrantLock recorderLock = new ReentrantLock(); - private ReentrantLock modelGroupLock = new ReentrantLock(); - private RecorderHttpClient client; - private Map recordingProcesses = Collections.synchronizedMap(new HashMap<>()); - private RecordingManager recordingManager; - private RecordingPreconditions preconditions; + private final ReentrantLock recorderLock = new ReentrantLock(); + private final ReentrantLock modelGroupLock = new ReentrantLock(); + private final RecorderHttpClient client; + private final Map recordingProcesses = Collections.synchronizedMap(new HashMap<>()); + private final RecordingManager recordingManager; + private final RecordingPreconditions preconditions; // thread pools for downloads and post-processing - private ScheduledExecutorService downloadPool; - private ThreadPoolScaler threadPoolScaler; - private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2)); - private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1)); - private BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); - private Map, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); + private final ScheduledExecutorService downloadPool; + private final ThreadPoolScaler threadPoolScaler; + private final ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2)); + private final ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1)); + private final BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); + private final Map, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); - private BlockingQueue ppQueue = new LinkedBlockingQueue<>(); - private ThreadPoolExecutor ppPool; - - private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final ThreadPoolExecutor ppPool; public NextGenLocalRecorder(Config config, List sites) throws IOException { this.config = config; @@ -96,6 +93,7 @@ public class NextGenLocalRecorder implements Recorder { recordingManager = new RecordingManager(config, sites); loadModels(); int ppThreads = config.getSettings().postProcessingThreads; + BlockingQueue ppQueue = new LinkedBlockingQueue<>(); ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY)); recording = true; @@ -109,6 +107,7 @@ public class NextGenLocalRecorder implements Recorder { startCompletionHandler(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleWithFixedDelay(() -> { try { if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) { @@ -123,7 +122,7 @@ public class NextGenLocalRecorder implements Recorder { } private void loadModels() { - config.getSettings().models.stream().forEach(m -> { + config.getSettings().models.forEach(m -> { if (m.getSite() != null) { if (m.getSite().isEnabled()) { models.add(m); @@ -414,7 +413,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public void switchStreamSource(Model model) throws IOException { if (models.contains(model)) { int index = models.indexOf(model); models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); @@ -545,7 +544,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public void resumeRecording(Model model) throws IOException { recorderLock.lock(); try { if (models.contains(model)) { @@ -680,6 +679,9 @@ public class NextGenLocalRecorder implements Recorder { LOG.info("Restarting recording for model {}", model); startRecordingProcess(model); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Couldn't restart recording for model {}", model); } catch (Exception e) { LOG.error("Couldn't restart recording for model {}", model); } @@ -736,7 +738,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void priorityChanged(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public void priorityChanged(Model model) { recorderLock.lock(); try { if (models.contains(model)) { @@ -759,7 +761,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void unpin(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public void unpin(Recording recording) throws IOException { recordingManager.unpin(recording); } @@ -769,7 +771,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void stopRecordingAt(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public void stopRecordingAt(Model model) throws IOException { recorderLock.lock(); try { int index = models.indexOf(model);