Add cache for StreamInfo and stream resolution requests

This commit is contained in:
0xboobface 2018-10-03 13:39:03 +02:00
parent 530ce820d5
commit e22658b285
7 changed files with 97 additions and 53 deletions

View File

@ -32,6 +32,9 @@ import okhttp3.RequestBody;
import okhttp3.Response;
public class Model {
private static final transient Logger LOG = LoggerFactory.getLogger(Model.class);
private String url;
private String name;
private String preview;
@ -79,6 +82,7 @@ public class Model {
StreamInfo info;
if(ignoreCache) {
info = Chaturbate.INSTANCE.loadStreamInfo(getName());
LOG.debug("Model {} room status: {}", getName(), info.room_status);
} else {
info = Chaturbate.INSTANCE.getStreamInfo(getName());
}
@ -224,6 +228,7 @@ public class Model {
Moshi moshi = new Moshi.Builder().build();
JsonAdapter<StreamInfo> adapter = moshi.adapter(StreamInfo.class);
StreamInfo streamInfo = adapter.fromJson(content);
streamInfoCache.put(modelName, streamInfo);
return streamInfo;
} else {
int code = response.code();
@ -257,6 +262,7 @@ public class Model {
}
}
}
streamResolutionCache.put(modelName, res);
return res;
}

View File

