Fix race condition in code for streaming segments to FFmpeg

This caused stuttering and jumps mostly in the beginning of a recording
This commit is contained in:
0xb00bface 2021-01-23 16:24:24 +01:00
parent 9be4c07049
commit 8855591f0f
2 changed files with 29 additions and 20 deletions

View File

@ -13,6 +13,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -40,6 +42,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
private transient Process ffmpegProcess; private transient Process ffmpegProcess;
private transient OutputStream ffmpegStdIn; private transient OutputStream ffmpegStdIn;
private transient BlockingQueue<Future<SegmentDownload>> queue = new LinkedBlockingQueue<>(); private transient BlockingQueue<Future<SegmentDownload>> queue = new LinkedBlockingQueue<>();
private transient Lock ffmpegStreamLock = new ReentrantLock();
public MergedFfmpegHlsDownload(HttpClient client) { public MergedFfmpegHlsDownload(HttpClient client) {
super(client); super(client);
@ -78,30 +81,31 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
} }
private void streamSegmentDataToFfmpeg() { private void streamSegmentDataToFfmpeg() {
while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) { downloadExecutor.submit(() -> {
ffmpegStreamLock.lock();
try { try {
Future<SegmentDownload> future = queue.peek(); while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) {
if (running && future.isDone()) { try {
queue.take(); Future<SegmentDownload> future = queue.peek();
SegmentDownload segmentDownload = future.get(); if (running && future.isDone()) {
downloadExecutor.submit(() -> { queue.take();
ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream(); SegmentDownload segmentDownload = future.get();
try { ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream();
ffmpegStdIn.write(downloadData.toByteArray()); downloadData.writeTo(ffmpegStdIn);
} catch (IOException e) { } else {
LOG.error("Couldn't stream segment data to FFmpeg", e); // first download in queue not finished, let's continue with other stuff
break;
} }
}); } catch (InterruptedException e) {
} else { Thread.currentThread().interrupt();
// first download in queue not finished, let's continue with other stuff } catch (Exception e) {
break; LOG.error("Segment download failed for model {}", model, e);
}
} }
} catch (InterruptedException e) { } finally {
Thread.currentThread().interrupt(); ffmpegStreamLock.unlock();
} catch (Exception e) {
LOG.error("Segment download failed for model {}", model, e);
} }
} });
} }
@Override @Override

View File

@ -81,6 +81,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
out.write(b, 0, length); out.write(b, 0, length);
BandwidthMeter.add(length); BandwidthMeter.add(length);
} }
out.flush();
return true; return true;
} else { } else {
throw new HttpException(response.code(), response.message()); throw new HttpException(response.code(), response.message());
@ -96,4 +97,8 @@ public class SegmentDownload implements Callable<SegmentDownload> {
public OutputStream getOutputStream() { public OutputStream getOutputStream() {
return out; return out;
} }
public URL getUrl() {
return url;
}
} }