From e22658b2856cf62996a6ce74d7f895d71134e592 Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Wed, 3 Oct 2018 13:39:03 +0200 Subject: [PATCH] Add cache for StreamInfo and stream resolution requests --- src/main/java/ctbrec/Model.java | 6 +++ .../java/ctbrec/recorder/LocalRecorder.java | 13 ++--- .../download/AbstractHlsDownload.java | 18 ++++++- .../ctbrec/recorder/download/HlsDownload.java | 16 +++--- .../recorder/download/MergedHlsDownload.java | 53 +++++++++++++------ .../java/ctbrec/ui/RecordedModelsTab.java | 18 +++++-- src/main/java/ctbrec/ui/ThumbCell.java | 26 ++++----- 7 files changed, 97 insertions(+), 53 deletions(-) diff --git a/src/main/java/ctbrec/Model.java b/src/main/java/ctbrec/Model.java index 81d948e5..9ca74530 100644 --- a/src/main/java/ctbrec/Model.java +++ b/src/main/java/ctbrec/Model.java @@ -32,6 +32,9 @@ import okhttp3.RequestBody; import okhttp3.Response; public class Model { + + private static final transient Logger LOG = LoggerFactory.getLogger(Model.class); + private String url; private String name; private String preview; @@ -79,6 +82,7 @@ public class Model { StreamInfo info; if(ignoreCache) { info = Chaturbate.INSTANCE.loadStreamInfo(getName()); + LOG.debug("Model {} room status: {}", getName(), info.room_status); } else { info = Chaturbate.INSTANCE.getStreamInfo(getName()); } @@ -224,6 +228,7 @@ public class Model { Moshi moshi = new Moshi.Builder().build(); JsonAdapter adapter = moshi.adapter(StreamInfo.class); StreamInfo streamInfo = adapter.fromJson(content); + streamInfoCache.put(modelName, streamInfo); return streamInfo; } else { int code = response.code(); @@ -257,6 +262,7 @@ public class Model { } } } + streamResolutionCache.put(modelName, res); return res; } diff --git a/src/main/java/ctbrec/recorder/LocalRecorder.java b/src/main/java/ctbrec/recorder/LocalRecorder.java index 38179ddb..a5b2ae27 100644 --- a/src/main/java/ctbrec/recorder/LocalRecorder.java +++ b/src/main/java/ctbrec/recorder/LocalRecorder.java @@ -43,6 +43,7 @@ public class LocalRecorder implements Recorder { private static final transient Logger LOG = LoggerFactory.getLogger(LocalRecorder.class); + private static final boolean IGNORE_CACHE = true; private List followedModels = Collections.synchronizedList(new ArrayList<>()); private List models = Collections.synchronizedList(new ArrayList<>()); private Map recordingProcesses = Collections.synchronizedMap(new HashMap<>()); @@ -92,7 +93,6 @@ public class LocalRecorder implements Recorder { } models.add(model); config.getSettings().models.add(model); - onlineMonitor.interrupt(); } } @@ -199,7 +199,7 @@ public class LocalRecorder implements Recorder { try { boolean modelInRecordingList = isRecording(model); - boolean online = model.isOnline(); + boolean online = model.isOnline(IGNORE_CACHE); if (modelInRecordingList && online) { LOG.info("Restarting recording for model {}", model); recordingProcesses.remove(model); @@ -231,7 +231,9 @@ public class LocalRecorder implements Recorder { LOG.debug("Recording terminated for model {}", m.getName()); iterator.remove(); restart.add(m); - finishRecording(d.getDirectory()); + try { + finishRecording(d.getDirectory()); + } catch(NullPointerException e) {}//fail silently } } for (Model m : restart) { @@ -345,8 +347,7 @@ public class LocalRecorder implements Recorder { for (Model model : getModelsRecording()) { try { if (!recordingProcesses.containsKey(model)) { - boolean ignoreCache = true; - boolean isOnline = model.isOnline(ignoreCache); + boolean isOnline = model.isOnline(IGNORE_CACHE); LOG.trace("Checking online state for {}: {}", model, (isOnline ? "online" : "offline")); if (isOnline) { LOG.info("Model {}'s room back to public. Starting recording", model); @@ -488,7 +489,7 @@ public class LocalRecorder implements Recorder { } recordings.add(recording); } catch (Exception e) { - LOG.debug("Ignoring {}", rec.getAbsolutePath()); + LOG.debug("Ignoring {} - {}", rec.getAbsolutePath(), e.getMessage()); } } } diff --git a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java index acfa8990..05952252 100644 --- a/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/AbstractHlsDownload.java @@ -1,5 +1,6 @@ package ctbrec.recorder.download; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -10,6 +11,9 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.iheartradio.m3u8.Encoding; import com.iheartradio.m3u8.Format; import com.iheartradio.m3u8.ParseException; @@ -27,6 +31,8 @@ import okhttp3.Response; public abstract class AbstractHlsDownload implements Download { + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); + ExecutorService downloadThreadPool = Executors.newFixedThreadPool(5); HttpClient client; volatile boolean running = false; @@ -40,9 +46,14 @@ public abstract class AbstractHlsDownload implements Download { String parseMaster(String url, int streamUrlIndex) throws IOException, ParseException, PlaylistException { Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build(); Response response = client.execute(request); + String playlistContent = ""; try { - InputStream inputStream = response.body().byteStream(); - + if(response.code() != 200) { + LOG.debug("HTTP response {}, {}\n{}\n{}", response.code(), response.message(), response.headers(), response.body().string()); + throw new IOException("HTTP response " + response.code() + " " + response.message()); + } + playlistContent = response.body().string(); + InputStream inputStream = new ByteArrayInputStream(playlistContent.getBytes()); PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8); Playlist playlist = parser.parse(); if(playlist.hasMasterPlaylist()) { @@ -62,6 +73,9 @@ public abstract class AbstractHlsDownload implements Download { } } return null; + } catch(Exception e) { + LOG.debug("Playlist: {}", playlistContent, e); + throw e; } finally { response.close(); } diff --git a/src/main/java/ctbrec/recorder/download/HlsDownload.java b/src/main/java/ctbrec/recorder/download/HlsDownload.java index c8206c3c..5a2546a4 100644 --- a/src/main/java/ctbrec/recorder/download/HlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/HlsDownload.java @@ -41,21 +41,21 @@ public class HlsDownload extends AbstractHlsDownload { public void start(Model model, Config config) throws IOException { try { running = true; + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); + String startTime = sdf.format(new Date()); + Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName()); + downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); + StreamInfo streamInfo = model.getStreamInfo(); if(!Objects.equals(streamInfo.room_status, "public")) { throw new IOException(model.getName() +"'s room is not public"); } - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); - String startTime = sdf.format(new Date()); - Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName()); - downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); - if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { - Files.createDirectories(downloadDir); - } - String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); if(segments != null) { + if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { + Files.createDirectories(downloadDir); + } int lastSegment = 0; int nextSegment = 0; while(running) { diff --git a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java index d9c479ab..f5f655d9 100644 --- a/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java +++ b/src/main/java/ctbrec/recorder/download/MergedHlsDownload.java @@ -20,7 +20,6 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.Date; import java.util.LinkedList; -import java.util.Objects; import java.util.Queue; import java.util.concurrent.Callable; @@ -47,6 +46,7 @@ import okhttp3.Response; public class MergedHlsDownload extends AbstractHlsDownload { private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class); + private static final boolean IGNORE_CACHE = true; private BlockingMultiMTSSource multiSource; private Thread mergeThread; private Streamer streamer; @@ -63,6 +63,7 @@ public class MergedHlsDownload extends AbstractHlsDownload { public void start(String segmentPlaylistUri, File targetFile, ProgressListener progressListener) throws IOException { try { running = true; + downloadDir = targetFile.getParentFile().toPath(); mergeThread = createMergeThread(targetFile, progressListener, false); mergeThread.start(); downloadSegments(segmentPlaylistUri, false); @@ -83,17 +84,14 @@ public class MergedHlsDownload extends AbstractHlsDownload { try { running = true; startTime = ZonedDateTime.now(); - StreamInfo streamInfo = model.getStreamInfo(); - if(!Objects.equals(streamInfo.room_status, "public")) { - throw new IOException(model.getName() +"'s room is not public"); - } - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm"); String startTime = sdf.format(new Date()); Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName()); downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime); - if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { - Files.createDirectories(downloadDir); + + StreamInfo streamInfo = model.getStreamInfo(); + if(!model.isOnline(IGNORE_CACHE)) { + throw new IOException(model.getName() +"'s room is not public"); } targetFile = Recording.mergedFileFromDirectory(downloadDir.toFile()); @@ -102,10 +100,10 @@ public class MergedHlsDownload extends AbstractHlsDownload { LOG.debug("Splitting recordings every {} seconds", config.getSettings().splitRecordings); target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-00000.ts")); } - mergeThread = createMergeThread(target, null, true); - mergeThread.start(); String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex()); + mergeThread = createMergeThread(target, null, true); + mergeThread.start(); if(segments != null) { downloadSegments(segments, true); } else { @@ -122,7 +120,9 @@ public class MergedHlsDownload extends AbstractHlsDownload { throw new IOException("Couldn't download segment", e); } finally { alive = false; - streamer.stop(); + if(streamer != null) { + streamer.stop(); + } LOG.debug("Download for {} terminated", model); } } @@ -249,6 +249,9 @@ public class MergedHlsDownload extends AbstractHlsDownload { FileChannel channel = null; try { + if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) { + Files.createDirectories(downloadDir); + } channel = FileChannel.open(targetFile.toPath(), CREATE, WRITE); MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build(); @@ -269,11 +272,8 @@ public class MergedHlsDownload extends AbstractHlsDownload { } catch(Exception e) { LOG.error("Error while saving stream to file", e); } finally { - try { - channel.close(); - } catch (IOException e) { - LOG.error("Error while closing file {}", targetFile); - } + closeFile(channel); + deleteEmptyRecording(targetFile); } }); t.setName("Segment Merger Thread"); @@ -281,6 +281,27 @@ public class MergedHlsDownload extends AbstractHlsDownload { return t; } + private void deleteEmptyRecording(File targetFile) { + try { + if (targetFile.exists() && targetFile.length() == 0) { + Files.delete(targetFile.toPath()); + Files.delete(targetFile.getParentFile().toPath()); + } + } catch (IOException e) { + LOG.error("Error while deleting empty recording {}", targetFile); + } + } + + private void closeFile(FileChannel channel) { + try { + if (channel != null) { + channel.close(); + } + } catch (IOException e) { + LOG.error("Error while closing file channel", e); + } + } + private static class SegmentDownload implements Callable { private URL url; private HttpClient client; diff --git a/src/main/java/ctbrec/ui/RecordedModelsTab.java b/src/main/java/ctbrec/ui/RecordedModelsTab.java index cde8e52f..9a0bc08d 100644 --- a/src/main/java/ctbrec/ui/RecordedModelsTab.java +++ b/src/main/java/ctbrec/ui/RecordedModelsTab.java @@ -5,11 +5,13 @@ import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.Iterator; import java.util.List; -import java.util.Objects; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -52,6 +54,9 @@ import javafx.util.Duration; public class RecordedModelsTab extends Tab implements TabSelectionListener { private static final transient Logger LOG = LoggerFactory.getLogger(RecordedModelsTab.class); + static BlockingQueue queue = new LinkedBlockingQueue<>(); + static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue); + private ScheduledService> updateService; private Recorder recorder; @@ -149,16 +154,19 @@ public class RecordedModelsTab extends Tab implements TabSelectionListener { if(models == null) { return; } + queue.clear(); for (Model model : models) { int index = observableModels.indexOf(model); if (index == -1) { observableModels.add(new JavaFxModel(model)); } else { // make sure to update the JavaFX online property, so that the table cell is updated - try { - JavaFxModel javaFxModel = observableModels.get(index); - javaFxModel.getOnlineProperty().set(Objects.equals("public", javaFxModel.getOnlineState())); - } catch (IOException | ExecutionException e) {} + JavaFxModel javaFxModel = observableModels.get(index); + threadPool.submit(() -> { + try { + javaFxModel.getOnlineProperty().set(javaFxModel.isOnline()); + } catch (IOException | ExecutionException | InterruptedException e) {} + }); } } for (Iterator iterator = observableModels.iterator(); iterator.hasNext();) { diff --git a/src/main/java/ctbrec/ui/ThumbCell.java b/src/main/java/ctbrec/ui/ThumbCell.java index b8b7794a..b527e32f 100644 --- a/src/main/java/ctbrec/ui/ThumbCell.java +++ b/src/main/java/ctbrec/ui/ThumbCell.java @@ -208,9 +208,9 @@ public class ThumbCell extends StackPane { LOG.error("Coulnd't get resolution for model {}", model, e); } } catch (ExecutionException e1) { - LOG.warn("Couldn't update resolution tag for model {} - {}", model.getName(), e1.getCause().getMessage()); + LOG.warn("Couldn't update resolution tag for model {}", model.getName(), e1); } catch (IOException e1) { - LOG.warn("Couldn't update resolution tag for model {} - {}", model.getName(), e1.getMessage()); + LOG.warn("Couldn't update resolution tag for model {}", model.getName(), e1); } finally { ThumbOverviewTab.resolutionProcessing.remove(model); } @@ -220,21 +220,13 @@ public class ThumbCell extends StackPane { private void updateResolutionTag(int[] resolution) throws IOException, ExecutionException { String _res = "n/a"; Paint resolutionBackgroundColor = resolutionOnlineColor; - if (resolution[1] > 0) { + String state = model.getOnlineState(); + if ("public".equals(state)) { LOG.trace("Model resolution {} {}x{}", model.getName(), resolution[0], resolution[1]); LOG.trace("Resolution queue size: {}", ThumbOverviewTab.queue.size()); final int w = resolution[1]; - _res = Integer.toString(w); + _res = w > 0 ? Integer.toString(w) : state; } else { - if(model.getOnlineState() != null) { - String state = model.getOnlineState(); - Platform.runLater(() -> { - resolutionTag.setText(state); - resolutionTag.setVisible(true); - resolutionBackground.setVisible(true); - resolutionBackground.setWidth(resolutionTag.getBoundsInLocal().getWidth() + 4); - }); - } _res = model.getOnlineState(); resolutionBackgroundColor = resolutionOfflineColor; } @@ -284,8 +276,8 @@ public class ThumbCell extends StackPane { // or maybe not, because the player should automatically switch between resolutions depending on the // network bandwidth try { - StreamInfo streamInfo = model.getStreamInfo(); - if(streamInfo.room_status.equals("public")) { + if(model.isOnline(true)) { + StreamInfo streamInfo = model.getStreamInfo(); LOG.debug("Playing {}", streamInfo.url); Player.play(streamInfo.url); } else { @@ -294,7 +286,7 @@ public class ThumbCell extends StackPane { alert.setHeaderText("Room is currently not public"); alert.showAndWait(); } - } catch (IOException | ExecutionException e1) { + } catch (IOException | ExecutionException | InterruptedException e1) { LOG.error("Couldn't get stream information for model {}", model, e1); Alert alert = new AutosizeAlert(Alert.AlertType.ERROR); alert.setTitle("Error"); @@ -346,8 +338,10 @@ public class ThumbCell extends StackPane { try { if(start) { recorder.startRecording(model); + setRecording(true); } else { recorder.stopRecording(model); + setRecording(false); } } catch (Exception e1) { LOG.error("Couldn't start/stop recording", e1);