Run post-processing steps in runnable in a thread pool

Server and client now create a runnable for post-processing steps,
which run in a thread pool. This ensures, that the steps run linearly so
that RecordingStateChange events make sense, too.
This commit is contained in:
0xboobface 2018-12-10 15:27:56 +01:00
parent 5b15b77014
commit 1d409fa1d4
1 changed files with 77 additions and 113 deletions

View File

@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -62,13 +64,14 @@ public class LocalRecorder implements Recorder {
private Map<File, PlaylistGenerator> playlistGenerators = new HashMap<>();
private Config config;
private ProcessMonitor processMonitor;
private PostProcessingTrigger postProcessingTrigger;
private volatile boolean recording = true;
private List<File> deleteInProgress = Collections.synchronizedList(new ArrayList<>());
private RecorderHttpClient client = new RecorderHttpClient();
private ReentrantLock lock = new ReentrantLock();
private long lastSpaceMessage = 0;
private ExecutorService ppThreadPool = Executors.newFixedThreadPool(2);
public LocalRecorder(Config config) {
this.config = config;
config.getSettings().models.stream().forEach((m) -> {
@ -83,12 +86,10 @@ public class LocalRecorder implements Recorder {
processMonitor = new ProcessMonitor();
processMonitor.start();
postProcessingTrigger = new PostProcessingTrigger();
if(Config.isServerMode()) {
postProcessingTrigger.start();
}
registerEventBusListener();
if(Config.isServerMode()) {
processUnfinishedRecordings();
}
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
@ -206,47 +207,40 @@ public class LocalRecorder implements Recorder {
Download download = recordingProcesses.get(model);
download.stop();
recordingProcesses.remove(model);
if(!Config.isServerMode()) {
postprocess(download);
}
fireRecordingStateChanged(download.getTarget(), FINISHED, model, download.getStartTime());
fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime());
ppThreadPool.submit(createPostProcessor(download));
}
private void postprocess(Download download) {
if(!(download instanceof MergedHlsDownload)) {
throw new IllegalArgumentException("Download should be of type MergedHlsDownload");
}
String postProcessing = Config.getInstance().getSettings().postProcessing;
if (postProcessing != null && !postProcessing.isEmpty()) {
new Thread(() -> {
Runtime rt = Runtime.getRuntime();
try {
MergedHlsDownload d = (MergedHlsDownload) download;
String[] args = new String[] {
postProcessing,
d.getTarget().getParentFile().getAbsolutePath(),
d.getTarget().getAbsolutePath(),
d.getModel().getName(),
d.getModel().getSite().getName(),
Long.toString(download.getStartTime().getEpochSecond())
};
LOG.debug("Running {}", Arrays.toString(args));
Process process = rt.exec(args, OS.getEnvironment());
Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out));
std.setName("Process stdout pipe");
std.setDaemon(true);
std.start();
Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err));
err.setName("Process stderr pipe");
err.setDaemon(true);
err.start();
Runtime rt = Runtime.getRuntime();
try {
String[] args = new String[] {
postProcessing,
download.getTarget().getParentFile().getAbsolutePath(),
download.getTarget().getAbsolutePath(),
download.getModel().getName(),
download.getModel().getSite().getName(),
Long.toString(download.getStartTime().getEpochSecond())
};
LOG.debug("Running {}", Arrays.toString(args));
Process process = rt.exec(args, OS.getEnvironment());
// TODO maybe write these to a separate log file, e.g. recname.ts.pp.log
Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out));
std.setName("Process stdout pipe");
std.setDaemon(true);
std.start();
Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err));
err.setName("Process stderr pipe");
err.setDaemon(true);
err.start();
process.waitFor();
LOG.debug("Process finished.");
} catch (Exception e) {
LOG.error("Error in process thread", e);
}
}).start();
process.waitFor();
LOG.debug("Process finished.");
} catch (Exception e) {
LOG.error("Error in process thread", e);
}
}
}
@ -306,9 +300,9 @@ public class LocalRecorder implements Recorder {
recording = false;
LOG.debug("Stopping monitor threads");
processMonitor.running = false;
postProcessingTrigger.running = false;
LOG.debug("Stopping all recording processes");
stopRecordingProcesses();
ppThreadPool.shutdown();
client.shutdown();
}
@ -318,12 +312,7 @@ public class LocalRecorder implements Recorder {
for (Model model : models) {
Download recordingProcess = recordingProcesses.get(model);
if (recordingProcess != null) {
try {
recordingProcess.stop();
LOG.debug("Stopped recording for {}", model);
} catch (Exception e) {
LOG.error("Couldn't stop recording for model {}", model, e);
}
stopRecordingProcess(model);
}
}
} finally {
@ -375,21 +364,14 @@ public class LocalRecorder implements Recorder {
for (Iterator<Entry<Model, Download>> iterator = recordingProcesses.entrySet().iterator(); iterator.hasNext();) {
Entry<Model, Download> entry = iterator.next();
Model m = entry.getKey();
Download d = entry.getValue();
if (!d.isAlive()) {
Download download = entry.getValue();
if (!download.isAlive()) {
LOG.debug("Recording terminated for model {}", m.getName());
iterator.remove();
restart.add(m);
if(Config.isServerMode()) {
try {
finishRecording(d.getTarget());
} catch(Exception e) {
LOG.error("Error while finishing recording for model {}", m.getName(), e);
}
} else {
postprocess(d);
}
fireRecordingStateChanged(d.getTarget(), FINISHED, m, d.getStartTime()); // TODO fire all the events
fireRecordingStateChanged(download.getTarget(), STOPPED, m, download.getStartTime());
Runnable pp = createPostProcessor(download);
ppThreadPool.submit(pp);
}
}
for (Model m : restart) {
@ -407,20 +389,6 @@ public class LocalRecorder implements Recorder {
}
}
private void finishRecording(File directory) {
if(Config.isServerMode()) {
Thread t = new Thread() {
@Override
public void run() {
generatePlaylist(directory);
}
};
t.setDaemon(true);
t.setName("Post-Processing " + directory.toString());
t.start();
}
}
private void generatePlaylist(File recDir) {
PlaylistGenerator playlistGenerator = new PlaylistGenerator();
playlistGenerators.put(recDir, playlistGenerator);
@ -445,49 +413,32 @@ public class LocalRecorder implements Recorder {
EventBusHolder.BUS.post(evt);
}
private class PostProcessingTrigger extends Thread {
private volatile boolean running = false;
public PostProcessingTrigger() {
setName("PostProcessingTrigger");
setDaemon(true);
}
@Override
public void run() {
running = true;
while (running) {
try {
List<Recording> recs = getRecordings();
for (Recording rec : recs) {
if (rec.getStatus() == RECORDING) {
boolean recordingProcessFound = false;
File recordingsDir = new File(config.getSettings().recordingsDir);
File recDir = new File(recordingsDir, rec.getPath());
for (Entry<Model, Download> download : recordingProcesses.entrySet()) {
if (download.getValue().getTarget().equals(recDir)) {
recordingProcessFound = true;
}
}
if (!recordingProcessFound) {
if (deleteInProgress.contains(recDir)) {
LOG.debug("{} is being deleted. Not going to start post-processing", recDir);
} else {
finishRecording(recDir);
}
}
/**
* This is called once at start for server mode. When the server is killed, recordings are
* left without playlist. This method creates playlists for them.
*/
private void processUnfinishedRecordings() {
try {
List<Recording> recs = getRecordings();
for (Recording rec : recs) {
if (rec.getStatus() == RECORDING) {
boolean recordingProcessFound = false;
File recordingsDir = new File(config.getSettings().recordingsDir);
File recDir = new File(recordingsDir, rec.getPath());
for (Entry<Model, Download> download : recordingProcesses.entrySet()) {
if (download.getValue().getTarget().equals(recDir)) {
recordingProcessFound = true;
}
}
if (running)
Thread.sleep(10000);
} catch (InterruptedException e) {
LOG.error("Couldn't sleep", e);
} catch (Exception e) {
LOG.error("Unexpected error in playlist trigger thread", e);
if (!recordingProcessFound) {
ppThreadPool.submit(() -> {
generatePlaylist(recDir);
});
}
}
}
LOG.debug(getName() + " terminated");
} catch (Exception e) {
LOG.error("Unexpected error in playlist trigger", e);
}
}
@ -781,4 +732,17 @@ public class LocalRecorder implements Recorder {
return getFreeSpaceBytes() > minimum;
}
}
private Runnable createPostProcessor(Download download) {
return () -> {
LOG.debug("Starting post-processing for {}", download.getTarget());
if(Config.isServerMode()) {
fireRecordingStateChanged(download.getTarget(), GENERATING_PLAYLIST, download.getModel(), download.getStartTime());
generatePlaylist(download.getTarget());
}
fireRecordingStateChanged(download.getTarget(), POST_PROCESSING, download.getModel(), download.getStartTime());
postprocess(download);
fireRecordingStateChanged(download.getTarget(), FINISHED, download.getModel(), download.getStartTime());
};
}
}