diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java index b5b9fa6c..db6d548a 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java @@ -1,15 +1,21 @@ package ctbrec.recorder.download.hls; import static ctbrec.io.HttpConstants.*; -import static ctbrec.io.HttpConstants.ORIGIN; import static ctbrec.recorder.download.StreamSource.*; +import static java.nio.charset.StandardCharsets.*; import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketTimeoutException; import java.net.URL; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -17,12 +23,9 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.xml.bind.JAXBException; @@ -43,12 +46,14 @@ import com.iheartradio.m3u8.data.TrackData; import ctbrec.Config; import ctbrec.Model; -import ctbrec.Recording.State; +import ctbrec.Model.State; import ctbrec.Settings; import ctbrec.UnknownModel; import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; +import ctbrec.io.HttpConstants; import ctbrec.io.HttpException; +import ctbrec.io.MissedSegmentsStatistics; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.download.AbstractDownload; import ctbrec.recorder.download.HttpHeaderFactory; @@ -61,94 +66,128 @@ import okhttp3.Response; public abstract class AbstractHlsDownload extends AbstractDownload { - private static final Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); - private static int threadCounter = 0; + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); + private static final int TEN_SECONDS = 10_000; + private transient NumberFormat nf = new DecimalFormat("000000"); + private transient int playlistEmptyCount = 0; + private transient int segmentCounter = 1; + protected transient Config config; protected transient HttpClient client; - protected volatile boolean running = false; - protected Model model = new UnknownModel(); - protected transient LinkedBlockingQueue downloadQueue = new LinkedBlockingQueue<>(50); - protected transient ExecutorService downloadThreadPool = new ThreadPoolExecutor(0, 5, 20, TimeUnit.SECONDS, downloadQueue, createThreadFactory()); + protected transient ExecutorService downloadExecutor; + protected transient volatile boolean running = true; protected transient SplittingStrategy splittingStrategy; - protected State state = State.UNKNOWN; - private int playlistEmptyCount = 0; + protected transient int lastSegmentNumber = 0; + protected transient int nextSegmentNumber = 0; + protected transient String segmentPlaylistUrl; + + private transient String previousPlaylist; + private transient String lastPlaylist; + private transient Instant previousPlaylistRequest = Instant.EPOCH; + private transient Instant afterLastPlaylistRequest= Instant.EPOCH; + private transient Instant beforeLastPlaylistRequest= Instant.EPOCH; + private transient int consecutivePlaylistTimeouts = 0; + + protected Model model = new UnknownModel(); protected AbstractHlsDownload(HttpClient client) { this.client = client; } - private ThreadFactory createThreadFactory() { - return r -> { - Thread t = new Thread(r); - t.setDaemon(true); - t.setName("SegmentDownloadThread-" + threadCounter++); - return t; - }; + protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException; + protected void segmentDownloadFinished(SegmentDownload segmentDownload) {} + protected abstract void internalStop(); + + @Override + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { + this.config = config; + this.model = model; + this.startTime = startTime; + this.downloadExecutor = executorService; + splittingStrategy = initSplittingStrategy(config.getSettings()); } - protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException { - URL segmentsUrl = new URL(segmentsURL); - Builder builder = new Request.Builder() - .url(segmentsUrl); - addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>())); - Request request = builder.build(); - - try (Response response = client.execute(request)) { - if (response.isSuccessful()) { - String body = response.body().string(); - if (!body.contains("#EXTINF")) { - // no segments, empty playlist - return new SegmentPlaylist(segmentsURL); - } - - byte[] bytes = body.getBytes(StandardCharsets.UTF_8); - BandwidthMeter.add(bytes.length); - InputStream inputStream = new ByteArrayInputStream(bytes); - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); - Playlist playlist = parser.parse(); - if (playlist.hasMediaPlaylist()) { - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); - lsp.seq = mediaPlaylist.getMediaSequenceNumber(); - lsp.targetDuration = mediaPlaylist.getTargetDuration(); - List tracks = mediaPlaylist.getTracks(); - for (TrackData trackData : tracks) { - String uri = trackData.getUri(); - if (!uri.startsWith("http")) { - URL context = new URL(segmentsURL); - uri = new URL(context, uri).toExternalForm(); - } - lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.lastSegDuration = trackData.getTrackInfo().duration; - lsp.segments.add(uri); - if (trackData.hasEncryptionData()) { - lsp.encrypted = true; - EncryptionData data = trackData.getEncryptionData(); - lsp.encryptionKeyUrl = data.getUri(); - lsp.encryptionMethod = data.getMethod().getValue(); - } - } - return lsp; - } - throw new InvalidPlaylistException("Playlist has no media playlist"); - } else { - throw new HttpException(response.code(), response.message()); + @Override + public AbstractHlsDownload call() throws Exception { + try { + if (segmentPlaylistUrl == null) { + segmentPlaylistUrl = getSegmentPlaylistUrl(model); } + + previousPlaylistRequest = beforeLastPlaylistRequest; + beforeLastPlaylistRequest = Instant.now(); + SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); + afterLastPlaylistRequest = Instant.now(); + emptyPlaylistCheck(segmentPlaylist); + handleMissedSegments(segmentPlaylist, nextSegmentNumber); + enqueueNewSegments(segmentPlaylist, nextSegmentNumber); + splitRecordingIfNecessary(); + calculateRescheduleTime(segmentPlaylist, lastSegmentNumber); + + // this if check makes sure, that we don't decrease nextSegment. for some reason + // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 + lastSegmentNumber = segmentPlaylist.seq; + if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { + nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); + } + } catch (ParseException e) { + LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e); + running = false; + } catch (PlaylistException e) { + LOG.error("Couldn't parse HLS playlist for model {}", model, e); + running = false; + } catch (PlaylistTimeoutException e) { + rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible + } catch (EOFException e) { + // end of playlist reached + LOG.debug("Reached end of playlist for model {}", model); + running = false; + } catch (HttpException e) { + handleHttpException(e); + } catch (Exception e) { + LOG.error("Couldn't download segment for model {}", model, e); + running = false; + } + return this; + } + + protected void execute(SegmentDownload segmentDownload) { + CompletableFuture.supplyAsync(() -> downloadExecutor.submit(segmentDownload), downloadExecutor) + .whenComplete((result, executor) -> { + try { + segmentDownloadFinished(result.get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + e.printStackTrace(); + } catch (ExecutionException e) { + e.printStackTrace(); + } + }); + } + + private void handleHttpException(HttpException e) throws IOException { + if (e.getResponseCode() == 404) { + checkIfModelIsStillOnline("Playlist not found (404). Model {} probably went offline. Model state: {}"); + } else if (e.getResponseCode() == 403) { + checkIfModelIsStillOnline("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}"); + } else { + LOG.info("Playlist couldn't not be downloaded for model {}. Stopping recording", model, e); + running = false; } } - - protected void addHeaders(Builder builder, Map headers) { - headers.putIfAbsent(ACCEPT, "*/*"); - headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage()); - headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent); - headers.putIfAbsent(CONNECTION, KEEP_ALIVE); - headers.computeIfAbsent(ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); - headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); - - for (Entry header : headers.entrySet()) { - builder.header(header.getKey(), header.getValue()); + protected void checkIfModelIsStillOnline(String errorMsg) throws IOException { + ctbrec.Model.State modelState; + try { + modelState = model.getOnlineState(false); + if (modelState != State.ONLINE) { + running = false; + } + } catch (ExecutionException e1) { + modelState = ctbrec.Model.State.UNKNOWN; } + LOG.info(errorMsg, model, modelState); + waitSomeTime(TEN_SECONDS); } protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException { @@ -183,6 +222,69 @@ public abstract class AbstractHlsDownload extends AbstractDownload { return url; } + protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException { + URL segmentsUrl = new URL(segmentPlaylistUrl); + Builder builder = new Request.Builder() + .url(segmentsUrl); + addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model); + Request request = builder.build(); + + try (Response response = client.execute(request, 2000)) { + if (response.isSuccessful()) { + consecutivePlaylistTimeouts = 0; + String body = response.body().string(); + previousPlaylist = lastPlaylist; + lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body; + if (!body.contains("#EXTINF")) { + // no segments, empty playlist + return new SegmentPlaylist(segmentPlaylistUrl); + } + + byte[] bytes = body.getBytes(UTF_8); + BandwidthMeter.add(bytes.length); + InputStream inputStream = new ByteArrayInputStream(bytes); + return parsePlaylist(segmentPlaylistUrl, inputStream); + } else { + throw new HttpException(response.code(), response.message()); + } + } catch (SocketTimeoutException e) { + LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : ""); + // times out, return an empty playlist, so that the process can continue without wasting much more time + throw new PlaylistTimeoutException(e); + } + } + + private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException { + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); + Playlist playlist = parser.parse(); + if (playlist.hasMediaPlaylist()) { + MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + SegmentPlaylist lsp = new SegmentPlaylist(segmentPlaylistUrl); + lsp.seq = mediaPlaylist.getMediaSequenceNumber(); + lsp.targetDuration = mediaPlaylist.getTargetDuration(); + + List tracks = mediaPlaylist.getTracks(); + for (TrackData trackData : tracks) { + String uri = trackData.getUri(); + if (!uri.startsWith("http")) { + URL context = new URL(segmentPlaylistUrl); + uri = new URL(context, uri).toExternalForm(); + } + lsp.totalDuration += trackData.getTrackInfo().duration; + lsp.segments.add(uri); + if (trackData.hasEncryptionData()) { + lsp.encrypted = true; + EncryptionData data = trackData.getEncryptionData(); + lsp.encryptionKeyUrl = data.getUri(); + lsp.encryptionMethod = data.getMethod().getValue(); + } + } + lsp.avgSegDuration = lsp.totalDuration / tracks.size(); + return lsp; + } + throw new InvalidPlaylistException("Playlist has no media playlist"); + } + protected void emptyPlaylistCheck(SegmentPlaylist playlist) { if(playlist.segments.isEmpty()) { playlistEmptyCount++; @@ -200,42 +302,20 @@ public abstract class AbstractHlsDownload extends AbstractDownload { } } - abstract void internalStop(); - - @Override - public Model getModel() { - return model; - } - - /** - * Causes the current thread to sleep for a short amount of time. - * This is used to slow down retries, if something is wrong with the playlist. - * E.g. HTTP 403 or 404 - */ - protected void waitSomeTime(long waitForMillis) { - try { - Thread.sleep(waitForMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if(running) { - LOG.error("Couldn't sleep. This might mess up the download!"); - } + private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { + if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { + LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model); + LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest)); + LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist); + LOG.warn("Missed segments: last playlist\n{}", lastPlaylist); + short missedSegments = (short) (playlist.seq - nextSegmentNumber); + MissedSegmentsStatistics.increase(model, missedSegments); } } - public static class SegmentPlaylist { - public String url; - public int seq = 0; - public float totalDuration = 0; - public float lastSegDuration = 0; - public float targetDuration = 0; - public List segments = new ArrayList<>(); - public boolean encrypted = false; - public String encryptionMethod = "AES-128"; - public String encryptionKeyUrl; - - public SegmentPlaylist(String url) { - this.url = url; + private void splitRecordingIfNecessary() { + if (splittingStrategy.splitNecessary(this)) { + internalStop(); } } @@ -261,4 +341,74 @@ public abstract class AbstractHlsDownload extends AbstractDownload { strategy.init(settings); return strategy; } + + protected void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException { + int skip = nextSegmentNumber - playlist.seq; + for (String segment : playlist.segments) { + if (skip > 0) { + skip--; + } else { + URL segmentUrl = new URL(segment); + String prefix = nf.format(segmentCounter++); + File tmp = new File(segmentUrl.getFile()); + OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName()); + SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream); + execute(segmentDownload); + } + } + } + + private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) { + // long waitForMillis = 0; + // if (lastSegmentNumber == playlist.seq) { + // // playlist didn't change -> wait for at least half the target duration + // waitForMillis = (long) playlist.avgSegDuration * 1000 / 2; + // LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); + // } else { + // // playlist did change -> wait for at least the target duration + // waitForMillis = (long) (playlist.avgSegDuration * 1000); + // LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); + // } + // rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis); + rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000); + } + + /** + * Causes the current thread to sleep for a short amount of time. + * This is used to slow down retries, if something is wrong with the playlist. + * E.g. HTTP 403 or 404 + */ + protected void waitSomeTime(long waitForMillis) { + try { + Thread.sleep(waitForMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if(running) { + LOG.error("Couldn't sleep. This might mess up the download!"); + } + } + } + + protected static void addHeaders(Builder builder, Map headers, Model model) { + headers.putIfAbsent(ACCEPT, "*/*"); + headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage()); + headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent); + headers.putIfAbsent(CONNECTION, KEEP_ALIVE); + headers.computeIfAbsent(HttpConstants.ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); + headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); + + for (Entry header : headers.entrySet()) { + builder.header(header.getKey(), header.getValue()); + } + } + + @Override + public Model getModel() { + return model; + } + + @Override + public boolean isRunning() { + return running; + } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java deleted file mode 100644 index e2938e59..00000000 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java +++ /dev/null @@ -1,414 +0,0 @@ -package ctbrec.recorder.download.hls; - -import static ctbrec.io.HttpConstants.*; -import static ctbrec.recorder.download.StreamSource.*; -import static java.nio.charset.StandardCharsets.*; - -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.SocketTimeoutException; -import java.net.URL; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.time.Duration; -import java.time.Instant; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; - -import javax.xml.bind.JAXBException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.iheartradio.m3u8.Encoding; -import com.iheartradio.m3u8.Format; -import com.iheartradio.m3u8.ParseException; -import com.iheartradio.m3u8.ParsingMode; -import com.iheartradio.m3u8.PlaylistException; -import com.iheartradio.m3u8.PlaylistParser; -import com.iheartradio.m3u8.data.EncryptionData; -import com.iheartradio.m3u8.data.MediaPlaylist; -import com.iheartradio.m3u8.data.Playlist; -import com.iheartradio.m3u8.data.TrackData; - -import ctbrec.Config; -import ctbrec.Model; -import ctbrec.Model.State; -import ctbrec.Settings; -import ctbrec.UnknownModel; -import ctbrec.io.BandwidthMeter; -import ctbrec.io.HttpClient; -import ctbrec.io.HttpConstants; -import ctbrec.io.HttpException; -import ctbrec.io.MissedSegmentsStatistics; -import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; -import ctbrec.recorder.download.AbstractDownload; -import ctbrec.recorder.download.HttpHeaderFactory; -import ctbrec.recorder.download.SplittingStrategy; -import ctbrec.recorder.download.StreamSource; -import ctbrec.sites.Site; -import okhttp3.Request; -import okhttp3.Request.Builder; -import okhttp3.Response; - -public abstract class AbstractHlsDownload2 extends AbstractDownload { - - private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload2.class); - private static final int TEN_SECONDS = 10_000; - - private transient NumberFormat nf = new DecimalFormat("000000"); - private transient int playlistEmptyCount = 0; - private transient int segmentCounter = 1; - protected transient Config config; - protected transient HttpClient client; - protected transient ExecutorService downloadExecutor; - protected transient volatile boolean running = true; - protected transient SplittingStrategy splittingStrategy; - protected transient int lastSegmentNumber = 0; - protected transient int nextSegmentNumber = 0; - protected transient String segmentPlaylistUrl; - - private transient String previousPlaylist; - private transient String lastPlaylist; - private transient Instant previousPlaylistRequest = Instant.EPOCH; - private transient Instant afterLastPlaylistRequest= Instant.EPOCH; - private transient Instant beforeLastPlaylistRequest= Instant.EPOCH; - private transient int consecutivePlaylistTimeouts = 0; - - protected Model model = new UnknownModel(); - - protected AbstractHlsDownload2(HttpClient client) { - this.client = client; - } - - protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException; - protected void segmentDownloadFinished(SegmentDownload segmentDownload) {} - protected abstract void internalStop(); - - @Override - public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { - this.config = config; - this.model = model; - this.startTime = startTime; - this.downloadExecutor = executorService; - splittingStrategy = initSplittingStrategy(config.getSettings()); - } - - @Override - public AbstractHlsDownload2 call() throws Exception { - try { - if (segmentPlaylistUrl == null) { - segmentPlaylistUrl = getSegmentPlaylistUrl(model); - } - - previousPlaylistRequest = beforeLastPlaylistRequest; - beforeLastPlaylistRequest = Instant.now(); - SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); - afterLastPlaylistRequest = Instant.now(); - emptyPlaylistCheck(segmentPlaylist); - handleMissedSegments(segmentPlaylist, nextSegmentNumber); - enqueueNewSegments(segmentPlaylist, nextSegmentNumber); - splitRecordingIfNecessary(); - calculateRescheduleTime(segmentPlaylist, lastSegmentNumber); - - // this if check makes sure, that we don't decrease nextSegment. for some reason - // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 - lastSegmentNumber = segmentPlaylist.seq; - if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { - nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); - } - } catch (ParseException e) { - LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e); - running = false; - } catch (PlaylistException e) { - LOG.error("Couldn't parse HLS playlist for model {}", model, e); - running = false; - } catch (PlaylistTimeoutException e) { - rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible - } catch (EOFException e) { - // end of playlist reached - LOG.debug("Reached end of playlist for model {}", model); - running = false; - } catch (HttpException e) { - handleHttpException(e); - } catch (Exception e) { - LOG.error("Couldn't download segment for model {}", model, e); - running = false; - } - return this; - } - - protected void execute(SegmentDownload segmentDownload) { - CompletableFuture.supplyAsync(() -> downloadExecutor.submit(segmentDownload), downloadExecutor) - .whenComplete((result, executor) -> { - try { - segmentDownloadFinished(result.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - e.printStackTrace(); - } catch (ExecutionException e) { - e.printStackTrace(); - } - }); - } - - private void handleHttpException(HttpException e) throws IOException { - if (e.getResponseCode() == 404) { - checkIfModelIsStillOnline("Playlist not found (404). Model {} probably went offline. Model state: {}"); - } else if (e.getResponseCode() == 403) { - checkIfModelIsStillOnline("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}"); - } else { - LOG.info("Playlist couldn't not be downloaded for model {}. Stopping recording", model, e); - running = false; - } - } - - protected void checkIfModelIsStillOnline(String errorMsg) throws IOException { - ctbrec.Model.State modelState; - try { - modelState = model.getOnlineState(false); - if (modelState != State.ONLINE) { - running = false; - } - } catch (ExecutionException e1) { - modelState = ctbrec.Model.State.UNKNOWN; - } - LOG.info(errorMsg, model, modelState); - waitSomeTime(TEN_SECONDS); - } - - protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException { - LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex()); - List streamSources = model.getStreamSources(); - Collections.sort(streamSources); - for (StreamSource streamSource : streamSources) { - LOG.debug("{} src {}", model.getName(), streamSource); - } - String url = null; - if (model.getStreamUrlIndex() >= 0 && model.getStreamUrlIndex() < streamSources.size()) { - // TODO don't use the index, but the bandwidth. if the bandwidth does not match, take the closest one - LOG.debug("{} selected {}", model.getName(), streamSources.get(model.getStreamUrlIndex())); - url = streamSources.get(model.getStreamUrlIndex()).getMediaPlaylistUrl(); - } else { - // filter out stream resolutions, which are out of range of the configured min and max - int minRes = Config.getInstance().getSettings().minimumResolution; - int maxRes = Config.getInstance().getSettings().maximumResolution; - List filteredStreamSources = streamSources.stream() - .filter(src -> src.height == 0 || src.height == UNKNOWN || minRes <= src.height) - .filter(src -> src.height == 0 || src.height == UNKNOWN || maxRes >= src.height) - .collect(Collectors.toList()); - - if (filteredStreamSources.isEmpty()) { - throw new ExecutionException(new RuntimeException("No stream left in playlist")); - } else { - LOG.debug("{} selected {}", model.getName(), filteredStreamSources.get(filteredStreamSources.size() - 1)); - url = filteredStreamSources.get(filteredStreamSources.size() - 1).getMediaPlaylistUrl(); - } - } - LOG.debug("Segment playlist url {}", url); - return url; - } - - protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException { - URL segmentsUrl = new URL(segmentPlaylistUrl); - Builder builder = new Request.Builder() - .url(segmentsUrl); - addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model); - Request request = builder.build(); - - try (Response response = client.execute(request, 2000)) { - if (response.isSuccessful()) { - consecutivePlaylistTimeouts = 0; - String body = response.body().string(); - previousPlaylist = lastPlaylist; - lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body; - if (!body.contains("#EXTINF")) { - // no segments, empty playlist - return new SegmentPlaylist(segmentPlaylistUrl); - } - - byte[] bytes = body.getBytes(UTF_8); - BandwidthMeter.add(bytes.length); - InputStream inputStream = new ByteArrayInputStream(bytes); - return parsePlaylist(segmentPlaylistUrl, inputStream); - } else { - throw new HttpException(response.code(), response.message()); - } - } catch (SocketTimeoutException e) { - LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : ""); - // times out, return an empty playlist, so that the process can continue without wasting much more time - throw new PlaylistTimeoutException(e); - } - } - - private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException { - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); - Playlist playlist = parser.parse(); - if (playlist.hasMediaPlaylist()) { - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - SegmentPlaylist lsp = new SegmentPlaylist(segmentPlaylistUrl); - lsp.seq = mediaPlaylist.getMediaSequenceNumber(); - lsp.targetDuration = mediaPlaylist.getTargetDuration(); - - List tracks = mediaPlaylist.getTracks(); - for (TrackData trackData : tracks) { - String uri = trackData.getUri(); - if (!uri.startsWith("http")) { - URL context = new URL(segmentPlaylistUrl); - uri = new URL(context, uri).toExternalForm(); - } - lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.segments.add(uri); - if (trackData.hasEncryptionData()) { - lsp.encrypted = true; - EncryptionData data = trackData.getEncryptionData(); - lsp.encryptionKeyUrl = data.getUri(); - lsp.encryptionMethod = data.getMethod().getValue(); - } - } - lsp.avgSegDuration = lsp.totalDuration / tracks.size(); - return lsp; - } - throw new InvalidPlaylistException("Playlist has no media playlist"); - } - - protected void emptyPlaylistCheck(SegmentPlaylist playlist) { - if(playlist.segments.isEmpty()) { - playlistEmptyCount++; - try { - Thread.sleep(6000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } else { - playlistEmptyCount = 0; - } - if(playlistEmptyCount == 10) { - LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel()); - internalStop(); - } - } - - private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { - if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { - LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model); - LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest)); - LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist); - LOG.warn("Missed segments: last playlist\n{}", lastPlaylist); - short missedSegments = (short) (playlist.seq - nextSegmentNumber); - MissedSegmentsStatistics.increase(model, missedSegments); - } - } - - private void splitRecordingIfNecessary() { - if (splittingStrategy.splitNecessary(this)) { - internalStop(); - } - } - - protected SplittingStrategy initSplittingStrategy(Settings settings) { - SplittingStrategy strategy; - switch (settings.splitStrategy) { - case TIME: - strategy = new TimeSplittingStrategy(); - break; - case SIZE: - strategy = new SizeSplittingStrategy(); - break; - case TIME_OR_SIZE: - SplittingStrategy timeSplittingStrategy = new TimeSplittingStrategy(); - SplittingStrategy sizeSplittingStrategy = new SizeSplittingStrategy(); - strategy = new CombinedSplittingStrategy(timeSplittingStrategy, sizeSplittingStrategy); - break; - case DONT: - default: - strategy = new NoopSplittingStrategy(); - break; - } - strategy.init(settings); - return strategy; - } - - protected void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException { - int skip = nextSegmentNumber - playlist.seq; - for (String segment : playlist.segments) { - if (skip > 0) { - skip--; - } else { - URL segmentUrl = new URL(segment); - String prefix = nf.format(segmentCounter++); - File tmp = new File(segmentUrl.getFile()); - OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName()); - SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream); - execute(segmentDownload); - } - } - } - - private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) { - // long waitForMillis = 0; - // if (lastSegmentNumber == playlist.seq) { - // // playlist didn't change -> wait for at least half the target duration - // waitForMillis = (long) playlist.avgSegDuration * 1000 / 2; - // LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); - // } else { - // // playlist did change -> wait for at least the target duration - // waitForMillis = (long) (playlist.avgSegDuration * 1000); - // LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); - // } - // rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis); - rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000); - } - - /** - * Causes the current thread to sleep for a short amount of time. - * This is used to slow down retries, if something is wrong with the playlist. - * E.g. HTTP 403 or 404 - */ - protected void waitSomeTime(long waitForMillis) { - try { - Thread.sleep(waitForMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if(running) { - LOG.error("Couldn't sleep. This might mess up the download!"); - } - } - } - - protected static void addHeaders(Builder builder, Map headers, Model model) { - headers.putIfAbsent(ACCEPT, "*/*"); - headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage()); - headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent); - headers.putIfAbsent(CONNECTION, KEEP_ALIVE); - headers.computeIfAbsent(HttpConstants.ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); - headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null)); - - for (Entry header : headers.entrySet()) { - builder.header(header.getKey(), header.getValue()); - } - } - - @Override - public Model getModel() { - return model; - } - - @Override - public boolean isRunning() { - return running; - } -} diff --git a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java index 750bb775..cdad907d 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java @@ -32,7 +32,7 @@ import ctbrec.recorder.download.ProcessExitedUncleanException; * Does the whole HLS download with FFmpeg. Not used at the moment, because FFMpeg can't * handle the HLS encryption of Flirt4Free correctly */ -public class FFmpegDownload extends AbstractHlsDownload2 { +public class FFmpegDownload extends AbstractHlsDownload { private static final transient Logger LOG = LoggerFactory.getLogger(FFmpegDownload.class); private transient Config config; diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java index d83e53c2..365e738e 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java @@ -29,7 +29,7 @@ import ctbrec.io.HttpClient; import ctbrec.io.IoUtils; import ctbrec.recorder.PlaylistGenerator; -public class HlsDownload extends AbstractHlsDownload2 { +public class HlsDownload extends AbstractHlsDownload { private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class); diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index ee05d049..109d82e6 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -31,7 +31,7 @@ import ctbrec.recorder.download.ProcessExitedUncleanException; import okhttp3.Request; import okhttp3.Response; -public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { +public class MergedFfmpegHlsDownload extends AbstractHlsDownload { private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class); @@ -60,7 +60,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { } @Override - public AbstractHlsDownload2 call() throws Exception { + public AbstractHlsDownload call() throws Exception { super.call(); try { if (!ffmpegProcess.isAlive()) { diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java index c10fc7d1..b8e1eb64 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java @@ -1,6 +1,6 @@ package ctbrec.recorder.download.hls; -import static ctbrec.recorder.download.hls.AbstractHlsDownload2.*; +import static ctbrec.recorder.download.hls.AbstractHlsDownload.*; import java.io.FileNotFoundException; import java.io.IOException;