From ae2a026ad1fa4227a11d87026e4d3d47c734c451 Mon Sep 17 00:00:00 2001 From: reusedname <155286845+reusedname@users.noreply.github.com> Date: Tue, 11 Feb 2025 19:16:30 +0500 Subject: [PATCH] Refactor serment writing - use a dedicated virtual thread. (Subjectively) simpler code, but could be imitated with tail-call-like rescheduling - use condition variable to wait until the quene becomes non-empty. Could be replaced by polling in a loop, but this will either introduce latency, or higher CPU load (?) (cherry picked from commit dbf78cfb919aad0b0ef786d7815ac329808401fc) --- .../download/hls/MergedFfmpegHlsDownload.java | 98 +++++++++++-------- 1 file changed, 58 insertions(+), 40 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index 12c3249d..cec41eb2 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -9,7 +9,6 @@ import ctbrec.recorder.FFmpeg; import ctbrec.recorder.SimplifiedLocalRecorder; import ctbrec.recorder.download.ProcessExitedUncleanException; import ctbrec.recorder.download.hls.SegmentPlaylist.Segment; -import lombok.Getter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +21,7 @@ import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.concurrent.*; -import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -34,19 +33,18 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { protected FFmpeg ffmpeg; protected Process ffmpegProcess; protected OutputStream ffmpegStdIn; - protected BlockingQueue> queue = new LinkedBlockingQueue<>(50); - protected Lock ffmpegStreamLock = new ReentrantLock(); + protected BlockingQueue> queue = new LinkedBlockingQueue<>(); + protected Thread segmentWriterThread; + protected ReentrantLock conditionLock = new ReentrantLock(); + protected Condition queueNotEmpty = conditionLock.newCondition(); + public String getStats() { - // check if lock is held - var locked = ffmpegStreamLock.tryLock(); - if (locked) ffmpegStreamLock.unlock(); - String text = (running ? "RUN" : "stp") - + String.format(" lag=%6dms lock=%b %d: ", + + String.format(" lag=%6dms %d/%d: ", Duration.between(lastSegmentDownload, Instant.now()).toMillis(), - !locked, - queue.size()); + queue.size(), + queue.size()+queue.remainingCapacity()); for (var elem : queue) { text += elem.isDone() ? "|" : "-"; } @@ -68,7 +66,11 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { startFfmpegProcess(targetFile); if (ffmpegProcess == null) { throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); - } + } + + segmentWriterThread = Thread.ofVirtual() + .name("SegmentWriter-" + model.getDisplayName()) + .start(() -> streamSegmentDataToFfmpeg()); } @Override @@ -84,41 +86,42 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { LOG.error("FFmpeg exited unclean", e); } - streamSegmentDataToFfmpeg(); - return this; } protected void streamSegmentDataToFfmpeg() { - downloadExecutor.submit(() -> { - ffmpegStreamLock.lock(); + LOG.trace("Starting streaming segments to file {}", targetFile); + while (running && !Thread.currentThread().isInterrupted()) { try { - LOG.trace("Starting streaming segments to file {}", targetFile); - while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) { - try { - Future future = queue.peek(); - if (running && future.isDone()) { - queue.take(); - SegmentDownload segmentDownload = future.get(); - ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream(); - downloadData.writeTo(ffmpegStdIn); - lastSegmentDownload = Instant.now(); - } else { - // first download in queue not finished, let's continue with other stuff - break; - } - } catch (InterruptedException e) { - LOG.trace("Segment download interrupted for model {}", model, e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.error("Segment download failed for model {}", model, e); - } + try { + conditionLock.lock(); + while (queue.isEmpty()) + queueNotEmpty.await(); + } finally { + conditionLock.unlock(); } - LOG.trace("Finishing streaming segments to file {}", targetFile); - } finally { - ffmpegStreamLock.unlock(); + + // use peek() instead of take() so that we can look at first element from other places (currently for debug statistics) + // only take finished (downloaded or failed) elements + Future future = queue.peek(); + SegmentDownload segmentDownload = future.get(); + queue.take(); + + if (segmentDownload.isFailed()) { + throw segmentDownload.getException(); + } + + ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream(); + downloadData.writeTo(ffmpegStdIn); + lastSegmentDownload = Instant.now(); + } catch (InterruptedException e) { + LOG.trace("Segment download interrupted for model {}", model, e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.error("Segment download failed for model {}", model, e); } - }); + } + LOG.trace("Finishing streaming segments to file {}", targetFile); } @Override @@ -162,6 +165,14 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { protected void execute(SegmentDownload segmentDownload) { try { queue.add(segmentDownloadService.submit(segmentDownload)); + + // notify the writer thread that we have added an element + try { + conditionLock.lock(); + queueNotEmpty.signal(); + } finally { + conditionLock.unlock(); + } } catch (Exception e) { SimplifiedLocalRecorder.STATS.errorCount++; throw e; @@ -179,6 +190,13 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { @Override protected synchronized void internalStop() { running = false; + + segmentWriterThread.interrupt(); + try { + segmentWriterThread.join(); + } catch (InterruptedException e) { + } + for (Future future : queue) { future.cancel(true); }