Add automatic scaling of the scheduled thread pool

This commit is contained in:
0xb00bface 2020-12-28 17:35:40 +01:00
parent c79cc826d7
commit f86ba637b4
4 changed files with 66 additions and 2 deletions

View File

@ -46,6 +46,7 @@
<logger name="ctbrec.recorder.server.HlsServlet" level="INFO"/>
<logger name="ctbrec.recorder.server.RecorderServlet" level="INFO"/>
<logger name="ctbrec.recorder.RecordingFileMonitor" level="TRACE"/>
<logger name="ctbrec.recorder.ThreadPoolScaler" level="DEBUG"/>
<logger name="ctbrec.recorder.download.dash.DashDownload" level="DEBUG"/>
<logger name="ctbrec.ui.ExternalBrowser" level="DEBUG"/>
<logger name="ctbrec.ui.ThumbOverviewTab" level="DEBUG"/>

View File

@ -2,6 +2,7 @@ package ctbrec.recorder;
import static ctbrec.SubsequentAction.*;
import static ctbrec.event.Event.Type.*;
import static java.util.concurrent.TimeUnit.*;
import java.io.File;
import java.io.IOException;
@ -29,6 +30,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -69,6 +71,8 @@ public class NextGenLocalRecorder implements Recorder {
// thread pools for downloads and post-processing
private ScheduledExecutorService downloadPool;
private ThreadPoolScaler threadPoolScaler;
private ExecutorService segmentDownloadPool = new ThreadPoolExecutor(0, 1000, 30L, SECONDS, new SynchronousQueue<>(), createThreadFactory("SegmentDownload"));
private ExecutorService downloadCompletionPool = Executors.newFixedThreadPool(1, createThreadFactory("DownloadCompletionWorker"));
private BlockingQueue<ScheduledFuture<Recording>> downloadFutureQueue = new LinkedBlockingQueue<>();
private Map<ScheduledFuture<Recording>, Recording> downloadFutureRecordingMap = Collections.synchronizedMap(new HashMap<>());
@ -81,6 +85,7 @@ public class NextGenLocalRecorder implements Recorder {
public NextGenLocalRecorder(Config config, List<Site> sites) throws IOException {
this.config = config;
downloadPool = Executors.newScheduledThreadPool(config.getSettings().downloadThreadPoolSize, createThreadFactory("Download"));
threadPoolScaler = new ThreadPoolScaler((ThreadPoolExecutor) downloadPool, config.getSettings().downloadThreadPoolSize);
recordingManager = new RecordingManager(config, sites);
config.getSettings().models.stream().forEach(m -> {
if (m.getSite() != null) {
@ -127,10 +132,13 @@ public class NextGenLocalRecorder implements Recorder {
try {
ScheduledFuture<Recording> future = downloadFutureQueue.take();
rescheduleRecordingTask(future);
threadPoolScaler.tick();
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Error while getting recording result from download queue", e);
} catch(Exception e) {
LOG.error("Error in completion handler", e);
}
}
});
@ -273,7 +281,7 @@ public class NextGenLocalRecorder implements Recorder {
private Download createDownload(Model model) throws IOException {
Download download = model.createDownload();
download.init(config, model, Instant.now(), downloadPool);
download.init(config, model, Instant.now(), segmentDownloadPool);
Objects.requireNonNull(download.getStartTime(),
"At this point the download should have set a startTime. Make sure to set a startTime in " + download.getClass() + ".init()");
LOG.debug("Downloading with {}", download.getClass().getSimpleName());
@ -653,7 +661,7 @@ public class NextGenLocalRecorder implements Recorder {
for (Recording other : recordings) {
if (other.equals(recording)) {
Download download = other.getModel().createDownload();
download.init(Config.getInstance(), other.getModel(), other.getStartDate(), downloadPool);
download.init(Config.getInstance(), other.getModel(), other.getStartDate(), segmentDownloadPool);
other.setDownload(download);
other.setPostProcessedFile(null);
other.setStatus(State.WAITING);

View File

@ -0,0 +1,54 @@
package ctbrec.recorder;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadPoolScaler {
private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolScaler.class);
private ThreadPoolExecutor threadPool;
private int configuredPoolSize;
private int[] values = new int[20];
private int index = -1;
private Instant lastAdjustment = Instant.EPOCH;
public ThreadPoolScaler(ThreadPoolExecutor threadPool, int configuredPoolSize) {
this.threadPool = threadPool;
this.configuredPoolSize = configuredPoolSize;
}
public void tick() {
values[getNextIndex()] = threadPool.getActiveCount();
int sum = 0;
for (int i = 0; i < values.length; i++) {
sum += values[i];
}
double average = sum / (double) values.length;
if (Duration.between(lastAdjustment, Instant.now()).toMillis() > 2000) {
int coreSize = threadPool.getCorePoolSize();
if (average > 0.75 * coreSize) {
threadPool.setCorePoolSize(coreSize + 1);
lastAdjustment = Instant.now();
} else if (average > 0.25 * coreSize) {
threadPool.setCorePoolSize(Math.max(configuredPoolSize, coreSize - 1));
lastAdjustment = Instant.now();
}
LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
}
}
private synchronized int getNextIndex() {
index++;
if (index == values.length) {
index = 0;
}
return index;
}
}

View File

@ -38,6 +38,7 @@
<logger name="ctbrec.recorder.Chaturbate" level="INFO" />
<logger name="ctbrec.recorder.server.HlsServlet" level="INFO"/>
<logger name="ctbrec.recorder.server.RecorderServlet" level="INFO"/>
<logger name="ctbrec.recorder.ThreadPoolScaler" level="DEBUG"/>
<logger name="ctbrec.io.CookieJarImpl" level="INFO"/>
<logger name="org.eclipse.jetty" level="INFO" />
<logger name="streamer" level="ERROR" />