forked from j62/ctbrec
Make MergedFfmpegHlsDownload work again
This commit is contained in:
parent
7e03b48895
commit
32429b192c
|
@ -96,6 +96,7 @@ public class SettingsTab extends Tab implements TabSelectionListener {
|
|||
private SimpleFileProperty mediaPlayer;
|
||||
private SimpleStringProperty mediaPlayerParams;
|
||||
private SimpleIntegerProperty maximumResolutionPlayer;
|
||||
private SimpleIntegerProperty recorderThreadPoolSize;
|
||||
private SimpleBooleanProperty showPlayerStarting;
|
||||
private SimpleBooleanProperty singlePlayer;
|
||||
private SimpleListProperty<ProxyType> proxyType;
|
||||
|
@ -182,6 +183,7 @@ public class SettingsTab extends Tab implements TabSelectionListener {
|
|||
onlineCheckSkipsPausedModels = new SimpleBooleanProperty(null, "onlineCheckSkipsPausedModels", settings.onlineCheckSkipsPausedModels);
|
||||
fastScrollSpeed = new SimpleBooleanProperty(null, "fastScrollSpeed", settings.fastScrollSpeed);
|
||||
confirmationDialogs = new SimpleBooleanProperty(null, "confirmationForDangerousActions", settings.confirmationForDangerousActions);
|
||||
recorderThreadPoolSize = new SimpleIntegerProperty(null, "downloadThreadPoolSize", settings.downloadThreadPoolSize);
|
||||
}
|
||||
|
||||
private void createGui() {
|
||||
|
@ -265,7 +267,9 @@ public class SettingsTab extends Tab implements TabSelectionListener {
|
|||
),
|
||||
Category.of("Advanced / Devtools",
|
||||
Group.of("Logging",
|
||||
Setting.of("Log FFmpeg output", logFFmpegOutput, "Log FFmpeg output to files in the system's temp directory")
|
||||
Setting.of("Log FFmpeg output", logFFmpegOutput, "Log FFmpeg output to files in the system's temp directory"),
|
||||
Setting.of("Recorder thread pool size", recorderThreadPoolSize,
|
||||
"Number of threads used for the downloads. More = less missed segments for large amounts of concurrent recordings, but more resource hungry").needsRestart()
|
||||
)
|
||||
)
|
||||
);
|
||||
|
|
|
@ -59,6 +59,7 @@ public class Settings {
|
|||
public boolean determineResolution = false;
|
||||
public List<String> disabledSites = new ArrayList<>();
|
||||
public String downloadFilename = "${modelSanitizedName}-${localDateTime}";
|
||||
public int downloadThreadPoolSize = 20;
|
||||
public List<EventHandlerConfiguration> eventHandlers = new ArrayList<>();
|
||||
public boolean fastPlaylistGenerator = false;
|
||||
public boolean fastScrollSpeed = true;
|
||||
|
|
|
@ -19,6 +19,7 @@ public class MissedSegmentsStatistics {
|
|||
|
||||
private static Map<Model, Short> missegSegmentsCount = new HashMap<>();
|
||||
private static Instant lastOutput = Instant.EPOCH;
|
||||
private static Instant lastclear = Instant.now();
|
||||
private static Thread t;
|
||||
static {
|
||||
increase(new UnknownModel(), (short) 0);
|
||||
|
@ -46,6 +47,10 @@ public class MissedSegmentsStatistics {
|
|||
t.setDaemon(true);
|
||||
t.start();
|
||||
}
|
||||
if (Duration.between(lastclear, Instant.now()).toMinutes() > 60) {
|
||||
missegSegmentsCount.clear();
|
||||
LOG.debug("Missed segments statistics cleared");
|
||||
}
|
||||
}
|
||||
|
||||
private static void printStatistics() {
|
||||
|
|
|
@ -49,8 +49,6 @@ public class FFmpeg {
|
|||
LOG.debug("FFmpeg command line: {}", Arrays.toString(cmdline));
|
||||
process = Runtime.getRuntime().exec(cmdline, env, executionDir);
|
||||
afterStart();
|
||||
int exitCode = process.waitFor();
|
||||
afterExit(exitCode);
|
||||
}
|
||||
|
||||
private void afterStart() throws IOException {
|
||||
|
@ -58,7 +56,7 @@ public class FFmpeg {
|
|||
setupLogging();
|
||||
}
|
||||
|
||||
private void afterExit(int exitCode) throws IOException {
|
||||
public void shutdown(int exitCode) throws IOException {
|
||||
LOG.debug("FFmpeg exit code was {}", exitCode);
|
||||
ffmpegLogStream.flush();
|
||||
ffmpegLogStream.close();
|
||||
|
|
|
@ -68,7 +68,7 @@ public class NextGenLocalRecorder implements Recorder {
|
|||
private RecordingPreconditions preconditions;
|
||||
|
||||
// thread pools for downloads and post-processing
|
||||
private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(10, createThreadFactory("Download"));
|
||||
private ScheduledExecutorService downloadPool;
|
||||
private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker"));
|
||||
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
|
||||
private Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>());
|
||||
|
@ -80,6 +80,7 @@ public class NextGenLocalRecorder implements Recorder {
|
|||
|
||||
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
|
||||
this.config = config;
|
||||
downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download"));
|
||||
recordingManager = new RecordingManager(config, sites);
|
||||
config.getSettings().models.stream().forEach(m -> {
|
||||
if (m.getSite() != null) {
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import static ctbrec.io.HttpConstants.*;
|
||||
import static ctbrec.io.HttpConstants.ORIGIN;
|
||||
import static ctbrec.recorder.download.StreamSource.*;
|
||||
import static java.nio.charset.StandardCharsets.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.EOFException;
|
||||
|
@ -12,7 +12,6 @@ import java.io.InputStream;
|
|||
import java.io.OutputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.NumberFormat;
|
||||
import java.time.Duration;
|
||||
|
@ -51,6 +50,7 @@ import ctbrec.Settings;
|
|||
import ctbrec.UnknownModel;
|
||||
import ctbrec.io.BandwidthMeter;
|
||||
import ctbrec.io.HttpClient;
|
||||
import ctbrec.io.HttpConstants;
|
||||
import ctbrec.io.HttpException;
|
||||
import ctbrec.io.MissedSegmentsStatistics;
|
||||
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
|
||||
|
@ -71,7 +71,6 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
private transient NumberFormat nf = new DecimalFormat("000000");
|
||||
private transient int playlistEmptyCount = 0;
|
||||
private transient int segmentCounter = 1;
|
||||
private transient int waitFactor = 2;
|
||||
protected transient Config config;
|
||||
protected transient HttpClient client;
|
||||
protected transient ExecutorService downloadExecutor;
|
||||
|
@ -81,9 +80,12 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
protected transient int nextSegmentNumber = 0;
|
||||
protected transient String segmentPlaylistUrl;
|
||||
|
||||
private transient String previousPlaylist;
|
||||
private transient String lastPlaylist;
|
||||
private transient Instant previousPlaylistRequest = Instant.EPOCH;
|
||||
private transient Instant afterLastPlaylistRequest= Instant.EPOCH;
|
||||
private transient Instant beforeLastPlaylistRequest= Instant.EPOCH;
|
||||
private transient int consecutivePlaylistTimeouts = 0;
|
||||
|
||||
protected Model model = new UnknownModel();
|
||||
|
||||
|
@ -119,7 +121,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
handleMissedSegments(segmentPlaylist, nextSegmentNumber);
|
||||
enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
|
||||
splitRecordingIfNecessary();
|
||||
calculateRescheduleTime(segmentPlaylist, lastSegmentNumber, waitFactor);
|
||||
calculateRescheduleTime(segmentPlaylist, lastSegmentNumber);
|
||||
|
||||
// this if check makes sure, that we don't decrease nextSegment. for some reason
|
||||
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
|
||||
|
@ -128,11 +130,13 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
LOG.error("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e);
|
||||
LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e);
|
||||
running = false;
|
||||
} catch (PlaylistException e) {
|
||||
LOG.error("Couldn't parse HLS playlist for model " + model, e);
|
||||
LOG.error("Couldn't parse HLS playlist for model {}", model, e);
|
||||
running = false;
|
||||
} catch (PlaylistTimeoutException e) {
|
||||
rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible
|
||||
} catch (EOFException e) {
|
||||
// end of playlist reached
|
||||
LOG.debug("Reached end of playlist for model {}", model);
|
||||
|
@ -140,14 +144,14 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
} catch (HttpException e) {
|
||||
handleHttpException(e);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Couldn't download segment", e);
|
||||
LOG.error("Couldn't download segment for model {}", model, e);
|
||||
running = false;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
protected void execute(SegmentDownload segmentDownload) {
|
||||
segmentDownload.call();
|
||||
downloadExecutor.submit(segmentDownload);
|
||||
}
|
||||
|
||||
private void handleHttpException(HttpException e) throws IOException {
|
||||
|
@ -207,70 +211,67 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
return url;
|
||||
}
|
||||
|
||||
protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException {
|
||||
URL segmentsUrl = new URL(segmentsURL);
|
||||
protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException {
|
||||
URL segmentsUrl = new URL(segmentPlaylistUrl);
|
||||
Builder builder = new Request.Builder()
|
||||
.url(segmentsUrl);
|
||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model);
|
||||
Request request = builder.build();
|
||||
|
||||
for (int i = 1; i <= 3; i++) {
|
||||
Instant before = Instant.now();
|
||||
try (Response response = client.execute(request, 1000)) {
|
||||
if (response.isSuccessful()) {
|
||||
String body = response.body().string();
|
||||
if (!body.contains("#EXTINF")) {
|
||||
// no segments, empty playlist
|
||||
return new SegmentPlaylist(segmentsURL);
|
||||
}
|
||||
|
||||
byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
|
||||
BandwidthMeter.add(bytes.length);
|
||||
InputStream inputStream = new ByteArrayInputStream(bytes);
|
||||
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
|
||||
Playlist playlist = parser.parse();
|
||||
if (playlist.hasMediaPlaylist()) {
|
||||
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
|
||||
SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL);
|
||||
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
|
||||
lsp.targetDuration = mediaPlaylist.getTargetDuration();
|
||||
|
||||
List<TrackData> tracks = mediaPlaylist.getTracks();
|
||||
for (TrackData trackData : tracks) {
|
||||
String uri = trackData.getUri();
|
||||
if (!uri.startsWith("http")) {
|
||||
URL context = new URL(segmentsURL);
|
||||
uri = new URL(context, uri).toExternalForm();
|
||||
}
|
||||
lsp.totalDuration += trackData.getTrackInfo().duration;
|
||||
lsp.segments.add(uri);
|
||||
if (trackData.hasEncryptionData()) {
|
||||
lsp.encrypted = true;
|
||||
EncryptionData data = trackData.getEncryptionData();
|
||||
lsp.encryptionKeyUrl = data.getUri();
|
||||
lsp.encryptionMethod = data.getMethod().getValue();
|
||||
}
|
||||
}
|
||||
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
|
||||
Duration d = Duration.between(before, Instant.now());
|
||||
if(i > 1) {
|
||||
LOG.trace("Playlist request took {} ms", d.toMillis());
|
||||
}
|
||||
return lsp;
|
||||
}
|
||||
throw new InvalidPlaylistException("Playlist has no media playlist");
|
||||
} else {
|
||||
throw new HttpException(response.code(), response.message());
|
||||
try (Response response = client.execute(request, 2000)) {
|
||||
if (response.isSuccessful()) {
|
||||
consecutivePlaylistTimeouts = 0;
|
||||
String body = response.body().string();
|
||||
previousPlaylist = lastPlaylist;
|
||||
lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body;
|
||||
if (!body.contains("#EXTINF")) {
|
||||
// no segments, empty playlist
|
||||
return new SegmentPlaylist(segmentPlaylistUrl);
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
if (i == 3) {
|
||||
throw e;
|
||||
} else {
|
||||
LOG.trace("Playlist timeout {} for model {}", i, model);
|
||||
|
||||
byte[] bytes = body.getBytes(UTF_8);
|
||||
BandwidthMeter.add(bytes.length);
|
||||
InputStream inputStream = new ByteArrayInputStream(bytes);
|
||||
return parsePlaylist(segmentPlaylistUrl, inputStream);
|
||||
} else {
|
||||
throw new HttpException(response.code(), response.message());
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : "");
|
||||
// times out, return an empty playlist, so that the process can continue without wasting much more time
|
||||
throw new PlaylistTimeoutException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException {
|
||||
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
|
||||
Playlist playlist = parser.parse();
|
||||
if (playlist.hasMediaPlaylist()) {
|
||||
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
|
||||
SegmentPlaylist lsp = new SegmentPlaylist(segmentPlaylistUrl);
|
||||
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
|
||||
lsp.targetDuration = mediaPlaylist.getTargetDuration();
|
||||
|
||||
List<TrackData> tracks = mediaPlaylist.getTracks();
|
||||
for (TrackData trackData : tracks) {
|
||||
String uri = trackData.getUri();
|
||||
if (!uri.startsWith("http")) {
|
||||
URL context = new URL(segmentPlaylistUrl);
|
||||
uri = new URL(context, uri).toExternalForm();
|
||||
}
|
||||
lsp.totalDuration += trackData.getTrackInfo().duration;
|
||||
lsp.segments.add(uri);
|
||||
if (trackData.hasEncryptionData()) {
|
||||
lsp.encrypted = true;
|
||||
EncryptionData data = trackData.getEncryptionData();
|
||||
lsp.encryptionKeyUrl = data.getUri();
|
||||
lsp.encryptionMethod = data.getMethod().getValue();
|
||||
}
|
||||
}
|
||||
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
|
||||
return lsp;
|
||||
}
|
||||
throw new InvalidPlaylistException("Playlist could not be downloaded in time");
|
||||
throw new InvalidPlaylistException("Playlist has no media playlist");
|
||||
}
|
||||
|
||||
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
||||
|
@ -292,9 +293,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
|
||||
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
|
||||
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
||||
waitFactor *= 2;
|
||||
LOG.warn("Missed segments: {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor);
|
||||
LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model);
|
||||
LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest));
|
||||
LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist);
|
||||
LOG.warn("Missed segments: last playlist\n{}", lastPlaylist);
|
||||
short missedSegments = (short) (playlist.seq - nextSegmentNumber);
|
||||
MissedSegmentsStatistics.increase(model, missedSegments);
|
||||
}
|
||||
|
@ -346,18 +348,19 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
}
|
||||
}
|
||||
|
||||
private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) {
|
||||
long waitForMillis = 0;
|
||||
if (lastSegmentNumber == playlist.seq) {
|
||||
// playlist didn't change -> wait for at least half the target duration
|
||||
waitForMillis = (long) playlist.avgSegDuration * 1000 / waitFactor;
|
||||
LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
} else {
|
||||
// playlist did change -> wait for at least the target duration
|
||||
waitForMillis = (long) (playlist.avgSegDuration * 1000);
|
||||
LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
}
|
||||
rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis);
|
||||
private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) {
|
||||
// long waitForMillis = 0;
|
||||
// if (lastSegmentNumber == playlist.seq) {
|
||||
// // playlist didn't change -> wait for at least half the target duration
|
||||
// waitForMillis = (long) playlist.avgSegDuration * 1000 / 2;
|
||||
// LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
// } else {
|
||||
// // playlist did change -> wait for at least the target duration
|
||||
// waitForMillis = (long) (playlist.avgSegDuration * 1000);
|
||||
// LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
// }
|
||||
// rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis);
|
||||
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -381,7 +384,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
|||
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
|
||||
headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent);
|
||||
headers.putIfAbsent(CONNECTION, KEEP_ALIVE);
|
||||
headers.computeIfAbsent(ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
headers.computeIfAbsent(HttpConstants.ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
|
||||
for (Entry<String, String> header : headers.entrySet()) {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import static java.util.Optional.*;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -11,6 +10,7 @@ import java.nio.file.Files;
|
|||
import java.time.Instant;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -36,11 +36,10 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class);
|
||||
|
||||
private File targetFile;
|
||||
private transient FFmpeg ffmpeg;
|
||||
private transient Process ffmpegProcess;
|
||||
private transient OutputStream ffmpegStdIn;
|
||||
protected transient Thread ffmpegThread;
|
||||
private transient Object ffmpegStartMonitor = new Object();
|
||||
private BlockingQueue<SegmentDownload> queue = new LinkedBlockingQueue<>();
|
||||
private transient BlockingQueue<Future<SegmentDownload>> queue = new LinkedBlockingQueue<>();
|
||||
|
||||
public MergedFfmpegHlsDownload(HttpClient client) {
|
||||
super(client);
|
||||
|
@ -48,41 +47,61 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
|
||||
@Override
|
||||
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
||||
LOG.debug("init start");
|
||||
super.init(config, model, startTime, executorService);
|
||||
String fileSuffix = config.getSettings().ffmpegFileSuffix;
|
||||
targetFile = config.getFileForRecording(model, fileSuffix, startTime);
|
||||
|
||||
createTargetDirectory();
|
||||
startFfmpegProcess(targetFile);
|
||||
synchronized (ffmpegStartMonitor) {
|
||||
int tries = 0;
|
||||
while (ffmpegProcess == null && tries++ < 15) {
|
||||
LOG.debug("Waiting for FFmpeg to spawn to record {}", model.getName());
|
||||
try {
|
||||
ffmpegStartMonitor.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (ffmpegProcess == null) {
|
||||
throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg");
|
||||
}
|
||||
}
|
||||
|
||||
downloadExecutor.submit(() -> {
|
||||
while (running && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
SegmentDownload segmentDownload = queue.poll(5, TimeUnit.SECONDS);
|
||||
if (segmentDownload != null) {
|
||||
segmentDownload.call();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
@Override
|
||||
public AbstractHlsDownload2 call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
if (!ffmpegProcess.isAlive()) {
|
||||
running = false;
|
||||
int exitValue = ffmpegProcess.exitValue();
|
||||
ffmpeg.shutdown(exitValue);
|
||||
}
|
||||
});
|
||||
} catch (ProcessExitedUncleanException e) {
|
||||
LOG.error("FFmpeg exited unclean", e);
|
||||
}
|
||||
|
||||
streamSegmentDataToFfmpeg();
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void streamSegmentDataToFfmpeg() {
|
||||
while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Future<SegmentDownload> future = queue.peek();
|
||||
if (running && future.isDone()) {
|
||||
queue.take();
|
||||
SegmentDownload segmentDownload = future.get();
|
||||
downloadExecutor.submit(() -> {
|
||||
ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream();
|
||||
try {
|
||||
ffmpegStdIn.write(downloadData.toByteArray());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Couldn't stream segment data to FFmpeg", e);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
// first download in queue not finished, let's continue with other stuff
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Segment download failed for model {}", model, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,43 +109,30 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
return targetFile;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void finalizeDownload() {
|
||||
try {
|
||||
ffmpegThread.join();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
internalStop();
|
||||
}
|
||||
|
||||
private void startFfmpegProcess(File target) {
|
||||
ffmpegThread = new Thread(() -> {
|
||||
try {
|
||||
String[] cmdline = prepareCommandLine(target);
|
||||
FFmpeg ffmpeg = new FFmpeg.Builder()
|
||||
.logOutput(config.getSettings().logFFmpegOutput)
|
||||
.onStarted(p -> {
|
||||
ffmpegProcess = p;
|
||||
ffmpegStdIn = ffmpegProcess.getOutputStream();
|
||||
synchronized (ffmpegStartMonitor) {
|
||||
ffmpegStartMonitor.notifyAll();
|
||||
}
|
||||
})
|
||||
.build();
|
||||
ffmpeg.exec(cmdline, new String[0], target.getParentFile());
|
||||
} catch (IOException | ProcessExitedUncleanException e) {
|
||||
LOG.error("Error in FFmpeg thread", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if (running) {
|
||||
LOG.info("Interrupted while waiting for ffmpeg", e);
|
||||
}
|
||||
try {
|
||||
String[] cmdline = prepareCommandLine(target);
|
||||
ffmpeg = new FFmpeg.Builder()
|
||||
.logOutput(config.getSettings().logFFmpegOutput)
|
||||
.onStarted(p -> {
|
||||
ffmpegProcess = p;
|
||||
ffmpegStdIn = ffmpegProcess.getOutputStream();
|
||||
})
|
||||
.build();
|
||||
ffmpeg.exec(cmdline, new String[0], target.getParentFile());
|
||||
} catch (IOException | ProcessExitedUncleanException e) {
|
||||
LOG.error("Error in FFmpeg thread", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if (running) {
|
||||
LOG.info("Interrupted while waiting for ffmpeg", e);
|
||||
}
|
||||
});
|
||||
String name = "FFmpeg " + ofNullable(model).map(Model::getName).orElse("").trim();
|
||||
ffmpegThread.setName(name);
|
||||
ffmpegThread.start();
|
||||
}
|
||||
}
|
||||
|
||||
private String[] prepareCommandLine(File target) {
|
||||
|
@ -142,7 +148,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
|
||||
@Override
|
||||
protected void execute(SegmentDownload segmentDownload) {
|
||||
queue.add(segmentDownload);
|
||||
queue.add(downloadExecutor.submit(segmentDownload));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -155,6 +161,10 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
@Override
|
||||
protected synchronized void internalStop() {
|
||||
running = false;
|
||||
for (Future<SegmentDownload> future : queue) {
|
||||
future.cancel(true);
|
||||
}
|
||||
queue.clear();
|
||||
|
||||
if (ffmpegStdIn != null) {
|
||||
try {
|
||||
|
@ -261,6 +271,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
|
||||
@Override
|
||||
protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException {
|
||||
return ffmpegStdIn;
|
||||
return new ByteArrayOutputStream();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import java.net.SocketTimeoutException;
|
||||
|
||||
public class PlaylistTimeoutException extends RuntimeException {
|
||||
|
||||
public PlaylistTimeoutException(SocketTimeoutException e) {
|
||||
super(e);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue