Merge branch 'hlsdl' into reusedname-dev

This commit is contained in:
reusedname 2025-02-15 14:10:21 +05:00
commit 7fe7d005d9
2 changed files with 98 additions and 16 deletions

View File

@ -4,6 +4,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.ProcessBuilder.Redirect;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.concurrent.Executors;
@ -11,18 +13,30 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.DevNull;
import ctbrec.io.ProcessStreamRedirector;
import ctbrec.io.json.ObjectMapperFactory;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import lombok.Data;
public class Hlsdl {
private static final Logger LOG = LoggerFactory.getLogger(Hlsdl.class);
private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("hlsdl output stream reader"));
// private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("hlsdl output stream reader"));
private final ObjectMapper mapper = ObjectMapperFactory.getMapper();
private Process process;
private boolean logOutput = false;
@ -30,8 +44,9 @@ public class Hlsdl {
private Consumer<Integer> exitCallback;
private File processLog = null;
private OutputStream processLogStream;
private ProcessStreamRedirector stdoutRedirector;
private ProcessStreamRedirector stderrRedirector;
// private ProcessStreamRedirector stdoutRedirector;
// private ProcessStreamRedirector stderrRedirector;
private Thread streamReaderThread;
private Hlsdl() {}
@ -47,21 +62,27 @@ public class Hlsdl {
public void exec(String[] cmdline, String[] env, File executionDir) throws IOException {
LOG.debug("hlsdl command line: {}", Arrays.toString(cmdline));
process = Runtime.getRuntime().exec(cmdline, env, executionDir);
afterStart();
// process = Runtime.getRuntime().exec(cmdline, env, executionDir);
var builder = new ProcessBuilder(cmdline).directory(executionDir);
builder.redirectErrorStream(true);
process = builder.start();
setupLogging();
notifyStartCallback(process);
}
private void afterStart() throws IOException {
notifyStartCallback(process);
setupLogging();
}
public void shutdown(int exitCode) throws IOException {
LOG.debug("hlsdl exit code was {}", exitCode);
processLogStream.flush();
processLogStream.close();
stdoutRedirector.setKeepGoing(false);
stderrRedirector.setKeepGoing(false);
streamReaderThread.interrupt();
try {
streamReaderThread.join();
} catch (InterruptedException e) {
LOG.warn("interrupted while joining stream reader thread: {}", e);
}
// stdoutRedirector.setKeepGoing(false);
// stderrRedirector.setKeepGoing(false);
notifyExitCallback(exitCode);
if (exitCode != 1) {
if (processLog != null && processLog.exists()) {
@ -83,10 +104,45 @@ public class Hlsdl {
} else {
processLogStream = new DevNull();
}
stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), processLogStream);
stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), processLogStream);
processOutputReader.submit(stdoutRedirector);
processOutputReader.submit(stderrRedirector);
// stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), processLogStream);
// stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), processLogStream);
// processOutputReader.submit(stdoutRedirector);
// processOutputReader.submit(stderrRedirector);
streamReaderThread = Thread.ofVirtual()
.name("hlsdl output stream reader")
.start(() -> {
var w = new OutputStreamWriter(processLogStream);
long lastDownloadSize = 0;
try {
while (!Thread.currentThread().isInterrupted()) {
var line = process.inputReader().readLine();
if (line == null) {
LOG.trace("stream ended");
w.close();
break;
}
if (line.length() == 0) continue;
// parse API messages in JSON format
if (line.startsWith("{")) {
try {
var stats = mapper.readValue(line, ApiStats.class);
BandwidthMeter.add(stats.downloadSize - lastDownloadSize);
lastDownloadSize = stats.downloadSize;
} catch (JsonMappingException e) {
}
}
w.write(line);
w.write('\n');
}
} catch (IOException e) {
if (process.isAlive())
LOG.debug("Error in stream while process is still alive {}", e);
}
});
}
private void notifyStartCallback(Process process) {
@ -150,5 +206,31 @@ public class Hlsdl {
return instance;
}
}
@Data
private static class ApiError {
@JsonProperty("error_code")
int errorCode;
@JsonProperty("error_msg")
String errorMsg;
}
@Data
private static class ApiDownloadType {
// d_t - download type
@JsonProperty("d_t")
String downloadType;
}
@Data
private static class ApiStats {
// t_d - total duration, d_d - download duration, d_s - download size
@JsonProperty("t_d")
long totalDuration;
@JsonProperty("d_d")
long downloadDuration;
@JsonProperty("d_s")
long downloadSize;
}
}

View File

@ -50,7 +50,7 @@ public class HlsdlDownload extends AbstractDownload {
createTargetDirectory();
startHlsdlProcess();
if (hlsdlProcess == null) {
throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg");
throw new ProcessExitedUncleanException("Couldn't spawn Hlsdl");
}
}