@ -43,6 +43,7 @@ public class LocalRecorder implements Recorder {
private static final transient Logger LOG = LoggerFactory.getLogger(LocalRecorder.class);
private static final boolean IGNORE_CACHE = true;
private List<Model> followedModels = Collections.synchronizedList(new ArrayList<>());
private List<Model> models = Collections.synchronizedList(new ArrayList<>());
private Map<Model, Download> recordingProcesses = Collections.synchronizedMap(new HashMap<>());
@ -92,7 +93,6 @@ public class LocalRecorder implements Recorder {
}
models.add(model);
config.getSettings().models.add(model);
onlineMonitor.interrupt();
}
}
@ -199,7 +199,7 @@ public class LocalRecorder implements Recorder {
try {
boolean modelInRecordingList = isRecording(model);
boolean online = model.isOnline();
boolean online = model.isOnline(IGNORE_CACHE);
if (modelInRecordingList && online) {
LOG.info("Restarting recording for model {}", model);
recordingProcesses.remove(model);
@ -231,7 +231,9 @@ public class LocalRecorder implements Recorder {
LOG.debug("Recording terminated for model {}", m.getName());
iterator.remove();
restart.add(m);
finishRecording(d.getDirectory());
try {
finishRecording(d.getDirectory());
} catch(NullPointerException e) {}//fail silently
}
}
for (Model m : restart) {
@ -345,8 +347,7 @@ public class LocalRecorder implements Recorder {
for (Model model : getModelsRecording()) {
try {
if (!recordingProcesses.containsKey(model)) {
boolean ignoreCache = true;
boolean isOnline = model.isOnline(ignoreCache);
boolean isOnline = model.isOnline(IGNORE_CACHE);
LOG.trace("Checking online state for {}: {}", model, (isOnline ? "online" : "offline"));
if (isOnline) {
LOG.info("Model {}'s room back to public. Starting recording", model);
@ -488,7 +489,7 @@ public class LocalRecorder implements Recorder {
}
recordings.add(recording);
} catch (Exception e) {
LOG.debug("Ignoring {}", rec.getAbsolutePath());
LOG.debug("Ignoring {} - {}", rec.getAbsolutePath(), e.getMessage());
}
}
}

View File

@ -1,5 +1,6 @@
package ctbrec.recorder.download;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -10,6 +11,9 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.iheartradio.m3u8.Encoding;
import com.iheartradio.m3u8.Format;
import com.iheartradio.m3u8.ParseException;
@ -27,6 +31,8 @@ import okhttp3.Response;
public abstract class AbstractHlsDownload implements Download {
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
ExecutorService downloadThreadPool = Executors.newFixedThreadPool(5);
HttpClient client;
volatile boolean running = false;
@ -40,9 +46,14 @@ public abstract class AbstractHlsDownload implements Download {
String parseMaster(String url, int streamUrlIndex) throws IOException, ParseException, PlaylistException {
Request request = new Request.Builder().url(url).addHeader("connection", "keep-alive").build();
Response response = client.execute(request);
String playlistContent = "";
try {
InputStream inputStream = response.body().byteStream();
if(response.code() != 200) {
LOG.debug("HTTP response {}, {}\n{}\n{}", response.code(), response.message(), response.headers(), response.body().string());
throw new IOException("HTTP response " + response.code() + " " + response.message());
}
playlistContent = response.body().string();
InputStream inputStream = new ByteArrayInputStream(playlistContent.getBytes());
PlaylistParser parser = new PlaylistParser(inputStream, Format.EXT_M3U, Encoding.UTF_8);
Playlist playlist = parser.parse();
if(playlist.hasMasterPlaylist()) {
@ -62,6 +73,9 @@ public abstract class AbstractHlsDownload implements Download {
}
}
return null;
} catch(Exception e) {
LOG.debug("Playlist: {}", playlistContent, e);
throw e;
} finally {
response.close();
}

View File

@ -41,21 +41,21 @@ public class HlsDownload extends AbstractHlsDownload {
public void start(Model model, Config config) throws IOException {
try {
running = true;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm");
String startTime = sdf.format(new Date());
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
StreamInfo streamInfo = model.getStreamInfo();
if(!Objects.equals(streamInfo.room_status, "public")) {
throw new IOException(model.getName() +"'s room is not public");
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm");
String startTime = sdf.format(new Date());
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
Files.createDirectories(downloadDir);
}
String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex());
if(segments != null) {
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
Files.createDirectories(downloadDir);
}
int lastSegment = 0;
int nextSegment = 0;
while(running) {

View File

@ -20,7 +20,6 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
@ -47,6 +46,7 @@ import okhttp3.Response;
public class MergedHlsDownload extends AbstractHlsDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(MergedHlsDownload.class);
private static final boolean IGNORE_CACHE = true;
private BlockingMultiMTSSource multiSource;
private Thread mergeThread;
private Streamer streamer;
@ -63,6 +63,7 @@ public class MergedHlsDownload extends AbstractHlsDownload {
public void start(String segmentPlaylistUri, File targetFile, ProgressListener progressListener) throws IOException {
try {
running = true;
downloadDir = targetFile.getParentFile().toPath();
mergeThread = createMergeThread(targetFile, progressListener, false);
mergeThread.start();
downloadSegments(segmentPlaylistUri, false);
@ -83,17 +84,14 @@ public class MergedHlsDownload extends AbstractHlsDownload {
try {
running = true;
startTime = ZonedDateTime.now();
StreamInfo streamInfo = model.getStreamInfo();
if(!Objects.equals(streamInfo.room_status, "public")) {
throw new IOException(model.getName() +"'s room is not public");
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm");
String startTime = sdf.format(new Date());
Path modelDir = FileSystems.getDefault().getPath(config.getSettings().recordingsDir, model.getName());
downloadDir = FileSystems.getDefault().getPath(modelDir.toString(), startTime);
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
Files.createDirectories(downloadDir);
StreamInfo streamInfo = model.getStreamInfo();
if(!model.isOnline(IGNORE_CACHE)) {
throw new IOException(model.getName() +"'s room is not public");
}
targetFile = Recording.mergedFileFromDirectory(downloadDir.toFile());
@ -102,10 +100,10 @@ public class MergedHlsDownload extends AbstractHlsDownload {
LOG.debug("Splitting recordings every {} seconds", config.getSettings().splitRecordings);
target = new File(targetFile.getAbsolutePath().replaceAll("\\.ts", "-00000.ts"));
}
mergeThread = createMergeThread(target, null, true);
mergeThread.start();
String segments = parseMaster(streamInfo.url, model.getStreamUrlIndex());
mergeThread = createMergeThread(target, null, true);
mergeThread.start();
if(segments != null) {
downloadSegments(segments, true);
} else {
@ -122,7 +120,9 @@ public class MergedHlsDownload extends AbstractHlsDownload {
throw new IOException("Couldn't download segment", e);
} finally {
alive = false;
streamer.stop();
if(streamer != null) {
streamer.stop();
}
LOG.debug("Download for {} terminated", model);
}
}
@ -249,6 +249,9 @@ public class MergedHlsDownload extends AbstractHlsDownload {
FileChannel channel = null;
try {
if (!Files.exists(downloadDir, LinkOption.NOFOLLOW_LINKS)) {
Files.createDirectories(downloadDir);
}
channel = FileChannel.open(targetFile.toPath(), CREATE, WRITE);
MTSSink sink = ByteChannelSink.builder().setByteChannel(channel).build();
@ -269,11 +272,8 @@ public class MergedHlsDownload extends AbstractHlsDownload {
} catch(Exception e) {
LOG.error("Error while saving stream to file", e);
} finally {
try {
channel.close();
} catch (IOException e) {
LOG.error("Error while closing file {}", targetFile);
}
closeFile(channel);
deleteEmptyRecording(targetFile);
}
});
t.setName("Segment Merger Thread");
@ -281,6 +281,27 @@ public class MergedHlsDownload extends AbstractHlsDownload {
return t;
}
private void deleteEmptyRecording(File targetFile) {
try {
if (targetFile.exists() && targetFile.length() == 0) {
Files.delete(targetFile.toPath());
Files.delete(targetFile.getParentFile().toPath());
}
} catch (IOException e) {
LOG.error("Error while deleting empty recording {}", targetFile);
}
}
private void closeFile(FileChannel channel) {
try {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
LOG.error("Error while closing file channel", e);
}
}
private static class SegmentDownload implements Callable<byte[]> {
private URL url;
private HttpClient client;

View File

@ -5,11 +5,13 @@ import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@ -52,6 +54,9 @@ import javafx.util.Duration;
public class RecordedModelsTab extends Tab implements TabSelectionListener {
private static final transient Logger LOG = LoggerFactory.getLogger(RecordedModelsTab.class);
static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
static ExecutorService threadPool = new ThreadPoolExecutor(2, 2, 10, TimeUnit.MINUTES, queue);
private ScheduledService<List<Model>> updateService;
private Recorder recorder;
@ -149,16 +154,19 @@ public class RecordedModelsTab extends Tab implements TabSelectionListener {
if(models == null) {
return;
}
queue.clear();
for (Model model : models) {
int index = observableModels.indexOf(model);
if (index == -1) {
observableModels.add(new JavaFxModel(model));
} else {
// make sure to update the JavaFX online property, so that the table cell is updated
try {
JavaFxModel javaFxModel = observableModels.get(index);
javaFxModel.getOnlineProperty().set(Objects.equals("public", javaFxModel.getOnlineState()));
} catch (IOException | ExecutionException e) {}
JavaFxModel javaFxModel = observableModels.get(index);
threadPool.submit(() -> {
try {
javaFxModel.getOnlineProperty().set(javaFxModel.isOnline());
} catch (IOException | ExecutionException | InterruptedException e) {}
});
}
}
for (Iterator<JavaFxModel> iterator = observableModels.iterator(); iterator.hasNext();) {

View File

@ -208,9 +208,9 @@ public class ThumbCell extends StackPane {
LOG.error("Coulnd't get resolution for model {}", model, e);
}
} catch (ExecutionException e1) {
LOG.warn("Couldn't update resolution tag for model {} - {}", model.getName(), e1.getCause().getMessage());
LOG.warn("Couldn't update resolution tag for model {}", model.getName(), e1);
} catch (IOException e1) {
LOG.warn("Couldn't update resolution tag for model {} - {}", model.getName(), e1.getMessage());
LOG.warn("Couldn't update resolution tag for model {}", model.getName(), e1);
} finally {
ThumbOverviewTab.resolutionProcessing.remove(model);
}
@ -220,21 +220,13 @@ public class ThumbCell extends StackPane {
private void updateResolutionTag(int[] resolution) throws IOException, ExecutionException {
String _res = "n/a";
Paint resolutionBackgroundColor = resolutionOnlineColor;
if (resolution[1] > 0) {
String state = model.getOnlineState();
if ("public".equals(state)) {
LOG.trace("Model resolution {} {}x{}", model.getName(), resolution[0], resolution[1]);
LOG.trace("Resolution queue size: {}", ThumbOverviewTab.queue.size());
final int w = resolution[1];
_res = Integer.toString(w);
_res = w > 0 ? Integer.toString(w) : state;
} else {
if(model.getOnlineState() != null) {
String state = model.getOnlineState();
Platform.runLater(() -> {
resolutionTag.setText(state);
resolutionTag.setVisible(true);
resolutionBackground.setVisible(true);
resolutionBackground.setWidth(resolutionTag.getBoundsInLocal().getWidth() + 4);
});
}
_res = model.getOnlineState();
resolutionBackgroundColor = resolutionOfflineColor;
}
@ -284,8 +276,8 @@ public class ThumbCell extends StackPane {
// or maybe not, because the player should automatically switch between resolutions depending on the
// network bandwidth
try {
StreamInfo streamInfo = model.getStreamInfo();
if(streamInfo.room_status.equals("public")) {
if(model.isOnline(true)) {
StreamInfo streamInfo = model.getStreamInfo();
LOG.debug("Playing {}", streamInfo.url);
Player.play(streamInfo.url);
} else {
@ -294,7 +286,7 @@ public class ThumbCell extends StackPane {
alert.setHeaderText("Room is currently not public");
alert.showAndWait();
}
} catch (IOException | ExecutionException e1) {
} catch (IOException | ExecutionException | InterruptedException e1) {
LOG.error("Couldn't get stream information for model {}", model, e1);
Alert alert = new AutosizeAlert(Alert.AlertType.ERROR);
alert.setTitle("Error");
@ -346,8 +338,10 @@ public class ThumbCell extends StackPane {
try {
if(start) {
recorder.startRecording(model);
setRecording(true);
} else {
recorder.stopRecording(model);
setRecording(false);
}
} catch (Exception e1) {
LOG.error("Couldn't start/stop recording", e1);