muxer.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package mpegts
  2. import (
  3. "encoding/binary"
  4. "github.com/AlexxIT/go2rtc/pkg/bits"
  5. "github.com/AlexxIT/go2rtc/pkg/h264/annexb"
  6. )
  7. type Muxer struct {
  8. pes map[uint16]*PES
  9. }
  10. func NewMuxer() *Muxer {
  11. return &Muxer{
  12. pes: map[uint16]*PES{},
  13. }
  14. }
  15. func (m *Muxer) AddTrack(streamType byte) (pid uint16) {
  16. pes := &PES{StreamType: streamType}
  17. // Audio streams (0xC0-0xDF), Video streams (0xE0-0xEF)
  18. switch streamType {
  19. case StreamTypeH264, StreamTypeH265:
  20. pes.StreamID = 0xE0
  21. case StreamTypeAAC, StreamTypePCMATapo:
  22. pes.StreamID = 0xC0
  23. }
  24. pid = pes0PID + uint16(len(m.pes))
  25. m.pes[pid] = pes
  26. return
  27. }
  28. func (m *Muxer) GetHeader() []byte {
  29. bw := bits.NewWriter(nil)
  30. m.writePAT(bw)
  31. m.writePMT(bw)
  32. return bw.Bytes()
  33. }
  34. // GetPayload - safe to run concurently with different pid
  35. func (m *Muxer) GetPayload(pid uint16, timestamp uint32, payload []byte) []byte {
  36. pes := m.pes[pid]
  37. switch pes.StreamType {
  38. case StreamTypeH264, StreamTypeH265:
  39. payload = annexb.DecodeAVCCWithAUD(payload)
  40. }
  41. if pes.Timestamp != 0 {
  42. pes.PTS += timestamp - pes.Timestamp
  43. }
  44. pes.Timestamp = timestamp
  45. // min header size (3 byte) + adv header size (PES)
  46. size := 3 + 5 + len(payload)
  47. b := make([]byte, 6+3+5)
  48. b[0], b[1], b[2] = 0, 0, 1 // Packet start code prefix
  49. b[3] = pes.StreamID // Stream ID
  50. // PES Packet length (zero value OK for video)
  51. if size <= 0xFFFF {
  52. binary.BigEndian.PutUint16(b[4:], uint16(size))
  53. }
  54. // Optional PES header:
  55. b[6] = 0x80 // Marker bits (binary)
  56. b[7] = 0x80 // PTS indicator
  57. b[8] = 5 // PES header length
  58. WriteTime(b[9:], pes.PTS)
  59. pes.Payload = append(b, payload...)
  60. pes.Size = 1 // set PUSI in first PES
  61. if pes.wr == nil {
  62. pes.wr = bits.NewWriter(nil)
  63. } else {
  64. pes.wr.Reset()
  65. }
  66. for len(pes.Payload) > 0 {
  67. m.writePES(pes.wr, pid, pes)
  68. pes.Sequence++
  69. pes.Size = 0
  70. }
  71. return pes.wr.Bytes()
  72. }
  // PIDs used by the muxer.
  const (
  	patPID  = 0      // PID carrying the program association table
  	pmtPID  = 0x1000 // PID carrying the program map table
  	pes0PID = 0x100  // PID of the first track; later tracks count up from here
  )
  76. func (m *Muxer) writePAT(wr *bits.Writer) {
  77. m.writeHeader(wr, patPID)
  78. i := wr.Len() + 1 // start for CRC32
  79. m.writePSIHeader(wr, 0, 4)
  80. wr.WriteUint16(1) // Program num
  81. wr.WriteBits8(0b111, 3) // Reserved bits (all to 1)
  82. wr.WriteBits16(pmtPID, 13) // Program map PID
  83. crc := checksum(wr.Bytes()[i:])
  84. wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian)
  85. m.WriteTail(wr)
  86. }
  87. func (m *Muxer) writePMT(wr *bits.Writer) {
  88. m.writeHeader(wr, pmtPID)
  89. i := wr.Len() + 1 // start for CRC32
  90. m.writePSIHeader(wr, 2, 4+uint16(len(m.pes))*5) // 4 bytes below + 5 bytes each PES
  91. wr.WriteBits8(0b111, 3) // Reserved bits (all to 1)
  92. wr.WriteBits16(0x1FFF, 13) // Program map PID (not used)
  93. wr.WriteBits8(0b1111, 4) // Reserved bits (all to 1)
  94. wr.WriteBits8(0, 2) // Program info length unused bits (all to 0)
  95. wr.WriteBits16(0, 10) // Program info length
  96. for pid := uint16(pes0PID); ; pid++ {
  97. pes, ok := m.pes[pid]
  98. if !ok {
  99. break
  100. }
  101. wr.WriteByte(pes.StreamType) // Stream type
  102. wr.WriteBits8(0b111, 3) // Reserved bits (all to 1)
  103. wr.WriteBits16(pid, 13) // Elementary PID
  104. wr.WriteBits8(0b1111, 4) // Reserved bits (all to 1)
  105. wr.WriteBits(0, 2) // ES Info length unused bits
  106. wr.WriteBits16(0, 10) // ES Info length
  107. }
  108. crc := checksum(wr.Bytes()[i:])
  109. wr.WriteBytes(byte(crc), byte(crc>>8), byte(crc>>16), byte(crc>>24)) // CRC32 (little endian)
  110. m.WriteTail(wr)
  111. }
  112. func (m *Muxer) writePES(wr *bits.Writer, pid uint16, pes *PES) {
  113. const flagPUSI = 0b01000000_00000000
  114. const flagAdaptation = 0b00100000
  115. const flagPayload = 0b00010000
  116. wr.WriteByte(SyncByte)
  117. if pes.Size != 0 {
  118. pid |= flagPUSI // Payload unit start indicator (PUSI)
  119. }
  120. wr.WriteUint16(pid)
  121. counter := byte(pes.Sequence) & 0xF
  122. if size := len(pes.Payload); size < PacketSize-4 {
  123. wr.WriteByte(flagAdaptation | flagPayload | counter) // adaptation + payload
  124. // for 183 payload will be zero
  125. adSize := PacketSize - 4 - 1 - byte(size)
  126. wr.WriteByte(adSize)
  127. wr.WriteBytes(make([]byte, adSize)...)
  128. wr.WriteBytes(pes.Payload...)
  129. pes.Payload = nil
  130. } else {
  131. wr.WriteByte(flagPayload | counter) // only payload
  132. wr.WriteBytes(pes.Payload[:PacketSize-4]...)
  133. pes.Payload = pes.Payload[PacketSize-4:]
  134. }
  135. }
  136. func (m *Muxer) writeHeader(wr *bits.Writer, pid uint16) {
  137. wr.WriteByte(SyncByte)
  138. wr.WriteBit(0) // Transport error indicator (TEI)
  139. wr.WriteBit(1) // Payload unit start indicator (PUSI)
  140. wr.WriteBit(0) // Transport priority
  141. wr.WriteBits16(pid, 13) // PID
  142. wr.WriteBits8(0, 2) // Transport scrambling control (TSC)
  143. wr.WriteBit(0) // Adaptation field
  144. wr.WriteBit(1) // Payload
  145. wr.WriteBits8(0, 4) // Continuity counter
  146. }
  147. func (m *Muxer) writePSIHeader(wr *bits.Writer, tableID byte, size uint16) {
  148. wr.WriteByte(0) // Pointer field
  149. wr.WriteByte(tableID) // Table ID
  150. wr.WriteBit(1) // Section syntax indicator
  151. wr.WriteBit(0) // Private bit
  152. wr.WriteBits8(0b11, 2) // Reserved bits (all to 1)
  153. wr.WriteBits8(0, 2) // Section length unused bits (all to 0)
  154. wr.WriteBits16(5+size+4, 10) // Section length (5 bytes below + content + 4 bytes CRC32)
  155. wr.WriteUint16(1) // Table ID extension
  156. wr.WriteBits8(0b11, 2) // Reserved bits (all to 1)
  157. wr.WriteBits8(0, 5) // Version number
  158. wr.WriteBit(1) // Current/next indicator
  159. wr.WriteByte(0) // Section number
  160. wr.WriteByte(0) // Last section number
  161. }
  162. func (m *Muxer) WriteTail(wr *bits.Writer) {
  163. size := PacketSize - wr.Len()%PacketSize
  164. wr.WriteBytes(make([]byte, size)...)
  165. }
  // WriteTime encodes t into the first five bytes of b as a PES timestamp
  // field with the PTS-only prefix (0x20) and a marker bit closing bytes 0, 2
  // and 4. b must hold at least five bytes, otherwise the function panics.
  //
  // Bit layout (t is 32 bits wide, so the topmost timestamp bit is zero):
  //
  //	b[0]: prefix | t[31:30] | marker
  //	b[1]: t[29:22]
  //	b[2]: t[21:15] | marker
  //	b[3]: t[14:7]
  //	b[4]: t[6:0]  | marker
  func WriteTime(b []byte, t uint32) {
  	_ = b[4] // bounds check hint

  	const (
  		prefixPTS = 0x20
  		marker    = 1
  	)

  	// In bytes 0 and 2 the lowest shifted-in bit lands on the marker
  	// position and is absorbed by the OR with marker.
  	top := byte(t >> 29)
  	mid := byte(t >> 14)
  	low := byte(t << 1)

  	b[0] = prefixPTS | top | marker
  	b[1] = byte(t >> 22)
  	b[2] = mid | marker
  	b[3] = byte(t >> 7)
  	b[4] = low | marker
  }