diff --git a/common/src/main/java/ctbrec/Recording.java b/common/src/main/java/ctbrec/Recording.java index d279b114..6db923c2 100644 --- a/common/src/main/java/ctbrec/Recording.java +++ b/common/src/main/java/ctbrec/Recording.java @@ -11,6 +11,9 @@ import lombok.extern.slf4j.Slf4j; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.WatchEvent; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; @@ -22,6 +25,7 @@ import java.util.Set; import java.util.concurrent.Future; import static ctbrec.Recording.State.*; +import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; @Slf4j public class Recording implements Serializable { @@ -43,11 +47,8 @@ public class Recording implements Serializable { private File absoluteFile = null; private File postProcessedFile = null; private int selectedResolution = -1; + private long lastSizeUpdate = 0; - /** - * Signals, if the recording has been changed and it has to be refreshed - */ - private boolean dirtyFlag = true; public enum State { RECORDING("recording"), @@ -272,7 +273,7 @@ public class Recording implements Serializable { } } - private Set<File> getAllRecordingFiles() throws IOException { + public Set<File> getAllRecordingFiles() throws IOException { Set<File> files = new HashSet<>(); if (absoluteFile != null) { files.add(absoluteFile.getCanonicalFile()); @@ -287,9 +288,19 @@ public class Recording implements Serializable { } public void refresh() { - if ((status != FINISHED && status != FAILED) || dirtyFlag) { + long now = System.currentTimeMillis(); + if (now - lastSizeUpdate > 1000) { sizeInByte = getSize(); - dirtyFlag = false; + lastSizeUpdate = now; + } + } + + public void refresh(Path dir, WatchEvent<Path> event) throws IOException { + Path child = dir.resolve(event.context()); + if (event.kind() == ENTRY_CREATE) { + sizeInByte += Files.size(child); + } else { + refresh(); } } @@ -312,10 +323,6 @@ public class Recording implements Serializable { .map(File::new); } - public void setDirtyFlag(boolean dirtyFlag) { - this.dirtyFlag = dirtyFlag; - } - public Future<RecordingProcess> getCurrentIteration() { return currentIteration; } diff --git a/common/src/main/java/ctbrec/RecordingSizeMonitor.java b/common/src/main/java/ctbrec/RecordingSizeMonitor.java new file mode 100644 index 00000000..62931a88 --- /dev/null +++ b/common/src/main/java/ctbrec/RecordingSizeMonitor.java @@ -0,0 +1,150 @@ +package ctbrec; + +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.*; + +import static java.nio.file.LinkOption.NOFOLLOW_LINKS; +import static java.nio.file.StandardWatchEventKinds.*; + +@Slf4j +public class RecordingSizeMonitor { + + protected final WatchService service; + protected final Map<WatchKey, Path> keys; + protected final Map<WatchKey, Recording> recordingByKey; + protected final Map<Recording, List<WatchKey>> keysByRecording; + protected final Set<Path> registeredPaths; + + public RecordingSizeMonitor() throws IOException { + this.service = FileSystems.getDefault().newWatchService(); + this.keys = new HashMap<>(); + this.recordingByKey = new HashMap<>(); + this.keysByRecording = new HashMap<>(); + this.registeredPaths = new HashSet<>(); + } + + public void monitor(Recording rec) { + GlobalThreadPool.submit(() -> { + try { + for (File file : rec.getAllRecordingFiles()) { + for (int i = 0; i < 300; i++) { // wait up to 30 seconds for a file to show up + Path path = file.toPath(); + if (Files.exists(path)) { + if (Files.isDirectory(path, NOFOLLOW_LINKS)) { + registerAll(path, rec); + } else if (Files.isRegularFile(path)) { + registerAll(path.getParent(), rec); + } + break; + } else { + Thread.sleep(100); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.warn("Couldn't register file monitor for {}", rec, e); + } + }); + } + + private void register(Path path, Recording rec) throws IOException { + if (!registeredPaths.contains(path)) { + WatchKey key = path.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); + keys.put(key, path); + recordingByKey.put(key, rec); + keysByRecording.computeIfAbsent(rec, r -> new ArrayList<>()).add(key); + registeredPaths.add(path); + } + } + + private void registerAll(Path path, Recording rec) throws IOException { + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + register(dir, rec); + return FileVisitResult.CONTINUE; + } + }); + } + + public void uninstall(Recording rec) { + List<WatchKey> keysForRecording = this.keysByRecording.getOrDefault(rec, Collections.emptyList()); + keysForRecording.forEach(key -> { + Path path = keys.get(key); + key.cancel(); + keys.remove(key); + recordingByKey.remove(key); + registeredPaths.remove(path); + }); + this.keysByRecording.remove(rec); + } + + public void processEvents() { + while (!Thread.interrupted()) { + WatchKey key = null; + try { + try { + key = service.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + continue; + } + + Path dir = keys.get(key); + if (dir == null) { + log.error("WatchKey not recognized"); + continue; + } + + for (WatchEvent<?> event : key.pollEvents()) { + WatchEvent.Kind<?> kind = event.kind(); + Recording r = recordingByKey.get(key); + if (r == null) { + continue; + } + + // events might have been lost, we just refresh the recording to be sure + if (kind == OVERFLOW) { + r.refresh(); + continue; + } + + // Context for directory entry event is the file name of entry + WatchEvent<Path> ev = cast(event); + Path name = ev.context(); + Path child = dir.resolve(name); + + try { + r.refresh(dir, ev); + } catch (IOException e) { + log.error("Error while updating recording size of {}", r, e); + } + + // if directory is created, and watching recursively, then + // register it and its sub-directories + if (kind == ENTRY_CREATE && Files.isDirectory(child, NOFOLLOW_LINKS)) { + registerAll(child, r); + } + } + } catch (Exception e) { + log.error("Error while processing file system events", e); + } finally { + if (key != null) { + key.reset(); + } + } + } + } + + @SuppressWarnings("unchecked") + static <T> WatchEvent<T> cast(WatchEvent<?> event) { + return (WatchEvent<T>) event; + } +} diff --git a/common/src/main/java/ctbrec/recorder/RecordingManager.java b/common/src/main/java/ctbrec/recorder/RecordingManager.java index 08e2ae59..da7d5aea 100644 --- a/common/src/main/java/ctbrec/recorder/RecordingManager.java +++ b/common/src/main/java/ctbrec/recorder/RecordingManager.java @@ -6,6 +6,7 @@ import ctbrec.Config; import ctbrec.Model; import ctbrec.Recording; import ctbrec.Recording.State; +import ctbrec.RecordingSizeMonitor; import ctbrec.io.FileJsonAdapter; import ctbrec.io.InstantJsonAdapter; import ctbrec.io.ModelJsonAdapter; @@ -16,9 +17,9 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.file.Files; -import java.time.Duration; import java.time.Instant; import java.util.*; +import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; import static ctbrec.Recording.State.*; @@ -34,6 +35,8 @@ public class RecordingManager { private final List<Recording> recordings = new ArrayList<>(); private final ReentrantLock recordingsLock = new ReentrantLock(); + private final RecordingSizeMonitor sizeMonitor; + public RecordingManager(Config config, List<Site> sites) throws IOException { this.config = config; Moshi moshi = new Moshi.Builder() @@ -43,6 +46,9 @@ public class RecordingManager { .build(); adapter = moshi.adapter(Recording.class).indent(" "); + sizeMonitor = new RecordingSizeMonitor(); + Executors.newSingleThreadExecutor().submit(sizeMonitor::processEvents); + loadRecordings(); } @@ -58,6 +64,7 @@ public class RecordingManager { } finally { recordingsLock.unlock(); } + sizeMonitor.monitor(rec); } public void saveRecording(Recording rec) throws IOException { @@ -97,6 +104,7 @@ public class RecordingManager { } if (recordingExists(recording)) { recordings.add(recording); + sizeMonitor.monitor(recording); } else { LOG.info("Recording {} does not exist anymore -> ignoring recording", recording); } @@ -169,6 +177,7 @@ public class RecordingManager { } finally { recordingsLock.unlock(); } + sizeMonitor.uninstall(recording); } /** @@ -189,18 +198,12 @@ public class RecordingManager { } finally { recordingsLock.unlock(); } + sizeMonitor.uninstall(recording); } public List<Recording> getAll() { recordingsLock.lock(); try { - Instant start = Instant.now(); - for (Recording recording : recordings) { - recording.refresh(); - } - Instant finish = Instant.now(); - long timeElapsed = Duration.between(start, finish).toMillis(); - LOG.trace("Recordings list refreshed in {} ms", timeElapsed); return new ArrayList<>(recordings); } finally { recordingsLock.unlock(); diff --git a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java index 846d5aa1..a00b612c 100644 --- a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java @@ -233,7 +233,6 @@ public class SimplifiedLocalRecorder implements Recorder { setRecordingStatus(recording, State.POST_PROCESSING); recording.getRecordingProcess().stop(); recording.getRecordingProcess().awaitEnd(); - recording.setDirtyFlag(true); recording.getRecordingProcess().finalizeDownload(); recording.refresh(); recordingManager.saveRecording(recording);