Make sure that LiveJasmin websocket threads are stopped, if a recording is done

This commit is contained in:
0xb00bface 2023-05-29 17:01:14 +02:00
parent 013d28c33d
commit 2ab0c99c76
7 changed files with 52 additions and 288 deletions

View File

@ -1,51 +0,0 @@
package ctbrec.sites.jasmin;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.HlsDownload;
import ctbrec.recorder.download.hls.SegmentPlaylist;
public class LiveJasminHlsDownload extends HlsDownload {
private static final Logger LOG = LoggerFactory.getLogger(LiveJasminHlsDownload.class);
private long lastMasterPlaylistUpdate = 0;
private String segmentUrl;
public LiveJasminHlsDownload(HttpClient client) {
super(client);
}
@Override
protected SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException {
if(this.segmentUrl == null) {
this.segmentUrl = segments;
}
SegmentPlaylist playlist = super.getNextSegments(segmentUrl);
long now = System.currentTimeMillis();
if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) {
super.downloadExecutor.submit(this::updatePlaylistUrl);
lastMasterPlaylistUpdate = now;
}
return playlist;
}
private void updatePlaylistUrl() {
try {
LOG.debug("Updating segment playlist URL for {}", getModel());
segmentUrl = getSegmentPlaylistUrl(getModel());
} catch (IOException | JAXBException | ExecutionException | ParseException | PlaylistException e) {
LOG.error("Couldn't update segment playlist url. This might cause a premature download termination", e);
}
}
}

View File

@ -1,51 +0,0 @@
package ctbrec.sites.jasmin;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.ParseException;
import com.iheartradio.m3u8.PlaylistException;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload;
import ctbrec.recorder.download.hls.SegmentPlaylist;
public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload {
private static final Logger LOG = LoggerFactory.getLogger(LiveJasminMergedHlsDownload.class);
private long lastMasterPlaylistUpdate = 0;
private String segmentUrl;
public LiveJasminMergedHlsDownload(HttpClient client) {
super(client);
}
@Override
protected SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException {
if(this.segmentUrl == null) {
this.segmentUrl = segments;
}
SegmentPlaylist playlist = super.getNextSegments(segmentUrl);
long now = System.currentTimeMillis();
if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) {
super.downloadExecutor.submit(this::updatePlaylistUrl);
lastMasterPlaylistUpdate = now;
}
return playlist;
}
private void updatePlaylistUrl() {
try {
LOG.debug("Updating segment playlist URL for {}", getModel());
segmentUrl = getSegmentPlaylistUrl(getModel());
} catch (IOException | JAXBException | ExecutionException | ParseException | PlaylistException e) {
LOG.error("Couldn't update segment playlist url. This might cause a premature download termination", e);
}
}
}

View File

@ -17,10 +17,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.*;
import java.util.concurrent.ExecutionException;
import static ctbrec.io.HttpConstants.*;
@ -129,6 +126,7 @@ public class LiveJasminModel extends AbstractModel {
LiveJasminStreamRegistration liveJasminStreamRegistration = new LiveJasminStreamRegistration(site, modelInfo);
List<StreamSource> streamSources = liveJasminStreamRegistration.getStreamSources();
Collections.sort(streamSources);
return streamSources;
}
@ -159,10 +157,8 @@ public class LiveJasminModel extends AbstractModel {
} catch (IOException e) {
throw new ExecutionException(e);
}
return resolution;
} else {
return resolution;
}
return resolution;
}
@Override

View File

