Make recorder and RecordingManager thread-safe

This commit is contained in:
0xboobface 2019-06-02 16:51:42 +02:00
parent f2cae6a312
commit da486bbf4a
2 changed files with 131 additions and 112 deletions

View File

@ -101,16 +101,31 @@ public class NextGenLocalRecorder implements Recorder {
try { try {
Future<Recording> result = completionService.take(); Future<Recording> result = completionService.take();
Recording recording = result.get(); Recording recording = result.get();
recordingProcesses.remove(recording.getModel()); recordingsLock.lock();
try {
recordingProcesses.remove(recording.getModel());
} finally {
recordingsLock.unlock();
}
if (recording.getStatus() == State.WAITING) { if (recording.getStatus() == State.WAITING) {
LOG.debug("Download finished for {} -> Starting post-processing", recording.getModel().getName()); LOG.debug("Download finished for {} -> Starting post-processing", recording.getModel().getName());
ppPool.submit(() -> { ppPool.submit(() -> {
setRecordingStatus(recording, State.POST_PROCESSING); try {
recordingManager.saveRecording(recording); setRecordingStatus(recording, State.POST_PROCESSING);
recording.postprocess(); recordingManager.saveRecording(recording);
setRecordingStatus(recording, State.FINISHED); recording.postprocess();
recordingManager.saveRecording(recording); setRecordingStatus(recording, State.FINISHED);
return recording; recordingManager.saveRecording(recording);
deleteIfTooShort(recording);
} catch (Exception e) {
LOG.error("Error while post-processing recording {}", recording, e);
recording.setStatus(State.FAILED);
try {
recordingManager.saveRecording(recording);
} catch (IOException e1) {
LOG.error("Couldn't update recording state for recording {}", recording, e1);
}
}
}); });
// check, if we have to restart the recording // check, if we have to restart the recording
@ -118,19 +133,12 @@ public class NextGenLocalRecorder implements Recorder {
tryRestartRecording(model); tryRestartRecording(model);
} else { } else {
if(recording.getStatus() != State.DELETED) { if(recording.getStatus() != State.DELETED) {
recordingsLock.lock(); delete(recording);
try {
recordingManager.delete(recording);
} catch (IOException e) {
LOG.error("Couldn't delete recording {}", recording, e);
} finally {
recordingsLock.unlock();
}
} }
setRecordingStatus(recording, State.FAILED); setRecordingStatus(recording, State.FAILED);
} }
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException | InvalidKeyException | NoSuchAlgorithmException | IllegalStateException | IOException e) {
e.printStackTrace(); LOG.error("Error while completing recording", e);
} }
} }
}); });
@ -231,7 +239,7 @@ public class NextGenLocalRecorder implements Recorder {
Recording rec = new Recording(); Recording rec = new Recording();
rec.setDownload(download); rec.setDownload(download);
rec.setPath(download.getPath(model)); rec.setPath(download.getPath(model).replaceAll("\\\\", "/"));
rec.setModel(model); rec.setModel(model);
rec.setStartDate(Instant.ofEpochMilli(System.currentTimeMillis())); rec.setStartDate(Instant.ofEpochMilli(System.currentTimeMillis()));
recordingProcesses.put(model, rec); recordingProcesses.put(model, rec);
@ -241,7 +249,7 @@ public class NextGenLocalRecorder implements Recorder {
setRecordingStatus(rec, State.RECORDING); setRecordingStatus(rec, State.RECORDING);
recordingManager.saveRecording(rec); recordingManager.saveRecording(rec);
download.start(); download.start();
boolean deleted = deleteIfTooShort(rec); boolean deleted = deleteIfEmpty(rec);
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING); setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
recordingManager.saveRecording(rec); recordingManager.saveRecording(rec);
} catch (IOException e) { } catch (IOException e) {
@ -254,15 +262,19 @@ public class NextGenLocalRecorder implements Recorder {
} }
} }
private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException { private boolean deleteIfEmpty(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
// if the size is 0, we don't need to go ahead and check the length
rec.refresh(); rec.refresh();
long sizeInByte = rec.getSizeInByte(); long sizeInByte = rec.getSizeInByte();
if (sizeInByte == 0) { if (sizeInByte == 0) {
recordingManager.delete(rec); LOG.info("Deleting empty recording {}", rec);
delete(rec);
return true; return true;
} else {
return false;
} }
}
private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
Duration minimumLengthInSeconds = Duration.ofSeconds(Config.getInstance().getSettings().minimumLengthInSeconds); Duration minimumLengthInSeconds = Duration.ofSeconds(Config.getInstance().getSettings().minimumLengthInSeconds);
if (minimumLengthInSeconds.getSeconds() <= 0) { if (minimumLengthInSeconds.getSeconds() <= 0) {
return false; return false;
@ -270,7 +282,8 @@ public class NextGenLocalRecorder implements Recorder {
Duration recordingLength = rec.getLength(); Duration recordingLength = rec.getLength();
if (recordingLength.compareTo(minimumLengthInSeconds) < 0) { if (recordingLength.compareTo(minimumLengthInSeconds) < 0) {
recordingManager.delete(rec); LOG.info("Deleting too short recording {} [{} < {}]", rec, recordingLength, minimumLengthInSeconds);
delete(rec);
return true; return true;
} }
@ -293,9 +306,14 @@ public class NextGenLocalRecorder implements Recorder {
modelLock.unlock(); modelLock.unlock();
} }
if (recordingProcesses.containsKey(model)) { recordingsLock.lock();
Recording recording = recordingProcesses.get(model); try {
recording.getDownload().stop(); if (recordingProcesses.containsKey(model)) {
Recording recording = recordingProcesses.get(model);
recording.getDownload().stop();
}
} finally {
recordingsLock.unlock();
} }
} }
@ -306,9 +324,14 @@ public class NextGenLocalRecorder implements Recorder {
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
config.save(); config.save();
LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName()); LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
Recording recording = recordingProcesses.get(model); recordingsLock.lock();
if (recording != null) { try {
stopRecordingProcess(model); Recording recording = recordingProcesses.get(model);
if (recording != null) {
stopRecordingProcess(model);
}
} finally {
recordingsLock.unlock();
} }
tryRestartRecording(model); tryRestartRecording(model);
} else { } else {
@ -318,10 +341,15 @@ public class NextGenLocalRecorder implements Recorder {
} }
private void stopRecordingProcess(Model model) { private void stopRecordingProcess(Model model) {
LOG.debug("Stopping recording for {}", model); recordingsLock.lock();
Recording recording = recordingProcesses.get(model); try {
LOG.debug("Stopping download for {}", model); LOG.debug("Stopping recording for {}", model);
recording.getDownload().stop(); Recording recording = recordingProcesses.get(model);
LOG.debug("Stopping download for {}", model);
recording.getDownload().stop();
} finally {
recordingsLock.unlock();
}
} }
private void stopRecordingProcesses() { private void stopRecordingProcesses() {
@ -357,12 +385,7 @@ public class NextGenLocalRecorder implements Recorder {
@Override @Override
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
recordingsLock.lock(); return recordingManager.getAll();
try {
return recordingManager.getAll();
} finally {
recordingsLock.unlock();
}
} }
@Override @Override
@ -377,20 +400,25 @@ public class NextGenLocalRecorder implements Recorder {
recording = false; recording = false;
LOG.debug("Stopping all recording processes"); LOG.debug("Stopping all recording processes");
// make a copy to avoid ConcurrentModificationException recordingsLock.lock();
List<Recording> toStop = new ArrayList<>(recordingProcesses.values()); try {
for (Recording rec : toStop) { // make a copy to avoid ConcurrentModificationException
Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop); List<Recording> toStop = new ArrayList<>(recordingProcesses.values());
} for (Recording rec : toStop) {
Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop);
// wait for post-processing to finish
LOG.info("Waiting for downloads to finish");
while (!recordingProcesses.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.error("Error while waiting for downloads to finish", e);
} }
// wait for post-processing to finish
LOG.info("Waiting for downloads to finish");
while (!recordingProcesses.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.error("Error while waiting for downloads to finish", e);
}
}
} finally {
recordingsLock.unlock();
} }
// shutdown threadpools // shutdown threadpools
@ -426,8 +454,13 @@ public class NextGenLocalRecorder implements Recorder {
modelLock.unlock(); modelLock.unlock();
} }
Recording recording = recordingProcesses.get(model); recordingsLock.lock();
Optional.ofNullable(recording).map(Recording::getDownload).ifPresent(Download::stop); try {
Recording recording = recordingProcesses.get(model);
Optional.ofNullable(recording).map(Recording::getDownload).ifPresent(Download::stop);
} finally {
recordingsLock.unlock();
}
} }
@Override @Override
@ -541,6 +574,7 @@ public class NextGenLocalRecorder implements Recorder {
EventBusHolder.BUS.register(new Object() { EventBusHolder.BUS.register(new Object() {
@Subscribe @Subscribe
public void modelEvent(Event e) { public void modelEvent(Event e) {
recordingsLock.lock();
try { try {
if (e.getType() == MODEL_ONLINE) { if (e.getType() == MODEL_ONLINE) {
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e; ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
@ -551,6 +585,8 @@ public class NextGenLocalRecorder implements Recorder {
} }
} catch (Exception e1) { } catch (Exception e1) {
LOG.error("Error while handling model state changed event {}", e, e1); LOG.error("Error while handling model state changed event {}", e, e1);
} finally {
recordingsLock.unlock();
} }
} }
}); });

View File

@ -9,6 +9,7 @@ import java.nio.file.Files;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -31,7 +32,7 @@ public class RecordingManager {
private Moshi moshi; private Moshi moshi;
private JsonAdapter<Recording> adapter; private JsonAdapter<Recording> adapter;
private List<Recording> recordings = new ArrayList<>(); private List<Recording> recordings = new ArrayList<>();
// private RecordingFileMonitor monitor = new RecordingFileMonitor(this); private ReentrantLock recordingsLock = new ReentrantLock();
public RecordingManager(Config config, List<Site> sites) throws IOException { public RecordingManager(Config config, List<Site> sites) throws IOException {
this.config = config; this.config = config;
@ -42,13 +43,16 @@ public class RecordingManager {
adapter = moshi.adapter(Recording.class).indent(" "); adapter = moshi.adapter(Recording.class).indent(" ");
loadRecordings(); loadRecordings();
// startMonitoring();
} }
public void add(Recording rec) throws UnsupportedEncodingException, IOException { public void add(Recording rec) throws UnsupportedEncodingException, IOException {
saveRecording(rec); saveRecording(rec);
recordings.add(rec); recordingsLock.lock();
// registerFileWatch(rec); try {
recordings.add(rec);
} finally {
recordingsLock.unlock();
}
} }
public void saveRecording(Recording rec) throws UnsupportedEncodingException, IOException { public void saveRecording(Recording rec) throws UnsupportedEncodingException, IOException {
@ -81,35 +85,6 @@ public class RecordingManager {
} }
} }
// private void startMonitoring() {
// for (Recording recording : recordings) {
// registerFileWatch(recording);
// }
// Thread watcher = new Thread(() -> monitor.processEvents());
// watcher.setDaemon(true);
// watcher.setPriority(Thread.MIN_PRIORITY);
// watcher.setName("RecordingFileMonitor");
// watcher.start();
// }
//
// private void registerFileWatch(Recording recording) {
// File rec = recording.getAbsoluteFile();
// if (rec.isDirectory()) {
// monitor.register(rec.toPath());
// } else {
// monitor.register(rec.getParentFile().toPath());
// }
// }
//
// private void removeFileWatch(Recording recording) {
// File rec = recording.getAbsoluteFile();
// if (rec.isDirectory()) {
// monitor.unregister(rec.toPath());
// } else {
// monitor.unregister(rec.getParentFile().toPath());
// }
// }
private boolean recordingExists(Recording recording) { private boolean recordingExists(Recording recording) {
File rec = new File(config.getSettings().recordingsDir, recording.getPath()); File rec = new File(config.getSettings().recordingsDir, recording.getPath());
return rec.exists(); return rec.exists();
@ -122,38 +97,46 @@ public class RecordingManager {
} }
public void delete(Recording recording) throws IOException { public void delete(Recording recording) throws IOException {
int idx = recordings.indexOf(recording); recordingsLock.lock();
recording = recordings.get(idx); try {
int idx = recordings.indexOf(recording);
recording = recordings.get(idx);
recording.setStatus(State.DELETING); recording.setStatus(State.DELETING);
File recordingsDir = new File(config.getSettings().recordingsDir); File recordingsDir = new File(config.getSettings().recordingsDir);
File path = new File(recordingsDir, recording.getPath()); File path = new File(recordingsDir, recording.getPath());
LOG.debug("Deleting {}", path); LOG.debug("Deleting {}", path);
// delete the video files // delete the video files
if (path.isFile()) { if (path.isFile()) {
Files.delete(path.toPath()); Files.delete(path.toPath());
deleteEmptyParents(path.getParentFile()); deleteEmptyParents(path.getParentFile());
} else { } else {
deleteDirectory(path); deleteDirectory(path);
deleteEmptyParents(path); deleteEmptyParents(path);
}
// delete the meta data
Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath());
// remove from data structure
recordings.remove(recording);
recording.setStatus(State.DELETED);
} finally {
recordingsLock.unlock();
} }
// delete the meta data
Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath());
// remove from data structure
recordings.remove(recording);
recording.setStatus(State.DELETED);
// removeFileWatch(recording);
} }
public List<Recording> getAll() { public List<Recording> getAll() {
for (Recording recording : recordings) { recordingsLock.lock();
recording.refresh(); try {
for (Recording recording : recordings) {
recording.refresh();
}
return new ArrayList<>(recordings);
} finally {
recordingsLock.unlock();
} }
return new ArrayList<>(recordings);
} }
private void deleteEmptyParents(File parent) throws IOException { private void deleteEmptyParents(File parent) throws IOException {