From 8b55e9d374245cc7e4d0c9dc4eb869abe78cfb17 Mon Sep 17 00:00:00 2001 From: 0xb00bface <0xboobface@gmail.com> Date: Thu, 24 Dec 2020 12:28:58 +0100 Subject: [PATCH] Reduce number of threads to read FFmpeg output --- .../ctbrec/io/FfmpegStreamRedirector.java | 52 +++++++++++++++ .../ctbrec/io/MissedSegmentsStatistics.java | 66 +++++++++++++++++++ .../src/main/java/ctbrec/recorder/FFmpeg.java | 37 +++++++---- 3 files changed, 144 insertions(+), 11 deletions(-) create mode 100644 common/src/main/java/ctbrec/io/FfmpegStreamRedirector.java create mode 100644 common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java diff --git a/common/src/main/java/ctbrec/io/FfmpegStreamRedirector.java b/common/src/main/java/ctbrec/io/FfmpegStreamRedirector.java new file mode 100644 index 00000000..2be0e63f --- /dev/null +++ b/common/src/main/java/ctbrec/io/FfmpegStreamRedirector.java @@ -0,0 +1,52 @@ +package ctbrec.io; + +import static java.util.concurrent.TimeUnit.*; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.ScheduledExecutorService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FfmpegStreamRedirector implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(FfmpegStreamRedirector.class); + + private InputStream in; + private OutputStream out; + private boolean keepGoing = true; + private ScheduledExecutorService executor; + + public FfmpegStreamRedirector(ScheduledExecutorService executor, InputStream in, OutputStream out) { + super(); + this.executor = executor; + this.in = in; + this.out = out; + } + + @Override + public void run() { + try { + int available = 0; + while ((available = in.available()) > 0) { + byte[] buffer = new byte[available]; + int length = in.read(buffer); + out.write(buffer, 0, length); + } + if (keepGoing) { + executor.schedule(this, 100, MILLISECONDS); + } + } catch (Exception e) { + LOG.debug("Error while reading from FFmpeg output stream: {}", e.getLocalizedMessage()); + keepGoing = false; + } + } + + public boolean keepGoing() { + return keepGoing; + } + + public void setKeepGoing(boolean keepGoing) { + this.keepGoing = keepGoing; + } +} diff --git a/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java b/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java new file mode 100644 index 00000000..fbb439f9 --- /dev/null +++ b/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java @@ -0,0 +1,66 @@ +package ctbrec.io; + +import java.time.Duration; +import java.time.Instant; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ctbrec.Model; +import ctbrec.UnknownModel; + +public class MissedSegmentsStatistics { + + private static final Logger LOG = LoggerFactory.getLogger(MissedSegmentsStatistics.class); + + private static Map missegSegmentsCount = new HashMap<>(); + private static Instant lastOutput = Instant.EPOCH; + private static Thread t; + static { + increase(new UnknownModel(), (short) 0); + } + + private MissedSegmentsStatistics() {} + + public static void increase(Model model, short amount) { + short total = missegSegmentsCount.getOrDefault(model, (short) 0); + missegSegmentsCount.put(model, (short) (total + amount)); + if (t == null) { + t = new Thread(() -> { + while (true) { + printStatistics(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } + } + }); + t.setName("MissingSegments logger"); + t.setPriority(Thread.MIN_PRIORITY); + t.setDaemon(true); + t.start(); + } + } + + private static void printStatistics() { + Instant now = Instant.now(); + StringBuilder sb = new StringBuilder("Missed segments stats:\n"); + if (Duration.between(lastOutput, now).getSeconds() > 120) { + try { + for (Entry entry : missegSegmentsCount.entrySet()) { + sb.append('\t').append(entry.getKey().getName()).append(": ").append(entry.getValue()).append('\n'); + } + LOG.debug(sb.toString()); + lastOutput = now; + } catch(ConcurrentModificationException e) { + // ignore + } + } + } +} diff --git a/common/src/main/java/ctbrec/recorder/FFmpeg.java b/common/src/main/java/ctbrec/recorder/FFmpeg.java index fdfa3bb1..9d13e511 100644 --- a/common/src/main/java/ctbrec/recorder/FFmpeg.java +++ b/common/src/main/java/ctbrec/recorder/FFmpeg.java @@ -6,30 +6,45 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.file.Files; import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import ctbrec.io.DevNull; -import ctbrec.io.StreamRedirector; +import ctbrec.io.FfmpegStreamRedirector; import ctbrec.recorder.download.ProcessExitedUncleanException; public class FFmpeg { private static final Logger LOG = LoggerFactory.getLogger(FFmpeg.class); + private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("FFmpeg output stream reader")); + private Process process; private boolean logOutput = false; private Consumer startCallback; private Consumer exitCallback; private File ffmpegLog = null; private OutputStream ffmpegLogStream; - private Thread stdout; - private Thread stderr; + private FfmpegStreamRedirector stdoutRedirector; + private FfmpegStreamRedirector stderrRedirector; private FFmpeg() {} + private static ThreadFactory createThreadFactory(String name) { + return r -> { + Thread t = new Thread(r); + t.setName(name); + t.setDaemon(true); + t.setPriority(Thread.MIN_PRIORITY); + return t; + }; + } + public void exec(String[] cmdline, String[] env, File executionDir) throws IOException, InterruptedException { LOG.debug("FFmpeg command line: {}", Arrays.toString(cmdline)); process = Runtime.getRuntime().exec(cmdline, env, executionDir); @@ -43,13 +58,13 @@ public class FFmpeg { setupLogging(); } - private void afterExit(int exitCode) throws InterruptedException, IOException { + private void afterExit(int exitCode) throws IOException { LOG.debug("FFmpeg exit code was {}", exitCode); - notifyExitCallback(exitCode); - stdout.join(); - stderr.join(); ffmpegLogStream.flush(); ffmpegLogStream.close(); + stdoutRedirector.setKeepGoing(false); + stderrRedirector.setKeepGoing(false); + notifyExitCallback(exitCode); if (exitCode != 1) { if (ffmpegLog != null && ffmpegLog.exists()) { Files.delete(ffmpegLog.toPath()); @@ -70,10 +85,10 @@ public class FFmpeg { } else { ffmpegLogStream = new DevNull(); } - stdout = new Thread(new StreamRedirector(process.getInputStream(), ffmpegLogStream)); - stderr = new Thread(new StreamRedirector(process.getErrorStream(), ffmpegLogStream)); - stdout.start(); - stderr.start(); + stdoutRedirector = new FfmpegStreamRedirector(processOutputReader, process.getInputStream(), ffmpegLogStream); + stderrRedirector = new FfmpegStreamRedirector(processOutputReader, process.getErrorStream(), ffmpegLogStream); + processOutputReader.submit(stdoutRedirector); + processOutputReader.submit(stderrRedirector); } private void notifyStartCallback(Process process) {