From 8855591f0fc1a97537a4e4b6735d73a75bf58620 Mon Sep 17 00:00:00 2001 From: 0xb00bface <0xboobface@gmail.com> Date: Sat, 23 Jan 2021 16:24:24 +0100 Subject: [PATCH] Fix race condition in code for streaming segments to FFmpeg This caused stuttering and jumps mostly in the beginning of a recording --- .../download/hls/MergedFfmpegHlsDownload.java | 44 ++++++++++--------- .../download/hls/SegmentDownload.java | 5 +++ 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index fc01767d..01b72d17 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -13,6 +13,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import org.slf4j.Logger; @@ -40,6 +42,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { private transient Process ffmpegProcess; private transient OutputStream ffmpegStdIn; private transient BlockingQueue> queue = new LinkedBlockingQueue<>(); + private transient Lock ffmpegStreamLock = new ReentrantLock(); public MergedFfmpegHlsDownload(HttpClient client) { super(client); @@ -78,30 +81,31 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { } private void streamSegmentDataToFfmpeg() { - while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) { + downloadExecutor.submit(() -> { + ffmpegStreamLock.lock(); try { - Future future = queue.peek(); - if (running && future.isDone()) { - queue.take(); - SegmentDownload segmentDownload = future.get(); - downloadExecutor.submit(() -> { - ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream(); - try { - ffmpegStdIn.write(downloadData.toByteArray()); - } catch (IOException e) { - LOG.error("Couldn't stream segment data to FFmpeg", e); + while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) { + try { + Future future = queue.peek(); + if (running && future.isDone()) { + queue.take(); + SegmentDownload segmentDownload = future.get(); + ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream(); + downloadData.writeTo(ffmpegStdIn); + } else { + // first download in queue not finished, let's continue with other stuff + break; } - }); - } else { - // first download in queue not finished, let's continue with other stuff - break; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.error("Segment download failed for model {}", model, e); + } } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - LOG.error("Segment download failed for model {}", model, e); + } finally { + ffmpegStreamLock.unlock(); } - } + }); } @Override diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java index b8e1eb64..1b512ec3 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java @@ -81,6 +81,7 @@ public class SegmentDownload implements Callable { out.write(b, 0, length); BandwidthMeter.add(length); } + out.flush(); return true; } else { throw new HttpException(response.code(), response.message()); @@ -96,4 +97,8 @@ public class SegmentDownload implements Callable { public OutputStream getOutputStream() { return out; } + + public URL getUrl() { + return url; + } } \ No newline at end of file