forked from j62/ctbrec
1
0
Fork 0

Improve thread interrupt and lock handling

This commit is contained in:
0xboobface 2019-11-23 17:13:27 +01:00
parent 84a02d7432
commit 011e8c2f29
9 changed files with 331 additions and 351 deletions

View File

@ -123,6 +123,7 @@ public class StreamPreview extends StackPane {
}
showTestImage();
} catch (InterruptedException | InterruptedIOException e) {
Thread.currentThread().interrupt();
// future has been canceled, that's fine
} catch (ExecutionException e) {
if(e.getCause() instanceof InterruptedException || e.getCause() instanceof InterruptedIOException) {
@ -180,6 +181,7 @@ public class StreamPreview extends StackPane {
private void checkInterrupt() throws InterruptedException {
if(Thread.interrupted()) {
Thread.currentThread().interrupt();
throw new InterruptedException();
}
}

View File

@ -40,6 +40,7 @@ public final class Toast {
try {
Thread.sleep(toastDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Timeline fadeOutTimeline = new Timeline();
KeyFrame fadeOutKey1 = new KeyFrame(Duration.millis(fadeOutDelay), new KeyValue(toastStage.getScene().getRoot().opacityProperty(), 0));

View File

@ -53,7 +53,7 @@ import ctbrec.sites.Site;
public class NextGenLocalRecorder implements Recorder {
private static final transient Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class);
private static final Logger LOG = LoggerFactory.getLogger(NextGenLocalRecorder.class);
private static final boolean IGNORE_CACHE = true;
private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Config config;
@ -78,7 +78,7 @@ public class NextGenLocalRecorder implements Recorder {
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config;
recordingManager = new RecordingManager(config, sites);
config.getSettings().models.stream().forEach((m) -> {
config.getSettings().models.stream().forEach(m -> {
if (m.getSite() != null) {
if (m.getSite().isEnabled()) {
models.add(m);
@ -92,9 +92,6 @@ public class NextGenLocalRecorder implements Recorder {
recording = true;
registerEventBusListener();
// if(Config.isServerMode()) {
// processUnfinishedRecordings();
// }
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
@ -104,24 +101,27 @@ public class NextGenLocalRecorder implements Recorder {
while (!Thread.interrupted()) {
try {
Future<Recording> result = completionService.take();
Recording recording = result.get();
Recording rec = result.get();
recordingsLock.lock();
try {
recordingProcesses.remove(recording.getModel());
recordingProcesses.remove(rec.getModel());
} finally {
recordingsLock.unlock();
}
if (recording.getStatus() == State.WAITING) {
LOG.debug("Download finished for {} -> Starting post-processing", recording.getModel().getName());
submitPostProcessingJob(recording);
if (rec.getStatus() == State.WAITING) {
LOG.debug("Download finished for {} -> Starting post-processing", rec.getModel().getName());
submitPostProcessingJob(rec);
// check, if we have to restart the recording
Model model = recording.getModel();
Model model = rec.getModel();
tryRestartRecording(model);
} else {
setRecordingStatus(recording, State.FAILED);
setRecordingStatus(rec, State.FAILED);
}
} catch (InterruptedException | ExecutionException | IllegalStateException e) {
} catch (ExecutionException | IllegalStateException e) {
LOG.error("Error while completing recording", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while completing recording", e);
}
}
@ -171,7 +171,7 @@ public class NextGenLocalRecorder implements Recorder {
}
@Override
public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
if (!models.contains(model)) {
LOG.info("Model {} added", model);
modelLock.lock();
@ -190,7 +190,10 @@ public class NextGenLocalRecorder implements Recorder {
if (model.isOnline()) {
startRecordingProcess(model);
}
} catch (ExecutionException | InterruptedException e) {
} catch (ExecutionException e) {
// noop
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@ -267,7 +270,7 @@ public class NextGenLocalRecorder implements Recorder {
}
}
private boolean deleteIfEmpty(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
private boolean deleteIfEmpty(Recording rec) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
rec.refresh();
long sizeInByte = rec.getSizeInByte();
if (sizeInByte == 0) {
@ -279,7 +282,7 @@ public class NextGenLocalRecorder implements Recorder {
}
}
private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
private boolean deleteIfTooShort(Recording rec) throws IOException, ParseException, PlaylistException, InvalidKeyException, NoSuchAlgorithmException {
Duration minimumLengthInSeconds = Duration.ofSeconds(Config.getInstance().getSettings().minimumLengthInSeconds);
if (minimumLengthInSeconds.getSeconds() <= 0) {
return false;
@ -296,7 +299,7 @@ public class NextGenLocalRecorder implements Recorder {
}
@Override
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
modelLock.lock();
try {
if (models.contains(model)) {
@ -314,8 +317,8 @@ public class NextGenLocalRecorder implements Recorder {
recordingsLock.lock();
try {
if (recordingProcesses.containsKey(model)) {
Recording recording = recordingProcesses.get(model);
recording.getDownload().stop();
Recording rec = recordingProcesses.get(model);
rec.getDownload().stop();
}
} finally {
recordingsLock.unlock();
@ -323,7 +326,7 @@ public class NextGenLocalRecorder implements Recorder {
}
@Override
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
if (models.contains(model)) {
int index = models.indexOf(model);
models.get(index).setStreamUrlIndex(model.getStreamUrlIndex());
@ -331,8 +334,8 @@ public class NextGenLocalRecorder implements Recorder {
LOG.debug("Switching stream source to index {} for model {}", model.getStreamUrlIndex(), model.getName());
recordingsLock.lock();
try {
Recording recording = recordingProcesses.get(model);
if (recording != null) {
Recording rec = recordingProcesses.get(model);
if (rec != null) {
stopRecordingProcess(model);
}
} finally {
@ -341,7 +344,6 @@ public class NextGenLocalRecorder implements Recorder {
tryRestartRecording(model);
} else {
LOG.warn("Couldn't switch stream source for model {}. Not found in list", model.getName());
return;
}
}
@ -349,9 +351,9 @@ public class NextGenLocalRecorder implements Recorder {
recordingsLock.lock();
try {
LOG.debug("Stopping recording for {}", model);
Recording recording = recordingProcesses.get(model);
Recording rec = recordingProcesses.get(model);
LOG.debug("Stopping download for {}", model);
recording.getDownload().stop();
rec.getDownload().stop();
} finally {
recordingsLock.unlock();
}
@ -360,8 +362,8 @@ public class NextGenLocalRecorder implements Recorder {
private void stopRecordingProcesses() {
recordingsLock.lock();
try {
for (Recording recording : recordingProcesses.values()) {
recording.getDownload().stop();
for (Recording rec : recordingProcesses.values()) {
rec.getDownload().stop();
}
} finally {
recordingsLock.unlock();
@ -389,12 +391,12 @@ public class NextGenLocalRecorder implements Recorder {
}
@Override
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException {
return recordingManager.getAll();
}
@Override
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
recordingManager.delete(recording);
}
@ -423,6 +425,7 @@ public class NextGenLocalRecorder implements Recorder {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while waiting for downloads to finish", e);
}
}
@ -438,6 +441,7 @@ public class NextGenLocalRecorder implements Recorder {
LOG.info("Waiting for post-processing to finish");
ppPool.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while waiting for pools to finish", e);
}
}
@ -463,8 +467,8 @@ public class NextGenLocalRecorder implements Recorder {
recordingsLock.lock();
try {
Recording recording = recordingProcesses.get(model);
Optional.ofNullable(recording).map(Recording::getDownload).ifPresent(Download::stop);
Recording rec = recordingProcesses.get(model);
Optional.ofNullable(rec).map(Recording::getDownload).ifPresent(Download::stop);
} finally {
recordingsLock.unlock();
}
@ -485,9 +489,11 @@ public class NextGenLocalRecorder implements Recorder {
config.save();
} else {
LOG.warn("Couldn't resume model {}. Not found in list", model.getName());
return;
}
} catch (ExecutionException | InterruptedException e) {
} catch (ExecutionException e) {
LOG.error("Couldn't check, if model {} is online", model.getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Couldn't check, if model {} is online", model.getName());
} finally {
modelLock.unlock();
@ -515,7 +521,10 @@ public class NextGenLocalRecorder implements Recorder {
return getModels().stream().filter(m -> {
try {
return m.isOnline();
} catch (IOException | ExecutionException | InterruptedException e) {
} catch (IOException | ExecutionException e) {
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}).collect(Collectors.toList());
@ -600,14 +609,11 @@ public class NextGenLocalRecorder implements Recorder {
}
private ThreadFactory createThreadFactory(String name) {
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true);
return t;
}
return r -> {
Thread t = new Thread(r);
t.setName(name + " " + UUID.randomUUID().toString().substring(0, 8));
t.setDaemon(true);
return t;
};
}

View File

@ -23,7 +23,7 @@ import ctbrec.event.ModelStateChangedEvent;
import ctbrec.io.HttpException;
public class OnlineMonitor extends Thread {
private static final transient Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class);
private static final Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class);
private static final boolean IGNORE_CACHE = true;
private volatile boolean running = false;
@ -47,56 +47,74 @@ public class OnlineMonitor extends Thread {
List<Model> models = recorder.getModels();
// remove models, which are not recorded anymore
for (Iterator<Model> iterator = states.keySet().iterator(); iterator.hasNext();) {
Model model = iterator.next();
if(!models.contains(model)) {
iterator.remove();
}
}
removeDeletedModels(models);
// update the currently recorded models
for (Model model : models) {
try {
if(model.isOnline(IGNORE_CACHE)) {
EventBusHolder.BUS.post(new ModelIsOnlineEvent(model));
}
Model.State state = model.getOnlineState(false);
Model.State oldState = states.getOrDefault(model, UNKNOWN);
states.put(model, state);
if(state != oldState) {
EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state));
}
} catch (HttpException e) {
LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}",
model.getName(), e.getResponseCode(), e.getResponseMessage());
} catch (SocketTimeoutException e) {
LOG.error("Couldn't check if model {} is online. Request timed out", model.getName());
} catch (InterruptedException | InterruptedIOException e) {
if(running) {
LOG.error("Couldn't check if model {} is online", model.getName(), e);
}
} catch (Exception e) {
LOG.error("Couldn't check if model {} is online", model.getName(), e);
}
}
updateModels(models);
Instant end = Instant.now();
Duration timeCheckTook = Duration.between(begin, end);
LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds());
suspendUntilNextIteration(models, timeCheckTook);
}
LOG.debug("{} terminated", getName());
}
long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs;
if(timeCheckTook.getSeconds() < sleepTime) {
try {
if (running) {
long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds());
LOG.trace("Sleeping {}ms", millis);
Thread.sleep(millis);
}
} catch (InterruptedException e) {
LOG.trace("Sleep interrupted");
}
private void removeDeletedModels(List<Model> models) {
for (Iterator<Model> iterator = states.keySet().iterator(); iterator.hasNext();) {
Model model = iterator.next();
if(!models.contains(model)) {
iterator.remove();
}
}
}
private void updateModels(List<Model> models) {
for (Model model : models) {
updateModel(model);
}
}
private void updateModel(Model model) {
try {
if(model.isOnline(IGNORE_CACHE)) {
EventBusHolder.BUS.post(new ModelIsOnlineEvent(model));
}
Model.State state = model.getOnlineState(false);
Model.State oldState = states.getOrDefault(model, UNKNOWN);
states.put(model, state);
if(state != oldState) {
EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state));
}
} catch (HttpException e) {
LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}",
model.getName(), e.getResponseCode(), e.getResponseMessage());
} catch (SocketTimeoutException e) {
LOG.error("Couldn't check if model {} is online. Request timed out", model.getName());
} catch (InterruptedException | InterruptedIOException e) {
Thread.currentThread().interrupt();
if(running) {
LOG.error("Couldn't check if model {} is online", model.getName(), e);
}
} catch (Exception e) {
LOG.error("Couldn't check if model {} is online", model.getName(), e);
}
}
private void suspendUntilNextIteration(List<Model> models, Duration timeCheckTook) {
LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds());
long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs;
if(timeCheckTook.getSeconds() < sleepTime) {
try {
if (running) {
long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds());
LOG.trace("Sleeping {}ms", millis);
Thread.sleep(millis);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.trace("Sleep interrupted");
}
}
LOG.debug(getName() + " terminated");
}
public void shutdown() {

View File

@ -37,13 +37,12 @@ import okhttp3.Response;
public class RemoteRecorder implements Recorder {
private static final transient Logger LOG = LoggerFactory.getLogger(RemoteRecorder.class);
private static final String SUCCESS = "success";
private static final Logger LOG = LoggerFactory.getLogger(RemoteRecorder.class);
public static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private Moshi moshi = new Moshi.Builder()
.add(Instant.class, new InstantJsonAdapter())
.add(Model.class, new ModelJsonAdapter())
.build();
private Moshi moshi = new Moshi.Builder().add(Instant.class, new InstantJsonAdapter()).add(Model.class, new ModelJsonAdapter()).build();
private JsonAdapter<ModelListResponse> modelListResponseAdapter = moshi.adapter(ModelListResponse.class);
private JsonAdapter<RecordingListResponse> recordingListResponseAdapter = moshi.adapter(RecordingListResponse.class);
private JsonAdapter<ModelRequest> modelRequestAdapter = moshi.adapter(ModelRequest.class);
@ -62,7 +61,6 @@ public class RemoteRecorder implements Recorder {
private Instant lastSync = Instant.EPOCH;
private SyncThread syncThread;
public RemoteRecorder(Config config, HttpClient client, List<Site> sites) {
this.config = config;
this.client = client;
@ -77,29 +75,27 @@ public class RemoteRecorder implements Recorder {
}
@Override
public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void startRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
sendRequest("start", model);
}
@Override
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void stopRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
sendRequest("stop", model);
}
private void sendRequest(String action, Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
private void sendRequest(String action, Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
String payload = modelRequestAdapter.toJson(new ModelRequest(action, model));
LOG.debug("Sending request to recording server: {}", payload);
RequestBody body = RequestBody.create(JSON, payload);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(payload, builder);
Request request = builder.build();
try (Response response = client.execute(request)) {
String json = response.body().string();
if (response.isSuccessful()) {
ModelListResponse resp = modelListResponseAdapter.fromJson(json);
if (!resp.status.equals("success")) {
if (!resp.status.equals(SUCCESS)) {
throw new IOException("Server returned error " + resp.status + " " + resp.msg);
}
@ -115,8 +111,8 @@ public class RemoteRecorder implements Recorder {
}
}
private void addHmacIfNeeded(String msg, Builder builder) throws InvalidKeyException, NoSuchAlgorithmException, IllegalStateException, UnsupportedEncodingException {
if(Config.getInstance().getSettings().requireAuthentication) {
private void addHmacIfNeeded(String msg, Builder builder) throws InvalidKeyException, NoSuchAlgorithmException, UnsupportedEncodingException {
if (Config.getInstance().getSettings().requireAuthentication) {
byte[] key = Config.getInstance().getSettings().key;
String hmac = Hmac.calculate(msg, key);
builder.addHeader("CTBREC-HMAC", hmac);
@ -131,7 +127,7 @@ public class RemoteRecorder implements Recorder {
@Override
public boolean isSuspended(Model model) {
int index = models.indexOf(model);
if(index >= 0) {
if (index >= 0) {
Model m = models.get(index);
return m.isSuspended();
} else {
@ -141,10 +137,10 @@ public class RemoteRecorder implements Recorder {
@Override
public List<Model> getModels() {
if(!lastSync.equals(Instant.EPOCH) && lastSync.isBefore(Instant.now().minusSeconds(60))) {
if (!lastSync.equals(Instant.EPOCH) && lastSync.isBefore(Instant.now().minusSeconds(60))) {
throw new RuntimeException("Last sync was over a minute ago");
}
return new ArrayList<Model>(models);
return new ArrayList<>(models);
}
@Override
@ -163,7 +159,7 @@ public class RemoteRecorder implements Recorder {
@Override
public void run() {
running = true;
while(running) {
while (running) {
syncModels();
syncOnlineModels();
syncSpace();
@ -176,14 +172,12 @@ public class RemoteRecorder implements Recorder {
try {
String msg = "{\"action\": \"space\"}";
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try(Response response = client.execute(request)) {
try (Response response = client.execute(request)) {
String json = response.body().string();
if(response.isSuccessful()) {
if (response.isSuccessful()) {
JSONObject resp = new JSONObject(json);
spaceTotal = resp.getLong("spaceTotal");
spaceFree = resp.getLong("spaceFree");
@ -200,20 +194,18 @@ public class RemoteRecorder implements Recorder {
try {
String msg = "{\"action\": \"list\"}";
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try(Response response = client.execute(request)) {
try (Response response = client.execute(request)) {
String json = response.body().string();
if(response.isSuccessful()) {
if (response.isSuccessful()) {
ModelListResponse resp = modelListResponseAdapter.fromJson(json);
if(resp.status.equals("success")) {
if (resp.status.equals(SUCCESS)) {
models = resp.models;
for (Model model : models) {
for (Site site : sites) {
if(site.isSiteForModel(model)) {
if (site.isSiteForModel(model)) {
model.setSite(site);
}
}
@ -235,16 +227,14 @@ public class RemoteRecorder implements Recorder {
try {
String msg = "{\"action\": \"listOnline\"}";
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try (Response response = client.execute(request)) {
String json = response.body().string();
if (response.isSuccessful()) {
ModelListResponse resp = modelListResponseAdapter.fromJson(json);
if (resp.status.equals("success")) {
if (resp.status.equals(SUCCESS)) {
onlineModels = resp.models;
for (Model model : models) {
for (Site site : sites) {
@ -269,27 +259,25 @@ public class RemoteRecorder implements Recorder {
try {
String msg = "{\"action\": \"recordings\"}";
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try (Response response = client.execute(request)) {
String json = response.body().string();
if (response.isSuccessful()) {
RecordingListResponse resp = recordingListResponseAdapter.fromJson(json);
if (resp.status.equals("success")) {
if (resp.status.equals(SUCCESS)) {
List<Recording> newRecordings = resp.recordings;
// fire changed events
for (Iterator<Recording> iterator = recordings.iterator(); iterator.hasNext();) {
Recording recording = iterator.next();
if(newRecordings.contains(recording)) {
if (newRecordings.contains(recording)) {
int idx = newRecordings.indexOf(recording);
Recording newRecording = newRecordings.get(idx);
if(newRecording.getStatus() != recording.getStatus()) {
if (newRecording.getStatus() != recording.getStatus()) {
File file = new File(recording.getPath());
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(file,
newRecording.getStatus(), recording.getModel(), recording.getStartDate());
RecordingStateChangedEvent evt = new RecordingStateChangedEvent(file, newRecording.getStatus(), recording.getModel(),
recording.getStartDate());
EventBusHolder.BUS.post(evt);
}
}
@ -311,6 +299,7 @@ public class RemoteRecorder implements Recorder {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// interrupted, probably by stopThread
}
}
@ -339,25 +328,23 @@ public class RemoteRecorder implements Recorder {
}
@Override
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public List<Recording> getRecordings() throws IOException, InvalidKeyException, NoSuchAlgorithmException {
return recordings;
}
@Override
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void delete(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
RecordingRequest recReq = new RecordingRequest("delete", recording);
String msg = recordingRequestAdapter.toJson(recReq);
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try (Response response = client.execute(request)) {
String json = response.body().string();
RecordingListResponse resp = recordingListResponseAdapter.fromJson(json);
if (response.isSuccessful()) {
if (!resp.status.equals("success")) {
if (!resp.status.equals(SUCCESS)) {
throw new IOException("Couldn't delete recording: " + resp.msg);
} else {
recordings.remove(recording);
@ -423,29 +410,29 @@ public class RemoteRecorder implements Recorder {
}
@Override
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void switchStreamSource(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
sendRequest("switch", model);
}
@Override
public void suspendRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IllegalStateException, IOException {
public void suspendRecording(Model model) throws InvalidKeyException, NoSuchAlgorithmException, IOException {
sendRequest("suspend", model);
model.setSuspended(true);
// update cached model
int index = models.indexOf(model);
if(index >= 0) {
if (index >= 0) {
Model m = models.get(index);
m.setSuspended(true);
}
}
@Override
public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void resumeRecording(Model model) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
sendRequest("resume", model);
model.setSuspended(false);
// update cached model
int index = models.indexOf(model);
if(index >= 0) {
if (index >= 0) {
Model m = models.get(index);
m.setSuspended(false);
}
@ -472,21 +459,19 @@ public class RemoteRecorder implements Recorder {
}
@Override
public void rerunPostProcessing(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException, IllegalStateException {
public void rerunPostProcessing(Recording recording) throws IOException, InvalidKeyException, NoSuchAlgorithmException {
RecordingRequest recReq = new RecordingRequest("rerunPostProcessing", recording);
String msg = recordingRequestAdapter.toJson(recReq);
LOG.debug(msg);
RequestBody body = RequestBody.create(JSON, msg);
Request.Builder builder = new Request.Builder()
.url(getRecordingEndpoint())
.post(body);
Request.Builder builder = new Request.Builder().url(getRecordingEndpoint()).post(body);
addHmacIfNeeded(msg, builder);
Request request = builder.build();
try (Response response = client.execute(request)) {
String json = response.body().string();
SimpleResponse resp = simpleResponseAdapter.fromJson(json);
if (response.isSuccessful()) {
if (!resp.status.equals("success")) {
if (!resp.status.equals(SUCCESS)) {
throw new IOException("Couldn't start post-processing for recording: " + resp.msg);
}
} else {

View File

@ -47,7 +47,7 @@ import okhttp3.Response;
public abstract class AbstractHlsDownload implements Download {
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
private static final Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
private static int threadCounter = 0;
protected HttpClient client;
@ -103,9 +103,9 @@ public abstract class AbstractHlsDownload implements Download {
for (TrackData trackData : tracks) {
String uri = trackData.getUri();
if(!uri.startsWith("http")) {
String _url = segmentsUrl.toString();
_url = _url.substring(0, _url.lastIndexOf('/') + 1);
uri = _url + uri;
String tmpurl = segmentsUrl.toString();
tmpurl = tmpurl.substring(0, tmpurl.lastIndexOf('/') + 1);
uri = tmpurl + uri;
}
lsp.totalDuration += trackData.getTrackInfo().duration;
lsp.lastSegDuration = trackData.getTrackInfo().duration;
@ -168,6 +168,7 @@ public abstract class AbstractHlsDownload implements Download {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
playlistEmptyCount = 0;
@ -209,7 +210,9 @@ public abstract class AbstractHlsDownload implements Download {
getModel().getSite().getName(),
Long.toString(recording.getStartDate().getEpochSecond())
};
LOG.debug("Running {}", Arrays.toString(args));
if(LOG.isDebugEnabled()) {
LOG.debug("Running {}", Arrays.toString(args));
}
Process process = rt.exec(args, OS.getEnvironment());
// TODO maybe write these to a separate log file, e.g. recname.ts.pp.log
Thread std = new Thread(new StreamRedirectThread(process.getInputStream(), System.out));

View File

@ -10,17 +10,14 @@ import java.io.InputStream;
import java.net.URL;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -51,7 +48,7 @@ import okhttp3.Response;
public class HlsDownload extends AbstractHlsDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(HlsDownload.class);
private static final Logger LOG = LoggerFactory.getLogger(HlsDownload.class);
protected Path downloadDir;
@ -69,7 +66,6 @@ public class HlsDownload extends AbstractHlsDownload {
public void init(Config config, Model model) {
this.config = config;
super.model = model;
startTime = Instant.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(Config.RECORDING_DATE_FORMAT);
String startTime = formatter.format(ZonedDateTime.ofInstant(this.startTime, ZoneId.systemDefault()));
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getSanitizedNamed());
@ -88,7 +84,7 @@ public class HlsDownload extends AbstractHlsDownload {
String segments = getSegmentPlaylistUrl(model);
if(segments != null) {
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
if (!downloadDir.toFile().exists()) {
Files.createDirectories(downloadDir);
}
int lastSegmentNumber = 0;
@ -103,7 +99,6 @@ public class HlsDownload extends AbstractHlsDownload {
LOG.warn("Missed segments {} < {} in download for {} - setting wait factor to 1/{}", nextSegmentNumber, playlist.seq, model, waitFactor);
}
int skip = nextSegmentNumber - playlist.seq;
Future<Boolean> lastSegmentDownload = null;
for (String segment : playlist.segments) {
if(skip > 0) {
skip--;
@ -111,13 +106,12 @@ public class HlsDownload extends AbstractHlsDownload {
URL segmentUrl = new URL(segment);
String prefix = nf.format(segmentCounter++);
SegmentDownload segmentDownload = new SegmentDownload(playlist, segmentUrl, downloadDir, client, prefix);
lastSegmentDownload = downloadThreadPool.submit(segmentDownload);
//new SegmentDownload(segment, downloadDir).call();
downloadThreadPool.submit(segmentDownload);
}
}
// split recordings
boolean split = splitRecording(lastSegmentDownload);
boolean split = splitRecording();
if (split) {
break;
}
@ -129,13 +123,14 @@ public class HlsDownload extends AbstractHlsDownload {
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;//(long) lsp.lastSegDuration * 1000;
wait = 1;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
try {
Thread.sleep(wait);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if(running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
@ -175,7 +170,9 @@ public class HlsDownload extends AbstractHlsDownload {
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (downloadFinished) {
downloadFinished.notifyAll();
}
@ -198,7 +195,7 @@ public class HlsDownload extends AbstractHlsDownload {
}
PlaylistGenerator playlistGenerator = new PlaylistGenerator();
playlistGenerator.addProgressListener(percent -> recording.setProgress(percent));
playlistGenerator.addProgressListener(recording::setProgress);
try {
File playlist = playlistGenerator.generate(recDir);
@ -214,17 +211,21 @@ public class HlsDownload extends AbstractHlsDownload {
} else {
LOG.error("Playlist contains errors");
for (PlaylistError error : e.getErrors()) {
LOG.error("Error: {}", error.toString());
LOG.error("Error: {}", error);
}
}
} catch (InvalidPlaylistException e) {
LOG.error("Playlist is invalid and will be deleted", e);
File playlist = new File(recDir, "playlist.m3u8");
playlist.delete();
try {
Files.delete(playlist.toPath());
} catch (IOException e1) {
LOG.error("Couldn't delete playlist {}", playlist, e1);
}
}
}
private boolean splitRecording(Future<Boolean> lastSegmentDownload) {
private boolean splitRecording() {
if(config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
@ -237,14 +238,15 @@ public class HlsDownload extends AbstractHlsDownload {
}
@Override
public synchronized void stop() {
public void stop() {
if (running) {
internalStop();
try {
synchronized (downloadFinished) {
downloadFinished.wait();
downloadFinished.wait(TimeUnit.SECONDS.toMillis(60));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Couldn't wait for download to finish", e);
}
}
@ -271,7 +273,7 @@ public class HlsDownload extends AbstractHlsDownload {
@Override
public Boolean call() throws Exception {
LOG.trace("Downloading segment to " + file);
LOG.trace("Downloading segment to {}", file);
int maxTries = 3;
for (int i = 1; i <= maxTries; i++) {
Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build();
@ -357,6 +359,8 @@ public class HlsDownload extends AbstractHlsDownload {
private void waitSomeTime() {
try {
Thread.sleep(10_000);
} catch (Exception e) {}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -11,7 +11,6 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
@ -50,7 +49,7 @@ import okhttp3.Response;
public class MergedHlsDownload extends AbstractHlsDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class);
private static final Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class);
private static final boolean IGNORE_CACHE = true;
private BlockingMultiMTSSource multiSource;
private Thread mergeThread;
@ -59,7 +58,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private Config config;
private File targetFile;
private FileChannel fileChannel = null;
private Object downloadFinished = new Object();
private boolean downloadFinished = false;
public MergedHlsDownload(HttpClient client) {
super(client);
@ -85,11 +84,11 @@ public class MergedHlsDownload extends AbstractHlsDownload {
mergeThread = createMergeThread(targetFile, progressListener, false);
LOG.debug("Merge thread started");
mergeThread.start();
if(Config.getInstance().getSettings().requireAuthentication) {
if (Config.getInstance().getSettings().requireAuthentication) {
URL u = new URL(segmentPlaylistUri);
String path = u.getPath();
byte[] key = Config.getInstance().getSettings().key;
if(!Config.getInstance().getContextPath().isEmpty()) {
if (!Config.getInstance().getContextPath().isEmpty()) {
path = path.substring(Config.getInstance().getContextPath().length());
}
String hmac = Hmac.calculate(path, key);
@ -100,28 +99,29 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.debug("Waiting for merge thread to finish");
mergeThread.join();
LOG.debug("Merge thread finished");
} catch(ParseException e) {
} catch (ParseException e) {
throw new IOException("Couldn't parse stream information", e);
} catch(PlaylistException e) {
} catch (PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Couldn't wait for write thread to finish. Recording might be cut off", e);
} catch (InvalidKeyException | NoSuchAlgorithmException | IllegalStateException e) {
throw new IOException("Couldn't add HMAC to playlist url", e);
} finally {
try {
streamer.stop();
} catch(Exception e) {
} catch (Exception e) {
LOG.error("Couldn't stop streamer", e);
}
downloadThreadPool.shutdown();
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
synchronized (downloadFinished) {
downloadFinished.notifyAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
downloadFinished = true;
LOG.debug("Download terminated for {}", segmentPlaylistUri);
}
}
@ -129,14 +129,13 @@ public class MergedHlsDownload extends AbstractHlsDownload {
@Override
public void start() throws IOException {
try {
if(!model.isOnline(IGNORE_CACHE)) {
throw new IOException(model.getName() +"'s room is not public");
if (!model.isOnline(IGNORE_CACHE)) {
throw new IOException(model.getName() + "'s room is not public");
}
running = true;
super.startTime = Instant.now();
splitRecStartTime = ZonedDateTime.now();
super.model = model;
String segments = getSegmentPlaylistUrl(model);
mergeThread = createMergeThread(targetFile, null, true);
@ -149,16 +148,17 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} else {
throw new IOException("Couldn't determine segments uri");
}
} catch(ParseException e) {
} catch (ParseException e) {
throw new IOException("Couldn't parse stream information", e);
} catch(PlaylistException e) {
} catch (PlaylistException e) {
throw new IOException("Couldn't parse HLS playlist", e);
} catch(EOFException e) {
} catch (EOFException e) {
// end of playlist reached
LOG.debug("Reached end of playlist for model {}", model);
} catch(Exception e) {
} catch (Exception e) {
throw new IOException("Couldn't download segment", e);
} finally {
if (streamer != null) {
try {
streamer.stop();
@ -170,11 +170,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
try {
LOG.debug("Waiting for last segments for {}", model);
downloadThreadPool.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {}
synchronized (downloadFinished) {
LOG.debug("Download finished notify {}", model);
downloadFinished.notifyAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
downloadFinished = true;
LOG.debug("Download for {} terminated", model);
}
}
@ -182,17 +181,17 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private void downloadSegments(String segmentPlaylistUri, boolean livestreamDownload) throws IOException, ParseException, PlaylistException {
int lastSegment = 0;
int nextSegment = 0;
while(running) {
while (running) {
try {
SegmentPlaylist lsp = getNextSegments(segmentPlaylistUri);
emptyPlaylistCheck(lsp);
if(!livestreamDownload) {
if (!livestreamDownload) {
multiSource.setTotalSegments(lsp.segments.size());
}
// download new segments
long downloadStart = System.currentTimeMillis();
if(livestreamDownload) {
if (livestreamDownload) {
downloadNewSegments(lsp, nextSegment);
} else {
downloadRecording(lsp);
@ -200,12 +199,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
long downloadTookMillis = System.currentTimeMillis() - downloadStart;
// download segments, which might have been skipped
//downloadMissedSegments(lsp, nextSegment);
if(nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url, downloadTookMillis, lsp.totalDuration);
if (nextSegment > 0 && lsp.seq > nextSegment) {
LOG.warn("Missed segments {} < {} in download for {}. Download took {}ms. Playlist is {}sec", nextSegment, lsp.seq, lsp.url,
downloadTookMillis, lsp.totalDuration);
}
if(livestreamDownload) {
if (livestreamDownload) {
// split up the recording, if configured
boolean split = splitRecording();
if (split) {
@ -220,16 +219,16 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} else {
break;
}
} catch(HttpException e) {
if(e.getResponseCode() == 404) {
} catch (HttpException e) {
if (e.getResponseCode() == 404) {
LOG.debug("Playlist not found (404). Model {} probably went offline", model);
} else if(e.getResponseCode() == 403) {
} else if (e.getResponseCode() == 403) {
LOG.debug("Playlist access forbidden (403). Model {} probably went private or offline", model);
} else {
LOG.info("Unexpected error while downloading {}", model, e);
}
running = false;
} catch(Exception e) {
} catch (Exception e) {
LOG.info("Unexpected error while downloading {}", model, e);
running = false;
}
@ -245,19 +244,19 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
}
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, MissingSegmentException, ExecutionException, HttpException {
private void downloadNewSegments(SegmentPlaylist lsp, int nextSegment) throws MalformedURLException, ExecutionException, HttpException {
int skip = nextSegment - lsp.seq;
// add segments to download threadpool
Queue<Future<byte[]>> downloads = new LinkedList<>();
if(downloadQueue.remainingCapacity() == 0) {
if (downloadQueue.remainingCapacity() == 0) {
LOG.warn("Download to slow for this stream. Download queue is full. Skipping segment");
} else {
for (String segment : lsp.segments) {
if(!running) {
if (!running) {
break;
}
if(skip > 0) {
if (skip > 0) {
skip--;
} else {
URL segmentUrl = new URL(segment);
@ -278,27 +277,28 @@ public class MergedHlsDownload extends AbstractHlsDownload {
byte[] segmentData = downloadFuture.get();
writeSegment(segmentData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while downloading segment", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if(cause instanceof MissingSegmentException) {
if(model != null && !isModelOnline()) {
if (cause instanceof MissingSegmentException) {
if (model != null && !isModelOnline()) {
LOG.debug("Error while downloading segment, because model {} is offline. Stopping now", model.getName());
running = false;
} else {
LOG.debug("Segment not available, but model {} still online. Going on", model.getName());
LOG.debug("Segment not available, but model {} still online. Going on", Optional.ofNullable(model).map(Model::getName).orElse("n/a"));
}
} else if(cause instanceof HttpException) {
} else if (cause instanceof HttpException) {
HttpException he = (HttpException) cause;
if(model != null && !isModelOnline()) {
if (model != null && !isModelOnline()) {
LOG.debug("Error {} while downloading segment, because model {} is offline. Stopping now", he.getResponseCode(), model.getName());
running = false;
} else {
if(he.getResponseCode() == 404) {
LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", model.getName());
if (he.getResponseCode() == 404) {
LOG.info("Playlist for {} not found [HTTP 404]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a"));
running = false;
} else if(he.getResponseCode() == 403) {
LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", model.getName());
} else if (he.getResponseCode() == 403) {
LOG.info("Playlist for {} not accessible [HTTP 403]. Stopping now", Optional.ofNullable(model).map(Model::getName).orElse("n/a"));
running = false;
} else {
throw he;
@ -318,33 +318,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
private boolean splitRecording() {
if(config.getSettings().splitRecordings > 0) {
if (config.getSettings().splitRecordings > 0) {
Duration recordingDuration = Duration.between(splitRecStartTime, ZonedDateTime.now());
long seconds = recordingDuration.getSeconds();
if(seconds >= config.getSettings().splitRecordings) {
if (seconds >= config.getSettings().splitRecordings) {
internalStop();
return true;
// try {
// File lastTargetFile = targetFile;
//
// // switch to the next file
// targetFile = Config.getInstance().getFileForRecording(model, "ts");
// LOG.debug("Switching to file {}", targetFile.getAbsolutePath());
// fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
// MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build();
// streamer.switchSink(sink);
// super.startTime = Instant.now();
// splitRecStartTime = ZonedDateTime.now();
//
// // post-process current recording
// Thread pp = new Thread(() -> postprocess(lastTargetFile));
// pp.setName("Post-Processing split recording");
// pp.setPriority(Thread.MIN_PRIORITY);
// pp.start();
// } catch (IOException e) {
// LOG.error("Error while splitting recording", e);
// running = false;
// }
}
}
return false;
@ -354,8 +333,8 @@ public class MergedHlsDownload extends AbstractHlsDownload {
try {
long wait = 0;
if (lastSegment == lsp.seq) {
int timeLeftMillis = (int)(lsp.totalDuration * 1000 - downloadTookMillis);
if(timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it
int timeLeftMillis = (int) (lsp.totalDuration * 1000 - downloadTookMillis);
if (timeLeftMillis < 3000) { // we have less than 3 seconds to get the new playlist and start downloading it
wait = 1;
} else {
// wait a second to be nice to the server (don't hammer it with requests)
@ -365,11 +344,12 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.trace("Playlist didn't change... waiting for {}ms", wait);
} else {
// playlist did change -> wait for at least last segment duration
wait = 1;// (long) lsp.lastSegDuration * 1000;
wait = 1;
LOG.trace("Playlist changed... waiting for {}ms", wait);
}
Thread.sleep(wait);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (running) {
LOG.error("Couldn't sleep between segment downloads. This might mess up the download!");
}
@ -377,15 +357,20 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
@Override
public synchronized void stop() {
public void stop() {
if (running) {
internalStop();
try {
synchronized (downloadFinished) {
LOG.debug("Waiting for finished notify {}", model);
downloadFinished.wait();
internalStop();
int count = 0;
while (!downloadFinished && count++ < 60) {
LOG.debug("Waiting for download to finish {}", model);
Thread.sleep(1000);
}
if(!downloadFinished) {
LOG.warn("Download didn't finishe properly for model {}", model);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Couldn't wait for download to finish", e);
}
LOG.debug("Download stopped");
@ -393,7 +378,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
}
@Override
void internalStop() {
synchronized void internalStop() {
running = false;
if (streamer != null) {
streamer.stop();
@ -403,31 +388,24 @@ public class MergedHlsDownload extends AbstractHlsDownload {
private Thread createMergeThread(File targetFile, ProgressListener listener, boolean liveStream) {
Thread t = new Thread(() -> {
multiSource = BlockingMultiMTSSource.builder()
.setFixContinuity(true)
.setProgressListener(listener)
.build();
multiSource = BlockingMultiMTSSource.builder().setFixContinuity(true).setProgressListener(listener).build();
try {
Path downloadDir = targetFile.getParentFile().toPath();
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
if (!downloadDir.toFile().exists()) {
Files.createDirectories(downloadDir);
}
fileChannel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
MTSSink sink = ByteChannelSink.builder().setByteChannel(fileChannel).build();
streamer = Streamer.builder()
.setSource(multiSource)
.setSink(sink)
.setSleepingEnabled(liveStream)
.setBufferSize(10)
.setName(Optional.ofNullable(model).map(m -> m.getName()).orElse(""))
.build();
streamer = Streamer.builder().setSource(multiSource).setSink(sink).setSleepingEnabled(liveStream).setBufferSize(10)
.setName(Optional.ofNullable(model).map(Model::getName).orElse("")).build();
// Start streaming
streamer.stream();
LOG.debug("Streamer finished");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (running) {
LOG.error("Error while waiting for a download future", e);
}
@ -478,27 +456,27 @@ public class MergedHlsDownload extends AbstractHlsDownload {
@Override
public byte[] call() throws IOException {
LOG.trace("Downloading segment " + url.getFile());
LOG.trace("Downloading segment {}", url.getFile());
int maxTries = 3;
for (int i = 1; i <= maxTries && running; i++) {
Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build();
try (Response response = client.execute(request)) {
if(response.isSuccessful()) {
if (response.isSuccessful()) {
byte[] segment = response.body().bytes();
if(lsp.encrypted) {
if (lsp.encrypted) {
segment = new Crypto(lsp.encryptionKeyUrl, client).decrypt(segment);
}
return segment;
} else {
throw new HttpException(response.code(), response.message());
}
} catch(Exception e) {
} catch (Exception e) {
if (i == maxTries) {
LOG.error("Error while downloading segment. Segment {} finally failed", url.getFile());
} else {
LOG.trace("Error while downloading segment {} on try {}", url.getFile(), i, e);
}
if(model != null && !isModelOnline()) {
if (model != null && !isModelOnline()) {
break;
}
}
@ -510,7 +488,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
public boolean isModelOnline() {
try {
return model.isOnline(IGNORE_CACHE);
} catch (IOException | ExecutionException | InterruptedException e) {
} catch (IOException | ExecutionException e) {
return false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

View File

@ -107,13 +107,10 @@ public class Streamer {
boolean resetState = false;
MTSPacket packet = null;
long packetCount = 0;
//long pcrPidPacketCount = 0;
Long firstPcrValue = null;
Long firstPcrTime = null;
//Long firstPcrPacketCount = null;
Long lastPcrValue = null;
Long lastPcrTime = null;
//Long lastPcrPacketCount = null;
Long averageSleep = null;
while (!streamingShouldStop) {
if (resetState) {
@ -145,6 +142,7 @@ public class Streamer {
log.error("Interrupted while waiting for packet");
continue;
} else {
Thread.currentThread().interrupt();
break;
}
}
@ -164,100 +162,78 @@ public class Streamer {
}
}
if (pid != 0 && patSection!=null) {
if (patSection.getPrograms().values().contains(pid)) {
if (packet.isPayloadUnitStartIndicator()) {
ByteBuffer payload = packet.getPayload();
payload.rewind();
int pointer = payload.get() & 0xff;
payload.position(payload.position() + pointer);
pmtSection.put(pid, PMTSection.parse(payload));
}
}
if (pid != 0 && patSection!=null && patSection.getPrograms().values().contains(pid) && packet.isPayloadUnitStartIndicator()) {
ByteBuffer payload = packet.getPayload();
payload.rewind();
int pointer = payload.get() & 0xff;
payload.position(payload.position() + pointer);
pmtSection.put(pid, PMTSection.parse(payload));
}
// Check PID matches PCR PID
if (true) {//mtsPacket.pid == pmt.getPcrPid()) {
//pcrPidPacketCount++;
if (averageSleep != null) {
sleepNanos = averageSleep;
} else {
// if (pcrPidPacketCount < 2) {
// if (pcrPidPacketCount % 10 == 0) {
// sleepNanos = 15;
// }
// }
}
if (averageSleep != null) {
sleepNanos = averageSleep;
}
// Check for PCR
if (packet.getAdaptationField() != null) {
if (packet.getAdaptationField().getPcr() != null) {
if (packet.getPid() == getPCRPid()) {
if (!packet.getAdaptationField().isDiscontinuityIndicator()) {
// Get PCR and current nano time
long pcrValue = packet.getAdaptationField().getPcr().getValue();
long pcrTime = System.nanoTime();
if (packet.getAdaptationField() != null && packet.getAdaptationField().getPcr() != null) {
if (packet.getPid() == getPCRPid()) {
if (!packet.getAdaptationField().isDiscontinuityIndicator()) {
// Get PCR and current nano time
long pcrValue = packet.getAdaptationField().getPcr().getValue();
long pcrTime = System.nanoTime();
// Compute sleepNanosOrig
if (firstPcrValue == null || firstPcrTime == null) {
firstPcrValue = pcrValue;
firstPcrTime = pcrTime;
//firstPcrPacketCount = pcrPidPacketCount;
}
// Compute sleepNanosPrevious
Long sleepNanosPrevious = null;
if (lastPcrValue != null && lastPcrTime != null) {
if (pcrValue <= lastPcrValue) {
log.trace("PCR discontinuity ! " + packet.getPid());
resetState = true;
} else {
sleepNanosPrevious = ((pcrValue - lastPcrValue) / 27 * 1000) - (pcrTime - lastPcrTime);
}
}
// System.out.println("pcrValue=" + pcrValue + ", lastPcrValue=" + lastPcrValue + ", sleepNanosPrevious=" + sleepNanosPrevious + ", sleepNanosOrig=" + sleepNanosOrig);
// Set sleep time based on PCR if possible
if (sleepNanosPrevious != null) {
// Safety : We should never have to wait more than 100ms
if (sleepNanosPrevious > 100000000) {
log.warn("PCR sleep ignored, too high !");
resetState = true;
} else {
sleepNanos = sleepNanosPrevious;
// averageSleep = sleepNanosPrevious / (pcrPidPacketCount - lastPcrPacketCount - 1);
}
}
// Set lastPcrValue/lastPcrTime
lastPcrValue = pcrValue;
lastPcrTime = pcrTime + sleepNanos;
//lastPcrPacketCount = pcrPidPacketCount;
} else {
log.warn("Skipped PCR - Discontinuity indicator");
// Compute sleepNanosOrig
if (firstPcrValue == null || firstPcrTime == null) {
firstPcrValue = pcrValue;
firstPcrTime = pcrTime;
}
// Compute sleepNanosPrevious
Long sleepNanosPrevious = null;
if (lastPcrValue != null && lastPcrTime != null) {
if (pcrValue <= lastPcrValue) {
log.trace("PCR discontinuity ! {}", packet.getPid());
resetState = true;
} else {
sleepNanosPrevious = ((pcrValue - lastPcrValue) / 27 * 1000) - (pcrTime - lastPcrTime);
}
}
// Set sleep time based on PCR if possible
if (sleepNanosPrevious != null) {
// Safety : We should never have to wait more than 100ms
if (sleepNanosPrevious > 100000000) {
log.warn("PCR sleep ignored, too high !");
resetState = true;
} else {
sleepNanos = sleepNanosPrevious;
}
}
// Set lastPcrValue/lastPcrTime
lastPcrValue = pcrValue;
lastPcrTime = pcrTime + sleepNanos;
} else {
log.debug("Skipped PCR - PID does not match");
log.warn("Skipped PCR - Discontinuity indicator");
}
} else {
log.debug("Skipped PCR - PID does not match");
}
}
// Sleep if needed
if (sleepNanos > 0 && sleepingEnabled) {
log.trace("Sleeping " + sleepNanos / 1000000 + " millis, " + sleepNanos % 1000000 + " nanos");
log.trace("Sleeping {} millis, {} nanos", sleepNanos / 1000000, sleepNanos % 1000000);
try {
Thread.sleep(sleepNanos / 1000000, (int) (sleepNanos % 1000000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Streaming sleep interrupted!");
}
}
// Stream packet
// System.out.println("Streaming packet #" + packetCount + ", PID=" + mtsPacket.getPid() + ", pcrCount=" + pcrCount + ", continuityCounter=" + mtsPacket.getContinuityCounter());
if(!streamingShouldStop && !Thread.interrupted()) {
try {
sink.send(packet);
@ -289,18 +265,12 @@ public class Streamer {
while (!streamingShouldStop && (packet = source.nextPacket()) != null) {
boolean put = false;
while (!put) {
try {
buffer.put(packet);
put = true;
} catch (InterruptedException ignored) {
log.error("Error adding packet to buffer", ignored);
}
put = putPacketToBuffer(packet);
}
}
} catch (InterruptedException e) {
if(!streamingShouldStop) {
log.error("Error reading from source", e);
}
Thread.currentThread().interrupt();
log.error("Error reading from source", e);
} catch (Exception e) {
log.error("Error reading from source", e);
} finally {
@ -313,9 +283,19 @@ public class Streamer {
}
}
private boolean putPacketToBuffer(MTSPacket packet) {
try {
buffer.put(packet);
return true;
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
log.error("Error adding packet to buffer", ignored);
return false;
}
}
private int getPCRPid() {
if ((!pmtSection.isEmpty())) {
// TODO change this
return pmtSection.values().iterator().next().getPcrPid();
}
return -1;