From 6cc8fd9cc25909e954ec9dc64dc9c4649be19cb8 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Wed, 25 Dec 2019 17:55:33 +0100 Subject: [PATCH] Fix post-processing timestamp problems --- common/src/main/java/ctbrec/Config.java | 21 ++++--- .../ctbrec/recorder/NextGenLocalRecorder.java | 38 +++++++----- .../recorder/download/AbstractDownload.java | 61 ++++++++++--------- .../ctbrec/recorder/download/Download.java | 2 +- .../ProcessExitedUncleanException.java | 7 +++ .../recorder/download/dash/DashDownload.java | 32 +++++----- .../recorder/download/dash/FfmpegMuxer.java | 7 +-- .../download/hls/AbstractHlsDownload.java | 9 +-- .../recorder/download/hls/HlsDownload.java | 14 +++-- .../download/hls/MergedHlsDownload.java | 24 +++++--- .../download/hls/PostProcessingException.java | 6 +- 11 files changed, 124 insertions(+), 97 deletions(-) create mode 100644 common/src/main/java/ctbrec/recorder/download/ProcessExitedUncleanException.java diff --git a/common/src/main/java/ctbrec/Config.java b/common/src/main/java/ctbrec/Config.java index f79ae2a5..ad00b3df 100644 --- a/common/src/main/java/ctbrec/Config.java +++ b/common/src/main/java/ctbrec/Config.java @@ -8,6 +8,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.StandardCopyOption; import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.List; import java.util.Objects; @@ -142,23 +146,22 @@ public class Config { return configDir; } - public File getFileForRecording(Model model, String suffix) { - File dirForRecording = getDirForRecording(model); - SimpleDateFormat sdf = new SimpleDateFormat(RECORDING_DATE_FORMAT); - String startTime = sdf.format(new Date()); - File targetFile = new File(dirForRecording, model.getSanitizedNamed() + '_' + startTime + '.' + suffix); + public File getFileForRecording(Model model, String suffix, Instant startTime) { + DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(RECORDING_DATE_FORMAT); + LocalDateTime startDateTime = LocalDateTime.ofInstant(startTime, ZoneId.systemDefault()); + String formattedDate = dateTimeFormatter.format(startDateTime); + File dirForRecording = getDirForRecording(model, formattedDate); + File targetFile = new File(dirForRecording, model.getSanitizedNamed() + '_' + formattedDate + '.' + suffix); return targetFile; } - private File getDirForRecording(Model model) { + private File getDirForRecording(Model model, String formattedDate) { switch(getSettings().recordingsDirStructure) { case ONE_PER_MODEL: return new File(getSettings().recordingsDir, model.getSanitizedNamed()); case ONE_PER_RECORDING: File modelDir = new File(getSettings().recordingsDir, model.getSanitizedNamed()); - SimpleDateFormat sdf = new SimpleDateFormat(RECORDING_DATE_FORMAT); - String startTime = sdf.format(new Date()); - return new File(modelDir, startTime); + return new File(modelDir, formattedDate); case FLAT: default: return new File(getSettings().recordingsDir); diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index 5ffa9669..3eab5019 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -95,6 +96,21 @@ public class NextGenLocalRecorder implements Recorder { LOG.info("Models to record: {}", models); LOG.info("Saving recordings in {}", config.getSettings().recordingsDir); + startCompletionHandler(); + + scheduler.scheduleWithFixedDelay(() -> { + try { + if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) { + LOG.info("No space left -> Stopping all recordings"); + stopRecordingProcesses(); + } + } catch (IOException e) { + LOG.error("Couldn't check space left on device", e); + } + }, 1, 1, TimeUnit.SECONDS); + } + + private void startCompletionHandler() { Thread completionHandler = new Thread(() -> { while (!Thread.interrupted()) { try { @@ -127,17 +143,6 @@ public class NextGenLocalRecorder implements Recorder { completionHandler.setName("CompletionHandler"); completionHandler.setDaemon(true); completionHandler.start(); - - scheduler.scheduleWithFixedDelay(() -> { - try { - if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) { - LOG.info("No space left -> Stopping all recordings"); - stopRecordingProcesses(); - } - } catch (IOException e) { - LOG.error("Couldn't check space left on device", e); - } - }, 1, 1, TimeUnit.SECONDS); } private void submitPostProcessingJob(Recording recording) { @@ -150,6 +155,9 @@ public class NextGenLocalRecorder implements Recorder { recordingManager.saveRecording(recording); deleteIfTooShort(recording); } catch (Exception e) { + if (e instanceof InterruptedException) { // NOSONAR + Thread.currentThread().interrupt(); + } LOG.error("Error while post-processing recording {}", recording, e); recording.setStatus(State.FAILED); try { @@ -240,14 +248,16 @@ public class NextGenLocalRecorder implements Recorder { LOG.debug("Starting recording for model {}", model.getName()); Download download = model.createDownload(); - download.init(config, model); + download.init(config, model, Instant.now()); + Objects.requireNonNull(download.getStartTime(), + "At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()"); LOG.debug("Downloading with {}", download.getClass().getSimpleName()); Recording rec = new Recording(); rec.setDownload(download); rec.setPath(download.getPath(model).replaceAll("\\\\", "/")); rec.setModel(model); - rec.setStartDate(Instant.ofEpochMilli(System.currentTimeMillis())); + rec.setStartDate(download.getStartTime()); recordingProcesses.put(model, rec); recordingManager.add(rec); completionService.submit(() -> { @@ -621,7 +631,7 @@ public class NextGenLocalRecorder implements Recorder { for (Recording other : recordings) { if(other.equals(recording)) { Download download = other.getModel().createDownload(); - download.init(Config.getInstance(), other.getModel()); + download.init(Config.getInstance(), other.getModel(), other.getStartDate()); other.setDownload(download); submitPostProcessingJob(other); return; diff --git a/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java b/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java index daa02a91..f06afd5f 100644 --- a/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/AbstractDownload.java @@ -1,6 +1,8 @@ package ctbrec.recorder.download; import java.io.File; +import java.io.IOException; +import java.time.Instant; import java.util.Arrays; import org.slf4j.Logger; @@ -15,39 +17,42 @@ public abstract class AbstractDownload implements Download { private static final Logger LOG = LoggerFactory.getLogger(AbstractDownload.class); - protected void runPostProcessingScript(Recording recording) { + protected Instant startTime; + + protected void runPostProcessingScript(Recording recording) throws IOException, InterruptedException { String postProcessing = Config.getInstance().getSettings().postProcessing; if (postProcessing != null && !postProcessing.isEmpty()) { File target = recording.getAbsoluteFile(); Runtime rt = Runtime.getRuntime(); - try { - String[] args = new String[] { - postProcessing, - target.getParentFile().getAbsolutePath(), - target.getAbsolutePath(), - getModel().getName(), - getModel().getSite().getName(), - Long.toString(recording.getStartDate().getEpochSecond()) - }; - if(LOG.isDebugEnabled()) { - LOG.debug("Running {}", Arrays.toString(args)); - } - Process process = rt.exec(args, OS.getEnvironment()); - // TODO maybe write these to a separate log file, e.g. recname.ts.pp.log - Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out)); - std.setName("Process stdout pipe"); - std.setDaemon(true); - std.start(); - Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err)); - err.setName("Process stderr pipe"); - err.setDaemon(true); - err.start(); - - process.waitFor(); - LOG.debug("Process finished."); - } catch (Exception e) { - LOG.error("Error in process thread", e); + String[] args = new String[] { + postProcessing, + target.getParentFile().getAbsolutePath(), + target.getAbsolutePath(), + getModel().getName(), + getModel().getSite().getName(), + Long.toString(recording.getStartDate().getEpochSecond()) + }; + if(LOG.isDebugEnabled()) { + LOG.debug("Running {}", Arrays.toString(args)); } + Process process = rt.exec(args, OS.getEnvironment()); + // TODO maybe write these to a separate log file, e.g. recname.ts.pp.log + Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out)); + std.setName("Process stdout pipe"); + std.setDaemon(true); + std.start(); + Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err)); + err.setName("Process stderr pipe"); + err.setDaemon(true); + err.start(); + + process.waitFor(); + LOG.debug("Process finished."); } } + + @Override + public Instant getStartTime() { + return startTime; + } } diff --git a/common/src/main/java/ctbrec/recorder/download/Download.java b/common/src/main/java/ctbrec/recorder/download/Download.java index 30dd64fc..44997c74 100644 --- a/common/src/main/java/ctbrec/recorder/download/Download.java +++ b/common/src/main/java/ctbrec/recorder/download/Download.java @@ -10,7 +10,7 @@ import ctbrec.Model; import ctbrec.Recording; public interface Download { - public void init(Config config, Model model); + public void init(Config config, Model model, Instant startTime); public void start() throws IOException; public void stop(); public Model getModel(); diff --git a/common/src/main/java/ctbrec/recorder/download/ProcessExitedUncleanException.java b/common/src/main/java/ctbrec/recorder/download/ProcessExitedUncleanException.java new file mode 100644 index 00000000..9f1a8f40 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/download/ProcessExitedUncleanException.java @@ -0,0 +1,7 @@ +package ctbrec.recorder.download; + +public class ProcessExitedUncleanException extends RuntimeException { + public ProcessExitedUncleanException(String msg) { + super(msg); + } +} \ No newline at end of file diff --git a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java index 6f127a48..c7e9d061 100644 --- a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java @@ -32,10 +32,11 @@ import org.slf4j.LoggerFactory; import ctbrec.Config; import ctbrec.Model; import ctbrec.Recording; -import ctbrec.Recording.State; import ctbrec.io.HttpClient; +import ctbrec.io.HttpException; import ctbrec.recorder.download.AbstractDownload; import ctbrec.recorder.download.dash.SegmentTimelineType.S; +import ctbrec.recorder.download.hls.PostProcessingException; import okhttp3.Request; import okhttp3.Response; @@ -53,7 +54,6 @@ public class DashDownload extends AbstractDownload { private HttpClient httpClient; private Config config; private Model model; - private Instant startTime; private Instant endTime; private Path downloadDir; private String manifestUrl; @@ -80,7 +80,11 @@ public class DashDownload extends AbstractDownload { .build(); // @formatter:on LOG.trace("Loading manifest {}", url); try (Response response = httpClient.execute(request)) { - return response.body().string(); + if (response.isSuccessful()) { + return response.body().string(); + } else { + throw new HttpException(response.code(), "Couldn't load manifest: " + response.message()); + } } } @@ -181,7 +185,7 @@ public class DashDownload extends AbstractDownload { File segmentFile = new File(dir, prefix + '_' + df.format(c) + '_' + new File(absFile).getName()); while (tries <= 10) { if (!segmentFile.exists() || segmentFile.length() == 0) { - if (tries > 1) { + if (tries == 10) { LOG.debug("Loading segment, try {}, {} {} {}", tries, response.code(), response.headers().values("Content-Length"), url); } else { LOG.trace("Loading segment, try {}, {} {} {}", tries, response.code(), response.headers().values("Content-Length"), url); @@ -204,11 +208,11 @@ public class DashDownload extends AbstractDownload { } @Override - public void init(Config config, Model model) { + public void init(Config config, Model model, Instant startTime) { this.config = config; this.model = model; - startTime = Instant.now(); - File finalFile = Config.getInstance().getFileForRecording(model, "mp4"); + this.startTime = startTime; + File finalFile = Config.getInstance().getFileForRecording(model, "mp4", startTime); targetFile = new File(finalFile.getParentFile(), finalFile.getName() + ".part"); downloadDir = targetFile.toPath(); } @@ -241,10 +245,10 @@ public class DashDownload extends AbstractDownload { } private boolean splitRecording() { - if(config.getSettings().splitRecordings > 0) { + if (config.getSettings().splitRecordings > 0) { Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now()); long seconds = recordingDuration.getSeconds(); - if(seconds >= config.getSettings().splitRecordings) { + if (seconds >= config.getSettings().splitRecordings) { internalStop(); return true; } @@ -331,11 +335,6 @@ public class DashDownload extends AbstractDownload { return model; } - @Override - public Instant getStartTime() { - return startTime; - } - @Override public Duration getLength() { return Duration.between(startTime, Optional.ofNullable(endTime).orElse(Instant.now())); @@ -352,9 +351,8 @@ public class DashDownload extends AbstractDownload { targetFile = file; recording.setPath(path.substring(0, path.length() - 5)); runPostProcessingScript(recording); - } catch (IOException e) { - LOG.error("Error while merging dash segments", e); - recording.setStatus(State.FAILED); + } catch (Exception e) { + throw new PostProcessingException(e); } } diff --git a/common/src/main/java/ctbrec/recorder/download/dash/FfmpegMuxer.java b/common/src/main/java/ctbrec/recorder/download/dash/FfmpegMuxer.java index f5c3504e..1b6a495c 100644 --- a/common/src/main/java/ctbrec/recorder/download/dash/FfmpegMuxer.java +++ b/common/src/main/java/ctbrec/recorder/download/dash/FfmpegMuxer.java @@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory; import ctbrec.OS; import ctbrec.io.StreamRedirectThread; +import ctbrec.recorder.download.ProcessExitedUncleanException; public class FfmpegMuxer { private static final Logger LOG = LoggerFactory.getLogger(FfmpegMuxer.class); @@ -96,10 +97,4 @@ public class FfmpegMuxer { return 1; } } - - public static class ProcessExitedUncleanException extends RuntimeException { - public ProcessExitedUncleanException(String msg) { - super(msg); - } - } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java index c5f57f62..83dcb71f 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java @@ -4,7 +4,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URL; -import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -51,7 +50,6 @@ public abstract class AbstractHlsDownload extends AbstractDownload { protected HttpClient client; protected volatile boolean running = false; - protected Instant startTime; protected Model model = new UnknownModel(); protected BlockingQueue downloadQueue = new LinkedBlockingQueue<>(50); protected ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue, createThreadFactory()); @@ -180,17 +178,12 @@ public abstract class AbstractHlsDownload extends AbstractDownload { abstract void internalStop(); - @Override - public Instant getStartTime() { - return startTime; - } - @Override public Model getModel() { return model; } - + public static class SegmentPlaylist { public String url; diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java index e5076e1f..fafb9d99 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java @@ -64,14 +64,14 @@ public class HlsDownload extends AbstractHlsDownload { } @Override - public void init(Config config, Model model) { + public void init(Config config, Model model, Instant startTime) { this.config = config; super.model = model; DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); - this.startTime = Instant.now(); - String startTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); + this.startTime = startTime; + String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); - downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); + downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime); } @Override @@ -179,7 +179,11 @@ public class HlsDownload extends AbstractHlsDownload { recording.setStatusWithEvent(State.GENERATING_PLAYLIST); generatePlaylist(recording); recording.setStatusWithEvent(State.POST_PROCESSING); - runPostProcessingScript(recording); + try { + runPostProcessingScript(recording); + } catch (Exception e) { + throw new PostProcessingException(e); + } } protected File generatePlaylist(Recording recording) { diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedHlsDownload.java index bd652698..a1e5ceed 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedHlsDownload.java @@ -9,6 +9,7 @@ import java.nio.file.Files; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; import org.jcodec.containers.mp4.MP4Util; @@ -27,6 +28,7 @@ import ctbrec.io.HttpClient; import ctbrec.io.StreamRedirectThread; import ctbrec.recorder.ProgressListener; import ctbrec.recorder.RecordingManager; +import ctbrec.recorder.download.ProcessExitedUncleanException; import okhttp3.Request; import okhttp3.Response; @@ -40,9 +42,15 @@ public class MergedHlsDownload extends HlsDownload { } @Override - public void init(Config config, Model model) { - super.init(config, model); - finalFile = Config.getInstance().getFileForRecording(model, "mp4"); + public void init(Config config, Model model, Instant startTime) { + super.init(config, model, startTime); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finalFile = Config.getInstance().getFileForRecording(model, "mp4", startTime); + downloadDir = finalFile.getParentFile().toPath(); } @Override @@ -60,12 +68,12 @@ public class MergedHlsDownload extends HlsDownload { } runPostProcessingScript(recording); - } catch (PostProcessingException | IOException e) { - LOG.error("An error occurred during post-processing", e); + } catch (Exception e) { + throw new PostProcessingException(e); } } - private void postprocess(File playlist, File target) throws PostProcessingException { + private void postprocess(File playlist, File target) { try { File dir = playlist.getParentFile(); // @formatter:off @@ -90,7 +98,7 @@ public class MergedHlsDownload extends HlsDownload { Files.delete(segment.toPath()); } } else { - throw new PostProcessingException("FFmpeg exit code was " + exitCode); + throw new ProcessExitedUncleanException("FFmpeg exit code was " + exitCode); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -101,7 +109,7 @@ public class MergedHlsDownload extends HlsDownload { } public void downloadFinishedRecording(String segmentPlaylistUri, File target, ProgressListener progressListener) - throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, PostProcessingException { + throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException { if (Config.getInstance().getSettings().requireAuthentication) { URL u = new URL(segmentPlaylistUri); String path = u.getPath(); diff --git a/common/src/main/java/ctbrec/recorder/download/hls/PostProcessingException.java b/common/src/main/java/ctbrec/recorder/download/hls/PostProcessingException.java index 9080be2a..ca754c5c 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/PostProcessingException.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/PostProcessingException.java @@ -1,9 +1,13 @@ package ctbrec.recorder.download.hls; -public class PostProcessingException extends Exception { +public class PostProcessingException extends RuntimeException { public PostProcessingException(String msg) { super(msg); } + public PostProcessingException(Exception cause) { + super(cause); + } + }