forked from j62/ctbrec
1
0
Fork 0

Count the websocket uses wth AtomicInteger

This commit is contained in:
0xboobface 2019-01-22 22:42:46 +01:00
parent fc6aeff94a
commit 64c82748dc
4 changed files with 78 additions and 34 deletions

View File

@ -96,7 +96,7 @@ public class JavaFxModel implements Model {
return pausedProperty;
}
Model getDelegate() {
public Model getDelegate() {
return delegate;
}

View File

@ -9,17 +9,18 @@ import ctbrec.Model;
import ctbrec.sites.ConfigUI;
import ctbrec.sites.fc2live.Fc2Live;
import ctbrec.sites.fc2live.Fc2Model;
import ctbrec.ui.JavaFxModel;
import ctbrec.ui.Player;
import ctbrec.ui.TabProvider;
import ctbrec.ui.sites.AbstractSiteUi;
public class Fc2LiveSiteUi extends AbstractSiteUi {
private static final transient Logger LOG = LoggerFactory.getLogger(Fc2LiveSiteUi.class);
private Fc2Live fc2live;
//private Fc2Live fc2live;
private Fc2TabProvider tabProvider;
public Fc2LiveSiteUi(Fc2Live fc2live) {
this.fc2live = fc2live;
//this.fc2live = fc2live;
this.tabProvider = new Fc2TabProvider(fc2live);
}
@ -41,22 +42,17 @@ public class Fc2LiveSiteUi extends AbstractSiteUi {
@Override
public boolean play(Model model) {
new Thread(() -> {
Fc2Model m = (Fc2Model) model;
Fc2Model m;
if(model instanceof JavaFxModel) {
m = (Fc2Model) ((JavaFxModel)model).getDelegate();
} else {
m = (Fc2Model) model;
}
try {
boolean opened = m.openWebsocket();
if(opened) {
LOG.debug("Opened new websocket for player");
} else {
LOG.debug("Using existing websocket for player");
}
LOG.debug("Starting player");
m.openWebsocket();
LOG.debug("Starting player for {}", model);
Player.play(model, false);
if(opened) {
LOG.debug("Closing websocket for player");
m.closeWebsocket();
} else {
LOG.debug("Leaving websocket for player open");
}
} catch (InterruptedException | IOException e) {
LOG.error("Error opening websocket connection", e);
}

View File

@ -2,6 +2,9 @@ package ctbrec.sites.fc2live;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Config;
import ctbrec.Model;
import ctbrec.io.HttpClient;
@ -9,16 +12,23 @@ import ctbrec.recorder.download.MergedHlsDownload;
public class Fc2MergedHlsDownload extends MergedHlsDownload {
private Fc2WebSocketClient ws;
private static final transient Logger LOG = LoggerFactory.getLogger(Fc2MergedHlsDownload.class);
public Fc2MergedHlsDownload(HttpClient client) {
super(client);
}
@Override
public void start(Model model, Config config) throws IOException {
Fc2Model fc2Model = (Fc2Model) model;
try {
fc2Model.openWebsocket();
super.start(model, config);
} catch (InterruptedException e) {
LOG.error("Couldn't start download for {}", model, e);
} finally {
fc2Model.closeWebsocket();
}
}
@Override

View File

@ -6,6 +6,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.json.JSONArray;
@ -22,10 +24,13 @@ import com.iheartradio.m3u8.data.MasterPlaylist;
import com.iheartradio.m3u8.data.Playlist;
import com.iheartradio.m3u8.data.PlaylistData;
import com.iheartradio.m3u8.data.StreamInfo;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import ctbrec.AbstractModel;
import ctbrec.Config;
import ctbrec.io.HttpException;
import ctbrec.recorder.download.Download;
import ctbrec.recorder.download.StreamSource;
import okhttp3.FormBody;
import okhttp3.Request;
@ -43,6 +48,7 @@ public class Fc2Model extends AbstractModel {
private String version;
private WebSocket ws;
private String playlistUrl;
private AtomicInteger websocketUsage = new AtomicInteger(0);
@Override
public boolean isOnline(boolean ignoreCache) throws IOException, ExecutionException, InterruptedException {
@ -98,16 +104,15 @@ public class Fc2Model extends AbstractModel {
@Override
public List<StreamSource> getStreamSources() throws IOException, ExecutionException, ParseException, PlaylistException {
try {
boolean opened = openWebsocket();
openWebsocket();
List<StreamSource> sources = new ArrayList<>();
LOG.debug("Paylist url {}", playlistUrl);
sources.addAll(parseMasterPlaylist(playlistUrl));
if(opened) {
closeWebsocket();
}
return sources;
} catch (InterruptedException e1) {
throw new ExecutionException(e1);
} finally {
closeWebsocket();
}
}
@ -226,14 +231,18 @@ public class Fc2Model extends AbstractModel {
/**
* Opens a chat websocket connection. This connection is used to retrieve the HLS playlist url. It also has to be kept open as long as the HLS stream is
* "played"
* "played". Fc2Model keeps track of the number of objects, which tried to open or close the websocket. As long as at least one object is using the
* websocket, it is kept open. If the last object, which is using it, calls closeWebsocket, the websocket is closed.
*
* @return true, if a new websocket connection is opened. If the connection was already open, this method returns false
* @throws IOException
*/
public boolean openWebsocket() throws InterruptedException, IOException {
public void openWebsocket() throws InterruptedException, IOException {
// TODO send heartbeat (maybe every minute) {"name":"heartbeat","arguments":{},"id":2}
int usage = websocketUsage.incrementAndGet();
LOG.debug("{} objects using the websocket for {}", usage, this);
if(ws != null) {
return false;
return;
} else {
Object monitor = new Object();
loadModelInfo();
@ -268,6 +277,10 @@ public class Fc2Model extends AbstractModel {
synchronized (monitor) {
monitor.notify();
}
} else if(json.optString("name").equals("user_count") || json.optString("name").equals("comment")) {
// ignore
} else {
LOG.debug("WS <-- {}", text);
}
}
@ -289,15 +302,40 @@ public class Fc2Model extends AbstractModel {
});
});
synchronized (monitor) {
monitor.wait();
// wait at max 10 seconds, otherwise we can assume, that the stream is not available
LOG.debug("No playlist response for 10 seconds");
monitor.wait(TimeUnit.SECONDS.toMillis(10));
}
return true;
}
}
public boolean closeWebsocket() {
public void closeWebsocket() {
int websocketUsers = websocketUsage.decrementAndGet();
LOG.debug("{} objects using the websocket for {}", websocketUsers, this);
if(websocketUsers == 0) {
LOG.debug("Closing the websocket for {}", this);
ws.close(1000, "");
ws = null;
return true;
}
}
@Override
public Download createDownload() {
if(Config.isServerMode()) {
return super.createDownload(); // TODO implement fc2 download for server
} else {
return new Fc2MergedHlsDownload(getSite().getHttpClient());
}
}
@Override
public void readSiteSpecificData(JsonReader reader) throws IOException {
reader.nextName();
id = reader.nextString();
}
@Override
public void writeSiteSpecificData(JsonWriter writer) throws IOException {
writer.name("id").value(id);
}
}