@ -20,6 +20,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static ctbrec.io.HttpConstants.USER_AGENT;
import static java.nio.charset.StandardCharsets.UTF_8;
@ -35,6 +36,7 @@ public class LiveJasminStreamRegistration {
private final CyclicBarrier barrier = new CyclicBarrier(2);
private int streamCount = 0;
private WebSocket webSocket;
public LiveJasminStreamRegistration(Site site, LiveJasminModelInfo modelInfo) {
this.site = site;
@ -49,9 +51,10 @@ public class LiveJasminStreamRegistration {
.addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile)
.build();
log.debug("Websocket: {}", modelInfo.getWebsocketUrl());
site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() {
webSocket = site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
Thread.currentThread().setName("Stream registration for " + modelInfo.getPerformerId());
log.debug("onOpen");
JSONObject register = new JSONObject()
.put(KEY_EVENT, "register")
@ -150,7 +153,7 @@ public class LiveJasminStreamRegistration {
.put("protocols", new JSONArray()
.put("h5live")
)
.put("streamId", src.streamId)
.put("streamId", src.getStreamId())
.put("correlationId", UUID.randomUUID().toString().replace("-", "").substring(0, 16))
)
);
@ -163,7 +166,7 @@ public class LiveJasminStreamRegistration {
JSONObject data = message.getJSONArray("data").getJSONArray(0).getJSONObject(0);
String streamId = data.getString("streamId");
String wssUrl = data.getJSONObject("protocol").getJSONObject("h5live").getString("wssUrl");
streamSources.stream().filter(src -> Objects.equals(src.streamId, streamId)).findAny().ifPresent(src -> src.mediaPlaylistUrl = wssUrl);
streamSources.stream().filter(src -> Objects.equals(src.getStreamId(), streamId)).findAny().ifPresent(src -> src.mediaPlaylistUrl = wssUrl);
if (--streamCount == 0) {
awaitBarrier();
}
@ -197,68 +200,7 @@ public class LiveJasminStreamRegistration {
} catch (Exception e) {
log.error("Couldn't determine stream sources", e);
}
return streamSources.stream().map(StreamSource.class::cast).toList();
}
public void keepStreamAlive() {
try {
Request webSocketRequest = new Request.Builder()
.url(modelInfo.getWebsocketUrl())
.addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile)
.build();
log.debug("Websocket: {}", modelInfo.getWebsocketUrl());
site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
log.debug("onOpen");
webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString());
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
log.error("onFailure", t);
webSocket.close(1000, "");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
JSONObject message = new JSONObject(text);
if (message.opt(KEY_EVENT).equals("pong")) {
new Thread(() -> {
try {
Thread.sleep(message.optInt("nextPing"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString());
}).start();
}
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
log.debug("onMessageB");
super.onMessage(webSocket, bytes);
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
log.debug("onClosed {} {}", code, reason);
super.onClosed(webSocket, code, reason);
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
log.debug("onClosing {} {}", code, reason);
awaitBarrier();
}
});
log.debug("Waiting for websocket to return");
awaitBarrier();
} catch (Exception e) {
log.error("Couldn't determine stream sources", e);
}
return streamSources.stream().map(StreamSource.class::cast).collect(Collectors.toList()); // NOSONAR
}
private void addStreamSource(LinkedList<LiveJasminStreamSource> streamSources, String pattern, JSONObject stream) {
@ -286,9 +228,10 @@ public class LiveJasminStreamRegistration {
streamSource.width = w;
streamSource.height = h;
streamSource.bandwidth = bitrate;
streamSource.rtmpUrl = rtmpUrl;
streamSource.streamName = streamName;
streamSource.streamId = streamId;
streamSource.setRtmpUrl(rtmpUrl);
streamSource.setStreamName(streamName);
streamSource.setStreamId(streamId);
streamSource.setStreamRegistration(this);
streamSources.add(streamSource);
}
@ -302,4 +245,8 @@ public class LiveJasminStreamRegistration {
log.error(e.getLocalizedMessage(), e);
}
}
void close() {
webSocket.close(1000, "");
}
}

View File

@ -1,9 +1,14 @@
package ctbrec.sites.jasmin;
import ctbrec.recorder.download.StreamSource;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class LiveJasminStreamSource extends StreamSource {
public String rtmpUrl;
public String streamName;
public String streamId;
private String rtmpUrl;
private String streamName;
private String streamId;
private LiveJasminStreamRegistration streamRegistration;
}

View File

