forked from j62/ctbrec
1
0
Fork 0

Refactored multi-threading for downloads

This is a first kind of working version. HlsDownload records, but teh error handling is broken.
This commit is contained in:
0xb00bface 2020-12-25 22:58:12 +01:00
parent 65ffbf525d
commit e3270b6221
15 changed files with 199 additions and 164 deletions

View File

@ -12,6 +12,7 @@ import java.time.format.DateTimeFormatter;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import ctbrec.event.EventBusHolder; import ctbrec.event.EventBusHolder;
import ctbrec.event.RecordingStateChangedEvent; import ctbrec.event.RecordingStateChangedEvent;
@ -19,7 +20,7 @@ import ctbrec.io.IoUtils;
import ctbrec.recorder.download.Download; import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.VideoLengthDetector; import ctbrec.recorder.download.VideoLengthDetector;
public class Recording implements Serializable { public class Recording implements Serializable, Callable<Recording> {
private String id; private String id;
private Model model; private Model model;
private transient Download download; private transient Download download;
@ -60,6 +61,12 @@ public class Recording implements Serializable {
} }
} }
@Override
public Recording call() throws Exception {
download.call();
return this;
}
public String getId() { public String getId() {
return id; return id;
} }

View File

@ -9,6 +9,7 @@ import java.nio.file.FileStore;
import java.nio.file.Files; import java.nio.file.Files;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
@ -21,15 +22,13 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -69,10 +68,10 @@ public class NextGenLocalRecorder implements Recorder {
private RecordingPreconditions preconditions; private RecordingPreconditions preconditions;
// thread pools for downloads and post-processing // thread pools for downloads and post-processing
private BlockingQueue<Runnable> downloadQueue = new SynchronousQueue<>(); private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(30, createThreadFactory("Download"));
private ThreadPoolExecutor downloadPool = new ThreadPoolExecutor(2, 100, 5, TimeUnit.MINUTES, downloadQueue, createThreadFactory("Download")); private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker"));
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
private ExecutorCompletionService<Recording> completionService = new ExecutorCompletionService<>(downloadPool);
private BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>(); private BlockingQueue<Runnable> ppQueue = new LinkedBlockingQueue<>();
private ThreadPoolExecutor ppPool; private ThreadPoolExecutor ppPool;
@ -121,38 +120,60 @@ public class NextGenLocalRecorder implements Recorder {
} }
private void startCompletionHandler() { private void startCompletionHandler() {
Thread completionHandler = new Thread(() -> { for (int i = 0; i < 1; i++) {
while (!Thread.interrupted()) { downloadCompletionPool.submit(() -> {
try { while (!Thread.currentThread().isInterrupted()) {
Future<Recording> result = completionService.take();
Recording rec = result.get();
recorderLock.lock();
try { try {
recordingProcesses.remove(rec.getModel()); ScheduledFuture<Recording> future = downloadFutureQueue.take();
} finally { if (!future.isDone()) {
recorderLock.unlock(); downloadFutureQueue.put(future);
} } else {
if (rec.getStatus() == State.WAITING) { Recording rec = future.get();
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); Download d = rec.getDownload();
submitPostProcessingJob(rec); if (d.isRunning()) {
long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis());
// LOG.debug("Download still running. Scheduling to run in {} ms", delay);
downloadFutureQueue.add(downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS));
} else {
try {
boolean deleted = deleteIfEmpty(rec);
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
if (!deleted) {
// only save the status, if the recording has not been deleted, otherwise we recreate the metadata file
recordingManager.saveRecording(rec);
}
} catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) {
LOG.error("Couldn't execute post-processing step \"delete if empty\"", e);
}
// check, if we have to restart the recording recorderLock.lock();
Model model = rec.getModel(); try {
tryRestartRecording(model); recordingProcesses.remove(rec.getModel());
} else { } finally {
setRecordingStatus(rec, State.FAILED); recorderLock.unlock();
}
if (rec.getStatus() == State.WAITING) {
LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName());
submitPostProcessingJob(rec);
// check, if we have to restart the recording
Model model = rec.getModel();
tryRestartRecording(model);
} else {
setRecordingStatus(rec, State.FAILED);
}
}
}
Thread.sleep(1);
} catch (ExecutionException | IllegalStateException e) {
LOG.error("Error while completing recording", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while completing recording", e);
} }
} catch (ExecutionException | IllegalStateException e) {
LOG.error("Error while completing recording", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while completing recording", e);
} }
} });
}); }
completionHandler.setName("CompletionHandler");
completionHandler.setDaemon(true);
completionHandler.start();
} }
private void submitPostProcessingJob(Recording recording) { private void submitPostProcessingJob(Recording recording) {
@ -214,26 +235,28 @@ public class NextGenLocalRecorder implements Recorder {
} }
} }
private void startRecordingProcess(Model model) throws IOException { private void startRecordingProcess(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
recorderLock.lock(); recorderLock.lock();
try { try {
preconditions.check(model); preconditions.check(model);
LOG.info("Starting recording for model {}", model.getName()); LOG.info("Starting recording for model {}", model.getName());
Download download = createDownload(model); Download download = createDownload(model);
Recording rec = createRecording(download); Recording rec = createRecording(download);
completionService.submit(createDownloadJob(rec)); setRecordingStatus(rec, State.RECORDING);
rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec);
downloadFutureQueue.add(downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS));
} catch (RecordUntilExpiredException e) { } catch (RecordUntilExpiredException e) {
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
executeRecordUntilSubsequentAction(model); executeRecordUntilSubsequentAction(model);
} catch (PreconditionNotMetException e) { } catch (PreconditionNotMetException e) {
LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage());
return;
} finally { } finally {
recorderLock.unlock(); recorderLock.unlock();
} }
} }
private Download createDownload(Model model) { private Download createDownload(Model model) throws IOException {
Download download = model.createDownload(); Download download = model.createDownload();
download.init(config, model, Instant.now(), downloadPool); download.init(config, model, Instant.now(), downloadPool);
Objects.requireNonNull(download.getStartTime(), Objects.requireNonNull(download.getStartTime(),
@ -242,26 +265,6 @@ public class NextGenLocalRecorder implements Recorder {
return download; return download;
} }
private Callable<Recording> createDownloadJob(Recording rec) {
return () -> {
try {
setRecordingStatus(rec, State.RECORDING);
rec.getModel().setLastRecorded(rec.getStartDate());
recordingManager.saveRecording(rec);
rec.getDownload().start();
} catch (Exception e) {
LOG.error("Download for {} failed. Download state: {}", rec.getModel().getName(), rec.getStatus(), e);
}
boolean deleted = deleteIfEmpty(rec);
setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING);
if (!deleted) {
// only save the status, if the recording has not been deleted, otherwise we recreate the metadata file
recordingManager.saveRecording(rec);
}
return rec;
};
}
private void executeRecordUntilSubsequentAction(Model model) throws IOException { private void executeRecordUntilSubsequentAction(Model model) throws IOException {
if (model.getRecordUntilSubsequentAction() == PAUSE) { if (model.getRecordUntilSubsequentAction() == PAUSE) {
model.setSuspended(true); model.setSuspended(true);
@ -283,7 +286,7 @@ public class NextGenLocalRecorder implements Recorder {
Recording rec = new Recording(); Recording rec = new Recording();
rec.setId(UUID.randomUUID().toString()); rec.setId(UUID.randomUUID().toString());
rec.setDownload(download); rec.setDownload(download);
String recordingFile = download.getPath(model).replaceAll("\\\\", "/"); String recordingFile = download.getPath(model).replace('\\', '/');
File absoluteFile = new File(config.getSettings().recordingsDir, recordingFile); File absoluteFile = new File(config.getSettings().recordingsDir, recordingFile);
rec.setAbsoluteFile(absoluteFile); rec.setAbsoluteFile(absoluteFile);
rec.setModel(model); rec.setModel(model);
@ -489,7 +492,7 @@ public class NextGenLocalRecorder implements Recorder {
} }
@Override @Override
public void resumeRecording(Model model) throws IOException { public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
recorderLock.lock(); recorderLock.lock();
try { try {
if (models.contains(model)) { if (models.contains(model)) {
@ -615,16 +618,17 @@ public class NextGenLocalRecorder implements Recorder {
Thread t = new Thread(r); Thread t = new Thread(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true); t.setDaemon(true);
t.setPriority(Thread.MAX_PRIORITY);
return t; return t;
}; };
} }
@Override @Override
public void rerunPostProcessing(Recording recording) { public void rerunPostProcessing(Recording recording) throws IOException {
recording.setPostProcessedFile(null); recording.setPostProcessedFile(null);
List<Recording> recordings = recordingManager.getAll(); List<Recording> recordings = recordingManager.getAll();
for (Recording other : recordings) { for (Recording other : recordings) {
if(other.equals(recording)) { if (other.equals(recording)) {
Download download = other.getModel().createDownload(); Download download = other.getModel().createDownload();
download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool); download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool);
other.setDownload(download); other.setDownload(download);

View File

@ -5,9 +5,15 @@ import java.time.Instant;
public abstract class AbstractDownload implements Download { public abstract class AbstractDownload implements Download {
protected Instant startTime; protected Instant startTime;
protected Instant rescheduleTime = Instant.now();
@Override @Override
public Instant getStartTime() { public Instant getStartTime() {
return startTime; return startTime;
} }
@Override
public Instant getRescheduleTime() {
return rescheduleTime;
}
} }

View File

@ -4,19 +4,21 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import ctbrec.Config; import ctbrec.Config;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.Recording; import ctbrec.Recording;
public interface Download extends Serializable { public interface Download extends Serializable, Callable<Download> {
public void init(Config config, Model model, Instant startTime, ExecutorService executorService); void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException;
public void start() throws IOException; void stop();
public void stop(); boolean isRunning();
public Model getModel(); Model getModel();
public Instant getStartTime(); Instant getStartTime();
public void postprocess(Recording recording); Instant getRescheduleTime();
void postprocess(Recording recording);
/** /**
* Returns the path to the recording in the filesystem as file object * Returns the path to the recording in the filesystem as file object

View File

@ -240,7 +240,7 @@ public class DashDownload extends AbstractDownload {
} }
@Override @Override
public void start() throws IOException { public DashDownload call() throws IOException {
try { try {
Thread.currentThread().setName("Download " + model.getName()); Thread.currentThread().setName("Download " + model.getName());
running = true; running = true;
@ -275,6 +275,7 @@ public class DashDownload extends AbstractDownload {
downloadFinished.notifyAll(); downloadFinished.notifyAll();
} }
} }
return this;
} }
private boolean splitRecording() { private boolean splitRecording() {
@ -423,4 +424,9 @@ public class DashDownload extends AbstractDownload {
return IoUtils.getDirectorySize(downloadDir.toFile()); return IoUtils.getDirectorySize(downloadDir.toFile());
} }
@Override
public boolean isRunning() {
return running;
}
} }

View File

@ -44,11 +44,13 @@ import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config; import ctbrec.Config;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.Model.State;
import ctbrec.Settings; import ctbrec.Settings;
import ctbrec.UnknownModel; import ctbrec.UnknownModel;
import ctbrec.io.BandwidthMeter; import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.io.HttpException; import ctbrec.io.HttpException;
import ctbrec.io.MissedSegmentsStatistics;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import ctbrec.recorder.download.AbstractDownload; import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.HttpHeaderFactory; import ctbrec.recorder.download.HttpHeaderFactory;
@ -67,12 +69,18 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
private transient NumberFormat nf = new DecimalFormat("000000"); private transient NumberFormat nf = new DecimalFormat("000000");
private transient int playlistEmptyCount = 0; private transient int playlistEmptyCount = 0;
private transient int segmentCounter = 1; private transient int segmentCounter = 1;
private transient int waitFactor = 1; private transient int waitFactor = 2;
protected transient Config config; protected transient Config config;
protected transient HttpClient client; protected transient HttpClient client;
protected transient ExecutorService downloadExecutor; protected transient ExecutorService downloadExecutor;
protected transient volatile boolean running = false; protected transient volatile boolean running = true;
protected transient SplittingStrategy splittingStrategy; protected transient SplittingStrategy splittingStrategy;
protected transient int lastSegmentNumber = 0;
protected transient int nextSegmentNumber = 0;
protected transient String segmentPlaylistUrl;
private transient Instant previousPlaylistRequest = Instant.EPOCH;
private transient Instant lastPlaylistRequest= Instant.EPOCH;
protected Model model = new UnknownModel(); protected Model model = new UnknownModel();
@ -80,17 +88,14 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
this.client = client; this.client = client;
} }
protected void onStart() throws IOException {}
protected abstract void createTargetDirectory() throws IOException;
protected abstract void execute(SegmentDownload segmentDownload); protected abstract void execute(SegmentDownload segmentDownload);
protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException; protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException;
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {} protected void segmentDownloadFinished(SegmentDownload segmentDownload) {}
protected abstract void internalStop(); protected abstract void internalStop();
protected void onFinish() {}
protected void finalizeDownload() {} protected void finalizeDownload() {}
@Override @Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
this.config = config; this.config = config;
this.model = model; this.model = model;
this.startTime = startTime; this.startTime = startTime;
@ -99,33 +104,32 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
} }
@Override @Override
public void start() throws IOException { public AbstractHlsDownload2 call() throws Exception {
running = true;
try { try {
onStart(); if (segmentPlaylistUrl == null) {
String segmentPlaylistUrl = getSegmentPlaylistUrl(model); segmentPlaylistUrl = getSegmentPlaylistUrl(model);
createTargetDirectory(); }
int lastSegmentNumber = 0;
int nextSegmentNumber = 0; SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
while (running) { previousPlaylistRequest = lastPlaylistRequest;
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); lastPlaylistRequest = Instant.now();
emptyPlaylistCheck(segmentPlaylist); emptyPlaylistCheck(segmentPlaylist);
handleMissedSegments(segmentPlaylist, nextSegmentNumber); handleMissedSegments(segmentPlaylist, nextSegmentNumber);
enqueueNewSegments(segmentPlaylist, nextSegmentNumber); enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
splitRecordingIfNecessary(); splitRecordingIfNecessary();
waitSomeTime(segmentPlaylist, lastSegmentNumber, waitFactor); calculateRescheduleTime(segmentPlaylist, lastSegmentNumber, waitFactor);
// this if check makes sure, that we don't decrease nextSegment. for some reason // this if check makes sure, that we don't decrease nextSegment. for some reason
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
lastSegmentNumber = segmentPlaylist.seq; lastSegmentNumber = segmentPlaylist.seq;
if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) {
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
}
} }
onFinish();
} catch (ParseException e) { } catch (ParseException e) {
running = false;
throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e);
} catch (PlaylistException e) { } catch (PlaylistException e) {
running = false;
throw new IOException("Couldn't parse HLS playlist for model " + model, e); throw new IOException("Couldn't parse HLS playlist for model " + model, e);
} catch (EOFException e) { } catch (EOFException e) {
// end of playlist reached // end of playlist reached
@ -133,11 +137,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
} catch (HttpException e) { } catch (HttpException e) {
handleHttpException(e); handleHttpException(e);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Couldn't download segment", e);
} finally {
finalizeDownload();
running = false; running = false;
throw new IOException("Couldn't download segment", e);
} }
return this;
} }
protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException { protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException {
@ -197,6 +200,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL);
lsp.seq = mediaPlaylist.getMediaSequenceNumber(); lsp.seq = mediaPlaylist.getMediaSequenceNumber();
lsp.targetDuration = mediaPlaylist.getTargetDuration(); lsp.targetDuration = mediaPlaylist.getTargetDuration();
List<TrackData> tracks = mediaPlaylist.getTracks(); List<TrackData> tracks = mediaPlaylist.getTracks();
for (TrackData trackData : tracks) { for (TrackData trackData : tracks) {
String uri = trackData.getUri(); String uri = trackData.getUri();
@ -205,7 +209,6 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
uri = new URL(context, uri).toExternalForm(); uri = new URL(context, uri).toExternalForm();
} }
lsp.totalDuration += trackData.getTrackInfo().duration; lsp.totalDuration += trackData.getTrackInfo().duration;
lsp.lastSegDuration = trackData.getTrackInfo().duration;
lsp.segments.add(uri); lsp.segments.add(uri);
if (trackData.hasEncryptionData()) { if (trackData.hasEncryptionData()) {
lsp.encrypted = true; lsp.encrypted = true;
@ -214,6 +217,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
lsp.encryptionMethod = data.getMethod().getValue(); lsp.encryptionMethod = data.getMethod().getValue();
} }
} }
lsp.avgSegDuration = lsp.totalDuration / tracks.size();
return lsp; return lsp;
} }
throw new InvalidPlaylistException("Playlist has no media playlist"); throw new InvalidPlaylistException("Playlist has no media playlist");
@ -243,8 +247,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) {
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
waitFactor *= 2; waitFactor *= 2;
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}. Last 2 playlist requests at [{}] [{}] schedule was {}", nextSegmentNumber, playlist.seq, model,
waitFactor); waitFactor, previousPlaylistRequest, lastPlaylistRequest, rescheduleTime);
short missedSegments = (short) (playlist.seq - nextSegmentNumber);
MissedSegmentsStatistics.increase(model, missedSegments);
} }
} }
@ -293,19 +299,18 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
} }
} }
private void waitSomeTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) { private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) {
long waitForMillis = 0; long waitForMillis = 0;
if (lastSegmentNumber == playlist.seq) { if (lastSegmentNumber == playlist.seq) {
// playlist didn't change -> wait for at least half the target duration // playlist didn't change -> wait for at least half the target duration
waitForMillis = (long) playlist.targetDuration * 1000 / waitFactor; waitForMillis = (long) playlist.avgSegDuration * 1000 / waitFactor;
LOG.trace("Playlist didn't change... waiting for {}ms", waitForMillis); LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
} else { } else {
// playlist did change -> wait for at least last segment duration // playlist did change -> wait for at least the target duration
waitForMillis = 1; waitForMillis = (long) (playlist.avgSegDuration * 1000);
LOG.trace("Playlist changed... waiting for {}ms", waitForMillis); LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
} }
rescheduleTime = Instant.now().plusMillis(waitForMillis);
waitSomeTime(waitForMillis);
} }
/** /**
@ -338,12 +343,16 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
ctbrec.Model.State modelState; ctbrec.Model.State modelState;
try { try {
modelState = model.getOnlineState(false); modelState = model.getOnlineState(false);
if (modelState != State.ONLINE) {
running = false;
}
} catch (ExecutionException e1) { } catch (ExecutionException e1) {
modelState = ctbrec.Model.State.UNKNOWN; modelState = ctbrec.Model.State.UNKNOWN;
} }
LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState); LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState);
waitSomeTime(TEN_SECONDS); waitSomeTime(TEN_SECONDS);
} else { } else {
running = false;
throw e; throw e;
} }
} }
@ -365,4 +374,9 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload {
public Model getModel() { public Model getModel() {
return model; return model;
} }
@Override
public boolean isRunning() {
return running;
}
} }

View File

@ -32,7 +32,7 @@ import ctbrec.recorder.download.ProcessExitedUncleanException;
* Does the whole HLS download with FFmpeg. Not used at the moment, because FFMpeg can't * Does the whole HLS download with FFmpeg. Not used at the moment, because FFMpeg can't
* handle the HLS encryption of Flirt4Free correctly * handle the HLS encryption of Flirt4Free correctly
*/ */
public class FFmpegDownload extends AbstractHlsDownload { public class FFmpegDownload extends AbstractHlsDownload2 {
private static final transient Logger LOG = LoggerFactory.getLogger(FFmpegDownload.class); private static final transient Logger LOG = LoggerFactory.getLogger(FFmpegDownload.class);
private transient Config config; private transient Config config;
@ -53,7 +53,7 @@ public class FFmpegDownload extends AbstractHlsDownload {
} }
@Override @Override
public void start() throws IOException { public FFmpegDownload call() throws IOException {
try { try {
Files.createDirectories(targetFile.getParentFile().toPath()); Files.createDirectories(targetFile.getParentFile().toPath());
String chunkPlaylist = getSegmentPlaylistUrl(model); String chunkPlaylist = getSegmentPlaylistUrl(model);
@ -97,6 +97,7 @@ public class FFmpegDownload extends AbstractHlsDownload {
} catch (ExecutionException | ParseException | PlaylistException | JAXBException e) { } catch (ExecutionException | ParseException | PlaylistException | JAXBException e) {
LOG.error("Couldn't start FFmpeg process for stream download", e); LOG.error("Couldn't start FFmpeg process for stream download", e);
} }
return this;
} }
@Override @Override
@ -137,7 +138,7 @@ public class FFmpegDownload extends AbstractHlsDownload {
} }
@Override @Override
void internalStop() { protected void internalStop() {
stop(); stop();
} }
@ -151,4 +152,21 @@ public class FFmpegDownload extends AbstractHlsDownload {
return getTarget().length(); return getTarget().length();
} }
protected void createTargetDirectory() throws IOException {
// TODO Auto-generated method stub
}
@Override
protected void execute(SegmentDownload segmentDownload) {
// TODO Auto-generated method stub
}
@Override
protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException {
// TODO Auto-generated method stub
return null;
}
} }

View File

@ -14,8 +14,6 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -37,22 +35,21 @@ public class HlsDownload extends AbstractHlsDownload2 {
private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class); private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class);
protected transient Path downloadDir; protected transient Path downloadDir;
private transient AtomicBoolean downloadFinished = new AtomicBoolean(false);
public HlsDownload(HttpClient client) { public HlsDownload(HttpClient client) {
super(client); super(client);
} }
@Override @Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
super.init(config, model, startTime, executorService); super.init(config, model, startTime, executorService);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT);
String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault()));
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime); downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime);
createTargetDirectory();
} }
@Override
protected void createTargetDirectory() throws IOException { protected void createTargetDirectory() throws IOException {
if (!downloadDir.toFile().exists()) { if (!downloadDir.toFile().exists()) {
Files.createDirectories(downloadDir); Files.createDirectories(downloadDir);
@ -61,10 +58,6 @@ public class HlsDownload extends AbstractHlsDownload2 {
@Override @Override
protected void finalizeDownload() { protected void finalizeDownload() {
downloadFinished.set(true);
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
LOG.debug("Download for {} terminated", model); LOG.debug("Download for {} terminated", model);
} }
@ -98,7 +91,7 @@ public class HlsDownload extends AbstractHlsDownload2 {
@Override @Override
protected void execute(SegmentDownload segmentDownload) { protected void execute(SegmentDownload segmentDownload) {
CompletableFuture.supplyAsync(segmentDownload::call).whenComplete((result, exception) -> { CompletableFuture.supplyAsync(segmentDownload::call, downloadExecutor).whenComplete((result, exception) -> {
if (result != null) { if (result != null) {
try { try {
result.getOutputStream().close(); result.getOutputStream().close();
@ -111,18 +104,7 @@ public class HlsDownload extends AbstractHlsDownload2 {
@Override @Override
public void stop() { public void stop() {
if (running) { LOG.debug("Recording stopped");
try {
synchronized (downloadFinished) {
while (!downloadFinished.get()) {
downloadFinished.wait(TimeUnit.SECONDS.toMillis(60));
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Couldn't wait for download to finish", e);
}
}
internalStop(); internalStop();
} }

View File

@ -47,19 +47,11 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
} }
@Override @Override
public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
super.init(config, model, startTime, executorService); super.init(config, model, startTime, executorService);
String fileSuffix = config.getSettings().ffmpegFileSuffix; String fileSuffix = config.getSettings().ffmpegFileSuffix;
targetFile = config.getFileForRecording(model, fileSuffix, startTime); targetFile = config.getFileForRecording(model, fileSuffix, startTime);
}
@Override
public File getTarget() {
return targetFile;
}
@Override
protected void onStart() throws IOException {
createTargetDirectory(); createTargetDirectory();
startFfmpegProcess(targetFile); startFfmpegProcess(targetFile);
synchronized (ffmpegStartMonitor) { synchronized (ffmpegStartMonitor) {
@ -94,12 +86,19 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
} }
@Override @Override
protected void onFinish() { public File getTarget() {
return targetFile;
}
@Override
protected void finalizeDownload() {
try { try {
ffmpegThread.join(); ffmpegThread.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
super.finalizeDownload();
} }
private void startFfmpegProcess(File target) { private void startFfmpegProcess(File target) {
@ -257,7 +256,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 {
return getTarget().length(); return getTarget().length();
} }
@Override
protected void createTargetDirectory() throws IOException { protected void createTargetDirectory() throws IOException {
Files.createDirectories(targetFile.getParentFile().toPath()); Files.createDirectories(targetFile.getParentFile().toPath());
} }

View File

@ -48,7 +48,6 @@ public class SegmentDownload implements Callable<SegmentDownload> {
@Override @Override
public SegmentDownload call() { public SegmentDownload call() {
LOG.trace("Downloading segment {}", url);
for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) {
Request request = createRequest(); Request request = createRequest();
try (Response response = client.execute(request)) { try (Response response = client.execute(request)) {

View File

@ -7,7 +7,7 @@ public class SegmentPlaylist {
public String url; public String url;
public int seq = 0; public int seq = 0;
public float totalDuration = 0; public float totalDuration = 0;
public float lastSegDuration = 0; public float avgSegDuration = 0;
public float targetDuration = 0; public float targetDuration = 0;
public List<String> segments = new ArrayList<>(); public List<String> segments = new ArrayList<>();
public boolean encrypted = false; public boolean encrypted = false;

View File

@ -1,7 +1,5 @@
package ctbrec.sites.fc2live; package ctbrec.sites.fc2live;
import java.io.IOException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,16 +15,17 @@ public class Fc2HlsDownload extends HlsDownload {
} }
@Override @Override
public void start() throws IOException { public Fc2HlsDownload call() throws Exception {
Fc2Model fc2Model = (Fc2Model) model; Fc2Model fc2Model = (Fc2Model) model;
try { try {
fc2Model.openWebsocket(); fc2Model.openWebsocket();
super.start(); super.call();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOG.error("Couldn't start download for {}", model, e); LOG.error("Couldn't start download for {}", model, e);
} finally { } finally {
fc2Model.closeWebsocket(); fc2Model.closeWebsocket();
} }
return this;
} }
} }

View File

@ -1,7 +1,5 @@
package ctbrec.sites.fc2live; package ctbrec.sites.fc2live;
import java.io.IOException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,16 +15,17 @@ public class Fc2MergedHlsDownload extends MergedFfmpegHlsDownload {
} }
@Override @Override
public void start() throws IOException { public Fc2MergedHlsDownload call() throws Exception {
Fc2Model fc2Model = (Fc2Model) model; Fc2Model fc2Model = (Fc2Model) model;
try { try {
fc2Model.openWebsocket(); fc2Model.openWebsocket();
super.start(); super.call();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
LOG.error("Couldn't start download for {}", model, e); LOG.error("Couldn't start download for {}", model, e);
} finally { } finally {
fc2Model.closeWebsocket(); fc2Model.closeWebsocket();
} }
return this;
} }
} }

View File

@ -21,7 +21,7 @@ public class MVLiveHlsDownload extends HlsDownload {
} }
@Override @Override
public void start() throws IOException { public MVLiveHlsDownload call() throws Exception {
try { try {
scheduler = new ScheduledThreadPoolExecutor(1, r -> { scheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r); Thread t = new Thread(r);
@ -32,11 +32,11 @@ public class MVLiveHlsDownload extends HlsDownload {
}); });
scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 120, 120, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 120, 120, TimeUnit.SECONDS);
updateCloudFlareCookies(); updateCloudFlareCookies();
super.start(); super.call();
} finally { } finally {
scheduler.shutdown(); scheduler.shutdown();
} }
return this;
} }
private void updateCloudFlareCookies() { private void updateCloudFlareCookies() {

View File

@ -22,7 +22,7 @@ public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload {
} }
@Override @Override
public void start() throws IOException { public MVLiveMergedHlsDownload call() throws Exception {
try { try {
scheduler = new ScheduledThreadPoolExecutor(1, r -> { scheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r); Thread t = new Thread(r);
@ -33,10 +33,11 @@ public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload {
}); });
scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 2, 2, TimeUnit.MINUTES); scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 2, 2, TimeUnit.MINUTES);
updateCloudFlareCookies(); updateCloudFlareCookies();
super.start(); super.call();
} finally { } finally {
scheduler.shutdown(); scheduler.shutdown();
} }
return this;
} }
private void updateCloudFlareCookies() { private void updateCloudFlareCookies() {