From b8cdb2200ef3ff8f493da7aeb2766df0a1d95dce Mon Sep 17 00:00:00 2001 From: 0xboobface <0xboobface@gmail.com> Date: Wed, 25 Dec 2019 15:15:28 +0100 Subject: [PATCH] Remove mpegts library --- common/src/main/java/org/taktik/CHANGELOG.md | 10 - .../java/org/taktik/ioutils/NIOUtils.java | 29 - .../java/org/taktik/mpegts/Constants.java | 6 - .../java/org/taktik/mpegts/MTSPacket.java | 608 ------------------ .../java/org/taktik/mpegts/PATSection.java | 49 -- .../java/org/taktik/mpegts/PMTSection.java | 133 ---- .../java/org/taktik/mpegts/PSISection.java | 90 --- .../java/org/taktik/mpegts/PacketSupport.java | 37 -- .../main/java/org/taktik/mpegts/Streamer.java | 347 ---------- .../java/org/taktik/mpegts/StreamerTest.java | 45 -- .../taktik/mpegts/sinks/ByteChannelSink.java | 43 -- .../java/org/taktik/mpegts/sinks/MTSSink.java | 7 - .../org/taktik/mpegts/sinks/UDPTransport.java | 77 --- .../sources/AbstractByteChannelMTSSource.java | 117 ---- .../mpegts/sources/AbstractMTSSource.java | 31 - .../sources/BlockingMultiMTSSource.java | 135 ---- .../mpegts/sources/ByteChannelMTSSource.java | 33 - .../mpegts/sources/ByteSourceMTSSource.java | 75 --- .../sources/ConcatenatingMTSSource.java | 194 ------ .../mpegts/sources/ContinuityFixer.java | 214 ------ .../sources/ContinuityFixingMTSSource.java | 30 - .../mpegts/sources/FixedBitrateMTSSource.java | 30 - .../mpegts/sources/InputStreamMTSSource.java | 63 -- .../mpegts/sources/LoopingMTSSource.java | 74 --- .../org/taktik/mpegts/sources/MTSSource.java | 7 - .../org/taktik/mpegts/sources/MTSSources.java | 80 --- .../mpegts/sources/MultiplexingMTSSource.java | 100 --- .../mpegts/sources/NullPacketSource.java | 34 - .../mpegts/sources/ResettableMTSSource.java | 5 - .../sources/SeekableByteChannelMTSSource.java | 39 -- 30 files changed, 2742 deletions(-) delete mode 100644 common/src/main/java/org/taktik/CHANGELOG.md delete mode 100644 common/src/main/java/org/taktik/ioutils/NIOUtils.java delete mode 100644 common/src/main/java/org/taktik/mpegts/Constants.java delete mode 100644 common/src/main/java/org/taktik/mpegts/MTSPacket.java delete mode 100644 common/src/main/java/org/taktik/mpegts/PATSection.java delete mode 100644 common/src/main/java/org/taktik/mpegts/PMTSection.java delete mode 100644 common/src/main/java/org/taktik/mpegts/PSISection.java delete mode 100644 common/src/main/java/org/taktik/mpegts/PacketSupport.java delete mode 100644 common/src/main/java/org/taktik/mpegts/Streamer.java delete mode 100644 common/src/main/java/org/taktik/mpegts/StreamerTest.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sinks/MTSSink.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/MTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/MTSSources.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java delete mode 100644 common/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java diff --git a/common/src/main/java/org/taktik/CHANGELOG.md b/common/src/main/java/org/taktik/CHANGELOG.md deleted file mode 100644 index a841946c..00000000 --- a/common/src/main/java/org/taktik/CHANGELOG.md +++ /dev/null @@ -1,10 +0,0 @@ -Changes made to mpegts-streamer for ctbrec -================= -* Remove dependency on jcodec -* Add sleepingEnabled flag in Streamer to disable sleeping completely and make processing of stream much faster -* Add sanity check in ContinuityFixer to fix avoid an IndexOutOfBoundsException -* Wait for bufferingThread and streamingThread in Streamer.stream() to make it a blocking method -* Add BlockingMultiMTSSource, which can be used to add sources, after the streaming has been started -* Don't close the stream, if a packet can't be read in one go InputStreamMTSSource. Instead read from - the stream until the packet is complete -* Remove finalize method. It is deprecated in Java 9. diff --git a/common/src/main/java/org/taktik/ioutils/NIOUtils.java b/common/src/main/java/org/taktik/ioutils/NIOUtils.java deleted file mode 100644 index f9fbe272..00000000 --- a/common/src/main/java/org/taktik/ioutils/NIOUtils.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.taktik.ioutils; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ReadableByteChannel; - -public class NIOUtils { - - public static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { - int rem = buffer.position(); - while (channel.read(buffer) != -1 && buffer.hasRemaining()) { - } - return buffer.position() - rem; - } - - public static int skip(ByteBuffer buffer, int count) { - int toSkip = Math.min(buffer.remaining(), count); - buffer.position(buffer.position() + toSkip); - return toSkip; - } - - public static final ByteBuffer read(ByteBuffer buffer, int count) { - ByteBuffer slice = buffer.duplicate(); - int limit = buffer.position() + count; - slice.limit(limit); - buffer.position(limit); - return slice; - } -} \ No newline at end of file diff --git a/common/src/main/java/org/taktik/mpegts/Constants.java b/common/src/main/java/org/taktik/mpegts/Constants.java deleted file mode 100644 index 599ac902..00000000 --- a/common/src/main/java/org/taktik/mpegts/Constants.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.taktik.mpegts; - -public class Constants { - public static final int MPEGTS_PACKET_SIZE = 188; - public static final byte TS_MARKER = 0x47; -} diff --git a/common/src/main/java/org/taktik/mpegts/MTSPacket.java b/common/src/main/java/org/taktik/mpegts/MTSPacket.java deleted file mode 100644 index 74ed0c57..00000000 --- a/common/src/main/java/org/taktik/mpegts/MTSPacket.java +++ /dev/null @@ -1,608 +0,0 @@ -package org.taktik.mpegts; - -import java.nio.ByteBuffer; - -import org.taktik.ioutils.NIOUtils; - -import com.google.common.base.Preconditions; - -public class MTSPacket extends PacketSupport { - private boolean transportErrorIndicator; // Transport Error Indicator (TEI) - private boolean payloadUnitStartIndicator; // Payload Unit Start Indicator - private boolean transportPriority; // Transport Priority - private int pid; // Packet Identifier (PID) - private int scramblingControl; // Scrambling control - private boolean adaptationFieldExist; // Adaptation field exist - private boolean containsPayload; // Contains payload - private int continuityCounter; // Continuity counter - - private AdaptationField adaptationField; - private ByteBuffer payload; - - public static class AdaptationField { - private MTSPacket packet; - private boolean discontinuityIndicator; // Discontinuity indicator - private boolean randomAccessIndicator; // Random Access indicator - private boolean elementaryStreamPriorityIndicator; // Elementary stream priority indicator - private boolean pcrFlag; // PCR flag - private boolean opcrFlag; // OPCR flag - private boolean splicingPointFlag; // Splicing point flag - private boolean transportPrivateDataFlag; // Transport private data flag - private boolean adaptationFieldExtensionFlag; // Adaptation field extension flag - private PCR pcr; // PCR - private PCR opcr; // OPCR - private byte spliceCountdown; // Splice countdown - private byte[] privateData; // Private data - private byte[] extension; // Extension - - public static class PCR { - private AdaptationField field; - public long base; // 33 bits - public int extension; // 9 bits - public byte reserved; // 6 bits - - public PCR(AdaptationField field, long base, int extension, byte reserved) { - this.base = base; - this.extension = extension; - this.reserved = reserved; - this.field = field; - } - - public long getValue() { - return base * 300 + extension; - } - - public void setValue(long value) { - base = value / 300; - extension = (int)value % 300; - field.markDirty(); - } - - public void write(ByteBuffer buffer) { - buffer.putInt((int) ((base & 0x1FFFFFFFFL) >> 1)); - int middleByte = 0; - middleByte |= ((base & 0x1) << 7); - middleByte |= ((reserved & 0x3F) << 1); - middleByte |= ((extension & 0x1FF) >> 8); - buffer.put((byte) middleByte); - buffer.put((byte) (extension & 0xff)); - } - } - - public AdaptationField(MTSPacket packet, boolean discontinuityIndicator, boolean randomAccessIndicator, boolean elementaryStreamPriorityIndicator, boolean pcrFlag, boolean opcrFlag, boolean splicingPointFlag, boolean transportPrivateDataFlag, boolean adaptationFieldExtensionFlag, PCR pcr, PCR opcr, byte spliceCountdown, byte[] privateData, byte[] extension) { - this.packet = packet; - this.discontinuityIndicator = discontinuityIndicator; - this.randomAccessIndicator = randomAccessIndicator; - this.elementaryStreamPriorityIndicator = elementaryStreamPriorityIndicator; - this.pcrFlag = pcrFlag; - this.opcrFlag = opcrFlag; - this.splicingPointFlag = splicingPointFlag; - this.transportPrivateDataFlag = transportPrivateDataFlag; - this.adaptationFieldExtensionFlag = adaptationFieldExtensionFlag; - this.pcr = pcr; - if (this.pcr != null) { - this.pcr.field = this; - } - this.opcr = opcr; - if (this.opcr != null) { - this.opcr.field = this; - } - this.spliceCountdown = spliceCountdown; - this.privateData = privateData; - this.extension = extension; - } - - public boolean isDiscontinuityIndicator() { - return discontinuityIndicator; - } - - public void setDiscontinuityIndicator(boolean discontinuityIndicator) { - this.discontinuityIndicator = discontinuityIndicator; - markDirty(); - } - - private void markDirty() { - packet.markDirty(); - } - - public boolean isRandomAccessIndicator() { - return randomAccessIndicator; - } - - public void setRandomAccessIndicator(boolean randomAccessIndicator) { - this.randomAccessIndicator = randomAccessIndicator; - markDirty(); - } - - public boolean isElementaryStreamPriorityIndicator() { - return elementaryStreamPriorityIndicator; - } - - public void setElementaryStreamPriorityIndicator(boolean elementaryStreamPriorityIndicator) { - this.elementaryStreamPriorityIndicator = elementaryStreamPriorityIndicator; - markDirty(); - } - - public boolean isPcrFlag() { - return pcrFlag; - } - - public void setPcrFlag(boolean pcrFlag) { - this.pcrFlag = pcrFlag; - markDirty(); - } - - public boolean isOpcrFlag() { - return opcrFlag; - } - - public void setOpcrFlag(boolean opcrFlag) { - this.opcrFlag = opcrFlag; - markDirty(); - } - - public boolean isSplicingPointFlag() { - return splicingPointFlag; - } - - public void setSplicingPointFlag(boolean splicingPointFlag) { - this.splicingPointFlag = splicingPointFlag; - markDirty(); - } - - public boolean isTransportPrivateDataFlag() { - return transportPrivateDataFlag; - } - - public void setTransportPrivateDataFlag(boolean transportPrivateDataFlag) { - this.transportPrivateDataFlag = transportPrivateDataFlag; - markDirty(); - } - - public boolean isAdaptationFieldExtensionFlag() { - return adaptationFieldExtensionFlag; - } - - public void setAdaptationFieldExtensionFlag(boolean adaptationFieldExtensionFlag) { - this.adaptationFieldExtensionFlag = adaptationFieldExtensionFlag; - markDirty(); - } - - public PCR getPcr() { - return pcr; - } - - public void setPcr(PCR pcr) { - this.pcr = pcr; - markDirty(); - } - - public PCR getOpcr() { - return opcr; - } - - public void setOpcr(PCR opcr) { - this.opcr = opcr; - markDirty(); - } - - public byte getSpliceCountdown() { - return spliceCountdown; - } - - public void setSpliceCountdown(byte spliceCountdown) { - this.spliceCountdown = spliceCountdown; - markDirty(); - } - - public byte[] getPrivateData() { - return privateData; - } - - public void setPrivateData(byte[] privateData) { - this.privateData = privateData; - markDirty(); - } - - public byte[] getExtension() { - return extension; - } - - public void setExtension(byte[] extension) { - this.extension = extension; - markDirty(); - } - - public void write(ByteBuffer buffer, int payloadLength) { - int length = 183 - payloadLength; - int remaining = length; - buffer.put((byte) (length & 0xff)); - int firstByte = 0; - - // Discontinuity indicator - if (discontinuityIndicator) { - firstByte |= 0x80; - } - - // Random Access indicator - if (randomAccessIndicator) { - firstByte |= 0x40; - } - - // Elementary stream priority indicator - if (elementaryStreamPriorityIndicator) { - firstByte |= 0x20; - } - - // PCR flag - if (pcrFlag) { - firstByte |= 0x10; - } - - // OPCR flag - if (opcrFlag) { - firstByte |= 0x08; - } - - // Splicing point flag - if (splicingPointFlag) { - firstByte |= 0x04; - } - - // Transport private data flag - if (transportPrivateDataFlag) { - firstByte |= 0x2; - } - - // Adaptation field extension flag - if (adaptationFieldExtensionFlag) { - firstByte |= 0x01; - } - - buffer.put((byte) firstByte); - remaining --; - - if (pcrFlag && pcr != null) { - pcr.write(buffer); - remaining -= 6; - } - - if (opcrFlag && opcr != null) { - opcr.write(buffer); - remaining -= 6; - } - - if (splicingPointFlag) { - buffer.put(spliceCountdown); - remaining--; - } - - if (transportPrivateDataFlag && privateData != null) { - buffer.put(privateData); - remaining -= privateData.length; - } - - if (adaptationFieldExtensionFlag && extension != null) { - buffer.put(extension); - remaining -= extension.length; - } - - if (remaining < 0) { - throw new IllegalStateException("Adaptation field too big!"); - } - while (remaining-- > 0) { - buffer.put((byte)0); - } - } - } - - public MTSPacket(boolean transportErrorIndicator, boolean payloadUnitStartIndicator, boolean transportPriority, int pid, int scramblingControl, int continuityCounter) { - super(); - this.buffer = ByteBuffer.allocate(Constants.MPEGTS_PACKET_SIZE); - this.transportErrorIndicator = transportErrorIndicator; - this.payloadUnitStartIndicator = payloadUnitStartIndicator; - this.transportPriority = transportPriority; - this.pid = pid; - this.scramblingControl = scramblingControl; - this.continuityCounter = continuityCounter; - this.adaptationFieldExist = false; - this.containsPayload = false; - } - - public MTSPacket(ByteBuffer buffer) { - super(buffer); - } - - @Override - protected void write() { - // First write payload - int payloadLength = 0; - if (containsPayload && payload != null) { - payload.clear(); - payloadLength = payload.capacity(); - buffer.position(Constants.MPEGTS_PACKET_SIZE - payloadLength); - buffer.put(payload); - } - buffer.rewind(); - // First byte - buffer.put((byte) 0x47); - - // Bytes 2->3 - int secondAndThirdBytes = 0; - if (transportErrorIndicator) { - secondAndThirdBytes |= 0x8000; - } - if (payloadUnitStartIndicator) { - secondAndThirdBytes |= 0x4000; - } - if (transportPriority) { - secondAndThirdBytes |= 0x2000; - } - secondAndThirdBytes |= (pid & 0x1fff); - - buffer.putShort((short) secondAndThirdBytes); - - int fourthByte = 0; - - // Byte 4 - fourthByte |= (scramblingControl & 0xc0); - if (adaptationFieldExist) { - fourthByte |= 0x20; - } - if (containsPayload) { - fourthByte |= 0x10; - } - fourthByte |= (continuityCounter & 0x0f); - buffer.put((byte) fourthByte); - - // Adaptation field - if (adaptationFieldExist) { - if (adaptationField == null) { - buffer.put((byte) 0); - } else { - adaptationField.write(buffer, payloadLength); - } - } - - // Payload - if (containsPayload && payload != null) { - payload.rewind(); - buffer.put(payload); - } - if (buffer.remaining() != 0) { - throw new IllegalStateException("buffer.remaining() = " + buffer.remaining() + ", should be zero!"); - } - } - - protected void parse() { - // Sync byte - int marker = buffer.get() & 0xff; - Preconditions.checkArgument(Constants.TS_MARKER == marker); - - // Second/Third byte - int secondAndThirdBytes = buffer.getShort(); - - // Transport Error Indicator (TEI) - boolean transportErrorIndicator = (secondAndThirdBytes & 0x8000) != 0; - - // Payload Unit Start Indicator - boolean payloadUnitStartIndicator = (secondAndThirdBytes & 0x4000) != 0; - - // Transport Priority - boolean transportPriority = (secondAndThirdBytes & 0x2000) != 0; - - // Packet Identifier (PID) - int pid = secondAndThirdBytes & 0x1fff; - - // Fourth byte - int fourthByte = buffer.get() & 0xff; - - // Scrambling control - int scramblingControl = fourthByte & 0xc0; - - // Adaptation field exist - boolean adaptationFieldExist = (fourthByte & 0x20) != 0; - - // Contains payload - boolean containsPayload = (fourthByte & 0x10) != 0; - - // Continuity counter - int continuityCounter = fourthByte & 0x0f; - - MTSPacket.AdaptationField adaptationField = null; - if (adaptationFieldExist) { - // Adaptation Field Length - int adaptationFieldLength = buffer.get() & 0xff; - if (adaptationFieldLength != 0) { - - int remainingBytes = adaptationFieldLength; - - // Get next byte - int nextByte = buffer.get() & 0xff; - remainingBytes--; - - // Discontinuity indicator - boolean discontinuityIndicator = (nextByte & 0x80) != 0; - - // Random Access indicator - boolean randomAccessIndicator = (nextByte & 0x40) != 0; - - // Elementary stream priority indicator - boolean elementaryStreamPriorityIndicator = (nextByte & 0x20) != 0; - - // PCR flag - boolean pcrFlag = (nextByte & 0x10) != 0; - - // OPCR flag - boolean opcrFlag = (nextByte & 0x08) != 0; - - // Splicing point flag - boolean splicingPointFlag = (nextByte & 0x04) != 0; - - // Transport private data flag - boolean transportPrivateDataFlag = (nextByte & 0x2) != 0; - - // Adaptation field extension flag - boolean adaptationFieldExtensionFlag = (nextByte & 0x01) != 0; - - // PCR - MTSPacket.AdaptationField.PCR pcr = null; - if (pcrFlag) { - pcr = parsePCR(); - remainingBytes -= 6; - } - - // OPCR - MTSPacket.AdaptationField.PCR opcr = null; - if (opcrFlag) { - opcr = parsePCR(); - remainingBytes -= 6; - } - - // Splice countdown - byte spliceCountdown = 0; - if (splicingPointFlag) { - spliceCountdown = buffer.get(); - remainingBytes--; - } - - byte[] privateData = null; - if (transportPrivateDataFlag) { - int transportPrivateDataLength = buffer.get() & 0xff; - privateData = new byte[transportPrivateDataLength]; - buffer.get(privateData); - remainingBytes -= transportPrivateDataLength; - } - - byte[] extension = null; - if (adaptationFieldExtensionFlag) { - int extensionLength = buffer.get() & 0xff; - extension = new byte[extensionLength]; - buffer.get(extension); - remainingBytes -= extensionLength; - } - - // Skip remaining bytes - NIOUtils.skip(buffer, remainingBytes); - - adaptationField = new MTSPacket.AdaptationField(this, discontinuityIndicator, randomAccessIndicator, elementaryStreamPriorityIndicator, pcrFlag, opcrFlag, splicingPointFlag, transportPrivateDataFlag, adaptationFieldExtensionFlag, pcr, opcr, spliceCountdown, privateData, extension); - } - } - - this.transportErrorIndicator = transportErrorIndicator; - this.payloadUnitStartIndicator = payloadUnitStartIndicator; - this.transportPriority = transportPriority; - this.pid = pid; - this.scramblingControl = scramblingControl; - this.adaptationFieldExist = adaptationFieldExist; - this.containsPayload = containsPayload; - this.continuityCounter = continuityCounter; - this.adaptationField = adaptationField; - - // Payload - this.payload = containsPayload ? buffer.slice() : null; - } - - private AdaptationField.PCR parsePCR() { - AdaptationField.PCR pcr; - byte[] pcrBytes = new byte[6]; - buffer.get(pcrBytes); - - long pcrBits = ((pcrBytes[0] & 0xffL) << 40) | ((pcrBytes[1] & 0xffL) << 32) | ((pcrBytes[2] & 0xffL) << 24) | ((pcrBytes[3] & 0xffL) << 16) | ((pcrBytes[4] & 0xffL) << 8) | (pcrBytes[5] & 0xffL); - long base = (pcrBits & 0xFFFFFFFF8000L) >> 15; - byte reserved = (byte) ((pcrBits & 0x7E00) >> 9); - int extension = (int) (pcrBits & 0x1FFL); - pcr = new AdaptationField.PCR(null, base, extension, reserved); - return pcr; - } - - public boolean isTransportErrorIndicator() { - return transportErrorIndicator; - } - - public void setTransportErrorIndicator(boolean transportErrorIndicator) { - this.transportErrorIndicator = transportErrorIndicator; - markDirty(); - } - - public boolean isPayloadUnitStartIndicator() { - return payloadUnitStartIndicator; - } - - public void setPayloadUnitStartIndicator(boolean payloadUnitStartIndicator) { - this.payloadUnitStartIndicator = payloadUnitStartIndicator; - markDirty(); - } - - public boolean isTransportPriority() { - return transportPriority; - } - - public void setTransportPriority(boolean transportPriority) { - this.transportPriority = transportPriority; - markDirty(); - } - - public int getPid() { - return pid; - } - - public void setPid(int pid) { - this.pid = pid; - markDirty(); - } - - public int getScramblingControl() { - return scramblingControl; - } - - public void setScramblingControl(int scramblingControl) { - this.scramblingControl = scramblingControl; - markDirty(); - } - - public boolean isAdaptationFieldExist() { - return adaptationFieldExist; - } - - public void setAdaptationFieldExist(boolean adaptationFieldExist) { - this.adaptationFieldExist = adaptationFieldExist; - markDirty(); - } - - public boolean isContainsPayload() { - return containsPayload; - } - - public void setContainsPayload(boolean containsPayload) { - this.containsPayload = containsPayload; - markDirty(); - } - - public int getContinuityCounter() { - return continuityCounter; - } - - public void setContinuityCounter(int continuityCounter) { - this.continuityCounter = continuityCounter; - markDirty(); - } - - public AdaptationField getAdaptationField() { - return adaptationField; - } - - public void setAdaptationField(AdaptationField adaptationField) { - this.adaptationField = adaptationField; - markDirty(); - } - - public ByteBuffer getPayload() { - return payload; - } - - public void setPayload(ByteBuffer payload) { - this.payload = payload; - markDirty(); - } -} \ No newline at end of file diff --git a/common/src/main/java/org/taktik/mpegts/PATSection.java b/common/src/main/java/org/taktik/mpegts/PATSection.java deleted file mode 100644 index 40e155de..00000000 --- a/common/src/main/java/org/taktik/mpegts/PATSection.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.taktik.mpegts; - - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Lists; - -public class PATSection extends PSISection { - private Integer[] networkPids; - private Map programs; - - public PATSection(PSISection psi, Integer[] networkPids, Map programs) { - super(psi); - this.networkPids = networkPids; - this.programs = programs; - } - - public Integer[] getNetworkPids() { - return networkPids; - } - - public Map getPrograms() { - return programs; - } - - public static PATSection parse(ByteBuffer data) { - PSISection psi = PSISection.parse(data); - if (psi == null) { - return null; - } - List networkPids = Lists.newArrayList(); - Map programs = new HashMap<>(); - - while (data.remaining() > 4) { - int programNum = data.getShort() & 0xffff; - int w = data.getShort(); - int pid = w & 0x1fff; - if (programNum == 0) - networkPids.add(pid); - else - programs.put(programNum, pid); - } - - return new PATSection(psi, networkPids.toArray(new Integer[networkPids.size()]), programs); - } -} \ No newline at end of file diff --git a/common/src/main/java/org/taktik/mpegts/PMTSection.java b/common/src/main/java/org/taktik/mpegts/PMTSection.java deleted file mode 100644 index 38137d35..00000000 --- a/common/src/main/java/org/taktik/mpegts/PMTSection.java +++ /dev/null @@ -1,133 +0,0 @@ -package org.taktik.mpegts; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.taktik.ioutils.NIOUtils; - - - -/** - * This class is part of JCodec ( www.jcodec.org ) This software is distributed - * under FreeBSD License - * - * Represents PMT ( Program Map Table ) of the MPEG Transport stream - * - * This section contains information about streams of an individual program, a - * program usually contains two or more streams, such as video, audio, text, - * etc.. - * - * @author The JCodec project - * - */ -public class PMTSection extends PSISection { - - private int pcrPid; - // private Tag[] tags; - // private PMTStream[] streams; - - public PMTSection(PSISection psi, int pcrPid) {//, Tag[] tags, PMTStream[] streams) { - super(psi); - this.pcrPid = pcrPid; - // this.tags = tags; - // this.streams = streams; - } - - public int getPcrPid() { - return pcrPid; - } - - // public Tag[] getTags() { - // return tags; - // } - // - // public PMTStream[] getStreams() { - // return streams; - // } - - public static PMTSection parse(ByteBuffer data) { - PSISection psi = PSISection.parse(data); - - int w1 = data.getShort() & 0xffff; - int pcrPid = w1 & 0x1fff; - - //int w2 = data.getShort() & 0xffff; - //int programInfoLength = w2 & 0xfff; - - // List tags = parseTags(NIOUtils.read(data, programInfoLength)); - // List streams = new ArrayList(); - // while (data.remaining() > 4) { - // int streamType = data.get() & 0xff; - // int wn = data.getShort() & 0xffff; - // int elementaryPid = wn & 0x1fff; - // - // - // int wn1 = data.getShort() & 0xffff; - // int esInfoLength = wn1 & 0xfff; - // ByteBuffer read = NIOUtils.read(data, esInfoLength); - // streams.add(new PMTStream(streamType, elementaryPid, MPSUtils.parseDescriptors(read))); - // } - - return new PMTSection(psi, pcrPid); - } - - static List parseTags(ByteBuffer bb) { - List tags = new ArrayList(); - while (bb.hasRemaining()) { - int tag = bb.get(); - int tagLen = bb.get(); - tags.add(new Tag(tag, NIOUtils.read(bb, tagLen))); - } - return tags; - } - - - public static class Tag { - private int tag; - private ByteBuffer content; - - public Tag(int tag, ByteBuffer content) { - this.tag = tag; - this.content = content; - } - - public int getTag() { - return tag; - } - - public ByteBuffer getContent() { - return content; - } - } - - // public static class PMTStream { - // private int streamTypeTag; - // private int pid; - // private List descriptors; - // private StreamType streamType; - // - // public PMTStream(int streamTypeTag, int pid, List descriptors) { - // this.streamTypeTag = streamTypeTag; - // this.pid = pid; - // this.descriptors = descriptors; - // this.streamType = StreamType.fromTag(streamTypeTag); - // } - // - // public int getStreamTypeTag() { - // return streamTypeTag; - // } - // - // public StreamType getStreamType() { - // return streamType; - // } - // - // public int getPid() { - // return pid; - // } - // - // public List getDesctiptors() { - // return descriptors; - // } - // } -} \ No newline at end of file diff --git a/common/src/main/java/org/taktik/mpegts/PSISection.java b/common/src/main/java/org/taktik/mpegts/PSISection.java deleted file mode 100644 index 89b0215b..00000000 --- a/common/src/main/java/org/taktik/mpegts/PSISection.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.taktik.mpegts; - - -import java.nio.ByteBuffer; - -/** - * This class is part of JCodec ( www.jcodec.org ) This software is distributed - * under FreeBSD License - * - * Represents a section of PSI payload ( Program Stream Information ) MPEG - * Transport stream - * - * @author The JCodec project - * - */ -public class PSISection { - private int tableId; - private int specificId; - private int versionNumber; - private int currentNextIndicator; - private int sectionNumber; - private int lastSectionNumber; - - /** - * A copy constructor - * - * @param other - */ - public PSISection(PSISection other) { - this(other.tableId, other.specificId, other.versionNumber, other.currentNextIndicator, other.sectionNumber, - other.lastSectionNumber); - } - - public PSISection(int tableId, int specificId, int versionNumber, int currentNextIndicator, int sectionNumber, - int lastSectionNumber) { - this.tableId = tableId; - this.specificId = specificId; - this.versionNumber = versionNumber; - this.currentNextIndicator = currentNextIndicator; - this.sectionNumber = sectionNumber; - this.lastSectionNumber = lastSectionNumber; - } - - public static PSISection parse(ByteBuffer data) { - int tableId = data.get() & 0xff; - int w0 = data.getShort() & 0xffff; - if ((w0 & 0xC000) != 0x8000) { - return null; - } - - int sectionLength = w0 & 0xfff; - - data.limit(data.position() + sectionLength); - - int specificId = data.getShort() & 0xffff; - int b0 = data.get() & 0xff; - int versionNumber = (b0 >> 1) & 0x1f; - int currentNextIndicator = b0 & 1; - - int sectionNumber = data.get() & 0xff; - int lastSectionNumber = data.get() & 0xff; - - return new PSISection(tableId, specificId, versionNumber, currentNextIndicator, sectionNumber, - lastSectionNumber); - } - - public int getTableId() { - return tableId; - } - - public int getSpecificId() { - return specificId; - } - - public int getVersionNumber() { - return versionNumber; - } - - public int getCurrentNextIndicator() { - return currentNextIndicator; - } - - public int getSectionNumber() { - return sectionNumber; - } - - public int getLastSectionNumber() { - return lastSectionNumber; - } -} \ No newline at end of file diff --git a/common/src/main/java/org/taktik/mpegts/PacketSupport.java b/common/src/main/java/org/taktik/mpegts/PacketSupport.java deleted file mode 100644 index 93dee48b..00000000 --- a/common/src/main/java/org/taktik/mpegts/PacketSupport.java +++ /dev/null @@ -1,37 +0,0 @@ -package org.taktik.mpegts; - - -import java.nio.ByteBuffer; - -public abstract class PacketSupport { - protected ByteBuffer buffer; - protected boolean dirty; - - public PacketSupport() { - dirty = false; - } - - public PacketSupport(ByteBuffer buffer) { - this.buffer = buffer; - buffer.rewind(); - parse(); - buffer.rewind(); - dirty = false; - } - - public ByteBuffer getBuffer() { - if (dirty) { - write(); - buffer.rewind(); - dirty = false; - } - return buffer; - } - - protected abstract void parse(); - protected abstract void write(); - - protected void markDirty() { - dirty = true; - } -} diff --git a/common/src/main/java/org/taktik/mpegts/Streamer.java b/common/src/main/java/org/taktik/mpegts/Streamer.java deleted file mode 100644 index 9c4fd428..00000000 --- a/common/src/main/java/org/taktik/mpegts/Streamer.java +++ /dev/null @@ -1,347 +0,0 @@ -package org.taktik.mpegts; - -import java.nio.ByteBuffer; -import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.taktik.mpegts.sinks.MTSSink; -import org.taktik.mpegts.sources.MTSSource; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -public class Streamer { - static final Logger log = LoggerFactory.getLogger("streamer"); - - private MTSSource source; - private MTSSink sink; - - private ArrayBlockingQueue buffer; - private int bufferSize; - private boolean endOfSourceReached; - private boolean streamingShouldStop; - - private PATSection patSection; - private TreeMap pmtSection; - - private Thread bufferingThread; - private Thread streamingThread; - - private boolean sleepingEnabled; - private String name; - - private Streamer(MTSSource source, MTSSink sink, int bufferSize, boolean sleepingEnabled, String name) { - this.source = source; - this.sink = sink; - this.bufferSize = bufferSize; - this.sleepingEnabled = sleepingEnabled; - this.name = name; - } - - public void stream() throws InterruptedException { - buffer = new ArrayBlockingQueue<>(bufferSize); - patSection = null; - pmtSection = Maps.newTreeMap(); - endOfSourceReached = false; - streamingShouldStop = false; - log.info("PreBuffering {} packets", bufferSize); - try { - preBuffer(); - } catch (Exception e) { - throw new IllegalStateException("Error while buffering", e); - } - log.info("Done PreBuffering"); - - bufferingThread = new Thread(this::fillBuffer, "Buffering ["+name+"]"); - bufferingThread.setDaemon(true); - bufferingThread.start(); - - streamingThread = new Thread(this::internalStream, "Streaming ["+name+"]"); - streamingThread.setDaemon(true); - streamingThread.start(); - - bufferingThread.join(); - streamingThread.join(); - - try { - sink.close(); - } catch(Exception e) { - log.error("Couldn't close sink", e); - } - } - - public void stop() { - streamingShouldStop = true; - try { - source.close(); - } catch (Exception e) { - log.error("Couldn't close source", e); - } - try { - sink.close(); - } catch (Exception e) { - log.error("Couldn't close sink", e); - } - buffer.clear(); - try { - bufferingThread.interrupt(); - streamingThread.interrupt(); - } catch (Exception e) { - log.error("Couldn't interrupt streamer threads"); - } - } - - public void switchSink(MTSSink sink) { - MTSSink old = this.sink; - this.sink = sink; - try { - old.close(); - } catch (Exception e) { - log.error("Couldn't close old sink while switching sinks", e); - } - } - - private void internalStream() { - boolean resetState = false; - MTSPacket packet = null; - long packetCount = 0; - Long firstPcrValue = null; - Long firstPcrTime = null; - Long lastPcrValue = null; - Long lastPcrTime = null; - Long averageSleep = null; - while (!streamingShouldStop) { - if (resetState) { - firstPcrValue = null; - firstPcrTime = null; - lastPcrValue = null; - lastPcrTime = null; - averageSleep = null; - resetState = false; - } - - // Initialize time to sleep - long sleepNanos = 0; - - try { - packet = buffer.take(); - if (packet == null) { - if (endOfSourceReached) { - packet = buffer.take(); - if (packet == null) { - break; - } - } else { - continue; - } - } - } catch (InterruptedException e1) { - if(!endOfSourceReached && !streamingShouldStop) { - log.error("Interrupted while waiting for packet"); - continue; - } else { - Thread.currentThread().interrupt(); - break; - } - } - - int pid = packet.getPid(); - - if (pid == 0 && packet.isPayloadUnitStartIndicator()) { - ByteBuffer payload = packet.getPayload(); - payload.rewind(); - int pointer = payload.get() & 0xff; - payload.position(payload.position() + pointer); - patSection = PATSection.parse(payload); - for (Integer pmtPid : pmtSection.keySet()) { - if (!patSection.getPrograms().values().contains(pmtPid)) { - pmtSection.remove(pmtPid); - } - } - } - - if (pid != 0 && patSection!=null && patSection.getPrograms().values().contains(pid) && packet.isPayloadUnitStartIndicator()) { - ByteBuffer payload = packet.getPayload(); - payload.rewind(); - int pointer = payload.get() & 0xff; - payload.position(payload.position() + pointer); - pmtSection.put(pid, PMTSection.parse(payload)); - } - - // Check PID matches PCR PID - if (averageSleep != null) { - sleepNanos = averageSleep; - } - - // Check for PCR - if (packet.getAdaptationField() != null && packet.getAdaptationField().getPcr() != null) { - if (packet.getPid() == getPCRPid()) { - if (!packet.getAdaptationField().isDiscontinuityIndicator()) { - // Get PCR and current nano time - long pcrValue = packet.getAdaptationField().getPcr().getValue(); - long pcrTime = System.nanoTime(); - - // Compute sleepNanosOrig - if (firstPcrValue == null || firstPcrTime == null) { - firstPcrValue = pcrValue; - firstPcrTime = pcrTime; - } - - // Compute sleepNanosPrevious - Long sleepNanosPrevious = null; - if (lastPcrValue != null && lastPcrTime != null) { - if (pcrValue <= lastPcrValue) { - log.trace("PCR discontinuity ! {}", packet.getPid()); - resetState = true; - } else { - sleepNanosPrevious = ((pcrValue - lastPcrValue) / 27 * 1000) - (pcrTime - lastPcrTime); - } - } - - // Set sleep time based on PCR if possible - if (sleepNanosPrevious != null) { - // Safety : We should never have to wait more than 100ms - if (sleepNanosPrevious > 100000000) { - log.warn("PCR sleep ignored, too high !"); - resetState = true; - } else { - sleepNanos = sleepNanosPrevious; - } - } - - // Set lastPcrValue/lastPcrTime - lastPcrValue = pcrValue; - lastPcrTime = pcrTime + sleepNanos; - } else { - log.warn("Skipped PCR - Discontinuity indicator"); - } - } else { - log.debug("Skipped PCR - PID does not match"); - } - } - - // Sleep if needed - if (sleepNanos > 0 && sleepingEnabled) { - log.trace("Sleeping {} millis, {} nanos", sleepNanos / 1000000, sleepNanos % 1000000); - try { - Thread.sleep(sleepNanos / 1000000, (int) (sleepNanos % 1000000)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.warn("Streaming sleep interrupted!"); - } - } - - // Stream packet - if(!streamingShouldStop && !Thread.interrupted()) { - try { - sink.send(packet); - } catch (Exception e) { - log.error("Error sending packet to sink", e); - } - } - - packetCount++; - } - log.info("Sent {} MPEG-TS packets", packetCount); - synchronized (this) { - notifyAll(); - } - } - - private void preBuffer() throws Exception { - MTSPacket packet; - int packetNumber = 0; - while ((packetNumber < bufferSize) && (packet = source.nextPacket()) != null) { - buffer.add(packet); - packetNumber++; - } - } - - private void fillBuffer() { - try { - MTSPacket packet; - while (!streamingShouldStop && (packet = source.nextPacket()) != null) { - boolean put = false; - while (!put) { - put = putPacketToBuffer(packet); - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Error reading from source", e); - } catch (Exception e) { - log.error("Error reading from source", e); - } finally { - endOfSourceReached = true; - try { - streamingThread.interrupt(); - } catch(Exception e) { - log.error("Couldn't interrupt streaming thread", e); - } - } - } - - private boolean putPacketToBuffer(MTSPacket packet) { - try { - buffer.put(packet); - return true; - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - log.error("Error adding packet to buffer", ignored); - return false; - } - } - - private int getPCRPid() { - if ((!pmtSection.isEmpty())) { - return pmtSection.values().iterator().next().getPcrPid(); - } - return -1; - } - - public static StreamerBuilder builder() { - return new StreamerBuilder(); - } - - public static class StreamerBuilder { - private MTSSink sink; - private MTSSource source; - private int bufferSize = 1000; - private boolean sleepingEnabled = false; - private String name; - - public StreamerBuilder setSink(MTSSink sink) { - this.sink = sink; - return this; - } - - public StreamerBuilder setSource(MTSSource source) { - this.source = source; - return this; - } - - public StreamerBuilder setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public StreamerBuilder setSleepingEnabled(boolean sleepingEnabled) { - this.sleepingEnabled = sleepingEnabled; - return this; - } - - public StreamerBuilder setName(String name) { - this.name = name; - return this; - } - - public Streamer build() { - Preconditions.checkNotNull(sink); - Preconditions.checkNotNull(source); - return new Streamer(source, sink, bufferSize, sleepingEnabled, name); - } - - } -} \ No newline at end of file diff --git a/common/src/main/java/org/taktik/mpegts/StreamerTest.java b/common/src/main/java/org/taktik/mpegts/StreamerTest.java deleted file mode 100644 index 699f4da1..00000000 --- a/common/src/main/java/org/taktik/mpegts/StreamerTest.java +++ /dev/null @@ -1,45 +0,0 @@ -package org.taktik.mpegts; - -import java.io.File; - -import org.taktik.mpegts.sinks.MTSSink; -import org.taktik.mpegts.sinks.UDPTransport; -import org.taktik.mpegts.sources.MTSSource; -import org.taktik.mpegts.sources.MTSSources; -import org.taktik.mpegts.sources.ResettableMTSSource; - -public class StreamerTest { - public static void main(String[] args) throws Exception { - - // Set up mts sink - MTSSink transport = UDPTransport.builder() - //.setAddress("239.222.1.1") - .setAddress("127.0.0.1") - .setPort(1234) - .setSoTimeout(5000) - .setTtl(1) - .build(); - - - ResettableMTSSource ts1 = MTSSources.from(new File("/Users/abaudoux/Downloads/EBSrecording.mpg")); - - // media132, media133 --> ok - // media133, media132 --> ok - // media123, media132 --> ko - - - // Build source - MTSSource source = MTSSources.loop(ts1); - - // build streamer - Streamer streamer = Streamer.builder() - .setSource(source) - //.setSink(ByteChannelSink.builder().setByteChannel(fc).build()) - .setSink(transport) - .build(); - - // Start streaming - streamer.stream(); - - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java b/common/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java deleted file mode 100644 index 435b1657..00000000 --- a/common/src/main/java/org/taktik/mpegts/sinks/ByteChannelSink.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.taktik.mpegts.sinks; - -import org.taktik.mpegts.MTSPacket; - -import java.nio.channels.ByteChannel; - -public class ByteChannelSink implements MTSSink { - - private ByteChannel byteChannel; - - private ByteChannelSink(ByteChannel byteChannel) { - this.byteChannel = byteChannel; - } - - public static ByteChannelSinkBuilder builder() { - return new ByteChannelSinkBuilder(); - } - - @Override - public void send(MTSPacket packet) throws Exception { - byteChannel.write(packet.getBuffer()); - } - - @Override - public void close() throws Exception { - byteChannel.close(); - } - - public static class ByteChannelSinkBuilder { - private ByteChannel byteChannel; - - private ByteChannelSinkBuilder(){} - - public ByteChannelSink build() { - return new ByteChannelSink(byteChannel); - } - - public ByteChannelSinkBuilder setByteChannel(ByteChannel byteChannel) { - this.byteChannel = byteChannel; - return this; - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sinks/MTSSink.java b/common/src/main/java/org/taktik/mpegts/sinks/MTSSink.java deleted file mode 100644 index f84f00c2..00000000 --- a/common/src/main/java/org/taktik/mpegts/sinks/MTSSink.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.taktik.mpegts.sinks; - -import org.taktik.mpegts.MTSPacket; - -public interface MTSSink extends AutoCloseable { - void send(MTSPacket packet) throws Exception; -} diff --git a/common/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java b/common/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java deleted file mode 100644 index 786148ef..00000000 --- a/common/src/main/java/org/taktik/mpegts/sinks/UDPTransport.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.taktik.mpegts.sinks; - -import com.google.common.base.Preconditions; -import org.taktik.mpegts.MTSPacket; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.InetSocketAddress; -import java.net.MulticastSocket; -import java.nio.ByteBuffer; - -public class UDPTransport implements MTSSink { - - private final InetSocketAddress inetSocketAddress; - private final MulticastSocket multicastSocket; - - - private UDPTransport(String address, int port, int ttl, int soTimeout) throws IOException { - // InetSocketAddress - inetSocketAddress = new InetSocketAddress(address, port); - - // Create the socket but we don't bind it as we are only going to send data - // Note that we don't have to join the multicast group if we are only sending data and not receiving - multicastSocket = new MulticastSocket(); - multicastSocket.setReuseAddress(true); - multicastSocket.setSoTimeout(soTimeout); - multicastSocket.setTimeToLive(ttl); - } - - public static UDPTransport.UDPTransportBuilder builder() { - return new UDPTransportBuilder(); - } - - @Override - public void send(MTSPacket packet) throws IOException { - ByteBuffer buffer = packet.getBuffer(); - Preconditions.checkArgument(buffer.hasArray()); - DatagramPacket datagramPacket = new DatagramPacket(buffer.array(), buffer.arrayOffset(), buffer.limit(), inetSocketAddress); - multicastSocket.send(datagramPacket); - } - - @Override - public void close() { - multicastSocket.close(); - } - - public static class UDPTransportBuilder { - private String address; - private int port; - private int ttl; - private int soTimeout; - - public UDPTransportBuilder setAddress(String address) { - this.address = address; - return this; - } - - public UDPTransportBuilder setPort(int port) { - this.port = port; - return this; - } - - public UDPTransportBuilder setTtl(int ttl) { - this.ttl = ttl; - return this; - } - - public UDPTransportBuilder setSoTimeout(int timeout) { - this.soTimeout = timeout; - return this; - } - - public UDPTransport build() throws IOException { - return new UDPTransport(address, port, ttl, soTimeout); - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java deleted file mode 100644 index 5cc52fdd..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/AbstractByteChannelMTSSource.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.taktik.mpegts.sources; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.taktik.ioutils.NIOUtils; -import org.taktik.mpegts.Constants; -import org.taktik.mpegts.MTSPacket; - -public abstract class AbstractByteChannelMTSSource extends AbstractMTSSource { - static final Logger log = LoggerFactory.getLogger("source"); - - private static final int BUFFER_SIZE = Constants.MPEGTS_PACKET_SIZE * 1000; - - protected ByteBuffer buffer; - protected T byteChannel; - - - protected AbstractByteChannelMTSSource(T byteChannel) throws IOException { - this.byteChannel = byteChannel; - fillBuffer(); - } - - protected void fillBuffer() throws IOException { - buffer = ByteBuffer.allocate(BUFFER_SIZE); - NIOUtils.read(byteChannel, buffer); - buffer.flip(); - } - - protected boolean lastBuffer() { - return buffer.capacity() > buffer.limit(); - } - - @Override - protected MTSPacket nextPacketInternal() throws IOException { - ByteBuffer packetBuffer = null; - while (true) { - boolean foundFirstMarker = false; - int skipped = 0; - while (!foundFirstMarker) { - if (!buffer.hasRemaining()) { - if (lastBuffer()) { - return null; - } - buffer = ByteBuffer.allocate(BUFFER_SIZE); - if (NIOUtils.read(byteChannel, buffer) <= 0) { - return null; - } - buffer.flip(); - } - if ((buffer.get(buffer.position()) & 0xff) == Constants.TS_MARKER) { - foundFirstMarker = true; - } else { - buffer.position(buffer.position() + 1); - skipped++; - } - } - if (skipped > 0) { - log.info("Skipped {} bytes looking for TS marker", skipped); - } - if (buffer.remaining() >= Constants.MPEGTS_PACKET_SIZE) { - if ((buffer.remaining() == Constants.MPEGTS_PACKET_SIZE) || - (buffer.get(buffer.position() + Constants.MPEGTS_PACKET_SIZE) & 0xff) == Constants.TS_MARKER) { - packetBuffer = buffer.slice(); - packetBuffer.limit(Constants.MPEGTS_PACKET_SIZE); - buffer.position(buffer.position() + Constants.MPEGTS_PACKET_SIZE); - } else { - log.info("no second marker found"); - buffer.position(buffer.position() + 1); - } - } else if (!lastBuffer()) { - log.info("NEW BUFFER"); - - ByteBuffer newBuffer = ByteBuffer.allocate(BUFFER_SIZE); - newBuffer.put(buffer); - buffer = newBuffer; - if (NIOUtils.read(byteChannel, buffer) <= 0) { - return null; - } - buffer.flip(); - if (buffer.remaining() >= Constants.MPEGTS_PACKET_SIZE) { - if ((buffer.remaining() == Constants.MPEGTS_PACKET_SIZE) || - (buffer.get(buffer.position() + Constants.MPEGTS_PACKET_SIZE) & 0xff) == Constants.TS_MARKER) { - packetBuffer = buffer.slice(); - packetBuffer.limit(Constants.MPEGTS_PACKET_SIZE); - buffer.position(buffer.position() + Constants.MPEGTS_PACKET_SIZE); - } else { - log.info("no second marker found"); - buffer.position(buffer.position() + 1); - } - } else { - return null; - } - } else { - return null; - } - - if (packetBuffer != null) { - // Parse the packet - try { - return new MTSPacket(packetBuffer); - } catch (Exception e) { - packetBuffer = null; - log.warn("Error parsing packet", e); - } - } - } - } - - @Override - protected void closeInternal() throws Exception { - byteChannel.close(); - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java deleted file mode 100644 index a7c81a69..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/AbstractMTSSource.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.taktik.mpegts.sources; - -import org.taktik.mpegts.MTSPacket; - -public abstract class AbstractMTSSource implements MTSSource { - private boolean closed; - - @Override - public final MTSPacket nextPacket() throws Exception { - if (isClosed()) { - throw new IllegalStateException("Source is closed"); - } - return nextPacketInternal(); - } - - @Override - public final void close() throws Exception { - try { - closeInternal(); - } finally { - closed = true; - } - } - - protected boolean isClosed() { - return closed; - } - - protected abstract MTSPacket nextPacketInternal() throws Exception; - protected abstract void closeInternal() throws Exception; -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java deleted file mode 100644 index 298d674e..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/BlockingMultiMTSSource.java +++ /dev/null @@ -1,135 +0,0 @@ -package org.taktik.mpegts.sources; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.taktik.mpegts.MTSPacket; - -import ctbrec.recorder.ProgressListener; - -public class BlockingMultiMTSSource extends AbstractMTSSource implements AutoCloseable { - - private static final transient Logger LOG = LoggerFactory.getLogger(BlockingMultiMTSSource.class); - - private final boolean fixContinuity; - private ContinuityFixer continuityFixer; - - private final BlockingQueue sources; - private MTSSource currentSource; - private int downloadedSegments = 0; - private int totalSegments = -1; - private ProgressListener listener; - private int lastProgress = 0; - - private BlockingMultiMTSSource(boolean fixContinuity) { - this.fixContinuity = fixContinuity; - if (fixContinuity) { - continuityFixer = new ContinuityFixer(); - } - this.sources = new LinkedBlockingQueue<>(10); - } - - public void addSource(MTSSource source) throws InterruptedException { - this.sources.put(source); - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - if(currentSource == null) { - currentSource = sources.take(); - } - - MTSPacket packet = currentSource.nextPacket(); - packet = switchSourceIfNeeded(packet); - - if (fixContinuity) { - try { - continuityFixer.fixContinuity(packet); - } catch(Exception e) { - LOG.warn("Failed to fix continuity. MTSPacket probably invalid"); - return nextPacketInternal(); - } - } - return packet; - } - - private MTSPacket switchSourceIfNeeded(MTSPacket packet) throws Exception { - if(packet == null) { - // end of source has been reached, switch to the next source - closeCurrentSource(); - - downloadedSegments++; - if(listener != null && totalSegments > 0) { - int progress = (int)(downloadedSegments * 100.0 / totalSegments); - if(progress > lastProgress) { - listener.update(progress); - lastProgress = progress; - } - } - if(downloadedSegments == totalSegments) { - LOG.debug("All segments written. Queue size {}", sources.size()); - return null; - } - - return firstPacketFromNextSource(); - } - return packet; - } - - private MTSPacket firstPacketFromNextSource() throws Exception { - switchSource(); - return currentSource.nextPacket(); - } - - private void switchSource() throws InterruptedException { - currentSource = sources.take(); - } - - private void closeCurrentSource() throws Exception { - currentSource.close(); - } - - private void setProgressListener(ProgressListener listener) { - this.listener = listener; - } - - public void setTotalSegments(int total) { - this.totalSegments = total; - } - - @Override - protected void closeInternal() throws Exception { - for (MTSSource source : sources) { - source.close(); - } - } - - public static BlockingMultiMTSSourceBuilder builder() { - return new BlockingMultiMTSSourceBuilder(); - } - - public static class BlockingMultiMTSSourceBuilder { - boolean fixContinuity = false; - ProgressListener listener; - - public BlockingMultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) { - this.fixContinuity = fixContinuity; - return this; - } - public BlockingMultiMTSSourceBuilder setProgressListener(ProgressListener listener) { - this.listener = listener; - return this; - } - - public BlockingMultiMTSSource build() { - BlockingMultiMTSSource source = new BlockingMultiMTSSource(fixContinuity); - if(listener != null) { - source.setProgressListener(listener); - } - return source; - } - - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java deleted file mode 100644 index f6139673..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/ByteChannelMTSSource.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.taktik.mpegts.sources; - -import java.io.IOException; -import java.nio.channels.ByteChannel; - -import com.google.common.base.Preconditions; - -public class ByteChannelMTSSource extends AbstractByteChannelMTSSource { - - private ByteChannelMTSSource(ByteChannel byteChannel) throws IOException { - super(byteChannel); - } - - public static ByteChannelMTSSourceBuilder builder() { - return new ByteChannelMTSSourceBuilder(); - } - - public static class ByteChannelMTSSourceBuilder { - private ByteChannel byteChannel; - - private ByteChannelMTSSourceBuilder(){} - - public ByteChannelMTSSourceBuilder setByteChannel(ByteChannel byteChannel) { - this.byteChannel = byteChannel; - return this; - } - - public ByteChannelMTSSource build() throws IOException { - Preconditions.checkNotNull(byteChannel, "byteChannel cannot be null"); - return new ByteChannelMTSSource(byteChannel); - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java deleted file mode 100644 index 48ac4b2e..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/ByteSourceMTSSource.java +++ /dev/null @@ -1,75 +0,0 @@ -package org.taktik.mpegts.sources; - -import com.google.common.base.Preconditions; -import com.google.common.io.ByteSource; -import org.taktik.mpegts.Constants; -import org.taktik.mpegts.MTSPacket; - -import java.io.InputStream; -import java.nio.ByteBuffer; - -public class ByteSourceMTSSource extends AbstractMTSSource implements ResettableMTSSource { - - private ByteSource byteSource; - - private InputStream stream; - - - private ByteSourceMTSSource(ByteSource byteSource) { - this.byteSource = byteSource; - } - - public static ByteSourceMTSSourceBuilder builder() { - return new ByteSourceMTSSourceBuilder(); - } - - @Override - public void reset() throws Exception { - if (stream != null) { - stream.close(); - } - stream = byteSource.openBufferedStream(); - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - if (stream == null) { - stream = byteSource.openBufferedStream(); - } - - byte[] barray = new byte[Constants.MPEGTS_PACKET_SIZE]; - if (stream.read(barray) != Constants.MPEGTS_PACKET_SIZE) { - stream.close(); - return null; - } - - // Parse the packet - return new MTSPacket(ByteBuffer.wrap(barray)); - } - - @Override - protected void closeInternal() throws Exception { - if (stream != null) { - try (InputStream ignored = stream){ - //close - } - } - } - - public static class ByteSourceMTSSourceBuilder { - private ByteSource byteSource; - - private ByteSourceMTSSourceBuilder() { - } - - public ByteSourceMTSSource build() { - Preconditions.checkNotNull(byteSource); - return new ByteSourceMTSSource(byteSource); - } - - public ByteSourceMTSSourceBuilder setByteSource(ByteSource byteSource) { - this.byteSource = byteSource; - return this; - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java deleted file mode 100644 index 5f5700a4..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/ConcatenatingMTSSource.java +++ /dev/null @@ -1,194 +0,0 @@ -package org.taktik.mpegts.sources; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.taktik.mpegts.MTSPacket; - -import java.util.Collection; -import java.util.List; - -public class ConcatenatingMTSSource extends AbstractMTSSource { - static final Logger log = LoggerFactory.getLogger("multisource"); - private List sources; - private MTSSource currentSource; - private int idx; - private boolean fixContinuity; - - private ContinuityFixer continuityFixer; - private int maxLoops; - private int currentLoop; - private boolean closeCurrentSource; - - protected ConcatenatingMTSSource(boolean fixContinuity, int maxloops, Collection sources) { - Preconditions.checkArgument(sources.size() > 0, "Must provide at least contain one source"); - Preconditions.checkArgument(maxloops != 0, "Cannot loop zero times"); - this.sources = Lists.newArrayList(sources); - this.fixContinuity = fixContinuity; - idx = 0; - currentSource = this.sources.get(0); - if (fixContinuity) { - continuityFixer = new ContinuityFixer(); - } - this.maxLoops = maxloops; - if (maxloops != 1) { - checkLoopingPossible(sources); - } - this.currentLoop = 1; - this.closeCurrentSource = false; - } - - public static MultiMTSSourceBuilder builder() { - return new MultiMTSSourceBuilder(); - } - - private void checkLoopingPossible(Collection sources) { - for (MTSSource source : sources) { - checkLoopingPossible(source); - } - } - - private void checkLoopingPossible(MTSSource source) { - if (!(source instanceof ResettableMTSSource)) { - throw new IllegalStateException("Sources must be resettable for looping"); - } - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - if (currentSource == null) { - return null; - } - MTSPacket tsPacket = currentSource.nextPacket(); - if (tsPacket != null) { - if (fixContinuity) { - continuityFixer.fixContinuity(tsPacket); - } - return tsPacket; - } else { - // FIXME: infinite loop - nextSource(); - return nextPacket(); - } - } - - public synchronized void updateSources(List newSources) { - checkLoopingPossible(newSources); - List oldSources = this.sources; - this.sources = newSources; - for (MTSSource oldSource : oldSources) { - if (!newSources.contains(oldSource) && oldSource != currentSource) { - try { - oldSource.close(); - } catch (Exception e) { - log.error("Error closing source", e); - - } - } - } - closeCurrentSource = !newSources.contains(currentSource); - // Force next call to nextSource() to pick source 0 (first source) - idx = -1; - } - - @Override - protected synchronized void closeInternal() throws Exception { - for (MTSSource source : sources) { - source.close(); - } - if (closeCurrentSource && currentSource != null && !sources.contains(currentSource)) { - currentSource.close(); - } - } - - private synchronized void nextSource() { - if (closeCurrentSource) { - try { - currentSource.close(); - } catch (Exception e) { - log.error("Error closing source", e); - } finally { - closeCurrentSource = false; - } - } - if (fixContinuity) { - continuityFixer.nextSource(); - } - idx++; - if (idx >= sources.size()) { - currentLoop++; - if (maxLoops > 0 && currentLoop > maxLoops) { - currentSource = null; - } else { - idx = 0; - for (MTSSource source : sources) { - try { - ((ResettableMTSSource) source).reset(); - } catch (Exception e) { - e.printStackTrace(); - } - } - currentSource = sources.get(idx); - } - } else { - currentSource = sources.get(idx); - } - if (idx < sources.size()) { - log.info("Switched to source #{}", idx); - } - } - - public static class MultiMTSSourceBuilder { - boolean fixContinuity = false; - private List sources = Lists.newArrayList(); - private int maxLoops = 1; - - private MultiMTSSourceBuilder() { - } - - public MultiMTSSourceBuilder addSource(MTSSource source) { - this.sources.add(source); - return this; - } - - public MultiMTSSourceBuilder addSources(Collection sources) { - this.sources.addAll(sources); - return this; - } - - public MultiMTSSourceBuilder setSources(MTSSource... sources) { - this.sources = Lists.newArrayList(sources); - return this; - } - - public MultiMTSSourceBuilder setSources(Collection sources) { - this.sources = Lists.newArrayList(sources); - return this; - } - - public MultiMTSSourceBuilder setFixContinuity(boolean fixContinuity) { - this.fixContinuity = fixContinuity; - return this; - } - - public MultiMTSSourceBuilder loop() { - this.maxLoops = -1; - return this; - } - - public MultiMTSSourceBuilder loops(int count) { - this.maxLoops = count; - return this; - } - - public MultiMTSSourceBuilder noLoop() { - this.maxLoops = 1; - return this; - } - - public ConcatenatingMTSSource build() { - return new ConcatenatingMTSSource(fixContinuity, maxLoops, sources); - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java b/common/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java deleted file mode 100644 index 4b2dc3ae..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/ContinuityFixer.java +++ /dev/null @@ -1,214 +0,0 @@ -package org.taktik.mpegts.sources; - -import java.nio.ByteBuffer; -import java.util.Map; - -import org.taktik.mpegts.MTSPacket; - -import com.google.common.collect.Maps; - - -/** - * This class will attempt to fix timestamp discontinuities - * when switching from one source to another. - * This should allow for smoother transitions between videos.
- * This class does 3 things: - *
    - *
  1. Rewrite the PCR to be continuous with the previous source
  2. - *
  3. Rewrite the PTS of the PES to be continuous with the previous source
  4. - *
  5. Rewrite the continuity counter to be continuous with the previous source
  6. - *
- * - * Code using this class should call {@link #fixContinuity(org.taktik.mpegts.MTSPacket)} for each source packet, - * then {@link #nextSource()} after the last packet of the current source and before the first packet of the next source. - */ -public class ContinuityFixer { - private Map pcrPackets; - private Map allPackets; - private Map ptss; - private Map lastPTSsOfPreviousSource; - private Map lastPCRsOfPreviousSource; - private Map firstPCRsOfCurrentSource; - private Map firstPTSsOfCurrentSource; - - private Map lastPacketsOfPreviousSource = Maps.newHashMap(); - private Map firstPacketsOfCurrentSource = Maps.newHashMap(); - private Map continuityFixes = Maps.newHashMap(); - - private boolean firstSource; - - - public ContinuityFixer() { - pcrPackets = Maps.newHashMap(); - allPackets = Maps.newHashMap(); - ptss = Maps.newHashMap(); - lastPTSsOfPreviousSource = Maps.newHashMap(); - lastPCRsOfPreviousSource = Maps.newHashMap(); - firstPCRsOfCurrentSource = Maps.newHashMap(); - firstPTSsOfCurrentSource = Maps.newHashMap(); - - lastPacketsOfPreviousSource = Maps.newHashMap(); - firstPacketsOfCurrentSource = Maps.newHashMap(); - continuityFixes = Maps.newHashMap(); - firstSource = true; - } - - /** - * Signals the {@link org.taktik.mpegts.sources.ContinuityFixer} that the following - * packet will be from another source. - * - * Call this method after the last packet of the current source and before the first packet of the next source. - */ - public void nextSource() { - firstPCRsOfCurrentSource.clear(); - lastPCRsOfPreviousSource.clear(); - firstPTSsOfCurrentSource.clear(); - lastPTSsOfPreviousSource.clear(); - firstPacketsOfCurrentSource.clear(); - lastPacketsOfPreviousSource.clear(); - for (MTSPacket mtsPacket : pcrPackets.values()) { - lastPCRsOfPreviousSource.put(mtsPacket.getPid(), mtsPacket.getAdaptationField().getPcr().getValue()); - } - lastPTSsOfPreviousSource.putAll(ptss); - lastPacketsOfPreviousSource.putAll(allPackets); - pcrPackets.clear(); - ptss.clear(); - allPackets.clear(); - firstSource = false; - } - - /** - * Fix the continuity of the packet. - * - * Call this method for each source packet, in order. - * - * @param tsPacket The packet to fix. - */ - public void fixContinuity(MTSPacket tsPacket) { - if(tsPacket == null) { - return; - } - - int pid = tsPacket.getPid(); - allPackets.put(pid, tsPacket); - if (!firstPacketsOfCurrentSource.containsKey(pid)) { - firstPacketsOfCurrentSource.put(pid, tsPacket); - if (!firstSource) { - MTSPacket lastPacketOfPreviousSource = lastPacketsOfPreviousSource.get(pid); - int continuityFix = lastPacketOfPreviousSource == null ? 0 : lastPacketOfPreviousSource.getContinuityCounter() - tsPacket.getContinuityCounter(); - if (tsPacket.isContainsPayload()) { - continuityFix++; - } - continuityFixes.put(pid, continuityFix); - } - } - if (!firstSource) { - tsPacket.setContinuityCounter((tsPacket.getContinuityCounter() + continuityFixes.get(pid)) % 16); - } - fixPTS(tsPacket, pid); - fixPCR(tsPacket, pid); - } - - private void fixPCR(MTSPacket tsPacket, int pid) { - if (tsPacket.isAdaptationFieldExist() && tsPacket.getAdaptationField() != null) { - if (tsPacket.getAdaptationField().isPcrFlag()) { - if (!firstPCRsOfCurrentSource.containsKey(pid)) { - firstPCRsOfCurrentSource.put(pid, tsPacket.getAdaptationField().getPcr().getValue()); - } - rewritePCR(tsPacket); - pcrPackets.put(pid, tsPacket); - } - } - } - - private void fixPTS(MTSPacket tsPacket, int pid) { - if (tsPacket.isContainsPayload()) { - ByteBuffer payload = tsPacket.getPayload(); - //System.out.println("PKT RMN " + tsPacket.getPayload().remaining()); - if (/*payload.remaining() >= 3 &&*/ ((payload.get(0) & 0xff) == 0) && ((payload.get(1) & 0xff) == 0) && ((payload.get(2) & 0xff) == 1)) { - int extension = payload.getShort(6) & 0xffff; - if ((extension & 0x80) != 0) { - // PTS is present - // TODO add payload size check to avoid indexoutofboundexception - long pts = (((payload.get(9) & 0xE)) << 29) | (((payload.getShort(10) & 0xFFFE)) << 14) | ((payload.getShort(12) & 0xFFFE) >> 1); - if (!firstPTSsOfCurrentSource.containsKey(pid)) { - firstPTSsOfCurrentSource.put(pid, pts); - } - if (!firstSource) { - long newPts = Math.round(pts + (getTimeGap(pid) / 300.0) + 100 * ((27_000_000 / 300.0) / 1_000)); - - payload.put(9, (byte) (0x20 | ((newPts & 0x1C0000000l) >> 29) | 0x1)); - payload.putShort(10, (short) (0x1 | ((newPts & 0x3FFF8000) >> 14))); - payload.putShort(12, (short) (0x1 | ((newPts & 0x7FFF) << 1))); - payload.rewind(); - pts = newPts; - } - - ptss.put(pid, pts); - } - } - } - } - - private long getTimeGap(int pid) { - // Try with PCR of the same PID - Long lastPCROfPreviousSource = lastPCRsOfPreviousSource.get(pid); - if (lastPCROfPreviousSource == null) { - lastPCROfPreviousSource = 0l; - } - Long firstPCROfCurrentSource = firstPCRsOfCurrentSource.get(pid); - if (firstPCROfCurrentSource != null) { - return lastPCROfPreviousSource - firstPCROfCurrentSource; - } - - // Try with any PCR - if (!lastPCRsOfPreviousSource.isEmpty()) { - int pcrPid = lastPCRsOfPreviousSource.keySet().iterator().next(); - lastPCROfPreviousSource = lastPCRsOfPreviousSource.get(pcrPid); - if (lastPCROfPreviousSource == null) { - lastPCROfPreviousSource = 0l; - } - firstPCROfCurrentSource = firstPCRsOfCurrentSource.get(pcrPid); - if (firstPCROfCurrentSource != null) { - return lastPCROfPreviousSource - firstPCROfCurrentSource; - } - } - - // Try with PTS of the same PID - Long lastPTSOfPreviousSource = lastPTSsOfPreviousSource.get(pid); - if (lastPTSOfPreviousSource == null) { - lastPTSOfPreviousSource = 0l; - } - - Long firstPTSofCurrentSource = firstPTSsOfCurrentSource.get(pid); - if (firstPTSofCurrentSource != null) { - return (lastPTSOfPreviousSource - firstPTSofCurrentSource) * 300; - } - - // Try with any PTS - if (!lastPTSsOfPreviousSource.isEmpty()) { - int randomPid = lastPTSsOfPreviousSource.keySet().iterator().next(); - lastPTSOfPreviousSource = lastPTSsOfPreviousSource.get(randomPid); - if (lastPTSOfPreviousSource == null) { - lastPTSOfPreviousSource = 0l; - } - - firstPTSofCurrentSource = firstPTSsOfCurrentSource.get(randomPid); - if (firstPTSofCurrentSource != null) { - return (lastPTSOfPreviousSource - firstPTSofCurrentSource) * 300; - } - } - - return 0; - } - - private void rewritePCR(MTSPacket tsPacket) { - if (firstSource) { - return; - } - long timeGap = getTimeGap(tsPacket.getPid()); - long pcr = tsPacket.getAdaptationField().getPcr().getValue(); - long newPcr = pcr + timeGap + 100 * ((27_000_000) / 1_000); - tsPacket.getAdaptationField().getPcr().setValue(newPcr); - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java deleted file mode 100644 index 21e0ac9b..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/ContinuityFixingMTSSource.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.taktik.mpegts.sources; - -import org.taktik.mpegts.MTSPacket; - -/** - * Decorates an MTSSource with continuity fixing. Not suitable for use with multiple different - * MTSSources as it will not reset counters when switching sources. - */ -public class ContinuityFixingMTSSource extends AbstractMTSSource { - private final ContinuityFixer continuityFixer = new ContinuityFixer(); - private final MTSSource source; - - public ContinuityFixingMTSSource(MTSSource source) { - this.source = source; - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - MTSPacket packet = source.nextPacket(); - if (packet != null) { - continuityFixer.fixContinuity(packet); - } - return packet; - } - - @Override - protected void closeInternal() throws Exception { - source.close(); - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java deleted file mode 100644 index ba047be9..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/FixedBitrateMTSSource.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.taktik.mpegts.sources; - -import org.taktik.mpegts.MTSPacket; - -/** - * Decorate a source with a declared bitrate. - */ -public class FixedBitrateMTSSource extends AbstractMTSSource { - private final MTSSource source; - private final long bitrate; - - public FixedBitrateMTSSource(MTSSource source, long bitrate) { - this.source = source; - this.bitrate = bitrate; - } - - public long getBitrate() { - return bitrate; - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - return source.nextPacket(); - } - - @Override - protected void closeInternal() throws Exception { - source.close(); - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java deleted file mode 100644 index d0b29415..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/InputStreamMTSSource.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.taktik.mpegts.sources; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -import org.taktik.mpegts.Constants; -import org.taktik.mpegts.MTSPacket; - -import com.google.common.base.Preconditions; - -public class InputStreamMTSSource extends AbstractMTSSource { - - private InputStream inputStream; - - private InputStreamMTSSource(InputStream inputStream) { - this.inputStream = inputStream; - } - - public static InputStreamMTSSourceBuilder builder() { - return new InputStreamMTSSourceBuilder(); - } - - @Override - protected MTSPacket nextPacketInternal() throws IOException { - byte[] packetData = new byte[Constants.MPEGTS_PACKET_SIZE]; - int bytesRead = 0; - while(bytesRead < Constants.MPEGTS_PACKET_SIZE) { - int bytesLeft = Constants.MPEGTS_PACKET_SIZE - bytesRead; - int length = inputStream.read(packetData, bytesRead, bytesLeft); - bytesRead += length; - if(length == -1) { - // no more bytes available - return null; - } - } - - // Parse the packet - return new MTSPacket(ByteBuffer.wrap(packetData)); - } - - @Override - protected void closeInternal() throws Exception { - inputStream.close(); - } - - public static class InputStreamMTSSourceBuilder { - private InputStream inputStream; - - private InputStreamMTSSourceBuilder() { - } - - public InputStreamMTSSourceBuilder setInputStream(InputStream inputStream) { - this.inputStream = inputStream; - return this; - } - - public InputStreamMTSSource build() { - Preconditions.checkNotNull(inputStream, "InputStream cannot be null"); - return new InputStreamMTSSource(inputStream); - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java deleted file mode 100644 index 6cc23a18..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/LoopingMTSSource.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.taktik.mpegts.sources; - -import org.taktik.mpegts.MTSPacket; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -public class LoopingMTSSource extends AbstractMTSSource{ - private ResettableMTSSource source; - private Integer maxLoops; - private long currentLoop; - - public LoopingMTSSource(ResettableMTSSource source, Integer maxLoops) { - this.source = source; - this.maxLoops = maxLoops; - currentLoop = 1; - } - - public static LoopingMTSSourceBuilder builder() { - return new LoopingMTSSourceBuilder(); - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - MTSPacket packet = source.nextPacket(); - if (packet == null) { - currentLoop++; - if (maxLoops == null || (currentLoop <= maxLoops)) { - source.reset(); - packet = source.nextPacket(); - } - } - return packet; - } - - @Override - protected void closeInternal() throws Exception { - source.close(); - } - - public static class LoopingMTSSourceBuilder { - private ResettableMTSSource source; - private boolean fixContinuity; - private Integer maxLoops; - - private LoopingMTSSourceBuilder() { - } - - public MTSSource build() { - checkNotNull(source); - checkArgument(maxLoops == null || maxLoops > 0); - MTSSource result = new LoopingMTSSource(source, maxLoops); - if (fixContinuity) { - return new ContinuityFixingMTSSource(result); - } - return result; - } - - public LoopingMTSSourceBuilder setSource(ResettableMTSSource source) { - this.source = source; - return this; - } - - public LoopingMTSSourceBuilder setFixContinuity(boolean fixContinuity) { - this.fixContinuity = fixContinuity; - return this; - } - - public LoopingMTSSourceBuilder setMaxLoops(Integer maxLoops) { - this.maxLoops = maxLoops; - return this; - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/MTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/MTSSource.java deleted file mode 100644 index 32c3057e..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/MTSSource.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.taktik.mpegts.sources; - -import org.taktik.mpegts.MTSPacket; - -public interface MTSSource extends AutoCloseable { - MTSPacket nextPacket() throws Exception; -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/MTSSources.java b/common/src/main/java/org/taktik/mpegts/sources/MTSSources.java deleted file mode 100644 index 5713254c..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/MTSSources.java +++ /dev/null @@ -1,80 +0,0 @@ -package org.taktik.mpegts.sources; - - -import com.google.common.io.ByteSource; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.ByteChannel; -import java.nio.channels.FileChannel; -import java.nio.channels.SeekableByteChannel; - -public class MTSSources { - public static MTSSource fromSources(MTSSource... sources) { - return fromSources(1, false, sources); - } - - public static MTSSource fromSources(int loops, MTSSource... sources) { - return fromSources(loops, false, sources); - } - - public static MTSSource fromSources(int loops, boolean fixContinuity, MTSSource... sources) { - return ConcatenatingMTSSource.builder() - .setFixContinuity(fixContinuity) - .setSources(sources) - .loops(loops) - .build(); - } - - public static MTSSource from(ByteChannel channel) throws IOException { - return ByteChannelMTSSource.builder() - .setByteChannel(channel) - .build(); - } - - public static ResettableMTSSource from(SeekableByteChannel channel) throws IOException { - return SeekableByteChannelMTSSource.builder() - .setByteChannel(channel) - .build(); - } - - public static ResettableMTSSource from(File file) throws IOException { - return SeekableByteChannelMTSSource.builder() - .setByteChannel(FileChannel.open(file.toPath())) - .build(); - } - - public static ResettableMTSSource from(ByteSource byteSource) throws IOException { - return ByteSourceMTSSource.builder() - .setByteSource(byteSource) - .build(); - } - - public static MTSSource from(InputStream inputStream) throws IOException { - return InputStreamMTSSource.builder() - .setInputStream(inputStream) - .build(); - } - - public static MTSSource loop(ResettableMTSSource source) { - return LoopingMTSSource.builder() - .setSource(source) - .build(); - } - - public static MTSSource loop(ResettableMTSSource source, int maxLoops) { - return LoopingMTSSource.builder() - .setSource(source) - .setMaxLoops(maxLoops) - .build(); - } - - public static MTSSource multiplexing(MTSSource... sources) { - return MultiplexingMTSSource.builder() - .setSources(sources) - .setFixContinuity(true) - .build(); - } - -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java deleted file mode 100644 index 3952b17c..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/MultiplexingMTSSource.java +++ /dev/null @@ -1,100 +0,0 @@ -package org.taktik.mpegts.sources; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.taktik.mpegts.MTSPacket; - -import java.util.Collection; -import java.util.List; - -/** - * An MTSSource using a simple round-robin packet multiplexing strategy. - */ -public class MultiplexingMTSSource extends AbstractMTSSource { - private final boolean fixContinuity; - private final List sources; - - private ContinuityFixer continuityFixer; - private int nextSource = 0; - - public MultiplexingMTSSource(boolean fixContinuity, Collection sources) { - Preconditions.checkArgument(sources.size() > 0, "Must provide at least contain one source"); - this.fixContinuity = fixContinuity; - this.sources = Lists.newArrayList(sources); - if (fixContinuity) { - continuityFixer = new ContinuityFixer(); - } - } - - public static MultiplexingMTSSourceBuilder builder() { - return new MultiplexingMTSSourceBuilder(); - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - MTSPacket packet = sources.get(nextSource).nextPacket(); - if (packet != null) { - if (fixContinuity) { - continuityFixer.fixContinuity(packet); - } - return packet; - } else { - // FIXME: infinite loop - nextSource(); - return nextPacket(); - } - } - - @Override - protected synchronized void closeInternal() throws Exception { - for (MTSSource source : sources) { - source.close(); - } - } - - private synchronized void nextSource() { - nextSource++; - if (nextSource == sources.size()) { - nextSource = 0; - } - } - - public static class MultiplexingMTSSourceBuilder { - boolean fixContinuity = false; - private List sources = Lists.newArrayList(); - - public MultiplexingMTSSourceBuilder addSource(MTSSource source) { - sources.add(source); - return this; - } - - public MultiplexingMTSSourceBuilder addSources(MTSSource... sources) { - this.sources.addAll(Lists.newArrayList(sources)); - return this; - } - - public MultiplexingMTSSourceBuilder addSources(Collection sources) { - this.sources.addAll(sources); - return this; - } - - public MultiplexingMTSSourceBuilder setSources(MTSSource... sources) { - this.sources = Lists.newArrayList(sources); - return this; - } - - public MultiplexingMTSSourceBuilder setSources(Collection sources) { - this.sources = Lists.newArrayList(sources); - return this; - } - - public MultiplexingMTSSourceBuilder setFixContinuity(boolean fixContinuity) { - this.fixContinuity = fixContinuity; - return this; - } - - public MultiplexingMTSSource build() { - return new MultiplexingMTSSource(fixContinuity, sources); - } - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java b/common/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java deleted file mode 100644 index c873c9d9..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/NullPacketSource.java +++ /dev/null @@ -1,34 +0,0 @@ -package org.taktik.mpegts.sources; - -import org.taktik.mpegts.Constants; -import org.taktik.mpegts.MTSPacket; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -public class NullPacketSource extends AbstractMTSSource { - - public NullPacketSource() { - } - - @Override - protected MTSPacket nextPacketInternal() throws Exception { - byte[] buf = new byte[Constants.MPEGTS_PACKET_SIZE]; - - // payload (null bytes) - Arrays.fill(buf, (byte) 0xff); - - // header - buf[0] = 0x47; // sync byte - buf[1] = 0x1f; // PID high - buf[2] = (byte) 0xff; // PID low - buf[3] = 0x10; // adaptation control and continuity - - return new MTSPacket(ByteBuffer.wrap(buf)); - } - - @Override - protected void closeInternal() throws Exception { - // does nothing - } -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java deleted file mode 100644 index b94df9b4..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/ResettableMTSSource.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.taktik.mpegts.sources; - -public interface ResettableMTSSource extends MTSSource { - void reset() throws Exception; -} diff --git a/common/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java b/common/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java deleted file mode 100644 index f6a09456..00000000 --- a/common/src/main/java/org/taktik/mpegts/sources/SeekableByteChannelMTSSource.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.taktik.mpegts.sources; - -import java.io.IOException; -import java.nio.channels.SeekableByteChannel; - -import com.google.common.base.Preconditions; - -public class SeekableByteChannelMTSSource extends AbstractByteChannelMTSSource implements ResettableMTSSource { - - private SeekableByteChannelMTSSource(SeekableByteChannel byteChannel) throws IOException { - super(byteChannel); - } - - public static SeekableByteChannelMTSSourceBuilder builder() { - return new SeekableByteChannelMTSSourceBuilder(); - } - - @Override - public void reset() throws IOException { - byteChannel.position(0); - fillBuffer(); - } - - public static class SeekableByteChannelMTSSourceBuilder { - private SeekableByteChannel byteChannel; - - private SeekableByteChannelMTSSourceBuilder(){} - - public SeekableByteChannelMTSSourceBuilder setByteChannel(SeekableByteChannel byteChannel) { - this.byteChannel = byteChannel; - return this; - } - - public SeekableByteChannelMTSSource build() throws IOException { - Preconditions.checkNotNull(byteChannel, "byteChannel cannot be null"); - return new SeekableByteChannelMTSSource(byteChannel); - } - } -}