From 52bc8a6b6494881aaa6dff7ee9c4f2772e38cc57 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Mon, 30 Dec 2019 18:30:31 +0100 Subject: [PATCH] Check the online state of models in parallel For each site a SingleThreadExecutor is used to check the online state of the site's models in series. These SingleThreadExecutors run in parallel to speed the online check up. --- .../java/ctbrec/recorder/OnlineMonitor.java | 79 +++++++++++++------ 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java index c5d32933..0970bd75 100644 --- a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java +++ b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java @@ -8,8 +8,13 @@ import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -31,6 +36,8 @@ public class OnlineMonitor extends Thread { private Map states = new HashMap<>(); + private Map executors = new HashMap<>(); + // TODO divide models into buckets by their site in each iteration a model of each bucket can be testes in parallel // this will speed up the testing, but not hammer the sites public OnlineMonitor(Recorder recorder) { @@ -69,35 +76,58 @@ public class OnlineMonitor extends Thread { } private void updateModels(List models) { + // submit online check jobs to the executor for the model's site + List> futures = new LinkedList<>(); for (Model model : models) { - updateModel(model); + futures.add(updateModel(model)); + } + // wait for all jobs to finish + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + LOG.info("Error while checking online state", e); + } } } - private void updateModel(Model model) { - try { - if(model.isOnline(IGNORE_CACHE)) { - EventBusHolder.BUS.post(new ModelIsOnlineEvent(model)); - } - Model.State state = model.getOnlineState(false); - Model.State oldState = states.getOrDefault(model, UNKNOWN); - states.put(model, state); - if(state != oldState) { - EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state)); - } - } catch (HttpException e) { - LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", - model.getName(), e.getResponseCode(), e.getResponseMessage()); - } catch (SocketTimeoutException e) { - LOG.error("Couldn't check if model {} is online. Request timed out", model.getName()); - } catch (InterruptedException | InterruptedIOException e) { - Thread.currentThread().interrupt(); - if(running) { + private Future updateModel(Model model) { + final String siteName = model.getSite().getName(); + ExecutorService executor = executors.computeIfAbsent(siteName, name -> Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r); + t.setName("OnlineMonitorThread-" + siteName); + t.setPriority(MIN_PRIORITY); + t.setDaemon(true); + return t; + })); + + return executor.submit(() -> { + try { + if (model.isOnline(IGNORE_CACHE)) { + EventBusHolder.BUS.post(new ModelIsOnlineEvent(model)); + } + Model.State state = model.getOnlineState(false); + LOG.debug("Model online state: {}", state); + Model.State oldState = states.getOrDefault(model, UNKNOWN); + states.put(model, state); + if (state != oldState) { + EventBusHolder.BUS.post(new ModelStateChangedEvent(model, oldState, state)); + } + } catch (HttpException e) { + LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", model.getName(), e.getResponseCode(), e.getResponseMessage()); + } catch (SocketTimeoutException e) { + LOG.error("Couldn't check if model {} is online. Request timed out", model.getName()); + } catch (InterruptedException | InterruptedIOException e) { + Thread.currentThread().interrupt(); + if (running) { + LOG.error("Couldn't check if model {} is online", model.getName(), e); + } + } catch (Exception e) { LOG.error("Couldn't check if model {} is online", model.getName(), e); } - } catch (Exception e) { - LOG.error("Couldn't check if model {} is online", model.getName(), e); - } + }); } private void suspendUntilNextIteration(List models, Duration timeCheckTook) { @@ -119,6 +149,9 @@ public class OnlineMonitor extends Thread { public void shutdown() { running = false; + for (ExecutorService executor : executors.values()) { + executor.shutdownNow(); + } interrupt(); } } \ No newline at end of file