diff --git a/client/src/main/resources/logback.xml b/client/src/main/resources/logback.xml index 62404622..ea0f86d8 100644 --- a/client/src/main/resources/logback.xml +++ b/client/src/main/resources/logback.xml @@ -45,7 +45,7 @@ - + diff --git a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java index 93a4a93c..3723b537 100644 --- a/common/src/main/java/ctbrec/recorder/OnlineMonitor.java +++ b/common/src/main/java/ctbrec/recorder/OnlineMonitor.java @@ -1,44 +1,38 @@ package ctbrec.recorder; -import static ctbrec.Model.State.*; - -import java.io.InterruptedIOException; -import java.net.SocketTimeoutException; -import java.time.Duration; -import java.time.Instant; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import ctbrec.Config; import ctbrec.Model; import ctbrec.event.EventBusHolder; import ctbrec.event.ModelIsOnlineEvent; import ctbrec.event.ModelStateChangedEvent; import ctbrec.io.HttpException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InterruptedIOException; +import java.net.SocketTimeoutException; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + +import static ctbrec.Model.State.UNKNOWN; public class OnlineMonitor extends Thread { private static final Logger LOG = LoggerFactory.getLogger(OnlineMonitor.class); private static final boolean IGNORE_CACHE = true; + private static final long TIMEOUT_IN_MILLIS = 2000; private volatile boolean running = false; - private Recorder recorder; + private final Recorder recorder; - private Map states = new HashMap<>(); + private final Map states = new HashMap<>(); - private Map executors = new HashMap<>(); - private Config config; + private final Map executors = new HashMap<>(); + private final Config config; public OnlineMonitor(Recorder recorder, Config config) { this.recorder = recorder; @@ -68,19 +62,14 @@ public class OnlineMonitor extends Thread { } private void removeDeletedModels(List models) { - for (Iterator iterator = states.keySet().iterator(); iterator.hasNext();) { - Model model = iterator.next(); - if(!models.contains(model)) { - iterator.remove(); - } - } + states.keySet().removeIf(model -> !models.contains(model)); } private void updateModels(List models) { // sort models by priority - Collections.sort(models, (a, b) -> b.getPriority() - a.getPriority()); + models.sort((a, b) -> b.getPriority() - a.getPriority()); // submit online check jobs to the executor for the model's site - List> futures = new LinkedList<>(); + List futures = new LinkedList<>(); for (Model model : models) { boolean skipCheckForSuspended = config.getSettings().onlineCheckSkipsPausedModels && model.isSuspended(); boolean skipCheckForMarkedAsLater = model.isMarkedForLaterRecording(); @@ -91,18 +80,21 @@ public class OnlineMonitor extends Thread { } } // wait for all jobs to finish - for (Future future : futures) { + for (ModelAwareFuture future : futures) { try { - future.get(); + future.get(TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + LOG.debug("Online check interrupted for model {}", future.getModel(), e); Thread.currentThread().interrupt(); } catch (ExecutionException e) { - LOG.info("Error while checking online state", e); + LOG.info("Error while checking online state for model {}", future.getModel(), e); + } catch (TimeoutException e) { + LOG.debug("Online check didn't finish after {}ms for model {}", TIMEOUT_IN_MILLIS, future.getModel()); } } } - private Future updateModel(Model model) { + private ModelAwareFuture updateModel(Model model) { final String siteName = model.getSite().getName(); ExecutorService executor = executors.computeIfAbsent(siteName, name -> Executors.newSingleThreadExecutor(r -> { Thread t = new Thread(r); @@ -112,14 +104,14 @@ public class OnlineMonitor extends Thread { return t; })); - return executor.submit(() -> { + var future = executor.submit(() -> { try { if (model.isOnline(IGNORE_CACHE)) { EventBusHolder.BUS.post(new ModelIsOnlineEvent(model)); model.setLastSeen(Instant.now()); } Model.State state = model.getOnlineState(false); - LOG.trace("Model online state: {} {}", model.getName(), state); + LOG.debug("Model online state: {} {}", model.getName(), state); Model.State oldState = states.getOrDefault(model, UNKNOWN); states.put(model, state); if (state != oldState) { @@ -137,13 +129,15 @@ public class OnlineMonitor extends Thread { } catch (Exception e) { LOG.error("Couldn't check if model {} is online", model.getName(), e); } + return model; }); + return new ModelAwareFuture(model, future); } private void suspendUntilNextIteration(List models, Duration timeCheckTook) { LOG.debug("Online check for {} models took {} seconds", models.size(), timeCheckTook.getSeconds()); long sleepTime = config.getSettings().onlineCheckIntervalInSecs; - if(timeCheckTook.getSeconds() < sleepTime) { + if (timeCheckTook.getSeconds() < sleepTime) { try { if (running) { long millis = TimeUnit.SECONDS.toMillis(sleepTime - timeCheckTook.getSeconds()); @@ -164,4 +158,36 @@ public class OnlineMonitor extends Thread { } interrupt(); } -} \ No newline at end of file + + private record ModelAwareFuture(Model model, Future future) implements Future { + + public Model getModel() { + return model; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return future.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public Model get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public Model get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + } +} diff --git a/common/src/main/java/ctbrec/sites/mfc/MyFreeCamsClient.java b/common/src/main/java/ctbrec/sites/mfc/MyFreeCamsClient.java index 1b91293d..dbf17437 100644 --- a/common/src/main/java/ctbrec/sites/mfc/MyFreeCamsClient.java +++ b/common/src/main/java/ctbrec/sites/mfc/MyFreeCamsClient.java @@ -20,6 +20,7 @@ import java.net.URLDecoder; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -125,8 +126,8 @@ public class MyFreeCamsClient { .header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent) .header(CONNECTION, KEEP_ALIVE) .build(); - try(Response resp = mfc.getHttpClient().execute(req)) { - if(!resp.isSuccessful()) { + try (Response resp = mfc.getHttpClient().execute(req)) { + if (!resp.isSuccessful()) { throw new HttpException(resp.code(), resp.message()); } } @@ -208,93 +209,93 @@ public class MyFreeCamsClient { try { while ((message = parseMessage(msgBuffer)) != null) { switch (message.getType()) { - case NULL: - LOG.trace("NULL websocket still alive"); - break; - case LOGIN: - LOG.debug("LOGIN: {}", message); - sessionId = message.getReceiver(); - LOG.debug("Session ID {}", sessionId); - break; - case DETAILS: - case ROOMHELPER: - case ADDFRIEND: - case ADDIGNORE: - case CMESG: - case PMESG: - case TXPROFILE: - case MYCAMSTATE: - case MYWEBCAM: - case JOINCHAN: - case SESSIONSTATE: - if (!message.getMessage().isEmpty()) { - //LOG.debug("SessionState: {}", message.getMessage()); - JsonAdapter adapter = moshi.adapter(SessionState.class); - try { - SessionState sessionState = adapter.fromJson(message.getMessage()); - updateSessionState(sessionState); - } catch (IOException e) { - LOG.error("Couldn't parse session state message {}", message, e); + case NULL: + LOG.trace("NULL websocket still alive"); + break; + case LOGIN: + LOG.debug("LOGIN: {}", message); + sessionId = message.getReceiver(); + LOG.debug("Session ID {}", sessionId); + break; + case DETAILS: + case ROOMHELPER: + case ADDFRIEND: + case ADDIGNORE: + case CMESG: + case PMESG: + case TXPROFILE: + case MYCAMSTATE: + case MYWEBCAM: + case JOINCHAN: + case SESSIONSTATE: + if (!message.getMessage().isEmpty()) { + //LOG.debug("SessionState: {}", message.getMessage()); + JsonAdapter adapter = moshi.adapter(SessionState.class); + try { + SessionState sessionState = adapter.fromJson(message.getMessage()); + updateSessionState(sessionState); + } catch (IOException e) { + LOG.error("Couldn't parse session state message {}", message, e); + } } - } - break; - case USERNAMELOOKUP: - // LOG.debug("{}", message.getType()); - // LOG.debug("{}", message.getSender()); - // LOG.debug("{}", message.getReceiver()); - // LOG.debug("{}", message.getArg1()); - // LOG.debug("{}", message.getArg2()); - // LOG.debug("{}", message.getMessage()); - Consumer responseHandler = responseHandlers.remove(message.getArg1()); - if (responseHandler != null) { - responseHandler.accept(message); - } - break; - case TAGS: - JSONObject json = new JSONObject(message.getMessage()); - String[] names = JSONObject.getNames(json); - Integer uid = Integer.parseInt(names[0]); - SessionState sessionState = sessionStates.getIfPresent(uid); - if (sessionState != null) { - JSONArray tags = json.getJSONArray(names[0]); - for (Object obj : tags) { - sessionState.getM().getTags().add((String) obj); + break; + case USERNAMELOOKUP: + // LOG.debug("{}", message.getType()); + // LOG.debug("{}", message.getSender()); + // LOG.debug("{}", message.getReceiver()); + // LOG.debug("{}", message.getArg1()); + // LOG.debug("{}", message.getArg2()); + // LOG.debug("{}", message.getMessage()); + Consumer responseHandler = responseHandlers.remove(message.getArg1()); + if (responseHandler != null) { + responseHandler.accept(message); } - } - break; - case EXTDATA: - if (message.getArg1() == MessageTypes.LOGIN) { - // noop - } else if (message.getArg1() == MessageTypes.MANAGELIST) { - requestExtData(message.getMessage()); - } else { - LOG.debug("EXTDATA: {}", message); - } - break; - case ROOMDATA: - LOG.debug("ROOMDATA: {}", message); - break; - case UEOPT: - LOG.trace("UEOPT: {}", message); - break; - case SLAVEVSHARE: - // LOG.debug("SLAVEVSHARE {}", message); - // LOG.debug("SLAVEVSHARE MSG [{}]", message.getMessage()); - break; - case TKX: - json = new JSONObject(message.getMessage()); - tkx = json.getString("tkx"); - cxid = json.getInt("cxid"); - ctxenc = URLDecoder.decode(json.getString("ctxenc"), UTF_8); - JSONArray ctxArray = json.getJSONArray("ctx"); - ctx = new int[ctxArray.length()]; - for (int i = 0; i < ctxArray.length(); i++) { - ctx[i] = ctxArray.getInt(i); - } - break; - default: - LOG.trace("Unknown message {}", message); - break; + break; + case TAGS: + JSONObject json = new JSONObject(message.getMessage()); + String[] names = JSONObject.getNames(json); + Integer uid = Integer.parseInt(names[0]); + SessionState sessionState = sessionStates.getIfPresent(uid); + if (sessionState != null) { + JSONArray tags = json.getJSONArray(names[0]); + for (Object obj : tags) { + sessionState.getM().getTags().add((String) obj); + } + } + break; + case EXTDATA: + if (message.getArg1() == MessageTypes.LOGIN) { + // noop + } else if (message.getArg1() == MessageTypes.MANAGELIST) { + requestExtData(message.getMessage()); + } else { + LOG.debug("EXTDATA: {}", message); + } + break; + case ROOMDATA: + LOG.debug("ROOMDATA: {}", message); + break; + case UEOPT: + LOG.trace("UEOPT: {}", message); + break; + case SLAVEVSHARE: + // LOG.debug("SLAVEVSHARE {}", message); + // LOG.debug("SLAVEVSHARE MSG [{}]", message.getMessage()); + break; + case TKX: + json = new JSONObject(message.getMessage()); + tkx = json.getString("tkx"); + cxid = json.getInt("cxid"); + ctxenc = URLDecoder.decode(json.getString("ctxenc"), UTF_8); + JSONArray ctxArray = json.getJSONArray("ctx"); + ctx = new int[ctxArray.length()]; + for (int i = 0; i < ctxArray.length(); i++) { + ctx[i] = ctxArray.getInt(i); + } + break; + default: + LOG.trace("Unknown message {}", message); + break; } } } catch (Exception e) { @@ -598,7 +599,7 @@ public class MyFreeCamsClient { for (SessionState state : sessionStates.asMap().values()) { Optional nm = Optional.ofNullable(state.getNm()); Optional name = Optional.ofNullable(model.getName()); - if(nm.isEmpty() || name.isEmpty()) { + if (nm.isEmpty() || name.isEmpty()) { continue; } @@ -672,36 +673,46 @@ public class MyFreeCamsClient { LOG.trace("Sending USERNAMELOOKUP for {}", q); Object monitor = new Object(); int msgId = messageId++; + AtomicBoolean searchDone = new AtomicBoolean(false); responseHandlers.put(msgId, msg -> { - LOG.trace("Search result: {}", msg); - if (StringUtil.isNotBlank(msg.getMessage()) && !Objects.equals(msg.getMessage(), q)) { - JSONObject json = new JSONObject(msg.getMessage()); + try { + LOG.trace("Search result: {}", msg); + if (StringUtil.isNotBlank(msg.getMessage()) && !Objects.equals(msg.getMessage(), q)) { + JSONObject json = new JSONObject(msg.getMessage()); - JsonAdapter adapter = moshi.adapter(SessionState.class); - try { - SessionState sessionState = Objects.requireNonNull(adapter.fromJson(msg.getMessage())); - updateSessionState(sessionState); - } catch (Exception e) { - LOG.error("Couldn't parse session state message {}", msg, e); + JsonAdapter adapter = moshi.adapter(SessionState.class); + try { + SessionState sessionState = Objects.requireNonNull(adapter.fromJson(msg.getMessage())); + updateSessionState(sessionState); + } catch (Exception e) { + LOG.error("Couldn't parse session state message {}", msg, e); + } + + String name = json.getString("nm"); + MyFreeCamsModel model = mfc.createModel(name); + model.setUid(json.getInt("uid")); + model.setMfcState(State.of(json.getInt("vs"))); + String uid = Integer.toString(model.getUid()); + String uidStart = uid.substring(0, 3); + String previewUrl = "https://img.mfcimg.com/photos2/" + uidStart + '/' + uid + "/avatar.90x90.jpg"; + model.setPreview(previewUrl); + result.add(model); + } + } finally { + searchDone.setPlain(true); + synchronized (monitor) { + monitor.notifyAll(); } - - String name = json.getString("nm"); - MyFreeCamsModel model = mfc.createModel(name); - model.setUid(json.getInt("uid")); - model.setMfcState(State.of(json.getInt("vs"))); - String uid = Integer.toString(model.getUid()); - String uidStart = uid.substring(0, 3); - String previewUrl = "https://img.mfcimg.com/photos2/" + uidStart + '/' + uid + "/avatar.90x90.jpg"; - model.setPreview(previewUrl); - result.add(model); - } - synchronized (monitor) { - monitor.notifyAll(); } }); ws.send("10 " + sessionId + " 0 " + msgId + " 0 " + q + "\n"); - synchronized (monitor) { - monitor.wait(); + int waitInMillis = 1000; + int iterations = 0; + while (iterations < 5 && !searchDone.get()) { + synchronized (monitor) { + monitor.wait(waitInMillis); + iterations++; + } } for (MyFreeCamsModel model : models.asMap().values()) {