forked from j62/ctbrec
1
0
Fork 0

Code cleanup

This commit is contained in:
0xb00bface 2021-10-01 18:04:15 +02:00
parent 23666b8079
commit 24cad6e124
1 changed files with 26 additions and 24 deletions

View File

@ -65,28 +65,25 @@ public class NextGenLocalRecorder implements Recorder {
private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class); private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class);
public static final boolean IGNORE_CACHE = true; public static final boolean IGNORE_CACHE = true;
private List<Model> models = Collections.synchronizedList(new ArrayList<>()); private final List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Config config; private final Config config;
private volatile boolean recording = true; private volatile boolean recording = true;
private ReentrantLock recorderLock = new ReentrantLock(); private final ReentrantLock recorderLock = new ReentrantLock();
private ReentrantLock modelGroupLock = new ReentrantLock(); private final ReentrantLock modelGroupLock = new ReentrantLock();
private RecorderHttpClient client; private final RecorderHttpClient client;
private Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>()); private final Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
private RecordingManager recordingManager; private final RecordingManager recordingManager;
private RecordingPreconditions preconditions; private final RecordingPreconditions preconditions;
// thread pools for downloads and post-processing // thread pools for downloads and post-processing
private ScheduledExecutorService downloadPool; private final ScheduledExecutorService downloadPool;
private ThreadPoolScaler threadPoolScaler; private final ThreadPoolScaler threadPoolScaler;
private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2)); private final ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload", MAX_PRIORITY - 2));
private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1)); private final ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker", MAX_PRIORITY - 1));
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>(); private final BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
private Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); private final Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>());
private BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>(); private final ThreadPoolExecutor ppPool;
private ThreadPoolExecutor ppPool;
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException { public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config; this.config = config;
@ -96,6 +93,7 @@ public class NextGenLocalRecorder implements Recorder {
recordingManager = new RecordingManager(config, sites); recordingManager = new RecordingManager(config, sites);
loadModels(); loadModels();
int ppThreads = config.getSettings().postProcessingThreads; int ppThreads = config.getSettings().postProcessingThreads;
BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY)); ppPool = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY));
recording = true; recording = true;
@ -109,6 +107,7 @@ public class NextGenLocalRecorder implements Recorder {
startCompletionHandler(); startCompletionHandler();
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleWithFixedDelay(() -> { scheduler.scheduleWithFixedDelay(() -> {
try { try {
if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) { if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) {
@ -123,7 +122,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
private void loadModels() { private void loadModels() {
config.getSettings().models.stream().forEach(m -> { config.getSettings().models.forEach(m -> {
if (m.getSite() != null) { if (m.getSite() != null) {
if (m.getSite().isEnabled()) { if (m.getSite().isEnabled()) {
models.add(m); models.add(m);
@ -414,7 +413,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
@Override @Override
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void switchStreamSource(Model model) throws IOException {
if (models.contains(model)) { if (models.contains(model)) {
int index = models.indexOf(model); int index = models.indexOf(model);
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
@ -545,7 +544,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
@Override @Override
public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void resumeRecording(Model model) throws IOException {
recorderLock.lock(); recorderLock.lock();
try { try {
if (models.contains(model)) { if (models.contains(model)) {
@ -680,6 +679,9 @@ public class NextGenLocalRecorder implements Recorder {
LOG.info("Restarting recording for model {}", model); LOG.info("Restarting recording for model {}", model);
startRecordingProcess(model); startRecordingProcess(model);
} }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Couldn't restart recording for model {}", model);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Couldn't restart recording for model {}", model); LOG.error("Couldn't restart recording for model {}", model);
} }
@ -736,7 +738,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
@Override @Override
public void priorityChanged(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void priorityChanged(Model model) {
recorderLock.lock(); recorderLock.lock();
try { try {
if (models.contains(model)) { if (models.contains(model)) {
@ -759,7 +761,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
@Override @Override
public void unpin(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void unpin(Recording recording) throws IOException {
recordingManager.unpin(recording); recordingManager.unpin(recording);
} }
@ -769,7 +771,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
@Override @Override
public void stopRecordingAt(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void stopRecordingAt(Model model) throws IOException {
recorderLock.lock(); recorderLock.lock();
try { try {
int index = models.indexOf(model); int index = models.indexOf(model);