package ctbrec.sites.dreamcam;

import lombok.extern.slf4j.Slf4j;
import ctbrec.*;
import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient;
import ctbrec.recorder.FFmpeg;
import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.ProcessExitedUncleanException;
import ctbrec.recorder.download.RecordingProcess;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;

import static ctbrec.io.HttpConstants.*;

/**
 * Download which reads binary video data from a websocket and pipes it into the stdin of an
 * FFmpeg process, which writes the recording to a single target file.
 *
 * <p>Threading: {@link #call()} is invoked by the caller's scheduler, the websocket callbacks run
 * on threads owned by the HTTP client, and the websocket is opened from a task submitted to
 * {@code downloadExecutor}. State shared between these threads is kept in {@code volatile}
 * fields; {@link #internalStop()} is {@code synchronized}.
 */
@Slf4j
public class DreamcamDownload extends AbstractDownload {

    /** Recording is stopped if no binary websocket message arrived for this many seconds. */
    private static final int MAX_SECONDS_WITHOUT_TRANSFER = 30;

    private final HttpClient httpClient;
    // Written by the websocket thread and read by call(), hence volatile.
    private volatile Instant timeOfLastTransfer = Instant.MAX;

    protected File targetFile;
    protected FFmpeg ffmpeg;
    protected Process ffmpegProcess;
    protected OutputStream ffmpegStdIn;
    protected Lock ffmpegStreamLock = new ReentrantLock();
    protected String wsUrl;

    private volatile boolean running;
    private volatile boolean started;

    /** Monitor the download task blocks on until the websocket is closed or has failed. */
    private final transient Object monitor = new Object();
    private WebSocket ws;
    private DreamcamModel model;

