track.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package core
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "github.com/pion/rtp"
  6. )
  // ErrCantGetTrack is the package-level sentinel error whose message is
  // "can't get track".
  var ErrCantGetTrack = errors.New("can't get track")
  8. type Receiver struct {
  9. Node
  10. // Deprecated: should be removed
  11. Media *Media `json:"-"`
  12. // Deprecated: should be removed
  13. ID byte `json:"-"` // Channel for RTSP, PayloadType for MPEG-TS
  14. Bytes int `json:"bytes,omitempty"`
  15. Packets int `json:"packets,omitempty"`
  16. }
  17. func NewReceiver(media *Media, codec *Codec) *Receiver {
  18. r := &Receiver{
  19. Node: Node{id: NewID(), Codec: codec},
  20. Media: media,
  21. }
  22. r.Input = func(packet *Packet) {
  23. r.Bytes += len(packet.Payload)
  24. r.Packets++
  25. for _, child := range r.childs {
  26. child.Input(packet)
  27. }
  28. }
  29. return r
  30. }
  31. // Deprecated: should be removed
  32. func (r *Receiver) WriteRTP(packet *rtp.Packet) {
  33. r.Input(packet)
  34. }
  35. // Deprecated: should be removed
  36. func (r *Receiver) Senders() []*Sender {
  37. if len(r.childs) > 0 {
  38. return []*Sender{{}}
  39. } else {
  40. return nil
  41. }
  42. }
  43. // Deprecated: should be removed
  44. func (r *Receiver) Replace(target *Receiver) {
  45. MoveNode(&target.Node, &r.Node)
  46. }
  47. func (r *Receiver) Close() {
  48. r.Node.Close()
  49. }
  50. type Sender struct {
  51. Node
  52. // Deprecated:
  53. Media *Media `json:"-"`
  54. // Deprecated:
  55. Handler HandlerFunc `json:"-"`
  56. Bytes int `json:"bytes,omitempty"`
  57. Packets int `json:"packets,omitempty"`
  58. Drops int `json:"drops,omitempty"`
  59. buf chan *Packet
  60. done chan struct{}
  61. }
  62. func NewSender(media *Media, codec *Codec) *Sender {
  63. var bufSize uint16
  64. if GetKind(codec.Name) == KindVideo {
  65. if codec.IsRTP() {
  66. // in my tests 40Mbit/s 4K-video can generate up to 1500 items
  67. // for the h264.RTPDepay => RTPPay queue
  68. bufSize = 4096
  69. } else {
  70. bufSize = 64
  71. }
  72. } else {
  73. bufSize = 128
  74. }
  75. buf := make(chan *Packet, bufSize)
  76. s := &Sender{
  77. Node: Node{id: NewID(), Codec: codec},
  78. Media: media,
  79. buf: buf,
  80. }
  81. s.Input = func(packet *Packet) {
  82. // writing to nil chan - OK, writing to closed chan - panic
  83. s.mu.Lock()
  84. select {
  85. case s.buf <- packet:
  86. s.Bytes += len(packet.Payload)
  87. s.Packets++
  88. default:
  89. s.Drops++
  90. }
  91. s.mu.Unlock()
  92. }
  93. s.Output = func(packet *Packet) {
  94. s.Handler(packet)
  95. }
  96. return s
  97. }
  98. // Deprecated: should be removed
  99. func (s *Sender) HandleRTP(parent *Receiver) {
  100. s.WithParent(parent)
  101. s.Start()
  102. }
  103. // Deprecated: should be removed
  104. func (s *Sender) Bind(parent *Receiver) {
  105. s.WithParent(parent)
  106. }
  107. func (s *Sender) WithParent(parent *Receiver) *Sender {
  108. s.Node.WithParent(&parent.Node)
  109. return s
  110. }
  111. func (s *Sender) Start() {
  112. s.mu.Lock()
  113. defer s.mu.Unlock()
  114. if s.buf == nil || s.done != nil {
  115. return
  116. }
  117. s.done = make(chan struct{})
  118. go func() {
  119. for packet := range s.buf {
  120. s.Output(packet)
  121. }
  122. close(s.done)
  123. }()
  124. }
  125. func (s *Sender) Wait() {
  126. if done := s.done; s.done != nil {
  127. <-done
  128. }
  129. }
  130. func (s *Sender) State() string {
  131. if s.buf == nil {
  132. return "closed"
  133. }
  134. if s.done == nil {
  135. return "new"
  136. }
  137. return "connected"
  138. }
  139. func (s *Sender) Close() {
  140. // close buffer if exists
  141. if buf := s.buf; buf != nil {
  142. s.buf = nil
  143. defer close(buf)
  144. }
  145. s.Node.Close()
  146. }
  147. func (r *Receiver) MarshalJSON() ([]byte, error) {
  148. v := struct {
  149. ID uint32 `json:"id"`
  150. Codec *Codec `json:"codec"`
  151. Childs []uint32 `json:"childs,omitempty"`
  152. Bytes int `json:"bytes,omitempty"`
  153. Packets int `json:"packets,omitempty"`
  154. }{
  155. ID: r.Node.id,
  156. Codec: r.Node.Codec,
  157. Bytes: r.Bytes,
  158. Packets: r.Packets,
  159. }
  160. for _, child := range r.childs {
  161. v.Childs = append(v.Childs, child.id)
  162. }
  163. return json.Marshal(v)
  164. }
  165. func (s *Sender) MarshalJSON() ([]byte, error) {
  166. v := struct {
  167. ID uint32 `json:"id"`
  168. Codec *Codec `json:"codec"`
  169. Parent uint32 `json:"parent,omitempty"`
  170. Bytes int `json:"bytes,omitempty"`
  171. Packets int `json:"packets,omitempty"`
  172. Drops int `json:"drops,omitempty"`
  173. }{
  174. ID: s.Node.id,
  175. Codec: s.Node.Codec,
  176. Bytes: s.Bytes,
  177. Packets: s.Packets,
  178. Drops: s.Drops,
  179. }
  180. if s.parent != nil {
  181. v.Parent = s.parent.id
  182. }
  183. return json.Marshal(v)
  184. }