Implement file system monitoring to monitor the size of recordings

This commit is contained in:
0xb00bface 2023-06-17 15:38:08 +02:00
parent 4d978e2ee6
commit 5b688e4cbe
4 changed files with 179 additions and 20 deletions

View File

@ -11,6 +11,9 @@ import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
@ -22,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.Future;
import static ctbrec.Recording.State.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
@Slf4j
public class Recording implements Serializable {
@ -43,11 +47,8 @@ public class Recording implements Serializable {
private File absoluteFile = null;
private File postProcessedFile = null;
private int selectedResolution = -1;
private long lastSizeUpdate = 0;
/**
* Signals, if the recording has been changed and it has to be refreshed
*/
private boolean dirtyFlag = true;
public enum State {
RECORDING("recording"),
@ -272,7 +273,7 @@ public class Recording implements Serializable {
}
}
private Set<File> getAllRecordingFiles() throws IOException {
public Set<File> getAllRecordingFiles() throws IOException {
Set<File> files = new HashSet<>();
if (absoluteFile != null) {
files.add(absoluteFile.getCanonicalFile());
@ -287,9 +288,19 @@ public class Recording implements Serializable {
}
public void refresh() {
if ((status != FINISHED && status != FAILED) || dirtyFlag) {
long now = System.currentTimeMillis();
if (now - lastSizeUpdate > 1000) {
sizeInByte = getSize();
dirtyFlag = false;
lastSizeUpdate = now;
}
}
public void refresh(Path dir, WatchEvent<Path> event) throws IOException {
Path child = dir.resolve(event.context());
if (event.kind() == ENTRY_CREATE) {
sizeInByte += Files.size(child);
} else {
refresh();
}
}
@ -312,10 +323,6 @@ public class Recording implements Serializable {
.map(File::new);
}
public void setDirtyFlag(boolean dirtyFlag) {
this.dirtyFlag = dirtyFlag;
}
public Future<RecordingProcess> getCurrentIteration() {
return currentIteration;
}

View File

@ -0,0 +1,150 @@
package ctbrec;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import static java.nio.file.StandardWatchEventKinds.*;
@Slf4j
public class RecordingSizeMonitor {
protected final WatchService service;
protected final Map<WatchKey, Path> keys;
protected final Map<WatchKey, Recording> recordingByKey;
protected final Map<Recording, List<WatchKey>> keysByRecording;
protected final Set<Path> registeredPaths;
public RecordingSizeMonitor() throws IOException {
this.service = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<>();
this.recordingByKey = new HashMap<>();
this.keysByRecording = new HashMap<>();
this.registeredPaths = new HashSet<>();
}
public void monitor(Recording rec) {
GlobalThreadPool.submit(() -> {
try {
for (File file : rec.getAllRecordingFiles()) {
for (int i = 0; i < 300; i++) { // wait up to 30 seconds for a file to show up
Path path = file.toPath();
if (Files.exists(path)) {
if (Files.isDirectory(path, NOFOLLOW_LINKS)) {
registerAll(path, rec);
} else if (Files.isRegularFile(path)) {
registerAll(path.getParent(), rec);
}
break;
} else {
Thread.sleep(100);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("Couldn't register file monitor for {}", rec, e);
}
});
}
private void register(Path path, Recording rec) throws IOException {
if (!registeredPaths.contains(path)) {
WatchKey key = path.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
keys.put(key, path);
recordingByKey.put(key, rec);
keysByRecording.computeIfAbsent(rec, r -> new ArrayList<>()).add(key);
registeredPaths.add(path);
}
}
private void registerAll(Path path, Recording rec) throws IOException {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
register(dir, rec);
return FileVisitResult.CONTINUE;
}
});
}
public void uninstall(Recording rec) {
List<WatchKey> keysForRecording = this.keysByRecording.getOrDefault(rec, Collections.emptyList());
keysForRecording.forEach(key -> {
Path path = keys.get(key);
key.cancel();
keys.remove(key);
recordingByKey.remove(key);
registeredPaths.remove(path);
});
this.keysByRecording.remove(rec);
}
public void processEvents() {
while (!Thread.interrupted()) {
WatchKey key = null;
try {
try {
key = service.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
}
Path dir = keys.get(key);
if (dir == null) {
log.error("WatchKey not recognized");
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
Recording r = recordingByKey.get(key);
if (r == null) {
continue;
}
// events might have been lost, we just refresh the recording to be sure
if (kind == OVERFLOW) {
r.refresh();
continue;
}
// Context for directory entry event is the file name of entry
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
try {
r.refresh(dir, ev);
} catch (IOException e) {
log.error("Error while updating recording size of {}", r, e);
}
// if directory is created, and watching recursively, then
// register it and its sub-directories
if (kind == ENTRY_CREATE && Files.isDirectory(child, NOFOLLOW_LINKS)) {
registerAll(child, r);
}
}
} catch (Exception e) {
log.error("Error while processing file system events", e);
} finally {
if (key != null) {
key.reset();
}
}
}
}
@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
}

View File

@ -6,6 +6,7 @@ import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.RecordingSizeMonitor;
import ctbrec.io.FileJsonAdapter;
import ctbrec.io.InstantJsonAdapter;
import ctbrec.io.ModelJsonAdapter;
@ -16,9 +17,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import static ctbrec.Recording.State.*;
@ -34,6 +35,8 @@ public class RecordingManager {
private final List<Recording> recordings = new ArrayList<>();
private final ReentrantLock recordingsLock = new ReentrantLock();
private final RecordingSizeMonitor sizeMonitor;
public RecordingManager(Config config, List<Site> sites) throws IOException {
this.config = config;
Moshi moshi = new Moshi.Builder()
@ -43,6 +46,9 @@ public class RecordingManager {
.build();
adapter = moshi.adapter(Recording.class).indent(" ");
sizeMonitor = new RecordingSizeMonitor();
Executors.newSingleThreadExecutor().submit(sizeMonitor::processEvents);
loadRecordings();
}
@ -58,6 +64,7 @@ public class RecordingManager {
} finally {
recordingsLock.unlock();
}
sizeMonitor.monitor(rec);
}
public void saveRecording(Recording rec) throws IOException {
@ -97,6 +104,7 @@ public class RecordingManager {
}
if (recordingExists(recording)) {
recordings.add(recording);
sizeMonitor.monitor(recording);
} else {
LOG.info("Recording {} does not exist anymore -> ignoring recording", recording);
}
@ -169,6 +177,7 @@ public class RecordingManager {
} finally {
recordingsLock.unlock();
}
sizeMonitor.uninstall(recording);
}
/**
@ -189,18 +198,12 @@ public class RecordingManager {
} finally {
recordingsLock.unlock();
}
sizeMonitor.uninstall(recording);
}
public List<Recording> getAll() {
recordingsLock.lock();
try {
Instant start = Instant.now();
for (Recording recording : recordings) {
recording.refresh();
}
Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis();
LOG.trace("Recordings list refreshed in {} ms", timeElapsed);
return new ArrayList<>(recordings);
} finally {
recordingsLock.unlock();

View File

@ -233,7 +233,6 @@ public class SimplifiedLocalRecorder implements Recorder {
setRecordingStatus(recording, State.POST_PROCESSING);
recording.getRecordingProcess().stop();
recording.getRecordingProcess().awaitEnd();
recording.setDirtyFlag(true);
recording.getRecordingProcess().finalizeDownload();
recording.refresh();
recordingManager.saveRecording(recording);