demuxer.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package mpegts
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. "github.com/AlexxIT/go2rtc/pkg/aac"
  7. "github.com/AlexxIT/go2rtc/pkg/bits"
  8. "github.com/AlexxIT/go2rtc/pkg/h264/annexb"
  9. "github.com/pion/rtp"
  10. )
  11. type Demuxer struct {
  12. buf [PacketSize]byte // total buf
  13. byte byte // current byte
  14. bits byte // bits left in byte
  15. pos byte // current pos in buf
  16. end byte // end position
  17. pmtID uint16 // Program Map Table (PMT) PID
  18. pes map[uint16]*PES
  19. }
  20. func NewDemuxer() *Demuxer {
  21. return &Demuxer{}
  22. }
  23. const skipRead = 0xFF
  24. func (d *Demuxer) ReadPacket(rd io.Reader) (*rtp.Packet, error) {
  25. for {
  26. if d.pos != skipRead {
  27. if _, err := io.ReadFull(rd, d.buf[:]); err != nil {
  28. return nil, err
  29. }
  30. }
  31. pid, start, err := d.readPacketHeader()
  32. if err != nil {
  33. return nil, err
  34. }
  35. if d.pes == nil {
  36. switch pid {
  37. case 0: // PAT ID
  38. d.readPAT() // PAT: Program Association Table
  39. case d.pmtID:
  40. d.readPMT() // PMT : Program Map Table
  41. pkt := &rtp.Packet{
  42. Payload: make([]byte, 0, len(d.pes)),
  43. }
  44. for _, pes := range d.pes {
  45. pkt.Payload = append(pkt.Payload, pes.StreamType)
  46. }
  47. return pkt, nil
  48. }
  49. continue
  50. }
  51. if pkt := d.readPES(pid, start); pkt != nil {
  52. return pkt, nil
  53. }
  54. }
  55. }
  56. func (d *Demuxer) readPacketHeader() (pid uint16, start bool, err error) {
  57. d.reset()
  58. sb := d.readByte() // Sync byte
  59. if sb != SyncByte {
  60. return 0, false, errors.New("mpegts: wrong sync byte")
  61. }
  62. _ = d.readBit() // Transport error indicator (TEI)
  63. pusi := d.readBit() // Payload unit start indicator (PUSI)
  64. _ = d.readBit() // Transport priority
  65. pid = d.readBits16(13) // PID
  66. _ = d.readBits(2) // Transport scrambling control (TSC)
  67. af := d.readBit() // Adaptation field
  68. _ = d.readBit() // Payload
  69. _ = d.readBits(4) // Continuity counter
  70. if af != 0 {
  71. adSize := d.readByte() // Adaptation field length
  72. if adSize > PacketSize-6 {
  73. return 0, false, errors.New("mpegts: wrong adaptation size")
  74. }
  75. d.skip(adSize)
  76. }
  77. return pid, pusi != 0, nil
  78. }
  79. func (d *Demuxer) skip(i byte) {
  80. d.pos += i
  81. }
  82. func (d *Demuxer) readBytes(i byte) []byte {
  83. d.pos += i
  84. return d.buf[d.pos-i : d.pos]
  85. }
  86. func (d *Demuxer) readPSIHeader() {
  87. // https://en.wikipedia.org/wiki/Program-specific_information#Table_Sections
  88. pointer := d.readByte() // Pointer field
  89. d.skip(pointer) // Pointer filler bytes
  90. _ = d.readByte() // Table ID
  91. _ = d.readBit() // Section syntax indicator
  92. _ = d.readBit() // Private bit
  93. _ = d.readBits(2) // Reserved bits
  94. _ = d.readBits(2) // Section length unused bits
  95. size := d.readBits(10) // Section length
  96. d.setSize(byte(size))
  97. _ = d.readBits(16) // Table ID extension
  98. _ = d.readBits(2) // Reserved bits
  99. _ = d.readBits(5) // Version number
  100. _ = d.readBit() // Current/next indicator
  101. _ = d.readByte() // Section number
  102. _ = d.readByte() // Last section number
  103. }
  104. // ReadPAT (Program Association Table)
  105. func (d *Demuxer) readPAT() {
  106. // https://en.wikipedia.org/wiki/Program-specific_information#PAT_(Program_Association_Table)
  107. d.readPSIHeader()
  108. const CRCSize = 4
  109. for d.left() > CRCSize {
  110. num := d.readBits(16) // Program num
  111. _ = d.readBits(3) // Reserved bits
  112. pid := d.readBits16(13) // Program map PID
  113. if num != 0 {
  114. d.pmtID = pid
  115. }
  116. }
  117. d.skip(4) // CRC32
  118. }
  119. // ReadPMT (Program map specific data)
  120. func (d *Demuxer) readPMT() {
  121. // https://en.wikipedia.org/wiki/Program-specific_information#PMT_(Program_map_specific_data)
  122. d.readPSIHeader()
  123. _ = d.readBits(3) // Reserved bits
  124. _ = d.readBits(13) // PCR PID
  125. _ = d.readBits(4) // Reserved bits
  126. _ = d.readBits(2) // Program info length unused bits
  127. size := d.readBits(10) // Program info length
  128. d.skip(byte(size))
  129. d.pes = map[uint16]*PES{}
  130. const CRCSize = 4
  131. for d.left() > CRCSize {
  132. streamType := d.readByte() // Stream type
  133. _ = d.readBits(3) // Reserved bits
  134. pid := d.readBits16(13) // Elementary PID
  135. _ = d.readBits(4) // Reserved bits
  136. _ = d.readBits(2) // ES Info length unused bits
  137. size = d.readBits(10) // ES Info length
  138. info := d.readBytes(byte(size))
  139. if streamType == StreamTypePrivate && bytes.HasPrefix(info, opusInfo) {
  140. streamType = StreamTypePrivateOPUS
  141. }
  142. d.pes[pid] = &PES{StreamType: streamType}
  143. }
  144. d.skip(4) // CRC32
  145. }
  146. func (d *Demuxer) readPES(pid uint16, start bool) *rtp.Packet {
  147. pes := d.pes[pid]
  148. if pes == nil {
  149. return nil
  150. }
  151. // if new payload beging
  152. if start {
  153. if len(pes.Payload) != 0 {
  154. d.pos = skipRead
  155. return pes.GetPacket() // finish previous packet
  156. }
  157. // https://en.wikipedia.org/wiki/Packetized_elementary_stream
  158. // Packet start code prefix
  159. if d.readByte() != 0 || d.readByte() != 0 || d.readByte() != 1 {
  160. return nil
  161. }
  162. pes.StreamID = d.readByte() // Stream id
  163. packetSize := d.readBits16(16) // PES Packet length
  164. _ = d.readBits(2) // Marker bits
  165. _ = d.readBits(2) // Scrambling control
  166. _ = d.readBit() // Priority
  167. _ = d.readBit() // Data alignment indicator
  168. _ = d.readBit() // Copyright
  169. _ = d.readBit() // Original or Copy
  170. ptsi := d.readBit() // PTS indicator
  171. dtsi := d.readBit() // DTS indicator
  172. _ = d.readBit() // ESCR flag
  173. _ = d.readBit() // ES rate flag
  174. _ = d.readBit() // DSM trick mode flag
  175. _ = d.readBit() // Additional copy info flag
  176. _ = d.readBit() // CRC flag
  177. _ = d.readBit() // extension flag
  178. headerSize := d.readByte() // PES header length
  179. if packetSize != 0 {
  180. packetSize -= uint16(3 + headerSize)
  181. }
  182. if ptsi != 0 {
  183. pes.PTS = d.readTime()
  184. headerSize -= 5
  185. } else {
  186. pes.PTS = 0
  187. }
  188. if dtsi != 0 {
  189. pes.DTS = d.readTime()
  190. headerSize -= 5
  191. } else {
  192. pes.DTS = 0
  193. }
  194. d.skip(headerSize)
  195. pes.SetBuffer(packetSize, d.bytes())
  196. } else {
  197. pes.AppendBuffer(d.bytes())
  198. }
  199. if pes.Size != 0 && len(pes.Payload) >= pes.Size {
  200. return pes.GetPacket() // finish current packet
  201. }
  202. return nil
  203. }
  204. func (d *Demuxer) reset() {
  205. d.pos = 0
  206. d.end = PacketSize
  207. d.bits = 0
  208. }
  209. //goland:noinspection GoStandardMethods
  210. func (d *Demuxer) readByte() byte {
  211. if d.bits != 0 {
  212. return byte(d.readBits(8))
  213. }
  214. b := d.buf[d.pos]
  215. d.pos++
  216. return b
  217. }
  218. func (d *Demuxer) readBit() byte {
  219. if d.bits == 0 {
  220. d.byte = d.readByte()
  221. d.bits = 7
  222. } else {
  223. d.bits--
  224. }
  225. return (d.byte >> d.bits) & 0b1
  226. }
  227. func (d *Demuxer) readBits(n byte) (res uint32) {
  228. for i := n - 1; i != 255; i-- {
  229. res |= uint32(d.readBit()) << i
  230. }
  231. return
  232. }
  233. func (d *Demuxer) readBits16(n byte) (res uint16) {
  234. for i := n - 1; i != 255; i-- {
  235. res |= uint16(d.readBit()) << i
  236. }
  237. return
  238. }
  239. func (d *Demuxer) readTime() uint32 {
  240. // https://en.wikipedia.org/wiki/Packetized_elementary_stream
  241. // xxxxAAAx BBBBBBBB BBBBBBBx CCCCCCCC CCCCCCCx
  242. _ = d.readBits(4) // 0010b or 0011b or 0001b
  243. ts := d.readBits(3) << 30
  244. _ = d.readBits(1) // 1b
  245. ts |= d.readBits(15) << 15
  246. _ = d.readBits(1) // 1b
  247. ts |= d.readBits(15)
  248. _ = d.readBits(1) // 1b
  249. return ts
  250. }
  251. func (d *Demuxer) bytes() []byte {
  252. return d.buf[d.pos:PacketSize]
  253. }
  254. func (d *Demuxer) left() byte {
  255. return d.end - d.pos
  256. }
  257. func (d *Demuxer) setSize(size byte) {
  258. d.end = d.pos + size
  259. }
  260. const (
  261. PacketSize = 188
  262. SyncByte = 0x47 // Uppercase G
  263. ClockRate = 90000 // fixed clock rate for PTS/DTS of any type
  264. )
  265. // https://en.wikipedia.org/wiki/Program-specific_information#Elementary_stream_types
  266. const (
  267. StreamTypeMetadata = 0 // Reserved
  268. StreamTypePrivate = 0x06 // PCMU or PCMA or FLAC from FFmpeg
  269. StreamTypeAAC = 0x0F
  270. StreamTypeH264 = 0x1B
  271. StreamTypeH265 = 0x24
  272. StreamTypePCMATapo = 0x90
  273. StreamTypePrivateOPUS = 0xEB
  274. )
  275. // PES - Packetized Elementary Stream
  276. type PES struct {
  277. StreamID byte // from each PES header
  278. StreamType byte // from PMT table
  279. Sequence uint16 // manual
  280. Timestamp uint32 // manual
  281. PTS uint32 // from extra header, always 90000Hz
  282. DTS uint32
  283. Payload []byte // from PES body
  284. Size int // from PES header, can be 0
  285. wr *bits.Writer
  286. }
  287. func (p *PES) SetBuffer(size uint16, b []byte) {
  288. p.Payload = make([]byte, 0, size)
  289. p.Payload = append(p.Payload, b...)
  290. p.Size = int(size)
  291. }
  292. func (p *PES) AppendBuffer(b []byte) {
  293. p.Payload = append(p.Payload, b...)
  294. }
  295. func (p *PES) GetPacket() (pkt *rtp.Packet) {
  296. switch p.StreamType {
  297. case StreamTypeH264, StreamTypeH265:
  298. pkt = &rtp.Packet{
  299. Header: rtp.Header{
  300. PayloadType: p.StreamType,
  301. },
  302. Payload: annexb.EncodeToAVCC(p.Payload),
  303. }
  304. if p.DTS != 0 {
  305. pkt.Timestamp = p.DTS
  306. // wrong place for CTS, but we don't have another one
  307. pkt.ExtensionProfile = uint16(p.PTS - p.DTS)
  308. } else {
  309. pkt.Timestamp = p.PTS
  310. }
  311. case StreamTypeAAC:
  312. p.Sequence++
  313. pkt = &rtp.Packet{
  314. Header: rtp.Header{
  315. Version: 2,
  316. Marker: true,
  317. PayloadType: p.StreamType,
  318. SequenceNumber: p.Sequence,
  319. Timestamp: p.PTS,
  320. //Timestamp: p.Timestamp,
  321. },
  322. Payload: aac.ADTStoRTP(p.Payload),
  323. }
  324. //p.Timestamp += aac.RTPTimeSize(pkt.Payload) // update next timestamp!
  325. case StreamTypePCMATapo:
  326. p.Sequence++
  327. pkt = &rtp.Packet{
  328. Header: rtp.Header{
  329. Version: 2,
  330. Marker: true,
  331. PayloadType: p.StreamType,
  332. SequenceNumber: p.Sequence,
  333. Timestamp: p.PTS,
  334. //Timestamp: p.Timestamp,
  335. },
  336. Payload: p.Payload,
  337. }
  338. //p.Timestamp += uint32(len(p.Payload)) // update next timestamp!
  339. case StreamTypePrivateOPUS:
  340. p.Sequence++
  341. pkt = &rtp.Packet{
  342. Header: rtp.Header{
  343. Version: 2,
  344. Marker: true,
  345. PayloadType: p.StreamType,
  346. SequenceNumber: p.Sequence,
  347. Timestamp: p.PTS,
  348. },
  349. }
  350. pkt.Payload, p.Payload = CutOPUSPacket(p.Payload)
  351. p.PTS += opusDT
  352. return
  353. }
  354. p.Payload = nil
  355. return
  356. }