forked from j62/ctbrec
Check the online state of models in parallel
For each site a SingleThreadExecutor is used to check the online state of the site's models in series. These SingleThreadExecutors run in parallel to speed the online check up.
This commit is contained in:
parent
24cb3ecf99
commit
52bc8a6b64
|
@ -8,8 +8,13 @@ import java.time.Duration;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -31,6 +36,8 @@ public class OnlineMonitor extends Thread {
|
||||||
|
|
||||||
private Map<Model, Model.State> states = new HashMap<>();
|
private Map<Model, Model.State> states = new HashMap<>();
|
||||||
|
|
||||||
|
private Map<String, ExecutorService> executors = new HashMap<>();
|
||||||
|
|
||||||
// TODO divide models into buckets by their site in each iteration a model of each bucket can be testes in parallel
|
// TODO divide models into buckets by their site in each iteration a model of each bucket can be testes in parallel
|
||||||
// this will speed up the testing, but not hammer the sites
|
// this will speed up the testing, but not hammer the sites
|
||||||
public OnlineMonitor(Recorder recorder) {
|
public OnlineMonitor(Recorder recorder) {
|
||||||
|
@ -69,35 +76,58 @@ public class OnlineMonitor extends Thread {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateModels(List<Model> models) {
|
private void updateModels(List<Model> models) {
|
||||||
|
// submit online check jobs to the executor for the model's site
|
||||||
|
List<Future<?>> futures = new LinkedList<>();
|
||||||
for (Model model : models) {
|
for (Model model : models) {
|
||||||
updateModel(model);
|
futures.add(updateModel(model));
|
||||||
|
}
|
||||||
|
// wait for all jobs to finish
|
||||||
|
for (Future<?> future : futures) {
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
LOG.info("Error while checking online state", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateModel(Model model) {
|
private Future<?> updateModel(Model model) {
|
||||||
try {
|
final String siteName = model.getSite().getName();
|
||||||
if(model.isOnline(IGNORE_CACHE)) {
|
ExecutorService executor = executors.computeIfAbsent(siteName, name -> Executors.newSingleThreadExecutor(r -> {
|
||||||
EventBusHolder.BUS.post(new ModelIsOnlineEvent(model));
|
Thread t = new Thread(r);
|
||||||
}
|
t.setName("OnlineMonitorThread-" + siteName);
|
||||||
Model.State state = model.getOnlineState(false);
|
t.setPriority(MIN_PRIORITY);
|
||||||
Model.State oldState = states.getOrDefault(model, UNKNOWN);
|
t.setDaemon(true);
|
||||||
states.put(model, state);
|
return t;
|
||||||
if(state != oldState) {
|
}));
|
||||||
EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state));
|
|
||||||
}
|
return executor.submit(() -> {
|
||||||
} catch (HttpException e) {
|
try {
|
||||||
LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}",
|
if (model.isOnline(IGNORE_CACHE)) {
|
||||||
model.getName(), e.getResponseCode(), e.getResponseMessage());
|
EventBusHolder.BUS.post(new ModelIsOnlineEvent(model));
|
||||||
} catch (SocketTimeoutException e) {
|
}
|
||||||
LOG.error("Couldn't check if model {} is online. Request timed out", model.getName());
|
Model.State state = model.getOnlineState(false);
|
||||||
} catch (InterruptedException | InterruptedIOException e) {
|
LOG.debug("Model online state: {}", state);
|
||||||
Thread.currentThread().interrupt();
|
Model.State oldState = states.getOrDefault(model, UNKNOWN);
|
||||||
if(running) {
|
states.put(model, state);
|
||||||
|
if (state != oldState) {
|
||||||
|
EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state));
|
||||||
|
}
|
||||||
|
} catch (HttpException e) {
|
||||||
|
LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", model.getName(), e.getResponseCode(), e.getResponseMessage());
|
||||||
|
} catch (SocketTimeoutException e) {
|
||||||
|
LOG.error("Couldn't check if model {} is online. Request timed out", model.getName());
|
||||||
|
} catch (InterruptedException | InterruptedIOException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
if (running) {
|
||||||
|
LOG.error("Couldn't check if model {} is online", model.getName(), e);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
LOG.error("Couldn't check if model {} is online", model.getName(), e);
|
LOG.error("Couldn't check if model {} is online", model.getName(), e);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
});
|
||||||
LOG.error("Couldn't check if model {} is online", model.getName(), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void suspendUntilNextIteration(List<Model> models, Duration timeCheckTook) {
|
private void suspendUntilNextIteration(List<Model> models, Duration timeCheckTook) {
|
||||||
|
@ -119,6 +149,9 @@ public class OnlineMonitor extends Thread {
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
running = false;
|
running = false;
|
||||||
|
for (ExecutorService executor : executors.values()) {
|
||||||
|
executor.shutdownNow();
|
||||||
|
}
|
||||||
interrupt();
|
interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue