forked from j62/ctbrec
1
0
Fork 0

Improve error handling and set timeouts in the Cam4 websocket

This commit is contained in:
0xb00bface 2020-12-19 17:41:44 +01:00
parent 3d076cdde6
commit a3ffa7a71e
2 changed files with 24 additions and 7 deletions

View File

@ -87,9 +87,13 @@ public class Cam4Model extends AbstractModel {
case "INSIDE_PS": case "INSIDE_PS":
onlineState = PRIVATE; onlineState = PRIVATE;
break; break;
case "INSIDE_GS":
case "GROUP_SHOW": case "GROUP_SHOW":
onlineState = GROUP; onlineState = GROUP;
break; break;
case "PAUSED":
onlineState = AWAY;
break;
case "OFFLINE": case "OFFLINE":
onlineState = OFFLINE; onlineState = OFFLINE;
break; break;

View File

@ -2,6 +2,7 @@ package ctbrec.sites.cam4;
import static ctbrec.io.HttpConstants.*; import static ctbrec.io.HttpConstants.*;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -34,7 +35,8 @@ public class Cam4WsClient {
private String token; private String token;
private WebSocket websocket; private WebSocket websocket;
private int r = 1; private int r = 1;
private Map<String, CompletableFuture<String>> responseFutures = new HashMap<>(); private Map<String, CompletableFuture<String>> responseFuturesByPath = new HashMap<>();
private Map<Integer, CompletableFuture<String>> responseFuturesBySequence = new HashMap<>();
public Cam4WsClient(Config config, Cam4 site, Cam4Model model) { public Cam4WsClient(Config config, Cam4 site, Cam4Model model) {
this.config = config; this.config = config;
@ -55,7 +57,7 @@ public class Cam4WsClient {
String p = "chatRooms/" + model.getName() + "/roomState"; String p = "chatRooms/" + model.getName() + "/roomState";
CompletableFuture<String> roomStateFuture = send(p, "{\"t\":\"d\",\"d\":{\"r\":" + (r++) + ",\"a\":\"q\",\"b\":{\"p\":\"" + p + "\",\"h\":\"\"}}}"); CompletableFuture<String> roomStateFuture = send(p, "{\"t\":\"d\",\"d\":{\"r\":" + (r++) + ",\"a\":\"q\",\"b\":{\"p\":\"" + p + "\",\"h\":\"\"}}}");
try { try {
JSONObject roomState = parseRoomStateResponse(roomStateFuture.get(5000, TimeUnit.SECONDS)); JSONObject roomState = parseRoomStateResponse(roomStateFuture.get(1, TimeUnit.SECONDS));
websocket.close(1000, ""); websocket.close(1000, "");
return roomState; return roomState;
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -69,7 +71,7 @@ public class Cam4WsClient {
private boolean connectAndAuthorize() throws IOException { private boolean connectAndAuthorize() throws IOException {
CompletableFuture<Boolean> connectedAndAuthorized = openWebsocketConnection(); CompletableFuture<Boolean> connectedAndAuthorized = openWebsocketConnection();
try { try {
return connectedAndAuthorized.get(5000, TimeUnit.SECONDS); return connectedAndAuthorized.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new IOException("Interrupted while connecting with websocket"); throw new IOException("Interrupted while connecting with websocket");
@ -85,7 +87,7 @@ public class Cam4WsClient {
if (!sent) { if (!sent) {
future.completeExceptionally(new IOException("send() returned false")); future.completeExceptionally(new IOException("send() returned false"));
} else { } else {
responseFutures.put(p, future); responseFuturesByPath.put(p, future);
} }
return future; return future;
} }
@ -157,6 +159,9 @@ public class Cam4WsClient {
public void onFailure(WebSocket webSocket, Throwable t, Response response) { public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response); super.onFailure(webSocket, t, response);
try { try {
if (t instanceof EOFException) {
return;
}
if(response != null) { if(response != null) {
LOG.error("failure {}: {}", model, response.body().string(), t); LOG.error("failure {}: {}", model, response.body().string(), t);
} else { } else {
@ -164,9 +169,10 @@ public class Cam4WsClient {
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Connection failure and couldn't get the response body", e); LOG.error("Connection failure and couldn't get the response body", e);
} } finally {
connectedAndAuthorized.completeExceptionally(t); connectedAndAuthorized.completeExceptionally(t);
} }
}
@Override @Override
public void onMessage(WebSocket webSocket, String text) { public void onMessage(WebSocket webSocket, String text) {
@ -180,11 +186,18 @@ public class Cam4WsClient {
JSONObject body = d.getJSONObject("b"); JSONObject body = d.getJSONObject("b");
String status = body.optString("s"); String status = body.optString("s");
connectedAndAuthorized.complete(status.equals("ok")); connectedAndAuthorized.complete(status.equals("ok"));
} else if (responseFuturesBySequence.containsKey(responseSequence)) {
JSONObject body = d.getJSONObject("b");
String status = body.optString("s");
if (!status.equals("ok")) {
CompletableFuture<String> future = responseFuturesBySequence.remove(responseSequence);
future.completeExceptionally(new IOException(status));
}
} else if (d.has("b")) { } else if (d.has("b")) {
JSONObject body = d.getJSONObject("b"); JSONObject body = d.getJSONObject("b");
String p = body.optString("p", "-"); String p = body.optString("p", "-");
if (responseFutures.containsKey(p)) { if (responseFuturesByPath.containsKey(p)) {
CompletableFuture<String> future = responseFutures.get(p); CompletableFuture<String> future = responseFuturesByPath.remove(p);
future.complete(text); future.complete(text);
} }
} }