forked from j62/ctbrec
1
0
Fork 0

Wait for the download to terminate before starting PP

Sometimes the PP was started before the last segments were downloaded.
This could cause unexpected effects. E.g. the playlist generator would
fail, because the number of segments chained during playlist generation.
This commit is contained in:
0xboobface 2018-12-16 16:14:53 +01:00
parent e621e49e00
commit ebb5310d26
3 changed files with 46 additions and 9 deletions

View File

@ -216,10 +216,14 @@ public class LocalRecorder implements Recorder {
private void stopRecordingProcess(Model model) { private void stopRecordingProcess(Model model) {
Download download = recordingProcesses.get(model); Download download = recordingProcesses.get(model);
download.stop();
recordingProcesses.remove(model); recordingProcesses.remove(model);
fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime()); fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime());
ppThreadPool.submit(createPostProcessor(download));
Runnable stopAndThePostProcess = () -> {
download.stop();
createPostProcessor(download).run();
};
ppThreadPool.submit(stopAndThePostProcess);
} }
private void postprocess(Download download) { private void postprocess(Download download) {
@ -551,6 +555,8 @@ public class LocalRecorder implements Recorder {
continue; continue;
} }
// TODO don't list recordings, which currently get deleted
Date startDate = sdf.parse(rec.getName()); Date startDate = sdf.parse(rec.getName());
Recording recording = new Recording(); Recording recording = new Recording();
recording.setModelName(subdir.getName()); recording.setModelName(subdir.getName());

View File

@ -44,6 +44,7 @@ public class HlsDownload extends AbstractHlsDownload {
private int segmentCounter = 1; private int segmentCounter = 1;
private NumberFormat nf = new DecimalFormat("000000"); private NumberFormat nf = new DecimalFormat("000000");
private Object downloadFinished = new Object();
public HlsDownload(HttpClient client) { public HlsDownload(HttpClient client) {
super(client); super(client);
@ -75,8 +76,7 @@ public class HlsDownload extends AbstractHlsDownload {
} }
int lastSegment = 0; int lastSegment = 0;
int nextSegment = 0; int nextSegment = 0;
boolean sleep = true; // this enables sleeping between playlist requests boolean sleep = true; // this enables sleeping between playlist requests. once we miss a segment, this is set to false, so that no sleeping happens anymore
// once we miss a segment, this is set to false, so that no sleeping happens anymore
while(running) { while(running) {
SegmentPlaylist lsp = getNextSegments(segments); SegmentPlaylist lsp = getNextSegments(segments);
if(nextSegment > 0 && lsp.seq > nextSegment) { if(nextSegment > 0 && lsp.seq > nextSegment) {
@ -137,12 +137,15 @@ public class HlsDownload extends AbstractHlsDownload {
} catch(Exception e) { } catch(Exception e) {
throw new IOException("Couldn't download segment", e); throw new IOException("Couldn't download segment", e);
} finally { } finally {
alive = false;
downloadThreadPool.shutdown(); downloadThreadPool.shutdown();
try { try {
LOG.debug("Waiting for last segments for {}", model); LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {} } catch (InterruptedException e) {}
alive = false;
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
LOG.debug("Download for {} terminated", model); LOG.debug("Download for {} terminated", model);
} }
} }
@ -150,7 +153,13 @@ public class HlsDownload extends AbstractHlsDownload {
@Override @Override
public void stop() { public void stop() {
running = false; running = false;
alive = false; try {
synchronized (downloadFinished) {
downloadFinished.wait();
}
} catch (InterruptedException e) {
LOG.error("Couldn't wait for download to finish", e);
}
} }
private static class SegmentDownload implements Callable<Boolean> { private static class SegmentDownload implements Callable<Boolean> {

View File

@ -66,6 +66,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50); private BlockingQueue<Runnable> downloadQueue = new LinkedBlockingQueue<>(50);
private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue); private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue);
private FileChannel fileChannel = null; private FileChannel fileChannel = null;
private Object downloadFinished = new Object();
public MergedHlsDownload(HttpClient client) { public MergedHlsDownload(HttpClient client) {
super(client); super(client);
@ -105,13 +106,20 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e) { } catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e) {
throw new IOException("Couldn't add HMAC to playlist url", e); throw new IOException("Couldn't add HMAC to playlist url", e);
} finally { } finally {
alive = false;
try { try {
streamer.stop(); streamer.stop();
} catch(Exception e) { } catch(Exception e) {
LOG.error("Couldn't stop streamer", e); LOG.error("Couldn't stop streamer", e);
} }
downloadThreadPool.shutdown(); downloadThreadPool.shutdown();
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
alive = false;
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
LOG.debug("Download terminated for {}", segmentPlaylistUri); LOG.debug("Download terminated for {}", segmentPlaylistUri);
} }
} }
@ -155,7 +163,6 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} catch(Exception e) { } catch(Exception e) {
throw new IOException("Couldn't download segment", e); throw new IOException("Couldn't download segment", e);
} finally { } finally {
alive = false;
if(streamer != null) { if(streamer != null) {
try { try {
streamer.stop(); streamer.stop();
@ -163,6 +170,15 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.error("Couldn't stop streamer", e); LOG.error("Couldn't stop streamer", e);
} }
} }
downloadThreadPool.shutdown();
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
alive = false;
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
LOG.debug("Download for {} terminated", model); LOG.debug("Download for {} terminated", model);
} }
} }
@ -353,10 +369,16 @@ public class MergedHlsDownload extends AbstractHlsDownload {
@Override @Override
public void stop() { public void stop() {
running = false; running = false;
alive = false;
if(streamer != null) { if(streamer != null) {
streamer.stop(); streamer.stop();
} }
try {
synchronized (downloadFinished) {
downloadFinished.wait();
}
} catch (InterruptedException e) {
LOG.error("Couldn't wait for download to finish", e);
}
LOG.debug("Download stopped"); LOG.debug("Download stopped");
} }