From e3270b62210b07a7311108854a8798249fed5574 Mon Sep 17 00:00:00 2001 From: 0xb00bface <0xboobface@gmail.com> Date: Fri, 25 Dec 2020 22:58:12 +0100 Subject: [PATCH] Refactored multi-threading for downloads This is a first kind of working version. HlsDownload records, but teh error handling is broken. --- common/src/main/java/ctbrec/Recording.java | 9 +- .../ctbrec/recorder/NextGenLocalRecorder.java | 128 +++++++++--------- .../recorder/download/AbstractDownload.java | 6 + .../ctbrec/recorder/download/Download.java | 16 ++- .../recorder/download/dash/DashDownload.java | 8 +- .../download/hls/AbstractHlsDownload2.java | 98 ++++++++------ .../recorder/download/hls/FFmpegDownload.java | 24 +++- .../recorder/download/hls/HlsDownload.java | 26 +--- .../download/hls/MergedFfmpegHlsDownload.java | 20 ++- .../download/hls/SegmentDownload.java | 1 - .../download/hls/SegmentPlaylist.java | 2 +- .../ctbrec/sites/fc2live/Fc2HlsDownload.java | 7 +- .../sites/fc2live/Fc2MergedHlsDownload.java | 7 +- .../sites/manyvids/MVLiveHlsDownload.java | 6 +- .../manyvids/MVLiveMergedHlsDownload.java | 5 +- 15 files changed, 199 insertions(+), 164 deletions(-) diff --git a/common/src/main/java/ctbrec/Recording.java b/common/src/main/java/ctbrec/Recording.java index f2ccc264..c268ac06 100644 --- a/common/src/main/java/ctbrec/Recording.java +++ b/common/src/main/java/ctbrec/Recording.java @@ -12,6 +12,7 @@ import java.time.format.DateTimeFormatter; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Callable; import ctbrec.event.EventBusHolder; import ctbrec.event.RecordingStateChangedEvent; @@ -19,7 +20,7 @@ import ctbrec.io.IoUtils; import ctbrec.recorder.download.Download; import ctbrec.recorder.download.VideoLengthDetector; -public class Recording implements Serializable { +public class Recording implements Serializable, Callable { private String id; private Model model; private transient Download download; @@ -60,6 +61,12 @@ public class Recording implements Serializable { } } + @Override + public Recording call() throws Exception { + download.call(); + return this; + } + public String getId() { return id; } diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index afefe07e..0ef7e692 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -9,6 +9,7 @@ import java.nio.file.FileStore; import java.nio.file.Files; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; @@ -21,15 +22,13 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -69,10 +68,10 @@ public class NextGenLocalRecorder implements Recorder { private RecordingPreconditions preconditions; // thread pools for downloads and post-processing - private BlockingQueue downloadQueue = new SynchronousQueue<>(); - private ThreadPoolExecutor downloadPool = new ThreadPoolExecutor(2, 100, 5, TimeUnit.MINUTES, downloadQueue, createThreadFactory("Download")); + private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(30, createThreadFactory("Download")); + private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker")); + private BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); - private ExecutorCompletionService completionService = new ExecutorCompletionService<>(downloadPool); private BlockingQueue ppQueue = new LinkedBlockingQueue<>(); private ThreadPoolExecutor ppPool; @@ -121,38 +120,60 @@ public class NextGenLocalRecorder implements Recorder { } private void startCompletionHandler() { - Thread completionHandler = new Thread(() -> { - while (!Thread.interrupted()) { - try { - Future result = completionService.take(); - Recording rec = result.get(); - recorderLock.lock(); + for (int i = 0; i < 1; i++) { + downloadCompletionPool.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { try { - recordingProcesses.remove(rec.getModel()); - } finally { - recorderLock.unlock(); - } - if (rec.getStatus() == State.WAITING) { - LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); - submitPostProcessingJob(rec); + ScheduledFuture future = downloadFutureQueue.take(); + if (!future.isDone()) { + downloadFutureQueue.put(future); + } else { + Recording rec = future.get(); + Download d = rec.getDownload(); + if (d.isRunning()) { + long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis()); + // LOG.debug("Download still running. Scheduling to run in {} ms", delay); + downloadFutureQueue.add(downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS)); + } else { + try { + boolean deleted = deleteIfEmpty(rec); + setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING); + if (!deleted) { + // only save the status, if the recording has not been deleted, otherwise we recreate the metadata file + recordingManager.saveRecording(rec); + } + } catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) { + LOG.error("Couldn't execute post-processing step \"delete if empty\"", e); + } - // check, if we have to restart the recording - Model model = rec.getModel(); - tryRestartRecording(model); - } else { - setRecordingStatus(rec, State.FAILED); + recorderLock.lock(); + try { + recordingProcesses.remove(rec.getModel()); + } finally { + recorderLock.unlock(); + } + if (rec.getStatus() == State.WAITING) { + LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); + submitPostProcessingJob(rec); + + // check, if we have to restart the recording + Model model = rec.getModel(); + tryRestartRecording(model); + } else { + setRecordingStatus(rec, State.FAILED); + } + } + } + Thread.sleep(1); + } catch (ExecutionException | IllegalStateException e) { + LOG.error("Error while completing recording", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Error while completing recording", e); } - } catch (ExecutionException | IllegalStateException e) { - LOG.error("Error while completing recording", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Error while completing recording", e); } - } - }); - completionHandler.setName("CompletionHandler"); - completionHandler.setDaemon(true); - completionHandler.start(); + }); + } } private void submitPostProcessingJob(Recording recording) { @@ -214,26 +235,28 @@ public class NextGenLocalRecorder implements Recorder { } } - private void startRecordingProcess(Model model) throws IOException { + private void startRecordingProcess(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { recorderLock.lock(); try { preconditions.check(model); LOG.info("Starting recording for model {}", model.getName()); Download download = createDownload(model); Recording rec = createRecording(download); - completionService.submit(createDownloadJob(rec)); + setRecordingStatus(rec, State.RECORDING); + rec.getModel().setLastRecorded(rec.getStartDate()); + recordingManager.saveRecording(rec); + downloadFutureQueue.add(downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS)); } catch (RecordUntilExpiredException e) { LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); executeRecordUntilSubsequentAction(model); } catch (PreconditionNotMetException e) { LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); - return; } finally { recorderLock.unlock(); } } - private Download createDownload(Model model) { + private Download createDownload(Model model) throws IOException { Download download = model.createDownload(); download.init(config, model, Instant.now(), downloadPool); Objects.requireNonNull(download.getStartTime(), @@ -242,26 +265,6 @@ public class NextGenLocalRecorder implements Recorder { return download; } - private Callable createDownloadJob(Recording rec) { - return () -> { - try { - setRecordingStatus(rec, State.RECORDING); - rec.getModel().setLastRecorded(rec.getStartDate()); - recordingManager.saveRecording(rec); - rec.getDownload().start(); - } catch (Exception e) { - LOG.error("Download for {} failed. Download state: {}", rec.getModel().getName(), rec.getStatus(), e); - } - boolean deleted = deleteIfEmpty(rec); - setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING); - if (!deleted) { - // only save the status, if the recording has not been deleted, otherwise we recreate the metadata file - recordingManager.saveRecording(rec); - } - return rec; - }; - } - private void executeRecordUntilSubsequentAction(Model model) throws IOException { if (model.getRecordUntilSubsequentAction() == PAUSE) { model.setSuspended(true); @@ -283,7 +286,7 @@ public class NextGenLocalRecorder implements Recorder { Recording rec = new Recording(); rec.setId(UUID.randomUUID().toString()); rec.setDownload(download); - String recordingFile = download.getPath(model).replaceAll("\\\\", "/"); + String recordingFile = download.getPath(model).replace('\\', '/'); File absoluteFile = new File(config.getSettings().recordingsDir, recordingFile); rec.setAbsoluteFile(absoluteFile); rec.setModel(model); @@ -489,7 +492,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void resumeRecording(Model model) throws IOException { + public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { recorderLock.lock(); try { if (models.contains(model)) { @@ -615,16 +618,17 @@ public class NextGenLocalRecorder implements Recorder { Thread t = new Thread(r); t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); t.setDaemon(true); + t.setPriority(Thread.MAX_PRIORITY); return t; }; } @Override - public void rerunPostProcessing(Recording recording) { + public void rerunPostProcessing(Recording recording) throws IOException { recording.setPostProcessedFile(null); List recordings = recordingManager.getAll(); for (Recording other : recordings) { - if(other.equals(recording)) { + if (other.equals(recording)) { Download download = other.getModel().createDownload(); download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool); other.setDownload(download); diff --git a/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java b/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java index c1f1a939..2dbe71e4 100644 --- a/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java @@ -5,9 +5,15 @@ import java.time.Instant; public abstract class AbstractDownload implements Download { protected Instant startTime; + protected Instant rescheduleTime = Instant.now(); @Override public Instant getStartTime() { return startTime; } + + @Override + public Instant getRescheduleTime() { + return rescheduleTime; + } } diff --git a/common/src/main/java/ctbrec/recorder/download/Download.java b/common/src/main/java/ctbrec/recorder/download/Download.java index 4916f8fe..23fca731 100644 --- a/common/src/main/java/ctbrec/recorder/download/Download.java +++ b/common/src/main/java/ctbrec/recorder/download/Download.java @@ -4,19 +4,21 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.time.Instant; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import ctbrec.Config; import ctbrec.Model; import ctbrec.Recording; -public interface Download extends Serializable { - public void init(Config config, Model model, Instant startTime, ExecutorService executorService); - public void start() throws IOException; - public void stop(); - public Model getModel(); - public Instant getStartTime(); - public void postprocess(Recording recording); +public interface Download extends Serializable, Callable { + void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException; + void stop(); + boolean isRunning(); + Model getModel(); + Instant getStartTime(); + Instant getRescheduleTime(); + void postprocess(Recording recording); /** * Returns the path to the recording in the filesystem as file object diff --git a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java index 59a49b19..29f9f229 100644 --- a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java @@ -240,7 +240,7 @@ public class DashDownload extends AbstractDownload { } @Override - public void start() throws IOException { + public DashDownload call() throws IOException { try { Thread.currentThread().setName("Download " + model.getName()); running = true; @@ -275,6 +275,7 @@ public class DashDownload extends AbstractDownload { downloadFinished.notifyAll(); } } + return this; } private boolean splitRecording() { @@ -423,4 +424,9 @@ public class DashDownload extends AbstractDownload { return IoUtils.getDirectorySize(downloadDir.toFile()); } + @Override + public boolean isRunning() { + return running; + } + } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java index 5add831f..d75a8389 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java @@ -44,11 +44,13 @@ import com.iheartradio.m3u8.data.TrackData; import ctbrec.Config; import ctbrec.Model; +import ctbrec.Model.State; import ctbrec.Settings; import ctbrec.UnknownModel; import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; import ctbrec.io.HttpException; +import ctbrec.io.MissedSegmentsStatistics; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.download.AbstractDownload; import ctbrec.recorder.download.HttpHeaderFactory; @@ -67,12 +69,18 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { private transient NumberFormat nf = new DecimalFormat("000000"); private transient int playlistEmptyCount = 0; private transient int segmentCounter = 1; - private transient int waitFactor = 1; + private transient int waitFactor = 2; protected transient Config config; protected transient HttpClient client; protected transient ExecutorService downloadExecutor; - protected transient volatile boolean running = false; + protected transient volatile boolean running = true; protected transient SplittingStrategy splittingStrategy; + protected transient int lastSegmentNumber = 0; + protected transient int nextSegmentNumber = 0; + protected transient String segmentPlaylistUrl; + + private transient Instant previousPlaylistRequest = Instant.EPOCH; + private transient Instant lastPlaylistRequest= Instant.EPOCH; protected Model model = new UnknownModel(); @@ -80,17 +88,14 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { this.client = client; } - protected void onStart() throws IOException {} - protected abstract void createTargetDirectory() throws IOException; protected abstract void execute(SegmentDownload segmentDownload); protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException; protected void segmentDownloadFinished(SegmentDownload segmentDownload) {} protected abstract void internalStop(); - protected void onFinish() {} protected void finalizeDownload() {} @Override - public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { this.config = config; this.model = model; this.startTime = startTime; @@ -99,33 +104,32 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { } @Override - public void start() throws IOException { - running = true; + public AbstractHlsDownload2 call() throws Exception { try { - onStart(); - String segmentPlaylistUrl = getSegmentPlaylistUrl(model); - createTargetDirectory(); - int lastSegmentNumber = 0; - int nextSegmentNumber = 0; - while (running) { - SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); - emptyPlaylistCheck(segmentPlaylist); - handleMissedSegments(segmentPlaylist, nextSegmentNumber); - enqueueNewSegments(segmentPlaylist, nextSegmentNumber); - splitRecordingIfNecessary(); - waitSomeTime(segmentPlaylist, lastSegmentNumber, waitFactor); - - // this if check makes sure, that we don't decrease nextSegment. for some reason - // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 - lastSegmentNumber = segmentPlaylist.seq; - if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { - nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); - } + if (segmentPlaylistUrl == null) { + segmentPlaylistUrl = getSegmentPlaylistUrl(model); + } + + SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); + previousPlaylistRequest = lastPlaylistRequest; + lastPlaylistRequest = Instant.now(); + emptyPlaylistCheck(segmentPlaylist); + handleMissedSegments(segmentPlaylist, nextSegmentNumber); + enqueueNewSegments(segmentPlaylist, nextSegmentNumber); + splitRecordingIfNecessary(); + calculateRescheduleTime(segmentPlaylist, lastSegmentNumber, waitFactor); + + // this if check makes sure, that we don't decrease nextSegment. for some reason + // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 + lastSegmentNumber = segmentPlaylist.seq; + if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { + nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); } - onFinish(); } catch (ParseException e) { + running = false; throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); } catch (PlaylistException e) { + running = false; throw new IOException("Couldn't parse HLS playlist for model " + model, e); } catch (EOFException e) { // end of playlist reached @@ -133,11 +137,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { } catch (HttpException e) { handleHttpException(e); } catch (Exception e) { - throw new IOException("Couldn't download segment", e); - } finally { - finalizeDownload(); running = false; + throw new IOException("Couldn't download segment", e); } + return this; } protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException { @@ -197,6 +200,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); lsp.seq = mediaPlaylist.getMediaSequenceNumber(); lsp.targetDuration = mediaPlaylist.getTargetDuration(); + List tracks = mediaPlaylist.getTracks(); for (TrackData trackData : tracks) { String uri = trackData.getUri(); @@ -205,7 +209,6 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { uri = new URL(context, uri).toExternalForm(); } lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.lastSegDuration = trackData.getTrackInfo().duration; lsp.segments.add(uri); if (trackData.hasEncryptionData()) { lsp.encrypted = true; @@ -214,6 +217,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { lsp.encryptionMethod = data.getMethod().getValue(); } } + lsp.avgSegDuration = lsp.totalDuration / tracks.size(); return lsp; } throw new InvalidPlaylistException("Playlist has no media playlist"); @@ -243,8 +247,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { waitFactor *= 2; - LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, - waitFactor); + LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}. Last 2 playlist requests at [{}] [{}] schedule was {}", nextSegmentNumber, playlist.seq, model, + waitFactor, previousPlaylistRequest, lastPlaylistRequest, rescheduleTime); + short missedSegments = (short) (playlist.seq - nextSegmentNumber); + MissedSegmentsStatistics.increase(model, missedSegments); } } @@ -293,19 +299,18 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { } } - private void waitSomeTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) { + private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber, int waitFactor) { long waitForMillis = 0; if (lastSegmentNumber == playlist.seq) { // playlist didn't change -> wait for at least half the target duration - waitForMillis = (long) playlist.targetDuration * 1000 / waitFactor; - LOG.trace("Playlist didn't change... waiting for {}ms", waitForMillis); + waitForMillis = (long) playlist.avgSegDuration * 1000 / waitFactor; + LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); } else { - // playlist did change -> wait for at least last segment duration - waitForMillis = 1; - LOG.trace("Playlist changed... waiting for {}ms", waitForMillis); + // playlist did change -> wait for at least the target duration + waitForMillis = (long) (playlist.avgSegDuration * 1000); + LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); } - - waitSomeTime(waitForMillis); + rescheduleTime = Instant.now().plusMillis(waitForMillis); } /** @@ -338,12 +343,16 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { ctbrec.Model.State modelState; try { modelState = model.getOnlineState(false); + if (modelState != State.ONLINE) { + running = false; + } } catch (ExecutionException e1) { modelState = ctbrec.Model.State.UNKNOWN; } LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState); waitSomeTime(TEN_SECONDS); } else { + running = false; throw e; } } @@ -365,4 +374,9 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { public Model getModel() { return model; } + + @Override + public boolean isRunning() { + return running; + } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java index 311ae108..6703e587 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java @@ -32,7 +32,7 @@ import ctbrec.recorder.download.ProcessExitedUncleanException; * Does the whole HLS download with FFmpeg. Not used at the moment, because FFMpeg can't * handle the HLS encryption of Flirt4Free correctly */ -public class FFmpegDownload extends AbstractHlsDownload { +public class FFmpegDownload extends AbstractHlsDownload2 { private static final transient Logger LOG = LoggerFactory.getLogger(FFmpegDownload.class); private transient Config config; @@ -53,7 +53,7 @@ public class FFmpegDownload extends AbstractHlsDownload { } @Override - public void start() throws IOException { + public FFmpegDownload call() throws IOException { try { Files.createDirectories(targetFile.getParentFile().toPath()); String chunkPlaylist = getSegmentPlaylistUrl(model); @@ -97,6 +97,7 @@ public class FFmpegDownload extends AbstractHlsDownload { } catch (ExecutionException | ParseException | PlaylistException | JAXBException e) { LOG.error("Couldn't start FFmpeg process for stream download", e); } + return this; } @Override @@ -137,7 +138,7 @@ public class FFmpegDownload extends AbstractHlsDownload { } @Override - void internalStop() { + protected void internalStop() { stop(); } @@ -151,4 +152,21 @@ public class FFmpegDownload extends AbstractHlsDownload { return getTarget().length(); } + protected void createTargetDirectory() throws IOException { + // TODO Auto-generated method stub + + } + + @Override + protected void execute(SegmentDownload segmentDownload) { + // TODO Auto-generated method stub + + } + + @Override + protected OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException { + // TODO Auto-generated method stub + return null; + } + } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java index 4a64429e..726723c8 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java @@ -14,8 +14,6 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; import org.slf4j.Logger; @@ -37,22 +35,21 @@ public class HlsDownload extends AbstractHlsDownload2 { private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class); protected transient Path downloadDir; - private transient AtomicBoolean downloadFinished = new AtomicBoolean(false); public HlsDownload(HttpClient client) { super(client); } @Override - public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { super.init(config, model, startTime, executorService); DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime); + createTargetDirectory(); } - @Override protected void createTargetDirectory() throws IOException { if (!downloadDir.toFile().exists()) { Files.createDirectories(downloadDir); @@ -61,10 +58,6 @@ public class HlsDownload extends AbstractHlsDownload2 { @Override protected void finalizeDownload() { - downloadFinished.set(true); - synchronized (downloadFinished) { - downloadFinished.notifyAll(); - } LOG.debug("Download for {} terminated", model); } @@ -98,7 +91,7 @@ public class HlsDownload extends AbstractHlsDownload2 { @Override protected void execute(SegmentDownload segmentDownload) { - CompletableFuture.supplyAsync(segmentDownload::call).whenComplete((result, exception) -> { + CompletableFuture.supplyAsync(segmentDownload::call, downloadExecutor).whenComplete((result, exception) -> { if (result != null) { try { result.getOutputStream().close(); @@ -111,18 +104,7 @@ public class HlsDownload extends AbstractHlsDownload2 { @Override public void stop() { - if (running) { - try { - synchronized (downloadFinished) { - while (!downloadFinished.get()) { - downloadFinished.wait(TimeUnit.SECONDS.toMillis(60)); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Couldn't wait for download to finish", e); - } - } + LOG.debug("Recording stopped"); internalStop(); } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index f868aee0..aa1065b8 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -47,19 +47,11 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { } @Override - public void init(Config config, Model model, Instant startTime, ExecutorService executorService) { + public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { super.init(config, model, startTime, executorService); String fileSuffix = config.getSettings().ffmpegFileSuffix; targetFile = config.getFileForRecording(model, fileSuffix, startTime); - } - @Override - public File getTarget() { - return targetFile; - } - - @Override - protected void onStart() throws IOException { createTargetDirectory(); startFfmpegProcess(targetFile); synchronized (ffmpegStartMonitor) { @@ -94,12 +86,19 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { } @Override - protected void onFinish() { + public File getTarget() { + return targetFile; + } + + + @Override + protected void finalizeDownload() { try { ffmpegThread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + super.finalizeDownload(); } private void startFfmpegProcess(File target) { @@ -257,7 +256,6 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { return getTarget().length(); } - @Override protected void createTargetDirectory() throws IOException { Files.createDirectories(targetFile.getParentFile().toPath()); } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java index af0b25a0..c10fc7d1 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java @@ -48,7 +48,6 @@ public class SegmentDownload implements Callable { @Override public SegmentDownload call() { - LOG.trace("Downloading segment {}", url); for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { Request request = createRequest(); try (Response response = client.execute(request)) { diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java index 2b135776..5cb11ffc 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentPlaylist.java @@ -7,7 +7,7 @@ public class SegmentPlaylist { public String url; public int seq = 0; public float totalDuration = 0; - public float lastSegDuration = 0; + public float avgSegDuration = 0; public float targetDuration = 0; public List segments = new ArrayList<>(); public boolean encrypted = false; diff --git a/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java b/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java index 509d6cf3..2e498644 100644 --- a/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java +++ b/common/src/main/java/ctbrec/sites/fc2live/Fc2HlsDownload.java @@ -1,7 +1,5 @@ package ctbrec.sites.fc2live; -import java.io.IOException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,16 +15,17 @@ public class Fc2HlsDownload extends HlsDownload { } @Override - public void start() throws IOException { + public Fc2HlsDownload call() throws Exception { Fc2Model fc2Model = (Fc2Model) model; try { fc2Model.openWebsocket(); - super.start(); + super.call(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Couldn't start download for {}", model, e); } finally { fc2Model.closeWebsocket(); } + return this; } } diff --git a/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java b/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java index f6fac7e5..50233ab6 100644 --- a/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/fc2live/Fc2MergedHlsDownload.java @@ -1,7 +1,5 @@ package ctbrec.sites.fc2live; -import java.io.IOException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,16 +15,17 @@ public class Fc2MergedHlsDownload extends MergedFfmpegHlsDownload { } @Override - public void start() throws IOException { + public Fc2MergedHlsDownload call() throws Exception { Fc2Model fc2Model = (Fc2Model) model; try { fc2Model.openWebsocket(); - super.start(); + super.call(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Couldn't start download for {}", model, e); } finally { fc2Model.closeWebsocket(); } + return this; } } diff --git a/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java b/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java index be4adef2..139074dc 100644 --- a/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/manyvids/MVLiveHlsDownload.java @@ -21,7 +21,7 @@ public class MVLiveHlsDownload extends HlsDownload { } @Override - public void start() throws IOException { + public MVLiveHlsDownload call() throws Exception { try { scheduler = new ScheduledThreadPoolExecutor(1, r -> { Thread t = new Thread(r); @@ -32,11 +32,11 @@ public class MVLiveHlsDownload extends HlsDownload { }); scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 120, 120, TimeUnit.SECONDS); updateCloudFlareCookies(); - super.start(); + super.call(); } finally { scheduler.shutdown(); - } + return this; } private void updateCloudFlareCookies() { diff --git a/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java b/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java index 0b55bd02..491fdba3 100644 --- a/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java +++ b/common/src/main/java/ctbrec/sites/manyvids/MVLiveMergedHlsDownload.java @@ -22,7 +22,7 @@ public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload { } @Override - public void start() throws IOException { + public MVLiveMergedHlsDownload call() throws Exception { try { scheduler = new ScheduledThreadPoolExecutor(1, r -> { Thread t = new Thread(r); @@ -33,10 +33,11 @@ public class MVLiveMergedHlsDownload extends MergedFfmpegHlsDownload { }); scheduler.scheduleAtFixedRate(this::updateCloudFlareCookies, 2, 2, TimeUnit.MINUTES); updateCloudFlareCookies(); - super.start(); + super.call(); } finally { scheduler.shutdown(); } + return this; } private void updateCloudFlareCookies() {