diff --git a/client/src/main/java/ctbrec/ui/settings/SettingsTab.java b/client/src/main/java/ctbrec/ui/settings/SettingsTab.java index ab9e55e5..de57fed6 100644 --- a/client/src/main/java/ctbrec/ui/settings/SettingsTab.java +++ b/client/src/main/java/ctbrec/ui/settings/SettingsTab.java @@ -96,6 +96,7 @@ public class SettingsTab extends Tab implements TabSelectionListener { private SimpleFileProperty mediaPlayer; private SimpleStringProperty mediaPlayerParams; private SimpleIntegerProperty maximumResolutionPlayer; + private SimpleIntegerProperty recorderThreadPoolSize; private SimpleBooleanProperty showPlayerStarting; private SimpleBooleanProperty singlePlayer; private SimpleListProperty proxyType; @@ -182,6 +183,7 @@ public class SettingsTab extends Tab implements TabSelectionListener { onlineCheckSkipsPausedModels = new SimpleBooleanProperty(null, "onlineCheckSkipsPausedModels", settings.onlineCheckSkipsPausedModels); fastScrollSpeed = new SimpleBooleanProperty(null, "fastScrollSpeed", settings.fastScrollSpeed); confirmationDialogs = new SimpleBooleanProperty(null, "confirmationForDangerousActions", settings.confirmationForDangerousActions); + recorderThreadPoolSize = new SimpleIntegerProperty(null, "downloadThreadPoolSize", settings.downloadThreadPoolSize); } private void createGui() { @@ -265,7 +267,9 @@ public class SettingsTab extends Tab implements TabSelectionListener { ), Category.of("Advanced / Devtools", Group.of("Logging", - Setting.of("Log FFmpeg output", logFFmpegOutput, "Log FFmpeg output to files in the system's temp directory") + Setting.of("Log FFmpeg output", logFFmpegOutput, "Log FFmpeg output to files in the system's temp directory"), + Setting.of("Recorder thread pool size", recorderThreadPoolSize, + "Number of threads used for the downloads. More = less missed segments for large amounts of concurrent recordings, but more resource hungry").needsRestart() ) ) ); diff --git a/common/src/main/java/ctbrec/Settings.java b/common/src/main/java/ctbrec/Settings.java index d120ba3c..3564f849 100644 --- a/common/src/main/java/ctbrec/Settings.java +++ b/common/src/main/java/ctbrec/Settings.java @@ -59,6 +59,7 @@ public class Settings { public boolean determineResolution = false; public List disabledSites = new ArrayList<>(); public String downloadFilename = "${modelSanitizedName}-${localDateTime}"; + public int downloadThreadPoolSize = 20; public List eventHandlers = new ArrayList<>(); public boolean fastPlaylistGenerator = false; public boolean fastScrollSpeed = true; diff --git a/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java b/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java index fbb439f9..06d61ee3 100644 --- a/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java +++ b/common/src/main/java/ctbrec/io/MissedSegmentsStatistics.java @@ -19,6 +19,7 @@ public class MissedSegmentsStatistics { private static Map missegSegmentsCount = new HashMap<>(); private static Instant lastOutput = Instant.EPOCH; + private static Instant lastclear = Instant.now(); private static Thread t; static { increase(new UnknownModel(), (short) 0); @@ -46,6 +47,10 @@ public class MissedSegmentsStatistics { t.setDaemon(true); t.start(); } + if (Duration.between(lastclear, Instant.now()).toMinutes() > 60) { + missegSegmentsCount.clear(); + LOG.debug("Missed segments statistics cleared"); + } } private static void printStatistics() { diff --git a/common/src/main/java/ctbrec/recorder/FFmpeg.java b/common/src/main/java/ctbrec/recorder/FFmpeg.java index 9d13e511..916e11c5 100644 --- a/common/src/main/java/ctbrec/recorder/FFmpeg.java +++ b/common/src/main/java/ctbrec/recorder/FFmpeg.java @@ -49,8 +49,6 @@ public class FFmpeg { LOG.debug("FFmpeg command line: {}", Arrays.toString(cmdline)); process = Runtime.getRuntime().exec(cmdline, env, executionDir); afterStart(); - int exitCode = process.waitFor(); - afterExit(exitCode); } private void afterStart() throws IOException { @@ -58,7 +56,7 @@ public class FFmpeg { setupLogging(); } - private void afterExit(int exitCode) throws IOException { + public void shutdown(int exitCode) throws IOException { LOG.debug("FFmpeg exit code was {}", exitCode); ffmpegLogStream.flush(); ffmpegLogStream.close(); diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index 5e4b92e6..c46146df 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -68,7 +68,7 @@ public class NextGenLocalRecorder implements Recorder { private RecordingPreconditions preconditions; // thread pools for downloads and post-processing - private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(10, createThreadFactory("Download")); + private ScheduledExecutorService downloadPool; private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker")); private BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); private Map, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); @@ -80,6 +80,7 @@ public class NextGenLocalRecorder implements Recorder { public NextGenLocalRecorder(Config config, List sites) throws IOException { this.config = config; + downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download")); recordingManager = new RecordingManager(config, sites); config.getSettings().models.stream().forEach(m -> { if (m.getSite() != null) { diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java index f567e915..e9a858b5 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java @@ -1,8 +1,8 @@ package ctbrec.recorder.download.hls; import static ctbrec.io.HttpConstants.*; -import static ctbrec.io.HttpConstants.ORIGIN; import static ctbrec.recorder.download.StreamSource.*; +import static java.nio.charset.StandardCharsets.*; import java.io.ByteArrayInputStream; import java.io.EOFException; @@ -12,7 +12,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.SocketTimeoutException; import java.net.URL; -import java.nio.charset.StandardCharsets; import java.text.DecimalFormat; import java.text.NumberFormat; import java.time.Duration; @@ -51,6 +50,7 @@ import ctbrec.Settings; import ctbrec.UnknownModel; import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; +import ctbrec.io.HttpConstants; import ctbrec.io.HttpException; import ctbrec.io.MissedSegmentsStatistics; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; @@ -71,7 +71,6 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { private transient NumberFormat nf = new DecimalFormat("000000"); private transient int playlistEmptyCount = 0; private transient int segmentCounter = 1; - private transient int waitFactor = 2; protected transient Config config; protected transient HttpClient client; protected transient ExecutorService downloadExecutor; @@ -81,9 +80,12 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { protected transient int nextSegmentNumber = 0; protected transient String segmentPlaylistUrl; + private transient String previousPlaylist; + private transient String lastPlaylist; private transient Instant previousPlaylistRequest = Instant.EPOCH; private transient Instant afterLastPlaylistRequest= Instant.EPOCH; private transient Instant beforeLastPlaylistRequest= Instant.EPOCH; + private transient int consecutivePlaylistTimeouts = 0; protected Model model = new UnknownModel(); @@ -119,7 +121,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { handleMissedSegments(segmentPlaylist, nextSegmentNumber); enqueueNewSegments(segmentPlaylist, nextSegmentNumber); splitRecordingIfNecessary(); - calculateRescheduleTime(segmentPlaylist, lastSegmentNumber, waitFactor); + calculateRescheduleTime(segmentPlaylist, lastSegmentNumber); // this if check makes sure, that we don't decrease nextSegment. for some reason // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 @@ -128,11 +130,13 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); } } catch (ParseException e) { - LOG.error("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); + LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e); running = false; } catch (PlaylistException e) { - LOG.error("Couldn't parse HLS playlist for model " + model, e); + LOG.error("Couldn't parse HLS playlist for model {}", model, e); running = false; + } catch (PlaylistTimeoutException e) { + rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible } catch (EOFException e) { // end of playlist reached LOG.debug("Reached end of playlist for model {}", model); @@ -140,14 +144,14 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { } catch (HttpException e) { handleHttpException(e); } catch (Exception e) { - LOG.error("Couldn't download segment", e); + LOG.error("Couldn't download segment for model {}", model, e); running = false; } return this; } protected void execute(SegmentDownload segmentDownload) { - segmentDownload.call(); + downloadExecutor.submit(segmentDownload); } private void handleHttpException(HttpException e) throws IOException { @@ -207,70 +211,67 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { return url; } - protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException { - URL segmentsUrl = new URL(segmentsURL); + protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException { + URL segmentsUrl = new URL(segmentPlaylistUrl); Builder builder = new Request.Builder() .url(segmentsUrl); addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model); Request request = builder.build(); - for (int i = 1; i <= 3; i++) { - Instant before = Instant.now(); - try (Response response = client.execute(request, 1000)) { - if (response.isSuccessful()) { - String body = response.body().string(); - if (!body.contains("#EXTINF")) { - // no segments, empty playlist - return new SegmentPlaylist(segmentsURL); - } - - byte[] bytes = body.getBytes(StandardCharsets.UTF_8); - BandwidthMeter.add(bytes.length); - InputStream inputStream = new ByteArrayInputStream(bytes); - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); - Playlist playlist = parser.parse(); - if (playlist.hasMediaPlaylist()) { - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); - lsp.seq = mediaPlaylist.getMediaSequenceNumber(); - lsp.targetDuration = mediaPlaylist.getTargetDuration(); - - List tracks = mediaPlaylist.getTracks(); - for (TrackData trackData : tracks) { - String uri = trackData.getUri(); - if (!uri.startsWith("http")) { - URL context = new URL(segmentsURL); - uri = new URL(context, uri).toExternalForm(); - } - lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.segments.add(uri); - if (trackData.hasEncryptionData()) { - lsp.encrypted = true; - EncryptionData data = trackData.getEncryptionData(); - lsp.encryptionKeyUrl = data.getUri(); - lsp.encryptionMethod = data.getMethod().getValue(); - } - } - lsp.avgSegDuration = lsp.totalDuration / tracks.size(); - Duration d = Duration.between(before, Instant.now()); - if(i > 1) { - LOG.trace("Playlist request took {} ms", d.toMillis()); - } - return lsp; - } - throw new InvalidPlaylistException("Playlist has no media playlist"); - } else { - throw new HttpException(response.code(), response.message()); + try (Response response = client.execute(request, 2000)) { + if (response.isSuccessful()) { + consecutivePlaylistTimeouts = 0; + String body = response.body().string(); + previousPlaylist = lastPlaylist; + lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body; + if (!body.contains("#EXTINF")) { + // no segments, empty playlist + return new SegmentPlaylist(segmentPlaylistUrl); } - } catch (SocketTimeoutException e) { - if (i == 3) { - throw e; - } else { - LOG.trace("Playlist timeout {} for model {}", i, model); + + byte[] bytes = body.getBytes(UTF_8); + BandwidthMeter.add(bytes.length); + InputStream inputStream = new ByteArrayInputStream(bytes); + return parsePlaylist(segmentPlaylistUrl, inputStream); + } else { + throw new HttpException(response.code(), response.message()); + } + } catch (SocketTimeoutException e) { + LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : ""); + // times out, return an empty playlist, so that the process can continue without wasting much more time + throw new PlaylistTimeoutException(e); + } + } + + private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException { + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); + Playlist playlist = parser.parse(); + if (playlist.hasMediaPlaylist()) { + MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + SegmentPlaylist lsp = new SegmentPlaylist(segmentPlaylistUrl); + lsp.seq = mediaPlaylist.getMediaSequenceNumber(); + lsp.targetDuration = mediaPlaylist.getTargetDuration(); + + List tracks = mediaPlaylist.getTracks(); + for (TrackData trackData : tracks) { + String uri = trackData.getUri(); + if (!uri.startsWith("http")) { + URL context = new URL(segmentPlaylistUrl); + uri = new URL(context, uri).toExternalForm(); + } + lsp.totalDuration += trackData.getTrackInfo().duration; + lsp.segments.add(uri); + if (trackData.hasEncryptionData()) { + lsp.encrypted = true; + EncryptionData data = trackData.getEncryptionData(); + lsp.encryptionKeyUrl = data.getUri(); + lsp.encryptionMethod = data.getMethod().getValue(); } } + lsp.avgSegDuration = lsp.totalDuration / tracks.size(); + return lsp; } - throw new InvalidPlaylistException("Playlist could not be downloaded in time"); + throw new InvalidPlaylistException("Playlist has no media playlist"); } protected void emptyPlaylistCheck(SegmentPlaylist playlist) { @@ -292,9 +293,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { - waitFactor *= 2; - LOG.warn("Missed segments: {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor); + LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model); LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest)); + LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist); + LOG.warn("Missed segments: last playlist\n{}", lastPlaylist); short missedSegments = (short) (playlist.seq - nextSegmentNumber); MissedSegmentsStatistics.increase(model, missedSegments); } @@ -346,18 +348,19 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { } } - private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) { - long waitForMillis = 0; - if (lastSegmentNumber == playlist.seq) { - // playlist didn't change -> wait for at least half the target duration - waitForMillis = (long) playlist.avgSegDuration * 1000 / waitFactor; - LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); - } else { - // playlist did change -> wait for at least the target duration - waitForMillis = (long) (playlist.avgSegDuration * 1000); - LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); - } - rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis); + private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) { + // long waitForMillis = 0; + // if (lastSegmentNumber == playlist.seq) { + // // playlist didn't change -> wait for at least half the target duration + // waitForMillis = (long) playlist.avgSegDuration * 1000 / 2; + // LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); + // } else { + // // playlist did change -> wait for at least the target duration + // waitForMillis = (long) (playlist.avgSegDuration * 1000); + // LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); + // } + // rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis); + rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000); } /** @@ -381,7 +384,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage()); headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent); headers.putIfAbsent(CONNECTION, KEEP_ALIVE); - headers.computeIfAbsent(ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); + headers.computeIfAbsent(HttpConstants.ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); for (Entry header : headers.entrySet()) { diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index 367ab8fd..ee05d049 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -1,7 +1,6 @@ package ctbrec.recorder.download.hls; -import static java.util.Optional.*; - +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -11,6 +10,7 @@ import java.nio.file.Files; import java.time.Instant; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -36,11 +36,10 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class); private File targetFile; + private transient FFmpeg ffmpeg; private transient Process ffmpegProcess; private transient OutputStream ffmpegStdIn; - protected transient Thread ffmpegThread; - private transient Object ffmpegStartMonitor = new Object(); - private BlockingQueue queue = new LinkedBlockingQueue<>(); + private transient BlockingQueue> queue = new LinkedBlockingQueue<>(); public MergedFfmpegHlsDownload(HttpClient client) { super(client); @@ -48,41 +47,61 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { @Override public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { + LOG.debug("init start"); super.init(config, model, startTime, executorService); String fileSuffix = config.getSettings().ffmpegFileSuffix; targetFile = config.getFileForRecording(model, fileSuffix, startTime); createTargetDirectory(); startFfmpegProcess(targetFile); - synchronized (ffmpegStartMonitor) { - int tries = 0; - while (ffmpegProcess == null && tries++ < 15) { - LOG.debug("Waiting for FFmpeg to spawn to record {}", model.getName()); - try { - ffmpegStartMonitor.wait(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - if (ffmpegProcess == null) { throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); } + } - downloadExecutor.submit(() -> { - while (running && !Thread.currentThread().isInterrupted()) { - try { - SegmentDownload segmentDownload = queue.poll(5, TimeUnit.SECONDS); - if (segmentDownload != null) { - segmentDownload.call(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + @Override + public AbstractHlsDownload2 call() throws Exception { + super.call(); + try { + if (!ffmpegProcess.isAlive()) { + running = false; + int exitValue = ffmpegProcess.exitValue(); + ffmpeg.shutdown(exitValue); } - }); + } catch (ProcessExitedUncleanException e) { + LOG.error("FFmpeg exited unclean", e); + } + + streamSegmentDataToFfmpeg(); + + return this; + } + + private void streamSegmentDataToFfmpeg() { + while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) { + try { + Future future = queue.peek(); + if (running && future.isDone()) { + queue.take(); + SegmentDownload segmentDownload = future.get(); + downloadExecutor.submit(() -> { + ByteArrayOutputStream downloadData = (ByteArrayOutputStream) segmentDownload.getOutputStream(); + try { + ffmpegStdIn.write(downloadData.toByteArray()); + } catch (IOException e) { + LOG.error("Couldn't stream segment data to FFmpeg", e); + } + }); + } else { + // first download in queue not finished, let's continue with other stuff + break; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOG.error("Segment download failed for model {}", model, e); + } + } } @Override @@ -90,43 +109,30 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { return targetFile; } - @Override public void finalizeDownload() { - try { - ffmpegThread.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + internalStop(); } private void startFfmpegProcess(File target) { - ffmpegThread = new Thread(() -> { - try { - String[] cmdline = prepareCommandLine(target); - FFmpeg ffmpeg = new FFmpeg.Builder() - .logOutput(config.getSettings().logFFmpegOutput) - .onStarted(p -> { - ffmpegProcess = p; - ffmpegStdIn = ffmpegProcess.getOutputStream(); - synchronized (ffmpegStartMonitor) { - ffmpegStartMonitor.notifyAll(); - } - }) - .build(); - ffmpeg.exec(cmdline, new String[0], target.getParentFile()); - } catch (IOException | ProcessExitedUncleanException e) { - LOG.error("Error in FFmpeg thread", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (running) { - LOG.info("Interrupted while waiting for ffmpeg", e); - } + try { + String[] cmdline = prepareCommandLine(target); + ffmpeg = new FFmpeg.Builder() + .logOutput(config.getSettings().logFFmpegOutput) + .onStarted(p -> { + ffmpegProcess = p; + ffmpegStdIn = ffmpegProcess.getOutputStream(); + }) + .build(); + ffmpeg.exec(cmdline, new String[0], target.getParentFile()); + } catch (IOException | ProcessExitedUncleanException e) { + LOG.error("Error in FFmpeg thread", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (running) { + LOG.info("Interrupted while waiting for ffmpeg", e); } - }); - String name = "FFmpeg " + ofNullable(model).map(Model::getName).orElse("").trim(); - ffmpegThread.setName(name); - ffmpegThread.start(); + } } private String[] prepareCommandLine(File target) { @@ -142,7 +148,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { @Override protected void execute(SegmentDownload segmentDownload) { - queue.add(segmentDownload); + queue.add(downloadExecutor.submit(segmentDownload)); } @Override @@ -155,6 +161,10 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { @Override protected synchronized void internalStop() { running = false; + for (Future future : queue) { + future.cancel(true); + } + queue.clear(); if (ffmpegStdIn != null) { try { @@ -261,6 +271,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { @Override protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException { - return ffmpegStdIn; + return new ByteArrayOutputStream(); } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/PlaylistTimeoutException.java b/common/src/main/java/ctbrec/recorder/download/hls/PlaylistTimeoutException.java new file mode 100644 index 00000000..6b7dbcaa --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/hls/PlaylistTimeoutException.java @@ -0,0 +1,11 @@ +package ctbrec.recorder.download.hls; + +import java.net.SocketTimeoutException; + +public class PlaylistTimeoutException extends RuntimeException { + + public PlaylistTimeoutException(SocketTimeoutException e) { + super(e); + } + +}