forked from j62/ctbrec
Fixed error handling for new multi-threading
This commit is contained in:
parent
e3270b6221
commit
7e03b48895
|
@ -120,6 +120,13 @@ public abstract class HttpClient {
|
||||||
return resp;
|
return resp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Response execute(Request request, int timeoutInMillis) throws IOException {
|
||||||
|
return client.newBuilder() //
|
||||||
|
.connectTimeout(timeoutInMillis, TimeUnit.MILLISECONDS) //
|
||||||
|
.readTimeout(timeoutInMillis, TimeUnit.MILLISECONDS).build() //
|
||||||
|
.newCall(request).execute();
|
||||||
|
}
|
||||||
|
|
||||||
public abstract boolean login() throws IOException;
|
public abstract boolean login() throws IOException;
|
||||||
|
|
||||||
public void reconfigure() {
|
public void reconfigure() {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package ctbrec.io;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.nio.file.FileVisitOption;
|
import java.nio.file.FileVisitOption;
|
||||||
import java.nio.file.FileVisitResult;
|
import java.nio.file.FileVisitResult;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
@ -59,7 +60,7 @@ public class IoUtils {
|
||||||
|
|
||||||
public static long getDirectorySize(File dir) {
|
public static long getDirectorySize(File dir) {
|
||||||
final long[] size = { 0 };
|
final long[] size = { 0 };
|
||||||
int maxDepth = 1; // Don't expect subdirs, so don't even try
|
int maxDepth = 7;
|
||||||
try {
|
try {
|
||||||
Files.walkFileTree(dir.toPath(), EnumSet.noneOf(FileVisitOption.class), maxDepth, new SimpleFileVisitor<Path>() {
|
Files.walkFileTree(dir.toPath(), EnumSet.noneOf(FileVisitOption.class), maxDepth, new SimpleFileVisitor<Path>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -79,4 +80,14 @@ public class IoUtils {
|
||||||
}
|
}
|
||||||
return size[0];
|
return size[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void close(OutputStream outputStream, String errorMsg) {
|
||||||
|
if (outputStream != null) {
|
||||||
|
try {
|
||||||
|
outputStream.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error(errorMsg, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,9 +68,10 @@ public class NextGenLocalRecorder implements Recorder {
|
||||||
private RecordingPreconditions preconditions;
|
private RecordingPreconditions preconditions;
|
||||||
|
|
||||||
// thread pools for downloads and post-processing
|
// thread pools for downloads and post-processing
|
||||||
private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(30, createThreadFactory("Download"));
|
private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(10, createThreadFactory("Download"));
|
||||||
private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker"));
|
private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker"));
|
||||||
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
|
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
|
||||||
|
private Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>());
|
||||||
|
|
||||||
private BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
|
private BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
|
||||||
private ThreadPoolExecutor ppPool;
|
private ThreadPoolExecutor ppPool;
|
||||||
|
@ -120,38 +121,36 @@ public class NextGenLocalRecorder implements Recorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startCompletionHandler() {
|
private void startCompletionHandler() {
|
||||||
for (int i = 0; i < 1; i++) {
|
|
||||||
downloadCompletionPool.submit(() -> {
|
downloadCompletionPool.submit(() -> {
|
||||||
while (!Thread.currentThread().isInterrupted()) {
|
while (!Thread.currentThread().isInterrupted()) {
|
||||||
try {
|
try {
|
||||||
ScheduledFuture<Recording> future = downloadFutureQueue.take();
|
ScheduledFuture<Recording> future = downloadFutureQueue.take();
|
||||||
|
rescheduleRecordingTask(future);
|
||||||
|
Thread.sleep(1);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.error("Error while getting recording result from download queue", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void rescheduleRecordingTask(ScheduledFuture<Recording> future) throws InterruptedException{
|
||||||
|
try {
|
||||||
if (!future.isDone()) {
|
if (!future.isDone()) {
|
||||||
downloadFutureQueue.put(future);
|
downloadFutureQueue.put(future);
|
||||||
} else {
|
} else {
|
||||||
|
downloadFutureRecordingMap.remove(future);
|
||||||
Recording rec = future.get();
|
Recording rec = future.get();
|
||||||
Download d = rec.getDownload();
|
Download d = rec.getDownload();
|
||||||
if (d.isRunning()) {
|
if (d.isRunning()) {
|
||||||
long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis());
|
long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis());
|
||||||
// LOG.debug("Download still running. Scheduling to run in {} ms", delay);
|
ScheduledFuture<Recording> rescheduledFuture = downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS);
|
||||||
downloadFutureQueue.add(downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS));
|
downloadFutureQueue.add(rescheduledFuture);
|
||||||
} else {
|
} else {
|
||||||
try {
|
d.finalizeDownload();
|
||||||
boolean deleted = deleteIfEmpty(rec);
|
deleteIfEmpty(rec);
|
||||||
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
|
removeRecordingProcess(rec);
|
||||||
if (!deleted) {
|
|
||||||
// only save the status, if the recording has not been deleted, otherwise we recreate the metadata file
|
|
||||||
recordingManager.saveRecording(rec);
|
|
||||||
}
|
|
||||||
} catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) {
|
|
||||||
LOG.error("Couldn't execute post-processing step \"delete if empty\"", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
recorderLock.lock();
|
|
||||||
try {
|
|
||||||
recordingProcesses.remove(rec.getModel());
|
|
||||||
} finally {
|
|
||||||
recorderLock.unlock();
|
|
||||||
}
|
|
||||||
if (rec.getStatus() == State.WAITING) {
|
if (rec.getStatus() == State.WAITING) {
|
||||||
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName());
|
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName());
|
||||||
submitPostProcessingJob(rec);
|
submitPostProcessingJob(rec);
|
||||||
|
@ -164,15 +163,28 @@ public class NextGenLocalRecorder implements Recorder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Thread.sleep(1);
|
|
||||||
} catch (ExecutionException | IllegalStateException e) {
|
} catch (ExecutionException | IllegalStateException e) {
|
||||||
LOG.error("Error while completing recording", e);
|
fail(future, e);
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
LOG.error("Error while completing recording", e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
|
||||||
|
private void fail(ScheduledFuture<Recording> future, Exception e) {
|
||||||
|
if (downloadFutureRecordingMap.containsKey(future)) {
|
||||||
|
Recording rec = downloadFutureRecordingMap.remove(future);
|
||||||
|
removeRecordingProcess(rec);
|
||||||
|
rec.getDownload().finalizeDownload();
|
||||||
|
LOG.error("Error while recording stream for model {}", rec.getModel(), e);
|
||||||
|
} else {
|
||||||
|
LOG.error("Error while recording stream", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void removeRecordingProcess(Recording rec) {
|
||||||
|
recorderLock.lock();
|
||||||
|
try {
|
||||||
|
recordingProcesses.remove(rec.getModel());
|
||||||
|
} finally {
|
||||||
|
recorderLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,7 +257,9 @@ public class NextGenLocalRecorder implements Recorder {
|
||||||
setRecordingStatus(rec, State.RECORDING);
|
setRecordingStatus(rec, State.RECORDING);
|
||||||
rec.getModel().setLastRecorded(rec.getStartDate());
|
rec.getModel().setLastRecorded(rec.getStartDate());
|
||||||
recordingManager.saveRecording(rec);
|
recordingManager.saveRecording(rec);
|
||||||
downloadFutureQueue.add(downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS));
|
ScheduledFuture<Recording> future = downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS);
|
||||||
|
downloadFutureQueue.add(future);
|
||||||
|
downloadFutureRecordingMap.put(future, rec);
|
||||||
} catch (RecordUntilExpiredException e) {
|
} catch (RecordUntilExpiredException e) {
|
||||||
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
|
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
|
||||||
executeRecordUntilSubsequentAction(model);
|
executeRecordUntilSubsequentAction(model);
|
||||||
|
@ -297,16 +311,25 @@ public class NextGenLocalRecorder implements Recorder {
|
||||||
return rec;
|
return rec;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean deleteIfEmpty(Recording rec) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
|
private boolean deleteIfEmpty(Recording rec) {
|
||||||
|
boolean deleted = false;
|
||||||
|
try {
|
||||||
rec.refresh();
|
rec.refresh();
|
||||||
long sizeInByte = rec.getSizeInByte();
|
long sizeInByte = rec.getSizeInByte();
|
||||||
if (sizeInByte <= 0) {
|
if (sizeInByte <= 0) {
|
||||||
LOG.info("Deleting empty recording {}", rec);
|
LOG.info("Deleting empty recording {}", rec);
|
||||||
delete(rec);
|
delete(rec);
|
||||||
return true;
|
deleted = true;
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
|
||||||
|
if (!deleted) {
|
||||||
|
// only save the status, if the recording has not been deleted, otherwise we recreate the metadata file
|
||||||
|
recordingManager.saveRecording(rec);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Couldn't execute post-processing step \"delete if empty\"", e);
|
||||||
|
}
|
||||||
|
return deleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -397,18 +420,17 @@ public class NextGenLocalRecorder implements Recorder {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException {
|
public List<Recording> getRecordings() throws IOException {
|
||||||
return recordingManager.getAll();
|
return recordingManager.getAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
|
public void delete(Recording recording) throws IOException {
|
||||||
recordingManager.delete(recording);
|
recordingManager.delete(recording);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdown(boolean immediately) {
|
public void shutdown(boolean immediately) {
|
||||||
// TODO add a config flag for waitign or stopping immediately
|
|
||||||
LOG.info("Shutting down");
|
LOG.info("Shutting down");
|
||||||
recording = false;
|
recording = false;
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ import ctbrec.Recording;
|
||||||
public interface Download extends Serializable, Callable<Download> {
|
public interface Download extends Serializable, Callable<Download> {
|
||||||
void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException;
|
void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException;
|
||||||
void stop();
|
void stop();
|
||||||
|
void finalizeDownload();
|
||||||
boolean isRunning();
|
boolean isRunning();
|
||||||
Model getModel();
|
Model getModel();
|
||||||
Instant getStartTime();
|
Instant getStartTime();
|
||||||
|
|
|
@ -429,4 +429,10 @@ public class DashDownload extends AbstractDownload {
|
||||||
return running;
|
return running;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finalizeDownload() {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,10 +10,12 @@ import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.text.DecimalFormat;
|
import java.text.DecimalFormat;
|
||||||
import java.text.NumberFormat;
|
import java.text.NumberFormat;
|
||||||
|
import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -80,7 +82,8 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
protected transient String segmentPlaylistUrl;
|
protected transient String segmentPlaylistUrl;
|
||||||
|
|
||||||
private transient Instant previousPlaylistRequest = Instant.EPOCH;
|
private transient Instant previousPlaylistRequest = Instant.EPOCH;
|
||||||
private transient Instant lastPlaylistRequest= Instant.EPOCH;
|
private transient Instant afterLastPlaylistRequest= Instant.EPOCH;
|
||||||
|
private transient Instant beforeLastPlaylistRequest= Instant.EPOCH;
|
||||||
|
|
||||||
protected Model model = new UnknownModel();
|
protected Model model = new UnknownModel();
|
||||||
|
|
||||||
|
@ -88,11 +91,9 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
this.client = client;
|
this.client = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void execute(SegmentDownload segmentDownload);
|
|
||||||
protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException;
|
protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException;
|
||||||
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {}
|
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {}
|
||||||
protected abstract void internalStop();
|
protected abstract void internalStop();
|
||||||
protected void finalizeDownload() {}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
|
||||||
|
@ -110,9 +111,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
segmentPlaylistUrl = getSegmentPlaylistUrl(model);
|
segmentPlaylistUrl = getSegmentPlaylistUrl(model);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
previousPlaylistRequest = beforeLastPlaylistRequest;
|
||||||
|
beforeLastPlaylistRequest = Instant.now();
|
||||||
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
|
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
|
||||||
previousPlaylistRequest = lastPlaylistRequest;
|
afterLastPlaylistRequest = Instant.now();
|
||||||
lastPlaylistRequest = Instant.now();
|
|
||||||
emptyPlaylistCheck(segmentPlaylist);
|
emptyPlaylistCheck(segmentPlaylist);
|
||||||
handleMissedSegments(segmentPlaylist, nextSegmentNumber);
|
handleMissedSegments(segmentPlaylist, nextSegmentNumber);
|
||||||
enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
|
enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
|
||||||
|
@ -126,23 +128,53 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
|
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
|
||||||
}
|
}
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
|
LOG.error("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e);
|
||||||
running = false;
|
running = false;
|
||||||
throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e);
|
|
||||||
} catch (PlaylistException e) {
|
} catch (PlaylistException e) {
|
||||||
|
LOG.error("Couldn't parse HLS playlist for model " + model, e);
|
||||||
running = false;
|
running = false;
|
||||||
throw new IOException("Couldn't parse HLS playlist for model " + model, e);
|
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
// end of playlist reached
|
// end of playlist reached
|
||||||
LOG.debug("Reached end of playlist for model {}", model);
|
LOG.debug("Reached end of playlist for model {}", model);
|
||||||
|
running = false;
|
||||||
} catch (HttpException e) {
|
} catch (HttpException e) {
|
||||||
handleHttpException(e);
|
handleHttpException(e);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
LOG.error("Couldn't download segment", e);
|
||||||
running = false;
|
running = false;
|
||||||
throw new IOException("Couldn't download segment", e);
|
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void execute(SegmentDownload segmentDownload) {
|
||||||
|
segmentDownload.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleHttpException(HttpException e) throws IOException {
|
||||||
|
if (e.getResponseCode() == 404) {
|
||||||
|
checkIfModelIsStillOnline("Playlist not found (404). Model {} probably went offline. Model state: {}");
|
||||||
|
} else if (e.getResponseCode() == 403) {
|
||||||
|
checkIfModelIsStillOnline("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}");
|
||||||
|
} else {
|
||||||
|
LOG.info("Playlist couldn't not be downloaded for model {}. Stopping recording", model, e);
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void checkIfModelIsStillOnline(String errorMsg) throws IOException {
|
||||||
|
ctbrec.Model.State modelState;
|
||||||
|
try {
|
||||||
|
modelState = model.getOnlineState(false);
|
||||||
|
if (modelState != State.ONLINE) {
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
} catch (ExecutionException e1) {
|
||||||
|
modelState = ctbrec.Model.State.UNKNOWN;
|
||||||
|
}
|
||||||
|
LOG.info(errorMsg, model, modelState);
|
||||||
|
waitSomeTime(TEN_SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException {
|
protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException {
|
||||||
LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex());
|
LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex());
|
||||||
List<StreamSource> streamSources = model.getStreamSources();
|
List<StreamSource> streamSources = model.getStreamSources();
|
||||||
|
@ -182,7 +214,9 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model);
|
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model);
|
||||||
Request request = builder.build();
|
Request request = builder.build();
|
||||||
|
|
||||||
try (Response response = client.execute(request)) {
|
for (int i = 1; i <= 3; i++) {
|
||||||
|
Instant before = Instant.now();
|
||||||
|
try (Response response = client.execute(request, 1000)) {
|
||||||
if (response.isSuccessful()) {
|
if (response.isSuccessful()) {
|
||||||
String body = response.body().string();
|
String body = response.body().string();
|
||||||
if (!body.contains("#EXTINF")) {
|
if (!body.contains("#EXTINF")) {
|
||||||
|
@ -218,14 +252,26 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
|
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
|
||||||
|
Duration d = Duration.between(before, Instant.now());
|
||||||
|
if(i > 1) {
|
||||||
|
LOG.trace("Playlist request took {} ms", d.toMillis());
|
||||||
|
}
|
||||||
return lsp;
|
return lsp;
|
||||||
}
|
}
|
||||||
throw new InvalidPlaylistException("Playlist has no media playlist");
|
throw new InvalidPlaylistException("Playlist has no media playlist");
|
||||||
} else {
|
} else {
|
||||||
throw new HttpException(response.code(), response.message());
|
throw new HttpException(response.code(), response.message());
|
||||||
}
|
}
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
if (i == 3) {
|
||||||
|
throw e;
|
||||||
|
} else {
|
||||||
|
LOG.trace("Playlist timeout {} for model {}", i, model);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
throw new InvalidPlaylistException("Playlist could not be downloaded in time");
|
||||||
|
}
|
||||||
|
|
||||||
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
||||||
if(playlist.segments.isEmpty()) {
|
if(playlist.segments.isEmpty()) {
|
||||||
|
@ -247,8 +293,8 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
|
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
|
||||||
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
||||||
waitFactor *= 2;
|
waitFactor *= 2;
|
||||||
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}. Last 2 playlist requests at [{}] [{}] schedule was {}", nextSegmentNumber, playlist.seq, model,
|
LOG.warn("Missed segments: {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor);
|
||||||
waitFactor, previousPlaylistRequest, lastPlaylistRequest, rescheduleTime);
|
LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest));
|
||||||
short missedSegments = (short) (playlist.seq - nextSegmentNumber);
|
short missedSegments = (short) (playlist.seq - nextSegmentNumber);
|
||||||
MissedSegmentsStatistics.increase(model, missedSegments);
|
MissedSegmentsStatistics.increase(model, missedSegments);
|
||||||
}
|
}
|
||||||
|
@ -295,6 +341,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName());
|
OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName());
|
||||||
SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream);
|
SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream);
|
||||||
execute(segmentDownload);
|
execute(segmentDownload);
|
||||||
|
segmentDownloadFinished(segmentDownload);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -310,7 +357,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
waitForMillis = (long) (playlist.avgSegDuration * 1000);
|
waitForMillis = (long) (playlist.avgSegDuration * 1000);
|
||||||
LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
|
||||||
}
|
}
|
||||||
rescheduleTime = Instant.now().plusMillis(waitForMillis);
|
rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -329,34 +376,6 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleHttpException(HttpException e) throws IOException {
|
|
||||||
if (e.getResponseCode() == 404) {
|
|
||||||
ctbrec.Model.State modelState;
|
|
||||||
try {
|
|
||||||
modelState = model.getOnlineState(false);
|
|
||||||
} catch (ExecutionException e1) {
|
|
||||||
modelState = ctbrec.Model.State.UNKNOWN;
|
|
||||||
}
|
|
||||||
LOG.info("Playlist not found (404). Model {} probably went offline. Model state: {}", model, modelState);
|
|
||||||
waitSomeTime(TEN_SECONDS);
|
|
||||||
} else if (e.getResponseCode() == 403) {
|
|
||||||
ctbrec.Model.State modelState;
|
|
||||||
try {
|
|
||||||
modelState = model.getOnlineState(false);
|
|
||||||
if (modelState != State.ONLINE) {
|
|
||||||
running = false;
|
|
||||||
}
|
|
||||||
} catch (ExecutionException e1) {
|
|
||||||
modelState = ctbrec.Model.State.UNKNOWN;
|
|
||||||
}
|
|
||||||
LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState);
|
|
||||||
waitSomeTime(TEN_SECONDS);
|
|
||||||
} else {
|
|
||||||
running = false;
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static void addHeaders(Builder builder, Map<String, String> headers, Model model) {
|
protected static void addHeaders(Builder builder, Map<String, String> headers, Model model) {
|
||||||
headers.putIfAbsent(ACCEPT, "*/*");
|
headers.putIfAbsent(ACCEPT, "*/*");
|
||||||
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
|
headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage());
|
||||||
|
|
|
@ -169,4 +169,10 @@ public class FFmpegDownload extends AbstractHlsDownload2 {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void finalizeDownload() {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,6 @@ import java.time.Instant;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -57,7 +56,7 @@ public class HlsDownload extends AbstractHlsDownload2 {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void finalizeDownload() {
|
public void finalizeDownload() {
|
||||||
LOG.debug("Download for {} terminated", model);
|
LOG.debug("Download for {} terminated", model);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,19 +88,6 @@ public class HlsDownload extends AbstractHlsDownload2 {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void execute(SegmentDownload segmentDownload) {
|
|
||||||
CompletableFuture.supplyAsync(segmentDownload::call, downloadExecutor).whenComplete((result, exception) -> {
|
|
||||||
if (result != null) {
|
|
||||||
try {
|
|
||||||
result.getOutputStream().close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Couldn't close segment file", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
LOG.debug("Recording stopped");
|
LOG.debug("Recording stopped");
|
||||||
|
@ -144,10 +130,6 @@ public class HlsDownload extends AbstractHlsDownload2 {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {
|
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {
|
||||||
try {
|
IoUtils.close(segmentDownload.getOutputStream(), "Couldn't close segment file");
|
||||||
segmentDownload.getOutputStream().close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Couldn't close segment file");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,13 +92,12 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void finalizeDownload() {
|
public void finalizeDownload() {
|
||||||
try {
|
try {
|
||||||
ffmpegThread.join();
|
ffmpegThread.join();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
super.finalizeDownload();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startFfmpegProcess(File target) {
|
private void startFfmpegProcess(File target) {
|
||||||
|
|
Loading…
Reference in New Issue