forked from j62/ctbrec
1
0
Fork 0

Refactored HLS downloads

This commit is contained in:
0xb00bface 2020-12-23 01:53:05 +01:00
parent 52cdc82044
commit 65ffbf525d
16 changed files with 617 additions and 608 deletions

View File

@ -703,7 +703,7 @@ public class RecordingsTab extends Tab implements TabSelectionListener {
} else { } else {
URL url = new URL(hlsBase + '/' + recording.getId() + "/playlist.m3u8"); URL url = new URL(hlsBase + '/' + recording.getId() + "/playlist.m3u8");
MergedFfmpegHlsDownload download = new MergedFfmpegHlsDownload(CamrecApplication.httpClient); MergedFfmpegHlsDownload download = new MergedFfmpegHlsDownload(CamrecApplication.httpClient);
download.init(config, recording.getModel(), Instant.now()); download.init(config, recording.getModel(), Instant.now(), Executors.newSingleThreadExecutor());
LOG.info("Downloading {}", url); LOG.info("Downloading {}", url);
download.downloadFinishedRecording(url.toString(), target, createDownloadListener(recording), recording.getSizeInByte()); download.downloadFinishedRecording(url.toString(), target, createDownloadListener(recording), recording.getSizeInByte());
} }

View File

@ -235,7 +235,7 @@ public class NextGenLocalRecorder implements Recorder {
private Download createDownload(Model model) { private Download createDownload(Model model) {
Download download = model.createDownload(); Download download = model.createDownload();
download.init(config, model, Instant.now()); download.init(config, model, Instant.now(), downloadPool);
Objects.requireNonNull(download.getStartTime(), Objects.requireNonNull(download.getStartTime(),
"At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()"); "At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()");
LOG.debug("Downloading with {}", download.getClass().getSimpleName()); LOG.debug("Downloading with {}", download.getClass().getSimpleName());
@ -626,7 +626,7 @@ public class NextGenLocalRecorder implements Recorder {
for (Recording other : recordings) { for (Recording other : recordings) {
if(other.equals(recording)) { if(other.equals(recording)) {
Download download = other.getModel().createDownload(); Download download = other.getModel().createDownload();
download.init(Config.getInstance(), other.getModel(), other.getStartDate()); download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool);
other.setDownload(download); other.setDownload(download);
other.setPostProcessedFile(null); other.setPostProcessedFile(null);
other.setStatus(State.WAITING); other.setStatus(State.WAITING);

View File

@ -4,13 +4,14 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.ExecutorService;
import ctbrec.Config; import ctbrec.Config;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.Recording; import ctbrec.Recording;
public interface Download extends Serializable { public interface Download extends Serializable {
public void init(Config config, Model model, Instant startTime); public void init(Config config, Model model, Instant startTime, ExecutorService executorService);
public void start() throws IOException; public void start() throws IOException;
public void stop(); public void stop();
public Model getModel(); public Model getModel();

View File

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -229,7 +230,7 @@ public class DashDownload extends AbstractDownload {
} }
@Override @Override
public void init(Config config, Model model, Instant startTime) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) {
this.config = config; this.config = config;
this.model = model; this.model = model;
this.startTime = startTime; this.startTime = startTime;

View File

@ -0,0 +1,368 @@
package ctbrec.recorder.download.hls;
import static ctbrec.io.HttpConstants.*;
import static ctbrec.io.HttpConstants.ORIGIN;
import static ctbrec.recorder.download.StreamSource.*;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import javax.xml.bind.JAXBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.Encoding;
import com.iheartradio.m3u8.Format;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.ParsingMode;
import com.iheartradio.m3u8.PlaylistException;
import com.iheartradio.m3u8.PlaylistParser;
import com.iheartradio.m3u8.data.EncryptionData;
import com.iheartradio.m3u8.data.MediaPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Settings;
import ctbrec.UnknownModel;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.SplittingStrategy;
import ctbrec.recorder.download.StreamSource;
import ctbrec.sites.Site;
import okhttp3.Request;
import okhttp3.Request.Builder;
import okhttp3.Response;
public abstract class AbstractHlsDownload2 extends AbstractDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload2.class);
private static final int TEN_SECONDS = 10_000;
private transient NumberFormat nf = new DecimalFormat("000000");
private transient int playlistEmptyCount = 0;
private transient int segmentCounter = 1;
private transient int waitFactor = 1;
protected transient Config config;
protected transient HttpClient client;
protected transient ExecutorService downloadExecutor;
protected transient volatile boolean running = false;
protected transient SplittingStrategy splittingStrategy;
protected Model model = new UnknownModel();
protected AbstractHlsDownload2(HttpClient client) {
this.client = client;
}
protected void onStart() throws IOException {}
protected abstract void createTargetDirectory() throws IOException;
protected abstract void execute(SegmentDownload segmentDownload);
protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException;
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {}
protected abstract void internalStop();
protected void onFinish() {}
protected void finalizeDownload() {}
@Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) {
this.config = config;
this.model = model;
this.startTime = startTime;
this.downloadExecutor = executorService;
splittingStrategy = initSplittingStrategy(config.getSettings());
}
@Override
public void start() throws IOException {
running = true;
try {
onStart();
String segmentPlaylistUrl = getSegmentPlaylistUrl(model);
createTargetDirectory();
int lastSegmentNumber = 0;
int nextSegmentNumber = 0;
while (running) {
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
emptyPlaylistCheck(segmentPlaylist);
handleMissedSegments(segmentPlaylist, nextSegmentNumber);
enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
splitRecordingIfNecessary();
waitSomeTime(segmentPlaylist, lastSegmentNumber, waitFactor);
// this if check makes sure, that we don't decrease nextSegment. for some reason
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
lastSegmentNumber = segmentPlaylist.seq;
if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) {
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
}
}
onFinish();
} catch (ParseException e) {
throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e);
} catch (PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist for model " + model, e);
} catch (EOFException e) {
// end of playlist reached
LOG.debug("Reached end of playlist for model {}", model);
} catch (HttpException e) {
handleHttpException(e);
} catch (Exception e) {
throw new IOException("Couldn't download segment", e);
} finally {
finalizeDownload();
running = false;
}
}
protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException {
LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex());
List<StreamSource> streamSources = model.getStreamSources();
Collections.sort(streamSources);
for (StreamSource streamSource : streamSources) {
LOG.debug("{} src {}", model.getName(), streamSource);
}
String url = null;
if (model.getStreamUrlIndex() >= 0 && model.getStreamUrlIndex() < streamSources.size()) {
// TODO don't use the index, but the bandwidth. if the bandwidth does not match, take the closest one
LOG.debug("{} selected {}", model.getName(), streamSources.get(model.getStreamUrlIndex()));
url = streamSources.get(model.getStreamUrlIndex()).getMediaPlaylistUrl();
} else {
// filter out stream resolutions, which are out of range of the configured min and max
int minRes = Config.getInstance().getSettings().minimumResolution;
int maxRes = Config.getInstance().getSettings().maximumResolution;
List<StreamSource> filteredStreamSources = streamSources.stream()
.filter(src -> src.height == 0 || src.height == UNKNOWN || minRes <= src.height)
.filter(src -> src.height == 0 || src.height == UNKNOWN || maxRes >= src.height)
.collect(Collectors.toList());
if (filteredStreamSources.isEmpty()) {
throw new ExecutionException(new RuntimeException("No stream left in playlist"));
} else {
LOG.debug("{} selected {}", model.getName(), filteredStreamSources.get(filteredStreamSources.size() - 1));
url = filteredStreamSources.get(filteredStreamSources.size() - 1).getMediaPlaylistUrl();
}
}
LOG.debug("Segment playlist url {}", url);
return url;
}
protected SegmentPlaylist getNextSegments(String segmentsURL) throws IOException, ParseException, PlaylistException {
URL segmentsUrl = new URL(segmentsURL);
Builder builder = new Request.Builder()
.url(segmentsUrl);
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model);
Request request = builder.build();
try (Response response = client.execute(request)) {
if (response.isSuccessful()) {
String body = response.body().string();
if (!body.contains("#EXTINF")) {
// no segments, empty playlist
return new SegmentPlaylist(segmentsURL);
}
byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
BandwidthMeter.add(bytes.length);
InputStream inputStream = new ByteArrayInputStream(bytes);
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT);
Playlist playlist = parser.parse();
if (playlist.hasMediaPlaylist()) {
MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist();
SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL);
lsp.seq = mediaPlaylist.getMediaSequenceNumber();
lsp.targetDuration = mediaPlaylist.getTargetDuration();
List<TrackData> tracks = mediaPlaylist.getTracks();
for (TrackData trackData : tracks) {
String uri = trackData.getUri();
if (!uri.startsWith("http")) {
URL context = new URL(segmentsURL);
uri = new URL(context, uri).toExternalForm();
}
lsp.totalDuration += trackData.getTrackInfo().duration;
lsp.lastSegDuration = trackData.getTrackInfo().duration;
lsp.segments.add(uri);
if (trackData.hasEncryptionData()) {
lsp.encrypted = true;
EncryptionData data = trackData.getEncryptionData();
lsp.encryptionKeyUrl = data.getUri();
lsp.encryptionMethod = data.getMethod().getValue();
}
}
return lsp;
}
throw new InvalidPlaylistException("Playlist has no media playlist");
} else {
throw new HttpException(response.code(), response.message());
}
}
}
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
if(playlist.segments.isEmpty()) {
playlistEmptyCount++;
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
playlistEmptyCount = 0;
}
if(playlistEmptyCount == 10) {
LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel());
internalStop();
}
}
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
waitFactor *= 2;
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model,
waitFactor);
}
}
private void splitRecordingIfNecessary() {
if (splittingStrategy.splitNecessary(this)) {
internalStop();
}
}
protected SplittingStrategy initSplittingStrategy(Settings settings) {
SplittingStrategy strategy;
switch (settings.splitStrategy) {
case TIME:
strategy = new TimeSplittingStrategy();
break;
case SIZE:
strategy = new SizeSplittingStrategy();
break;
case TIME_OR_SIZE:
SplittingStrategy timeSplittingStrategy = new TimeSplittingStrategy();
SplittingStrategy sizeSplittingStrategy = new SizeSplittingStrategy();
strategy = new CombinedSplittingStrategy(timeSplittingStrategy, sizeSplittingStrategy);
break;
case DONT:
default:
strategy = new NoopSplittingStrategy();
break;
}
strategy.init(settings);
return strategy;
}
protected void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException {
int skip = nextSegmentNumber - playlist.seq;
for (String segment : playlist.segments) {
if (skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
String prefix = nf.format(segmentCounter++);
File tmp = new File(segmentUrl.getFile());
OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName());
SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream);
execute(segmentDownload);
}
}
}
private void waitSomeTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) {
long waitForMillis = 0;
if (lastSegmentNumber == playlist.seq) {
// playlist didn't change -> wait for at least half the target duration
waitForMillis = (long) playlist.targetDuration * 1000 / waitFactor;
LOG.trace("Playlist didn't change... waiting for {}ms", waitForMillis);
} else {
// playlist did change -> wait for at least last segment duration
waitForMillis = 1;
LOG.trace("Playlist changed... waiting for {}ms", waitForMillis);
}
waitSomeTime(waitForMillis);
}
/**
* Causes the current thread to sleep for a short amount of time.
* This is used to slow down retries, if something is wrong with the playlist.
* E.g. HTTP 403 or 404
*/
protected void waitSomeTime(long waitForMillis) {
try {
Thread.sleep(waitForMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(running) {
LOG.error("Couldn't sleep. This might mess up the download!");
}
}
}
private void handleHttpException(HttpException e) throws IOException {
if (e.getResponseCode() == 404) {
ctbrec.Model.State modelState;
try {
modelState = model.getOnlineState(false);
} catch (ExecutionException e1) {
modelState = ctbrec.Model.State.UNKNOWN;
}
LOG.info("Playlist not found (404). Model {} probably went offline. Model state: {}", model, modelState);
waitSomeTime(TEN_SECONDS);
} else if (e.getResponseCode() == 403) {
ctbrec.Model.State modelState;
try {
modelState = model.getOnlineState(false);
} catch (ExecutionException e1) {
modelState = ctbrec.Model.State.UNKNOWN;
}
LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState);
waitSomeTime(TEN_SECONDS);
} else {
throw e;
}
}
protected static void addHeaders(Builder builder, Map<String, String> headers, Model model) {
headers.putIfAbsent(ACCEPT, "*/*");
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
headers.putIfAbsent(USER_AGENT, Config.getInstance().getSettings().httpUserAgent);
headers.putIfAbsent(CONNECTION, KEEP_ALIVE);
headers.computeIfAbsent(ORIGIN, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
headers.computeIfAbsent(REFERER, k -> Optional.ofNullable(model).map(Model::getSite).map(Site::getBaseUrl).orElse(null));
for (Entry<String, String> header : headers.entrySet()) {
builder.header(header.getKey(), header.getValue());
}
}
@Override
public Model getModel() {
return model;
}
}

