forked from j62/ctbrec
1
0
Fork 0

Fix post-processing timestamp problems

This commit is contained in:
0xboobface 2019-12-25 17:55:33 +01:00
parent b8cdb2200e
commit 6cc8fd9cc2
11 changed files with 124 additions and 97 deletions

View File

@ -8,6 +8,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Objects;
@ -142,23 +146,22 @@ public class Config {
return configDir;
}
public File getFileForRecording(Model model, String suffix) {
File dirForRecording = getDirForRecording(model);
SimpleDateFormat sdf = new SimpleDateFormat(RECORDING_DATE_FORMAT);
String startTime = sdf.format(new Date());
File targetFile = new File(dirForRecording, model.getSanitizedNamed() + '_' + startTime + '.' + suffix);
public File getFileForRecording(Model model, String suffix, Instant startTime) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(RECORDING_DATE_FORMAT);
LocalDateTime startDateTime = LocalDateTime.ofInstant(startTime, ZoneId.systemDefault());
String formattedDate = dateTimeFormatter.format(startDateTime);
File dirForRecording = getDirForRecording(model, formattedDate);
File targetFile = new File(dirForRecording, model.getSanitizedNamed() + '_' + formattedDate + '.' + suffix);
return targetFile;
}
private File getDirForRecording(Model model) {
private File getDirForRecording(Model model, String formattedDate) {
switch(getSettings().recordingsDirStructure) {
case ONE_PER_MODEL:
return new File(getSettings().recordingsDir, model.getSanitizedNamed());
case ONE_PER_RECORDING:
File modelDir = new File(getSettings().recordingsDir, model.getSanitizedNamed());
SimpleDateFormat sdf = new SimpleDateFormat(RECORDING_DATE_FORMAT);
String startTime = sdf.format(new Date());
return new File(modelDir, startTime);
return new File(modelDir, formattedDate);
case FLAT:
default:
return new File(getSettings().recordingsDir);

View File

@ -16,6 +16,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
@ -95,6 +96,21 @@ public class NextGenLocalRecorder implements Recorder {
LOG.info("Models to record: {}", models);
LOG.info("Saving recordings in {}", config.getSettings().recordingsDir);
startCompletionHandler();
scheduler.scheduleWithFixedDelay(() -> {
try {
if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) {
LOG.info("No space left -> Stopping all recordings");
stopRecordingProcesses();
}
} catch (IOException e) {
LOG.error("Couldn't check space left on device", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private void startCompletionHandler() {
Thread completionHandler = new Thread(() -> {
while (!Thread.interrupted()) {
try {
@ -127,17 +143,6 @@ public class NextGenLocalRecorder implements Recorder {
completionHandler.setName("CompletionHandler");
completionHandler.setDaemon(true);
completionHandler.start();
scheduler.scheduleWithFixedDelay(() -> {
try {
if (!recordingProcesses.isEmpty() && !enoughSpaceForRecording()) {
LOG.info("No space left -> Stopping all recordings");
stopRecordingProcesses();
}
} catch (IOException e) {
LOG.error("Couldn't check space left on device", e);
}
}, 1, 1, TimeUnit.SECONDS);
}
private void submitPostProcessingJob(Recording recording) {
@ -150,6 +155,9 @@ public class NextGenLocalRecorder implements Recorder {
recordingManager.saveRecording(recording);
deleteIfTooShort(recording);
} catch (Exception e) {
if (e instanceof InterruptedException) { // NOSONAR
Thread.currentThread().interrupt();
}
LOG.error("Error while post-processing recording {}", recording, e);
recording.setStatus(State.FAILED);
try {
@ -240,14 +248,16 @@ public class NextGenLocalRecorder implements Recorder {
LOG.debug("Starting recording for model {}", model.getName());
Download download = model.createDownload();
download.init(config, model);
download.init(config, model, Instant.now());
Objects.requireNonNull(download.getStartTime(),
"At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()");
LOG.debug("Downloading with {}", download.getClass().getSimpleName());
Recording rec = new Recording();
rec.setDownload(download);
rec.setPath(download.getPath(model).replaceAll("\\\\", "/"));
rec.setModel(model);
rec.setStartDate(Instant.ofEpochMilli(System.currentTimeMillis()));
rec.setStartDate(download.getStartTime());
recordingProcesses.put(model, rec);
recordingManager.add(rec);
completionService.submit(() -> {
@ -621,7 +631,7 @@ public class NextGenLocalRecorder implements Recorder {
for (Recording other : recordings) {
if(other.equals(recording)) {
Download download = other.getModel().createDownload();
download.init(Config.getInstance(), other.getModel());
download.init(Config.getInstance(), other.getModel(), other.getStartDate());
other.setDownload(download);
submitPostProcessingJob(other);
return;

View File

@ -1,6 +1,8 @@
package ctbrec.recorder.download;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import org.slf4j.Logger;
@ -15,39 +17,42 @@ public abstract class AbstractDownload implements Download {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDownload.class);
protected void runPostProcessingScript(Recording recording) {
protected Instant startTime;
protected void runPostProcessingScript(Recording recording) throws IOException, InterruptedException {
String postProcessing = Config.getInstance().getSettings().postProcessing;
if (postProcessing != null && !postProcessing.isEmpty()) {
File target = recording.getAbsoluteFile();
Runtime rt = Runtime.getRuntime();
try {
String[] args = new String[] {
postProcessing,
target.getParentFile().getAbsolutePath(),
target.getAbsolutePath(),
getModel().getName(),
getModel().getSite().getName(),
Long.toString(recording.getStartDate().getEpochSecond())
};
if(LOG.isDebugEnabled()) {
LOG.debug("Running {}", Arrays.toString(args));
}
Process process = rt.exec(args, OS.getEnvironment());
// TODO maybe write these to a separate log file, e.g. recname.ts.pp.log
Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out));
std.setName("Process stdout pipe");
std.setDaemon(true);
std.start();
Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err));
err.setName("Process stderr pipe");
err.setDaemon(true);
err.start();
process.waitFor();
LOG.debug("Process finished.");
} catch (Exception e) {
LOG.error("Error in process thread", e);
String[] args = new String[] {
postProcessing,
target.getParentFile().getAbsolutePath(),
target.getAbsolutePath(),
getModel().getName(),
getModel().getSite().getName(),
Long.toString(recording.getStartDate().getEpochSecond())
};
if(LOG.isDebugEnabled()) {
LOG.debug("Running {}", Arrays.toString(args));
}
Process process = rt.exec(args, OS.getEnvironment());
// TODO maybe write these to a separate log file, e.g. recname.ts.pp.log
Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out));
std.setName("Process stdout pipe");
std.setDaemon(true);
std.start();
Thread err = new Thread(new StreamRedirectThread(process.getErrorStream(), System.err));
err.setName("Process stderr pipe");
err.setDaemon(true);
err.start();
process.waitFor();
LOG.debug("Process finished.");
}
}
@Override
public Instant getStartTime() {
return startTime;
}
}

View File

@ -10,7 +10,7 @@ import ctbrec.Model;
import ctbrec.Recording;
public interface Download {
public void init(Config config, Model model);
public void init(Config config, Model model, Instant startTime);
public void start() throws IOException;
public void stop();
public Model getModel();

View File

@ -0,0 +1,7 @@
package ctbrec.recorder.download;
public class ProcessExitedUncleanException extends RuntimeException {
public ProcessExitedUncleanException(String msg) {
super(msg);
}
}

View File

@ -32,10 +32,11 @@ import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.Recording;
import ctbrec.Recording.State;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.dash.SegmentTimelineType.S;
import ctbrec.recorder.download.hls.PostProcessingException;
import okhttp3.Request;
import okhttp3.Response;
@ -53,7 +54,6 @@ public class DashDownload extends AbstractDownload {
private HttpClient httpClient;
private Config config;
private Model model;
private Instant startTime;
private Instant endTime;
private Path downloadDir;
private String manifestUrl;
@ -80,7 +80,11 @@ public class DashDownload extends AbstractDownload {
.build(); // @formatter:on
LOG.trace("Loading manifest {}", url);
try (Response response = httpClient.execute(request)) {
return response.body().string();
if (response.isSuccessful()) {
return response.body().string();
} else {
throw new HttpException(response.code(), "Couldn't load manifest: " + response.message());
}
}
}
@ -181,7 +185,7 @@ public class DashDownload extends AbstractDownload {
File segmentFile = new File(dir, prefix + '_' + df.format(c) + '_' + new File(absFile).getName());
while (tries <= 10) {
if (!segmentFile.exists() || segmentFile.length() == 0) {
if (tries > 1) {
if (tries == 10) {
LOG.debug("Loading segment, try {}, {} {} {}", tries, response.code(), response.headers().values("Content-Length"), url);
} else {
LOG.trace("Loading segment, try {}, {} {} {}", tries, response.code(), response.headers().values("Content-Length"), url);
@ -204,11 +208,11 @@ public class DashDownload extends AbstractDownload {
}
@Override
public void init(Config config, Model model) {
public void init(Config config, Model model, Instant startTime) {
this.config = config;
this.model = model;
startTime = Instant.now();
File finalFile = Config.getInstance().getFileForRecording(model, "mp4");
this.startTime = startTime;
File finalFile = Config.getInstance().getFileForRecording(model, "mp4", startTime);
targetFile = new File(finalFile.getParentFile(), finalFile.getName() + ".part");
downloadDir = targetFile.toPath();
}
@ -241,10 +245,10 @@ public class DashDownload extends AbstractDownload {
}
private boolean splitRecording() {
if(config.getSettings().splitRecordings > 0) {
if (config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) {
if (seconds >= config.getSettings().splitRecordings) {
internalStop();
return true;
}
@ -331,11 +335,6 @@ public class DashDownload extends AbstractDownload {
return model;
}
@Override
public Instant getStartTime() {
return startTime;
}
@Override
public Duration getLength() {
return Duration.between(startTime, Optional.ofNullable(endTime).orElse(Instant.now()));
@ -352,9 +351,8 @@ public class DashDownload extends AbstractDownload {
targetFile = file;
recording.setPath(path.substring(0, path.length() - 5));
runPostProcessingScript(recording);
} catch (IOException e) {
LOG.error("Error while merging dash segments", e);
recording.setStatus(State.FAILED);
} catch (Exception e) {
throw new PostProcessingException(e);
}
}

View File

@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
import ctbrec.OS;
import ctbrec.io.StreamRedirectThread;
import ctbrec.recorder.download.ProcessExitedUncleanException;
public class FfmpegMuxer {
private static final Logger LOG = LoggerFactory.getLogger(FfmpegMuxer.class);
@ -96,10 +97,4 @@ public class FfmpegMuxer {
return 1;
}
}
public static class ProcessExitedUncleanException extends RuntimeException {
public ProcessExitedUncleanException(String msg) {
super(msg);
}
}
}

View File

@ -4,7 +4,6 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -51,7 +50,6 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
protected HttpClient client;
protected volatile boolean running = false;
protected Instant startTime;
protected Model model = new UnknownModel();
protected BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50);
protected ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue, createThreadFactory());
@ -180,17 +178,12 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
abstract void internalStop();
@Override
public Instant getStartTime() {
return startTime;
}
@Override
public Model getModel() {
return model;
}
public static class SegmentPlaylist {
public String url;

View File

@ -64,14 +64,14 @@ public class HlsDownload extends AbstractHlsDownload {
}
@Override
public void init(Config config, Model model) {
public void init(Config config, Model model, Instant startTime) {
this.config = config;
super.model = model;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT);
this.startTime = Instant.now();
String startTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault()));
this.startTime = startTime;
String formattedStartTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault()));
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), formattedStartTime);
}
@Override
@ -179,7 +179,11 @@ public class HlsDownload extends AbstractHlsDownload {
recording.setStatusWithEvent(State.GENERATING_PLAYLIST);
generatePlaylist(recording);
recording.setStatusWithEvent(State.POST_PROCESSING);
runPostProcessingScript(recording);
try {
runPostProcessingScript(recording);
} catch (Exception e) {
throw new PostProcessingException(e);
}
}
protected File generatePlaylist(Recording recording) {

View File

@ -9,6 +9,7 @@ import java.nio.file.Files;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import org.jcodec.containers.mp4.MP4Util;
@ -27,6 +28,7 @@ import ctbrec.io.HttpClient;
import ctbrec.io.StreamRedirectThread;
import ctbrec.recorder.ProgressListener;
import ctbrec.recorder.RecordingManager;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import okhttp3.Request;
import okhttp3.Response;
@ -40,9 +42,15 @@ public class MergedHlsDownload extends HlsDownload {
}
@Override
public void init(Config config, Model model) {
super.init(config, model);
finalFile = Config.getInstance().getFileForRecording(model, "mp4");
public void init(Config config, Model model, Instant startTime) {
super.init(config, model, startTime);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finalFile = Config.getInstance().getFileForRecording(model, "mp4", startTime);
downloadDir = finalFile.getParentFile().toPath();
}
@Override
@ -60,12 +68,12 @@ public class MergedHlsDownload extends HlsDownload {
}
runPostProcessingScript(recording);
} catch (PostProcessingException | IOException e) {
LOG.error("An error occurred during post-processing", e);
} catch (Exception e) {
throw new PostProcessingException(e);
}
}
private void postprocess(File playlist, File target) throws PostProcessingException {
private void postprocess(File playlist, File target) {
try {
File dir = playlist.getParentFile();
// @formatter:off
@ -90,7 +98,7 @@ public class MergedHlsDownload extends HlsDownload {
Files.delete(segment.toPath());
}
} else {
throw new PostProcessingException("FFmpeg exit code was " + exitCode);
throw new ProcessExitedUncleanException("FFmpeg exit code was " + exitCode);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -101,7 +109,7 @@ public class MergedHlsDownload extends HlsDownload {
}
public void downloadFinishedRecording(String segmentPlaylistUri, File target, ProgressListener progressListener)
throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, PostProcessingException {
throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException {
if (Config.getInstance().getSettings().requireAuthentication) {
URL u = new URL(segmentPlaylistUri);
String path = u.getPath();

View File

@ -1,9 +1,13 @@
package ctbrec.recorder.download.hls;
public class PostProcessingException extends Exception {
public class PostProcessingException extends RuntimeException {
public PostProcessingException(String msg) {
super(msg);
}
public PostProcessingException(Exception cause) {
super(cause);
}
}