Hlsdl: use virtual thread, add bandwith meter support
(cherry picked from commit 4ca9bd163d3d88f1032c83994b972d6bbf5ee3b6)
This commit is contained in:
parent
33f99488e2
commit
b4d0442f2c
|
@ -4,6 +4,8 @@ import java.io.File;
|
|||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.lang.ProcessBuilder.Redirect;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -11,18 +13,30 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.json.JSONObject;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.JsonParseException;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.JsonMappingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import ctbrec.io.BandwidthMeter;
|
||||
import ctbrec.io.DevNull;
|
||||
import ctbrec.io.ProcessStreamRedirector;
|
||||
import ctbrec.io.json.ObjectMapperFactory;
|
||||
import ctbrec.recorder.download.ProcessExitedUncleanException;
|
||||
import lombok.Data;
|
||||
|
||||
public class Hlsdl {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Hlsdl.class);
|
||||
|
||||
private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("hlsdl output stream reader"));
|
||||
// private static ScheduledExecutorService processOutputReader = Executors.newScheduledThreadPool(2, createThreadFactory("hlsdl output stream reader"));
|
||||
|
||||
private final ObjectMapper mapper = ObjectMapperFactory.getMapper();
|
||||
|
||||
private Process process;
|
||||
private boolean logOutput = false;
|
||||
|
@ -30,8 +44,9 @@ public class Hlsdl {
|
|||
private Consumer<Integer> exitCallback;
|
||||
private File processLog = null;
|
||||
private OutputStream processLogStream;
|
||||
private ProcessStreamRedirector stdoutRedirector;
|
||||
private ProcessStreamRedirector stderrRedirector;
|
||||
// private ProcessStreamRedirector stdoutRedirector;
|
||||
// private ProcessStreamRedirector stderrRedirector;
|
||||
private Thread streamReaderThread;
|
||||
|
||||
private Hlsdl() {}
|
||||
|
||||
|
@ -47,21 +62,27 @@ public class Hlsdl {
|
|||
|
||||
public void exec(String[] cmdline, String[] env, File executionDir) throws IOException {
|
||||
LOG.debug("hlsdl command line: {}", Arrays.toString(cmdline));
|
||||
process = Runtime.getRuntime().exec(cmdline, env, executionDir);
|
||||
afterStart();
|
||||
// process = Runtime.getRuntime().exec(cmdline, env, executionDir);
|
||||
var builder = new ProcessBuilder(cmdline).directory(executionDir);
|
||||
builder.redirectErrorStream(true);
|
||||
process = builder.start();
|
||||
setupLogging();
|
||||
notifyStartCallback(process);
|
||||
}
|
||||
|
||||
private void afterStart() throws IOException {
|
||||
notifyStartCallback(process);
|
||||
setupLogging();
|
||||
}
|
||||
|
||||
public void shutdown(int exitCode) throws IOException {
|
||||
LOG.debug("hlsdl exit code was {}", exitCode);
|
||||
processLogStream.flush();
|
||||
processLogStream.close();
|
||||
stdoutRedirector.setKeepGoing(false);
|
||||
stderrRedirector.setKeepGoing(false);
|
||||
streamReaderThread.interrupt();
|
||||
try {
|
||||
streamReaderThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("interrupted while joining stream reader thread: {}", e);
|
||||
}
|
||||
// stdoutRedirector.setKeepGoing(false);
|
||||
// stderrRedirector.setKeepGoing(false);
|
||||
notifyExitCallback(exitCode);
|
||||
if (exitCode != 1) {
|
||||
if (processLog != null && processLog.exists()) {
|
||||
|
@ -83,10 +104,44 @@ public class Hlsdl {
|
|||
} else {
|
||||
processLogStream = new DevNull();
|
||||
}
|
||||
stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), processLogStream);
|
||||
stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), processLogStream);
|
||||
processOutputReader.submit(stdoutRedirector);
|
||||
processOutputReader.submit(stderrRedirector);
|
||||
// stdoutRedirector = new ProcessStreamRedirector(processOutputReader, process.getInputStream(), processLogStream);
|
||||
// stderrRedirector = new ProcessStreamRedirector(processOutputReader, process.getErrorStream(), processLogStream);
|
||||
// processOutputReader.submit(stdoutRedirector);
|
||||
// processOutputReader.submit(stderrRedirector);
|
||||
|
||||
streamReaderThread = Thread.ofVirtual()
|
||||
.name("hlsdl output stream reader")
|
||||
.start(() -> {
|
||||
var w = new OutputStreamWriter(processLogStream);
|
||||
long lastDownloadSize = 0;
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
var line = process.inputReader().readLine();
|
||||
|
||||
if (line == null) {
|
||||
LOG.trace("stream ended");
|
||||
w.close();
|
||||
break;
|
||||
}
|
||||
if (line.length() == 0) continue;
|
||||
|
||||
// parse API messages in JSON format
|
||||
if (line.startsWith("{")) {
|
||||
try {
|
||||
var stats = mapper.readValue(line, ApiStats.class);
|
||||
BandwidthMeter.add(stats.downloadSize - lastDownloadSize);
|
||||
lastDownloadSize = stats.downloadSize;
|
||||
} catch (JsonMappingException e) {
|
||||
}
|
||||
}
|
||||
|
||||
w.write(line);
|
||||
w.write('\n');
|
||||
} catch (IOException e) {
|
||||
LOG.debug("{}", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void notifyStartCallback(Process process) {
|
||||
|
@ -150,5 +205,31 @@ public class Hlsdl {
|
|||
return instance;
|
||||
}
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ApiError {
|
||||
@JsonProperty("error_code")
|
||||
int errorCode;
|
||||
@JsonProperty("error_msg")
|
||||
String errorMsg;
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ApiDownloadType {
|
||||
// d_t - download type
|
||||
@JsonProperty("d_t")
|
||||
String downloadType;
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ApiStats {
|
||||
// t_d - total duration, d_d - download duration, d_s - download size
|
||||
@JsonProperty("t_d")
|
||||
long totalDuration;
|
||||
@JsonProperty("d_d")
|
||||
long downloadDuration;
|
||||
@JsonProperty("d_s")
|
||||
long downloadSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue