From ebb5310d262134265643be7dcf9bcf8e0c7a0032 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Sun, 16 Dec 2018 16:14:53 +0100 Subject: [PATCH] Wait for the download to terminate before starting PP Sometimes the PP was started before the last segments were downloaded. This could cause unexpected effects. E.g. the playlist generator would fail, because the number of segments chained during playlist generation. --- .../java/ctbrec/recorder/LocalRecorder.java | 10 +++++-- .../ctbrec/recorder/download/HlsDownload.java | 17 ++++++++--- .../recorder/download/MergedHlsDownload.java | 28 +++++++++++++++++-- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/LocalRecorder.java b/common/src/main/java/ctbrec/recorder/LocalRecorder.java index 6e16c809..5dafee65 100644 --- a/common/src/main/java/ctbrec/recorder/LocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/LocalRecorder.java @@ -216,10 +216,14 @@ public class LocalRecorder implements Recorder { private void stopRecordingProcess(Model model) { Download download = recordingProcesses.get(model); - download.stop(); recordingProcesses.remove(model); fireRecordingStateChanged(download.getTarget(), STOPPED, model, download.getStartTime()); - ppThreadPool.submit(createPostProcessor(download)); + + Runnable stopAndThePostProcess = () -> { + download.stop(); + createPostProcessor(download).run(); + }; + ppThreadPool.submit(stopAndThePostProcess); } private void postprocess(Download download) { @@ -551,6 +555,8 @@ public class LocalRecorder implements Recorder { continue; } + // TODO don't list recordings, which currently get deleted + Date startDate = sdf.parse(rec.getName()); Recording recording = new Recording(); recording.setModelName(subdir.getName()); diff --git a/common/src/main/java/ctbrec/recorder/download/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/HlsDownload.java index 9148f198..f682acda 100644 --- a/common/src/main/java/ctbrec/recorder/download/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/HlsDownload.java @@ -44,6 +44,7 @@ public class HlsDownload extends AbstractHlsDownload { private int segmentCounter = 1; private NumberFormat nf = new DecimalFormat("000000"); + private Object downloadFinished = new Object(); public HlsDownload(HttpClient client) { super(client); @@ -75,8 +76,7 @@ public class HlsDownload extends AbstractHlsDownload { } int lastSegment = 0; int nextSegment = 0; - boolean sleep = true; // this enables sleeping between playlist requests - // once we miss a segment, this is set to false, so that no sleeping happens anymore + boolean sleep = true; // this enables sleeping between playlist requests. once we miss a segment, this is set to false, so that no sleeping happens anymore while(running) { SegmentPlaylist lsp = getNextSegments(segments); if(nextSegment > 0 && lsp.seq > nextSegment) { @@ -137,12 +137,15 @@ public class HlsDownload extends AbstractHlsDownload { } catch(Exception e) { throw new IOException("Couldn't download segment", e); } finally { - alive = false; downloadThreadPool.shutdown(); try { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException e) {} + alive = false; + synchronized (downloadFinished) { + downloadFinished.notifyAll(); + } LOG.debug("Download for {} terminated", model); } } @@ -150,7 +153,13 @@ public class HlsDownload extends AbstractHlsDownload { @Override public void stop() { running = false; - alive = false; + try { + synchronized (downloadFinished) { + downloadFinished.wait(); + } + } catch (InterruptedException e) { + LOG.error("Couldn't wait for download to finish", e); + } } private static class SegmentDownload implements Callable { diff --git a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index e298d48a..958fae17 100644 --- a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -66,6 +66,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { private BlockingQueue downloadQueue = new LinkedBlockingQueue<>(50); private ExecutorService downloadThreadPool = new ThreadPoolExecutor(5, 5, 2, TimeUnit.MINUTES, downloadQueue); private FileChannel fileChannel = null; + private Object downloadFinished = new Object(); public MergedHlsDownload(HttpClient client) { super(client); @@ -105,13 +106,20 @@ public class MergedHlsDownload extends AbstractHlsDownload { } catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e) { throw new IOException("Couldn't add HMAC to playlist url", e); } finally { - alive = false; try { streamer.stop(); } catch(Exception e) { LOG.error("Couldn't stop streamer", e); } downloadThreadPool.shutdown(); + try { + LOG.debug("Waiting for last segments for {}", model); + downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) {} + alive = false; + synchronized (downloadFinished) { + downloadFinished.notifyAll(); + } LOG.debug("Download terminated for {}", segmentPlaylistUri); } } @@ -155,7 +163,6 @@ public class MergedHlsDownload extends AbstractHlsDownload { } catch(Exception e) { throw new IOException("Couldn't download segment", e); } finally { - alive = false; if(streamer != null) { try { streamer.stop(); @@ -163,6 +170,15 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.error("Couldn't stop streamer", e); } } + downloadThreadPool.shutdown(); + try { + LOG.debug("Waiting for last segments for {}", model); + downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) {} + alive = false; + synchronized (downloadFinished) { + downloadFinished.notifyAll(); + } LOG.debug("Download for {} terminated", model); } } @@ -353,10 +369,16 @@ public class MergedHlsDownload extends AbstractHlsDownload { @Override public void stop() { running = false; - alive = false; if(streamer != null) { streamer.stop(); } + try { + synchronized (downloadFinished) { + downloadFinished.wait(); + } + } catch (InterruptedException e) { + LOG.error("Couldn't wait for download to finish", e); + } LOG.debug("Download stopped"); }