diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java deleted file mode 100644 index ecefe7d0..00000000 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminHlsDownload.java +++ /dev/null @@ -1,51 +0,0 @@ -package ctbrec.sites.jasmin; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import javax.xml.bind.JAXBException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.iheartradio.m3u8.ParseException; -import com.iheartradio.m3u8.PlaylistException; - -import ctbrec.io.HttpClient; -import ctbrec.recorder.download.hls.HlsDownload; -import ctbrec.recorder.download.hls.SegmentPlaylist; - -public class LiveJasminHlsDownload extends HlsDownload { - - private static final Logger LOG = LoggerFactory.getLogger(LiveJasminHlsDownload.class); - private long lastMasterPlaylistUpdate = 0; - private String segmentUrl; - - public LiveJasminHlsDownload(HttpClient client) { - super(client); - } - - @Override - protected SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { - if(this.segmentUrl == null) { - this.segmentUrl = segments; - } - SegmentPlaylist playlist = super.getNextSegments(segmentUrl); - long now = System.currentTimeMillis(); - if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) { - super.downloadExecutor.submit(this::updatePlaylistUrl); - lastMasterPlaylistUpdate = now; - } - return playlist; - } - - private void updatePlaylistUrl() { - try { - LOG.debug("Updating segment playlist URL for {}", getModel()); - segmentUrl = getSegmentPlaylistUrl(getModel()); - } catch (IOException | JAXBException | ExecutionException | ParseException | PlaylistException e) { - LOG.error("Couldn't update segment playlist url. This might cause a premature download termination", e); - } - } -} diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java deleted file mode 100644 index 94a9bf6c..00000000 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminMergedHlsDownload.java +++ /dev/null @@ -1,51 +0,0 @@ -package ctbrec.sites.jasmin; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import javax.xml.bind.JAXBException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.iheartradio.m3u8.ParseException; -import com.iheartradio.m3u8.PlaylistException; - -import ctbrec.io.HttpClient; -import ctbrec.recorder.download.hls.MergedFfmpegHlsDownload; -import ctbrec.recorder.download.hls.SegmentPlaylist; - -public class LiveJasminMergedHlsDownload extends MergedFfmpegHlsDownload { - - private static final Logger LOG = LoggerFactory.getLogger(LiveJasminMergedHlsDownload.class); - private long lastMasterPlaylistUpdate = 0; - private String segmentUrl; - - public LiveJasminMergedHlsDownload(HttpClient client) { - super(client); - } - - @Override - protected SegmentPlaylist getNextSegments(String segments) throws IOException, ParseException, PlaylistException { - if(this.segmentUrl == null) { - this.segmentUrl = segments; - } - SegmentPlaylist playlist = super.getNextSegments(segmentUrl); - long now = System.currentTimeMillis(); - if( (now - lastMasterPlaylistUpdate) > TimeUnit.SECONDS.toMillis(60)) { - super.downloadExecutor.submit(this::updatePlaylistUrl); - lastMasterPlaylistUpdate = now; - } - return playlist; - } - - private void updatePlaylistUrl() { - try { - LOG.debug("Updating segment playlist URL for {}", getModel()); - segmentUrl = getSegmentPlaylistUrl(getModel()); - } catch (IOException | JAXBException | ExecutionException | ParseException | PlaylistException e) { - LOG.error("Couldn't update segment playlist url. This might cause a premature download termination", e); - } - } -} diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminModel.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminModel.java index 4bf68103..6e786311 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminModel.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminModel.java @@ -17,10 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Locale; -import java.util.Random; +import java.util.*; import java.util.concurrent.ExecutionException; import static ctbrec.io.HttpConstants.*; @@ -129,6 +126,7 @@ public class LiveJasminModel extends AbstractModel { LiveJasminStreamRegistration liveJasminStreamRegistration = new LiveJasminStreamRegistration(site, modelInfo); List streamSources = liveJasminStreamRegistration.getStreamSources(); + Collections.sort(streamSources); return streamSources; } @@ -159,10 +157,8 @@ public class LiveJasminModel extends AbstractModel { } catch (IOException e) { throw new ExecutionException(e); } - return resolution; - } else { - return resolution; } + return resolution; } @Override diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamRegistration.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamRegistration.java index 8bca0b5d..741f2c18 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamRegistration.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamRegistration.java @@ -20,6 +20,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static ctbrec.io.HttpConstants.USER_AGENT; import static java.nio.charset.StandardCharsets.UTF_8; @@ -35,6 +36,7 @@ public class LiveJasminStreamRegistration { private final CyclicBarrier barrier = new CyclicBarrier(2); private int streamCount = 0; + private WebSocket webSocket; public LiveJasminStreamRegistration(Site site, LiveJasminModelInfo modelInfo) { this.site = site; @@ -49,9 +51,10 @@ public class LiveJasminStreamRegistration { .addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile) .build(); log.debug("Websocket: {}", modelInfo.getWebsocketUrl()); - site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() { + webSocket = site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() { @Override public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { + Thread.currentThread().setName("Stream registration for " + modelInfo.getPerformerId()); log.debug("onOpen"); JSONObject register = new JSONObject() .put(KEY_EVENT, "register") @@ -150,7 +153,7 @@ public class LiveJasminStreamRegistration { .put("protocols", new JSONArray() .put("h5live") ) - .put("streamId", src.streamId) + .put("streamId", src.getStreamId()) .put("correlationId", UUID.randomUUID().toString().replace("-", "").substring(0, 16)) ) ); @@ -163,7 +166,7 @@ public class LiveJasminStreamRegistration { JSONObject data = message.getJSONArray("data").getJSONArray(0).getJSONObject(0); String streamId = data.getString("streamId"); String wssUrl = data.getJSONObject("protocol").getJSONObject("h5live").getString("wssUrl"); - streamSources.stream().filter(src -> Objects.equals(src.streamId, streamId)).findAny().ifPresent(src -> src.mediaPlaylistUrl = wssUrl); + streamSources.stream().filter(src -> Objects.equals(src.getStreamId(), streamId)).findAny().ifPresent(src -> src.mediaPlaylistUrl = wssUrl); if (--streamCount == 0) { awaitBarrier(); } @@ -197,68 +200,7 @@ public class LiveJasminStreamRegistration { } catch (Exception e) { log.error("Couldn't determine stream sources", e); } - return streamSources.stream().map(StreamSource.class::cast).toList(); - } - - public void keepStreamAlive() { - try { - Request webSocketRequest = new Request.Builder() - .url(modelInfo.getWebsocketUrl()) - .addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile) - .build(); - log.debug("Websocket: {}", modelInfo.getWebsocketUrl()); - site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() { - @Override - public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { - log.debug("onOpen"); - webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString()); - } - - @Override - public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { - log.error("onFailure", t); - webSocket.close(1000, ""); - } - - @Override - public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { - JSONObject message = new JSONObject(text); - if (message.opt(KEY_EVENT).equals("pong")) { - new Thread(() -> { - try { - Thread.sleep(message.optInt("nextPing")); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - webSocket.send(new JSONObject().put(KEY_EVENT, "ping").toString()); - }).start(); - } - } - - @Override - public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) { - log.debug("onMessageB"); - super.onMessage(webSocket, bytes); - } - - @Override - public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) { - log.debug("onClosed {} {}", code, reason); - super.onClosed(webSocket, code, reason); - } - - @Override - public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) { - log.debug("onClosing {} {}", code, reason); - awaitBarrier(); - } - }); - - log.debug("Waiting for websocket to return"); - awaitBarrier(); - } catch (Exception e) { - log.error("Couldn't determine stream sources", e); - } + return streamSources.stream().map(StreamSource.class::cast).collect(Collectors.toList()); // NOSONAR } private void addStreamSource(LinkedList streamSources, String pattern, JSONObject stream) { @@ -286,9 +228,10 @@ public class LiveJasminStreamRegistration { streamSource.width = w; streamSource.height = h; streamSource.bandwidth = bitrate; - streamSource.rtmpUrl = rtmpUrl; - streamSource.streamName = streamName; - streamSource.streamId = streamId; + streamSource.setRtmpUrl(rtmpUrl); + streamSource.setStreamName(streamName); + streamSource.setStreamId(streamId); + streamSource.setStreamRegistration(this); streamSources.add(streamSource); } @@ -302,4 +245,8 @@ public class LiveJasminStreamRegistration { log.error(e.getLocalizedMessage(), e); } } + + void close() { + webSocket.close(1000, ""); + } } diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamSource.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamSource.java index 21d8a9ad..9b7a8703 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamSource.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamSource.java @@ -1,9 +1,14 @@ package ctbrec.sites.jasmin; import ctbrec.recorder.download.StreamSource; +import lombok.Getter; +import lombok.Setter; +@Getter +@Setter public class LiveJasminStreamSource extends StreamSource { - public String rtmpUrl; - public String streamName; - public String streamId; + private String rtmpUrl; + private String streamName; + private String streamId; + private LiveJasminStreamRegistration streamRegistration; } diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamStarter.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamStarter.java deleted file mode 100644 index 6a09d498..00000000 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminStreamStarter.java +++ /dev/null @@ -1,107 +0,0 @@ -package ctbrec.sites.jasmin; - -import ctbrec.Config; -import ctbrec.sites.Site; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.WebSocket; -import okhttp3.WebSocketListener; -import okio.ByteString; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URLEncoder; -import java.util.Optional; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static ctbrec.io.HttpConstants.USER_AGENT; -import static java.nio.charset.StandardCharsets.UTF_8; - -public class LiveJasminStreamStarter { - - private static final Logger LOG = LoggerFactory.getLogger(LiveJasminStreamStarter.class); - private final CyclicBarrier barrier = new CyclicBarrier(2); - - void start(Site site, LiveJasminModelInfo modelInfo, LiveJasminStreamSource ss) { - try { - String websocketUrl = "wss://dss-live-{ipWithDashes}.dditscdn.com/h5live/http/playlist.m3u8?url={rtmpUrl}&stream={streamName}" - .replace("{ipWithDashes}", modelInfo.getSbIp().replace('.', '-')) - .replace("{rtmpUrl}", URLEncoder.encode(ss.rtmpUrl, UTF_8)) - .replace("{streamName}", URLEncoder.encode(ss.streamName, UTF_8)); - - Request webSocketRequest = new Request.Builder() - .url(websocketUrl) - .addHeader(USER_AGENT, Config.getInstance().getSettings().httpUserAgentMobile) - .build(); - LOG.debug("Websocket: {}", websocketUrl); - site.getHttpClient().newWebSocket(webSocketRequest, new WebSocketListener() { - @Override - public void onOpen(@NotNull WebSocket webSocket, @NotNull Response response) { - LOG.debug("onOpen"); - } - - @Override - public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { - String body = Optional.ofNullable(response).map(Response::body).map(responseBody -> { - try { - return responseBody.string(); - } catch (IOException e) { - return ""; - } - }).orElse(""); - LOG.error("onFailure Body:[{}]", body, t); - awaitBarrier(); - webSocket.close(1000, ""); - } - - @Override - public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) { - LOG.debug("{}", new JSONObject(text).toString(2)); - webSocket.close(1000, ""); - } - - @Override - public void onMessage(@NotNull WebSocket webSocket, @NotNull ByteString bytes) { - LOG.trace("onMessageB"); - super.onMessage(webSocket, bytes); - } - - @Override - public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) { - LOG.debug("onClosed {} {}", code, reason); - super.onClosed(webSocket, code, reason); - } - - @Override - public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) { - LOG.trace("onClosing {} {}", code, reason); - awaitBarrier(); - } - }); - - LOG.debug("Waiting for websocket to return"); - awaitBarrier(); - LOG.debug("Websocket is done."); - } catch (Exception e) { - LOG.error("Couldn't start stream", e); - } - } - - private void awaitBarrier() { - try { - barrier.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error(e.getLocalizedMessage(), e); - } catch (TimeoutException | BrokenBarrierException e) { - LOG.error(e.getLocalizedMessage(), e); - } - } -} diff --git a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebrtcDownload.java b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebrtcDownload.java index b05ba20e..5be8bae5 100644 --- a/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebrtcDownload.java +++ b/common/src/main/java/ctbrec/sites/jasmin/LiveJasminWebrtcDownload.java @@ -9,6 +9,7 @@ import ctbrec.io.BandwidthMeter; import ctbrec.io.HttpClient; import ctbrec.recorder.download.AbstractDownload; import ctbrec.recorder.download.RecordingProcess; +import ctbrec.recorder.download.StreamSource; import ctbrec.sites.showup.Showup; import okhttp3.Request; import okhttp3.Response; @@ -25,6 +26,7 @@ import java.io.IOException; import java.nio.file.Files; import java.time.Duration; import java.time.Instant; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; @@ -141,8 +143,11 @@ public class LiveJasminWebrtcDownload extends AbstractDownload { private void startDownload() throws IOException, PlaylistException, ParseException, ExecutionException { LiveJasminModel liveJasminModel = (LiveJasminModel) model; + List streamSources = liveJasminModel.getStreamSources(); + LiveJasminStreamSource streamSource = (LiveJasminStreamSource) selectStreamSource(streamSources); + LiveJasminStreamRegistration streamRegistration = streamSource.getStreamRegistration(); Request request = new Request.Builder() - .url(liveJasminModel.getStreamSources().get(0).getMediaPlaylistUrl()) + .url(streamSource.getMediaPlaylistUrl()) .header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent) .header(ACCEPT, "*/*") .header(ACCEPT_LANGUAGE, "pl") @@ -202,6 +207,7 @@ public class LiveJasminWebrtcDownload extends AbstractDownload { if (response != null) { response.close(); } + streamRegistration.close(); } @Override @@ -215,7 +221,26 @@ public class LiveJasminWebrtcDownload extends AbstractDownload { super.onClosed(webSocket, code, reason); LOG.debug("Websocket closed for model {} {} {}", model, code, reason); stop(); + streamRegistration.close(); } }); } + + @Override + public void awaitEnd() { + long secondsToWait = 30; + for (int i = 0; i < secondsToWait; i++) { + if (ws == null) { + break; + } else { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for the download to terminate"); + } + } + } + LOG.warn("Download didn't finish in {} seconds", secondsToWait); + } }