Remove RecordingSizeMonitor and update the recording size while recording and during post-processing instead

This commit is contained in:
0xb00bface 2023-11-11 11:05:12 +01:00
parent 28d93674ca
commit 99f1aa5429
11 changed files with 43 additions and 298 deletions

View File

@ -6,15 +6,14 @@ import ctbrec.io.IoUtils;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.recorder.download.VideoLengthDetector;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
@ -26,9 +25,10 @@ import java.util.Set;
import java.util.concurrent.Future;
import static ctbrec.Recording.State.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
@Slf4j
@Getter
@Setter
@NoArgsConstructor
public class Recording implements Serializable {
@ -80,43 +80,11 @@ public class Recording implements Serializable {
}
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Instant getStartDate() {
return startDate;
}
public void setStartDate(Instant startDate) {
this.startDate = startDate;
}
public State getStatus() {
return status;
}
public void setStatus(State status) {
this.status = status;
}
public void setStatusWithEvent(State status) {
setStatus(status);
fireStatusEvent(status);
}
public int getProgress() {
return this.progress;
}
public void setProgress(int progress) {
this.progress = progress;
}
public void setPath(String path) {
this.path = path;
}
@ -145,7 +113,7 @@ public class Recording implements Serializable {
}
public long getSizeInByte() {
if (sizeInByte == -1) {
if (sizeInByte == -1 || getStatus() == RECORDING) {
refresh();
}
return sizeInByte;
@ -164,53 +132,6 @@ public class Recording implements Serializable {
EventBusHolder.BUS.post(evt);
}
public Model getModel() {
return model;
}
public void setModel(Model model) {
this.model = model;
}
public RecordingProcess getRecordingProcess() {
return recordingProcess;
}
public void setRecordingProcess(RecordingProcess recordingProcess) {
this.recordingProcess = recordingProcess;
}
public boolean isSingleFile() {
return singleFile;
}
public void setSingleFile(boolean singleFile) {
this.singleFile = singleFile;
}
public String getMetaDataFile() {
return metaDataFile;
}
public void setMetaDataFile(String metaDataFile) {
this.metaDataFile = metaDataFile;
}
public boolean isPinned() {
return pinned;
}
public void setPinned(boolean pinned) {
this.pinned = pinned;
}
public String getNote() {
return note;
}
public void setNote(String note) {
this.note = note;
}
public int getSelectedResolution() {
if ((selectedResolution == -1 || selectedResolution == StreamSource.UNKNOWN) && recordingProcess != null) {
@ -300,19 +221,15 @@ public class Recording implements Serializable {
}
public void refresh() {
long now = System.currentTimeMillis();
if (now - lastSizeUpdate > 2500) {
sizeInByte = getSize();
lastSizeUpdate = now;
}
}
public void refresh(Path dir, WatchEvent<Path> event) throws IOException {
Path child = dir.resolve(event.context());
if (event.kind() == ENTRY_CREATE) {
sizeInByte += Files.size(child);
if (getStatus() == RECORDING && recordingProcess != null) {
sizeInByte = recordingProcess.getSizeInByte();
} else {
refresh();
long now = System.currentTimeMillis();
if (now - lastSizeUpdate > 2500) {
log.debug("full size update for {}", this);
sizeInByte = getSize();
lastSizeUpdate = now;
}
}
}
@ -320,26 +237,10 @@ public class Recording implements Serializable {
return getStatus() == FAILED || getStatus() == WAITING || getStatus() == FINISHED;
}
public void setAssociatedFiles(Set<String> associatedFiles) {
this.associatedFiles = associatedFiles;
}
public Set<String> getAssociatedFiles() {
return associatedFiles;
}
public Optional<File> getContactSheet() {
return getAssociatedFiles().stream()
.filter(filePath -> filePath.endsWith(".jpg"))
.findFirst()
.map(File::new);
}
public Future<RecordingProcess> getCurrentIteration() {
return currentIteration;
}
public void setCurrentIteration(Future<RecordingProcess> currentIteration) {
this.currentIteration = currentIteration;
}
}

View File

@ -1,165 +0,0 @@
package ctbrec;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.nio.file.LinkOption.NOFOLLOW_LINKS;
import static java.nio.file.StandardWatchEventKinds.*;
@Slf4j
public class RecordingSizeMonitor {
protected final WatchService service;
protected final Map<WatchKey, Path> keys;
protected final Map<WatchKey, Recording> recordingByKey;
protected final Map<Recording, List<WatchKey>> keysByRecording;
protected final Set<Path> registeredPaths;
private final Lock lock = new ReentrantLock();
public RecordingSizeMonitor() throws IOException {
this.service = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<>();
this.recordingByKey = new HashMap<>();
this.keysByRecording = new HashMap<>();
this.registeredPaths = new HashSet<>();
}
public void monitor(Recording rec) {
GlobalThreadPool.submit(() -> {
try {
for (File file : rec.getAllRecordingFiles()) {
for (int i = 0; i < 300; i++) { // wait up to 30 seconds for a file to show up
Path path = file.toPath();
if (Files.exists(path)) {
if (Files.isDirectory(path, NOFOLLOW_LINKS)) {
registerAll(path, rec);
} else if (Files.isRegularFile(path)) {
registerAll(path.getParent(), rec);
}
break;
} else {
Thread.sleep(100);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("Couldn't register file monitor for {}", rec, e);
}
});
}
private synchronized void register(Path path, Recording rec) throws IOException {
lock.lock();
try {
if (!registeredPaths.contains(path)) {
WatchKey key = path.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
keys.put(key, path);
recordingByKey.put(key, rec);
keysByRecording.computeIfAbsent(rec, r -> new ArrayList<>()).add(key);
registeredPaths.add(path);
}
} finally {
lock.unlock();
}
}
private void registerAll(Path path, Recording rec) throws IOException {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
register(dir, rec);
return FileVisitResult.CONTINUE;
}
});
}
public void uninstall(Recording rec) {
lock.lock();
try {
List<WatchKey> keysForRecording = this.keysByRecording.getOrDefault(rec, Collections.emptyList());
keysForRecording.forEach(key -> {
Path path = keys.get(key);
key.cancel();
keys.remove(key);
recordingByKey.remove(key);
registeredPaths.remove(path);
});
this.keysByRecording.remove(rec);
} finally {
lock.unlock();
}
}
public void processEvents() {
while (!Thread.interrupted()) {
WatchKey key = null;
try {
try {
key = service.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
}
lock.lock();
Path dir = keys.get(key);
if (dir == null) {
log.error("WatchKey not recognized");
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
WatchEvent.Kind<?> kind = event.kind();
Recording r = recordingByKey.get(key);
if (r == null) {
continue;
}
// events might have been lost, we just refresh the recording to be sure
if (kind == OVERFLOW) {
r.refresh();
continue;
}
// Context for directory entry event is the file name of entry
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
try {
r.refresh(dir, ev);
} catch (IOException e) {
log.error("Error while updating recording size of {}", r, e);
}
// if directory is created, and watching recursively, then
// register it and its sub-directories
if (kind == ENTRY_CREATE && Files.isDirectory(child, NOFOLLOW_LINKS)) {
registerAll(child, r);
}
}
} catch (Exception e) {
log.error("Error while processing file system events", e);
} finally {
lock.unlock();
if (key != null) {
key.reset();
}
}
}
}
@SuppressWarnings("unchecked")
static <T> WatchEvent<T> cast(WatchEvent<?> event) {
return (WatchEvent<T>) event;
}
}

View File

@ -30,5 +30,4 @@ public class RecordingDto {
private File absoluteFile = null;
private File postProcessedFile = null;
private int selectedResolution = -1;
private long lastSizeUpdate = 0;
}

View File

@ -5,7 +5,6 @@ import ctbrec.Config;
import ctbrec.MigrateModel5_1_2;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.RecordingSizeMonitor;
import ctbrec.io.json.ObjectMapperFactory;
import ctbrec.io.json.dto.RecordingDto;
import ctbrec.io.json.mapper.RecordingMapper;
@ -19,7 +18,6 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import static ctbrec.Recording.State.*;
@ -36,15 +34,10 @@ public class RecordingManager {
private final List<Recording> recordings = new ArrayList<>();
private final ReentrantLock recordingsLock = new ReentrantLock();
private final RecordingSizeMonitor sizeMonitor;
public RecordingManager(Config config, List<Site> sites) throws IOException {
this.config = config;
this.sites = sites;
sizeMonitor = new RecordingSizeMonitor();
Executors.newSingleThreadExecutor().submit(sizeMonitor::processEvents);
loadRecordings();
}
@ -57,7 +50,6 @@ public class RecordingManager {
recordingsLock.lock();
try {
recordings.add(rec);
sizeMonitor.monitor(rec);
} finally {
recordingsLock.unlock();
}
@ -111,7 +103,6 @@ public class RecordingManager {
}
if (recordingExists(recording)) {
recordings.add(recording);
sizeMonitor.monitor(recording);
} else {
log.info("Recording {} does not exist anymore -> ignoring recording", recording);
}
@ -148,9 +139,6 @@ public class RecordingManager {
boolean isFile = path.isFile();
log.debug("Deleting {}", path);
// uninstall file monitor
sizeMonitor.uninstall(recording);
// delete the video files
if (isFile) {
Files.delete(path.toPath());
@ -200,8 +188,6 @@ public class RecordingManager {
try {
int idx = recordings.indexOf(recording);
recording = recordings.get(idx);
// uninstall file monitor
sizeMonitor.uninstall(recording);
// delete the meta data
Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath());
// remove from data structure

View File

@ -9,8 +9,11 @@ import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
public interface RecordingProcess extends Callable<RecordingProcess> {
AtomicLong downloadedBytes = new AtomicLong();
void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException;
void stop();
@ -58,4 +61,9 @@ public interface RecordingProcess extends Callable<RecordingProcess> {
long getSizeInByte();
void awaitEnd();
default AtomicLong getDownloadedBytes() {
return downloadedBytes;
}
}

View File

@ -44,9 +44,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import static ctbrec.io.HttpConstants.*;
import static ctbrec.recorder.download.StreamSource.UNKNOWN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.*;
@ -70,9 +70,9 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
private Instant beforeLastPlaylistRequest = Instant.EPOCH;
private int consecutivePlaylistTimeouts = 0;
private int consecutivePlaylistErrors = 0;
private final AtomicLong sizeInByte = new AtomicLong();
protected Instant lastSegmentDownload = Instant.MIN;
private final List<Instant> segmentErrorTimestamps = new LinkedList<>();
private int selectedResolution = UNKNOWN;
private final List<RecordingEvent> recordingEvents = new LinkedList<>();
protected ExecutorCompletionService<SegmentDownload> segmentDownloadService;
@ -88,9 +88,16 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
lastSegmentDownload = Instant.now();
}
@Override
public long getSizeInByte() {
return sizeInByte.get();
}
protected abstract OutputStream getSegmentOutputStream(Segment segment) throws IOException;
protected void segmentDownloadFinished(SegmentDownload segmentDownload) { // NOSONAR
long bytes = getDownloadedBytes().addAndGet(segmentDownload.size());
sizeInByte.addAndGet(bytes);
lastSegmentDownload = Instant.now();
}

View File

@ -190,7 +190,11 @@ public class HlsDownload extends AbstractHlsDownload {
@Override
public long getSizeInByte() {
return IoUtils.getDirectorySize(getTarget());
if (super.getSizeInByte() == 0) {
return IoUtils.getDirectorySize(getTarget());
} else {
return super.getSizeInByte();
}
}
@Override

View File

@ -37,6 +37,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
protected final Model model;
protected final OutputStream out;
private long size = 0;
protected boolean failed = false;
protected Exception exception;
@ -89,6 +90,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
int length;
while ((length = in.read(b)) >= 0 && !Thread.currentThread().isInterrupted()) {
out.write(b, 0, length);
size += length;
BandwidthMeter.add(length);
}
out.flush();
@ -118,4 +120,8 @@ public class SegmentDownload implements Callable<SegmentDownload> {
public Exception getException() {
return exception;
}
public long size() {
return size;
}
}

View File

@ -154,6 +154,7 @@ public class AmateurTvDownload extends AbstractDownload {
while (running && !Thread.currentThread().isInterrupted() && (len = in.read(b)) >= 0) {
fout.write(b, 0, len);
timeOfLastTransfer = Instant.now();
getDownloadedBytes().addAndGet(len);
BandwidthMeter.add(len);
}
} else {

View File

@ -39,7 +39,6 @@ class ObjectMapperRecordingTest {
assertEquals(rec.getAbsoluteFile().toString(), j.getString("absoluteFile"));
assertEquals(rec.getPostProcessedFile().toString(), j.getString("postProcessedFile"));
assertEquals(rec.getSelectedResolution(), j.getInt("selectedResolution"));
assertEquals(rec.getLastSizeUpdate(), j.getLong("lastSizeUpdate"));
}
@Test
@ -67,7 +66,6 @@ class ObjectMapperRecordingTest {
recording.setAbsoluteFile(new File("/tmp/test.rec.ts"));
recording.setPostProcessedFile(new File("/tmp/pp.ts"));
recording.setSelectedResolution(2);
recording.setLastSizeUpdate(Instant.now().toEpochMilli());
return recording;
}
}

View File

@ -71,7 +71,7 @@ class RecordingMapperTest {
dto.setId(UUID.randomUUID().toString());
dto.setModel(model);
dto.setAbsoluteFile(new File("/tmp/recording"));
dto.setStatus(Recording.State.RECORDING);
dto.setStatus(Recording.State.FINISHED);
dto.setSelectedResolution(1080);
dto.setProgress(23);
dto.setStartDate(Instant.now());