Merge branch 'fix-choking-dev' into reusedname-dev
This commit is contained in:
commit
7e5e7d2cb1
|
@ -38,7 +38,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
|
|||
@Slf4j
|
||||
public abstract class HttpClient {
|
||||
@Getter
|
||||
private static final ConnectionPool GLOBAL_HTTP_CONN_POOL = new ConnectionPool(10, 2, TimeUnit.MINUTES);
|
||||
private static final ConnectionPool GLOBAL_HTTP_CONN_POOL = new ConnectionPool(256, 2, TimeUnit.MINUTES);
|
||||
|
||||
@Getter
|
||||
protected CookieJarImpl cookieJar;
|
||||
|
|
|
@ -37,6 +37,7 @@ import static ctbrec.SubsequentAction.*;
|
|||
import static ctbrec.event.Event.Type.MODEL_ONLINE;
|
||||
import static java.lang.Thread.MAX_PRIORITY;
|
||||
import static java.lang.Thread.MIN_PRIORITY;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
@Slf4j
|
||||
|
@ -62,7 +63,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
|||
private final ThreadPoolExecutor postProcessing;
|
||||
private final Thread maintenanceThread;
|
||||
private long lastSpaceCheck;
|
||||
|
||||
|
||||
|
||||
public SimplifiedLocalRecorder(Config config, List<Site> sites) throws IOException {
|
||||
this.config = config;
|
||||
|
@ -260,6 +261,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
|||
} catch (RejectedExecutionException e) {
|
||||
log.error("Could not start post-processing for {} {}:{}. Execution rejected by thread pool", recording, recording.getModel().getSite().getName(), recording.getModel().getDisplayName());
|
||||
}
|
||||
log.trace("submitPostProcessingJob(): {}", postProcessing);
|
||||
}
|
||||
|
||||
private void runPostProcessing(Recording recording) throws IOException, InterruptedException {
|
||||
|
@ -921,7 +923,7 @@ public class SimplifiedLocalRecorder implements Recorder {
|
|||
log.info("Resuming recorder");
|
||||
running = true;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean isForcePriority(Model model) {
|
||||
return findModel(model).map(Model::isForcePriority).orElse(false);
|
||||
|
|
|
@ -302,7 +302,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
|||
throw new HttpException(response.code(), response.message());
|
||||
}
|
||||
} catch (SocketTimeoutException e) {
|
||||
LOG.debug("Playlist request timed out ({}ms) for model {} {} time{}", config.getSettings().playlistRequestTimeout, model,
|
||||
LOG.debug("Playlist request timed out ({}ms) for model {}:{} {} time{}", config.getSettings().playlistRequestTimeout, model.getSite().getName(), model,
|
||||
++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : "");
|
||||
// times out, return an empty playlist, so that the process can continue without wasting much more time
|
||||
recordingEvents.add(RecordingEvent.of("Playlist request timed out " + consecutivePlaylistTimeouts));
|
||||
|
@ -310,7 +310,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
|
|||
} catch (Exception e) {
|
||||
consecutivePlaylistErrors++;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException {
|
||||
|
|
|
@ -56,7 +56,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
startFfmpegProcess(targetFile);
|
||||
if (ffmpegProcess == null) {
|
||||
throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -81,6 +81,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
downloadExecutor.submit(() -> {
|
||||
ffmpegStreamLock.lock();
|
||||
try {
|
||||
LOG.trace("Starting streaming segments to file {}", targetFile);
|
||||
while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) {
|
||||
try {
|
||||
Future<SegmentDownload> future = queue.peek();
|
||||
|
@ -95,11 +96,13 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.trace("Segment download interrupted for model {}", model, e);
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Segment download failed for model {}", model, e);
|
||||
}
|
||||
}
|
||||
LOG.trace("Finishing streaming segments to file {}", targetFile);
|
||||
} finally {
|
||||
ffmpegStreamLock.unlock();
|
||||
}
|
||||
|
@ -146,6 +149,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload {
|
|||
@Override
|
||||
protected void execute(SegmentDownload segmentDownload) {
|
||||
queue.add(segmentDownloadService.submit(segmentDownload));
|
||||
LOG.trace("Enqueuing segment for file {}", targetFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -12,10 +12,13 @@ import okhttp3.Response;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer;
|
||||
|
||||
import javax.crypto.NoSuchPaddingException;
|
||||
import java.io.*;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URL;
|
||||
import java.net.URI;
|
||||
import java.security.InvalidAlgorithmParameterException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
|
@ -23,6 +26,8 @@ import java.util.HashMap;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.time.Instant;
|
||||
import java.time.Duration;
|
||||
|
||||
import static ctbrec.ErrorMessages.HTTP_RESPONSE_BODY_IS_NULL;
|
||||
import static ctbrec.recorder.download.hls.AbstractHlsDownload.addHeaders;
|
||||
|
@ -36,6 +41,7 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
|||
protected final Segment segment;
|
||||
protected final Model model;
|
||||
protected final OutputStream out;
|
||||
protected final Instant createdAt;
|
||||
|
||||
private long size = 0;
|
||||
protected boolean failed = false;
|
||||
|
@ -48,12 +54,23 @@ public class SegmentDownload implements Callable<SegmentDownload> {
|
|||
this.segment = segment;
|
||||
this.client = client;
|
||||
this.out = out;
|
||||
this.url = new URL(segment.url);
|
||||
this.url = URI.create(segment.url).toURL();
|
||||
this.createdAt = Instant.now();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SegmentDownload call() {
|
||||
var expiresAt = createdAt.plusSeconds(10);
|
||||
|
||||
for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { // NOSONAR
|
||||
if (expiresAt.isBefore(Instant.now())) {
|
||||
// segment has sexpired, skip it
|
||||
LOG.warn("Segment for model {} is late {} seconds", model, Duration.between(expiresAt, Instant.now()));
|
||||
failed = true;
|
||||
exception = new Exception("Segment expired");
|
||||
break;
|
||||
}
|
||||
|
||||
Request request = createRequest();
|
||||
try (Response response = client.execute(request)) {
|
||||
handleResponse(response);
|
||||
|
|
Loading…
Reference in New Issue