Reduce number of threads to read FFmpeg output

This commit is contained in:
0xb00bface 2020-12-24 12:28:58 +01:00
parent 9a270bb84b
commit 8b55e9d374
3 changed files with 144 additions and 11 deletions

View File

@ -0,0 +1,52 @@
package ctbrec.io;
import static java.util.concurrent.TimeUnit.*;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FfmpegStreamRedirector implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(FfmpegStreamRedirector.class);
private InputStream in;
private OutputStream out;
private boolean keepGoing = true;
private ScheduledExecutorService executor;
public FfmpegStreamRedirector(ScheduledExecutorService executor, InputStream in, OutputStream out) {
super();
this.executor = executor;
this.in = in;
this.out = out;
}
@Override
public void run() {
try {
int available = 0;
while ((available = in.available()) > 0) {
byte[] buffer = new byte[available];
int length = in.read(buffer);
out.write(buffer, 0, length);
}
if (keepGoing) {
executor.schedule(this, 100, MILLISECONDS);
}
} catch (Exception e) {
LOG.debug("Error while reading from FFmpeg output stream: {}", e.getLocalizedMessage());
keepGoing = false;
}
}
public boolean keepGoing() {
return keepGoing;
}
public void setKeepGoing(boolean keepGoing) {
this.keepGoing = keepGoing;
}
}

View File

@ -0,0 +1,66 @@
package ctbrec.io;
import java.time.Duration;
import java.time.Instant;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Model;
import ctbrec.UnknownModel;
public class MissedSegmentsStatistics {
private static final Logger LOG = LoggerFactory.getLogger(MissedSegmentsStatistics.class);
private static Map<Model, Short> missegSegmentsCount = new HashMap<>();
private static Instant lastOutput = Instant.EPOCH;
private static Thread t;
static {
increase(new UnknownModel(), (short) 0);
}
private MissedSegmentsStatistics() {}
public static void increase(Model model, short amount) {
short total = missegSegmentsCount.getOrDefault(model, (short) 0);
missegSegmentsCount.put(model, (short) (total + amount));
if (t == null) {
t = new Thread(() -> {
while (true) {
printStatistics();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
});
t.setName("MissingSegments logger");
t.setPriority(Thread.MIN_PRIORITY);
t.setDaemon(true);
t.start();
}
}
private static void printStatistics() {
Instant now = Instant.now();
StringBuilder sb = new StringBuilder("Missed segments stats:\n");
if (Duration.between(lastOutput, now).getSeconds() > 120) {
try {
for (Entry<Model, Short> entry : missegSegmentsCount.entrySet()) {
sb.append('\t').append(entry.getKey().getName()).append(": ").append(entry.getValue()).append('\n');
}
LOG.debug(sb.toString());
lastOutput = now;
} catch(ConcurrentModificationException e) {
// ignore
}
}
}
}

View File

@ -6,30 +6,45 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.io.DevNull;
import ctbrec.io.StreamRedirector;
import ctbrec.io.FfmpegStreamRedirector;
import ctbrec.recorder.download.ProcessExitedUncleanException;
public class FFmpeg {
private static final Logger LOG = LoggerFactory.getLogger(FFmpeg.class);
private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("FFmpeg output stream reader"));
private Process process;
private boolean logOutput = false;
private Consumer<Process> startCallback;
private Consumer<Integer> exitCallback;
private File ffmpegLog = null;
private OutputStream ffmpegLogStream;
private Thread stdout;
private Thread stderr;
private FfmpegStreamRedirector stdoutRedirector;
private FfmpegStreamRedirector stderrRedirector;
private FFmpeg() {}
private static ThreadFactory createThreadFactory(String name) {
return r -> {
Thread t = new Thread(r);
t.setName(name);
t.setDaemon(true);
t.setPriority(Thread.MIN_PRIORITY);
return t;
};
}
public void exec(String[] cmdline, String[] env, File executionDir) throws IOException, InterruptedException {
LOG.debug("FFmpeg command line: {}", Arrays.toString(cmdline));
process = Runtime.getRuntime().exec(cmdline, env, executionDir);
@ -43,13 +58,13 @@ public class FFmpeg {
setupLogging();
}
private void afterExit(int exitCode) throws InterruptedException, IOException {
private void afterExit(int exitCode) throws IOException {
LOG.debug("FFmpeg exit code was {}", exitCode);
notifyExitCallback(exitCode);
stdout.join();
stderr.join();
ffmpegLogStream.flush();
ffmpegLogStream.close();
stdoutRedirector.setKeepGoing(false);
stderrRedirector.setKeepGoing(false);
notifyExitCallback(exitCode);
if (exitCode != 1) {
if (ffmpegLog != null && ffmpegLog.exists()) {
Files.delete(ffmpegLog.toPath());
@ -70,10 +85,10 @@ public class FFmpeg {
} else {
ffmpegLogStream = new DevNull();
}
stdout = new Thread(new StreamRedirector(process.getInputStream(), ffmpegLogStream));
stderr = new Thread(new StreamRedirector(process.getErrorStream(), ffmpegLogStream));
stdout.start();
stderr.start();
stdoutRedirector = new FfmpegStreamRedirector(processOutputReader, process.getInputStream(), ffmpegLogStream);
stderrRedirector = new FfmpegStreamRedirector(processOutputReader, process.getErrorStream(), ffmpegLogStream);
processOutputReader.submit(stdoutRedirector);
processOutputReader.submit(stderrRedirector);
}
private void notifyStartCallback(Process process) {