Change debug mechanisms for HLS

This commit is contained in:
0xb00bface 2021-01-23 13:38:57 +01:00
parent 3202d5d2cd
commit 9be4c07049
2 changed files with 56 additions and 100 deletions

View File

@ -1,71 +0,0 @@
package ctbrec.io;
import java.time.Duration;
import java.time.Instant;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ctbrec.Model;
import ctbrec.UnknownModel;
public class MissedSegmentsStatistics {
private static final Logger LOG = LoggerFactory.getLogger(MissedSegmentsStatistics.class);
private static Map<Model, Short> missegSegmentsCount = new HashMap<>();
private static Instant lastOutput = Instant.EPOCH;
private static Instant lastclear = Instant.now();
private static Thread t;
static {
increase(new UnknownModel(), (short) 0);
}
private MissedSegmentsStatistics() {}
public static void increase(Model model, short amount) {
if (Duration.between(lastclear, Instant.now()).toHours() > 24) {
missegSegmentsCount.clear();
LOG.debug("Missed segments statistics cleared");
}
short total = missegSegmentsCount.getOrDefault(model, (short) 0);
missegSegmentsCount.put(model, (short) (total + amount));
if (t == null) {
t = new Thread(() -> {
while (true) {
printStatistics();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
});
t.setName("MissingSegments logger");
t.setPriority(Thread.MIN_PRIORITY);
t.setDaemon(true);
t.start();
}
}
private static void printStatistics() {
Instant now = Instant.now();
StringBuilder sb = new StringBuilder("Missed segments stats:\n");
if (Duration.between(lastOutput, now).getSeconds() > 120) {
try {
for (Entry<Model, Short> entry : missegSegmentsCount.entrySet()) {
sb.append('\t').append(entry.getKey().getName()).append(": ").append(entry.getValue()).append('\n');
}
LOG.debug(sb.toString());
lastOutput = now;
} catch(ConcurrentModificationException e) {
// ignore
}
}
}
}

View File

@ -3,25 +3,34 @@ package ctbrec.recorder.download.hls;
import static ctbrec.io.HttpConstants.*; import static ctbrec.io.HttpConstants.*;
import static ctbrec.recorder.download.StreamSource.*; import static ctbrec.recorder.download.StreamSource.*;
import static java.nio.charset.StandardCharsets.*; import static java.nio.charset.StandardCharsets.*;
import static java.nio.file.StandardOpenOption.*;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URL; import java.net.URL;
import java.nio.file.Files;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -46,11 +55,11 @@ import com.iheartradio.m3u8.data.TrackData;
import ctbrec.Config; import ctbrec.Config;
import ctbrec.Model; import ctbrec.Model;
import ctbrec.Model.State; import ctbrec.Model.State;
import ctbrec.StringUtil;
import ctbrec.io.BandwidthMeter; import ctbrec.io.BandwidthMeter;
import ctbrec.io.HttpClient; import ctbrec.io.HttpClient;
import ctbrec.io.HttpConstants; import ctbrec.io.HttpConstants;
import ctbrec.io.HttpException; import ctbrec.io.HttpException;
import ctbrec.io.MissedSegmentsStatistics;
import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException; import ctbrec.recorder.PlaylistGenerator.InvalidPlaylistException;
import ctbrec.recorder.download.AbstractDownload; import ctbrec.recorder.download.AbstractDownload;
import ctbrec.recorder.download.HttpHeaderFactory; import ctbrec.recorder.download.HttpHeaderFactory;
@ -64,6 +73,7 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class); private static final transient Logger LOG = LoggerFactory.getLogger(AbstractHlsDownload.class);
private static final int TEN_SECONDS = 10_000; private static final int TEN_SECONDS = 10_000;
private static final boolean DEBUG_HLS = Objects.equals(System.getenv("CTBREC_DEBUG_HLS"), "true");
private transient NumberFormat nf = new DecimalFormat("000000"); private transient NumberFormat nf = new DecimalFormat("000000");
private transient int playlistEmptyCount = 0; private transient int playlistEmptyCount = 0;
@ -76,14 +86,12 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
protected transient int nextSegmentNumber = 0; protected transient int nextSegmentNumber = 0;
protected transient String segmentPlaylistUrl; protected transient String segmentPlaylistUrl;
private transient String previousPlaylist;
private transient String lastPlaylist;
private transient Instant previousPlaylistRequest = Instant.EPOCH;
private transient Instant afterLastPlaylistRequest= Instant.EPOCH;
private transient Instant beforeLastPlaylistRequest= Instant.EPOCH; private transient Instant beforeLastPlaylistRequest= Instant.EPOCH;
private transient int consecutivePlaylistTimeouts = 0; private transient int consecutivePlaylistTimeouts = 0;
private transient int consecutivePlaylistErrors = 0; private transient int consecutivePlaylistErrors = 0;
private transient List<RecordingEvent> recordingEvents = new LinkedList<>();
protected AbstractHlsDownload(HttpClient client) { protected AbstractHlsDownload(HttpClient client) {
this.client = client; this.client = client;
} }
@ -99,15 +107,13 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
segmentPlaylistUrl = getSegmentPlaylistUrl(model); segmentPlaylistUrl = getSegmentPlaylistUrl(model);
} }
previousPlaylistRequest = beforeLastPlaylistRequest;
beforeLastPlaylistRequest = Instant.now(); beforeLastPlaylistRequest = Instant.now();
SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl); SegmentPlaylist segmentPlaylist = getNextSegments(segmentPlaylistUrl);
afterLastPlaylistRequest = Instant.now();
emptyPlaylistCheck(segmentPlaylist); emptyPlaylistCheck(segmentPlaylist);
handleMissedSegments(segmentPlaylist, nextSegmentNumber); handleMissedSegments(segmentPlaylist, nextSegmentNumber);
enqueueNewSegments(segmentPlaylist, nextSegmentNumber); enqueueNewSegments(segmentPlaylist, nextSegmentNumber);
splitRecordingIfNecessary(); splitRecordingIfNecessary();
calculateRescheduleTime(segmentPlaylist, lastSegmentNumber); calculateRescheduleTime();
// this if check makes sure, that we don't decrease nextSegment. for some reason // this if check makes sure, that we don't decrease nextSegment. for some reason
// streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79 // streamate playlists sometimes jump back. e.g. max sequence = 79 -> 80 -> 79
@ -115,6 +121,10 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) { if (lastSegmentNumber + segmentPlaylist.segments.size() > nextSegmentNumber) {
nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size(); nextSegmentNumber = lastSegmentNumber + segmentPlaylist.segments.size();
} }
while (recordingEvents.size() > 30) {
recordingEvents.remove(0);
}
} catch (ParseException e) { } catch (ParseException e) {
LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e); LOG.error("Couldn't parse HLS playlist for model {}\n{}", model, e.getInput(), e);
stop(); stop();
@ -212,6 +222,8 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
} }
protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException { protected SegmentPlaylist getNextSegments(String segmentPlaylistUrl) throws IOException, ParseException, PlaylistException {
Instant start = Instant.now();
recordingEvents.add(RecordingEvent.of("Playlist request"));
URL segmentsUrl = new URL(segmentPlaylistUrl); URL segmentsUrl = new URL(segmentPlaylistUrl);
Builder builder = new Request.Builder() Builder builder = new Request.Builder()
.url(segmentsUrl); .url(segmentsUrl);
@ -222,8 +234,6 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
if (response.isSuccessful()) { if (response.isSuccessful()) {
consecutivePlaylistTimeouts = 0; consecutivePlaylistTimeouts = 0;
String body = response.body().string(); String body = response.body().string();
previousPlaylist = lastPlaylist;
lastPlaylist = beforeLastPlaylistRequest.toString()+"\n"+body;
if (!body.contains("#EXTINF")) { if (!body.contains("#EXTINF")) {
// no segments, empty playlist // no segments, empty playlist
return new SegmentPlaylist(segmentPlaylistUrl); return new SegmentPlaylist(segmentPlaylistUrl);
@ -234,13 +244,18 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
InputStream inputStream = new ByteArrayInputStream(bytes); InputStream inputStream = new ByteArrayInputStream(bytes);
SegmentPlaylist playList = parsePlaylist(segmentPlaylistUrl, inputStream); SegmentPlaylist playList = parsePlaylist(segmentPlaylistUrl, inputStream);
consecutivePlaylistErrors = 0; consecutivePlaylistErrors = 0;
recordingEvents.add(RecordingEvent.of("Sequence: " + StringUtil.grep(body, "MEDIA-SEQUENCE")));
recordingEvents.add(RecordingEvent.of("Playlist downloaded in " + (Duration.between(start, Instant.now()).toMillis()) + "ms: "
+ StringUtil.grep(body, "X-PROGRAM-DATE-TIME")));
return playList; return playList;
} else { } else {
recordingEvents.add(RecordingEvent.of("HTTP code " + response.code()));
throw new HttpException(response.code(), response.message()); throw new HttpException(response.code(), response.message());
} }
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : ""); LOG.debug("Playlist request timed out for model {} {} time{}", model, ++consecutivePlaylistTimeouts, (consecutivePlaylistTimeouts > 1) ? 's' : "");
// times out, return an empty playlist, so that the process can continue without wasting much more time // times out, return an empty playlist, so that the process can continue without wasting much more time
recordingEvents.add(RecordingEvent.of("Playlist request timed out " + consecutivePlaylistTimeouts));
throw new PlaylistTimeoutException(e); throw new PlaylistTimeoutException(e);
} catch (Exception e) { } catch (Exception e) {
consecutivePlaylistErrors++; consecutivePlaylistErrors++;
@ -296,14 +311,24 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
} }
} }
private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) { private void handleMissedSegments(SegmentPlaylist playlist, int nextSegmentNumber) throws IOException {
if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) { if (nextSegmentNumber > 0 && playlist.seq > nextSegmentNumber) {
LOG.warn("Missed segments: {} < {} in download for {}", nextSegmentNumber, playlist.seq, model); recordingEvents.add(RecordingEvent.of("Missed segments: "+nextSegmentNumber+" < " + playlist.seq));
LOG.warn("Missed segments: Last 2 [{}] [{}], schedule was {}, request took {} ms", previousPlaylistRequest, beforeLastPlaylistRequest, rescheduleTime, Duration.between(beforeLastPlaylistRequest, afterLastPlaylistRequest)); if (DEBUG_HLS) {
LOG.warn("Missed segments: previous playlist\n{}", previousPlaylist); File hlsEventsFile = File.createTempFile("/tmp/rec_evt_" + Instant.now() + "_" + model.getSanitizedNamed(), ".log");
LOG.warn("Missed segments: last playlist\n{}", lastPlaylist); try (OutputStream outputStream = Files.newOutputStream(hlsEventsFile.toPath(), CREATE, WRITE, TRUNCATE_EXISTING)) {
short missedSegments = (short) (playlist.seq - nextSegmentNumber); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(outputStream));
MissedSegmentsStatistics.increase(model, missedSegments); DateTimeFormatter dtf = DateTimeFormatter.ISO_LOCAL_TIME;
for (RecordingEvent recordingEvent : recordingEvents) {
ZonedDateTime dateTime = recordingEvent.timestamp.atZone(ZoneId.systemDefault());
br.append(dtf.format(dateTime) + " " + model + " " + recordingEvent.message);
br.newLine();
}
br.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
} }
} }
@ -329,19 +354,9 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
} }
} }
private void calculateRescheduleTime(SegmentPlaylist playlist, int lastSegmentNumber) { private void calculateRescheduleTime() {
// long waitForMillis = 0;
// if (lastSegmentNumber == playlist.seq) {
// // playlist didn't change -> wait for at least half the target duration
// waitForMillis = (long) playlist.avgSegDuration * 1000 / 2;
// LOG.trace("Playlist didn't change. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
// } else {
// // playlist did change -> wait for at least the target duration
// waitForMillis = (long) (playlist.avgSegDuration * 1000);
// LOG.trace("Playlist changed. Average duration is {}. Waiting for {}ms", playlist.avgSegDuration, waitForMillis);
// }
// rescheduleTime = beforeLastPlaylistRequest.plusMillis(waitForMillis);
rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000); rescheduleTime = beforeLastPlaylistRequest.plusMillis(1000);
recordingEvents.add(RecordingEvent.of("next playlist download scheduled for " + rescheduleTime.toString()));
} }
/** /**
@ -382,4 +397,16 @@ public abstract class AbstractHlsDownload extends AbstractDownload {
public boolean isRunning() { public boolean isRunning() {
return running; return running;
} }
private static class RecordingEvent {
Instant timestamp;
String message;
public static RecordingEvent of(String message) {
RecordingEvent evt = new RecordingEvent();
evt.timestamp = Instant.now();
evt.message = message;
return evt;
}
}
} }