View File

@ -9,6 +9,7 @@ import java.nio.file.Files;
import java.time.Instant; import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.xml.bind.JAXBException; import javax.xml.bind.JAXBException;
@ -43,7 +44,7 @@ public class FFmpegDownload extends AbstractHlsDownload {
} }
@Override @Override
public void init(Config config, Model model, Instant startTime) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) {
this.config = config; this.config = config;
this.model = model; this.model = model;
this.startTime = startTime; this.startTime = startTime;

View File

@ -1,27 +1,19 @@
package ctbrec.recorder.download.hls; package ctbrec.recorder.download.hls;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.OutputStream;
import java.io.InterruptedIOException;
import java.net.URL;
import java.nio.file.FileSystems; import java.nio.file.FileSystems;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.HashMap; import java.util.concurrent.CompletableFuture;
import java.util.Optional; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -36,100 +28,39 @@ import ctbrec.Config;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.Recording; import ctbrec.Recording;
import ctbrec.Recording.State; import ctbrec.Recording.State;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.io.IoUtils; import ctbrec.io.IoUtils;
import ctbrec.recorder.PlaylistGenerator; import ctbrec.recorder.PlaylistGenerator;
import ctbrec.recorder.download.HttpHeaderFactory;
import okhttp3.Request;
import okhttp3.Request.Builder;
import okhttp3.Response;
public class HlsDownload extends AbstractHlsDownload { public class HlsDownload extends AbstractHlsDownload2 {
private static final int TEN_SECONDS = 10_000;
private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class); private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class);
protected transient Path downloadDir; protected transient Path downloadDir;
private int segmentCounter = 1;
private NumberFormat nf = new DecimalFormat("000000");
private transient AtomicBoolean downloadFinished = new AtomicBoolean(false); private transient AtomicBoolean downloadFinished = new AtomicBoolean(false);
protected transient Config config;
private transient int waitFactor = 1;
public HlsDownload(HttpClient client) { public HlsDownload(HttpClient client) {
super(client); super(client);
} }
@Override @Override
public void init(Config config, Model model, Instant startTime) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) {
this.config = config; super.init(config, model, startTime, executorService);
super.model = model;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT);
this.startTime = startTime;
String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault()));
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime); downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime);
splittingStrategy = initSplittingStrategy(config.getSettings());
} }
@Override @Override
public void start() throws IOException { protected void createTargetDirectory() throws IOException {
try { if (!downloadDir.toFile().exists()) {
running = true; Files.createDirectories(downloadDir);
Thread.currentThread().setName("Download " + model.getName());
String segments = getSegmentPlaylistUrl(model);
if (segments != null) {
if (!downloadDir.toFile().exists()) {
Files.createDirectories(downloadDir);
}
int lastSegmentNumber = 0;
int nextSegmentNumber = 0;
while (running) {
SegmentPlaylist playlist = getNextSegments(segments);
emptyPlaylistCheck(playlist);
logMissedSegments(playlist, nextSegmentNumber);
enqueueNewSegments(playlist, nextSegmentNumber);
splitRecordingIfNecessary();
waitSomeTime(playlist, lastSegmentNumber, waitFactor);
// this if check makes sure, that we don't decrease nextSegment. for some reason
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
lastSegmentNumber = playlist.seq;
if (lastSegmentNumber + playlist.segments.size() > nextSegmentNumber) {
nextSegmentNumber = lastSegmentNumber + playlist.segments.size();
}
}
} else {
throw new IOException("Couldn't determine segments uri");
}
} catch (ParseException e) {
throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e);
} catch (PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist for model " + model, e);
} catch (EOFException e) {
// end of playlist reached
LOG.debug("Reached end of playlist for model {}", model);
} catch (HttpException e) {
handleHttpException(e);
} catch (Exception e) {
throw new IOException("Couldn't download segment", e);
} finally {
finalizeDownload();
} }
} }
private void finalizeDownload() { @Override
downloadThreadPool.shutdown(); protected void finalizeDownload() {
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
downloadFinished.set(true); downloadFinished.set(true);
synchronized (downloadFinished) { synchronized (downloadFinished) {
downloadFinished.notifyAll(); downloadFinished.notifyAll();
@ -137,90 +68,6 @@ public class HlsDownload extends AbstractHlsDownload {
LOG.debug("Download for {} terminated", model); LOG.debug("Download for {} terminated", model);
} }
private void handleHttpException(HttpException e) throws IOException {
if (e.getResponseCode() == 404) {
ctbrec.Model.State modelState;
try {
modelState = model.getOnlineState(false);
} catch (ExecutionException e1) {
modelState = ctbrec.Model.State.UNKNOWN;
}
LOG.info("Playlist not found (404). Model {} probably went offline. Model state: {}", model, modelState);
waitSomeTime(TEN_SECONDS);
} else if (e.getResponseCode() == 403) {
ctbrec.Model.State modelState;
try {
modelState = model.getOnlineState(false);
} catch (ExecutionException e1) {
modelState = ctbrec.Model.State.UNKNOWN;
}
LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState);
waitSomeTime(TEN_SECONDS);
} else {
throw e;
}
}
private void splitRecordingIfNecessary() {
if (splittingStrategy.splitNecessary(this)) {
internalStop();
}
}
private void enqueueNewSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException, ExecutionException, InterruptedException {
int skip = nextSegmentNumber - playlist.seq;
for (String segment : playlist.segments) {
if (skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
String prefix = nf.format(segmentCounter++);
SegmentDownload segmentDownload = new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix);
enqueueDownload(segmentDownload, prefix, segmentUrl);
}
}
}
private void logMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
waitFactor *= 2;
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model,
waitFactor);
}
}
private void waitSomeTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) {
long waitForMillis = 0;
if (lastSegmentNumber == playlist.seq) {
// playlist didn't change -> wait for at least half the target duration
waitForMillis = (long) playlist.targetDuration * 1000 / waitFactor;
LOG.trace("Playlist didn't change... waiting for {}ms", waitForMillis);
} else {
// playlist did change -> wait for at least last segment duration
waitForMillis = 1;
LOG.trace("Playlist changed... waiting for {}ms", waitForMillis);
}
waitSomeTime(waitForMillis);
}
private void enqueueDownload(SegmentDownload segmentDownload, String prefix, URL segmentUrl) throws IOException, ExecutionException, InterruptedException {
try {
downloadThreadPool.submit(segmentDownload);
if (downloadQueue.remainingCapacity() < 10) {
LOG.debug("space left in queue {}", downloadQueue.remainingCapacity());
// if the queue is running full, we might be struggling with timeouts
// let's check, if the model is still online
if (!model.isOnline(true)) {
downloadQueue.clear();
internalStop();
}
}
} catch(RejectedExecutionException e) {
LOG.warn("Download queue is full ({}). Skipping segment {} {}", downloadQueue.size(), prefix, segmentUrl);
}
}
@Override @Override
public void postprocess(Recording recording) { public void postprocess(Recording recording) {
Thread.currentThread().setName("PP " + model.getName()); Thread.currentThread().setName("PP " + model.getName());
@ -249,10 +96,22 @@ public class HlsDownload extends AbstractHlsDownload {
} }
@Override
protected void execute(SegmentDownload segmentDownload) {
CompletableFuture.supplyAsync(segmentDownload::call).whenComplete((result, exception) -> {
if (result != null) {
try {
result.getOutputStream().close();
} catch (IOException e) {
LOG.error("Couldn't close segment file", e);
}
}
});
}
@Override @Override
public void stop() { public void stop() {
if (running) { if (running) {
internalStop();
try { try {
synchronized (downloadFinished) { synchronized (downloadFinished) {
while (!downloadFinished.get()) { while (!downloadFinished.get()) {
@ -264,66 +123,12 @@ public class HlsDownload extends AbstractHlsDownload {
LOG.error("Couldn't wait for download to finish", e); LOG.error("Couldn't wait for download to finish", e);
} }
} }
internalStop();
} }
@Override @Override
void internalStop() { protected void internalStop() {
running = false; running = false;
downloadThreadPool.shutdownNow();
}
private class SegmentDownload implements Callable<Boolean> {
private URL url;
private Path file;
private HttpClient client;
private SegmentPlaylist playlist;
public SegmentDownload(SegmentPlaylist playlist, URL url, Path dir, HttpClient client, String prefix) {
this.playlist = playlist;
this.url = url;
this.client = client;
File path = new File(url.getPath());
file = FileSystems.getDefault().getPath(dir.toString(), prefix + '_' + path.getName());
}
@Override
public Boolean call() throws Exception {
LOG.trace("Downloading segment {} to {}", url, file);
for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) {
Builder builder = new Request.Builder().url(url);
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()));
Request request = builder.build();
InputStream in = null;
try (Response response = client.execute(request); FileOutputStream fos = new FileOutputStream(file.toFile())) {
if (response.isSuccessful()) {
in = response.body().byteStream();
if(playlist.encrypted) {
in = new Crypto(playlist.encryptionKeyUrl, client).wrap(in);
}
byte[] b = new byte[1024 * 100];
int length = -1;
while( (length = in.read(b)) >= 0 && !Thread.currentThread().isInterrupted()) {
fos.write(b, 0, length);
BandwidthMeter.add(length);
}
return true;
}
} catch(FileNotFoundException e) {
LOG.debug("Segment does not exist {}", url.getFile());
break;
} catch (InterruptedIOException e) {
break;
} catch(Exception e) {
LOG.error("Error", e);
if (tries == 3) {
LOG.warn("Error while downloading segment. Segment {} finally failed: {}", file.toFile().getName(), e.getMessage());
} else {
LOG.debug("Error while downloading segment on try {} - {}", tries, e.getMessage());
}
}
}
return false;
}
} }
@Override @Override
@ -348,4 +153,19 @@ public class HlsDownload extends AbstractHlsDownload {
public long getSizeInByte() { public long getSizeInByte() {
return IoUtils.getDirectorySize(getTarget()); return IoUtils.getDirectorySize(getTarget());
} }
@Override
protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws FileNotFoundException {
File file = FileSystems.getDefault().getPath(downloadDir.toAbsolutePath().toString(), prefix + '_' + fileName).toFile();
return new FileOutputStream(file);
}
@Override
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {
try {
segmentDownload.getOutputStream().close();
} catch (IOException e) {
LOG.warn("Couldn't close segment file");
}
}
} }

View File

@ -2,75 +2,55 @@ package ctbrec.recorder.download.hls;
import static java.util.Optional.*; import static java.util.Optional.*;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.nio.file.Files; import java.nio.file.Files;
import java.time.Instant; import java.time.Instant;
import java.util.HashMap; import java.util.concurrent.BlockingQueue;
import java.util.LinkedList; import java.util.concurrent.ExecutorService;
import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config; import ctbrec.Config;
import ctbrec.Hmac; import ctbrec.Hmac;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.OS; import ctbrec.OS;
import ctbrec.Recording; import ctbrec.Recording;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.io.HttpException; import ctbrec.io.HttpException;
import ctbrec.recorder.FFmpeg; import ctbrec.recorder.FFmpeg;
import ctbrec.recorder.ProgressListener; import ctbrec.recorder.ProgressListener;
import ctbrec.recorder.download.HttpHeaderFactory;
import ctbrec.recorder.download.ProcessExitedUncleanException; import ctbrec.recorder.download.ProcessExitedUncleanException;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Request.Builder;
import okhttp3.Response; import okhttp3.Response;
public class MergedFfmpegHlsDownload extends AbstractHlsDownload { public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class); private static final Logger LOG = LoggerFactory.getLogger(MergedFfmpegHlsDownload.class);
private static final boolean IGNORE_CACHE = true;
private File targetFile; private File targetFile;
private transient Config config;
private transient Process ffmpegProcess; private transient Process ffmpegProcess;
private transient OutputStream ffmpegStdIn; private transient OutputStream ffmpegStdIn;
protected transient Thread ffmpegThread; protected transient Thread ffmpegThread;
private transient Object ffmpegStartMonitor = new Object(); private transient Object ffmpegStartMonitor = new Object();
private transient Queue<Future<byte[]>> downloads = new LinkedList<>(); private BlockingQueue<SegmentDownload> queue = new LinkedBlockingQueue<>();
private transient int lastSegment = 0;
private transient int nextSegment = 0;
public MergedFfmpegHlsDownload(HttpClient client) { public MergedFfmpegHlsDownload(HttpClient client) {
super(client); super(client);
} }
@Override @Override
public void init(Config config, Model model, Instant startTime) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) {
super.startTime = startTime; super.init(config, model, startTime, executorService);
this.config = config;
this.model = model;
String fileSuffix = config.getSettings().ffmpegFileSuffix; String fileSuffix = config.getSettings().ffmpegFileSuffix;
targetFile = config.getFileForRecording(model, fileSuffix, startTime); targetFile = config.getFileForRecording(model, fileSuffix, startTime);
splittingStrategy = initSplittingStrategy(config.getSettings());
} }
@Override @Override
@ -79,52 +59,46 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
} }
@Override @Override
public void start() throws IOException { protected void onStart() throws IOException {
try { createTargetDirectory();
running = true; startFfmpegProcess(targetFile);
Thread.currentThread().setName("Download " + model.getName()); synchronized (ffmpegStartMonitor) {
super.startTime = Instant.now(); int tries = 0;
while (ffmpegProcess == null && tries++ < 15) {
String segments = getSegmentPlaylistUrl(model); LOG.debug("Waiting for FFmpeg to spawn to record {}", model.getName());
try {
Files.createDirectories(targetFile.getParentFile().toPath());
startFfmpegProcess(targetFile);
synchronized (ffmpegStartMonitor) {
int tries = 0;
while (ffmpegProcess == null && tries++ < 15) {
LOG.debug("Waiting for FFmpeg to spawn to record {}", model.getName());
ffmpegStartMonitor.wait(1000); ffmpegStartMonitor.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} }
} }
}
if (ffmpegProcess == null) { if (ffmpegProcess == null) {
throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg");
} else { }
LOG.debug("Starting to download segments");
startDownloadLoop(segments); downloadExecutor.submit(() -> {
ffmpegThread.join(); while (running && !Thread.currentThread().isInterrupted()) {
LOG.debug("FFmpeg thread terminated"); try {
SegmentDownload segmentDownload = queue.poll(5, TimeUnit.SECONDS);
if (segmentDownload != null) {
segmentDownload.call();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} }
} catch (ParseException e) { });
throw new IOException("Couldn't parse stream information", e); }
} catch (PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist", e); @Override
} catch (EOFException e) { protected void onFinish() {
// end of playlist reached try {
LOG.debug("Reached end of playlist for model {}", model); ffmpegThread.join();
} catch (Exception e) { } catch (InterruptedException e) {
throw new IOException("Exception while downloading segments", e); Thread.currentThread().interrupt();
} finally {
internalStop();
downloadThreadPool.shutdown();
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
running = false;
LOG.debug("Download for {} terminated", model);
} }
} }
@ -168,179 +142,9 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
return OS.getFFmpegCommand(argsPlusFile); return OS.getFFmpegCommand(argsPlusFile);
} }
protected void startDownloadLoop(String segmentPlaylistUri) throws IOException, ParseException, PlaylistException { @Override
while (running) { protected void execute(SegmentDownload segmentDownload) {
try { queue.add(segmentDownload);
downloadSegments(segmentPlaylistUri);
} catch (HttpException e) {
logHttpException(e);
running = false;
} catch (MalformedURLException e) {
LOG.info("Malformed URL {} - {}", model, segmentPlaylistUri, e);
running = false;
} catch (Exception e) {
LOG.info("Unexpected error while downloading {}", model, e);
running = false;
}
}
ffmpegThread.interrupt();
}
private void downloadSegments(String segmentPlaylistUri) throws IOException, ParseException, PlaylistException, ExecutionException {
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
emptyPlaylistCheck(lsp);
// download new segments
long downloadStart = System.currentTimeMillis();
downloadNewSegments(lsp, nextSegment);
long downloadTookMillis = System.currentTimeMillis() - downloadStart;
// download segments, which might have been skipped
if (nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url,
downloadTookMillis, lsp.totalDuration);
}
splitRecordingIfNecessary();
// wait some time until requesting the segment playlist again to not hammer the server
waitForNewSegments(lsp, lastSegment, downloadTookMillis);
lastSegment = lsp.seq;
nextSegment = lastSegment + lsp.segments.size();
}
private void logHttpException(HttpException e) {
if (e.getResponseCode() == 404) {
LOG.debug("Playlist not found (404). Model {} probably went offline", model);
} else if (e.getResponseCode() == 403) {
LOG.debug("Playlist access forbidden (403). Model {} probably went private or offline", model);
} else {
LOG.info("Unexpected error while downloading {}", model, e);
}
}
protected void splitRecordingIfNecessary() {
if (splittingStrategy.splitNecessary(this)) {
internalStop();
}
}
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws ExecutionException, IOException {
int skip = nextSegment - lsp.seq;
// add segments to download threadpool
downloads.clear();
if (downloadQueue.remainingCapacity() == 0) {
LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment");
} else {
for (String segment : lsp.segments) {
if (!running) {
break;
}
if (skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
Future<byte[]> download = downloadThreadPool.submit(new SegmentDownload(lsp, segmentUrl, client));
downloads.add(download);
}
}
}
writeFinishedSegments(downloads);
}
private void writeFinishedSegments(Queue<Future<byte[]>> downloads) throws ExecutionException, IOException {
for (Future<byte[]> downloadFuture : downloads) {
try {
byte[] segmentData = downloadFuture.get(30, TimeUnit.SECONDS);
writeSegment(segmentData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while downloading segment", e);
} catch (TimeoutException e) {
LOG.info("Segment download took too long for {}. Not waiting for it any longer", getModel());
} catch (CancellationException e) {
LOG.info("Segment download cancelled");
} catch (ExecutionException e) {
handleExecutionExceptione(e);
}
}
}
private void handleExecutionExceptione(ExecutionException e) throws HttpException, ExecutionException {
Throwable cause = e.getCause();
if (cause instanceof MissingSegmentException) {
if (model != null && !isModelOnline()) {
LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName());
running = false;
} else {
LOG.debug("Segment not available, but model {} still online. Going on", ofNullable(model).map(Model::getName).orElse("n/a"));
}
} else if (cause instanceof HttpException) {
handleHttpException((HttpException)cause);
} else {
throw e;
}
}
private void handleHttpException(HttpException he) throws HttpException {
if (model != null && !isModelOnline()) {
LOG.debug("Error {} while downloading segment, because model {} is offline. Stopping now", he.getResponseCode(), model.getName());
running = false;
} else {
if (he.getResponseCode() == 404) {
LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a"));
running = false;
} else if (he.getResponseCode() == 403) {
LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", ofNullable(model).map(Model::getName).orElse("n/a"));
running = false;
} else {
throw he;
}
}
}
protected void writeSegment(byte[] segmentData, int offset, int length) throws IOException {
if (running) {
if (ffmpegStdIn != null) {
ffmpegStdIn.write(segmentData, offset, length);
} else {
LOG.error("FFmpeg stdin stream is null - skipping writing of segment");
}
}
}
private void writeSegment(byte[] segmentData) throws IOException {
writeSegment(segmentData, 0, segmentData.length);
}
private void waitForNewSegments(SegmentPlaylist lsp, int lastSegment, long downloadTookMillis) {
try {
long wait = 0;
if (lastSegment == lsp.seq) {
int timeLeftMillis = (int) (lsp.totalDuration * 1000 - downloadTookMillis);
if (timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it
wait = 1;
} else {
// wait a second to be nice to the server (don't hammer it with requests)
// 1 second seems to be a good compromise. every other calculation resulted in more missing segments
wait = 1000;
}
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
Thread.sleep(wait);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
}
} }
@Override @Override
@ -351,23 +155,9 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
} }
@Override @Override
synchronized void internalStop() { protected synchronized void internalStop() {
running = false; running = false;
try {
downloadQueue.clear();
for (Future<?> future : downloads) {
future.cancel(true);
}
downloadThreadPool.shutdownNow();
LOG.debug("Waiting for segment download thread pool to terminate for model {}", getModel());
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
LOG.debug("Segment download thread pool terminated for model {}", getModel());
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for segment pool to shutdown");
Thread.currentThread().interrupt();
}
if (ffmpegStdIn != null) { if (ffmpegStdIn != null) {
try { try {
ffmpegStdIn.close(); ffmpegStdIn.close();
@ -394,62 +184,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
} }
} }
private class SegmentDownload implements Callable<byte[]> {
private URL url;
private HttpClient client;
private SegmentPlaylist lsp;
public SegmentDownload(SegmentPlaylist lsp, URL url, HttpClient client) {
this.lsp = lsp;
this.url = url;
this.client = client;
}
@Override
public byte[] call() throws IOException {
LOG.trace("Downloading segment {}", url.getFile());
int maxTries = 3;
for (int i = 1; i <= maxTries && running; i++) {
Builder builder = new Request.Builder().url(url);
addHeaders(builder, ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()));
Request request = builder.build();
try (Response response = client.execute(request)) {
if (response.isSuccessful()) {
byte[] segment = response.body().bytes();
BandwidthMeter.add(segment.length);
if (lsp.encrypted) {
segment = new Crypto(lsp.encryptionKeyUrl, client).decrypt(segment);
}
return segment;
} else {
throw new HttpException(response.code(), response.message());
}
} catch (Exception e) {
if (i == maxTries) {
LOG.error("Error while downloading segment. Segment {} finally failed", url.getFile());
} else {
LOG.trace("Error while downloading segment {} on try {}", url.getFile(), i, e);
}
if (model != null && !isModelOnline()) {
break;
}
}
}
throw new MissingSegmentException("Unable to download segment " + url.getFile() + " after " + maxTries + " tries");
}
}
public boolean isModelOnline() {
try {
return model.isOnline(IGNORE_CACHE);
} catch (IOException | ExecutionException e) {
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
@Override @Override
public String getPath(Model model) { public String getPath(Model model) {
String absolutePath = targetFile.getAbsolutePath(); String absolutePath = targetFile.getAbsolutePath();
@ -522,4 +256,14 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
public long getSizeInByte() { public long getSizeInByte() {
return getTarget().length(); return getTarget().length();
} }
@Override
protected void createTargetDirectory() throws IOException {
Files.createDirectories(targetFile.getParentFile().toPath());
}
@Override
protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException {
return ffmpegStdIn;
}
} }

View File

@ -0,0 +1,100 @@
package ctbrec.recorder.download.hls;
import static ctbrec.recorder.download.hls.AbstractHlsDownload2.*;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.URL;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.Callable;
import javax.crypto.NoSuchPaddingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Model;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.HttpHeaderFactory;
import okhttp3.Request;
import okhttp3.Request.Builder;
import okhttp3.Response;
public class SegmentDownload implements Callable<SegmentDownload> {
private static final Logger LOG = LoggerFactory.getLogger(SegmentDownload.class);
private URL url;
private HttpClient client;
private SegmentPlaylist playlist;
private Model model;
private OutputStream out;
public SegmentDownload(Model model, SegmentPlaylist playlist, URL url, HttpClient client, OutputStream out) {
this.model = model;
this.playlist = playlist;
this.url = url;
this.client = client;
this.out = out;
}
@Override
public SegmentDownload call() {
LOG.trace("Downloading segment {}", url);
for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) {
Request request = createRequest();
try (Response response = client.execute(request)) {
handleResponse(response);
break;
} catch (FileNotFoundException e) {
LOG.debug("Segment does not exist {}", url.getFile());
break;
} catch (InterruptedIOException e) {
break;
} catch (Exception e) {
if (tries == 3) {
LOG.warn("Error while downloading segment for {}. Segment {} finally failed: {}", model, url.getFile(), e.getMessage());
} else {
LOG.debug("Error while downloading segment for {} on try {} - {}", model, tries, e.getMessage());
}
}
}
return this;
}
private boolean handleResponse(Response response) throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, InvalidAlgorithmParameterException, IOException {
if (response.isSuccessful()) {
InputStream in = response.body().byteStream();
if (playlist.encrypted) {
in = new Crypto(playlist.encryptionKeyUrl, client).wrap(in);
}
byte[] b = new byte[1024 * 100];
int length = -1;
while ((length = in.read(b)) >= 0 && !Thread.currentThread().isInterrupted()) {
out.write(b, 0, length);
BandwidthMeter.add(length);
}
return true;
} else {
throw new HttpException(response.code(), response.message());
}
}
private Request createRequest() {
Builder builder = new Request.Builder().url(url);
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()), model);
return builder.build();
}
public OutputStream getOutputStream() {
return out;
}
}

