forked from j62/ctbrec
Fix file handles not released for failed segments
This commit is contained in:
parent
1abb2a8f79
commit
49a581446b
|
@ -4,6 +4,7 @@
|
||||||
group is already or could be recorded used an potentially outdated model
|
group is already or could be recorded used an potentially outdated model
|
||||||
object from the persisted groups.json file. Now the model state is updated
|
object from the persisted groups.json file. Now the model state is updated
|
||||||
before performing the check.
|
before performing the check.
|
||||||
|
* Fixed: File handles not released for failed segments
|
||||||
|
|
||||||
4.7.13
|
4.7.13
|
||||||
========================
|
========================
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.xml.bind.JAXBException;
|
import javax.xml.bind.JAXBException;
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
@ -66,7 +67,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
protected int nextSegmentNumber = 0;
|
protected int nextSegmentNumber = 0;
|
||||||
protected String segmentPlaylistUrl;
|
protected String segmentPlaylistUrl;
|
||||||
|
|
||||||
private Instant beforeLastPlaylistRequest= Instant.EPOCH;
|
private Instant beforeLastPlaylistRequest = Instant.EPOCH;
|
||||||
private int consecutivePlaylistTimeouts = 0;
|
private int consecutivePlaylistTimeouts = 0;
|
||||||
private int consecutivePlaylistErrors = 0;
|
private int consecutivePlaylistErrors = 0;
|
||||||
protected Instant lastSegmentDownload = Instant.MIN;
|
protected Instant lastSegmentDownload = Instant.MIN;
|
||||||
|
@ -93,6 +94,11 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
lastSegmentDownload = Instant.now();
|
lastSegmentDownload = Instant.now();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void segmentDownloadFailed(SegmentDownload segmentDownload) {
|
||||||
|
LOG.info("Segment download failed with", segmentDownload.getException());
|
||||||
|
stopRecordingOnHighSegmentErrorCount();
|
||||||
|
}
|
||||||
|
|
||||||
protected abstract void internalStop();
|
protected abstract void internalStop();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -162,12 +168,17 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
Future<SegmentDownload> future;
|
Future<SegmentDownload> future;
|
||||||
while ((future = segmentDownloadService.poll()) != null) {
|
while ((future = segmentDownloadService.poll()) != null) {
|
||||||
try {
|
try {
|
||||||
segmentDownloadFinished(future.get());
|
SegmentDownload segmentDownload = future.get();
|
||||||
|
if (segmentDownload.isFailed()) {
|
||||||
|
segmentDownloadFailed(segmentDownload);
|
||||||
|
} else {
|
||||||
|
segmentDownloadFinished(segmentDownload);
|
||||||
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
LOG.error("Thread interrupted during segment download", e);
|
LOG.error("Thread interrupted during segment download", e);
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
// Something went wrong during the segment download.
|
// Something unexpected went wrong during the segment download.
|
||||||
// At this point we have taken care of that, but we take note of the error and if a certain threshold of errors is exceeded in a certain
|
// At this point we have taken care of that, but we take note of the error and if a certain threshold of errors is exceeded in a certain
|
||||||
// amount of time, we stop the download and hope for the best to get routed to a better server.
|
// amount of time, we stop the download and hope for the best to get routed to a better server.
|
||||||
stopRecordingOnHighSegmentErrorCount();
|
stopRecordingOnHighSegmentErrorCount();
|
||||||
|
@ -344,7 +355,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
protected void emptyPlaylistCheck(SegmentPlaylist playlist) {
|
||||||
if(playlist.segments.isEmpty()) {
|
if (playlist.segments.isEmpty()) {
|
||||||
playlistEmptyCount++;
|
playlistEmptyCount++;
|
||||||
try {
|
try {
|
||||||
Thread.sleep(6000);
|
Thread.sleep(6000);
|
||||||
|
@ -354,7 +365,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
} else {
|
} else {
|
||||||
playlistEmptyCount = 0;
|
playlistEmptyCount = 0;
|
||||||
}
|
}
|
||||||
if(playlistEmptyCount == 10) {
|
if (playlistEmptyCount == 10) {
|
||||||
LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel());
|
LOG.info("Last 10 playlists were empty for {}. Stopping recording!", getModel());
|
||||||
internalStop();
|
internalStop();
|
||||||
}
|
}
|
||||||
|
@ -362,7 +373,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
|
|
||||||
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException {
|
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException {
|
||||||
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
|
||||||
recordingEvents.add(RecordingEvent.of("Missed segments: "+nextSegmentNumber+" < " + playlist.seq));
|
recordingEvents.add(RecordingEvent.of("Missed segments: " + nextSegmentNumber + " < " + playlist.seq));
|
||||||
if (config.getSettings().logMissedSegments) {
|
if (config.getSettings().logMissedSegments) {
|
||||||
File hlsEventsFile = File.createTempFile("rec_evt_" + Instant.now() + "_" + model.getSanitizedNamed(), ".log");
|
File hlsEventsFile = File.createTempFile("rec_evt_" + Instant.now() + "_" + model.getSanitizedNamed(), ".log");
|
||||||
try (OutputStream outputStream = Files.newOutputStream(hlsEventsFile.toPath(), CREATE, WRITE, TRUNCATE_EXISTING)) {
|
try (OutputStream outputStream = Files.newOutputStream(hlsEventsFile.toPath(), CREATE, WRITE, TRUNCATE_EXISTING)) {
|
||||||
|
@ -396,12 +407,16 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
String prefix = nf.format(segmentCounter++);
|
String prefix = nf.format(segmentCounter++);
|
||||||
segment.prefix = prefix;
|
segment.prefix = prefix;
|
||||||
OutputStream targetStream = getSegmentOutputStream(segment);
|
OutputStream targetStream = getSegmentOutputStream(segment);
|
||||||
SegmentDownload segmentDownload = new SegmentDownload(model, playlist, segment, client, targetStream);
|
SegmentDownload segmentDownload = getSegmentDownload(playlist, segment, targetStream);
|
||||||
execute(segmentDownload);
|
execute(segmentDownload);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected SegmentDownload getSegmentDownload(SegmentPlaylist playlist, Segment segment, OutputStream targetStream) throws MalformedURLException {
|
||||||
|
return new SegmentDownload(model, playlist, segment, client, targetStream);
|
||||||
|
}
|
||||||
|
|
||||||
private void calculateRescheduleTime() {
|
private void calculateRescheduleTime() {
|
||||||
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
|
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
|
||||||
recordingEvents.add(RecordingEvent.of("next playlist download scheduled for " + rescheduleTime.toString()));
|
recordingEvents.add(RecordingEvent.of("next playlist download scheduled for " + rescheduleTime.toString()));
|
||||||
|
@ -417,7 +432,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
||||||
Thread.sleep(waitForMillis);
|
Thread.sleep(waitForMillis);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
if(running) {
|
if (running) {
|
||||||
LOG.error("Couldn't sleep. This might mess up the download!");
|
LOG.error("Couldn't sleep. This might mess up the download!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,9 +210,8 @@ public class HlsDownload extends AbstractHlsDownload {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void segmentDownloadFinished(SegmentDownload segmentDownload) {
|
protected SegmentDownload getSegmentDownload(SegmentPlaylist playlist, Segment segment, OutputStream targetStream) throws MalformedURLException {
|
||||||
super.segmentDownloadFinished(segmentDownload);
|
return new MultiFileSegmentDownload(getModel(), playlist, segment, client, targetStream);
|
||||||
IoUtils.close(segmentDownload.getOutputStream(), "Couldn't close segment file");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
package ctbrec.recorder.download.hls;
|
||||||
|
|
||||||
|
import ctbrec.Model;
|
||||||
|
import ctbrec.io.HttpClient;
|
||||||
|
import ctbrec.io.IoUtils;
|
||||||
|
import ctbrec.recorder.download.hls.SegmentPlaylist.Segment;
|
||||||
|
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
|
||||||
|
public class MultiFileSegmentDownload extends SegmentDownload {
|
||||||
|
public MultiFileSegmentDownload(Model model, SegmentPlaylist playlist, Segment segment, HttpClient client, OutputStream out) throws MalformedURLException {
|
||||||
|
super(model, playlist, segment, client, out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SegmentDownload call() {
|
||||||
|
try {
|
||||||
|
return super.call();
|
||||||
|
} finally {
|
||||||
|
IoUtils.close(getOutputStream(), "Couldn't close segment file " + getSegment().targetFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,28 +1,5 @@
|
||||||
package ctbrec.recorder.download.hls;
|
package ctbrec.recorder.download.hls;
|
||||||
|
|
||||||
import static ctbrec.ErrorMessages.HTTP_RESPONSE_BODY_IS_NULL;
|
|
||||||
import static ctbrec.recorder.download.hls.AbstractHlsDownload.*;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InterruptedIOException;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.MalformedURLException;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.security.InvalidAlgorithmParameterException;
|
|
||||||
import java.security.InvalidKeyException;
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
|
|
||||||
import javax.crypto.NoSuchPaddingException;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import ctbrec.Model;
|
import ctbrec.Model;
|
||||||
import ctbrec.io.BandwidthMeter;
|
import ctbrec.io.BandwidthMeter;
|
||||||
import ctbrec.io.HttpClient;
|
import ctbrec.io.HttpClient;
|
||||||
|
@ -32,16 +9,37 @@ import ctbrec.recorder.download.hls.SegmentPlaylist.Segment;
|
||||||
import okhttp3.Request;
|
import okhttp3.Request;
|
||||||
import okhttp3.Request.Builder;
|
import okhttp3.Request.Builder;
|
||||||
import okhttp3.Response;
|
import okhttp3.Response;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.crypto.NoSuchPaddingException;
|
||||||
|
import java.io.*;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.security.InvalidAlgorithmParameterException;
|
||||||
|
import java.security.InvalidKeyException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import static ctbrec.ErrorMessages.HTTP_RESPONSE_BODY_IS_NULL;
|
||||||
|
import static ctbrec.recorder.download.hls.AbstractHlsDownload.addHeaders;
|
||||||
|
|
||||||
public class SegmentDownload implements Callable<SegmentDownload> {
|
public class SegmentDownload implements Callable<SegmentDownload> {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SegmentDownload.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SegmentDownload.class);
|
||||||
|
|
||||||
private final URL url;
|
protected final URL url;
|
||||||
private final HttpClient client;
|
protected final HttpClient client;
|
||||||
private final SegmentPlaylist playlist;
|
protected final SegmentPlaylist playlist;
|
||||||
private final Segment segment;
|
protected final Segment segment;
|
||||||
private final Model model;
|
protected final Model model;
|
||||||
private final OutputStream out;
|
protected final OutputStream out;
|
||||||
|
|
||||||
|
protected boolean failed = false;
|
||||||
|
|
||||||
|
protected Exception exception;
|
||||||
|
|
||||||
public SegmentDownload(Model model, SegmentPlaylist playlist, Segment segment, HttpClient client, OutputStream out) throws MalformedURLException {
|
public SegmentDownload(Model model, SegmentPlaylist playlist, Segment segment, HttpClient client, OutputStream out) throws MalformedURLException {
|
||||||
this.model = model;
|
this.model = model;
|
||||||
|
@ -53,7 +51,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SegmentDownload call() throws InvalidAlgorithmParameterException, NoSuchPaddingException, NoSuchAlgorithmException, IOException, InvalidKeyException {
|
public SegmentDownload call() {
|
||||||
for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { // NOSONAR
|
for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { // NOSONAR
|
||||||
Request request = createRequest();
|
Request request = createRequest();
|
||||||
try (Response response = client.execute(request)) {
|
try (Response response = client.execute(request)) {
|
||||||
|
@ -61,13 +59,18 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
||||||
break;
|
break;
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
LOG.debug("Segment does not exist {}", url.getFile());
|
LOG.debug("Segment does not exist {}", url.getFile());
|
||||||
|
failed = true;
|
||||||
|
exception = e;
|
||||||
break;
|
break;
|
||||||
} catch (InterruptedIOException e) {
|
} catch (InterruptedIOException e) {
|
||||||
|
failed = true;
|
||||||
|
exception = e;
|
||||||
break;
|
break;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (tries == 3) {
|
if (tries == 3) {
|
||||||
LOG.warn("Error while downloading segment for {}. Segment {} finally failed: {}", model, url.getFile(), e.getMessage());
|
LOG.warn("Error while downloading segment for {}. Segment {} finally failed: {}", model, url.getFile(), e.getMessage());
|
||||||
throw e;
|
failed = true;
|
||||||
|
exception = e;
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Error while downloading segment {} for {} on try {} - {}", url.getFile(), model, tries, e.getMessage());
|
LOG.debug("Error while downloading segment {} for {} on try {} - {}", url.getFile(), model, tries, e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -76,7 +79,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleResponse(Response response) throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, InvalidAlgorithmParameterException, IOException {
|
protected void handleResponse(Response response) throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, InvalidAlgorithmParameterException, IOException {
|
||||||
if (response.isSuccessful()) {
|
if (response.isSuccessful()) {
|
||||||
InputStream in = Objects.requireNonNull(response.body(), HTTP_RESPONSE_BODY_IS_NULL).byteStream();
|
InputStream in = Objects.requireNonNull(response.body(), HTTP_RESPONSE_BODY_IS_NULL).byteStream();
|
||||||
if (playlist.encrypted) {
|
if (playlist.encrypted) {
|
||||||
|
@ -94,7 +97,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Request createRequest() {
|
protected Request createRequest() {
|
||||||
Builder builder = new Request.Builder().url(url);
|
Builder builder = new Request.Builder().url(url);
|
||||||
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()), model);
|
addHeaders(builder, Optional.ofNullable(model).map(Model::getHttpHeaderFactory).map(HttpHeaderFactory::createSegmentHeaders).orElse(new HashMap<>()), model);
|
||||||
return builder.build();
|
return builder.build();
|
||||||
|
@ -107,4 +110,12 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
||||||
public Segment getSegment() {
|
public Segment getSegment() {
|
||||||
return segment;
|
return segment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isFailed() {
|
||||||
|
return failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Exception getException() {
|
||||||
|
return exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue