diff --git a/client/src/main/java/ctbrec/ui/tabs/RecordingsTab.java b/client/src/main/java/ctbrec/ui/tabs/RecordingsTab.java index 11485d5e..d8741756 100644 --- a/client/src/main/java/ctbrec/ui/tabs/RecordingsTab.java +++ b/client/src/main/java/ctbrec/ui/tabs/RecordingsTab.java @@ -703,7 +703,7 @@ public class RecordingsTab extends Tab implements TabSelectionListener { } else { URL url = new URL(hlsBase + '/' + recording.getId() + "/playlist.m3u8"); MergedFfmpegHlsDownload download = new MergedFfmpegHlsDownload(CamrecApplication.httpClient); - download.init(config, recording.getModel(), Instant.now()); + download.init(config, recording.getModel(), Instant.now(), Executors.newSingleThreadExecutor()); LOG.info("Downloading {}", url); download.downloadFinishedRecording(url.toString(), target, createDownloadListener(recording), recording.getSizeInByte()); } diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index 7e1a3c96..afefe07e 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -235,7 +235,7 @@ public class NextGenLocalRecorder implements Recorder { private Download createDownload(Model model) { Download download = model.createDownload(); - download.init(config, model, Instant.now()); + download.init(config, model, Instant.now(), downloadPool); Objects.requireNonNull(download.getStartTime(), "At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()"); LOG.debug("Downloading with {}", download.getClass().getSimpleName()); @@ -626,7 +626,7 @@ public class NextGenLocalRecorder implements Recorder { for (Recording other : recordings) { if(other.equals(recording)) { Download download = other.getModel().createDownload(); - download.init(Config.getInstance(), other.getModel(), other.getStartDate()); + download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool); other.setDownload(download); other.setPostProcessedFile(null); other.setStatus(State.WAITING); diff --git a/common/src/main/java/ctbrec/recorder/download/Download.java b/common/src/main/java/ctbrec/recorder/download/Download.java index b5497a1d..4916f8fe 100644 --- a/common/src/main/java/ctbrec/recorder/download/Download.java +++ b/common/src/main/java/ctbrec/recorder/download/Download.java @@ -4,13 +4,14 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.time.Instant; +import java.util.concurrent.ExecutorService; import ctbrec.Config; import ctbrec.Model; import ctbrec.Recording; public interface Download extends Serializable { - public void init(Config config, Model model, Instant startTime); + public void init(Config config, Model model, Instant startTime, ExecutorService executorService); public void start() throws IOException; public void stop(); public Model getModel(); diff --git a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java index e28a6ba6..59a49b19 100644 --- a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -229,7 +230,7 @@ public class DashDownload extends AbstractDownload { } @Override - public void init(Config config, Model model, Instant startTime) { + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { this.config = config; this.model = model; this.startTime = startTime; diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java new file mode 100644 index 00000000..5add831f --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java @@ -0,0 +1,368 @@ +package ctbrec.recorder.download.hls; + +import static ctbrec.io.HttpConstants.*; +import static ctbrec.io.HttpConstants.ORIGIN; +import static ctbrec.recorder.download.StreamSource.*; + +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; + +import javax.xml.bind.JAXBException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.iheartradio.m3u8.Encoding; +import com.iheartradio.m3u8.Format; +import com.iheartradio.m3u8.ParseException; +import com.iheartradio.m3u8.ParsingMode; +import com.iheartradio.m3u8.PlaylistException; +import com.iheartradio.m3u8.PlaylistParser; +import com.iheartradio.m3u8.data.EncryptionData; +import com.iheartradio.m3u8.data.MediaPlaylist; +import com.iheartradio.m3u8.data.Playlist; +import com.iheartradio.m3u8.data.TrackData; + +import ctbrec.Config; +import ctbrec.Model; +import ctbrec.Settings; +import ctbrec.UnknownModel; +import ctbrec.io.BandwidthMeter; +import ctbrec.io.HttpClient; +import ctbrec.io.HttpException; +import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; +import ctbrec.recorder.download.AbstractDownload; +import ctbrec.recorder.download.HttpHeaderFactory; +import ctbrec.recorder.download.SplittingStrategy; +import ctbrec.recorder.download.StreamSource; +import ctbrec.sites.Site; +import okhttp3.Request; +import okhttp3.Request.Builder; +import okhttp3.Response; + +public abstract class AbstractHlsDownload2 extends AbstractDownload { + + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload2.class); + private static final int TEN_SECONDS = 10_000; + + private transient NumberFormat nf = new DecimalFormat("000000"); + private transient int playlistEmptyCount = 0; + private transient int segmentCounter = 1; + private transient int waitFactor = 1; + protected transient Config config; + protected transient HttpClient client; + protected transient ExecutorService downloadExecutor; + protected transient volatile boolean running = false; + protected transient SplittingStrategy splittingStrategy; + + protected Model model = new UnknownModel(); + + protected AbstractHlsDownload2(HttpClient client) { + this.client = client; + } + + protected void onStart() throws IOException {} + protected abstract void createTargetDirectory() throws IOException; + protected abstract void execute(SegmentDownload segmentDownload); + protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException; + protected void segmentDownloadFinished(SegmentDownload segmentDownload) {} + protected abstract void internalStop(); + protected void onFinish() {} + protected void finalizeDownload() {} + + @Override + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { + this.config = config; + this.model = model; + this.startTime = startTime; + this.downloadExecutor = executorService; + splittingStrategy = initSplittingStrategy(config.getSettings()); + } + + @Override + public void start() throws IOException { + running = true; + try { + onStart(); + String segmentPlaylistUrl = getSegmentPlaylistUrl(model); + createTargetDirectory(); + int lastSegmentNumber = 0; + int nextSegmentNumber = 0; + while (running) { + SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); + emptyPlaylistCheck(segmentPlaylist); + handleMissedSegments(segmentPlaylist, nextSegmentNumber); + enqueueNewSegments(segmentPlaylist, nextSegmentNumber); + splitRecordingIfNecessary(); + waitSomeTime(segmentPlaylist, lastSegmentNumber, waitFactor); + + // this if check makes sure, that we don't decrease nextSegment. for some reason + // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 + lastSegmentNumber = segmentPlaylist.seq; + if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { + nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); + } + } + onFinish(); + } catch (ParseException e) { + throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); + } catch (PlaylistException e) { + throw new IOException("Couldn't parse HLS playlist for model " + model, e); + } catch (EOFException e) { + // end of playlist reached + LOG.debug("Reached end of playlist for model {}", model); + } catch (HttpException e) { + handleHttpException(e); + } catch (Exception e) { + throw new IOException("Couldn't download segment", e); + } finally { + finalizeDownload(); + running = false; + } + } + + protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException { + LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex()); + List streamSources = model.getStreamSources(); + Collections.sort(streamSources); + for (StreamSource streamSource : streamSources) { + LOG.debug("{} src {}", model.getName(), streamSource); + } + String url = null; + if (model.getStreamUrlIndex() >= 0 && model.getStreamUrlIndex() < streamSources.size()) { + // TODO don't use the index, but the bandwidth. if the bandwidth does not match, take the closest one + LOG.debug("{} selected {}", model.getName(), streamSources.get(model.getStreamUrlIndex())); + url = streamSources.get(model.getStreamUrlIndex()).getMediaPlaylistUrl(); + } else { + // filter out stream resolutions, which are out of range of the configured min and max + int minRes = Config.getInstance().getSettings().minimumResolution; + int maxRes = Config.getInstance().getSettings().maximumResolution; + List filteredStreamSources = streamSources.stream() + .filter(src -> src.height == 0 || src.height == UNKNOWN || minRes <= src.height) + .filter(src -> src.height == 0 || src.height == UNKNOWN || maxRes >= src.height) + .collect(Collectors.toList()); + + if (filteredStreamSources.isEmpty()) { + throw new ExecutionException(new RuntimeException("No stream left in playlist")); + } else { + LOG.debug("{} selected {}", model.getName(), filteredStreamSources.get(filteredStreamSources.size() - 1)); + url = filteredStreamSources.get(filteredStreamSources.size() - 1).getMediaPlaylistUrl(); + } + } + LOG.debug("Segment playlist url {}", url); + return url; + } + + protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException { + URL segmentsUrl = new URL(segmentsURL); + Builder builder = new Request.Builder() + .url(segmentsUrl); + addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model); + Request request = builder.build(); + + try (Response response = client.execute(request)) { + if (response.isSuccessful()) { + String body = response.body().string(); + if (!body.contains("#EXTINF")) { + // no segments, empty playlist + return new SegmentPlaylist(segmentsURL); + } + + byte[] bytes = body.getBytes(StandardCharsets.UTF_8); + BandwidthMeter.add(bytes.length); + InputStream inputStream = new ByteArrayInputStream(bytes); + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); + Playlist playlist = parser.parse(); + if (playlist.hasMediaPlaylist()) { + MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); + lsp.seq = mediaPlaylist.getMediaSequenceNumber(); + lsp.targetDuration = mediaPlaylist.getTargetDuration(); + List tracks = mediaPlaylist.getTracks(); + for (TrackData trackData : tracks) { + String uri = trackData.getUri(); + if (!uri.startsWith("http")) { + URL context = new URL(segmentsURL); + uri = new URL(context, uri).toExternalForm(); + } + lsp.totalDuration += trackData.getTrackInfo().duration; + lsp.lastSegDuration = trackData.getTrackInfo().duration; + lsp.segments.add(uri); + if (trackData.hasEncryptionData()) { + lsp.encrypted = true; + EncryptionData data = trackData.getEncryptionData(); + lsp.encryptionKeyUrl = data.getUri(); + lsp.encryptionMethod = data.getMethod().getValue(); + } + } + return lsp; + } + throw new InvalidPlaylistException("Playlist has no media playlist"); + } else { + throw new HttpException(response.code(), response.message()); + } + } + } + + protected void emptyPlaylistCheck(SegmentPlaylist playlist) { + if(playlist.segments.isEmpty()) { + playlistEmptyCount++; + try { + Thread.sleep(6000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + playlistEmptyCount = 0; + } + if(playlistEmptyCount == 10) { + LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel()); + internalStop(); + } + } + + private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { + if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { + waitFactor *= 2; + LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, + waitFactor); + } + } + + private void splitRecordingIfNecessary() { + if (splittingStrategy.splitNecessary(this)) { + internalStop(); + } + } + + protected SplittingStrategy initSplittingStrategy(Settings settings) { + SplittingStrategy strategy; + switch (settings.splitStrategy) { + case TIME: + strategy = new TimeSplittingStrategy(); + break; + case SIZE: + strategy = new SizeSplittingStrategy(); + break; + case TIME_OR_SIZE: + SplittingStrategy timeSplittingStrategy = new TimeSplittingStrategy(); + SplittingStrategy sizeSplittingStrategy = new SizeSplittingStrategy(); + strategy = new CombinedSplittingStrategy(timeSplittingStrategy, sizeSplittingStrategy); + break; + case DONT: + default: + strategy = new NoopSplittingStrategy(); + break; + } + strategy.init(settings); + return strategy; + } + + protected void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException { + int skip = nextSegmentNumber - playlist.seq; + for (String segment : playlist.segments) { + if (skip > 0) { + skip--; + } else { + URL segmentUrl = new URL(segment); + String prefix = nf.format(segmentCounter++); + File tmp = new File(segmentUrl.getFile()); + OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName()); + SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream); + execute(segmentDownload); + } + } + } + + private void waitSomeTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) { + long waitForMillis = 0; + if (lastSegmentNumber == playlist.seq) { + // playlist didn't change -> wait for at least half the target duration + waitForMillis = (long) playlist.targetDuration * 1000 / waitFactor; + LOG.trace("Playlist didn't change... waiting for {}ms", waitForMillis); + } else { + // playlist did change -> wait for at least last segment duration + waitForMillis = 1; + LOG.trace("Playlist changed... waiting for {}ms", waitForMillis); + } + + waitSomeTime(waitForMillis); + } + + /** + * Causes the current thread to sleep for a short amount of time. + * This is used to slow down retries, if something is wrong with the playlist. + * E.g. HTTP 403 or 404 + */ + protected void waitSomeTime(long waitForMillis) { + try { + Thread.sleep(waitForMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if(running) { + LOG.error("Couldn't sleep. This might mess up the download!"); + } + } + } + + private void handleHttpException(HttpException e) throws IOException { + if (e.getResponseCode() == 404) { + ctbrec.Model.State modelState; + try { + modelState = model.getOnlineState(false); + } catch (ExecutionException e1) { + modelState = ctbrec.Model.State.UNKNOWN; + } + LOG.info("Playlist not found (404). Model {} probably went offline. Model state: {}", model, modelState); + waitSomeTime(TEN_SECONDS); + } else if (e.getResponseCode() == 403) { + ctbrec.Model.State modelState; + try { + modelState = model.getOnlineState(false); + } catch (ExecutionException e1) { + modelState = ctbrec.Model.State.UNKNOWN; + } + LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState); + waitSomeTime(TEN_SECONDS); + } else { + throw e; + } + } + + protected static void addHeaders(Builder builder, Map headers, Model model) { + headers.putIfAbsent(ACCEPT, "*/*"); + headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage()); + headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent); + headers.putIfAbsent(CONNECTION, KEEP_ALIVE); + headers.computeIfAbsent(ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); + headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); + + for (Entry header : headers.entrySet()) { + builder.header(header.getKey(), header.getValue()); + } + } + + @Override + public Model getModel() { + return model; + } +} diff --git a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java index c58239d6..311ae108 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java @@ -9,6 +9,7 @@ import java.nio.file.Files; import java.time.Instant; import java.util.Arrays; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; import javax.xml.bind.JAXBException; @@ -43,7 +44,7 @@ public class FFmpegDownload extends AbstractHlsDownload { } @Override - public void init(Config config, Model model, Instant startTime) { + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { this.config = config; this.model = model; this.startTime = startTime; diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java index 3e2db3b4..4a64429e 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java @@ -1,27 +1,19 @@ package ctbrec.recorder.download.hls; -import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.net.URL; +import java.io.OutputStream; import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; -import java.text.DecimalFormat; -import java.text.NumberFormat; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Optional; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; @@ -36,100 +28,39 @@ import ctbrec.Config; import ctbrec.Model; import ctbrec.Recording; import ctbrec.Recording.State; -import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; -import ctbrec.io.HttpException; import ctbrec.io.IoUtils; import ctbrec.recorder.PlaylistGenerator; -import ctbrec.recorder.download.HttpHeaderFactory; -import okhttp3.Request; -import okhttp3.Request.Builder; -import okhttp3.Response; -public class HlsDownload extends AbstractHlsDownload { - - private static final int TEN_SECONDS = 10_000; +public class HlsDownload extends AbstractHlsDownload2 { private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class); protected transient Path downloadDir; - - private int segmentCounter = 1; - private NumberFormat nf = new DecimalFormat("000000"); private transient AtomicBoolean downloadFinished = new AtomicBoolean(false); - protected transient Config config; - private transient int waitFactor = 1; public HlsDownload(HttpClient client) { super(client); } @Override - public void init(Config config, Model model, Instant startTime) { - this.config = config; - super.model = model; + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { + super.init(config, model, startTime, executorService); DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); - this.startTime = startTime; String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime); - splittingStrategy = initSplittingStrategy(config.getSettings()); } @Override - public void start() throws IOException { - try { - running = true; - Thread.currentThread().setName("Download " + model.getName()); - String segments = getSegmentPlaylistUrl(model); - if (segments != null) { - if (!downloadDir.toFile().exists()) { - Files.createDirectories(downloadDir); - } - int lastSegmentNumber = 0; - int nextSegmentNumber = 0; - while (running) { - SegmentPlaylist playlist = getNextSegments(segments); - emptyPlaylistCheck(playlist); - logMissedSegments(playlist, nextSegmentNumber); - enqueueNewSegments(playlist, nextSegmentNumber); - splitRecordingIfNecessary(); - waitSomeTime(playlist, lastSegmentNumber, waitFactor); - - // this if check makes sure, that we don't decrease nextSegment. for some reason - // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 - lastSegmentNumber = playlist.seq; - if (lastSegmentNumber + playlist.segments.size() > nextSegmentNumber) { - nextSegmentNumber = lastSegmentNumber + playlist.segments.size(); - } - } - } else { - throw new IOException("Couldn't determine segments uri"); - } - } catch (ParseException e) { - throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); - } catch (PlaylistException e) { - throw new IOException("Couldn't parse HLS playlist for model " + model, e); - } catch (EOFException e) { - // end of playlist reached - LOG.debug("Reached end of playlist for model {}", model); - } catch (HttpException e) { - handleHttpException(e); - } catch (Exception e) { - throw new IOException("Couldn't download segment", e); - } finally { - finalizeDownload(); + protected void createTargetDirectory() throws IOException { + if (!downloadDir.toFile().exists()) { + Files.createDirectories(downloadDir); } } - private void finalizeDownload() { - downloadThreadPool.shutdown(); - try { - LOG.debug("Waiting for last segments for {}", model); - downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + @Override + protected void finalizeDownload() { downloadFinished.set(true); synchronized (downloadFinished) { downloadFinished.notifyAll(); @@ -137,90 +68,6 @@ public class HlsDownload extends AbstractHlsDownload { LOG.debug("Download for {} terminated", model); } - private void handleHttpException(HttpException e) throws IOException { - if (e.getResponseCode() == 404) { - ctbrec.Model.State modelState; - try { - modelState = model.getOnlineState(false); - } catch (ExecutionException e1) { - modelState = ctbrec.Model.State.UNKNOWN; - } - LOG.info("Playlist not found (404). Model {} probably went offline. Model state: {}", model, modelState); - waitSomeTime(TEN_SECONDS); - } else if (e.getResponseCode() == 403) { - ctbrec.Model.State modelState; - try { - modelState = model.getOnlineState(false); - } catch (ExecutionException e1) { - modelState = ctbrec.Model.State.UNKNOWN; - } - LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState); - waitSomeTime(TEN_SECONDS); - } else { - throw e; - } - } - - private void splitRecordingIfNecessary() { - if (splittingStrategy.splitNecessary(this)) { - internalStop(); - } - } - - private void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException, ExecutionException, InterruptedException { - int skip = nextSegmentNumber - playlist.seq; - for (String segment : playlist.segments) { - if (skip > 0) { - skip--; - } else { - URL segmentUrl = new URL(segment); - String prefix = nf.format(segmentCounter++); - SegmentDownload segmentDownload = new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix); - enqueueDownload(segmentDownload, prefix, segmentUrl); - } - } - } - - private void logMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { - if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { - waitFactor *= 2; - LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, - waitFactor); - } - } - - private void waitSomeTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) { - long waitForMillis = 0; - if (lastSegmentNumber == playlist.seq) { - // playlist didn't change -> wait for at least half the target duration - waitForMillis = (long) playlist.targetDuration * 1000 / waitFactor; - LOG.trace("Playlist didn't change... waiting for {}ms", waitForMillis); - } else { - // playlist did change -> wait for at least last segment duration - waitForMillis = 1; - LOG.trace("Playlist changed... waiting for {}ms", waitForMillis); - } - - waitSomeTime(waitForMillis); - } - - private void enqueueDownload(SegmentDownload segmentDownload, String prefix, URL segmentUrl) throws IOException, ExecutionException, InterruptedException { - try { - downloadThreadPool.submit(segmentDownload); - if (downloadQueue.remainingCapacity() < 10) { - LOG.debug("space left in queue {}", downloadQueue.remainingCapacity()); - // if the queue is running full, we might be struggling with timeouts - // let's check, if the model is still online - if (!model.isOnline(true)) { - downloadQueue.clear(); - internalStop(); - } - } - } catch(RejectedExecutionException e) { - LOG.warn("Download queue is full ({}). Skipping segment {} {}", downloadQueue.size(), prefix, segmentUrl); - } - } - @Override public void postprocess(Recording recording) { Thread.currentThread().setName("PP " + model.getName()); @@ -249,10 +96,22 @@ public class HlsDownload extends AbstractHlsDownload { } + @Override + protected void execute(SegmentDownload segmentDownload) { + CompletableFuture.supplyAsync(segmentDownload::call).whenComplete((result, exception) -> { + if (result != null) { + try { + result.getOutputStream().close(); + } catch (IOException e) { + LOG.error("Couldn't close segment file", e); + } + } + }); + } + @Override public void stop() { if (running) { - internalStop(); try { synchronized (downloadFinished) { while (!downloadFinished.get()) { @@ -264,66 +123,12 @@ public class HlsDownload extends AbstractHlsDownload { LOG.error("Couldn't wait for download to finish", e); } } + internalStop(); } @Override - void internalStop() { + protected void internalStop() { running = false; - downloadThreadPool.shutdownNow(); - } - - private class SegmentDownload implements Callable { - private URL url; - private Path file; - private HttpClient client; - private SegmentPlaylist playlist; - - public SegmentDownload(SegmentPlaylist playlist, URL url, Path dir, HttpClient client, String prefix) { - this.playlist = playlist; - this.url = url; - this.client = client; - File path = new File(url.getPath()); - file = FileSystems.getDefault().getPath(dir.toString(), prefix + '_' + path.getName()); - } - - @Override - public Boolean call() throws Exception { - LOG.trace("Downloading segment {} to {}", url, file); - for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { - Builder builder = new Request.Builder().url(url); - addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>())); - Request request = builder.build(); - InputStream in = null; - try (Response response = client.execute(request); FileOutputStream fos = new FileOutputStream(file.toFile())) { - if (response.isSuccessful()) { - in = response.body().byteStream(); - if(playlist.encrypted) { - in = new Crypto(playlist.encryptionKeyUrl, client).wrap(in); - } - byte[] b = new byte[1024 * 100]; - int length = -1; - while( (length = in.read(b)) >= 0 && !Thread.currentThread().isInterrupted()) { - fos.write(b, 0, length); - BandwidthMeter.add(length); - } - return true; - } - } catch(FileNotFoundException e) { - LOG.debug("Segment does not exist {}", url.getFile()); - break; - } catch (InterruptedIOException e) { - break; - } catch(Exception e) { - LOG.error("Error", e); - if (tries == 3) { - LOG.warn("Error while downloading segment. Segment {} finally failed: {}", file.toFile().getName(), e.getMessage()); - } else { - LOG.debug("Error while downloading segment on try {} - {}", tries, e.getMessage()); - } - } - } - return false; - } } @Override @@ -348,4 +153,19 @@ public class HlsDownload extends AbstractHlsDownload { public long getSizeInByte() { return IoUtils.getDirectorySize(getTarget()); } + + @Override + protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws FileNotFoundException { + File file = FileSystems.getDefault().getPath(downloadDir.toAbsolutePath().toString(), prefix + '_' + fileName).toFile(); + return new FileOutputStream(file); + } + + @Override + protected void segmentDownloadFinished(SegmentDownload segmentDownload) { + try { + segmentDownload.getOutputStream().close(); + } catch (IOException e) { + LOG.warn("Couldn't close segment file"); + } + } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index ddc04067..f868aee0 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -2,75 +2,55 @@ package ctbrec.recorder.download.hls; import static java.util.Optional.*; -import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Files; import java.time.Instant; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.iheartradio.m3u8.ParseException; -import com.iheartradio.m3u8.PlaylistException; - import ctbrec.Config; import ctbrec.Hmac; import ctbrec.Model; import ctbrec.OS; import ctbrec.Recording; -import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; import ctbrec.io.HttpException; import ctbrec.recorder.FFmpeg; import ctbrec.recorder.ProgressListener; -import ctbrec.recorder.download.HttpHeaderFactory; import ctbrec.recorder.download.ProcessExitedUncleanException; import okhttp3.Request; -import okhttp3.Request.Builder; import okhttp3.Response; -public class MergedFfmpegHlsDownload extends AbstractHlsDownload { +public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { - private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class); - private static final boolean IGNORE_CACHE = true; + private File targetFile; - private transient Config config; private transient Process ffmpegProcess; private transient OutputStream ffmpegStdIn; protected transient Thread ffmpegThread; private transient Object ffmpegStartMonitor = new Object(); - private transient Queue> downloads = new LinkedList<>(); - private transient int lastSegment = 0; - private transient int nextSegment = 0; + private BlockingQueue queue = new LinkedBlockingQueue<>(); public MergedFfmpegHlsDownload(HttpClient client) { super(client); } @Override - public void init(Config config, Model model, Instant startTime) { - super.startTime = startTime; - this.config = config; - this.model = model; + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { + super.init(config, model, startTime, executorService); String fileSuffix = config.getSettings().ffmpegFileSuffix; targetFile = config.getFileForRecording(model, fileSuffix, startTime); - splittingStrategy = initSplittingStrategy(config.getSettings()); } @Override @@ -79,52 +59,46 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { } @Override - public void start() throws IOException { - try { - running = true; - Thread.currentThread().setName("Download " + model.getName()); - super.startTime = Instant.now(); - - String segments = getSegmentPlaylistUrl(model); - - Files.createDirectories(targetFile.getParentFile().toPath()); - startFfmpegProcess(targetFile); - synchronized (ffmpegStartMonitor) { - int tries = 0; - while (ffmpegProcess == null && tries++ < 15) { - LOG.debug("Waiting for FFmpeg to spawn to record {}", model.getName()); + protected void onStart() throws IOException { + createTargetDirectory(); + startFfmpegProcess(targetFile); + synchronized (ffmpegStartMonitor) { + int tries = 0; + while (ffmpegProcess == null && tries++ < 15) { + LOG.debug("Waiting for FFmpeg to spawn to record {}", model.getName()); + try { ffmpegStartMonitor.wait(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } } + } - if (ffmpegProcess == null) { - throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); - } else { - LOG.debug("Starting to download segments"); - startDownloadLoop(segments); - ffmpegThread.join(); - LOG.debug("FFmpeg thread terminated"); + if (ffmpegProcess == null) { + throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); + } + + downloadExecutor.submit(() -> { + while (running && !Thread.currentThread().isInterrupted()) { + try { + SegmentDownload segmentDownload = queue.poll(5, TimeUnit.SECONDS); + if (segmentDownload != null) { + segmentDownload.call(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } - } catch (ParseException e) { - throw new IOException("Couldn't parse stream information", e); - } catch (PlaylistException e) { - throw new IOException("Couldn't parse HLS playlist", e); - } catch (EOFException e) { - // end of playlist reached - LOG.debug("Reached end of playlist for model {}", model); - } catch (Exception e) { - throw new IOException("Exception while downloading segments", e); - } finally { - internalStop(); - downloadThreadPool.shutdown(); - try { - LOG.debug("Waiting for last segments for {}", model); - downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - running = false; - LOG.debug("Download for {} terminated", model); + }); + } + + @Override + protected void onFinish() { + try { + ffmpegThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @@ -168,179 +142,9 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { return OS.getFFmpegCommand(argsPlusFile); } - protected void startDownloadLoop(String segmentPlaylistUri) throws IOException, ParseException, PlaylistException { - while (running) { - try { - downloadSegments(segmentPlaylistUri); - } catch (HttpException e) { - logHttpException(e); - running = false; - } catch (MalformedURLException e) { - LOG.info("Malformed URL {} - {}", model, segmentPlaylistUri, e); - running = false; - } catch (Exception e) { - LOG.info("Unexpected error while downloading {}", model, e); - running = false; - } - } - ffmpegThread.interrupt(); - } - - private void downloadSegments(String segmentPlaylistUri) throws IOException, ParseException, PlaylistException, ExecutionException { - SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); - emptyPlaylistCheck(lsp); - - // download new segments - long downloadStart = System.currentTimeMillis(); - downloadNewSegments(lsp, nextSegment); - long downloadTookMillis = System.currentTimeMillis() - downloadStart; - - // download segments, which might have been skipped - if (nextSegment > 0 && lsp.seq > nextSegment) { - LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url, - downloadTookMillis, lsp.totalDuration); - } - - splitRecordingIfNecessary(); - - // wait some time until requesting the segment playlist again to not hammer the server - waitForNewSegments(lsp, lastSegment, downloadTookMillis); - - lastSegment = lsp.seq; - nextSegment = lastSegment + lsp.segments.size(); - } - - private void logHttpException(HttpException e) { - if (e.getResponseCode() == 404) { - LOG.debug("Playlist not found (404). Model {} probably went offline", model); - } else if (e.getResponseCode() == 403) { - LOG.debug("Playlist access forbidden (403). Model {} probably went private or offline", model); - } else { - LOG.info("Unexpected error while downloading {}", model, e); - } - } - - protected void splitRecordingIfNecessary() { - if (splittingStrategy.splitNecessary(this)) { - internalStop(); - } - } - - private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws ExecutionException, IOException { - int skip = nextSegment - lsp.seq; - - // add segments to download threadpool - downloads.clear(); - if (downloadQueue.remainingCapacity() == 0) { - LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment"); - } else { - for (String segment : lsp.segments) { - if (!running) { - break; - } - if (skip > 0) { - skip--; - } else { - URL segmentUrl = new URL(segment); - Future download = downloadThreadPool.submit(new SegmentDownload(lsp, segmentUrl, client)); - downloads.add(download); - } - } - } - - writeFinishedSegments(downloads); - } - - private void writeFinishedSegments(Queue> downloads) throws ExecutionException, IOException { - for (Future downloadFuture : downloads) { - try { - byte[] segmentData = downloadFuture.get(30, TimeUnit.SECONDS); - writeSegment(segmentData); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Error while downloading segment", e); - } catch (TimeoutException e) { - LOG.info("Segment download took too long for {}. Not waiting for it any longer", getModel()); - } catch (CancellationException e) { - LOG.info("Segment download cancelled"); - } catch (ExecutionException e) { - handleExecutionExceptione(e); - } - } - } - - private void handleExecutionExceptione(ExecutionException e) throws HttpException, ExecutionException { - Throwable cause = e.getCause(); - if (cause instanceof MissingSegmentException) { - if (model != null && !isModelOnline()) { - LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName()); - running = false; - } else { - LOG.debug("Segment not available, but model {} still online. Going on", ofNullable(model).map(Model::getName).orElse("n/a")); - } - } else if (cause instanceof HttpException) { - handleHttpException((HttpException)cause); - } else { - throw e; - } - } - - private void handleHttpException(HttpException he) throws HttpException { - if (model != null && !isModelOnline()) { - LOG.debug("Error {} while downloading segment, because model {} is offline. Stopping now", he.getResponseCode(), model.getName()); - running = false; - } else { - if (he.getResponseCode() == 404) { - LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a")); - running = false; - } else if (he.getResponseCode() == 403) { - LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a")); - running = false; - } else { - throw he; - } - } - } - - protected void writeSegment(byte[] segmentData, int offset, int length) throws IOException { - if (running) { - if (ffmpegStdIn != null) { - ffmpegStdIn.write(segmentData, offset, length); - } else { - LOG.error("FFmpeg stdin stream is null - skipping writing of segment"); - } - } - } - - private void writeSegment(byte[] segmentData) throws IOException { - writeSegment(segmentData, 0, segmentData.length); - } - - private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment, long downloadTookMillis) { - try { - long wait = 0; - if (lastSegment == lsp.seq) { - int timeLeftMillis = (int) (lsp.totalDuration * 1000 - downloadTookMillis); - if (timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it - wait = 1; - } else { - // wait a second to be nice to the server (don't hammer it with requests) - // 1 second seems to be a good compromise. every other calculation resulted in more missing segments - wait = 1000; - } - LOG.trace("Playlist didn't change... waiting for {}ms", wait); - } else { - // playlist did change -> wait for at least last segment duration - wait = 1; - LOG.trace("Playlist changed... waiting for {}ms", wait); - } - Thread.sleep(wait); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (running) { - LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); - } - } + @Override + protected void execute(SegmentDownload segmentDownload) { + queue.add(segmentDownload); } @Override @@ -351,23 +155,9 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { } @Override - synchronized void internalStop() { + protected synchronized void internalStop() { running = false; - try { - downloadQueue.clear(); - for (Future future : downloads) { - future.cancel(true); - } - downloadThreadPool.shutdownNow(); - LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel()); - downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); - LOG.debug("Segment download thread pool terminated for model {}", getModel()); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for segment pool to shutdown"); - Thread.currentThread().interrupt(); - } - if (ffmpegStdIn != null) { try { ffmpegStdIn.close(); @@ -394,62 +184,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { } } - private class SegmentDownload implements Callable { - private URL url; - private HttpClient client; - private SegmentPlaylist lsp; - - public SegmentDownload(SegmentPlaylist lsp, URL url, HttpClient client) { - this.lsp = lsp; - this.url = url; - this.client = client; - } - - @Override - public byte[] call() throws IOException { - LOG.trace("Downloading segment {}", url.getFile()); - int maxTries = 3; - for (int i = 1; i <= maxTries && running; i++) { - Builder builder = new Request.Builder().url(url); - addHeaders(builder, ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>())); - Request request = builder.build(); - try (Response response = client.execute(request)) { - if (response.isSuccessful()) { - byte[] segment = response.body().bytes(); - BandwidthMeter.add(segment.length); - if (lsp.encrypted) { - segment = new Crypto(lsp.encryptionKeyUrl, client).decrypt(segment); - } - return segment; - } else { - throw new HttpException(response.code(), response.message()); - } - } catch (Exception e) { - if (i == maxTries) { - LOG.error("Error while downloading segment. Segment {} finally failed", url.getFile()); - } else { - LOG.trace("Error while downloading segment {} on try {}", url.getFile(), i, e); - } - if (model != null && !isModelOnline()) { - break; - } - } - } - throw new MissingSegmentException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries"); - } - } - - public boolean isModelOnline() { - try { - return model.isOnline(IGNORE_CACHE); - } catch (IOException | ExecutionException e) { - return false; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; - } - } - @Override public String getPath(Model model) { String absolutePath = targetFile.getAbsolutePath(); @@ -522,4 +256,14 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { public long getSizeInByte() { return getTarget().length(); } + + @Override + protected void createTargetDirectory() throws IOException { + Files.createDirectories(targetFile.getParentFile().toPath()); + } + + @Override + protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException { + return ffmpegStdIn; + } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java new file mode 100644 index 00000000..af0b25a0 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java @@ -0,0 +1,100 @@ +package ctbrec.recorder.download.hls; + +import static ctbrec.recorder.download.hls.AbstractHlsDownload2.*; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.URL; +import java.security.InvalidAlgorithmParameterException; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.Callable; + +import javax.crypto.NoSuchPaddingException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ctbrec.Model; +import ctbrec.io.BandwidthMeter; +import ctbrec.io.HttpClient; +import ctbrec.io.HttpException; +import ctbrec.recorder.download.HttpHeaderFactory; +import okhttp3.Request; +import okhttp3.Request.Builder; +import okhttp3.Response; + +public class SegmentDownload implements Callable { + private static final Logger LOG = LoggerFactory.getLogger(SegmentDownload.class); + + private URL url; + private HttpClient client; + private SegmentPlaylist playlist; + private Model model; + private OutputStream out; + + public SegmentDownload(Model model, SegmentPlaylist playlist, URL url, HttpClient client, OutputStream out) { + this.model = model; + this.playlist = playlist; + this.url = url; + this.client = client; + this.out = out; + } + + @Override + public SegmentDownload call() { + LOG.trace("Downloading segment {}", url); + for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { + Request request = createRequest(); + try (Response response = client.execute(request)) { + handleResponse(response); + break; + } catch (FileNotFoundException e) { + LOG.debug("Segment does not exist {}", url.getFile()); + break; + } catch (InterruptedIOException e) { + break; + } catch (Exception e) { + if (tries == 3) { + LOG.warn("Error while downloading segment for {}. Segment {} finally failed: {}", model, url.getFile(), e.getMessage()); + } else { + LOG.debug("Error while downloading segment for {} on try {} - {}", model, tries, e.getMessage()); + } + } + } + return this; + } + + private boolean handleResponse(Response response) throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, InvalidAlgorithmParameterException, IOException { + if (response.isSuccessful()) { + InputStream in = response.body().byteStream(); + if (playlist.encrypted) { + in = new Crypto(playlist.encryptionKeyUrl, client).wrap(in); + } + byte[] b = new byte[1024 * 100]; + int length = -1; + while ((length = in.read(b)) >= 0 && !Thread.currentThread().isInterrupted()) { + out.write(b, 0, length); + BandwidthMeter.add(length); + } + return true; + } else { + throw new HttpException(response.code(), response.message()); + } + } + + private Request createRequest() { + Builder builder = new Request.Builder().url(url); + addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()), model); + return builder.build(); + } + + public OutputStream getOutputStream() { + return out; + } +} \ No newline at end of file diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownloadException.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownloadException.java new file mode 100644 index 00000000..445704c5 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownloadException.java @@ -0,0 +1,9 @@ +package ctbrec.recorder.download.hls; + +public class SegmentDownloadException extends RuntimeException { + + public SegmentDownloadException(Exception e) { + super("Segment download failed", e); + } + +} diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java new file mode 100644 index 00000000..2b135776 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java @@ -0,0 +1,20 @@ +package ctbrec.recorder.download.hls; + +import java.util.ArrayList; +import java.util.List; + +public class SegmentPlaylist { + public String url; + public int seq = 0; + public float totalDuration = 0; + public float lastSegDuration = 0; + public float targetDuration = 0; + public List segments = new ArrayList<>(); + public boolean encrypted = false; + public String encryptionMethod = "AES-128"; + public String encryptionKeyUrl; + + public SegmentPlaylist(String url) { + this.url = url; + } +} diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java index 2fd0703a..ecefe7d0 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java @@ -14,6 +14,7 @@ import com.iheartradio.m3u8.PlaylistException; import ctbrec.io.HttpClient; import ctbrec.recorder.download.hls.HlsDownload; +import ctbrec.recorder.download.hls.SegmentPlaylist; public class LiveJasminHlsDownload extends HlsDownload { @@ -33,7 +34,7 @@ public class LiveJasminHlsDownload extends HlsDownload { SegmentPlaylist playlist = super.getNextSegments(segmentUrl); long now = System.currentTimeMillis(); if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) { - super.downloadThreadPool.submit(this::updatePlaylistUrl); + super.downloadExecutor.submit(this::updatePlaylistUrl); lastMasterPlaylistUpdate = now; } return playlist; diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java index 4170ddb7..94a9bf6c 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java @@ -14,6 +14,7 @@ import com.iheartradio.m3u8.PlaylistException; import ctbrec.io.HttpClient; import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; +import ctbrec.recorder.download.hls.SegmentPlaylist; public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload { @@ -33,7 +34,7 @@ public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload { SegmentPlaylist playlist = super.getNextSegments(segmentUrl); long now = System.currentTimeMillis(); if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) { - super.downloadThreadPool.submit(this::updatePlaylistUrl); + super.downloadExecutor.submit(this::updatePlaylistUrl); lastMasterPlaylistUpdate = now; } return playlist; diff --git a/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java b/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java index de0eab83..be4adef2 100644 --- a/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java @@ -12,9 +12,9 @@ import ctbrec.io.HttpClient; import ctbrec.recorder.download.hls.HlsDownload; public class MVLiveHlsDownload extends HlsDownload { - private static final Logger LOG = LoggerFactory.getLogger(MVLiveMergedHlsDownload.class); + private static final Logger LOG = LoggerFactory.getLogger(MVLiveHlsDownload.class); - private ScheduledExecutorService scheduler; + private transient ScheduledExecutorService scheduler; public MVLiveHlsDownload(HttpClient client) { super(client); @@ -30,7 +30,7 @@ public class MVLiveHlsDownload extends HlsDownload { t.setPriority(Thread.MIN_PRIORITY); return t; }); - scheduler.scheduleAtFixedRate(() -> updateCloudFlareCookies(), 120, 120, TimeUnit.SECONDS); + scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 120, 120, TimeUnit.SECONDS); updateCloudFlareCookies(); super.start(); } finally { diff --git a/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java b/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java index 4a7d2327..0b55bd02 100644 --- a/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java @@ -1,15 +1,16 @@ package ctbrec.sites.manyvids; -import ctbrec.io.HttpClient; -import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ctbrec.io.HttpClient; +import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; + public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload { private static final Logger LOG = LoggerFactory.getLogger(MVLiveMergedHlsDownload.class); @@ -35,7 +36,6 @@ public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload { super.start(); } finally { scheduler.shutdown(); - } } diff --git a/common/src/main/java/ctbrec/sites/showup/ShowupMergedDownload.java b/common/src/main/java/ctbrec/sites/showup/ShowupMergedDownload.java index 920f51e1..d0e0ca15 100644 --- a/common/src/main/java/ctbrec/sites/showup/ShowupMergedDownload.java +++ b/common/src/main/java/ctbrec/sites/showup/ShowupMergedDownload.java @@ -1,23 +1,10 @@ package ctbrec.sites.showup; -import static ctbrec.io.HttpConstants.*; - -import java.io.IOException; -import java.io.InputStream; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.iheartradio.m3u8.ParseException; -import com.iheartradio.m3u8.PlaylistException; - -import ctbrec.Config; -import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; -import ctbrec.io.HttpException; import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; -import okhttp3.Request; -import okhttp3.Response; public class ShowupMergedDownload extends MergedFfmpegHlsDownload { @@ -27,48 +14,4 @@ public class ShowupMergedDownload extends MergedFfmpegHlsDownload { super(client); } - @Override - protected void startDownloadLoop(String segmentPlaylistUri) throws IOException, ParseException, PlaylistException { - try { - SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); - emptyPlaylistCheck(lsp); - - for (String segment : lsp.segments) { - Request request = new Request.Builder().url(segment) - .header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent) - .header(CONNECTION, KEEP_ALIVE) - .build(); - try (Response response = client.execute(request)) { - if (response.isSuccessful()) { - InputStream in = response.body().byteStream(); - byte[] buffer = new byte[10240]; - int length = -1; - boolean keepGoing = true; - while ((length = in.read(buffer)) >= 0 && keepGoing) { - BandwidthMeter.add(length); - writeSegment(buffer, 0, length); - keepGoing = running && !Thread.interrupted() && model.isOnline(true); - splitRecordingIfNecessary(); - } - } else { - throw new HttpException(response.code(), response.message()); - } - } - } - } catch (HttpException e) { - if (e.getResponseCode() == 404) { - LOG.debug("Playlist not found (404). Model {} probably went offline", model); - } else if (e.getResponseCode() == 403) { - LOG.debug("Playlist access forbidden (403). Model {} probably went private or offline", model); - } else { - LOG.info("Unexpected error while downloading {}", model, e); - } - running = false; - } catch (Exception e) { - LOG.info("Unexpected error while downloading {}", model, e); - running = false; - } - - ffmpegThread.interrupt(); - } }