Remove old AbstractHlsDownload class
This commit is contained in:
parent
f730943245
commit
a1a5fbc3a6
|
@ -1,15 +1,21 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import static ctbrec.io.HttpConstants.*;
|
||||
import static ctbrec.io.HttpConstants.ORIGIN;
|
||||
import static ctbrec.recorder.download.StreamSource.*;
|
||||
import static java.nio.charset.StandardCharsets.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.NumberFormat;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -17,12 +23,9 @@ import java.util.Locale;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
@ -43,12 +46,14 @@ import com.iheartradio.m3u8.data.TrackData;
|
|||
|
||||
import ctbrec.Config;
|
||||
import ctbrec.Model;
|
||||
import ctbrec.Recording.State;
|
||||
import ctbrec.Model.State;
|
||||
import ctbrec.Settings;
|
||||
import ctbrec.UnknownModel;
|
||||
import ctbrec.io.BandwidthMeter;
|
||||
import ctbrec.io.HttpClient;
|
||||
import ctbrec.io.HttpConstants;
|
||||
import ctbrec.io.HttpException;
|
||||
import ctbrec.io.MissedSegmentsStatistics;
|
||||
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
|
||||
import ctbrec.recorder.download.AbstractDownload;
|
||||
import ctbrec.recorder.download.HttpHeaderFactory;
|
||||
|
@ -61,94 +66,128 @@ import okhttp3.Response;
|
|||
|
||||
public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
|
||||
private static int threadCounter = 0;
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
|
||||
private static final int TEN_SECONDS = 10_000;
|
||||
|
||||
private transient NumberFormat nf = new DecimalFormat("000000");
|
||||
private transient int playlistEmptyCount = 0;
|
||||
private transient int segmentCounter = 1;
|
||||
protected transient Config config;
|
||||
protected transient HttpClient client;
|
||||
protected volatile boolean running = false;
|
||||
protected Model model = new UnknownModel();
|
||||
protected transient LinkedBlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50);
|
||||
protected transient ExecutorService downloadThreadPool = new ThreadPoolExecutor(0, 5, 20, TimeUnit.SECONDS, downloadQueue, createThreadFactory());
|
||||
protected transient ExecutorService downloadExecutor;
|
||||
protected transient volatile boolean running = true;
|
||||
protected transient SplittingStrategy splittingStrategy;
|
||||
protected State state = State.UNKNOWN;
|
||||
private int playlistEmptyCount = 0;
|
||||
protected transient int lastSegmentNumber = 0;
|
||||
protected transient int nextSegmentNumber = 0;
|
||||
protected transient String segmentPlaylistUrl;
|
||||
|
||||
private transient String previousPlaylist;
|
||||
private transient String lastPlaylist;
|
||||
private transient Instant previousPlaylistRequest = Instant.EPOCH;
|
||||
private transient Instant afterLastPlaylistRequest= Instant.EPOCH;
|
||||
private transient Instant beforeLastPlaylistRequest= Instant.EPOCH;
|
||||
private transient int consecutivePlaylistTimeouts = 0;
|
||||
|
||||
protected Model model = new UnknownModel();
|
||||
|
||||
protected AbstractHlsDownload(HttpClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
private ThreadFactory createThreadFactory() {
|
||||
return r -> {
|
||||
Thread t = new Thread(r);
|
||||
t.setDaemon(true);
|
||||
t.setName("SegmentDownloadThread-" + threadCounter++);
|
||||
return t;
|
||||
};
|
||||
protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException;
|
||||
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {}
|
||||
protected abstract void internalStop();
|
||||
|
||||
@Override
|
||||
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
||||
this.config = config;
|
||||
this.model = model;
|
||||
this.startTime = startTime;
|
||||
this.downloadExecutor = executorService;
|
||||
splittingStrategy = initSplittingStrategy(config.getSettings());
|
||||
}
|
||||
|
||||
protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException {
|
||||
URL segmentsUrl = new URL(segmentsURL);
|
||||
Builder builder = new Request.Builder()
|
||||
.url(segmentsUrl);
|
||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()));
|
||||
Request request = builder.build();
|
||||
|
||||
try (Response response = client.execute(request)) {
|
||||
if (response.isSuccessful()) {
|
||||
String body = response.body().string();
|
||||
if (!body.contains("#EXTINF")) {
|
||||
// no segments, empty playlist
|
||||
return new SegmentPlaylist(segmentsURL);
|
||||
}
|
||||
|
||||
byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
|
||||
BandwidthMeter.add(bytes.length);
|
||||
InputStream inputStream = new ByteArrayInputStream(bytes);
|
||||
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
|
||||
Playlist playlist = parser.parse();
|
||||
if (playlist.hasMediaPlaylist()) {
|
||||
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
|
||||
SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL);
|
||||
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
|
||||
lsp.targetDuration = mediaPlaylist.getTargetDuration();
|
||||
List<TrackData> tracks = mediaPlaylist.getTracks();
|
||||
for (TrackData trackData : tracks) {
|
||||
String uri = trackData.getUri();
|
||||
if (!uri.startsWith("http")) {
|
||||
URL context = new URL(segmentsURL);
|
||||
uri = new URL(context, uri).toExternalForm();
|
||||
}
|
||||
lsp.totalDuration += trackData.getTrackInfo().duration;
|
||||
lsp.lastSegDuration = trackData.getTrackInfo().duration;
|
||||
lsp.segments.add(uri);
|
||||
if (trackData.hasEncryptionData()) {
|
||||
lsp.encrypted = true;
|
||||
EncryptionData data = trackData.getEncryptionData();
|
||||
lsp.encryptionKeyUrl = data.getUri();
|
||||
lsp.encryptionMethod = data.getMethod().getValue();
|
||||
}
|
||||
}
|
||||
return lsp;
|
||||
}
|
||||
throw new InvalidPlaylistException("Playlist has no media playlist");
|
||||
} else {
|
||||
throw new HttpException(response.code(), response.message());
|
||||
@Override
|
||||
public AbstractHlsDownload call() throws Exception {
|
||||
try {
|
||||
if (segmentPlaylistUrl == null) {
|
||||
segmentPlaylistUrl = getSegmentPlaylistUrl(model);
|
||||
}
|
||||
|
||||
previousPlaylistRequest = beforeLastPlaylistRequest;
|
||||
beforeLastPlaylistRequest = Instant.now();
|
||||
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
|
||||
afterLastPlaylistRequest = Instant.now();
|
||||
emptyPlaylistCheck(segmentPlaylist);
|
||||
handleMissedSegments(segmentPlaylist, nextSegmentNumber);
|
||||
enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
|
||||
splitRecordingIfNecessary();
|
||||
calculateRescheduleTime(segmentPlaylist, lastSegmentNumber);
|
||||
|
||||
// this if check makes sure, that we don't decrease nextSegment. for some reason
|
||||
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
|
||||
lastSegmentNumber = segmentPlaylist.seq;
|
||||
if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) {
|
||||
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e);
|
||||
running = false;
|
||||
} catch (PlaylistException e) {
|
||||
LOG.error("Couldn't parse HLS playlist for model {}", model, e);
|
||||
running = false;
|
||||
} catch (PlaylistTimeoutException e) {
|
||||
rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible
|
||||
} catch (EOFException e) {
|
||||
// end of playlist reached
|
||||
LOG.debug("Reached end of playlist for model {}", model);
|
||||
running = false;
|
||||
} catch (HttpException e) {
|
||||
handleHttpException(e);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Couldn't download segment for model {}", model, e);
|
||||
running = false;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
protected void execute(SegmentDownload segmentDownload) {
|
||||
CompletableFuture.supplyAsync(() -> downloadExecutor.submit(segmentDownload), downloadExecutor)
|
||||
.whenComplete((result, executor) -> {
|
||||
try {
|
||||
segmentDownloadFinished(result.get());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void handleHttpException(HttpException e) throws IOException {
|
||||
if (e.getResponseCode() == 404) {
|
||||
checkIfModelIsStillOnline("Playlist not found (404). Model {} probably went offline. Model state: {}");
|
||||
} else if (e.getResponseCode() == 403) {
|
||||
checkIfModelIsStillOnline("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}");
|
||||
} else {
|
||||
LOG.info("Playlist couldn't not be downloaded for model {}. Stopping recording", model, e);
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void addHeaders(Builder builder, Map<String, String> headers) {
|
||||
headers.putIfAbsent(ACCEPT, "*/*");
|
||||
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
|
||||
headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent);
|
||||
headers.putIfAbsent(CONNECTION, KEEP_ALIVE);
|
||||
headers.computeIfAbsent(ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
|
||||
for (Entry<String, String> header : headers.entrySet()) {
|
||||
builder.header(header.getKey(), header.getValue());
|
||||
protected void checkIfModelIsStillOnline(String errorMsg) throws IOException {
|
||||
ctbrec.Model.State modelState;
|
||||
try {
|
||||
modelState = model.getOnlineState(false);
|
||||
if (modelState != State.ONLINE) {
|
||||
running = false;
|
||||
}
|
||||
} catch (ExecutionException e1) {
|
||||
modelState = ctbrec.Model.State.UNKNOWN;
|
||||
}
|
||||
LOG.info(errorMsg, model, modelState);
|
||||
waitSomeTime(TEN_SECONDS);
|
||||
}
|
||||
|
||||
protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException {
|
||||
|
@ -183,6 +222,69 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
|||
return url;
|
||||
}
|
||||
|
||||
protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException {
|
||||
URL segmentsUrl = new URL(segmentPlaylistUrl);
|
||||
Builder builder = new Request.Builder()
|
||||
.url(segmentsUrl);
|
||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model);
|
||||
Request request = builder.build();
|
||||
|
||||
try (Response response = client.execute(request, 2000)) {
|
||||
if (response.isSuccessful()) {
|
||||
consecutivePlaylistTimeouts = 0;
|
||||
String body = response.body().string();
|
||||
previousPlaylist = lastPlaylist;
|
||||
lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body;
|
||||
if (!body.contains("#EXTINF")) {
|
||||
// no segments, empty playlist
|
||||
return new SegmentPlaylist(segmentPlaylistUrl);
|
||||
}
|
||||
|
||||
byte[] bytes = body.getBytes(UTF_8);
|
||||
BandwidthMeter.add(bytes.length);
|
||||
InputStream inputStream = new ByteArrayInputStream(bytes);
|
||||
return parsePlaylist(segmentPlaylistUrl, inputStream);
|
||||
} else {
|
||||
throw new HttpException(response.code(), response.message());
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : "");
|
||||
// times out, return an empty playlist, so that the process can continue without wasting much more time
|
||||
throw new PlaylistTimeoutException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException {
|
||||
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
|
||||
Playlist playlist = parser.parse();
|
||||
if (playlist.hasMediaPlaylist()) {
|
||||
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
|
||||
SegmentPlaylist lsp = new SegmentPlaylist(segmentPlaylistUrl);
|
||||
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
|
||||
lsp.targetDuration = mediaPlaylist.getTargetDuration();
|
||||
|
||||
List<TrackData> tracks = mediaPlaylist.getTracks();
|
||||
for (TrackData trackData : tracks) {
|
||||
String uri = trackData.getUri();
|
||||
if (!uri.startsWith("http")) {
|
||||
URL context = new URL(segmentPlaylistUrl);
|
||||
uri = new URL(context, uri).toExternalForm();
|
||||
}
|
||||
lsp.totalDuration += trackData.getTrackInfo().duration;
|
||||
lsp.segments.add(uri);
|
||||
if (trackData.hasEncryptionData()) {
|
||||
lsp.encrypted = true;
|
||||
EncryptionData data = trackData.getEncryptionData();
|
||||
lsp.encryptionKeyUrl = data.getUri();
|
||||
lsp.encryptionMethod = data.getMethod().getValue();
|
||||
}
|
||||
}
|
||||
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
|
||||
return lsp;
|
||||
}
|
||||
throw new InvalidPlaylistException("Playlist has no media playlist");
|
||||
}
|
||||
|
||||
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
||||
if(playlist.segments.isEmpty()) {
|
||||
playlistEmptyCount++;
|
||||
|
@ -200,42 +302,20 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
|||
}
|
||||
}
|
||||
|
||||
abstract void internalStop();
|
||||
|
||||
@Override
|
||||
public Model getModel() {
|
||||
return model;
|
||||
}
|
||||
|
||||
/**
|
||||
* Causes the current thread to sleep for a short amount of time.
|
||||
* This is used to slow down retries, if something is wrong with the playlist.
|
||||
* E.g. HTTP 403 or 404
|
||||
*/
|
||||
protected void waitSomeTime(long waitForMillis) {
|
||||
try {
|
||||
Thread.sleep(waitForMillis);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if(running) {
|
||||
LOG.error("Couldn't sleep. This might mess up the download!");
|
||||
}
|
||||
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
|
||||
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
||||
LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model);
|
||||
LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest));
|
||||
LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist);
|
||||
LOG.warn("Missed segments: last playlist\n{}", lastPlaylist);
|
||||
short missedSegments = (short) (playlist.seq - nextSegmentNumber);
|
||||
MissedSegmentsStatistics.increase(model, missedSegments);
|
||||
}
|
||||
}
|
||||
|
||||
public static class SegmentPlaylist {
|
||||
public String url;
|
||||
public int seq = 0;
|
||||
public float totalDuration = 0;
|
||||
public float lastSegDuration = 0;
|
||||
public float targetDuration = 0;
|
||||
public List<String> segments = new ArrayList<>();
|
||||
public boolean encrypted = false;
|
||||
public String encryptionMethod = "AES-128";
|
||||
public String encryptionKeyUrl;
|
||||
|
||||
public SegmentPlaylist(String url) {
|
||||
this.url = url;
|
||||
private void splitRecordingIfNecessary() {
|
||||
if (splittingStrategy.splitNecessary(this)) {
|
||||
internalStop();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -261,4 +341,74 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
|||
strategy.init(settings);
|
||||
return strategy;
|
||||
}
|
||||
|
||||
protected void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException {
|
||||
int skip = nextSegmentNumber - playlist.seq;
|
||||
for (String segment : playlist.segments) {
|
||||
if (skip > 0) {
|
||||
skip--;
|
||||
} else {
|
||||
URL segmentUrl = new URL(segment);
|
||||
String prefix = nf.format(segmentCounter++);
|
||||
File tmp = new File(segmentUrl.getFile());
|
||||
OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName());
|
||||
SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream);
|
||||
execute(segmentDownload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) {
|
||||
// long waitForMillis = 0;
|
||||
// if (lastSegmentNumber == playlist.seq) {
|
||||
// // playlist didn't change -> wait for at least half the target duration
|
||||
// waitForMillis = (long) playlist.avgSegDuration * 1000 / 2;
|
||||
// LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
// } else {
|
||||
// // playlist did change -> wait for at least the target duration
|
||||
// waitForMillis = (long) (playlist.avgSegDuration * 1000);
|
||||
// LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
// }
|
||||
// rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis);
|
||||
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Causes the current thread to sleep for a short amount of time.
|
||||
* This is used to slow down retries, if something is wrong with the playlist.
|
||||
* E.g. HTTP 403 or 404
|
||||
*/
|
||||
protected void waitSomeTime(long waitForMillis) {
|
||||
try {
|
||||
Thread.sleep(waitForMillis);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if(running) {
|
||||
LOG.error("Couldn't sleep. This might mess up the download!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static void addHeaders(Builder builder, Map<String, String> headers, Model model) {
|
||||
headers.putIfAbsent(ACCEPT, "*/*");
|
||||
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
|
||||
headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent);
|
||||
headers.putIfAbsent(CONNECTION, KEEP_ALIVE);
|
||||
headers.computeIfAbsent(HttpConstants.ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
|
||||
for (Entry<String, String> header : headers.entrySet()) {
|
||||
builder.header(header.getKey(), header.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Model getModel() {
|
||||
return model;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,414 +0,0 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import static ctbrec.io.HttpConstants.*;
|
||||
import static ctbrec.recorder.download.StreamSource.*;
|
||||
import static java.nio.charset.StandardCharsets.*;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URL;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.NumberFormat;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.xml.bind.JAXBException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.iheartradio.m3u8.Encoding;
|
||||
import com.iheartradio.m3u8.Format;
|
||||
import com.iheartradio.m3u8.ParseException;
|
||||
import com.iheartradio.m3u8.ParsingMode;
|
||||
import com.iheartradio.m3u8.PlaylistException;
|
||||
import com.iheartradio.m3u8.PlaylistParser;
|
||||
import com.iheartradio.m3u8.data.EncryptionData;
|
||||
import com.iheartradio.m3u8.data.MediaPlaylist;
|
||||
import com.iheartradio.m3u8.data.Playlist;
|
||||
import com.iheartradio.m3u8.data.TrackData;
|
||||
|
||||
import ctbrec.Config;
|
||||
import ctbrec.Model;
|
||||
import ctbrec.Model.State;
|
||||
import ctbrec.Settings;
|
||||
import ctbrec.UnknownModel;
|
||||
import ctbrec.io.BandwidthMeter;
|
||||
import ctbrec.io.HttpClient;
|
||||
import ctbrec.io.HttpConstants;
|
||||
import ctbrec.io.HttpException;
|
||||
import ctbrec.io.MissedSegmentsStatistics;
|
||||
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
|
||||
import ctbrec.recorder.download.AbstractDownload;
|
||||
import ctbrec.recorder.download.HttpHeaderFactory;
|
||||
import ctbrec.recorder.download.SplittingStrategy;
|
||||
import ctbrec.recorder.download.StreamSource;
|
||||
import ctbrec.sites.Site;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.Request.Builder;
|
||||
import okhttp3.Response;
|
||||
|
||||
public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload2.class);
|
||||
private static final int TEN_SECONDS = 10_000;
|
||||
|
||||
private transient NumberFormat nf = new DecimalFormat("000000");
|
||||
private transient int playlistEmptyCount = 0;
|
||||
private transient int segmentCounter = 1;
|
||||
protected transient Config config;
|
||||
protected transient HttpClient client;
|
||||
protected transient ExecutorService downloadExecutor;
|
||||
protected transient volatile boolean running = true;
|
||||
protected transient SplittingStrategy splittingStrategy;
|
||||
protected transient int lastSegmentNumber = 0;
|
||||
protected transient int nextSegmentNumber = 0;
|
||||
protected transient String segmentPlaylistUrl;
|
||||
|
||||
private transient String previousPlaylist;
|
||||
private transient String lastPlaylist;
|
||||
private transient Instant previousPlaylistRequest = Instant.EPOCH;
|
||||
private transient Instant afterLastPlaylistRequest= Instant.EPOCH;
|
||||
private transient Instant beforeLastPlaylistRequest= Instant.EPOCH;
|
||||
private transient int consecutivePlaylistTimeouts = 0;
|
||||
|
||||
protected Model model = new UnknownModel();
|
||||
|
||||
protected AbstractHlsDownload2(HttpClient client) {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException;
|
||||
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {}
|
||||
protected abstract void internalStop();
|
||||
|
||||
@Override
|
||||
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
||||
this.config = config;
|
||||
this.model = model;
|
||||
this.startTime = startTime;
|
||||
this.downloadExecutor = executorService;
|
||||
splittingStrategy = initSplittingStrategy(config.getSettings());
|
||||
}
|
||||
|
||||
@Override
|
||||
public AbstractHlsDownload2 call() throws Exception {
|
||||
try {
|
||||
if (segmentPlaylistUrl == null) {
|
||||
segmentPlaylistUrl = getSegmentPlaylistUrl(model);
|
||||
}
|
||||
|
||||
previousPlaylistRequest = beforeLastPlaylistRequest;
|
||||
beforeLastPlaylistRequest = Instant.now();
|
||||
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
|
||||
afterLastPlaylistRequest = Instant.now();
|
||||
emptyPlaylistCheck(segmentPlaylist);
|
||||
handleMissedSegments(segmentPlaylist, nextSegmentNumber);
|
||||
enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
|
||||
splitRecordingIfNecessary();
|
||||
calculateRescheduleTime(segmentPlaylist, lastSegmentNumber);
|
||||
|
||||
// this if check makes sure, that we don't decrease nextSegment. for some reason
|
||||
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
|
||||
lastSegmentNumber = segmentPlaylist.seq;
|
||||
if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) {
|
||||
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
|
||||
}
|
||||
} catch (ParseException e) {
|
||||
LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e);
|
||||
running = false;
|
||||
} catch (PlaylistException e) {
|
||||
LOG.error("Couldn't parse HLS playlist for model {}", model, e);
|
||||
running = false;
|
||||
} catch (PlaylistTimeoutException e) {
|
||||
rescheduleTime = beforeLastPlaylistRequest; // try again as fast as possible
|
||||
} catch (EOFException e) {
|
||||
// end of playlist reached
|
||||
LOG.debug("Reached end of playlist for model {}", model);
|
||||
running = false;
|
||||
} catch (HttpException e) {
|
||||
handleHttpException(e);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Couldn't download segment for model {}", model, e);
|
||||
running = false;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
protected void execute(SegmentDownload segmentDownload) {
|
||||
CompletableFuture.supplyAsync(() -> downloadExecutor.submit(segmentDownload), downloadExecutor)
|
||||
.whenComplete((result, executor) -> {
|
||||
try {
|
||||
segmentDownloadFinished(result.get());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void handleHttpException(HttpException e) throws IOException {
|
||||
if (e.getResponseCode() == 404) {
|
||||
checkIfModelIsStillOnline("Playlist not found (404). Model {} probably went offline. Model state: {}");
|
||||
} else if (e.getResponseCode() == 403) {
|
||||
checkIfModelIsStillOnline("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}");
|
||||
} else {
|
||||
LOG.info("Playlist couldn't not be downloaded for model {}. Stopping recording", model, e);
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
protected void checkIfModelIsStillOnline(String errorMsg) throws IOException {
|
||||
ctbrec.Model.State modelState;
|
||||
try {
|
||||
modelState = model.getOnlineState(false);
|
||||
if (modelState != State.ONLINE) {
|
||||
running = false;
|
||||
}
|
||||
} catch (ExecutionException e1) {
|
||||
modelState = ctbrec.Model.State.UNKNOWN;
|
||||
}
|
||||
LOG.info(errorMsg, model, modelState);
|
||||
waitSomeTime(TEN_SECONDS);
|
||||
}
|
||||
|
||||
protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException {
|
||||
LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex());
|
||||
List<StreamSource> streamSources = model.getStreamSources();
|
||||
Collections.sort(streamSources);
|
||||
for (StreamSource streamSource : streamSources) {
|
||||
LOG.debug("{} src {}", model.getName(), streamSource);
|
||||
}
|
||||
String url = null;
|
||||
if (model.getStreamUrlIndex() >= 0 && model.getStreamUrlIndex() < streamSources.size()) {
|
||||
// TODO don't use the index, but the bandwidth. if the bandwidth does not match, take the closest one
|
||||
LOG.debug("{} selected {}", model.getName(), streamSources.get(model.getStreamUrlIndex()));
|
||||
url = streamSources.get(model.getStreamUrlIndex()).getMediaPlaylistUrl();
|
||||
} else {
|
||||
// filter out stream resolutions, which are out of range of the configured min and max
|
||||
int minRes = Config.getInstance().getSettings().minimumResolution;
|
||||
int maxRes = Config.getInstance().getSettings().maximumResolution;
|
||||
List<StreamSource> filteredStreamSources = streamSources.stream()
|
||||
.filter(src -> src.height == 0 || src.height == UNKNOWN || minRes <= src.height)
|
||||
.filter(src -> src.height == 0 || src.height == UNKNOWN || maxRes >= src.height)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (filteredStreamSources.isEmpty()) {
|
||||
throw new ExecutionException(new RuntimeException("No stream left in playlist"));
|
||||
} else {
|
||||
LOG.debug("{} selected {}", model.getName(), filteredStreamSources.get(filteredStreamSources.size() - 1));
|
||||
url = filteredStreamSources.get(filteredStreamSources.size() - 1).getMediaPlaylistUrl();
|
||||
}
|
||||
}
|
||||
LOG.debug("Segment playlist url {}", url);
|
||||
return url;
|
||||
}
|
||||
|
||||
protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException {
|
||||
URL segmentsUrl = new URL(segmentPlaylistUrl);
|
||||
Builder builder = new Request.Builder()
|
||||
.url(segmentsUrl);
|
||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model);
|
||||
Request request = builder.build();
|
||||
|
||||
try (Response response = client.execute(request, 2000)) {
|
||||
if (response.isSuccessful()) {
|
||||
consecutivePlaylistTimeouts = 0;
|
||||
String body = response.body().string();
|
||||
previousPlaylist = lastPlaylist;
|
||||
lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body;
|
||||
if (!body.contains("#EXTINF")) {
|
||||
// no segments, empty playlist
|
||||
return new SegmentPlaylist(segmentPlaylistUrl);
|
||||
}
|
||||
|
||||
byte[] bytes = body.getBytes(UTF_8);
|
||||
BandwidthMeter.add(bytes.length);
|
||||
InputStream inputStream = new ByteArrayInputStream(bytes);
|
||||
return parsePlaylist(segmentPlaylistUrl, inputStream);
|
||||
} else {
|
||||
throw new HttpException(response.code(), response.message());
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : "");
|
||||
// times out, return an empty playlist, so that the process can continue without wasting much more time
|
||||
throw new PlaylistTimeoutException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException {
|
||||
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
|
||||
Playlist playlist = parser.parse();
|
||||
if (playlist.hasMediaPlaylist()) {
|
||||
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
|
||||
SegmentPlaylist lsp = new SegmentPlaylist(segmentPlaylistUrl);
|
||||
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
|
||||
lsp.targetDuration = mediaPlaylist.getTargetDuration();
|
||||
|
||||
List<TrackData> tracks = mediaPlaylist.getTracks();
|
||||
for (TrackData trackData : tracks) {
|
||||
String uri = trackData.getUri();
|
||||
if (!uri.startsWith("http")) {
|
||||
URL context = new URL(segmentPlaylistUrl);
|
||||
uri = new URL(context, uri).toExternalForm();
|
||||
}
|
||||
lsp.totalDuration += trackData.getTrackInfo().duration;
|
||||
lsp.segments.add(uri);
|
||||
if (trackData.hasEncryptionData()) {
|
||||
lsp.encrypted = true;
|
||||
EncryptionData data = trackData.getEncryptionData();
|
||||
lsp.encryptionKeyUrl = data.getUri();
|
||||
lsp.encryptionMethod = data.getMethod().getValue();
|
||||
}
|
||||
}
|
||||
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
|
||||
return lsp;
|
||||
}
|
||||
throw new InvalidPlaylistException("Playlist has no media playlist");
|
||||
}
|
||||
|
||||
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
||||
if(playlist.segments.isEmpty()) {
|
||||
playlistEmptyCount++;
|
||||
try {
|
||||
Thread.sleep(6000);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
} else {
|
||||
playlistEmptyCount = 0;
|
||||
}
|
||||
if(playlistEmptyCount == 10) {
|
||||
LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel());
|
||||
internalStop();
|
||||
}
|
||||
}
|
||||
|
||||
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
|
||||
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
||||
LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model);
|
||||
LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest));
|
||||
LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist);
|
||||
LOG.warn("Missed segments: last playlist\n{}", lastPlaylist);
|
||||
short missedSegments = (short) (playlist.seq - nextSegmentNumber);
|
||||
MissedSegmentsStatistics.increase(model, missedSegments);
|
||||
}
|
||||
}
|
||||
|
||||
private void splitRecordingIfNecessary() {
|
||||
if (splittingStrategy.splitNecessary(this)) {
|
||||
internalStop();
|
||||
}
|
||||
}
|
||||
|
||||
protected SplittingStrategy initSplittingStrategy(Settings settings) {
|
||||
SplittingStrategy strategy;
|
||||
switch (settings.splitStrategy) {
|
||||
case TIME:
|
||||
strategy = new TimeSplittingStrategy();
|
||||
break;
|
||||
case SIZE:
|
||||
strategy = new SizeSplittingStrategy();
|
||||
break;
|
||||
case TIME_OR_SIZE:
|
||||
SplittingStrategy timeSplittingStrategy = new TimeSplittingStrategy();
|
||||
SplittingStrategy sizeSplittingStrategy = new SizeSplittingStrategy();
|
||||
strategy = new CombinedSplittingStrategy(timeSplittingStrategy, sizeSplittingStrategy);
|
||||
break;
|
||||
case DONT:
|
||||
default:
|
||||
strategy = new NoopSplittingStrategy();
|
||||
break;
|
||||
}
|
||||
strategy.init(settings);
|
||||
return strategy;
|
||||
}
|
||||
|
||||
protected void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException {
|
||||
int skip = nextSegmentNumber - playlist.seq;
|
||||
for (String segment : playlist.segments) {
|
||||
if (skip > 0) {
|
||||
skip--;
|
||||
} else {
|
||||
URL segmentUrl = new URL(segment);
|
||||
String prefix = nf.format(segmentCounter++);
|
||||
File tmp = new File(segmentUrl.getFile());
|
||||
OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName());
|
||||
SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream);
|
||||
execute(segmentDownload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) {
|
||||
// long waitForMillis = 0;
|
||||
// if (lastSegmentNumber == playlist.seq) {
|
||||
// // playlist didn't change -> wait for at least half the target duration
|
||||
// waitForMillis = (long) playlist.avgSegDuration * 1000 / 2;
|
||||
// LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
// } else {
|
||||
// // playlist did change -> wait for at least the target duration
|
||||
// waitForMillis = (long) (playlist.avgSegDuration * 1000);
|
||||
// LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||
// }
|
||||
// rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis);
|
||||
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Causes the current thread to sleep for a short amount of time.
|
||||
* This is used to slow down retries, if something is wrong with the playlist.
|
||||
* E.g. HTTP 403 or 404
|
||||
*/
|
||||
protected void waitSomeTime(long waitForMillis) {
|
||||
try {
|
||||
Thread.sleep(waitForMillis);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if(running) {
|
||||
LOG.error("Couldn't sleep. This might mess up the download!");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static void addHeaders(Builder builder, Map<String, String> headers, Model model) {
|
||||
headers.putIfAbsent(ACCEPT, "*/*");
|
||||
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
|
||||
headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent);
|
||||
headers.putIfAbsent(CONNECTION, KEEP_ALIVE);
|
||||
headers.computeIfAbsent(HttpConstants.ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
|
||||
|
||||
for (Entry<String, String> header : headers.entrySet()) {
|
||||
builder.header(header.getKey(), header.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Model getModel() {
|
||||
return model;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
}
|
|
@ -32,7 +32,7 @@ import ctbrec.recorder.download.ProcessExitedUncleanException;
|
|||
* Does the whole HLS download with FFmpeg. Not used at the moment, because FFMpeg can't
|
||||
* handle the HLS encryption of Flirt4Free correctly
|
||||
*/
|
||||
public class FFmpegDownload extends AbstractHlsDownload2 {
|
||||
public class FFmpegDownload extends AbstractHlsDownload {
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(FFmpegDownload.class);
|
||||
|
||||
private transient Config config;
|
||||
|
|
|
@ -29,7 +29,7 @@ import ctbrec.io.HttpClient;
|
|||
import ctbrec.io.IoUtils;
|
||||
import ctbrec.recorder.PlaylistGenerator;
|
||||
|
||||
public class HlsDownload extends AbstractHlsDownload2 {
|
||||
public class HlsDownload extends AbstractHlsDownload {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class);
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import ctbrec.recorder.download.ProcessExitedUncleanException;
|
|||
import okhttp3.Request;
|
||||
import okhttp3.Response;
|
||||
|
||||
public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
||||
public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class);
|
||||
|
||||
|
@ -60,7 +60,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
|||
}
|
||||
|
||||
@Override
|
||||
public AbstractHlsDownload2 call() throws Exception {
|
||||
public AbstractHlsDownload call() throws Exception {
|
||||
super.call();
|
||||
try {
|
||||
if (!ffmpegProcess.isAlive()) {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
package ctbrec.recorder.download.hls;
|
||||
|
||||
import static ctbrec.recorder.download.hls.AbstractHlsDownload2.*;
|
||||
import static ctbrec.recorder.download.hls.AbstractHlsDownload.*;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
|
Loading…
Reference in New Issue