    /**
     * @param httpClient client used to open the websocket connection
     */
    public DreamcamDownload(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    /**
     * Determines the target file, creates its parent directory and spawns the FFmpeg process.
     *
     * @throws IOException if the target directory cannot be created
     * @throws ProcessExitedUncleanException if no FFmpeg process is available after the start attempt
     */
    @Override
    public void init(Config config, Model model, Instant startTime, ExecutorService executorService) throws IOException {
        super.init(config, model, startTime, executorService);
        this.model = (DreamcamModel) model;
        timeOfLastTransfer = startTime;
        String fileSuffix = config.getSettings().ffmpegFileSuffix;
        targetFile = config.getFileForRecording(model, fileSuffix, startTime);
        createTargetDirectory();
        startFfmpegProcess(targetFile);
        if (ffmpegProcess == null) {
            throw new ProcessExitedUncleanException("Couldn't spawn FFmpeg");
        }
    }

    /**
     * @return always 0
     */
    @Override
    public int getSelectedResolution() {
        return 0;
    }

    /** Stops the download if it is currently marked as running; otherwise does nothing. */
    @Override
    public void stop() {
        if (running) {
            internalStop();
        }
    }

    /**
     * Clears the running flag, closes the websocket and FFmpeg's stdin and then waits up to
     * 45 seconds for FFmpeg to exit before destroying the process.
     */
    private synchronized void internalStop() {
        running = false;
        if (ws != null) {
            ws.close(1000, null);
            ws = null;
        }

        if (ffmpegStdIn != null) {
            try {
                ffmpegStdIn.close();
            } catch (IOException e) {
                log.error("Couldn't terminate FFmpeg by closing stdin", e);
            }
        }

        if (ffmpegProcess != null) {
            try {
                boolean waitFor = ffmpegProcess.waitFor(45, TimeUnit.SECONDS);
                if (!waitFor && ffmpegProcess.isAlive()) {
                    ffmpegProcess.destroy();
                    if (ffmpegProcess.isAlive()) {
                        log.info("FFmpeg didn't terminate. Destroying the process with force!");
                        ffmpegProcess.destroyForcibly();
                        // call() has to cope with this reference being null from now on
                        ffmpegProcess = null;
                    }
                }
            } catch (InterruptedException e) {
                log.error("Interrupted while waiting for FFmpeg to terminate");
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the FFmpeg wrapper and executes it. Failures are logged and not propagated;
     * {@link #init} detects them by {@code ffmpegProcess} still being {@code null}.
     *
     * @param target file FFmpeg writes to; its parent directory is passed to {@code exec}
     */
    private void startFfmpegProcess(File target) {
        try {
            String[] cmdline = prepareCommandLine(target);
            ffmpeg = new FFmpeg.Builder()
                    .logOutput(config.getSettings().logFFmpegOutput)
                    .onStarted(p -> {
                        ffmpegProcess = p;
                        ffmpegStdIn = ffmpegProcess.getOutputStream();
                    })
                    .build();
            ffmpeg.exec(cmdline, new String[0], target.getParentFile());
        } catch (IOException | ProcessExitedUncleanException e) {
            log.error("Error in FFmpeg thread", e);
        }
    }

    /**
     * Assembles the FFmpeg arguments as {@code -i - <configured merge args> <target path>},
     * i.e. FFmpeg reads its input from stdin.
     *
     * @param target the output file
     * @return the command line as returned by {@code OS.getFFmpegCommand}
     */
    private String[] prepareCommandLine(File target) {
        String[] args = config.getSettings().ffmpegMergedDownloadArgs.split(" ");
        // two leading entries ("-i", "-") plus the trailing output path
        String[] argsPlusFile = new String[args.length + 3];
        int i = 0;
        argsPlusFile[i++] = "-i";
        argsPlusFile[i++] = "-";
        System.arraycopy(args, 0, argsPlusFile, i, args.length);
        argsPlusFile[argsPlusFile.length - 1] = target.getAbsolutePath();
        return OS.getFFmpegCommand(argsPlusFile);
    }

    /** Unconditionally runs the stop sequence, regardless of the running flag. */
    @Override
    public void finalizeDownload() {
        internalStop();
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void postProcess(Recording recording) {
        // nothing to do
    }

    @Override
    public File getTarget() {
        return targetFile;
    }

    /**
     * @return the absolute path of the target file with the first occurrence of the configured
     *         recordings directory removed
     */
    @Override
    public String getPath(Model model) {
        String absolutePath = targetFile.getAbsolutePath();
        String recordingsDir = Config.getInstance().getSettings().recordingsDir;
        return absolutePath.replaceFirst(Pattern.quote(recordingsDir), "");
    }

    @Override
    public boolean isSingleFile() {
        return true;
    }

    @Override
    public long getSizeInByte() {
        return getTarget().length();
    }

    /**
     * One scheduling cycle: checks the FFmpeg process, starts the websocket download on the
     * first invocation and stops the recording if the model is offline, a split is necessary or
     * no video data arrived for more than {@link #MAX_SECONDS_WITHOUT_TRANSFER} seconds.
     * Sets {@code rescheduleTime} to now if a split is necessary, otherwise to 5 seconds from now.
     *
     * @return this download
     */
    @Override
    public RecordingProcess call() throws Exception {
        try {
            // internalStop() may have set the field to null after a forced destroy,
            // so work on a snapshot and guard against null.
            Process process = ffmpegProcess;
            if (process != null && !process.isAlive()) {
                running = false;
                int exitValue = process.exitValue();
                ffmpeg.shutdown(exitValue);
            }
        } catch (ProcessExitedUncleanException e) {
            log.error("FFmpeg exited unclean", e);
            internalStop();
        }

        try {
            if (!started) {
                started = true;
                startDownload();
            }
        } catch (Exception e) {
            log.error("Error while downloading", e);
            stop();
        }

        if (!model.isOnline()) {
            log.debug("Model {} not online. Stop recording.", model);
            stop();
        }

        if (splittingStrategy.splitNecessary(this)) {
            log.debug("Split necessary for model {}. Stop recording.", model);
            internalStop();
            rescheduleTime = Instant.now();
        } else {
            rescheduleTime = Instant.now().plusSeconds(5);
        }

        if (Duration.between(timeOfLastTransfer, Instant.now()).getSeconds() > MAX_SECONDS_WITHOUT_TRANSFER) {
            log.debug("No video data received for {} seconds. Stopping recording for model {}", MAX_SECONDS_WITHOUT_TRANSFER, model);
            stop();
        }
        return this;
    }

    /**
     * Submits a task to {@code downloadExecutor} which opens the websocket and then blocks until
     * the download has ended. {@code ffmpegStreamLock} is held for the lifetime of that task.
     */
    private void startDownload() {
        downloadExecutor.submit(() -> {
            running = true;
            ffmpegStreamLock.lock();
            try {
                wsUrl = model.getWsUrl();
                log.debug("{} ws url: {}", model.getName(), wsUrl);
                if (StringUtil.isBlank(wsUrl)) {
                    log.error("{}: Stream URL not found", model);
                    stop();
                    return;
                }

                Request request = new Request.Builder()
                        .url(wsUrl)
                        .header(USER_AGENT, Config.getInstance().getSettings().httpUserAgent)
                        .header(ACCEPT, "*/*")
                        .header(ACCEPT_LANGUAGE, "en")
                        .header(ORIGIN, model.getSite().getBaseUrl())
                        .header(REFERER, model.getSite().getBaseUrl() + "/")
                        .build();

                ws = httpClient.newWebSocket(request, createListener());
                awaitDownloadEnd();
            } catch (IOException ex) {
                if (running) {
                    log.error("Error while downloading: {}", ex.getMessage());
                    stop();
                }
            } finally {
                ffmpegStreamLock.unlock();
                running = false;
            }
        });
    }

    /**
     * Blocks the calling thread until the download is no longer running or the thread is
     * interrupted. The running flag is re-checked around every wait, so a notification sent
     * before this method is entered is not lost and spurious wake-ups are ignored.
     */
    private void awaitDownloadEnd() {
        synchronized (monitor) {
            try {
                while (running) {
                    monitor.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting for the download to terminate");
            }
        }
    }

    /** Wakes up the download task blocked in {@link #awaitDownloadEnd()}. */
    private void signalDownloadEnd() {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    }

    /**
     * Creates the websocket listener. It sends a {@code stream/hello} message when the socket
     * opens, answers a {@code stream/qual} message with a {@code stream/play} request, writes
     * binary messages to FFmpeg's stdin and stops the download when the socket closes or fails.
     */
    private WebSocketListener createListener() {
        return new WebSocketListener() {
            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                super.onOpen(webSocket, response);
                log.debug("{}: Websocket open", model);
                if (response != null) {
                    response.close();
                }
                JSONObject msg = new JSONObject();
                msg.put("url", "stream/hello");
                msg.put("version", "0.0.1");
                webSocket.send(msg.toString());
            }

            @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                super.onClosed(webSocket, code, reason);
                log.trace("{}: Websocket closed", model);
                stop();
                signalDownloadEnd();
            }

            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                super.onFailure(webSocket, t, response);
                log.debug("{}: Websocket failed: {}", model, t.getMessage());
                if (response != null) {
                    response.close();
                }
                stop();
                signalDownloadEnd();
            }

            @Override
            public void onMessage(WebSocket webSocket, String text) {
                super.onMessage(webSocket, text);
                log.trace("{} ws message: {}", model, text);
                JSONObject message = new JSONObject(text);
                if (message.optString("url").equals("stream/qual")) {
                    JSONObject msg = new JSONObject();
                    msg.put("quality", "test");
                    msg.put("url", "stream/play");
                    msg.put("version", "0.0.1");
                    webSocket.send(msg.toString());
                }
            }

            @Override
            public void onMessage(WebSocket webSocket, ByteString bytes) {
                super.onMessage(webSocket, bytes);
                // every binary frame counts as a transfer, even if it is not written anymore
                timeOfLastTransfer = Instant.now();
                try {
                    if (running) {
                        byte[] videoData = bytes.toByteArray();
                        ffmpegStdIn.write(videoData);
                        BandwidthMeter.add(videoData.length);
                    }
                } catch (IOException e) {
                    // a write failure after stop is expected because stdin has been closed
                    if (running) {
                        log.error("Couldn't write video stream to file", e);
                        stop();
                    }
                }
            }
        };
    }

    /**
     * Creates the parent directory of the target file including missing ancestors.
     *
     * @throws IOException if the directory cannot be created
     */
    protected void createTargetDirectory() throws IOException {
        Files.createDirectories(targetFile.getParentFile().toPath());
    }
}