diff --git a/common/src/main/java/ctbrec/recorder/LocalRecorder.java b/common/src/main/java/ctbrec/recorder/LocalRecorder.java index 2cd50878..0732220b 100644 --- a/common/src/main/java/ctbrec/recorder/LocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/LocalRecorder.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -62,13 +64,14 @@ public class LocalRecorder implements Recorder { private Map playlistGenerators = new HashMap<>(); private Config config; private ProcessMonitor processMonitor; - private PostProcessingTrigger postProcessingTrigger; private volatile boolean recording = true; private List deleteInProgress = Collections.synchronizedList(new ArrayList<>()); private RecorderHttpClient client = new RecorderHttpClient(); private ReentrantLock lock = new ReentrantLock(); private long lastSpaceMessage = 0; + private ExecutorService ppThreadPool = Executors.newFixedThreadPool(2); + public LocalRecorder(Config config) { this.config = config; config.getSettings().models.stream().forEach((m) -> { @@ -83,12 +86,10 @@ public class LocalRecorder implements Recorder { processMonitor = new ProcessMonitor(); processMonitor.start(); - postProcessingTrigger = new PostProcessingTrigger(); - if(Config.isServerMode()) { - postProcessingTrigger.start(); - } - registerEventBusListener(); + if(Config.isServerMode()) { + processUnfinishedRecordings(); + } LOG.debug("Recorder initialized"); LOG.info("Models to record: {}", models); @@ -206,47 +207,40 @@ public class LocalRecorder implements Recorder { Download download = recordingProcesses.get(model); download.stop(); recordingProcesses.remove(model); - if(!Config.isServerMode()) { - postprocess(download); - } - fireRecordingStateChanged(download.getTarget(), FINISHED, model, download.getStartTime()); + fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime()); + ppThreadPool.submit(createPostProcessor(download)); } private void postprocess(Download download) { - if(!(download instanceof MergedHlsDownload)) { - throw new IllegalArgumentException("Download should be of type MergedHlsDownload"); - } String postProcessing = Config.getInstance().getSettings().postProcessing; if (postProcessing != null && !postProcessing.isEmpty()) { - new Thread(() -> { - Runtime rt = Runtime.getRuntime(); - try { - MergedHlsDownload d = (MergedHlsDownload) download; - String[] args = new String[] { - postProcessing, - d.getTarget().getParentFile().getAbsolutePath(), - d.getTarget().getAbsolutePath(), - d.getModel().getName(), - d.getModel().getSite().getName(), - Long.toString(download.getStartTime().getEpochSecond()) - }; - LOG.debug("Running {}", Arrays.toString(args)); - Process process = rt.exec(args, OS.getEnvironment()); - Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out)); - std.setName("Process stdout pipe"); - std.setDaemon(true); - std.start(); - Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err)); - err.setName("Process stderr pipe"); - err.setDaemon(true); - err.start(); + Runtime rt = Runtime.getRuntime(); + try { + String[] args = new String[] { + postProcessing, + download.getTarget().getParentFile().getAbsolutePath(), + download.getTarget().getAbsolutePath(), + download.getModel().getName(), + download.getModel().getSite().getName(), + Long.toString(download.getStartTime().getEpochSecond()) + }; + LOG.debug("Running {}", Arrays.toString(args)); + Process process = rt.exec(args, OS.getEnvironment()); + // TODO maybe write these to a separate log file, e.g. recname.ts.pp.log + Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out)); + std.setName("Process stdout pipe"); + std.setDaemon(true); + std.start(); + Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err)); + err.setName("Process stderr pipe"); + err.setDaemon(true); + err.start(); - process.waitFor(); - LOG.debug("Process finished."); - } catch (Exception e) { - LOG.error("Error in process thread", e); - } - }).start(); + process.waitFor(); + LOG.debug("Process finished."); + } catch (Exception e) { + LOG.error("Error in process thread", e); + } } } @@ -306,9 +300,9 @@ public class LocalRecorder implements Recorder { recording = false; LOG.debug("Stopping monitor threads"); processMonitor.running = false; - postProcessingTrigger.running = false; LOG.debug("Stopping all recording processes"); stopRecordingProcesses(); + ppThreadPool.shutdown(); client.shutdown(); } @@ -318,12 +312,7 @@ public class LocalRecorder implements Recorder { for (Model model : models) { Download recordingProcess = recordingProcesses.get(model); if (recordingProcess != null) { - try { - recordingProcess.stop(); - LOG.debug("Stopped recording for {}", model); - } catch (Exception e) { - LOG.error("Couldn't stop recording for model {}", model, e); - } + stopRecordingProcess(model); } } } finally { @@ -375,21 +364,14 @@ public class LocalRecorder implements Recorder { for (Iterator> iterator = recordingProcesses.entrySet().iterator(); iterator.hasNext();) { Entry entry = iterator.next(); Model m = entry.getKey(); - Download d = entry.getValue(); - if (!d.isAlive()) { + Download download = entry.getValue(); + if (!download.isAlive()) { LOG.debug("Recording terminated for model {}", m.getName()); iterator.remove(); restart.add(m); - if(Config.isServerMode()) { - try { - finishRecording(d.getTarget()); - } catch(Exception e) { - LOG.error("Error while finishing recording for model {}", m.getName(), e); - } - } else { - postprocess(d); - } - fireRecordingStateChanged(d.getTarget(), FINISHED, m, d.getStartTime()); // TODO fire all the events + fireRecordingStateChanged(download.getTarget(), STOPPED, m, download.getStartTime()); + Runnable pp = createPostProcessor(download); + ppThreadPool.submit(pp); } } for (Model m : restart) { @@ -407,20 +389,6 @@ public class LocalRecorder implements Recorder { } } - private void finishRecording(File directory) { - if(Config.isServerMode()) { - Thread t = new Thread() { - @Override - public void run() { - generatePlaylist(directory); - } - }; - t.setDaemon(true); - t.setName("Post-Processing " + directory.toString()); - t.start(); - } - } - private void generatePlaylist(File recDir) { PlaylistGenerator playlistGenerator = new PlaylistGenerator(); playlistGenerators.put(recDir, playlistGenerator); @@ -445,49 +413,32 @@ public class LocalRecorder implements Recorder { EventBusHolder.BUS.post(evt); } - private class PostProcessingTrigger extends Thread { - private volatile boolean running = false; - - public PostProcessingTrigger() { - setName("PostProcessingTrigger"); - setDaemon(true); - } - - @Override - public void run() { - running = true; - while (running) { - try { - List recs = getRecordings(); - for (Recording rec : recs) { - if (rec.getStatus() == RECORDING) { - boolean recordingProcessFound = false; - File recordingsDir = new File(config.getSettings().recordingsDir); - File recDir = new File(recordingsDir, rec.getPath()); - for (Entry download : recordingProcesses.entrySet()) { - if (download.getValue().getTarget().equals(recDir)) { - recordingProcessFound = true; - } - } - if (!recordingProcessFound) { - if (deleteInProgress.contains(recDir)) { - LOG.debug("{} is being deleted. Not going to start post-processing", recDir); - } else { - finishRecording(recDir); - } - } + /** + * This is called once at start for server mode. When the server is killed, recordings are + * left without playlist. This method creates playlists for them. + */ + private void processUnfinishedRecordings() { + try { + List recs = getRecordings(); + for (Recording rec : recs) { + if (rec.getStatus() == RECORDING) { + boolean recordingProcessFound = false; + File recordingsDir = new File(config.getSettings().recordingsDir); + File recDir = new File(recordingsDir, rec.getPath()); + for (Entry download : recordingProcesses.entrySet()) { + if (download.getValue().getTarget().equals(recDir)) { + recordingProcessFound = true; } } - - if (running) - Thread.sleep(10000); - } catch (InterruptedException e) { - LOG.error("Couldn't sleep", e); - } catch (Exception e) { - LOG.error("Unexpected error in playlist trigger thread", e); + if (!recordingProcessFound) { + ppThreadPool.submit(() -> { + generatePlaylist(recDir); + }); + } } } - LOG.debug(getName() + " terminated"); + } catch (Exception e) { + LOG.error("Unexpected error in playlist trigger", e); } } @@ -781,4 +732,17 @@ public class LocalRecorder implements Recorder { return getFreeSpaceBytes() > minimum; } } + + private Runnable createPostProcessor(Download download) { + return () -> { + LOG.debug("Starting post-processing for {}", download.getTarget()); + if(Config.isServerMode()) { + fireRecordingStateChanged(download.getTarget(), GENERATING_PLAYLIST, download.getModel(), download.getStartTime()); + generatePlaylist(download.getTarget()); + } + fireRecordingStateChanged(download.getTarget(), POST_PROCESSING, download.getModel(), download.getStartTime()); + postprocess(download); + fireRecordingStateChanged(download.getTarget(), FINISHED, download.getModel(), download.getStartTime()); + }; + } }