diff --git a/client/src/main/java/ctbrec/ui/CamrecApplication.java b/client/src/main/java/ctbrec/ui/CamrecApplication.java index bbac61ce..7218895a 100644 --- a/client/src/main/java/ctbrec/ui/CamrecApplication.java +++ b/client/src/main/java/ctbrec/ui/CamrecApplication.java @@ -2,6 +2,7 @@ package ctbrec.ui; import static ctbrec.EventBusHolder.*; import static ctbrec.EventBusHolder.EVENT_TYPE.*; +import static ctbrec.Model.STATUS.*; import java.io.BufferedReader; import java.io.File; @@ -32,6 +33,7 @@ import ctbrec.StringUtil; import ctbrec.Version; import ctbrec.io.HttpClient; import ctbrec.recorder.LocalRecorder; +import ctbrec.recorder.OnlineMonitor; import ctbrec.recorder.Recorder; import ctbrec.recorder.RemoteRecorder; import ctbrec.sites.Site; @@ -62,12 +64,14 @@ public class CamrecApplication extends Application { private Stage primaryStage; private Config config; private Recorder recorder; + private OnlineMonitor onlineMonitor; static HostServices hostServices; private SettingsTab settingsTab; private TabPane rootPane = new TabPane(); private List sites = new ArrayList<>(); public static HttpClient httpClient; + @Override public void start(Stage primaryStage) throws Exception { this.primaryStage = primaryStage; @@ -82,6 +86,8 @@ public class CamrecApplication extends Application { createHttpClient(); hostServices = getHostServices(); createRecorder(); + onlineMonitor = new OnlineMonitor(recorder); + onlineMonitor.start(); for (Site site : sites) { if(site.isEnabled()) { try { @@ -165,6 +171,7 @@ public class CamrecApplication extends Application { modelsTab.saveState(); recordingsTab.saveState(); settingsTab.saveConfig(); + onlineMonitor.shutdown(); recorder.shutdown(); for (Site site : sites) { if(site.isEnabled()) { @@ -219,12 +226,12 @@ public class CamrecApplication extends Application { EventBusHolder.BUS.register(new Object() { @Subscribe public void modelEvent(Map e) { - LOG.debug("Alert: {}", e); try { if (Objects.equals(e.get(EVENT), MODEL_STATUS_CHANGED)) { + LOG.debug("Alert: {}", e); Model.STATUS status = (Model.STATUS) e.get(STATUS); Model model = (Model) e.get(MODEL); - if (Objects.equals(Model.STATUS.ONLINE, status)) { + if (status == ONLINE) { Platform.runLater(() -> { String header = "Model Online"; String msg = model.getDisplayName() + " is now online"; diff --git a/common/src/main/java/ctbrec/recorder/LocalRecorder.java b/common/src/main/java/ctbrec/recorder/LocalRecorder.java index 1efdb1b1..85309816 100644 --- a/common/src/main/java/ctbrec/recorder/LocalRecorder.java +++ b/common/src/main/java/ctbrec/recorder/LocalRecorder.java @@ -7,13 +7,11 @@ import static ctbrec.Recording.STATUS.*; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.net.SocketTimeoutException; import java.nio.file.FileStore; import java.nio.file.Files; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; -import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; @@ -26,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -34,6 +33,7 @@ import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.eventbus.Subscribe; import com.iheartradio.m3u8.ParseException; import com.iheartradio.m3u8.PlaylistException; @@ -44,7 +44,6 @@ import ctbrec.OS; import ctbrec.Recording; import ctbrec.Recording.STATUS; import ctbrec.io.HttpClient; -import ctbrec.io.HttpException; import ctbrec.io.StreamRedirectThread; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.download.Download; @@ -62,7 +61,6 @@ public class LocalRecorder implements Recorder { private Map playlistGenerators = new HashMap<>(); private Config config; private ProcessMonitor processMonitor; - private OnlineMonitor onlineMonitor; private PostProcessingTrigger postProcessingTrigger; private volatile boolean recording = true; private List deleteInProgress = Collections.synchronizedList(new ArrayList<>()); @@ -83,19 +81,37 @@ public class LocalRecorder implements Recorder { recording = true; processMonitor = new ProcessMonitor(); processMonitor.start(); - onlineMonitor = new OnlineMonitor(); - onlineMonitor.start(); postProcessingTrigger = new PostProcessingTrigger(); if(Config.isServerMode()) { postProcessingTrigger.start(); } + registerEventBusListener(); + LOG.debug("Recorder initialized"); LOG.info("Models to record: {}", models); LOG.info("Saving recordings in {}", config.getSettings().recordingsDir); } + private void registerEventBusListener() { + EventBusHolder.BUS.register(new Object() { + @Subscribe + public void modelEvent(Map e) { + try { + if (Objects.equals(e.get(EVENT), MODEL_ONLINE)) { + Model model = (Model) e.get(MODEL); + if(!isSuspended(model) && !recordingProcesses.containsKey(model)) { + startRecordingProcess(model); + } + } + } catch (Exception e1) { + LOG.error("Error while handling model state changed event", e); + } + } + }); + } + @Override public void startRecording(Model model) { if (!models.contains(model)) { @@ -288,7 +304,6 @@ public class LocalRecorder implements Recorder { LOG.info("Shutting down"); recording = false; LOG.debug("Stopping monitor threads"); - onlineMonitor.running = false; processMonitor.running = false; postProcessingTrigger.running = false; LOG.debug("Stopping all recording processes"); @@ -424,67 +439,6 @@ public class LocalRecorder implements Recorder { } } - private class OnlineMonitor extends Thread { - private volatile boolean running = false; - - public OnlineMonitor() { - setName("OnlineMonitor"); - setDaemon(true); - } - - @Override - public void run() { - running = true; - while (running) { - Instant begin = Instant.now(); - List models = getModelsRecording(); - for (Model model : models) { - try { - boolean isOnline = model.isOnline(IGNORE_CACHE); - LOG.trace("Checking online state for {}: {}", model, (isOnline ? "online" : "offline")); - if (isOnline && !isSuspended(model) && !recordingProcesses.containsKey(model)) { - LOG.info("Model {}'s room back to public", model); - startRecordingProcess(model); - } - } catch (HttpException e) { - LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", - model.getName(), e.getResponseCode(), e.getResponseMessage()); - } catch (SocketTimeoutException e) { - LOG.error("Couldn't check if model {} is online. Request timed out", model.getName()); - } catch (Exception e) { - LOG.error("Couldn't check if model {} is online", model.getName(), e); - } - } - Instant end = Instant.now(); - Duration timeCheckTook = Duration.between(begin, end); - LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds()); - - long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs; - if(timeCheckTook.getSeconds() < sleepTime) { - try { - if (running) { - long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds()); - LOG.trace("Sleeping {}ms", millis); - Thread.sleep(millis); - } - } catch (InterruptedException e) { - LOG.trace("Sleep interrupted"); - } - } - } - LOG.debug(getName() + " terminated"); - } - } - - private void fireModelOnlineStateChanged(Model model, Model.STATUS status) { - Map evt = new HashMap<>(); - evt.put(EVENT, MODEL_STATUS_CHANGED); - evt.put(STATUS, status); - evt.put(MODEL, model); - EventBusHolder.BUS.post(evt); - LOG.debug("Event fired {}", evt); - } - private void fireRecordingStateChanged(Model model, Recording.STATUS status) { Map evt = new HashMap<>(); evt.put(EVENT, RECORDING_STATUS_CHANGED); diff --git a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java new file mode 100644 index 00000000..5bd96975 --- /dev/null +++ b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java @@ -0,0 +1,117 @@ +package ctbrec.recorder; + +import static ctbrec.EventBusHolder.*; +import static ctbrec.EventBusHolder.EVENT_TYPE.*; +import static ctbrec.Model.STATUS.*; + +import java.net.SocketTimeoutException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ctbrec.Config; +import ctbrec.EventBusHolder; +import ctbrec.Model; +import ctbrec.Model.STATUS; +import ctbrec.io.HttpException; + +public class OnlineMonitor extends Thread { + private static final transient Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class); + private static final boolean IGNORE_CACHE = true; + + private volatile boolean running = false; + private Recorder recorder; + + private Map states = new HashMap<>(); + + public OnlineMonitor(Recorder recorder) { + this.recorder = recorder; + setName("OnlineMonitor"); + setDaemon(true); + } + + @Override + public void run() { + running = true; + while (running) { + Instant begin = Instant.now(); + List models = recorder.getModelsRecording(); + + // remove models, which are not recorded anymore + for (Iterator iterator = states.keySet().iterator(); iterator.hasNext();) { + Model model = iterator.next(); + if(!models.contains(model)) { + iterator.remove(); + } + } + + // update the currently recorded models + for (Model model : models) { + try { + if(model.isOnline(IGNORE_CACHE)) { + fireModelOnline(model); + } + STATUS state = model.getOnlineState(false); + STATUS oldState = states.getOrDefault(model, UNKNOWN); + states.put(model, state); + if(state != oldState) { + fireModelOnlineStateChanged(model, oldState, state); + } + } catch (HttpException e) { + LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}", + model.getName(), e.getResponseCode(), e.getResponseMessage()); + } catch (SocketTimeoutException e) { + LOG.error("Couldn't check if model {} is online. Request timed out", model.getName()); + } catch (Exception e) { + LOG.error("Couldn't check if model {} is online", model.getName(), e); + } + } + Instant end = Instant.now(); + Duration timeCheckTook = Duration.between(begin, end); + LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds()); + + long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs; + if(timeCheckTook.getSeconds() < sleepTime) { + try { + if (running) { + long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds()); + LOG.trace("Sleeping {}ms", millis); + Thread.sleep(millis); + } + } catch (InterruptedException e) { + LOG.trace("Sleep interrupted"); + } + } + } + LOG.debug(getName() + " terminated"); + } + + private void fireModelOnline(Model model) { + Map evt = new HashMap<>(); + evt.put(EVENT, MODEL_ONLINE); + evt.put(MODEL, model); + EventBusHolder.BUS.post(evt); + } + + private void fireModelOnlineStateChanged(Model model, STATUS oldStatus, STATUS newStatus) { + Map evt = new HashMap<>(); + evt.put(EVENT, MODEL_STATUS_CHANGED); + evt.put(STATUS, newStatus); + evt.put(OLD, oldStatus); + evt.put(MODEL, model); + EventBusHolder.BUS.post(evt); + LOG.debug("Event fired {}", evt); + } + + public void shutdown() { + running = false; + interrupt(); + } +} \ No newline at end of file diff --git a/server/src/main/java/ctbrec/recorder/server/HttpServer.java b/server/src/main/java/ctbrec/recorder/server/HttpServer.java index 5767b72e..ab07dabc 100644 --- a/server/src/main/java/ctbrec/recorder/server/HttpServer.java +++ b/server/src/main/java/ctbrec/recorder/server/HttpServer.java @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory; import ctbrec.Config; import ctbrec.recorder.LocalRecorder; +import ctbrec.recorder.OnlineMonitor; import ctbrec.recorder.Recorder; import ctbrec.sites.Site; import ctbrec.sites.bonga.BongaCams; @@ -30,6 +31,7 @@ public class HttpServer { private static final transient Logger LOG = LoggerFactory.getLogger(HttpServer.class); private Recorder recorder; + private OnlineMonitor onlineMonitor; private Config config; private Server server = new Server(); private List sites = new ArrayList<>(); @@ -54,6 +56,8 @@ public class HttpServer { LOG.info("HMAC authentication is enabled"); } recorder = new LocalRecorder(config); + OnlineMonitor monitor = new OnlineMonitor(recorder); + monitor.start(); for (Site site : sites) { if(site.isEnabled()) { site.init(); @@ -75,6 +79,9 @@ public class HttpServer { @Override public void run() { LOG.info("Shutting down"); + if(onlineMonitor != null) { + onlineMonitor.shutdown(); + } if(recorder != null) { recorder.shutdown(); }