forked from j62/ctbrec
1
0
Fork 0

Pull OnlineMonitor out of LocalRecorder

This commit is contained in:
0xboobface 2018-12-05 21:01:33 +01:00
parent 093b36270a
commit 69544a7a60
4 changed files with 155 additions and 70 deletions

View File

@ -2,6 +2,7 @@ package ctbrec.ui;
import static ctbrec.EventBusHolder.*;
import static ctbrec.EventBusHolder.EVENT_TYPE.*;
import static ctbrec.Model.STATUS.*;
import java.io.BufferedReader;
import java.io.File;
@ -32,6 +33,7 @@ import ctbrec.StringUtil;
import ctbrec.Version;
import ctbrec.io.HttpClient;
import ctbrec.recorder.LocalRecorder;
import ctbrec.recorder.OnlineMonitor;
import ctbrec.recorder.Recorder;
import ctbrec.recorder.RemoteRecorder;
import ctbrec.sites.Site;
@ -62,12 +64,14 @@ public class CamrecApplication extends Application {
private Stage primaryStage;
private Config config;
private Recorder recorder;
private OnlineMonitor onlineMonitor;
static HostServices hostServices;
private SettingsTab settingsTab;
private TabPane rootPane = new TabPane();
private List<Site> sites = new ArrayList<>();
public static HttpClient httpClient;
@Override
public void start(Stage primaryStage) throws Exception {
this.primaryStage = primaryStage;
@ -82,6 +86,8 @@ public class CamrecApplication extends Application {
createHttpClient();
hostServices = getHostServices();
createRecorder();
onlineMonitor = new OnlineMonitor(recorder);
onlineMonitor.start();
for (Site site : sites) {
if(site.isEnabled()) {
try {
@ -165,6 +171,7 @@ public class CamrecApplication extends Application {
modelsTab.saveState();
recordingsTab.saveState();
settingsTab.saveConfig();
onlineMonitor.shutdown();
recorder.shutdown();
for (Site site : sites) {
if(site.isEnabled()) {
@ -219,12 +226,12 @@ public class CamrecApplication extends Application {
EventBusHolder.BUS.register(new Object() {
@Subscribe
public void modelEvent(Map<String, Object> e) {
LOG.debug("Alert: {}", e);
try {
if (Objects.equals(e.get(EVENT), MODEL_STATUS_CHANGED)) {
LOG.debug("Alert: {}", e);
Model.STATUS status = (Model.STATUS) e.get(STATUS);
Model model = (Model) e.get(MODEL);
if (Objects.equals(Model.STATUS.ONLINE, status)) {
if (status == ONLINE) {
Platform.runLater(() -> {
String header = "Model Online";
String msg = model.getDisplayName() + " is now online";

View File

@ -7,13 +7,11 @@ import static ctbrec.Recording.STATUS.*;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@ -26,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@ -34,6 +33,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.Subscribe;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
@ -44,7 +44,6 @@ import ctbrec.OS;
import ctbrec.Recording;
import ctbrec.Recording.STATUS;
import ctbrec.io.HttpClient;
import ctbrec.io.HttpException;
import ctbrec.io.StreamRedirectThread;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import ctbrec.recorder.download.Download;
@ -62,7 +61,6 @@ public class LocalRecorder implements Recorder {
private Map<File, PlaylistGenerator> playlistGenerators = new HashMap<>();
private Config config;
private ProcessMonitor processMonitor;
private OnlineMonitor onlineMonitor;
private PostProcessingTrigger postProcessingTrigger;
private volatile boolean recording = true;
private List<File> deleteInProgress = Collections.synchronizedList(new ArrayList<>());
@ -83,19 +81,37 @@ public class LocalRecorder implements Recorder {
recording = true;
processMonitor = new ProcessMonitor();
processMonitor.start();
onlineMonitor = new OnlineMonitor();
onlineMonitor.start();
postProcessingTrigger = new PostProcessingTrigger();
if(Config.isServerMode()) {
postProcessingTrigger.start();
}
registerEventBusListener();
LOG.debug("Recorder initialized");
LOG.info("Models to record: {}", models);
LOG.info("Saving recordings in {}", config.getSettings().recordingsDir);
}
private void registerEventBusListener() {
EventBusHolder.BUS.register(new Object() {
@Subscribe
public void modelEvent(Map<String, Object> e) {
try {
if (Objects.equals(e.get(EVENT), MODEL_ONLINE)) {
Model model = (Model) e.get(MODEL);
if(!isSuspended(model) && !recordingProcesses.containsKey(model)) {
startRecordingProcess(model);
}
}
} catch (Exception e1) {
LOG.error("Error while handling model state changed event", e);
}
}
});
}
@Override
public void startRecording(Model model) {
if (!models.contains(model)) {
@ -288,7 +304,6 @@ public class LocalRecorder implements Recorder {
LOG.info("Shutting down");
recording = false;
LOG.debug("Stopping monitor threads");
onlineMonitor.running = false;
processMonitor.running = false;
postProcessingTrigger.running = false;
LOG.debug("Stopping all recording processes");
@ -424,67 +439,6 @@ public class LocalRecorder implements Recorder {
}
}
private class OnlineMonitor extends Thread {
private volatile boolean running = false;
public OnlineMonitor() {
setName("OnlineMonitor");
setDaemon(true);
}
@Override
public void run() {
running = true;
while (running) {
Instant begin = Instant.now();
List<Model> models = getModelsRecording();
for (Model model : models) {
try {
boolean isOnline = model.isOnline(IGNORE_CACHE);
LOG.trace("Checking online state for {}: {}", model, (isOnline ? "online" : "offline"));
if (isOnline && !isSuspended(model) && !recordingProcesses.containsKey(model)) {
LOG.info("Model {}'s room back to public", model);
startRecordingProcess(model);
}
} catch (HttpException e) {
LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}",
model.getName(), e.getResponseCode(), e.getResponseMessage());
} catch (SocketTimeoutException e) {
LOG.error("Couldn't check if model {} is online. Request timed out", model.getName());
} catch (Exception e) {
LOG.error("Couldn't check if model {} is online", model.getName(), e);
}
}
Instant end = Instant.now();
Duration timeCheckTook = Duration.between(begin, end);
LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds());
long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs;
if(timeCheckTook.getSeconds() < sleepTime) {
try {
if (running) {
long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds());
LOG.trace("Sleeping {}ms", millis);
Thread.sleep(millis);
}
} catch (InterruptedException e) {
LOG.trace("Sleep interrupted");
}
}
}
LOG.debug(getName() + " terminated");
}
}
private void fireModelOnlineStateChanged(Model model, Model.STATUS status) {
Map<String, Object> evt = new HashMap<>();
evt.put(EVENT, MODEL_STATUS_CHANGED);
evt.put(STATUS, status);
evt.put(MODEL, model);
EventBusHolder.BUS.post(evt);
LOG.debug("Event fired {}", evt);
}
private void fireRecordingStateChanged(Model model, Recording.STATUS status) {
Map<String, Object> evt = new HashMap<>();
evt.put(EVENT, RECORDING_STATUS_CHANGED);

View File

@ -0,0 +1,117 @@
package ctbrec.recorder;
import static ctbrec.EventBusHolder.*;
import static ctbrec.EventBusHolder.EVENT_TYPE.*;
import static ctbrec.Model.STATUS.*;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.EventBusHolder;
import ctbrec.Model;
import ctbrec.Model.STATUS;
import ctbrec.io.HttpException;
public class OnlineMonitor extends Thread {
private static final transient Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class);
private static final boolean IGNORE_CACHE = true;
private volatile boolean running = false;
private Recorder recorder;
private Map<Model, STATUS> states = new HashMap<>();
public OnlineMonitor(Recorder recorder) {
this.recorder = recorder;
setName("OnlineMonitor");
setDaemon(true);
}
@Override
public void run() {
running = true;
while (running) {
Instant begin = Instant.now();
List<Model> models = recorder.getModelsRecording();
// remove models, which are not recorded anymore
for (Iterator<Model> iterator = states.keySet().iterator(); iterator.hasNext();) {
Model model = iterator.next();
if(!models.contains(model)) {
iterator.remove();
}
}
// update the currently recorded models
for (Model model : models) {
try {
if(model.isOnline(IGNORE_CACHE)) {
fireModelOnline(model);
}
STATUS state = model.getOnlineState(false);
STATUS oldState = states.getOrDefault(model, UNKNOWN);
states.put(model, state);
if(state != oldState) {
fireModelOnlineStateChanged(model, oldState, state);
}
} catch (HttpException e) {
LOG.error("Couldn't check if model {} is online. HTTP Response: {} - {}",
model.getName(), e.getResponseCode(), e.getResponseMessage());
} catch (SocketTimeoutException e) {
LOG.error("Couldn't check if model {} is online. Request timed out", model.getName());
} catch (Exception e) {
LOG.error("Couldn't check if model {} is online", model.getName(), e);
}
}
Instant end = Instant.now();
Duration timeCheckTook = Duration.between(begin, end);
LOG.trace("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds());
long sleepTime = Config.getInstance().getSettings().onlineCheckIntervalInSecs;
if(timeCheckTook.getSeconds() < sleepTime) {
try {
if (running) {
long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds());
LOG.trace("Sleeping {}ms", millis);
Thread.sleep(millis);
}
} catch (InterruptedException e) {
LOG.trace("Sleep interrupted");
}
}
}
LOG.debug(getName() + " terminated");
}
private void fireModelOnline(Model model) {
Map<String, Object> evt = new HashMap<>();
evt.put(EVENT, MODEL_ONLINE);
evt.put(MODEL, model);
EventBusHolder.BUS.post(evt);
}
private void fireModelOnlineStateChanged(Model model, STATUS oldStatus, STATUS newStatus) {
Map<String, Object> evt = new HashMap<>();
evt.put(EVENT, MODEL_STATUS_CHANGED);
evt.put(STATUS, newStatus);
evt.put(OLD, oldStatus);
evt.put(MODEL, model);
EventBusHolder.BUS.post(evt);
LOG.debug("Event fired {}", evt);
}
public void shutdown() {
running = false;
interrupt();
}
}

View File

@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.recorder.LocalRecorder;
import ctbrec.recorder.OnlineMonitor;
import ctbrec.recorder.Recorder;
import ctbrec.sites.Site;
import ctbrec.sites.bonga.BongaCams;
@ -30,6 +31,7 @@ public class HttpServer {
private static final transient Logger LOG = LoggerFactory.getLogger(HttpServer.class);
private Recorder recorder;
private OnlineMonitor onlineMonitor;
private Config config;
private Server server = new Server();
private List<Site> sites = new ArrayList<>();
@ -54,6 +56,8 @@ public class HttpServer {
LOG.info("HMAC authentication is enabled");
}
recorder = new LocalRecorder(config);
OnlineMonitor monitor = new OnlineMonitor(recorder);
monitor.start();
for (Site site : sites) {
if(site.isEnabled()) {
site.init();
@ -75,6 +79,9 @@ public class HttpServer {
@Override
public void run() {
LOG.info("Shutting down");
if(onlineMonitor != null) {
onlineMonitor.shutdown();
}
if(recorder != null) {
recorder.shutdown();
}