View File

@ -0,0 +1,9 @@
package ctbrec.recorder.download.hls;
public class SegmentDownloadException extends RuntimeException {
public SegmentDownloadException(Exception e) {
super("Segment download failed", e);
}
}

View File

@ -0,0 +1,20 @@
package ctbrec.recorder.download.hls;
import java.util.ArrayList;
import java.util.List;
public class SegmentPlaylist {
public String url;
public int seq = 0;
public float totalDuration = 0;
public float lastSegDuration = 0;
public float targetDuration = 0;
public List<String> segments = new ArrayList<>();
public boolean encrypted = false;
public String encryptionMethod = "AES-128";
public String encryptionKeyUrl;
public SegmentPlaylist(String url) {
this.url = url;
}
}

View File

@ -14,6 +14,7 @@ import com.iheartradio.m3u8.PlaylistException;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.HlsDownload; import ctbrec.recorder.download.hls.HlsDownload;
import ctbrec.recorder.download.hls.SegmentPlaylist;
public class LiveJasminHlsDownload extends HlsDownload { public class LiveJasminHlsDownload extends HlsDownload {
@ -33,7 +34,7 @@ public class LiveJasminHlsDownload extends HlsDownload {
SegmentPlaylist playlist = super.getNextSegments(segmentUrl); SegmentPlaylist playlist = super.getNextSegments(segmentUrl);
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) { if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) {
super.downloadThreadPool.submit(this::updatePlaylistUrl); super.downloadExecutor.submit(this::updatePlaylistUrl);
lastMasterPlaylistUpdate = now; lastMasterPlaylistUpdate = now;
} }
return playlist; return playlist;

View File

@ -14,6 +14,7 @@ import com.iheartradio.m3u8.PlaylistException;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
import ctbrec.recorder.download.hls.SegmentPlaylist;
public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload { public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload {
@ -33,7 +34,7 @@ public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload {
SegmentPlaylist playlist = super.getNextSegments(segmentUrl); SegmentPlaylist playlist = super.getNextSegments(segmentUrl);
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) { if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) {
super.downloadThreadPool.submit(this::updatePlaylistUrl); super.downloadExecutor.submit(this::updatePlaylistUrl);
lastMasterPlaylistUpdate = now; lastMasterPlaylistUpdate = now;
} }
return playlist; return playlist;

View File

@ -12,9 +12,9 @@ import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.HlsDownload; import ctbrec.recorder.download.hls.HlsDownload;
public class MVLiveHlsDownload extends HlsDownload { public class MVLiveHlsDownload extends HlsDownload {
private static final Logger LOG = LoggerFactory.getLogger(MVLiveMergedHlsDownload.class); private static final Logger LOG = LoggerFactory.getLogger(MVLiveHlsDownload.class);
private ScheduledExecutorService scheduler; private transient ScheduledExecutorService scheduler;
public MVLiveHlsDownload(HttpClient client) { public MVLiveHlsDownload(HttpClient client) {
super(client); super(client);
@ -30,7 +30,7 @@ public class MVLiveHlsDownload extends HlsDownload {
t.setPriority(Thread.MIN_PRIORITY); t.setPriority(Thread.MIN_PRIORITY);
return t; return t;
}); });
scheduler.scheduleAtFixedRate(() -> updateCloudFlareCookies(), 120, 120, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 120, 120, TimeUnit.SECONDS);
updateCloudFlareCookies(); updateCloudFlareCookies();
super.start(); super.start();
} finally { } finally {

View File

@ -1,15 +1,16 @@
package ctbrec.sites.manyvids; package ctbrec.sites.manyvids;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload { public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload {
private static final Logger LOG = LoggerFactory.getLogger(MVLiveMergedHlsDownload.class); private static final Logger LOG = LoggerFactory.getLogger(MVLiveMergedHlsDownload.class);
@ -35,7 +36,6 @@ public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload {
super.start(); super.start();
} finally { } finally {
scheduler.shutdown(); scheduler.shutdown();
} }
} }

View File

@ -1,23 +1,10 @@
package ctbrec.sites.showup; package ctbrec.sites.showup;
import static ctbrec.io.HttpConstants.*;
import java.io.IOException;
import java.io.InputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.Config;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
import okhttp3.Request;
import okhttp3.Response;
public class ShowupMergedDownload extends MergedFfmpegHlsDownload { public class ShowupMergedDownload extends MergedFfmpegHlsDownload {
@ -27,48 +14,4 @@ public class ShowupMergedDownload extends MergedFfmpegHlsDownload {
super(client); super(client);
} }
@Override
protected void startDownloadLoop(String segmentPlaylistUri) throws IOException, ParseException, PlaylistException {
try {
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
emptyPlaylistCheck(lsp);
for (String segment : lsp.segments) {
Request request = new Request.Builder().url(segment)
.header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent)
.header(CONNECTION, KEEP_ALIVE)
.build();
try (Response response = client.execute(request)) {
if (response.isSuccessful()) {
InputStream in = response.body().byteStream();
byte[] buffer = new byte[10240];
int length = -1;
boolean keepGoing = true;
while ((length = in.read(buffer)) >= 0 && keepGoing) {
BandwidthMeter.add(length);
writeSegment(buffer, 0, length);
keepGoing = running && !Thread.interrupted() && model.isOnline(true);
splitRecordingIfNecessary();
}
} else {
throw new HttpException(response.code(), response.message());
}
}
}
} catch (HttpException e) {
if (e.getResponseCode() == 404) {
LOG.debug("Playlist not found (404). Model {} probably went offline", model);
} else if (e.getResponseCode() == 403) {
LOG.debug("Playlist access forbidden (403). Model {} probably went private or offline", model);
} else {
LOG.info("Unexpected error while downloading {}", model, e);
}
running = false;
} catch (Exception e) {
LOG.info("Unexpected error while downloading {}", model, e);
running = false;
}
ffmpegThread.interrupt();
}
} }