From 99f1aa5429a0ffa026d580a6c0687208059bfb3e Mon Sep 17 00:00:00 2001 From: 0xb00bface <0xboobface@gmail.com> Date: Sat, 11 Nov 2023 11:05:12 +0100 Subject: [PATCH] Remove RecordingSizeMonitor and update the recording size while recording and during post-processing instead --- common/src/main/java/ctbrec/Recording.java | 125 ++----------- .../java/ctbrec/RecordingSizeMonitor.java | 165 ------------------ .../java/ctbrec/io/json/dto/RecordingDto.java | 1 - .../ctbrec/recorder/RecordingManager.java | 14 -- .../recorder/download/RecordingProcess.java | 8 + .../download/hls/AbstractHlsDownload.java | 11 +- .../recorder/download/hls/HlsDownload.java | 6 +- .../download/hls/SegmentDownload.java | 6 + .../sites/amateurtv/AmateurTvDownload.java | 1 + .../io/json/ObjectMapperRecordingTest.java | 2 - .../io/json/mapper/RecordingMapperTest.java | 2 +- 11 files changed, 43 insertions(+), 298 deletions(-) delete mode 100644 common/src/main/java/ctbrec/RecordingSizeMonitor.java diff --git a/common/src/main/java/ctbrec/Recording.java b/common/src/main/java/ctbrec/Recording.java index 642f6bf6..28525f24 100644 --- a/common/src/main/java/ctbrec/Recording.java +++ b/common/src/main/java/ctbrec/Recording.java @@ -6,15 +6,14 @@ import ctbrec.io.IoUtils; import ctbrec.recorder.download.RecordingProcess; import ctbrec.recorder.download.StreamSource; import ctbrec.recorder.download.VideoLengthDetector; +import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.WatchEvent; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; @@ -26,9 +25,10 @@ import java.util.Set; import java.util.concurrent.Future; import static ctbrec.Recording.State.*; -import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; @Slf4j +@Getter +@Setter @NoArgsConstructor public class Recording implements Serializable { @@ -80,43 +80,11 @@ public class Recording implements Serializable { } } - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public Instant getStartDate() { - return startDate; - } - - public void setStartDate(Instant startDate) { - this.startDate = startDate; - } - - public State getStatus() { - return status; - } - - public void setStatus(State status) { - this.status = status; - } - public void setStatusWithEvent(State status) { setStatus(status); fireStatusEvent(status); } - public int getProgress() { - return this.progress; - } - - public void setProgress(int progress) { - this.progress = progress; - } - public void setPath(String path) { this.path = path; } @@ -145,7 +113,7 @@ public class Recording implements Serializable { } public long getSizeInByte() { - if (sizeInByte == -1) { + if (sizeInByte == -1 || getStatus() == RECORDING) { refresh(); } return sizeInByte; @@ -164,53 +132,6 @@ public class Recording implements Serializable { EventBusHolder.BUS.post(evt); } - public Model getModel() { - return model; - } - - public void setModel(Model model) { - this.model = model; - } - - public RecordingProcess getRecordingProcess() { - return recordingProcess; - } - - public void setRecordingProcess(RecordingProcess recordingProcess) { - this.recordingProcess = recordingProcess; - } - - public boolean isSingleFile() { - return singleFile; - } - - public void setSingleFile(boolean singleFile) { - this.singleFile = singleFile; - } - - public String getMetaDataFile() { - return metaDataFile; - } - - public void setMetaDataFile(String metaDataFile) { - this.metaDataFile = metaDataFile; - } - - public boolean isPinned() { - return pinned; - } - - public void setPinned(boolean pinned) { - this.pinned = pinned; - } - - public String getNote() { - return note; - } - - public void setNote(String note) { - this.note = note; - } public int getSelectedResolution() { if ((selectedResolution == -1 || selectedResolution == StreamSource.UNKNOWN) && recordingProcess != null) { @@ -300,19 +221,15 @@ public class Recording implements Serializable { } public void refresh() { - long now = System.currentTimeMillis(); - if (now - lastSizeUpdate > 2500) { - sizeInByte = getSize(); - lastSizeUpdate = now; - } - } - - public void refresh(Path dir, WatchEvent event) throws IOException { - Path child = dir.resolve(event.context()); - if (event.kind() == ENTRY_CREATE) { - sizeInByte += Files.size(child); + if (getStatus() == RECORDING && recordingProcess != null) { + sizeInByte = recordingProcess.getSizeInByte(); } else { - refresh(); + long now = System.currentTimeMillis(); + if (now - lastSizeUpdate > 2500) { + log.debug("full size update for {}", this); + sizeInByte = getSize(); + lastSizeUpdate = now; + } } } @@ -320,26 +237,10 @@ public class Recording implements Serializable { return getStatus() == FAILED || getStatus() == WAITING || getStatus() == FINISHED; } - public void setAssociatedFiles(Set associatedFiles) { - this.associatedFiles = associatedFiles; - } - - public Set getAssociatedFiles() { - return associatedFiles; - } - public Optional getContactSheet() { return getAssociatedFiles().stream() .filter(filePath -> filePath.endsWith(".jpg")) .findFirst() .map(File::new); } - - public Future getCurrentIteration() { - return currentIteration; - } - - public void setCurrentIteration(Future currentIteration) { - this.currentIteration = currentIteration; - } } diff --git a/common/src/main/java/ctbrec/RecordingSizeMonitor.java b/common/src/main/java/ctbrec/RecordingSizeMonitor.java deleted file mode 100644 index 0e1938de..00000000 --- a/common/src/main/java/ctbrec/RecordingSizeMonitor.java +++ /dev/null @@ -1,165 +0,0 @@ -package ctbrec; - -import lombok.extern.slf4j.Slf4j; - -import java.io.File; -import java.io.IOException; -import java.nio.file.*; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.*; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static java.nio.file.LinkOption.NOFOLLOW_LINKS; -import static java.nio.file.StandardWatchEventKinds.*; - -@Slf4j -public class RecordingSizeMonitor { - - protected final WatchService service; - protected final Map keys; - protected final Map recordingByKey; - protected final Map> keysByRecording; - protected final Set registeredPaths; - private final Lock lock = new ReentrantLock(); - - public RecordingSizeMonitor() throws IOException { - this.service = FileSystems.getDefault().newWatchService(); - this.keys = new HashMap<>(); - this.recordingByKey = new HashMap<>(); - this.keysByRecording = new HashMap<>(); - this.registeredPaths = new HashSet<>(); - } - - public void monitor(Recording rec) { - GlobalThreadPool.submit(() -> { - try { - for (File file : rec.getAllRecordingFiles()) { - for (int i = 0; i < 300; i++) { // wait up to 30 seconds for a file to show up - Path path = file.toPath(); - if (Files.exists(path)) { - if (Files.isDirectory(path, NOFOLLOW_LINKS)) { - registerAll(path, rec); - } else if (Files.isRegularFile(path)) { - registerAll(path.getParent(), rec); - } - break; - } else { - Thread.sleep(100); - } - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.warn("Couldn't register file monitor for {}", rec, e); - } - }); - } - - private synchronized void register(Path path, Recording rec) throws IOException { - lock.lock(); - try { - if (!registeredPaths.contains(path)) { - WatchKey key = path.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); - keys.put(key, path); - recordingByKey.put(key, rec); - keysByRecording.computeIfAbsent(rec, r -> new ArrayList<>()).add(key); - registeredPaths.add(path); - } - } finally { - lock.unlock(); - } - } - - private void registerAll(Path path, Recording rec) throws IOException { - Files.walkFileTree(path, new SimpleFileVisitor<>() { - @Override - public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { - register(dir, rec); - return FileVisitResult.CONTINUE; - } - }); - } - - public void uninstall(Recording rec) { - lock.lock(); - try { - List keysForRecording = this.keysByRecording.getOrDefault(rec, Collections.emptyList()); - keysForRecording.forEach(key -> { - Path path = keys.get(key); - key.cancel(); - keys.remove(key); - recordingByKey.remove(key); - registeredPaths.remove(path); - }); - this.keysByRecording.remove(rec); - } finally { - lock.unlock(); - } - } - - public void processEvents() { - while (!Thread.interrupted()) { - WatchKey key = null; - try { - try { - key = service.take(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - continue; - } - - lock.lock(); - Path dir = keys.get(key); - if (dir == null) { - log.error("WatchKey not recognized"); - continue; - } - - for (WatchEvent event : key.pollEvents()) { - WatchEvent.Kind kind = event.kind(); - Recording r = recordingByKey.get(key); - if (r == null) { - continue; - } - - // events might have been lost, we just refresh the recording to be sure - if (kind == OVERFLOW) { - r.refresh(); - continue; - } - - // Context for directory entry event is the file name of entry - WatchEvent ev = cast(event); - Path name = ev.context(); - Path child = dir.resolve(name); - - try { - r.refresh(dir, ev); - } catch (IOException e) { - log.error("Error while updating recording size of {}", r, e); - } - - // if directory is created, and watching recursively, then - // register it and its sub-directories - if (kind == ENTRY_CREATE && Files.isDirectory(child, NOFOLLOW_LINKS)) { - registerAll(child, r); - } - } - } catch (Exception e) { - log.error("Error while processing file system events", e); - } finally { - lock.unlock(); - if (key != null) { - key.reset(); - } - } - } - } - - @SuppressWarnings("unchecked") - static WatchEvent cast(WatchEvent event) { - return (WatchEvent) event; - } -} diff --git a/common/src/main/java/ctbrec/io/json/dto/RecordingDto.java b/common/src/main/java/ctbrec/io/json/dto/RecordingDto.java index 2e344557..bc45fbc8 100644 --- a/common/src/main/java/ctbrec/io/json/dto/RecordingDto.java +++ b/common/src/main/java/ctbrec/io/json/dto/RecordingDto.java @@ -30,5 +30,4 @@ public class RecordingDto { private File absoluteFile = null; private File postProcessedFile = null; private int selectedResolution = -1; - private long lastSizeUpdate = 0; } diff --git a/common/src/main/java/ctbrec/recorder/RecordingManager.java b/common/src/main/java/ctbrec/recorder/RecordingManager.java index 7ec03cda..a1f988ad 100644 --- a/common/src/main/java/ctbrec/recorder/RecordingManager.java +++ b/common/src/main/java/ctbrec/recorder/RecordingManager.java @@ -5,7 +5,6 @@ import ctbrec.Config; import ctbrec.MigrateModel5_1_2; import ctbrec.Recording; import ctbrec.Recording.State; -import ctbrec.RecordingSizeMonitor; import ctbrec.io.json.ObjectMapperFactory; import ctbrec.io.json.dto.RecordingDto; import ctbrec.io.json.mapper.RecordingMapper; @@ -19,7 +18,6 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.*; -import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; import static ctbrec.Recording.State.*; @@ -36,15 +34,10 @@ public class RecordingManager { private final List recordings = new ArrayList<>(); private final ReentrantLock recordingsLock = new ReentrantLock(); - private final RecordingSizeMonitor sizeMonitor; - public RecordingManager(Config config, List sites) throws IOException { this.config = config; this.sites = sites; - sizeMonitor = new RecordingSizeMonitor(); - Executors.newSingleThreadExecutor().submit(sizeMonitor::processEvents); - loadRecordings(); } @@ -57,7 +50,6 @@ public class RecordingManager { recordingsLock.lock(); try { recordings.add(rec); - sizeMonitor.monitor(rec); } finally { recordingsLock.unlock(); } @@ -111,7 +103,6 @@ public class RecordingManager { } if (recordingExists(recording)) { recordings.add(recording); - sizeMonitor.monitor(recording); } else { log.info("Recording {} does not exist anymore -> ignoring recording", recording); } @@ -148,9 +139,6 @@ public class RecordingManager { boolean isFile = path.isFile(); log.debug("Deleting {}", path); - // uninstall file monitor - sizeMonitor.uninstall(recording); - // delete the video files if (isFile) { Files.delete(path.toPath()); @@ -200,8 +188,6 @@ public class RecordingManager { try { int idx = recordings.indexOf(recording); recording = recordings.get(idx); - // uninstall file monitor - sizeMonitor.uninstall(recording); // delete the meta data Files.deleteIfExists(new File(recording.getMetaDataFile()).toPath()); // remove from data structure diff --git a/common/src/main/java/ctbrec/recorder/download/RecordingProcess.java b/common/src/main/java/ctbrec/recorder/download/RecordingProcess.java index c88352a9..f9cdd020 100644 --- a/common/src/main/java/ctbrec/recorder/download/RecordingProcess.java +++ b/common/src/main/java/ctbrec/recorder/download/RecordingProcess.java @@ -9,8 +9,11 @@ import java.io.IOException; import java.time.Instant; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; public interface RecordingProcess extends Callable { + AtomicLong downloadedBytes = new AtomicLong(); + void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException; void stop(); @@ -58,4 +61,9 @@ public interface RecordingProcess extends Callable { long getSizeInByte(); void awaitEnd(); + + default AtomicLong getDownloadedBytes() { + return downloadedBytes; + } + } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java index a6512dba..e997c611 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java @@ -44,9 +44,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import static ctbrec.io.HttpConstants.*; -import static ctbrec.recorder.download.StreamSource.UNKNOWN; import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.file.StandardOpenOption.*; @@ -70,9 +70,9 @@ public abstract class AbstractHlsDownload extends AbstractDownload { private Instant beforeLastPlaylistRequest = Instant.EPOCH; private int consecutivePlaylistTimeouts = 0; private int consecutivePlaylistErrors = 0; + private final AtomicLong sizeInByte = new AtomicLong(); protected Instant lastSegmentDownload = Instant.MIN; private final List segmentErrorTimestamps = new LinkedList<>(); - private int selectedResolution = UNKNOWN; private final List recordingEvents = new LinkedList<>(); protected ExecutorCompletionService segmentDownloadService; @@ -88,9 +88,16 @@ public abstract class AbstractHlsDownload extends AbstractDownload { lastSegmentDownload = Instant.now(); } + @Override + public long getSizeInByte() { + return sizeInByte.get(); + } + protected abstract OutputStream getSegmentOutputStream(Segment segment) throws IOException; protected void segmentDownloadFinished(SegmentDownload segmentDownload) { // NOSONAR + long bytes = getDownloadedBytes().addAndGet(segmentDownload.size()); + sizeInByte.addAndGet(bytes); lastSegmentDownload = Instant.now(); } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java index d616d07e..0b1412a1 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java @@ -190,7 +190,11 @@ public class HlsDownload extends AbstractHlsDownload { @Override public long getSizeInByte() { - return IoUtils.getDirectorySize(getTarget()); + if (super.getSizeInByte() == 0) { + return IoUtils.getDirectorySize(getTarget()); + } else { + return super.getSizeInByte(); + } } @Override diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java index b9a57896..74d0bb93 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java @@ -37,6 +37,7 @@ public class SegmentDownload implements Callable { protected final Model model; protected final OutputStream out; + private long size = 0; protected boolean failed = false; protected Exception exception; @@ -89,6 +90,7 @@ public class SegmentDownload implements Callable { int length; while ((length = in.read(b)) >= 0 && !Thread.currentThread().isInterrupted()) { out.write(b, 0, length); + size += length; BandwidthMeter.add(length); } out.flush(); @@ -118,4 +120,8 @@ public class SegmentDownload implements Callable { public Exception getException() { return exception; } + + public long size() { + return size; + } } diff --git a/common/src/main/java/ctbrec/sites/amateurtv/AmateurTvDownload.java b/common/src/main/java/ctbrec/sites/amateurtv/AmateurTvDownload.java index 81f5d6f6..6ad69c56 100644 --- a/common/src/main/java/ctbrec/sites/amateurtv/AmateurTvDownload.java +++ b/common/src/main/java/ctbrec/sites/amateurtv/AmateurTvDownload.java @@ -154,6 +154,7 @@ public class AmateurTvDownload extends AbstractDownload { while (running && !Thread.currentThread().isInterrupted() && (len = in.read(b)) >= 0) { fout.write(b, 0, len); timeOfLastTransfer = Instant.now(); + getDownloadedBytes().addAndGet(len); BandwidthMeter.add(len); } } else { diff --git a/common/src/test/java/ctbrec/io/json/ObjectMapperRecordingTest.java b/common/src/test/java/ctbrec/io/json/ObjectMapperRecordingTest.java index 27bdc06d..6095bf27 100644 --- a/common/src/test/java/ctbrec/io/json/ObjectMapperRecordingTest.java +++ b/common/src/test/java/ctbrec/io/json/ObjectMapperRecordingTest.java @@ -39,7 +39,6 @@ class ObjectMapperRecordingTest { assertEquals(rec.getAbsoluteFile().toString(), j.getString("absoluteFile")); assertEquals(rec.getPostProcessedFile().toString(), j.getString("postProcessedFile")); assertEquals(rec.getSelectedResolution(), j.getInt("selectedResolution")); - assertEquals(rec.getLastSizeUpdate(), j.getLong("lastSizeUpdate")); } @Test @@ -67,7 +66,6 @@ class ObjectMapperRecordingTest { recording.setAbsoluteFile(new File("/tmp/test.rec.ts")); recording.setPostProcessedFile(new File("/tmp/pp.ts")); recording.setSelectedResolution(2); - recording.setLastSizeUpdate(Instant.now().toEpochMilli()); return recording; } } diff --git a/common/src/test/java/ctbrec/io/json/mapper/RecordingMapperTest.java b/common/src/test/java/ctbrec/io/json/mapper/RecordingMapperTest.java index 3a5c6b74..9b0f1cd3 100644 --- a/common/src/test/java/ctbrec/io/json/mapper/RecordingMapperTest.java +++ b/common/src/test/java/ctbrec/io/json/mapper/RecordingMapperTest.java @@ -71,7 +71,7 @@ class RecordingMapperTest { dto.setId(UUID.randomUUID().toString()); dto.setModel(model); dto.setAbsoluteFile(new File("/tmp/recording")); - dto.setStatus(Recording.State.RECORDING); + dto.setStatus(Recording.State.FINISHED); dto.setSelectedResolution(1080); dto.setProgress(23); dto.setStartDate(Instant.now());