Refactor serment writing

- use a dedicated virtual thread. (Subjectively) simpler code, but could be imitated with tail-call-like rescheduling
 - use condition variable to wait until the quene becomes non-empty. Could be replaced by polling in a loop, but this will either introduce latency, or higher CPU load (?)

(cherry picked from commit dbf78cfb919aad0b0ef786d7815ac329808401fc)
This commit is contained in:
reusedname 2025-02-11 19:16:30 +05:00
parent bd8c5b9aaa
commit ae2a026ad1
1 changed files with 58 additions and 40 deletions

View File

@ -9,7 +9,6 @@ import ctbrec.recorder.FFmpeg;
import ctbrec.recorder.SimplifiedLocalRecorder;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.hls.SegmentPlaylist.Segment;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,7 +21,7 @@ import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@ -34,19 +33,18 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
protected FFmpeg ffmpeg;
protected Process ffmpegProcess;
protected OutputStream ffmpegStdIn;
protected BlockingQueue<Future<SegmentDownload>> queue = new LinkedBlockingQueue<>(50);
protected Lock ffmpegStreamLock = new ReentrantLock();
protected BlockingQueue<Future<SegmentDownload>> queue = new LinkedBlockingQueue<>();
protected Thread segmentWriterThread;
protected ReentrantLock conditionLock = new ReentrantLock();
protected Condition queueNotEmpty = conditionLock.newCondition();
public String getStats() {
// check if lock is held
var locked = ffmpegStreamLock.tryLock();
if (locked) ffmpegStreamLock.unlock();
String text = (running ? "RUN" : "stp")
+ String.format(" lag=%6dms lock=%b %d: ",
+ String.format(" lag=%6dms %d/%d: ",
Duration.between(lastSegmentDownload, Instant.now()).toMillis(),
!locked,
queue.size());
queue.size(),
queue.size()+queue.remainingCapacity());
for (var elem : queue) {
text += elem.isDone() ? "|" : "-";
}
@ -68,7 +66,11 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
startFfmpegProcess(targetFile);
if (ffmpegProcess == null) {
throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg");
}
}
segmentWriterThread = Thread.ofVirtual()
.name("SegmentWriter-" + model.getDisplayName())
.start(() -> streamSegmentDataToFfmpeg());
}
@Override
@ -84,41 +86,42 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
LOG.error("FFmpeg exited unclean", e);
}
streamSegmentDataToFfmpeg();
return this;
}
protected void streamSegmentDataToFfmpeg() {
downloadExecutor.submit(() -> {
ffmpegStreamLock.lock();
LOG.trace("Starting streaming segments to file {}", targetFile);
while (running && !Thread.currentThread().isInterrupted()) {
try {
LOG.trace("Starting streaming segments to file {}", targetFile);
while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) {
try {
Future<SegmentDownload> future = queue.peek();
if (running && future.isDone()) {
queue.take();
SegmentDownload segmentDownload = future.get();
ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream();
downloadData.writeTo(ffmpegStdIn);
lastSegmentDownload = Instant.now();
} else {
// first download in queue not finished, let's continue with other stuff
break;
}
} catch (InterruptedException e) {
LOG.trace("Segment download interrupted for model {}", model, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Segment download failed for model {}", model, e);
}
try {
conditionLock.lock();
while (queue.isEmpty())
queueNotEmpty.await();
} finally {
conditionLock.unlock();
}
LOG.trace("Finishing streaming segments to file {}", targetFile);
} finally {
ffmpegStreamLock.unlock();
// use peek() instead of take() so that we can look at first element from other places (currently for debug statistics)
// only take finished (downloaded or failed) elements
Future<SegmentDownload> future = queue.peek();
SegmentDownload segmentDownload = future.get();
queue.take();
if (segmentDownload.isFailed()) {
throw segmentDownload.getException();
}
ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream();
downloadData.writeTo(ffmpegStdIn);
lastSegmentDownload = Instant.now();
} catch (InterruptedException e) {
LOG.trace("Segment download interrupted for model {}", model, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Segment download failed for model {}", model, e);
}
});
}
LOG.trace("Finishing streaming segments to file {}", targetFile);
}
@Override
@ -162,6 +165,14 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
protected void execute(SegmentDownload segmentDownload) {
try {
queue.add(segmentDownloadService.submit(segmentDownload));
// notify the writer thread that we have added an element
try {
conditionLock.lock();
queueNotEmpty.signal();
} finally {
conditionLock.unlock();
}
} catch (Exception e) {
SimplifiedLocalRecorder.STATS.errorCount++;
throw e;
@ -179,6 +190,13 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
@Override
protected synchronized void internalStop() {
running = false;
segmentWriterThread.interrupt();
try {
segmentWriterThread.join();
} catch (InterruptedException e) {
}
for (Future<SegmentDownload> future : queue) {
future.cancel(true);
}