forked from j62/ctbrec
1
0
Fork 0

Fix split recordings

Split recordings didn't work, because splitRecStartTime had been removed
by accident. Also the splitting now does not start a new recording, but
switches the output file in Streamer. This is a much cleaner and
smoother approach, because it is much faster and no segments are missed
This commit is contained in:
0xboobface 2018-11-28 16:03:21 +01:00
parent 3a7f2ceca6
commit 403c1ed2d0
3 changed files with 41 additions and 20 deletions

View File

@ -117,10 +117,6 @@ public class Config {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm");
String startTime = sdf.format(new Date()); String startTime = sdf.format(new Date());
File targetFile = new File(dirForRecording, model.getName() + '_' + startTime + ".ts"); File targetFile = new File(dirForRecording, model.getName() + '_' + startTime + ".ts");
if(getSettings().splitRecordings > 0) {
LOG.debug("Splitting recordings every {} seconds", getSettings().splitRecordings);
targetFile = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-00000.ts"));
}
return targetFile; return targetFile;
} }

View File

@ -15,7 +15,6 @@ import java.nio.file.LinkOption;
import java.nio.file.Path; import java.nio.file.Path;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
@ -57,13 +56,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private BlockingMultiMTSSource multiSource; private BlockingMultiMTSSource multiSource;
private Thread mergeThread; private Thread mergeThread;
private Streamer streamer; private Streamer streamer;
private ZonedDateTime startTime; private ZonedDateTime splitRecStartTime;
private Config config; private Config config;
private File targetFile; private File targetFile;
private DecimalFormat df = new DecimalFormat("00000");
private int splitCounter = 0;
private BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50); private BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50);
private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue); private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue);
private FileChannel fileChannel = null;
public MergedHlsDownload(HttpClient client) { public MergedHlsDownload(HttpClient client) {
super(client); super(client);
@ -78,6 +76,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
try { try {
running = true; running = true;
super.startTime = Instant.now(); super.startTime = Instant.now();
splitRecStartTime = ZonedDateTime.now();
mergeThread = createMergeThread(targetFile, progressListener, false); mergeThread = createMergeThread(targetFile, progressListener, false);
LOG.debug("Merge thread started"); LOG.debug("Merge thread started");
mergeThread.start(); mergeThread.start();
@ -123,6 +122,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
running = true; running = true;
super.startTime = Instant.now(); super.startTime = Instant.now();
splitRecStartTime = ZonedDateTime.now();
super.model = model; super.model = model;
targetFile = Config.getInstance().getFileForRecording(model); targetFile = Config.getInstance().getFileForRecording(model);
String segments = getSegmentPlaylistUrl(model); String segments = getSegmentPlaylistUrl(model);
@ -130,6 +130,9 @@ public class MergedHlsDownload extends AbstractHlsDownload {
mergeThread.start(); mergeThread.start();
if(segments != null) { if(segments != null) {
downloadSegments(segments, true); downloadSegments(segments, true);
if(config.getSettings().splitRecordings > 0) {
LOG.debug("Splitting recordings every {} seconds", config.getSettings().splitRecordings);
}
} else { } else {
throw new IOException("Couldn't determine segments uri"); throw new IOException("Couldn't determine segments uri");
} }
@ -194,7 +197,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
break; break;
} }
} catch(Exception e) { } catch(Exception e) {
LOG.info("Unexpected error while downloading ", model.getName()); LOG.info("Unexpected error while downloading {}", model.getName(), e);
running = false; running = false;
} }
} }
@ -226,6 +229,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} }
// get completed downloads and write them to the file // get completed downloads and write them to the file
// TODO it might be a good idea to do this in a separate thread, so that the main download loop isn't blocked
writeFinishedSegments(downloads); writeFinishedSegments(downloads);
} }
@ -276,14 +280,20 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void splitRecording() { private void splitRecording() {
if(config.getSettings().splitRecordings > 0) { if(config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(startTime, ZonedDateTime.now()); Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds(); long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) { if(seconds >= config.getSettings().splitRecordings) {
streamer.stop(); try {
File target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-"+df.format(++splitCounter)+".ts")); targetFile = Config.getInstance().getFileForRecording(model);
mergeThread = createMergeThread(target, null, true); LOG.debug("Switching to file {}", targetFile.getAbsolutePath());
mergeThread.start(); fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
startTime = ZonedDateTime.now(); MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build();
streamer.switchSink(sink);
splitRecStartTime = ZonedDateTime.now();
} catch (IOException e) {
LOG.error("Error while splitting recording", e);
running = false;
}
} }
} }
} }
@ -331,14 +341,13 @@ public class MergedHlsDownload extends AbstractHlsDownload {
.setProgressListener(listener) .setProgressListener(listener)
.build(); .build();
FileChannel channel = null;
try { try {
Path downloadDir = targetFile.getParentFile().toPath(); Path downloadDir = targetFile.getParentFile().toPath();
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
Files.createDirectories(downloadDir); Files.createDirectories(downloadDir);
} }
channel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build(); MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build();
streamer = Streamer.builder() streamer = Streamer.builder()
.setSource(multiSource) .setSource(multiSource)
@ -358,9 +367,9 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} catch(Exception e) { } catch(Exception e) {
LOG.error("Error while saving stream to file", e); LOG.error("Error while saving stream to file", e);
} finally { } finally {
closeFile(channel);
deleteEmptyRecording(targetFile); deleteEmptyRecording(targetFile);
running = false; running = false;
closeFile(fileChannel);
} }
}); });
t.setName("Segment Merger Thread [" + model.getName() + "]"); t.setName("Segment Merger Thread [" + model.getName() + "]");
@ -381,7 +390,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void closeFile(FileChannel channel) { private void closeFile(FileChannel channel) {
try { try {
if (channel != null) { if (channel != null && channel.isOpen()) {
channel.close(); channel.close();
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -64,6 +64,12 @@ public class Streamer {
bufferingThread.join(); bufferingThread.join();
streamingThread.join(); streamingThread.join();
try {
sink.close();
} catch(Exception e) {
log.error("Couldn't close sink", e);
}
} }
public void stop() { public void stop() {
@ -87,6 +93,16 @@ public class Streamer {
} }
} }
public void switchSink(MTSSink sink) {
MTSSink old = this.sink;
this.sink = sink;
try {
old.close();
} catch (Exception e) {
log.error("Couldn't close old sink while switching sinks", e);
}
}
private void internalStream() { private void internalStream() {
boolean resetState = false; boolean resetState = false;
MTSPacket packet = null; MTSPacket packet = null;