diff --git a/client/src/main/resources/logback.xml b/client/src/main/resources/logback.xml index e83c831b..06551556 100644 --- a/client/src/main/resources/logback.xml +++ b/client/src/main/resources/logback.xml @@ -46,6 +46,7 @@ + diff --git a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java index c46146df..ee5a3c43 100644 --- a/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/NextGenLocalRecorder.java @@ -2,6 +2,7 @@ package ctbrec.recorder; import static ctbrec.SubsequentAction.*; import static ctbrec.event.Event.Type.*; +import static java.util.concurrent.TimeUnit.*; import java.io.File; import java.io.IOException; @@ -29,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -69,6 +71,8 @@ public class NextGenLocalRecorder implements Recorder { // thread pools for downloads and post-processing private ScheduledExecutorService downloadPool; + private ThreadPoolScaler threadPoolScaler; + private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload")); private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker")); private BlockingQueue> downloadFutureQueue = new LinkedBlockingQueue<>(); private Map, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>()); @@ -81,6 +85,7 @@ public class NextGenLocalRecorder implements Recorder { public NextGenLocalRecorder(Config config, List sites) throws IOException { this.config = config; downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download")); + threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) downloadPool, config.getSettings().downloadThreadPoolSize); recordingManager = new RecordingManager(config, sites); config.getSettings().models.stream().forEach(m -> { if (m.getSite() != null) { @@ -127,10 +132,13 @@ public class NextGenLocalRecorder implements Recorder { try { ScheduledFuture future = downloadFutureQueue.take(); rescheduleRecordingTask(future); + threadPoolScaler.tick(); Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.error("Error while getting recording result from download queue", e); + } catch(Exception e) { + LOG.error("Error in completion handler", e); } } }); @@ -273,7 +281,7 @@ public class NextGenLocalRecorder implements Recorder { private Download createDownload(Model model) throws IOException { Download download = model.createDownload(); - download.init(config, model, Instant.now(), downloadPool); + download.init(config, model, Instant.now(), segmentDownloadPool); Objects.requireNonNull(download.getStartTime(), "At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()"); LOG.debug("Downloading with {}", download.getClass().getSimpleName()); @@ -653,7 +661,7 @@ public class NextGenLocalRecorder implements Recorder { for (Recording other : recordings) { if (other.equals(recording)) { Download download = other.getModel().createDownload(); - download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool); + download.init(Config.getInstance(), other.getModel(), other.getStartDate(), segmentDownloadPool); other.setDownload(download); other.setPostProcessedFile(null); other.setStatus(State.WAITING); diff --git a/common/src/main/java/ctbrec/recorder/ThreadPoolScaler.java b/common/src/main/java/ctbrec/recorder/ThreadPoolScaler.java new file mode 100644 index 00000000..72de156f --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/ThreadPoolScaler.java @@ -0,0 +1,54 @@ +package ctbrec.recorder; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ThreadPoolScaler { + private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolScaler.class); + + private ThreadPoolExecutor threadPool; + private int configuredPoolSize; + + private int[] values = new int[20]; + private int index = -1; + private Instant lastAdjustment = Instant.EPOCH; + + + public ThreadPoolScaler(ThreadPoolExecutor threadPool, int configuredPoolSize) { + this.threadPool = threadPool; + this.configuredPoolSize = configuredPoolSize; + } + + public void tick() { + values[getNextIndex()] = threadPool.getActiveCount(); + int sum = 0; + for (int i = 0; i < values.length; i++) { + sum += values[i]; + } + double average = sum / (double) values.length; + + if (Duration.between(lastAdjustment, Instant.now()).toMillis() > 2000) { + int coreSize = threadPool.getCorePoolSize(); + if (average > 0.75 * coreSize) { + threadPool.setCorePoolSize(coreSize + 1); + lastAdjustment = Instant.now(); + } else if (average > 0.25 * coreSize) { + threadPool.setCorePoolSize(Math.max(configuredPoolSize, coreSize - 1)); + lastAdjustment = Instant.now(); + } + LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize()); + } + } + + private synchronized int getNextIndex() { + index++; + if (index == values.length) { + index = 0; + } + return index; + } +} diff --git a/server/src/main/resources/logback.xml b/server/src/main/resources/logback.xml index 39d1afc6..cedd8bbe 100644 --- a/server/src/main/resources/logback.xml +++ b/server/src/main/resources/logback.xml @@ -38,6 +38,7 @@ +