From 7e03b488957f911a42114bc4a9f284c1ccca64b8 Mon Sep 17 00:00:00 2001 From: 0xb00bface <0xboobface@gmail.com> Date: Sat, 26 Dec 2020 13:07:14 +0100 Subject: [PATCH] Fixed error handling for new multi-threading --- .../src/main/java/ctbrec/io/HttpClient.java | 7 + common/src/main/java/ctbrec/io/IoUtils.java | 13 +- .../ctbrec/recorder/NextGenLocalRecorder.java | 148 ++++++++------- .../ctbrec/recorder/download/Download.java | 1 + .../recorder/download/dash/DashDownload.java | 6 + .../download/hls/AbstractHlsDownload2.java | 175 ++++++++++-------- .../recorder/download/hls/FFmpegDownload.java | 6 + .../recorder/download/hls/HlsDownload.java | 22 +-- .../download/hls/MergedFfmpegHlsDownload.java | 3 +- 9 files changed, 217 insertions(+), 164 deletions(-) diff --git a/common/src/main/java/ctbrec/io/HttpClient.java b/common/src/main/java/ctbrec/io/HttpClient.java index e8191a4e..583f366b 100644 --- a/common/src/main/java/ctbrec/io/HttpClient.java +++ b/common/src/main/java/ctbrec/io/HttpClient.java @@ -120,6 +120,13 @@ public abstract class HttpClient { return resp; } + public Response execute(Request request, int timeoutInMillis) throws IOException { + return client.newBuilder() // + .connectTimeout(timeoutInMillis, TimeUnit.MILLISECONDS) // + .readTimeout(timeoutInMillis, TimeUnit.MILLISECONDS).build() // + .newCall(request).execute(); + } + public abstract boolean login() throws IOException; public void reconfigure() { diff --git a/common/src/main/java/ctbrec/io/IoUtils.java b/common/src/main/java/ctbrec/io/IoUtils.java index a76d6879..162879c2 100644 --- a/common/src/main/java/ctbrec/io/IoUtils.java +++ b/common/src/main/java/ctbrec/io/IoUtils.java @@ -2,6 +2,7 @@ package ctbrec.io; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.FileVisitOption; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -59,7 +60,7 @@ public class IoUtils { public static long getDirectorySize(File dir) { final long[] size = { 0 }; - int maxDepth = 1; // Don't expect subdirs, so don't even try + int maxDepth = 7; try { Files.walkFileTree(dir.toPath(), EnumSet.noneOf(FileVisitOption.class), maxDepth, new SimpleFileVisitor() { @Override @@ -79,4 +80,14 @@ public class IoUtils { } return size[0]; } + + public static void close(OutputStream outputStream, String errorMsg) { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + LOG.error(errorMsg, e); + } + } + } } diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index 0ef7e692..5e4b92e6 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -68,9 +68,10 @@ public class NextGenLocalRecorder implements Recorder { private RecordingPreconditions preconditions; // thread pools for downloads and post-processing - private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(30, createThreadFactory("Download")); + private ScheduledExecutorService downloadPool = Executors.newScheduledThreadPool(10, createThreadFactory("Download")); private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker")); private BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); + private Map, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); private BlockingQueue ppQueue = new LinkedBlockingQueue<>(); private ThreadPoolExecutor ppPool; @@ -120,59 +121,70 @@ public class NextGenLocalRecorder implements Recorder { } private void startCompletionHandler() { - for (int i = 0; i < 1; i++) { - downloadCompletionPool.submit(() -> { - while (!Thread.currentThread().isInterrupted()) { - try { - ScheduledFuture future = downloadFutureQueue.take(); - if (!future.isDone()) { - downloadFutureQueue.put(future); - } else { - Recording rec = future.get(); - Download d = rec.getDownload(); - if (d.isRunning()) { - long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis()); - // LOG.debug("Download still running. Scheduling to run in {} ms", delay); - downloadFutureQueue.add(downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS)); - } else { - try { - boolean deleted = deleteIfEmpty(rec); - setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING); - if (!deleted) { - // only save the status, if the recording has not been deleted, otherwise we recreate the metadata file - recordingManager.saveRecording(rec); - } - } catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) { - LOG.error("Couldn't execute post-processing step \"delete if empty\"", e); - } + downloadCompletionPool.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { + try { + ScheduledFuture future = downloadFutureQueue.take(); + rescheduleRecordingTask(future); + Thread.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Error while getting recording result from download queue", e); + } + } + }); + } - recorderLock.lock(); - try { - recordingProcesses.remove(rec.getModel()); - } finally { - recorderLock.unlock(); - } - if (rec.getStatus() == State.WAITING) { - LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); - submitPostProcessingJob(rec); + private void rescheduleRecordingTask(ScheduledFuture future) throws InterruptedException{ + try { + if (!future.isDone()) { + downloadFutureQueue.put(future); + } else { + downloadFutureRecordingMap.remove(future); + Recording rec = future.get(); + Download d = rec.getDownload(); + if (d.isRunning()) { + long delay = Math.max(0, Duration.between(Instant.now(), d.getRescheduleTime()).toMillis()); + ScheduledFuture rescheduledFuture = downloadPool.schedule(rec, delay, TimeUnit.MILLISECONDS); + downloadFutureQueue.add(rescheduledFuture); + } else { + d.finalizeDownload(); + deleteIfEmpty(rec); + removeRecordingProcess(rec); + if (rec.getStatus() == State.WAITING) { + LOG.info("Download finished for {} -> Starting post-processing", rec.getModel().getName()); + submitPostProcessingJob(rec); - // check, if we have to restart the recording - Model model = rec.getModel(); - tryRestartRecording(model); - } else { - setRecordingStatus(rec, State.FAILED); - } - } - } - Thread.sleep(1); - } catch (ExecutionException | IllegalStateException e) { - LOG.error("Error while completing recording", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Error while completing recording", e); + // check, if we have to restart the recording + Model model = rec.getModel(); + tryRestartRecording(model); + } else { + setRecordingStatus(rec, State.FAILED); } } - }); + } + } catch (ExecutionException | IllegalStateException e) { + fail(future, e); + } + } + + private void fail(ScheduledFuture future, Exception e) { + if (downloadFutureRecordingMap.containsKey(future)) { + Recording rec = downloadFutureRecordingMap.remove(future); + removeRecordingProcess(rec); + rec.getDownload().finalizeDownload(); + LOG.error("Error while recording stream for model {}", rec.getModel(), e); + } else { + LOG.error("Error while recording stream", e); + } + } + + private void removeRecordingProcess(Recording rec) { + recorderLock.lock(); + try { + recordingProcesses.remove(rec.getModel()); + } finally { + recorderLock.unlock(); } } @@ -245,7 +257,9 @@ public class NextGenLocalRecorder implements Recorder { setRecordingStatus(rec, State.RECORDING); rec.getModel().setLastRecorded(rec.getStartDate()); recordingManager.saveRecording(rec); - downloadFutureQueue.add(downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS)); + ScheduledFuture future = downloadPool.schedule(rec, 0, TimeUnit.MILLISECONDS); + downloadFutureQueue.add(future); + downloadFutureRecordingMap.put(future, rec); } catch (RecordUntilExpiredException e) { LOG.info("Precondition not met to record {}: {}", model, e.getLocalizedMessage()); executeRecordUntilSubsequentAction(model); @@ -297,16 +311,25 @@ public class NextGenLocalRecorder implements Recorder { return rec; } - private boolean deleteIfEmpty(Recording rec) throws IOException, InvalidKeyException, NoSuchAlgorithmException { - rec.refresh(); - long sizeInByte = rec.getSizeInByte(); - if (sizeInByte <= 0) { - LOG.info("Deleting empty recording {}", rec); - delete(rec); - return true; - } else { - return false; + private boolean deleteIfEmpty(Recording rec) { + boolean deleted = false; + try { + rec.refresh(); + long sizeInByte = rec.getSizeInByte(); + if (sizeInByte <= 0) { + LOG.info("Deleting empty recording {}", rec); + delete(rec); + deleted = true; + } + setRecordingStatus(rec, deleted ? State.DELETED : State.WAITING); + if (!deleted) { + // only save the status, if the recording has not been deleted, otherwise we recreate the metadata file + recordingManager.saveRecording(rec); + } + } catch (IOException e) { + LOG.error("Couldn't execute post-processing step \"delete if empty\"", e); } + return deleted; } @Override @@ -397,18 +420,17 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public List getRecordings() throws IOException { return recordingManager.getAll(); } @Override - public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException { + public void delete(Recording recording) throws IOException { recordingManager.delete(recording); } @Override public void shutdown(boolean immediately) { - // TODO add a config flag for waitign or stopping immediately LOG.info("Shutting down"); recording = false; diff --git a/common/src/main/java/ctbrec/recorder/download/Download.java b/common/src/main/java/ctbrec/recorder/download/Download.java index 23fca731..391fb230 100644 --- a/common/src/main/java/ctbrec/recorder/download/Download.java +++ b/common/src/main/java/ctbrec/recorder/download/Download.java @@ -14,6 +14,7 @@ import ctbrec.Recording; public interface Download extends Serializable, Callable { void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException; void stop(); + void finalizeDownload(); boolean isRunning(); Model getModel(); Instant getStartTime(); diff --git a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java index 29f9f229..e71708b3 100644 --- a/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/dash/DashDownload.java @@ -429,4 +429,10 @@ public class DashDownload extends AbstractDownload { return running; } + @Override + public void finalizeDownload() { + // TODO Auto-generated method stub + + } + } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java index d75a8389..f567e915 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload2.java @@ -10,10 +10,12 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.SocketTimeoutException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.text.DecimalFormat; import java.text.NumberFormat; +import java.time.Duration; import java.time.Instant; import java.util.Collections; import java.util.HashMap; @@ -80,7 +82,8 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { protected transient String segmentPlaylistUrl; private transient Instant previousPlaylistRequest = Instant.EPOCH; - private transient Instant lastPlaylistRequest= Instant.EPOCH; + private transient Instant afterLastPlaylistRequest= Instant.EPOCH; + private transient Instant beforeLastPlaylistRequest= Instant.EPOCH; protected Model model = new UnknownModel(); @@ -88,11 +91,9 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { this.client = client; } - protected abstract void execute(SegmentDownload segmentDownload); protected abstract OutputStream getSegmentOutputStream(String prefix, String fileName) throws IOException; protected void segmentDownloadFinished(SegmentDownload segmentDownload) {} protected abstract void internalStop(); - protected void finalizeDownload() {} @Override public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException { @@ -110,9 +111,10 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { segmentPlaylistUrl = getSegmentPlaylistUrl(model); } + previousPlaylistRequest = beforeLastPlaylistRequest; + beforeLastPlaylistRequest = Instant.now(); SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); - previousPlaylistRequest = lastPlaylistRequest; - lastPlaylistRequest = Instant.now(); + afterLastPlaylistRequest = Instant.now(); emptyPlaylistCheck(segmentPlaylist); handleMissedSegments(segmentPlaylist, nextSegmentNumber); enqueueNewSegments(segmentPlaylist, nextSegmentNumber); @@ -126,23 +128,53 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); } } catch (ParseException e) { + LOG.error("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); running = false; - throw new IOException("Couldn't parse HLS playlist for model " + model + "\n" + e.getInput(), e); } catch (PlaylistException e) { + LOG.error("Couldn't parse HLS playlist for model " + model, e); running = false; - throw new IOException("Couldn't parse HLS playlist for model " + model, e); } catch (EOFException e) { // end of playlist reached LOG.debug("Reached end of playlist for model {}", model); + running = false; } catch (HttpException e) { handleHttpException(e); } catch (Exception e) { + LOG.error("Couldn't download segment", e); running = false; - throw new IOException("Couldn't download segment", e); } return this; } + protected void execute(SegmentDownload segmentDownload) { + segmentDownload.call(); + } + + private void handleHttpException(HttpException e) throws IOException { + if (e.getResponseCode() == 404) { + checkIfModelIsStillOnline("Playlist not found (404). Model {} probably went offline. Model state: {}"); + } else if (e.getResponseCode() == 403) { + checkIfModelIsStillOnline("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}"); + } else { + LOG.info("Playlist couldn't not be downloaded for model {}. Stopping recording", model, e); + running = false; + } + } + + protected void checkIfModelIsStillOnline(String errorMsg) throws IOException { + ctbrec.Model.State modelState; + try { + modelState = model.getOnlineState(false); + if (modelState != State.ONLINE) { + running = false; + } + } catch (ExecutionException e1) { + modelState = ctbrec.Model.State.UNKNOWN; + } + LOG.info(errorMsg, model, modelState); + waitSomeTime(TEN_SECONDS); + } + protected String getSegmentPlaylistUrl(Model model) throws IOException, ExecutionException, ParseException, PlaylistException, JAXBException { LOG.debug("{} stream idx: {}", model.getName(), model.getStreamUrlIndex()); List streamSources = model.getStreamSources(); @@ -182,49 +214,63 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentPlaylistHeaders).orElse(new HashMap<>()), model); Request request = builder.build(); - try (Response response = client.execute(request)) { - if (response.isSuccessful()) { - String body = response.body().string(); - if (!body.contains("#EXTINF")) { - // no segments, empty playlist - return new SegmentPlaylist(segmentsURL); - } - - byte[] bytes = body.getBytes(StandardCharsets.UTF_8); - BandwidthMeter.add(bytes.length); - InputStream inputStream = new ByteArrayInputStream(bytes); - PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); - Playlist playlist = parser.parse(); - if (playlist.hasMediaPlaylist()) { - MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); - SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); - lsp.seq = mediaPlaylist.getMediaSequenceNumber(); - lsp.targetDuration = mediaPlaylist.getTargetDuration(); - - List tracks = mediaPlaylist.getTracks(); - for (TrackData trackData : tracks) { - String uri = trackData.getUri(); - if (!uri.startsWith("http")) { - URL context = new URL(segmentsURL); - uri = new URL(context, uri).toExternalForm(); - } - lsp.totalDuration += trackData.getTrackInfo().duration; - lsp.segments.add(uri); - if (trackData.hasEncryptionData()) { - lsp.encrypted = true; - EncryptionData data = trackData.getEncryptionData(); - lsp.encryptionKeyUrl = data.getUri(); - lsp.encryptionMethod = data.getMethod().getValue(); - } + for (int i = 1; i <= 3; i++) { + Instant before = Instant.now(); + try (Response response = client.execute(request, 1000)) { + if (response.isSuccessful()) { + String body = response.body().string(); + if (!body.contains("#EXTINF")) { + // no segments, empty playlist + return new SegmentPlaylist(segmentsURL); } - lsp.avgSegDuration = lsp.totalDuration / tracks.size(); - return lsp; + + byte[] bytes = body.getBytes(StandardCharsets.UTF_8); + BandwidthMeter.add(bytes.length); + InputStream inputStream = new ByteArrayInputStream(bytes); + PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8, ParsingMode.LENIENT); + Playlist playlist = parser.parse(); + if (playlist.hasMediaPlaylist()) { + MediaPlaylist mediaPlaylist = playlist.getMediaPlaylist(); + SegmentPlaylist lsp = new SegmentPlaylist(segmentsURL); + lsp.seq = mediaPlaylist.getMediaSequenceNumber(); + lsp.targetDuration = mediaPlaylist.getTargetDuration(); + + List tracks = mediaPlaylist.getTracks(); + for (TrackData trackData : tracks) { + String uri = trackData.getUri(); + if (!uri.startsWith("http")) { + URL context = new URL(segmentsURL); + uri = new URL(context, uri).toExternalForm(); + } + lsp.totalDuration += trackData.getTrackInfo().duration; + lsp.segments.add(uri); + if (trackData.hasEncryptionData()) { + lsp.encrypted = true; + EncryptionData data = trackData.getEncryptionData(); + lsp.encryptionKeyUrl = data.getUri(); + lsp.encryptionMethod = data.getMethod().getValue(); + } + } + lsp.avgSegDuration = lsp.totalDuration / tracks.size(); + Duration d = Duration.between(before, Instant.now()); + if(i > 1) { + LOG.trace("Playlist request took {} ms", d.toMillis()); + } + return lsp; + } + throw new InvalidPlaylistException("Playlist has no media playlist"); + } else { + throw new HttpException(response.code(), response.message()); + } + } catch (SocketTimeoutException e) { + if (i == 3) { + throw e; + } else { + LOG.trace("Playlist timeout {} for model {}", i, model); } - throw new InvalidPlaylistException("Playlist has no media playlist"); - } else { - throw new HttpException(response.code(), response.message()); } } + throw new InvalidPlaylistException("Playlist could not be downloaded in time"); } protected void emptyPlaylistCheck(SegmentPlaylist playlist) { @@ -247,8 +293,8 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { waitFactor *= 2; - LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}. Last 2 playlist requests at [{}] [{}] schedule was {}", nextSegmentNumber, playlist.seq, model, - waitFactor, previousPlaylistRequest, lastPlaylistRequest, rescheduleTime); + LOG.warn("Missed segments: {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor); + LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest)); short missedSegments = (short) (playlist.seq - nextSegmentNumber); MissedSegmentsStatistics.increase(model, missedSegments); } @@ -295,6 +341,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { OutputStream targetStream = getSegmentOutputStream(prefix, tmp.getName()); SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segmentUrl, client, targetStream); execute(segmentDownload); + segmentDownloadFinished(segmentDownload); } } } @@ -310,7 +357,7 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { waitForMillis = (long) (playlist.avgSegDuration * 1000); LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis); } - rescheduleTime = Instant.now().plusMillis(waitForMillis); + rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis); } /** @@ -329,34 +376,6 @@ public abstract class AbstractHlsDownload2 extends AbstractDownload { } } - private void handleHttpException(HttpException e) throws IOException { - if (e.getResponseCode() == 404) { - ctbrec.Model.State modelState; - try { - modelState = model.getOnlineState(false); - } catch (ExecutionException e1) { - modelState = ctbrec.Model.State.UNKNOWN; - } - LOG.info("Playlist not found (404). Model {} probably went offline. Model state: {}", model, modelState); - waitSomeTime(TEN_SECONDS); - } else if (e.getResponseCode() == 403) { - ctbrec.Model.State modelState; - try { - modelState = model.getOnlineState(false); - if (modelState != State.ONLINE) { - running = false; - } - } catch (ExecutionException e1) { - modelState = ctbrec.Model.State.UNKNOWN; - } - LOG.info("Playlist access forbidden (403). Model {} probably went private or offline. Model state: {}", model, modelState); - waitSomeTime(TEN_SECONDS); - } else { - running = false; - throw e; - } - } - protected static void addHeaders(Builder builder, Map headers, Model model) { headers.putIfAbsent(ACCEPT, "*/*"); headers.putIfAbsent(ACCEPT_LANGUAGE, Locale.ENGLISH.getLanguage()); diff --git a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java index 6703e587..750bb775 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/FFmpegDownload.java @@ -169,4 +169,10 @@ public class FFmpegDownload extends AbstractHlsDownload2 { return null; } + @Override + public void finalizeDownload() { + // TODO Auto-generated method stub + + } + } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java index 726723c8..7f296686 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/HlsDownload.java @@ -12,7 +12,6 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; @@ -57,7 +56,7 @@ public class HlsDownload extends AbstractHlsDownload2 { } @Override - protected void finalizeDownload() { + public void finalizeDownload() { LOG.debug("Download for {} terminated", model); } @@ -89,19 +88,6 @@ public class HlsDownload extends AbstractHlsDownload2 { } - @Override - protected void execute(SegmentDownload segmentDownload) { - CompletableFuture.supplyAsync(segmentDownload::call, downloadExecutor).whenComplete((result, exception) -> { - if (result != null) { - try { - result.getOutputStream().close(); - } catch (IOException e) { - LOG.error("Couldn't close segment file", e); - } - } - }); - } - @Override public void stop() { LOG.debug("Recording stopped"); @@ -144,10 +130,6 @@ public class HlsDownload extends AbstractHlsDownload2 { @Override protected void segmentDownloadFinished(SegmentDownload segmentDownload) { - try { - segmentDownload.getOutputStream().close(); - } catch (IOException e) { - LOG.warn("Couldn't close segment file"); - } + IoUtils.close(segmentDownload.getOutputStream(), "Couldn't close segment file"); } } diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index aa1065b8..367ab8fd 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -92,13 +92,12 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload2 { @Override - protected void finalizeDownload() { + public void finalizeDownload() { try { ffmpegThread.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - super.finalizeDownload(); } private void startFfmpegProcess(File target) {