123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- package core
- import (
- "encoding/json"
- "errors"
- "github.com/pion/rtp"
- )
// ErrCantGetTrack is the sentinel error carrying the message "can't get track".
var (
	ErrCantGetTrack = errors.New("can't get track")
)
- type Receiver struct {
- Node
- // Deprecated: should be removed
- Media *Media `json:"-"`
- // Deprecated: should be removed
- ID byte `json:"-"` // Channel for RTSP, PayloadType for MPEG-TS
- Bytes int `json:"bytes,omitempty"`
- Packets int `json:"packets,omitempty"`
- }
- func NewReceiver(media *Media, codec *Codec) *Receiver {
- r := &Receiver{
- Node: Node{id: NewID(), Codec: codec},
- Media: media,
- }
- r.Input = func(packet *Packet) {
- r.Bytes += len(packet.Payload)
- r.Packets++
- for _, child := range r.childs {
- child.Input(packet)
- }
- }
- return r
- }
- // Deprecated: should be removed
- func (r *Receiver) WriteRTP(packet *rtp.Packet) {
- r.Input(packet)
- }
- // Deprecated: should be removed
- func (r *Receiver) Senders() []*Sender {
- if len(r.childs) > 0 {
- return []*Sender{{}}
- } else {
- return nil
- }
- }
- // Deprecated: should be removed
- func (r *Receiver) Replace(target *Receiver) {
- MoveNode(&target.Node, &r.Node)
- }
- func (r *Receiver) Close() {
- r.Node.Close()
- }
- type Sender struct {
- Node
- // Deprecated:
- Media *Media `json:"-"`
- // Deprecated:
- Handler HandlerFunc `json:"-"`
- Bytes int `json:"bytes,omitempty"`
- Packets int `json:"packets,omitempty"`
- Drops int `json:"drops,omitempty"`
- buf chan *Packet
- done chan struct{}
- }
- func NewSender(media *Media, codec *Codec) *Sender {
- var bufSize uint16
- if GetKind(codec.Name) == KindVideo {
- if codec.IsRTP() {
- // in my tests 40Mbit/s 4K-video can generate up to 1500 items
- // for the h264.RTPDepay => RTPPay queue
- bufSize = 4096
- } else {
- bufSize = 64
- }
- } else {
- bufSize = 128
- }
- buf := make(chan *Packet, bufSize)
- s := &Sender{
- Node: Node{id: NewID(), Codec: codec},
- Media: media,
- buf: buf,
- }
- s.Input = func(packet *Packet) {
- // writing to nil chan - OK, writing to closed chan - panic
- s.mu.Lock()
- select {
- case s.buf <- packet:
- s.Bytes += len(packet.Payload)
- s.Packets++
- default:
- s.Drops++
- }
- s.mu.Unlock()
- }
- s.Output = func(packet *Packet) {
- s.Handler(packet)
- }
- return s
- }
- // Deprecated: should be removed
- func (s *Sender) HandleRTP(parent *Receiver) {
- s.WithParent(parent)
- s.Start()
- }
- // Deprecated: should be removed
- func (s *Sender) Bind(parent *Receiver) {
- s.WithParent(parent)
- }
- func (s *Sender) WithParent(parent *Receiver) *Sender {
- s.Node.WithParent(&parent.Node)
- return s
- }
- func (s *Sender) Start() {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.buf == nil || s.done != nil {
- return
- }
- s.done = make(chan struct{})
- go func() {
- for packet := range s.buf {
- s.Output(packet)
- }
- close(s.done)
- }()
- }
- func (s *Sender) Wait() {
- if done := s.done; s.done != nil {
- <-done
- }
- }
- func (s *Sender) State() string {
- if s.buf == nil {
- return "closed"
- }
- if s.done == nil {
- return "new"
- }
- return "connected"
- }
- func (s *Sender) Close() {
- // close buffer if exists
- if buf := s.buf; buf != nil {
- s.buf = nil
- defer close(buf)
- }
- s.Node.Close()
- }
- func (r *Receiver) MarshalJSON() ([]byte, error) {
- v := struct {
- ID uint32 `json:"id"`
- Codec *Codec `json:"codec"`
- Childs []uint32 `json:"childs,omitempty"`
- Bytes int `json:"bytes,omitempty"`
- Packets int `json:"packets,omitempty"`
- }{
- ID: r.Node.id,
- Codec: r.Node.Codec,
- Bytes: r.Bytes,
- Packets: r.Packets,
- }
- for _, child := range r.childs {
- v.Childs = append(v.Childs, child.id)
- }
- return json.Marshal(v)
- }
- func (s *Sender) MarshalJSON() ([]byte, error) {
- v := struct {
- ID uint32 `json:"id"`
- Codec *Codec `json:"codec"`
- Parent uint32 `json:"parent,omitempty"`
- Bytes int `json:"bytes,omitempty"`
- Packets int `json:"packets,omitempty"`
- Drops int `json:"drops,omitempty"`
- }{
- ID: s.Node.id,
- Codec: s.Node.Codec,
- Bytes: s.Bytes,
- Packets: s.Packets,
- Drops: s.Drops,
- }
- if s.parent != nil {
- v.Parent = s.parent.id
- }
- return json.Marshal(v)
- }
|