From b4d0442f2c291d13f4ef2f1bfc22dfa975576413 Mon Sep 17 00:00:00 2001 From: reusedname <155286845+reusedname@users.noreply.github.com> Date: Sun, 24 Nov 2024 16:06:43 +0500 Subject: [PATCH 1/2] Hlsdl: use virtual thread, add bandwith meter support (cherry picked from commit 4ca9bd163d3d88f1032c83994b972d6bbf5ee3b6) --- .../ctbrec/recorder/download/hls/Hlsdl.java | 111 +++++++++++++++--- 1 file changed, 96 insertions(+), 15 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java b/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java index d4abfc61..b598586f 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java @@ -4,6 +4,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.lang.ProcessBuilder.Redirect; import java.nio.file.Files; import java.util.Arrays; import java.util.concurrent.Executors; @@ -11,18 +13,30 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.function.Consumer; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import ctbrec.io.BandwidthMeter; import ctbrec.io.DevNull; import ctbrec.io.ProcessStreamRedirector; +import ctbrec.io.json.ObjectMapperFactory; import ctbrec.recorder.download.ProcessExitedUncleanException; +import lombok.Data; public class Hlsdl { private static final Logger LOG = LoggerFactory.getLogger(Hlsdl.class); - private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("hlsdl output stream reader")); + // private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("hlsdl output stream reader")); + + private final ObjectMapper mapper = ObjectMapperFactory.getMapper(); private Process process; private boolean logOutput = false; @@ -30,8 +44,9 @@ public class Hlsdl { private Consumer exitCallback; private File processLog = null; private OutputStream processLogStream; - private ProcessStreamRedirector stdoutRedirector; - private ProcessStreamRedirector stderrRedirector; + // private ProcessStreamRedirector stdoutRedirector; + // private ProcessStreamRedirector stderrRedirector; + private Thread streamReaderThread; private Hlsdl() {} @@ -47,21 +62,27 @@ public class Hlsdl { public void exec(String[] cmdline, String[] env, File executionDir) throws IOException { LOG.debug("hlsdl command line: {}", Arrays.toString(cmdline)); - process = Runtime.getRuntime().exec(cmdline, env, executionDir); - afterStart(); + // process = Runtime.getRuntime().exec(cmdline, env, executionDir); + var builder = new ProcessBuilder(cmdline).directory(executionDir); + builder.redirectErrorStream(true); + process = builder.start(); + setupLogging(); + notifyStartCallback(process); } - private void afterStart() throws IOException { - notifyStartCallback(process); - setupLogging(); - } public void shutdown(int exitCode) throws IOException { LOG.debug("hlsdl exit code was {}", exitCode); processLogStream.flush(); processLogStream.close(); - stdoutRedirector.setKeepGoing(false); - stderrRedirector.setKeepGoing(false); + streamReaderThread.interrupt(); + try { + streamReaderThread.join(); + } catch (InterruptedException e) { + LOG.warn("interrupted while joining stream reader thread: {}", e); + } + // stdoutRedirector.setKeepGoing(false); + // stderrRedirector.setKeepGoing(false); notifyExitCallback(exitCode); if (exitCode != 1) { if (processLog != null && processLog.exists()) { @@ -83,10 +104,44 @@ public class Hlsdl { } else { processLogStream = new DevNull(); } - stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), processLogStream); - stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), processLogStream); - processOutputReader.submit(stdoutRedirector); - processOutputReader.submit(stderrRedirector); + // stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), processLogStream); + // stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), processLogStream); + // processOutputReader.submit(stdoutRedirector); + // processOutputReader.submit(stderrRedirector); + + streamReaderThread = Thread.ofVirtual() + .name("hlsdl output stream reader") + .start(() -> { + var w = new OutputStreamWriter(processLogStream); + long lastDownloadSize = 0; + while (!Thread.currentThread().isInterrupted()) { + try { + var line = process.inputReader().readLine(); + + if (line == null) { + LOG.trace("stream ended"); + w.close(); + break; + } + if (line.length() == 0) continue; + + // parse API messages in JSON format + if (line.startsWith("{")) { + try { + var stats = mapper.readValue(line, ApiStats.class); + BandwidthMeter.add(stats.downloadSize - lastDownloadSize); + lastDownloadSize = stats.downloadSize; + } catch (JsonMappingException e) { + } + } + + w.write(line); + w.write('\n'); + } catch (IOException e) { + LOG.debug("{}", e); + } + } + }); } private void notifyStartCallback(Process process) { @@ -150,5 +205,31 @@ public class Hlsdl { return instance; } } + + @Data + private static class ApiError { + @JsonProperty("error_code") + int errorCode; + @JsonProperty("error_msg") + String errorMsg; + } + + @Data + private static class ApiDownloadType { + // d_t - download type + @JsonProperty("d_t") + String downloadType; + } + + @Data + private static class ApiStats { + // t_d - total duration, d_d - download duration, d_s - download size + @JsonProperty("t_d") + long totalDuration; + @JsonProperty("d_d") + long downloadDuration; + @JsonProperty("d_s") + long downloadSize; + } } From 78accb906df7a644ff13c56d457c44379fcda441 Mon Sep 17 00:00:00 2001 From: reusedname <155286845+reusedname@users.noreply.github.com> Date: Tue, 3 Dec 2024 14:17:56 +0500 Subject: [PATCH 2/2] small hlsdl fixes (cherry picked from commit 0c9c4c1e06321a9ee83994be8f6fd32a6e2c852a) --- .../main/java/ctbrec/recorder/download/hls/Hlsdl.java | 9 +++++---- .../java/ctbrec/recorder/download/hls/HlsdlDownload.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java b/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java index b598586f..7733232f 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/Hlsdl.java @@ -114,8 +114,8 @@ public class Hlsdl { .start(() -> { var w = new OutputStreamWriter(processLogStream); long lastDownloadSize = 0; - while (!Thread.currentThread().isInterrupted()) { - try { + try { + while (!Thread.currentThread().isInterrupted()) { var line = process.inputReader().readLine(); if (line == null) { @@ -137,9 +137,10 @@ public class Hlsdl { w.write(line); w.write('\n'); - } catch (IOException e) { - LOG.debug("{}", e); } + } catch (IOException e) { + if (process.isAlive()) + LOG.debug("Error in stream while process is still alive {}", e); } }); } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsdlDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsdlDownload.java index 05e7e39b..4f5f64ad 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsdlDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsdlDownload.java @@ -50,7 +50,7 @@ public class HlsdlDownload extends AbstractDownload { createTargetDirectory(); startHlsdlProcess(); if (hlsdlProcess == null) { - throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); + throw new ProcessExitedUncleanException("Couldn't spawn Hlsdl"); } }