From 011e8c2f29e01738252fde7e31c3b67198252e9e Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Sat, 23 Nov 2019 17:13:27 +0100 Subject: [PATCH] Improve thread interrupt and lock handling --- .../ctbrec/ui/controls/StreamPreview.java | 2 + .../main/java/ctbrec/ui/controls/Toast.java | 1 + .../ctbrec/recorder/NextGenLocalRecorder.java | 92 +++++----- .../java/ctbrec/recorder/OnlineMonitor.java | 106 ++++++----- .../java/ctbrec/recorder/RemoteRecorder.java | 103 +++++------ .../download/AbstractHlsDownload.java | 13 +- .../ctbrec/recorder/download/HlsDownload.java | 44 +++-- .../recorder/download/MergedHlsDownload.java | 173 ++++++++---------- .../main/java/org/taktik/mpegts/Streamer.java | 148 +++++++-------- 9 files changed, 331 insertions(+), 351 deletions(-) diff --git a/client/src/main/java/ctbrec/ui/controls/StreamPreview.java b/client/src/main/java/ctbrec/ui/controls/StreamPreview.java index 122c4f58..74cbe074 100644 --- a/client/src/main/java/ctbrec/ui/controls/StreamPreview.java +++ b/client/src/main/java/ctbrec/ui/controls/StreamPreview.java @@ -123,6 +123,7 @@ public class StreamPreview extends StackPane { } showTestImage(); } catch (InterruptedException | InterruptedIOException e) { + Thread.currentThread().interrupt(); // future has been canceled, that's fine } catch (ExecutionException e) { if(e.getCause() instanceof InterruptedException || e.getCause() instanceof InterruptedIOException) { @@ -180,6 +181,7 @@ public class StreamPreview extends StackPane { private void checkInterrupt() throws InterruptedException { if(Thread.interrupted()) { + Thread.currentThread().interrupt(); throw new InterruptedException(); } } diff --git a/client/src/main/java/ctbrec/ui/controls/Toast.java b/client/src/main/java/ctbrec/ui/controls/Toast.java index 53b26a75..e622013d 100644 --- a/client/src/main/java/ctbrec/ui/controls/Toast.java +++ b/client/src/main/java/ctbrec/ui/controls/Toast.java @@ -40,6 +40,7 @@ public final class Toast { try { Thread.sleep(toastDelay); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } Timeline fadeOutTimeline = new Timeline(); KeyFrame fadeOutKey1 = new KeyFrame(Duration.millis(fadeOutDelay), new KeyValue(toastStage.getScene().getRoot().opacityProperty(), 0)); diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index f74dd9ea..0bbc2dc3 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -53,7 +53,7 @@ import ctbrec.sites.Site; public class NextGenLocalRecorder implements Recorder { - private static final transient Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class); + private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class); private static final boolean IGNORE_CACHE = true; private List models = Collections.synchronizedList(new ArrayList<>()); private Config config; @@ -78,7 +78,7 @@ public class NextGenLocalRecorder implements Recorder { public NextGenLocalRecorder(Config config, List sites) throws IOException { this.config = config; recordingManager = new RecordingManager(config, sites); - config.getSettings().models.stream().forEach((m) -> { + config.getSettings().models.stream().forEach(m -> { if (m.getSite() != null) { if (m.getSite().isEnabled()) { models.add(m); @@ -92,9 +92,6 @@ public class NextGenLocalRecorder implements Recorder { recording = true; registerEventBusListener(); - // if(Config.isServerMode()) { - // processUnfinishedRecordings(); - // } LOG.debug("Recorder initialized"); LOG.info("Models to record: {}", models); @@ -104,24 +101,27 @@ public class NextGenLocalRecorder implements Recorder { while (!Thread.interrupted()) { try { Future result = completionService.take(); - Recording recording = result.get(); + Recording rec = result.get(); recordingsLock.lock(); try { - recordingProcesses.remove(recording.getModel()); + recordingProcesses.remove(rec.getModel()); } finally { recordingsLock.unlock(); } - if (recording.getStatus() == State.WAITING) { - LOG.debug("Download finished for {} -> Starting post-processing", recording.getModel().getName()); - submitPostProcessingJob(recording); + if (rec.getStatus() == State.WAITING) { + LOG.debug("Download finished for {} -> Starting post-processing", rec.getModel().getName()); + submitPostProcessingJob(rec); // check, if we have to restart the recording - Model model = recording.getModel(); + Model model = rec.getModel(); tryRestartRecording(model); } else { - setRecordingStatus(recording, State.FAILED); + setRecordingStatus(rec, State.FAILED); } - } catch (InterruptedException | ExecutionException | IllegalStateException e) { + } catch (ExecutionException | IllegalStateException e) { + LOG.error("Error while completing recording", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Error while completing recording", e); } } @@ -171,7 +171,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { if (!models.contains(model)) { LOG.info("Model {} added", model); modelLock.lock(); @@ -190,7 +190,10 @@ public class NextGenLocalRecorder implements Recorder { if (model.isOnline()) { startRecordingProcess(model); } - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException e) { + // noop + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } } @@ -267,7 +270,7 @@ public class NextGenLocalRecorder implements Recorder { } } - private boolean deleteIfEmpty(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + private boolean deleteIfEmpty(Recording rec) throws IOException, InvalidKeyException, NoSuchAlgorithmException { rec.refresh(); long sizeInByte = rec.getSizeInByte(); if (sizeInByte == 0) { @@ -279,7 +282,7 @@ public class NextGenLocalRecorder implements Recorder { } } - private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException { Duration minimumLengthInSeconds = Duration.ofSeconds(Config.getInstance().getSettings().minimumLengthInSeconds); if (minimumLengthInSeconds.getSeconds() <= 0) { return false; @@ -296,7 +299,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { modelLock.lock(); try { if (models.contains(model)) { @@ -314,8 +317,8 @@ public class NextGenLocalRecorder implements Recorder { recordingsLock.lock(); try { if (recordingProcesses.containsKey(model)) { - Recording recording = recordingProcesses.get(model); - recording.getDownload().stop(); + Recording rec = recordingProcesses.get(model); + rec.getDownload().stop(); } } finally { recordingsLock.unlock(); @@ -323,7 +326,7 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { if (models.contains(model)) { int index = models.indexOf(model); models.get(index).setStreamUrlIndex(model.getStreamUrlIndex()); @@ -331,8 +334,8 @@ public class NextGenLocalRecorder implements Recorder { LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName()); recordingsLock.lock(); try { - Recording recording = recordingProcesses.get(model); - if (recording != null) { + Recording rec = recordingProcesses.get(model); + if (rec != null) { stopRecordingProcess(model); } } finally { @@ -341,7 +344,6 @@ public class NextGenLocalRecorder implements Recorder { tryRestartRecording(model); } else { LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName()); - return; } } @@ -349,9 +351,9 @@ public class NextGenLocalRecorder implements Recorder { recordingsLock.lock(); try { LOG.debug("Stopping recording for {}", model); - Recording recording = recordingProcesses.get(model); + Recording rec = recordingProcesses.get(model); LOG.debug("Stopping download for {}", model); - recording.getDownload().stop(); + rec.getDownload().stop(); } finally { recordingsLock.unlock(); } @@ -360,8 +362,8 @@ public class NextGenLocalRecorder implements Recorder { private void stopRecordingProcesses() { recordingsLock.lock(); try { - for (Recording recording : recordingProcesses.values()) { - recording.getDownload().stop(); + for (Recording rec : recordingProcesses.values()) { + rec.getDownload().stop(); } } finally { recordingsLock.unlock(); @@ -389,12 +391,12 @@ public class NextGenLocalRecorder implements Recorder { } @Override - public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException { return recordingManager.getAll(); } @Override - public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException { recordingManager.delete(recording); } @@ -423,6 +425,7 @@ public class NextGenLocalRecorder implements Recorder { try { Thread.sleep(1000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Error while waiting for downloads to finish", e); } } @@ -438,6 +441,7 @@ public class NextGenLocalRecorder implements Recorder { LOG.info("Waiting for post-processing to finish"); ppPool.awaitTermination(10, TimeUnit.MINUTES); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Error while waiting for pools to finish", e); } } @@ -463,8 +467,8 @@ public class NextGenLocalRecorder implements Recorder { recordingsLock.lock(); try { - Recording recording = recordingProcesses.get(model); - Optional.ofNullable(recording).map(Recording::getDownload).ifPresent(Download::stop); + Recording rec = recordingProcesses.get(model); + Optional.ofNullable(rec).map(Recording::getDownload).ifPresent(Download::stop); } finally { recordingsLock.unlock(); } @@ -485,9 +489,11 @@ public class NextGenLocalRecorder implements Recorder { config.save(); } else { LOG.warn("Couldn't resume model {}. Not found in list", model.getName()); - return; } - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException e) { + LOG.error("Couldn't check, if model {} is online", model.getName()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Couldn't check, if model {} is online", model.getName()); } finally { modelLock.unlock(); @@ -515,7 +521,10 @@ public class NextGenLocalRecorder implements Recorder { return getModels().stream().filter(m -> { try { return m.isOnline(); - } catch (IOException | ExecutionException | InterruptedException e) { + } catch (IOException | ExecutionException e) { + return false; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return false; } }).collect(Collectors.toList()); @@ -600,14 +609,11 @@ public class NextGenLocalRecorder implements Recorder { } private ThreadFactory createThreadFactory(String name) { - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); - t.setDaemon(true); - return t; - } + return r -> { + Thread t = new Thread(r); + t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8)); + t.setDaemon(true); + return t; }; } diff --git a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java index 796cd6d9..c5d32933 100644 --- a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java +++ b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java @@ -23,7 +23,7 @@ import ctbrec.event.ModelStateChangedEvent; import ctbrec.io.HttpException; public class OnlineMonitor extends Thread { - private static final transient Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class); + private static final Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class); private static final boolean IGNORE_CACHE = true; private volatile boolean running = false; @@ -47,56 +47,74 @@ public class OnlineMonitor extends Thread { List models = recorder.getModels(); // remove models, which are not recorded anymore - for (Iterator iterator = states.keySet().iterator(); iterator.hasNext();) { - Model model = iterator.next(); - if(!models.contains(model)) { - iterator.remove(); - } - } + removeDeletedModels(models); // update the currently recorded models - for (Model model : models) { - try { - if(model.isOnline(IGNORE_CACHE)) { - EventBusHolder.BUS.post(new ModelIsOnlineEvent(model)); - } - Model.State state = model.getOnlineState(false); - Model.State oldState = states.getOrDefault(model, UNKNOWN); - states.put(model, state); - if(state != oldState) { - EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state)); - } - } catch (HttpException e) { - LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", - model.getName(), e.getResponseCode(), e.getResponseMessage()); - } catch (SocketTimeoutException e) { - LOG.error("Couldn't check if model {} is online. Request timed out", model.getName()); - } catch (InterruptedException | InterruptedIOException e) { - if(running) { - LOG.error("Couldn't check if model {} is online", model.getName(), e); - } - } catch (Exception e) { - LOG.error("Couldn't check if model {} is online", model.getName(), e); - } - } + updateModels(models); + Instant end = Instant.now(); Duration timeCheckTook = Duration.between(begin, end); - LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds()); + suspendUntilNextIteration(models, timeCheckTook); + } + LOG.debug("{} terminated", getName()); + } - long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs; - if(timeCheckTook.getSeconds() < sleepTime) { - try { - if (running) { - long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds()); - LOG.trace("Sleeping {}ms", millis); - Thread.sleep(millis); - } - } catch (InterruptedException e) { - LOG.trace("Sleep interrupted"); - } + private void removeDeletedModels(List models) { + for (Iterator iterator = states.keySet().iterator(); iterator.hasNext();) { + Model model = iterator.next(); + if(!models.contains(model)) { + iterator.remove(); + } + } + } + + private void updateModels(List models) { + for (Model model : models) { + updateModel(model); + } + } + + private void updateModel(Model model) { + try { + if(model.isOnline(IGNORE_CACHE)) { + EventBusHolder.BUS.post(new ModelIsOnlineEvent(model)); + } + Model.State state = model.getOnlineState(false); + Model.State oldState = states.getOrDefault(model, UNKNOWN); + states.put(model, state); + if(state != oldState) { + EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state)); + } + } catch (HttpException e) { + LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", + model.getName(), e.getResponseCode(), e.getResponseMessage()); + } catch (SocketTimeoutException e) { + LOG.error("Couldn't check if model {} is online. Request timed out", model.getName()); + } catch (InterruptedException | InterruptedIOException e) { + Thread.currentThread().interrupt(); + if(running) { + LOG.error("Couldn't check if model {} is online", model.getName(), e); + } + } catch (Exception e) { + LOG.error("Couldn't check if model {} is online", model.getName(), e); + } + } + + private void suspendUntilNextIteration(List models, Duration timeCheckTook) { + LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds()); + long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs; + if(timeCheckTook.getSeconds() < sleepTime) { + try { + if (running) { + long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds()); + LOG.trace("Sleeping {}ms", millis); + Thread.sleep(millis); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.trace("Sleep interrupted"); } } - LOG.debug(getName() + " terminated"); } public void shutdown() { diff --git a/common/src/main/java/ctbrec/recorder/RemoteRecorder.java b/common/src/main/java/ctbrec/recorder/RemoteRecorder.java index c9175013..49bdcdab 100644 --- a/common/src/main/java/ctbrec/recorder/RemoteRecorder.java +++ b/common/src/main/java/ctbrec/recorder/RemoteRecorder.java @@ -37,13 +37,12 @@ import okhttp3.Response; public class RemoteRecorder implements Recorder { - private static final transient Logger LOG = LoggerFactory.getLogger(RemoteRecorder.class); + private static final String SUCCESS = "success"; + + private static final Logger LOG = LoggerFactory.getLogger(RemoteRecorder.class); public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8"); - private Moshi moshi = new Moshi.Builder() - .add(Instant.class, new InstantJsonAdapter()) - .add(Model.class, new ModelJsonAdapter()) - .build(); + private Moshi moshi = new Moshi.Builder().add(Instant.class, new InstantJsonAdapter()).add(Model.class, new ModelJsonAdapter()).build(); private JsonAdapter modelListResponseAdapter = moshi.adapter(ModelListResponse.class); private JsonAdapter recordingListResponseAdapter = moshi.adapter(RecordingListResponse.class); private JsonAdapter modelRequestAdapter = moshi.adapter(ModelRequest.class); @@ -62,7 +61,6 @@ public class RemoteRecorder implements Recorder { private Instant lastSync = Instant.EPOCH; private SyncThread syncThread; - public RemoteRecorder(Config config, HttpClient client, List sites) { this.config = config; this.client = client; @@ -77,29 +75,27 @@ public class RemoteRecorder implements Recorder { } @Override - public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { sendRequest("start", model); } @Override - public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { sendRequest("stop", model); } - private void sendRequest(String action, Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + private void sendRequest(String action, Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { String payload = modelRequestAdapter.toJson(new ModelRequest(action, model)); LOG.debug("Sending request to recording server: {}", payload); RequestBody body = RequestBody.create(JSON, payload); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(payload, builder); Request request = builder.build(); try (Response response = client.execute(request)) { String json = response.body().string(); if (response.isSuccessful()) { ModelListResponse resp = modelListResponseAdapter.fromJson(json); - if (!resp.status.equals("success")) { + if (!resp.status.equals(SUCCESS)) { throw new IOException("Server returned error " + resp.status + " " + resp.msg); } @@ -115,8 +111,8 @@ public class RemoteRecorder implements Recorder { } } - private void addHmacIfNeeded(String msg, Builder builder) throws InvalidKeyException, NoSuchAlgorithmException, IllegalStateException, UnsupportedEncodingException { - if(Config.getInstance().getSettings().requireAuthentication) { + private void addHmacIfNeeded(String msg, Builder builder) throws InvalidKeyException, NoSuchAlgorithmException, UnsupportedEncodingException { + if (Config.getInstance().getSettings().requireAuthentication) { byte[] key = Config.getInstance().getSettings().key; String hmac = Hmac.calculate(msg, key); builder.addHeader("CTBREC-HMAC", hmac); @@ -131,7 +127,7 @@ public class RemoteRecorder implements Recorder { @Override public boolean isSuspended(Model model) { int index = models.indexOf(model); - if(index >= 0) { + if (index >= 0) { Model m = models.get(index); return m.isSuspended(); } else { @@ -141,10 +137,10 @@ public class RemoteRecorder implements Recorder { @Override public List getModels() { - if(!lastSync.equals(Instant.EPOCH) && lastSync.isBefore(Instant.now().minusSeconds(60))) { + if (!lastSync.equals(Instant.EPOCH) && lastSync.isBefore(Instant.now().minusSeconds(60))) { throw new RuntimeException("Last sync was over a minute ago"); } - return new ArrayList(models); + return new ArrayList<>(models); } @Override @@ -163,7 +159,7 @@ public class RemoteRecorder implements Recorder { @Override public void run() { running = true; - while(running) { + while (running) { syncModels(); syncOnlineModels(); syncSpace(); @@ -176,14 +172,12 @@ public class RemoteRecorder implements Recorder { try { String msg = "{\"action\": \"space\"}"; RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(msg, builder); Request request = builder.build(); - try(Response response = client.execute(request)) { + try (Response response = client.execute(request)) { String json = response.body().string(); - if(response.isSuccessful()) { + if (response.isSuccessful()) { JSONObject resp = new JSONObject(json); spaceTotal = resp.getLong("spaceTotal"); spaceFree = resp.getLong("spaceFree"); @@ -200,20 +194,18 @@ public class RemoteRecorder implements Recorder { try { String msg = "{\"action\": \"list\"}"; RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(msg, builder); Request request = builder.build(); - try(Response response = client.execute(request)) { + try (Response response = client.execute(request)) { String json = response.body().string(); - if(response.isSuccessful()) { + if (response.isSuccessful()) { ModelListResponse resp = modelListResponseAdapter.fromJson(json); - if(resp.status.equals("success")) { + if (resp.status.equals(SUCCESS)) { models = resp.models; for (Model model : models) { for (Site site : sites) { - if(site.isSiteForModel(model)) { + if (site.isSiteForModel(model)) { model.setSite(site); } } @@ -235,16 +227,14 @@ public class RemoteRecorder implements Recorder { try { String msg = "{\"action\": \"listOnline\"}"; RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(msg, builder); Request request = builder.build(); try (Response response = client.execute(request)) { String json = response.body().string(); if (response.isSuccessful()) { ModelListResponse resp = modelListResponseAdapter.fromJson(json); - if (resp.status.equals("success")) { + if (resp.status.equals(SUCCESS)) { onlineModels = resp.models; for (Model model : models) { for (Site site : sites) { @@ -269,27 +259,25 @@ public class RemoteRecorder implements Recorder { try { String msg = "{\"action\": \"recordings\"}"; RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(msg, builder); Request request = builder.build(); try (Response response = client.execute(request)) { String json = response.body().string(); if (response.isSuccessful()) { RecordingListResponse resp = recordingListResponseAdapter.fromJson(json); - if (resp.status.equals("success")) { + if (resp.status.equals(SUCCESS)) { List newRecordings = resp.recordings; // fire changed events for (Iterator iterator = recordings.iterator(); iterator.hasNext();) { Recording recording = iterator.next(); - if(newRecordings.contains(recording)) { + if (newRecordings.contains(recording)) { int idx = newRecordings.indexOf(recording); Recording newRecording = newRecordings.get(idx); - if(newRecording.getStatus() != recording.getStatus()) { + if (newRecording.getStatus() != recording.getStatus()) { File file = new File(recording.getPath()); - RecordingStateChangedEvent evt = new RecordingStateChangedEvent(file, - newRecording.getStatus(), recording.getModel(), recording.getStartDate()); + RecordingStateChangedEvent evt = new RecordingStateChangedEvent(file, newRecording.getStatus(), recording.getModel(), + recording.getStartDate()); EventBusHolder.BUS.post(evt); } } @@ -311,6 +299,7 @@ public class RemoteRecorder implements Recorder { try { Thread.sleep(2000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // interrupted, probably by stopThread } } @@ -339,25 +328,23 @@ public class RemoteRecorder implements Recorder { } @Override - public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public List getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException { return recordings; } @Override - public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException { RecordingRequest recReq = new RecordingRequest("delete", recording); String msg = recordingRequestAdapter.toJson(recReq); RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(msg, builder); Request request = builder.build(); try (Response response = client.execute(request)) { String json = response.body().string(); RecordingListResponse resp = recordingListResponseAdapter.fromJson(json); if (response.isSuccessful()) { - if (!resp.status.equals("success")) { + if (!resp.status.equals(SUCCESS)) { throw new IOException("Couldn't delete recording: " + resp.msg); } else { recordings.remove(recording); @@ -423,29 +410,29 @@ public class RemoteRecorder implements Recorder { } @Override - public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { sendRequest("switch", model); } @Override - public void suspendRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IllegalStateException, IOException { + public void suspendRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IOException { sendRequest("suspend", model); model.setSuspended(true); // update cached model int index = models.indexOf(model); - if(index >= 0) { + if (index >= 0) { Model m = models.get(index); m.setSuspended(true); } } @Override - public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException { sendRequest("resume", model); model.setSuspended(false); // update cached model int index = models.indexOf(model); - if(index >= 0) { + if (index >= 0) { Model m = models.get(index); m.setSuspended(false); } @@ -472,21 +459,19 @@ public class RemoteRecorder implements Recorder { } @Override - public void rerunPostProcessing(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException { + public void rerunPostProcessing(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException { RecordingRequest recReq = new RecordingRequest("rerunPostProcessing", recording); String msg = recordingRequestAdapter.toJson(recReq); LOG.debug(msg); RequestBody body = RequestBody.create(JSON, msg); - Request.Builder builder = new Request.Builder() - .url(getRecordingEndpoint()) - .post(body); + Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body); addHmacIfNeeded(msg, builder); Request request = builder.build(); try (Response response = client.execute(request)) { String json = response.body().string(); SimpleResponse resp = simpleResponseAdapter.fromJson(json); if (response.isSuccessful()) { - if (!resp.status.equals("success")) { + if (!resp.status.equals(SUCCESS)) { throw new IOException("Couldn't start post-processing for recording: " + resp.msg); } } else { diff --git a/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java index a69915cd..ac407caf 100644 --- a/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java @@ -47,7 +47,7 @@ import okhttp3.Response; public abstract class AbstractHlsDownload implements Download { - private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); private static int threadCounter = 0; protected HttpClient client; @@ -103,9 +103,9 @@ public abstract class AbstractHlsDownload implements Download { for (TrackData trackData : tracks) { String uri = trackData.getUri(); if(!uri.startsWith("http")) { - String _url = segmentsUrl.toString(); - _url = _url.substring(0, _url.lastIndexOf('/') + 1); - uri = _url + uri; + String tmpurl = segmentsUrl.toString(); + tmpurl = tmpurl.substring(0, tmpurl.lastIndexOf('/') + 1); + uri = tmpurl + uri; } lsp.totalDuration += trackData.getTrackInfo().duration; lsp.lastSegDuration = trackData.getTrackInfo().duration; @@ -168,6 +168,7 @@ public abstract class AbstractHlsDownload implements Download { try { Thread.sleep(6000); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } else { playlistEmptyCount = 0; @@ -209,7 +210,9 @@ public abstract class AbstractHlsDownload implements Download { getModel().getSite().getName(), Long.toString(recording.getStartDate().getEpochSecond()) }; - LOG.debug("Running {}", Arrays.toString(args)); + if(LOG.isDebugEnabled()) { + LOG.debug("Running {}", Arrays.toString(args)); + } Process process = rt.exec(args, OS.getEnvironment()); // TODO maybe write these to a separate log file, e.g. recname.ts.pp.log Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out)); diff --git a/common/src/main/java/ctbrec/recorder/download/HlsDownload.java b/common/src/main/java/ctbrec/recorder/download/HlsDownload.java index bcbf5dd5..dcc5df97 100644 --- a/common/src/main/java/ctbrec/recorder/download/HlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/HlsDownload.java @@ -10,17 +10,14 @@ import java.io.InputStream; import java.net.URL; import java.nio.file.FileSystems; import java.nio.file.Files; -import java.nio.file.LinkOption; import java.nio.file.Path; import java.text.DecimalFormat; import java.text.NumberFormat; import java.time.Duration; -import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.Callable; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -51,7 +48,7 @@ import okhttp3.Response; public class HlsDownload extends AbstractHlsDownload { - private static final transient Logger LOG = LoggerFactory.getLogger(HlsDownload.class); + private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class); protected Path downloadDir; @@ -69,7 +66,6 @@ public class HlsDownload extends AbstractHlsDownload { public void init(Config config, Model model) { this.config = config; super.model = model; - startTime = Instant.now(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT); String startTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault())); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed()); @@ -88,7 +84,7 @@ public class HlsDownload extends AbstractHlsDownload { String segments = getSegmentPlaylistUrl(model); if(segments != null) { - if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { + if (!downloadDir.toFile().exists()) { Files.createDirectories(downloadDir); } int lastSegmentNumber = 0; @@ -103,7 +99,6 @@ public class HlsDownload extends AbstractHlsDownload { LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor); } int skip = nextSegmentNumber - playlist.seq; - Future lastSegmentDownload = null; for (String segment : playlist.segments) { if(skip > 0) { skip--; @@ -111,13 +106,12 @@ public class HlsDownload extends AbstractHlsDownload { URL segmentUrl = new URL(segment); String prefix = nf.format(segmentCounter++); SegmentDownload segmentDownload = new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix); - lastSegmentDownload = downloadThreadPool.submit(segmentDownload); - //new SegmentDownload(segment, downloadDir).call(); + downloadThreadPool.submit(segmentDownload); } } // split recordings - boolean split = splitRecording(lastSegmentDownload); + boolean split = splitRecording(); if (split) { break; } @@ -129,13 +123,14 @@ public class HlsDownload extends AbstractHlsDownload { LOG.trace("Playlist didn't change... waiting for {}ms", wait); } else { // playlist did change -> wait for at least last segment duration - wait = 1;//(long) lsp.lastSegDuration * 1000; + wait = 1; LOG.trace("Playlist changed... waiting for {}ms", wait); } try { Thread.sleep(wait); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if(running) { LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); } @@ -175,7 +170,9 @@ public class HlsDownload extends AbstractHlsDownload { try { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } synchronized (downloadFinished) { downloadFinished.notifyAll(); } @@ -198,7 +195,7 @@ public class HlsDownload extends AbstractHlsDownload { } PlaylistGenerator playlistGenerator = new PlaylistGenerator(); - playlistGenerator.addProgressListener(percent -> recording.setProgress(percent)); + playlistGenerator.addProgressListener(recording::setProgress); try { File playlist = playlistGenerator.generate(recDir); @@ -214,17 +211,21 @@ public class HlsDownload extends AbstractHlsDownload { } else { LOG.error("Playlist contains errors"); for (PlaylistError error : e.getErrors()) { - LOG.error("Error: {}", error.toString()); + LOG.error("Error: {}", error); } } } catch (InvalidPlaylistException e) { LOG.error("Playlist is invalid and will be deleted", e); File playlist = new File(recDir, "playlist.m3u8"); - playlist.delete(); + try { + Files.delete(playlist.toPath()); + } catch (IOException e1) { + LOG.error("Couldn't delete playlist {}", playlist, e1); + } } } - private boolean splitRecording(Future lastSegmentDownload) { + private boolean splitRecording() { if(config.getSettings().splitRecordings > 0) { Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now()); long seconds = recordingDuration.getSeconds(); @@ -237,14 +238,15 @@ public class HlsDownload extends AbstractHlsDownload { } @Override - public synchronized void stop() { + public void stop() { if (running) { internalStop(); try { synchronized (downloadFinished) { - downloadFinished.wait(); + downloadFinished.wait(TimeUnit.SECONDS.toMillis(60)); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Couldn't wait for download to finish", e); } } @@ -271,7 +273,7 @@ public class HlsDownload extends AbstractHlsDownload { @Override public Boolean call() throws Exception { - LOG.trace("Downloading segment to " + file); + LOG.trace("Downloading segment to {}", file); int maxTries = 3; for (int i = 1; i <= maxTries; i++) { Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); @@ -357,6 +359,8 @@ public class HlsDownload extends AbstractHlsDownload { private void waitSomeTime() { try { Thread.sleep(10_000); - } catch (Exception e) {} + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index 72286c4f..3d819da6 100644 --- a/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -11,7 +11,6 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.channels.FileChannel; import java.nio.file.Files; -import java.nio.file.LinkOption; import java.nio.file.Path; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -50,7 +49,7 @@ import okhttp3.Response; public class MergedHlsDownload extends AbstractHlsDownload { - private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class); + private static final Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class); private static final boolean IGNORE_CACHE = true; private BlockingMultiMTSSource multiSource; private Thread mergeThread; @@ -59,7 +58,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { private Config config; private File targetFile; private FileChannel fileChannel = null; - private Object downloadFinished = new Object(); + private boolean downloadFinished = false; public MergedHlsDownload(HttpClient client) { super(client); @@ -85,11 +84,11 @@ public class MergedHlsDownload extends AbstractHlsDownload { mergeThread = createMergeThread(targetFile, progressListener, false); LOG.debug("Merge thread started"); mergeThread.start(); - if(Config.getInstance().getSettings().requireAuthentication) { + if (Config.getInstance().getSettings().requireAuthentication) { URL u = new URL(segmentPlaylistUri); String path = u.getPath(); byte[] key = Config.getInstance().getSettings().key; - if(!Config.getInstance().getContextPath().isEmpty()) { + if (!Config.getInstance().getContextPath().isEmpty()) { path = path.substring(Config.getInstance().getContextPath().length()); } String hmac = Hmac.calculate(path, key); @@ -100,28 +99,29 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.debug("Waiting for merge thread to finish"); mergeThread.join(); LOG.debug("Merge thread finished"); - } catch(ParseException e) { + } catch (ParseException e) { throw new IOException("Couldn't parse stream information", e); - } catch(PlaylistException e) { + } catch (PlaylistException e) { throw new IOException("Couldn't parse HLS playlist", e); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Couldn't wait for write thread to finish. Recording might be cut off", e); } catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e) { throw new IOException("Couldn't add HMAC to playlist url", e); } finally { try { streamer.stop(); - } catch(Exception e) { + } catch (Exception e) { LOG.error("Couldn't stop streamer", e); } downloadThreadPool.shutdown(); try { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) {} - synchronized (downloadFinished) { - downloadFinished.notifyAll(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } + downloadFinished = true; LOG.debug("Download terminated for {}", segmentPlaylistUri); } } @@ -129,14 +129,13 @@ public class MergedHlsDownload extends AbstractHlsDownload { @Override public void start() throws IOException { try { - if(!model.isOnline(IGNORE_CACHE)) { - throw new IOException(model.getName() +"'s room is not public"); + if (!model.isOnline(IGNORE_CACHE)) { + throw new IOException(model.getName() + "'s room is not public"); } running = true; super.startTime = Instant.now(); splitRecStartTime = ZonedDateTime.now(); - super.model = model; String segments = getSegmentPlaylistUrl(model); mergeThread = createMergeThread(targetFile, null, true); @@ -149,16 +148,17 @@ public class MergedHlsDownload extends AbstractHlsDownload { } else { throw new IOException("Couldn't determine segments uri"); } - } catch(ParseException e) { + } catch (ParseException e) { throw new IOException("Couldn't parse stream information", e); - } catch(PlaylistException e) { + } catch (PlaylistException e) { throw new IOException("Couldn't parse HLS playlist", e); - } catch(EOFException e) { + } catch (EOFException e) { // end of playlist reached LOG.debug("Reached end of playlist for model {}", model); - } catch(Exception e) { + } catch (Exception e) { throw new IOException("Couldn't download segment", e); } finally { + if (streamer != null) { try { streamer.stop(); @@ -170,11 +170,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { try { LOG.debug("Waiting for last segments for {}", model); downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException e) {} - synchronized (downloadFinished) { - LOG.debug("Download finished notify {}", model); - downloadFinished.notifyAll(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } + downloadFinished = true; LOG.debug("Download for {} terminated", model); } } @@ -182,17 +181,17 @@ public class MergedHlsDownload extends AbstractHlsDownload { private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException { int lastSegment = 0; int nextSegment = 0; - while(running) { + while (running) { try { SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri); emptyPlaylistCheck(lsp); - if(!livestreamDownload) { + if (!livestreamDownload) { multiSource.setTotalSegments(lsp.segments.size()); } // download new segments long downloadStart = System.currentTimeMillis(); - if(livestreamDownload) { + if (livestreamDownload) { downloadNewSegments(lsp, nextSegment); } else { downloadRecording(lsp); @@ -200,12 +199,12 @@ public class MergedHlsDownload extends AbstractHlsDownload { long downloadTookMillis = System.currentTimeMillis() - downloadStart; // download segments, which might have been skipped - //downloadMissedSegments(lsp, nextSegment); - if(nextSegment > 0 && lsp.seq > nextSegment) { - LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url, downloadTookMillis, lsp.totalDuration); + if (nextSegment > 0 && lsp.seq > nextSegment) { + LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url, + downloadTookMillis, lsp.totalDuration); } - if(livestreamDownload) { + if (livestreamDownload) { // split up the recording, if configured boolean split = splitRecording(); if (split) { @@ -220,16 +219,16 @@ public class MergedHlsDownload extends AbstractHlsDownload { } else { break; } - } catch(HttpException e) { - if(e.getResponseCode() == 404) { + } catch (HttpException e) { + if (e.getResponseCode() == 404) { LOG.debug("Playlist not found (404). Model {} probably went offline", model); - } else if(e.getResponseCode() == 403) { + } else if (e.getResponseCode() == 403) { LOG.debug("Playlist access forbidden (403). Model {} probably went private or offline", model); } else { LOG.info("Unexpected error while downloading {}", model, e); } running = false; - } catch(Exception e) { + } catch (Exception e) { LOG.info("Unexpected error while downloading {}", model, e); running = false; } @@ -245,19 +244,19 @@ public class MergedHlsDownload extends AbstractHlsDownload { } } - private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, MissingSegmentException, ExecutionException, HttpException { + private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, ExecutionException, HttpException { int skip = nextSegment - lsp.seq; // add segments to download threadpool Queue> downloads = new LinkedList<>(); - if(downloadQueue.remainingCapacity() == 0) { + if (downloadQueue.remainingCapacity() == 0) { LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment"); } else { for (String segment : lsp.segments) { - if(!running) { + if (!running) { break; } - if(skip > 0) { + if (skip > 0) { skip--; } else { URL segmentUrl = new URL(segment); @@ -278,27 +277,28 @@ public class MergedHlsDownload extends AbstractHlsDownload { byte[] segmentData = downloadFuture.get(); writeSegment(segmentData); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Error while downloading segment", e); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if(cause instanceof MissingSegmentException) { - if(model != null && !isModelOnline()) { + if (cause instanceof MissingSegmentException) { + if (model != null && !isModelOnline()) { LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName()); running = false; } else { - LOG.debug("Segment not available, but model {} still online. Going on", model.getName()); + LOG.debug("Segment not available, but model {} still online. Going on", Optional.ofNullable(model).map(Model::getName).orElse("n/a")); } - } else if(cause instanceof HttpException) { + } else if (cause instanceof HttpException) { HttpException he = (HttpException) cause; - if(model != null && !isModelOnline()) { + if (model != null && !isModelOnline()) { LOG.debug("Error {} while downloading segment, because model {} is offline. Stopping now", he.getResponseCode(), model.getName()); running = false; } else { - if(he.getResponseCode() == 404) { - LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", model.getName()); + if (he.getResponseCode() == 404) { + LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a")); running = false; - } else if(he.getResponseCode() == 403) { - LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", model.getName()); + } else if (he.getResponseCode() == 403) { + LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a")); running = false; } else { throw he; @@ -318,33 +318,12 @@ public class MergedHlsDownload extends AbstractHlsDownload { } private boolean splitRecording() { - if(config.getSettings().splitRecordings > 0) { + if (config.getSettings().splitRecordings > 0) { Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now()); long seconds = recordingDuration.getSeconds(); - if(seconds >= config.getSettings().splitRecordings) { + if (seconds >= config.getSettings().splitRecordings) { internalStop(); return true; - // try { - // File lastTargetFile = targetFile; - // - // // switch to the next file - // targetFile = Config.getInstance().getFileForRecording(model, "ts"); - // LOG.debug("Switching to file {}", targetFile.getAbsolutePath()); - // fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); - // MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build(); - // streamer.switchSink(sink); - // super.startTime = Instant.now(); - // splitRecStartTime = ZonedDateTime.now(); - // - // // post-process current recording - // Thread pp = new Thread(() -> postprocess(lastTargetFile)); - // pp.setName("Post-Processing split recording"); - // pp.setPriority(Thread.MIN_PRIORITY); - // pp.start(); - // } catch (IOException e) { - // LOG.error("Error while splitting recording", e); - // running = false; - // } } } return false; @@ -354,8 +333,8 @@ public class MergedHlsDownload extends AbstractHlsDownload { try { long wait = 0; if (lastSegment == lsp.seq) { - int timeLeftMillis = (int)(lsp.totalDuration * 1000 - downloadTookMillis); - if(timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it + int timeLeftMillis = (int) (lsp.totalDuration * 1000 - downloadTookMillis); + if (timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it wait = 1; } else { // wait a second to be nice to the server (don't hammer it with requests) @@ -365,11 +344,12 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.trace("Playlist didn't change... waiting for {}ms", wait); } else { // playlist did change -> wait for at least last segment duration - wait = 1;// (long) lsp.lastSegDuration * 1000; + wait = 1; LOG.trace("Playlist changed... waiting for {}ms", wait); } Thread.sleep(wait); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (running) { LOG.error("Couldn't sleep between segment downloads. This might mess up the download!"); } @@ -377,15 +357,20 @@ public class MergedHlsDownload extends AbstractHlsDownload { } @Override - public synchronized void stop() { + public void stop() { if (running) { - internalStop(); try { - synchronized (downloadFinished) { - LOG.debug("Waiting for finished notify {}", model); - downloadFinished.wait(); + internalStop(); + int count = 0; + while (!downloadFinished && count++ < 60) { + LOG.debug("Waiting for download to finish {}", model); + Thread.sleep(1000); + } + if(!downloadFinished) { + LOG.warn("Download didn't finishe properly for model {}", model); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.error("Couldn't wait for download to finish", e); } LOG.debug("Download stopped"); @@ -393,7 +378,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { } @Override - void internalStop() { + synchronized void internalStop() { running = false; if (streamer != null) { streamer.stop(); @@ -403,31 +388,24 @@ public class MergedHlsDownload extends AbstractHlsDownload { private Thread createMergeThread(File targetFile, ProgressListener listener, boolean liveStream) { Thread t = new Thread(() -> { - multiSource = BlockingMultiMTSSource.builder() - .setFixContinuity(true) - .setProgressListener(listener) - .build(); + multiSource = BlockingMultiMTSSource.builder().setFixContinuity(true).setProgressListener(listener).build(); try { Path downloadDir = targetFile.getParentFile().toPath(); - if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { + if (!downloadDir.toFile().exists()) { Files.createDirectories(downloadDir); } fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build(); - streamer = Streamer.builder() - .setSource(multiSource) - .setSink(sink) - .setSleepingEnabled(liveStream) - .setBufferSize(10) - .setName(Optional.ofNullable(model).map(m -> m.getName()).orElse("")) - .build(); + streamer = Streamer.builder().setSource(multiSource).setSink(sink).setSleepingEnabled(liveStream).setBufferSize(10) + .setName(Optional.ofNullable(model).map(Model::getName).orElse("")).build(); // Start streaming streamer.stream(); LOG.debug("Streamer finished"); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (running) { LOG.error("Error while waiting for a download future", e); } @@ -478,27 +456,27 @@ public class MergedHlsDownload extends AbstractHlsDownload { @Override public byte[] call() throws IOException { - LOG.trace("Downloading segment " + url.getFile()); + LOG.trace("Downloading segment {}", url.getFile()); int maxTries = 3; for (int i = 1; i <= maxTries && running; i++) { Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); try (Response response = client.execute(request)) { - if(response.isSuccessful()) { + if (response.isSuccessful()) { byte[] segment = response.body().bytes(); - if(lsp.encrypted) { + if (lsp.encrypted) { segment = new Crypto(lsp.encryptionKeyUrl, client).decrypt(segment); } return segment; } else { throw new HttpException(response.code(), response.message()); } - } catch(Exception e) { + } catch (Exception e) { if (i == maxTries) { LOG.error("Error while downloading segment. Segment {} finally failed", url.getFile()); } else { LOG.trace("Error while downloading segment {} on try {}", url.getFile(), i, e); } - if(model != null && !isModelOnline()) { + if (model != null && !isModelOnline()) { break; } } @@ -510,7 +488,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { public boolean isModelOnline() { try { return model.isOnline(IGNORE_CACHE); - } catch (IOException | ExecutionException | InterruptedException e) { + } catch (IOException | ExecutionException e) { + return false; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return false; } } diff --git a/common/src/main/java/org/taktik/mpegts/Streamer.java b/common/src/main/java/org/taktik/mpegts/Streamer.java index 560bbfc8..9c4fd428 100644 --- a/common/src/main/java/org/taktik/mpegts/Streamer.java +++ b/common/src/main/java/org/taktik/mpegts/Streamer.java @@ -107,13 +107,10 @@ public class Streamer { boolean resetState = false; MTSPacket packet = null; long packetCount = 0; - //long pcrPidPacketCount = 0; Long firstPcrValue = null; Long firstPcrTime = null; - //Long firstPcrPacketCount = null; Long lastPcrValue = null; Long lastPcrTime = null; - //Long lastPcrPacketCount = null; Long averageSleep = null; while (!streamingShouldStop) { if (resetState) { @@ -145,6 +142,7 @@ public class Streamer { log.error("Interrupted while waiting for packet"); continue; } else { + Thread.currentThread().interrupt(); break; } } @@ -164,100 +162,78 @@ public class Streamer { } } - if (pid != 0 && patSection!=null) { - if (patSection.getPrograms().values().contains(pid)) { - if (packet.isPayloadUnitStartIndicator()) { - ByteBuffer payload = packet.getPayload(); - payload.rewind(); - int pointer = payload.get() & 0xff; - payload.position(payload.position() + pointer); - pmtSection.put(pid, PMTSection.parse(payload)); - } - } - + if (pid != 0 && patSection!=null && patSection.getPrograms().values().contains(pid) && packet.isPayloadUnitStartIndicator()) { + ByteBuffer payload = packet.getPayload(); + payload.rewind(); + int pointer = payload.get() & 0xff; + payload.position(payload.position() + pointer); + pmtSection.put(pid, PMTSection.parse(payload)); } // Check PID matches PCR PID - if (true) {//mtsPacket.pid == pmt.getPcrPid()) { - //pcrPidPacketCount++; - - if (averageSleep != null) { - sleepNanos = averageSleep; - } else { - // if (pcrPidPacketCount < 2) { - // if (pcrPidPacketCount % 10 == 0) { - // sleepNanos = 15; - // } - // } - } + if (averageSleep != null) { + sleepNanos = averageSleep; } // Check for PCR - if (packet.getAdaptationField() != null) { - if (packet.getAdaptationField().getPcr() != null) { - if (packet.getPid() == getPCRPid()) { - if (!packet.getAdaptationField().isDiscontinuityIndicator()) { - // Get PCR and current nano time - long pcrValue = packet.getAdaptationField().getPcr().getValue(); - long pcrTime = System.nanoTime(); + if (packet.getAdaptationField() != null && packet.getAdaptationField().getPcr() != null) { + if (packet.getPid() == getPCRPid()) { + if (!packet.getAdaptationField().isDiscontinuityIndicator()) { + // Get PCR and current nano time + long pcrValue = packet.getAdaptationField().getPcr().getValue(); + long pcrTime = System.nanoTime(); - // Compute sleepNanosOrig - if (firstPcrValue == null || firstPcrTime == null) { - firstPcrValue = pcrValue; - firstPcrTime = pcrTime; - //firstPcrPacketCount = pcrPidPacketCount; - } - - // Compute sleepNanosPrevious - Long sleepNanosPrevious = null; - if (lastPcrValue != null && lastPcrTime != null) { - if (pcrValue <= lastPcrValue) { - log.trace("PCR discontinuity ! " + packet.getPid()); - resetState = true; - } else { - sleepNanosPrevious = ((pcrValue - lastPcrValue) / 27 * 1000) - (pcrTime - lastPcrTime); - } - } - // System.out.println("pcrValue=" + pcrValue + ", lastPcrValue=" + lastPcrValue + ", sleepNanosPrevious=" + sleepNanosPrevious + ", sleepNanosOrig=" + sleepNanosOrig); - - // Set sleep time based on PCR if possible - if (sleepNanosPrevious != null) { - // Safety : We should never have to wait more than 100ms - if (sleepNanosPrevious > 100000000) { - log.warn("PCR sleep ignored, too high !"); - resetState = true; - } else { - sleepNanos = sleepNanosPrevious; - // averageSleep = sleepNanosPrevious / (pcrPidPacketCount - lastPcrPacketCount - 1); - } - } - - // Set lastPcrValue/lastPcrTime - lastPcrValue = pcrValue; - lastPcrTime = pcrTime + sleepNanos; - //lastPcrPacketCount = pcrPidPacketCount; - } else { - log.warn("Skipped PCR - Discontinuity indicator"); + // Compute sleepNanosOrig + if (firstPcrValue == null || firstPcrTime == null) { + firstPcrValue = pcrValue; + firstPcrTime = pcrTime; } + + // Compute sleepNanosPrevious + Long sleepNanosPrevious = null; + if (lastPcrValue != null && lastPcrTime != null) { + if (pcrValue <= lastPcrValue) { + log.trace("PCR discontinuity ! {}", packet.getPid()); + resetState = true; + } else { + sleepNanosPrevious = ((pcrValue - lastPcrValue) / 27 * 1000) - (pcrTime - lastPcrTime); + } + } + + // Set sleep time based on PCR if possible + if (sleepNanosPrevious != null) { + // Safety : We should never have to wait more than 100ms + if (sleepNanosPrevious > 100000000) { + log.warn("PCR sleep ignored, too high !"); + resetState = true; + } else { + sleepNanos = sleepNanosPrevious; + } + } + + // Set lastPcrValue/lastPcrTime + lastPcrValue = pcrValue; + lastPcrTime = pcrTime + sleepNanos; } else { - log.debug("Skipped PCR - PID does not match"); + log.warn("Skipped PCR - Discontinuity indicator"); } + } else { + log.debug("Skipped PCR - PID does not match"); } } // Sleep if needed if (sleepNanos > 0 && sleepingEnabled) { - log.trace("Sleeping " + sleepNanos / 1000000 + " millis, " + sleepNanos % 1000000 + " nanos"); + log.trace("Sleeping {} millis, {} nanos", sleepNanos / 1000000, sleepNanos % 1000000); try { Thread.sleep(sleepNanos / 1000000, (int) (sleepNanos % 1000000)); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.warn("Streaming sleep interrupted!"); } } // Stream packet - // System.out.println("Streaming packet #" + packetCount + ", PID=" + mtsPacket.getPid() + ", pcrCount=" + pcrCount + ", continuityCounter=" + mtsPacket.getContinuityCounter()); - if(!streamingShouldStop && !Thread.interrupted()) { try { sink.send(packet); @@ -289,18 +265,12 @@ public class Streamer { while (!streamingShouldStop && (packet = source.nextPacket()) != null) { boolean put = false; while (!put) { - try { - buffer.put(packet); - put = true; - } catch (InterruptedException ignored) { - log.error("Error adding packet to buffer", ignored); - } + put = putPacketToBuffer(packet); } } } catch (InterruptedException e) { - if(!streamingShouldStop) { - log.error("Error reading from source", e); - } + Thread.currentThread().interrupt(); + log.error("Error reading from source", e); } catch (Exception e) { log.error("Error reading from source", e); } finally { @@ -313,9 +283,19 @@ public class Streamer { } } + private boolean putPacketToBuffer(MTSPacket packet) { + try { + buffer.put(packet); + return true; + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + log.error("Error adding packet to buffer", ignored); + return false; + } + } + private int getPCRPid() { if ((!pmtSection.isEmpty())) { - // TODO change this return pmtSection.values().iterator().next().getPcrPid(); } return -1;