Merge branch 'fix-choking-dev' into reusedname-dev

This commit is contained in:
reusedname 2025-02-15 14:11:18 +05:00
commit 8bf9bb2b18
10 changed files with 277 additions and 133 deletions

View File

@ -93,6 +93,11 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0.pr1</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
</dependencies>

View File

@ -37,6 +37,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
@Slf4j
public abstract class HttpClient {
@Getter
private static final ConnectionPool GLOBAL_HTTP_CONN_POOL = new ConnectionPool(10, 2, TimeUnit.MINUTES);
@Getter

View File

@ -1,67 +1,43 @@
package ctbrec.recorder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.ProcessBuilder.Redirect;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import ctbrec.io.DevNull;
import ctbrec.io.ProcessStreamRedirector;
import ctbrec.recorder.download.ProcessExitedUncleanException;
@Slf4j
public class FFmpeg {
private static final Logger LOG = LoggerFactory.getLogger(FFmpeg.class);
private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("FFmpeg output stream reader"));
private Process process;
private boolean logOutput = false;
private Consumer<Process> startCallback;
private Consumer<Integer> exitCallback;
private File ffmpegLog = null;
private OutputStream ffmpegLogStream;
private ProcessStreamRedirector stdoutRedirector;
private ProcessStreamRedirector stderrRedirector;
private FFmpeg() {}
private static ThreadFactory createThreadFactory(String name) {
return r -> {
Thread t = new Thread(r);
t.setName(name);
t.setDaemon(true);
t.setPriority(Thread.MIN_PRIORITY);
return t;
};
}
public void exec(String[] cmdline, String[] env, File executionDir) throws IOException {
LOG.trace("FFmpeg command line: {}", Arrays.toString(cmdline));
process = Runtime.getRuntime().exec(cmdline, env, executionDir);
afterStart();
log.trace("FFmpeg command line: {}", Arrays.toString(cmdline));
// process = Runtime.getRuntime().exec(cmdline, env, executionDir);
var builder = new ProcessBuilder(cmdline);
// builder.environment().();
builder.directory(executionDir);
setupLogging(builder);
process = builder.start();
notifyStartCallback(process);
}
private void afterStart() throws IOException {
notifyStartCallback(process);
setupLogging();
}
public void shutdown(int exitCode) throws IOException {
LOG.trace("FFmpeg exit code was {}", exitCode);
ffmpegLogStream.flush();
ffmpegLogStream.close();
stdoutRedirector.setKeepGoing(false);
stderrRedirector.setKeepGoing(false);
process.destroy();
log.trace("FFmpeg exit code was {}", exitCode);
notifyExitCallback(exitCode);
if (exitCode != 1) {
if (ffmpegLog != null && ffmpegLog.exists()) {
@ -72,28 +48,27 @@ public class FFmpeg {
}
}
private void setupLogging() throws IOException {
private void setupLogging(ProcessBuilder builder) throws IOException {
if (logOutput) {
if (ffmpegLog == null) {
ffmpegLog = File.createTempFile("ffmpeg_", ".log");
}
LOG.trace("Logging FFmpeg output to {}", ffmpegLog);
log.trace("Logging FFmpeg output to {}", ffmpegLog);
ffmpegLog.deleteOnExit();
ffmpegLogStream = new FileOutputStream(ffmpegLog);
builder.redirectOutput(Redirect.to(ffmpegLog));
builder.redirectErrorStream(true);
} else {
ffmpegLogStream = new DevNull();
builder.redirectOutput(Redirect.DISCARD);
builder.redirectError(Redirect.DISCARD);
}
stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), ffmpegLogStream);
stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), ffmpegLogStream);
processOutputReader.submit(stdoutRedirector);
processOutputReader.submit(stderrRedirector);
}
private void notifyStartCallback(Process process) {
try {
startCallback.accept(process);
} catch(Exception e) {
LOG.error("Exception in onStart callback", e);
log.error("Exception in onStart callback", e);
}
}
@ -101,7 +76,7 @@ public class FFmpeg {
try {
exitCallback.accept(exitCode);
} catch(Exception e) {
LOG.error("Exception in onExit callback", e);
log.error("Exception in onExit callback", e);
}
}
@ -110,7 +85,7 @@ public class FFmpeg {
try {
shutdown(exitCode);
} catch (IOException e) {
LOG.error("Error while shutting down FFmpeg process", e);
log.error("Error while shutting down FFmpeg process", e);
}
return exitCode;
}

