diff --git a/common/src/main/java/ctbrec/io/HttpClient.java b/common/src/main/java/ctbrec/io/HttpClient.java index ca38017c..5dc48ea2 100644 --- a/common/src/main/java/ctbrec/io/HttpClient.java +++ b/common/src/main/java/ctbrec/io/HttpClient.java @@ -38,7 +38,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; @Slf4j public abstract class HttpClient { @Getter - private static final ConnectionPool GLOBAL_HTTP_CONN_POOL = new ConnectionPool(10, 2, TimeUnit.MINUTES); + private static final ConnectionPool GLOBAL_HTTP_CONN_POOL = new ConnectionPool(256, 2, TimeUnit.MINUTES); @Getter protected CookieJarImpl cookieJar; diff --git a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java index 536d2106..4161fd67 100644 --- a/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/SimplifiedLocalRecorder.java @@ -37,6 +37,7 @@ import static ctbrec.SubsequentAction.*; import static ctbrec.event.Event.Type.MODEL_ONLINE; import static java.lang.Thread.MAX_PRIORITY; import static java.lang.Thread.MIN_PRIORITY; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; @Slf4j @@ -62,7 +63,7 @@ public class SimplifiedLocalRecorder implements Recorder { private final ThreadPoolExecutor postProcessing; private final Thread maintenanceThread; private long lastSpaceCheck; - + public SimplifiedLocalRecorder(Config config, List sites) throws IOException { this.config = config; @@ -260,6 +261,7 @@ public class SimplifiedLocalRecorder implements Recorder { } catch (RejectedExecutionException e) { log.error("Could not start post-processing for {} {}:{}. Execution rejected by thread pool", recording, recording.getModel().getSite().getName(), recording.getModel().getDisplayName()); } + log.trace("submitPostProcessingJob(): {}", postProcessing); } private void runPostProcessing(Recording recording) throws IOException, InterruptedException { @@ -921,7 +923,7 @@ public class SimplifiedLocalRecorder implements Recorder { log.info("Resuming recorder"); running = true; } - + @Override public boolean isForcePriority(Model model) { return findModel(model).map(Model::isForcePriority).orElse(false); diff --git a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java index c09f5ab7..fb5b1914 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/AbstractHlsDownload.java @@ -302,7 +302,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload { throw new HttpException(response.code(), response.message()); } } catch (SocketTimeoutException e) { - LOG.debug("Playlist request timed out ({}ms) for model {} {} time{}", config.getSettings().playlistRequestTimeout, model, + LOG.debug("Playlist request timed out ({}ms) for model {}:{} {} time{}", config.getSettings().playlistRequestTimeout, model.getSite().getName(), model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : ""); // times out, return an empty playlist, so that the process can continue without wasting much more time recordingEvents.add(RecordingEvent.of("Playlist request timed out " + consecutivePlaylistTimeouts)); @@ -310,7 +310,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload { } catch (Exception e) { consecutivePlaylistErrors++; throw e; - } + } } private SegmentPlaylist parsePlaylist(String segmentPlaylistUrl, InputStream inputStream) throws IOException, ParseException, PlaylistException { diff --git a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java index ceedd7cf..afba7a28 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/MergedFfmpegHlsDownload.java @@ -56,7 +56,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { startFfmpegProcess(targetFile); if (ffmpegProcess == null) { throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg"); - } + } } @Override @@ -81,6 +81,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { downloadExecutor.submit(() -> { ffmpegStreamLock.lock(); try { + LOG.trace("Starting streaming segments to file {}", targetFile); while (!queue.isEmpty() && !Thread.currentThread().isInterrupted()) { try { Future future = queue.peek(); @@ -95,11 +96,13 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { break; } } catch (InterruptedException e) { + LOG.trace("Segment download interrupted for model {}", model, e); Thread.currentThread().interrupt(); } catch (Exception e) { LOG.error("Segment download failed for model {}", model, e); } } + LOG.trace("Finishing streaming segments to file {}", targetFile); } finally { ffmpegStreamLock.unlock(); } @@ -146,6 +149,7 @@ public class MergedFfmpegHlsDownload extends AbstractHlsDownload { @Override protected void execute(SegmentDownload segmentDownload) { queue.add(segmentDownloadService.submit(segmentDownload)); + LOG.trace("Enqueuing segment for file {}", targetFile); } @Override diff --git a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java index 6cb82d23..df3d6982 100644 --- a/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java +++ b/common/src/main/java/ctbrec/recorder/download/hls/SegmentDownload.java @@ -12,10 +12,13 @@ import okhttp3.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.datatype.jsr310.deser.DurationDeserializer; + import javax.crypto.NoSuchPaddingException; import java.io.*; import java.net.MalformedURLException; import java.net.URL; +import java.net.URI; import java.security.InvalidAlgorithmParameterException; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; @@ -23,6 +26,8 @@ import java.util.HashMap; import java.util.Objects; import java.util.Optional; import java.util.concurrent.Callable; +import java.time.Instant; +import java.time.Duration; import static ctbrec.ErrorMessages.HTTP_RESPONSE_BODY_IS_NULL; import static ctbrec.recorder.download.hls.AbstractHlsDownload.addHeaders; @@ -36,6 +41,7 @@ public class SegmentDownload implements Callable { protected final Segment segment; protected final Model model; protected final OutputStream out; + protected final Instant createdAt; private long size = 0; protected boolean failed = false; @@ -48,12 +54,23 @@ public class SegmentDownload implements Callable { this.segment = segment; this.client = client; this.out = out; - this.url = new URL(segment.url); + this.url = URI.create(segment.url).toURL(); + this.createdAt = Instant.now(); } @Override public SegmentDownload call() { + var expiresAt = createdAt.plusSeconds(10); + for (int tries = 1; tries <= 3 && !Thread.currentThread().isInterrupted(); tries++) { // NOSONAR + if (expiresAt.isBefore(Instant.now())) { + // segment has sexpired, skip it + LOG.warn("Segment for model {} is late {} seconds", model, Duration.between(expiresAt, Instant.now())); + failed = true; + exception = new Exception("Segment expired"); + break; + } + Request request = createRequest(); try (Response response = client.execute(request)) { handleResponse(response);