forked from j62/ctbrec
1
0
Fork 0

Combine recordings and model locks to one lock

Using 2 locks caused deadlocks.
This commit is contained in:
0xboobface 2019-12-27 23:35:49 +01:00
parent fc42b21a70
commit d742756413
1 changed files with 35 additions and 51 deletions

View File

@ -57,8 +57,7 @@ public class NextGenLocalRecorder implements Recorder {
private List<Model> models = Collections.synchronizedList(new ArrayList<>()); private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Config config; private Config config;
private volatile boolean recording = true; private volatile boolean recording = true;
private ReentrantLock modelLock = new ReentrantLock(); private ReentrantLock recorderLock = new ReentrantLock();
private ReentrantLock recordingsLock = new ReentrantLock();
private RecorderHttpClient client = new RecorderHttpClient(); private RecorderHttpClient client = new RecorderHttpClient();
private long lastSpaceMessage = 0; private long lastSpaceMessage = 0;
private Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>()); private Map<Model, Recording> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
@ -116,11 +115,11 @@ public class NextGenLocalRecorder implements Recorder {
try { try {
Future<Recording> result = completionService.take(); Future<Recording> result = completionService.take();
Recording rec = result.get(); Recording rec = result.get();
recordingsLock.lock(); recorderLock.lock();
try { try {
recordingProcesses.remove(rec.getModel()); recordingProcesses.remove(rec.getModel());
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
if (rec.getStatus() == State.WAITING) { if (rec.getStatus() == State.WAITING) {
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName());
@ -181,7 +180,7 @@ public class NextGenLocalRecorder implements Recorder {
public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
if (!models.contains(model)) { if (!models.contains(model)) {
LOG.info("Model {} added", model); LOG.info("Model {} added", model);
modelLock.lock(); recorderLock.lock();
try { try {
models.add(model); models.add(model);
config.getSettings().models.add(model); config.getSettings().models.add(model);
@ -189,7 +188,7 @@ public class NextGenLocalRecorder implements Recorder {
} catch (IOException e) { } catch (IOException e) {
LOG.error("Couldn't save config", e); LOG.error("Couldn't save config", e);
} finally { } finally {
modelLock.unlock(); recorderLock.unlock();
} }
// try to start the recording immediately // try to start the recording immediately
@ -206,7 +205,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
private void startRecordingProcess(Model model) throws IOException { private void startRecordingProcess(Model model) throws IOException {
recordingsLock.lock(); recorderLock.lock();
try { try {
if (!recording) { if (!recording) {
// recorder is not in recording mode // recorder is not in recording mode
@ -223,15 +222,10 @@ public class NextGenLocalRecorder implements Recorder {
return; return;
} }
modelLock.lock();
try {
if (!models.contains(model)) { if (!models.contains(model)) {
LOG.info("Model {} has been removed. Restarting of recording cancelled.", model); LOG.info("Model {} has been removed. Restarting of recording cancelled.", model);
return; return;
} }
} finally {
modelLock.unlock();
}
if (!enoughSpaceForRecording()) { if (!enoughSpaceForRecording()) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
@ -275,7 +269,7 @@ public class NextGenLocalRecorder implements Recorder {
return rec; return rec;
}); });
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
} }
@ -309,7 +303,7 @@ public class NextGenLocalRecorder implements Recorder {
@Override @Override
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
modelLock.lock(); recorderLock.lock();
try { try {
if (models.contains(model)) { if (models.contains(model)) {
models.remove(model); models.remove(model);
@ -319,18 +313,13 @@ public class NextGenLocalRecorder implements Recorder {
} else { } else {
throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models"); throw new NoSuchElementException("Model " + model.getName() + " [" + model.getUrl() + "] not found in list of recorded models");
} }
} finally {
modelLock.unlock();
}
recordingsLock.lock();
try {
if (recordingProcesses.containsKey(model)) { if (recordingProcesses.containsKey(model)) {
Recording rec = recordingProcesses.get(model); Recording rec = recordingProcesses.get(model);
rec.getDownload().stop(); rec.getDownload().stop();
} }
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
} }
@ -341,14 +330,14 @@ public class NextGenLocalRecorder implements Recorder {
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
config.save(); config.save();
LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName()); LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
recordingsLock.lock(); recorderLock.lock();
try { try {
Recording rec = recordingProcesses.get(model); Recording rec = recordingProcesses.get(model);
if (rec != null) { if (rec != null) {
stopRecordingProcess(model); stopRecordingProcess(model);
} }
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
tryRestartRecording(model); tryRestartRecording(model);
} else { } else {
@ -357,45 +346,45 @@ public class NextGenLocalRecorder implements Recorder {
} }
private void stopRecordingProcess(Model model) { private void stopRecordingProcess(Model model) {
recordingsLock.lock(); recorderLock.lock();
try { try {
LOG.debug("Stopping recording for {}", model); LOG.debug("Stopping recording for {}", model);
Recording rec = recordingProcesses.get(model); Recording rec = recordingProcesses.get(model);
LOG.debug("Stopping download for {}", model); LOG.debug("Stopping download for {}", model);
rec.getDownload().stop(); rec.getDownload().stop();
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
} }
private void stopRecordingProcesses() { private void stopRecordingProcesses() {
recordingsLock.lock(); recorderLock.lock();
try { try {
for (Recording rec : recordingProcesses.values()) { for (Recording rec : recordingProcesses.values()) {
rec.getDownload().stop(); rec.getDownload().stop();
} }
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
} }
@Override @Override
public boolean isTracked(Model model) { public boolean isTracked(Model model) {
modelLock.lock(); recorderLock.lock();
try { try {
return models.contains(model); return models.contains(model);
} finally { } finally {
modelLock.unlock(); recorderLock.unlock();
} }
} }
@Override @Override
public List<Model> getModels() { public List<Model> getModels() {
modelLock.lock(); recorderLock.lock();
try { try {
return new ArrayList<>(models); return new ArrayList<>(models);
} finally { } finally {
modelLock.unlock(); recorderLock.unlock();
} }
} }
@ -416,7 +405,7 @@ public class NextGenLocalRecorder implements Recorder {
recording = false; recording = false;
LOG.debug("Stopping all recording processes"); LOG.debug("Stopping all recording processes");
recordingsLock.lock(); recorderLock.lock();
try { try {
// make a copy to avoid ConcurrentModificationException // make a copy to avoid ConcurrentModificationException
List<Recording> toStop = new ArrayList<>(recordingProcesses.values()); List<Recording> toStop = new ArrayList<>(recordingProcesses.values());
@ -424,7 +413,7 @@ public class NextGenLocalRecorder implements Recorder {
Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop); Optional.ofNullable(rec.getDownload()).ifPresent(Download::stop);
} }
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
// wait for post-processing to finish // wait for post-processing to finish
@ -457,7 +446,7 @@ public class NextGenLocalRecorder implements Recorder {
@Override @Override
public void suspendRecording(Model model) { public void suspendRecording(Model model) {
modelLock.lock(); recorderLock.lock();
try { try {
if (models.contains(model)) { if (models.contains(model)) {
int index = models.indexOf(model); int index = models.indexOf(model);
@ -468,24 +457,19 @@ public class NextGenLocalRecorder implements Recorder {
LOG.warn("Couldn't suspend model {}. Not found in list", model.getName()); LOG.warn("Couldn't suspend model {}. Not found in list", model.getName());
return; return;
} }
Recording rec = recordingProcesses.get(model);
Optional.ofNullable(rec).map(Recording::getDownload).ifPresent(Download::stop);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Couldn't save config", e); LOG.error("Couldn't save config", e);
} finally { } finally {
modelLock.unlock(); recorderLock.unlock();
}
recordingsLock.lock();
try {
Recording rec = recordingProcesses.get(model);
Optional.ofNullable(rec).map(Recording::getDownload).ifPresent(Download::stop);
} finally {
recordingsLock.unlock();
} }
} }
@Override @Override
public void resumeRecording(Model model) throws IOException { public void resumeRecording(Model model) throws IOException {
modelLock.lock(); recorderLock.lock();
try { try {
if (models.contains(model)) { if (models.contains(model)) {
int index = models.indexOf(model); int index = models.indexOf(model);
@ -505,13 +489,13 @@ public class NextGenLocalRecorder implements Recorder {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOG.error("Couldn't check, if model {} is online", model.getName()); LOG.error("Couldn't check, if model {} is online", model.getName());
} finally { } finally {
modelLock.unlock(); recorderLock.unlock();
} }
} }
@Override @Override
public boolean isSuspended(Model model) { public boolean isSuspended(Model model) {
modelLock.lock(); recorderLock.lock();
try { try {
int index = models.indexOf(model); int index = models.indexOf(model);
if (index >= 0) { if (index >= 0) {
@ -521,7 +505,7 @@ public class NextGenLocalRecorder implements Recorder {
return false; return false;
} }
} finally { } finally {
modelLock.unlock(); recorderLock.unlock();
} }
} }
@ -599,7 +583,7 @@ public class NextGenLocalRecorder implements Recorder {
EventBusHolder.BUS.register(new Object() { EventBusHolder.BUS.register(new Object() {
@Subscribe @Subscribe
public void modelEvent(Event e) { public void modelEvent(Event e) {
recordingsLock.lock(); recorderLock.lock();
try { try {
if (e.getType() == MODEL_ONLINE) { if (e.getType() == MODEL_ONLINE) {
ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e; ModelIsOnlineEvent evt = (ModelIsOnlineEvent) e;
@ -611,7 +595,7 @@ public class NextGenLocalRecorder implements Recorder {
} catch (Exception e1) { } catch (Exception e1) {
LOG.error("Error while handling model state changed event {}", e, e1); LOG.error("Error while handling model state changed event {}", e, e1);
} finally { } finally {
recordingsLock.unlock(); recorderLock.unlock();
} }
} }
}); });