@ -1,107 +0,0 @@
package ctbrec.sites.jasmin;
import ctbrec.Config;
import ctbrec.sites.Site;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static ctbrec.io.HttpConstants.USER_AGENT;
import static java.nio.charset.StandardCharsets.UTF_8;
public class LiveJasminStreamStarter {
private static final Logger LOG = LoggerFactory.getLogger(LiveJasminStreamStarter.class);
private final CyclicBarrier barrier = new CyclicBarrier(2);
void start(Site site, LiveJasminModelInfo modelInfo, LiveJasminStreamSource ss) {
try {
String websocketUrl = "wss://dss-live-{ipWithDashes}.dditscdn.com/h5live/http/playlist.m3u8?url={rtmpUrl}&stream={streamName}"
.replace("{ipWithDashes}", modelInfo.getSbIp().replace('.', '-'))
.replace("{rtmpUrl}", URLEncoder.encode(ss.rtmpUrl, UTF_8))
.replace("{streamName}", URLEncoder.encode(ss.streamName, UTF_8));
Request webSocketRequest = new Request.Builder()
.url(websocketUrl)
.addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile)
.build();
LOG.debug("Websocket: {}", websocketUrl);
site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() {
@Override
public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) {
LOG.debug("onOpen");
}
@Override
public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) {
String body = Optional.ofNullable(response).map(Response::body).map(responseBody -> {
try {
return responseBody.string();
} catch (IOException e) {
return "";
}
}).orElse("");
LOG.error("onFailure Body:[{}]", body, t);
awaitBarrier();
webSocket.close(1000, "");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
LOG.debug("{}", new JSONObject(text).toString(2));
webSocket.close(1000, "");
}
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) {
LOG.trace("onMessageB");
super.onMessage(webSocket, bytes);
}
@Override
public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
LOG.debug("onClosed {} {}", code, reason);
super.onClosed(webSocket, code, reason);
}
@Override
public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
LOG.trace("onClosing {} {}", code, reason);
awaitBarrier();
}
});
LOG.debug("Waiting for websocket to return");
awaitBarrier();
LOG.debug("Websocket is done.");
} catch (Exception e) {
LOG.error("Couldn't start stream", e);
}
}
private void awaitBarrier() {
try {
barrier.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error(e.getLocalizedMessage(), e);
} catch (TimeoutException | BrokenBarrierException e) {
LOG.error(e.getLocalizedMessage(), e);
}
}
}

View File

@ -9,6 +9,7 @@ import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.RecordingProcess;
import ctbrec.recorder.download.StreamSource;
import ctbrec.sites.showup.Showup;
import okhttp3.Request;
import okhttp3.Response;
@ -25,6 +26,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
@ -141,8 +143,11 @@ public class LiveJasminWebrtcDownload extends AbstractDownload {
private void startDownload() throws IOException, PlaylistException, ParseException, ExecutionException {
LiveJasminModel liveJasminModel = (LiveJasminModel) model;
List<StreamSource> streamSources = liveJasminModel.getStreamSources();
LiveJasminStreamSource streamSource = (LiveJasminStreamSource) selectStreamSource(streamSources);
LiveJasminStreamRegistration streamRegistration = streamSource.getStreamRegistration();
Request request = new Request.Builder()
.url(liveJasminModel.getStreamSources().get(0).getMediaPlaylistUrl())
.url(streamSource.getMediaPlaylistUrl())
.header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent)
.header(ACCEPT, "*/*")
.header(ACCEPT_LANGUAGE, "pl")
@ -202,6 +207,7 @@ public class LiveJasminWebrtcDownload extends AbstractDownload {
if (response != null) {
response.close();
}
streamRegistration.close();
}
@Override
@ -215,7 +221,26 @@ public class LiveJasminWebrtcDownload extends AbstractDownload {
super.onClosed(webSocket, code, reason);
LOG.debug("Websocket closed for model {} {} {}", model, code, reason);
stop();
streamRegistration.close();
}
});
}
@Override
public void awaitEnd() {
long secondsToWait = 30;
for (int i = 0; i < secondsToWait; i++) {
if (ws == null) {
break;
} else {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for the download to terminate");
}
}
}
LOG.warn("Download didn't finish in {} seconds", secondsToWait);
}
}