View File

@ -1,6 +1,7 @@
package ctbrec.recorder;
import com.google.common.eventbus.Subscribe;
import ctbrec.*;
import ctbrec.Recording.State;
import ctbrec.event.*;
@ -29,16 +30,19 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import com.google.common.collect.Ordering;
import static ctbrec.Recording.State.WAITING;
import static ctbrec.SubsequentAction.*;
import static ctbrec.event.Event.Type.MODEL_ONLINE;
import static java.lang.Thread.MAX_PRIORITY;
import static java.lang.Thread.MIN_PRIORITY;
import static java.util.concurrent.TimeUnit.SECONDS;
@Slf4j
public class SimplifiedLocalRecorder implements Recorder {
public static final Statistics STATS = new Statistics();
public static final boolean IGNORE_CACHE = true;
private final List<Model> models = Collections.synchronizedList(new ArrayList<>());
private final Config config;
@ -51,28 +55,24 @@ public class SimplifiedLocalRecorder implements Recorder {
private final RecordingManager recordingManager;
private final RecordingPreconditions preconditions;
private final BlockingQueue<Recording> recordings = new LinkedBlockingQueue<>();
// thread pools for downloads and post-processing
private final ScheduledExecutorService scheduler;
private final ExecutorService playlistDownloadPool = Executors.newFixedThreadPool(10);
private final ExecutorService segmentDownloadPool = Executors.newFixedThreadPool(10);
private final ExecutorService postProcessing;
private final ThreadPoolScaler threadPoolScaler;
private final ExecutorService segmentDownloadPool = Executors.newVirtualThreadPerTaskExecutor();
private final ExecutorService recordingLoopPool = Executors.newVirtualThreadPerTaskExecutor();
private final ThreadPoolExecutor postProcessing;
private final Thread maintenanceThread;
private long lastSpaceCheck;
public SimplifiedLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config;
client = new RecorderHttpClient(config);
scheduler = Executors.newScheduledThreadPool(5, createThreadFactory("Download", MAX_PRIORITY));
threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) scheduler, 5);
recordingManager = new RecordingManager(config, sites);
loadModels(sites);
int ppThreads = config.getSettings().postProcessingThreads;
BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
postProcessing = new ThreadPoolExecutor(ppThreads, ppThreads, 5, TimeUnit.MINUTES, ppQueue, createThreadFactory("PP", MIN_PRIORITY));
running = true;
registerEventBusListener();
@ -82,21 +82,20 @@ public class SimplifiedLocalRecorder implements Recorder {
log.info("Models to record: {}", models);
log.info("Saving recordings in {}", config.getSettings().recordingsDir);
startRecordingLoop();
maintenanceThread = startMaintenanceLoop();
}
private void startRecordingLoop() {
new Thread(() -> {
while (running) {
Recording rec = recordings.poll();
if (rec != null) {
processRecording(rec);
}
private Thread startMaintenanceLoop() {
var t = new Thread(() -> {
while (running && !Thread.currentThread().isInterrupted()) {
checkFreeSpace();
threadPoolScaler.tick();
waitABit(100);
//threadPoolScaler.tick();
waitABit(1000);
}
}).start();
});
t.setName("Recording loop");
t.start();
return t;
}
private void checkFreeSpace() {
@ -123,33 +122,42 @@ public class SimplifiedLocalRecorder implements Recorder {
}
}
private void processRecording(Recording recording) {
if (recording.getCurrentIteration().isDone()) {
if (recording.getRecordingProcess().isRunning()) {
try {
Instant rescheduleAt = recording.getCurrentIteration().get().getRescheduleTime();
Duration duration = Duration.between(Instant.now(), rescheduleAt);
long delayInMillis = Math.max(0, duration.toMillis());
log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis);
scheduleRecording(recording, delayInMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(recording);
} catch (ExecutionException e) {
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
fail(recording);
private void singleRecordingLoop(Recording recording) {
while (recording.getRecordingProcess().isRunning()) {
try {
// run single iteration
recording.getRecordingProcess().call();
if (recording.getModel().isSuspended()) {
log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel());
stopRecordingProcess(recording);
submitPostProcessingJob(recording);
break;
}
} else {
removeRecordingProcess(recording);
if (deleteIfEmpty(recording)) {
return;
}
submitPostProcessingJob(recording);
tryRestartRecording(recording.getModel());
// wait necessary time
Instant rescheduleAt = recording.getRecordingProcess().getRescheduleTime();
Duration duration = Duration.between(Instant.now(), rescheduleAt);
long delayInMillis = Math.max(0, duration.toMillis());
log.trace("Current iteration is done {}. Recording status {}. Rescheduling in {}ms", recording.getModel().getName(), recording.getStatus().name(), delayInMillis);
Thread.sleep(delayInMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail(recording);
return;
} catch (Exception e) {
log.error("Error while recording model {}. Stopping recording.", recording.getModel(), e);
fail(recording);
return;
}
} else {
recordings.add(recording);
}
removeRecordingProcess(recording);
if (deleteIfEmpty(recording)) {
return;
}
submitPostProcessingJob(recording);
tryRestartRecording(recording.getModel());
}
private void removeRecordingProcess(Recording rec) {
@ -171,19 +179,6 @@ public class SimplifiedLocalRecorder implements Recorder {
tryRestartRecording(recording.getModel());
}
private void scheduleRecording(Recording recording, long delayInMillis) {
if (recording.getModel().isSuspended()) {
log.info("Recording process for suspended model found: {}. Stopping now", recording.getModel());
stopRecordingProcess(recording);
submitPostProcessingJob(recording);
return;
}
ScheduledFuture<RecordingProcess> future = scheduler.schedule(recording.getRecordingProcess(), delayInMillis, TimeUnit.MILLISECONDS);
recording.setCurrentIteration(future);
recording.getSelectedResolution();
recordings.add(recording);
}
private void loadModels(List<Site> sites) {
config.getSettings().models
.stream()
@ -215,9 +210,16 @@ public class SimplifiedLocalRecorder implements Recorder {
private void stopRecordings() {
log.info("Stopping all recordings");
for (Recording recording : recordings) {
recording.getRecordingProcess().stop();
recording.getRecordingProcess().awaitEnd();
recorderLock.lock();
try {
for (Recording recording : recordingProcesses) {
recording.getRecordingProcess().stop();
}
for (Recording recording : recordingProcesses) {
recording.getRecordingProcess().awaitEnd();
}
} finally {
recorderLock.unlock();
}
waitForRecordingsToTerminate();
log.info("Recordings have been stopped");
@ -516,11 +518,11 @@ public class SimplifiedLocalRecorder implements Recorder {
public void shutdown(boolean immediately) {
log.info("Shutting down");
shuttingDown = true;
maintenanceThread.interrupt();
if (!immediately) {
try {
stopRecordings();
shutdownPool("Scheduler", scheduler, 60);
shutdownPool("PlaylistDownloadPool", playlistDownloadPool, 60);
shutdownPool("Recording loops", recordingLoopPool, 60);
shutdownPool("SegmentDownloadPool", segmentDownloadPool, 60);
shutdownPool("Post-Processing", postProcessing, 600);
} catch (InterruptedException e) {
@ -726,23 +728,35 @@ public class SimplifiedLocalRecorder implements Recorder {
}
private void tryRestartRecording(Model model) {
if (!running) {
if (!running || shuttingDown) {
// recorder is not in recording state
return;
}
try {
boolean modelInRecordingList = isTracked(model);
boolean online = model.isOnline(IGNORE_CACHE);
if (modelInRecordingList && online) {
log.info("Restarting recording for model {}", model);
startRecordingProcess(model);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Couldn't restart recording for model {}", model);
} catch (Exception e) {
log.error("Couldn't restart recording for model {}", model);
boolean modelInRecordingList = isTracked(model);
if (modelInRecordingList) {
// .isOnline() check does blocking http request, so do this async
recordingLoopPool.submit(() -> {
try {
boolean online = model.isOnline(IGNORE_CACHE);
if (online) {
log.info("Restarting recording for model {}", model);
try {
recorderLock.lock();
startRecordingProcess(model);
} finally {
recorderLock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Couldn't restart recording for model {}", model);
} catch (Exception e) {
log.error("Couldn't restart recording for model {}", model);
}
});
}
}
@ -770,8 +784,8 @@ public class SimplifiedLocalRecorder implements Recorder {
});
}
private CompletableFuture<Void> startRecordingProcess(Model model) {
return CompletableFuture.runAsync(() -> {
private void startRecordingProcess(Model model) {
recordingLoopPool.submit(() -> {
recorderLock.lock();
try {
preconditions.check(model);
@ -781,7 +795,7 @@ public class SimplifiedLocalRecorder implements Recorder {
setRecordingStatus(rec, State.RECORDING);
rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec);
scheduleRecording(rec, 0);
recordingLoopPool.submit(() -> {singleRecordingLoop(rec);});
} catch (RecordUntilExpiredException e) {
log.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model);
@ -792,12 +806,12 @@ public class SimplifiedLocalRecorder implements Recorder {
} finally {
recorderLock.unlock();
}
}, segmentDownloadPool);
});
}
private ThreadFactory createThreadFactory(String name, int priority) {
return r -> {
Thread t = new Thread(r);
Thread t = Thread.ofPlatform().unstarted(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true);
t.setPriority(priority);

View File

@ -0,0 +1,21 @@
package ctbrec.recorder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
public class Statistics {
public ReadWriteLock statsLock = new ReentrantReadWriteLock();
public CircularFifoBuffer stats = new CircularFifoBuffer(100);
public void add(Object val) {
statsLock.writeLock().lock();
try {
stats.add(val);
} finally {
statsLock.writeLock().unlock();
}
}
}

View File

@ -62,4 +62,8 @@ public interface RecordingProcess extends Callable<RecordingProcess> {
void awaitEnd();
AtomicLong getDownloadedBytes();
default String getStats() {
return "";
}
}

View File

@ -14,6 +14,7 @@ import ctbrec.io.HttpClient;
import ctbrec.io.HttpConstants;
import ctbrec.io.HttpException;
import ctbrec.recorder.InvalidPlaylistException;
import ctbrec.recorder.SimplifiedLocalRecorder;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.StreamSource;
@ -108,6 +109,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
@Override
public AbstractHlsDownload call() throws Exception {
SimplifiedLocalRecorder.STATS.add(Duration.between(rescheduleTime, Instant.now()).toMillis());
try {
if (segmentPlaylistUrl == null) {
@ -152,7 +154,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
model.delay();
stop();
} else {
rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible
rescheduleTime = Instant.now(); // try again as fast as possible
}
} catch (EOFException e) {
// end of playlist reached
@ -420,6 +422,8 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
private void calculateRescheduleTime() {
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
if (Instant.now().isAfter(rescheduleTime))
rescheduleTime = Instant.now();
recordingEvents.add(RecordingEvent.of("next playlist download scheduled for " + rescheduleTime.toString()));
}

View File

@ -32,6 +32,14 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
protected OutputStream ffmpegStdIn;
protected BlockingQueue<Future<SegmentDownload>> queue = new LinkedBlockingQueue<>();
protected Lock ffmpegStreamLock = new ReentrantLock();
public String getStats() {
String text = (running ? "RUN" : "stp") + String.format(" %d: ", queue.size());
for (var elem : queue) {
text += elem.isDone() ? "|" : "-";
}
return text;
}
public MergedFfmpegHlsDownload(HttpClient client) {
super(client);

View File

@ -0,0 +1,108 @@
package ctbrec.recorder.server;
import ctbrec.Config;
import ctbrec.io.HttpClient;
import ctbrec.recorder.Recorder;
import ctbrec.recorder.SimplifiedLocalRecorder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URLDecoder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.text.MessageFormat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static javax.servlet.http.HttpServletResponse.*;
@Slf4j
public class DebugServlet extends AbstractCtbrecServlet {
public static final String BASE_URL = "/debug";
private static final Pattern URL_PATTERN_DEBUG_STACK = Pattern.compile(BASE_URL + "/stack(/.*?)");
private static final Pattern URL_PATTERN_DEBUG_STATS = Pattern.compile(BASE_URL + "/stats");
protected Recorder recorder;
public DebugServlet(Recorder rec)
{
this.recorder = rec;
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
String requestURI = req.getRequestURI().substring(req.getContextPath().length());
try {
// boolean authenticated = checkAuthentication(req, "");
// if (!authenticated) {
// sendResponse(resp, SC_UNAUTHORIZED, HMAC_ERROR_DOCUMENT);
// return;
// }
Matcher m;
if ((m = URL_PATTERN_DEBUG_STACK.matcher(requestURI)).matches()) {
String threadUrl = URLDecoder.decode(m.group(1), UTF_8);
var stacks = Thread.getAllStackTraces();
var box = new Object() { String text = ""; };//stacks.toString();
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
// e.printStackTrace(pw);
stacks.forEach((thread, stack) -> {
box.text += String.format("%s:\n", thread.getName());
var idx = 0;
for (var s : stack) {
box.text += String.format("[%d] %s\n", idx++, s.toString());
}
});
log.debug("Stacks Request {}", threadUrl);
resp.setContentType("text/plain");
sendResponse(resp, SC_OK, box.text);
} else if ((m = URL_PATTERN_DEBUG_STATS.matcher(requestURI)).matches()) {
String text = "<html><body style=\"body { font-family: Monospace; }\">";
text += String.format("GLOBAL_HTTP_CONN_POOL: connectionCount=%d, idleConnectionCount=%d\n",
HttpClient.getGLOBAL_HTTP_CONN_POOL().connectionCount(),
HttpClient.getGLOBAL_HTTP_CONN_POOL().idleConnectionCount());
var rlock = SimplifiedLocalRecorder.STATS.statsLock.readLock();
rlock.lock();
try {
text += String.format("Time between download iterations = %s\n", SimplifiedLocalRecorder.STATS.stats.toString());
} finally {
rlock.unlock();
}
text += "\nRecording stats:\n<table>";
text = text.replace("\n", "<br>");
for (var rec : recorder.getRecordings()) {
var proc = rec.getRecordingProcess();
if (proc == null) continue;
text += String.format("<tr><td>%s</td><td>%s</td></tr>\n", proc.getModel().getDisplayName(), proc.getStats());
}
text += "</table>";
text += "</body></html>";
log.debug("Stats Request");
resp.setContentType("text/html");
sendResponse(resp, SC_OK, text);
} else
sendResponse(resp, SC_NOT_FOUND, "");
} catch (Exception e) {
log.error(INTERNAL_SERVER_ERROR, e);
sendResponse(resp, SC_INTERNAL_SERVER_ERROR, INTERNAL_SERVER_ERROR);
}
}
}

View File

@ -258,6 +258,10 @@ public class HttpServer {
ModelServlet modelServlet = new ModelServlet(config);
holder = new ServletHolder(modelServlet);
defaultContext.addServlet(holder, ModelServlet.BASE_URL + "/*");
DebugServlet debugServlet = new DebugServlet(recorder);
holder = new ServletHolder(debugServlet);
defaultContext.addServlet(holder, DebugServlet.BASE_URL + "/*");
if (this.config.getSettings().webinterface) {
startWebInterface(defaultContext, basicAuthContext);