forked from j62/ctbrec
Adjust ThreadPoolScaler behavior
- ramp up at 65% thread usage - ramp down at 15% thread usage - added cooldowns for ramp down, 10 secs after ramp down, 30 seconds after ramp up
This commit is contained in:
parent
d1764389f0
commit
602c81d18d
|
@ -15,8 +15,8 @@ public class ThreadPoolScaler {
|
||||||
|
|
||||||
private int[] values = new int[20];
|
private int[] values = new int[20];
|
||||||
private int index = -1;
|
private int index = -1;
|
||||||
private Instant lastAdjustment = Instant.EPOCH;
|
private Instant lastAdjustment = Instant.now();
|
||||||
|
private Instant downScaleCoolDown = Instant.EPOCH;
|
||||||
|
|
||||||
public ThreadPoolScaler(ThreadPoolExecutor threadPool, int configuredPoolSize) {
|
public ThreadPoolScaler(ThreadPoolExecutor threadPool, int configuredPoolSize) {
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
|
@ -25,23 +25,38 @@ public class ThreadPoolScaler {
|
||||||
|
|
||||||
public void tick() {
|
public void tick() {
|
||||||
values[getNextIndex()] = threadPool.getActiveCount();
|
values[getNextIndex()] = threadPool.getActiveCount();
|
||||||
|
adjustPoolSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void adjustPoolSize() {
|
||||||
|
Instant now = Instant.now();
|
||||||
|
if (Duration.between(lastAdjustment, now).toMillis() > 500) {
|
||||||
|
double average = calculateAverage();
|
||||||
|
int coreSize = threadPool.getCorePoolSize();
|
||||||
|
if (average > 0.65 * coreSize) {
|
||||||
|
threadPool.setCorePoolSize(coreSize + 1);
|
||||||
|
downScaleCoolDown = now.plusSeconds(30);
|
||||||
|
LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
|
||||||
|
} else if (average < 0.15 * coreSize) {
|
||||||
|
int newValue = Math.max(configuredPoolSize, coreSize - 1);
|
||||||
|
if (threadPool.getCorePoolSize() != newValue && now.isAfter(downScaleCoolDown)) {
|
||||||
|
threadPool.setCorePoolSize(newValue);
|
||||||
|
downScaleCoolDown = now.plusSeconds(10);
|
||||||
|
LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastAdjustment = now;
|
||||||
|
LOG.trace("Thread pool size is {}", threadPool.getCorePoolSize());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private double calculateAverage() {
|
||||||
int sum = 0;
|
int sum = 0;
|
||||||
for (int i = 0; i < values.length; i++) {
|
for (int i = 0; i < values.length; i++) {
|
||||||
sum += values[i];
|
sum += values[i];
|
||||||
}
|
}
|
||||||
double average = sum / (double) values.length;
|
double average = sum / (double) values.length;
|
||||||
|
return average;
|
||||||
if (Duration.between(lastAdjustment, Instant.now()).toMillis() > 2000) {
|
|
||||||
int coreSize = threadPool.getCorePoolSize();
|
|
||||||
if (average > 0.75 * coreSize) {
|
|
||||||
threadPool.setCorePoolSize(coreSize + 1);
|
|
||||||
lastAdjustment = Instant.now();
|
|
||||||
} else if (average > 0.25 * coreSize) {
|
|
||||||
threadPool.setCorePoolSize(Math.max(configuredPoolSize, coreSize - 1));
|
|
||||||
lastAdjustment = Instant.now();
|
|
||||||
}
|
|
||||||
LOG.trace("Adjusted scheduler pool size to {}", threadPool.getCorePoolSize());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized int getNextIndex() {
|
private synchronized int getNextIndex() {
|
||||||
|
|
Loading…
Reference in New Issue