package core

import (
	"encoding/json"
	"errors"

	"github.com/pion/rtp"
)

// ErrCantGetTrack is a sentinel error with the message "can't get track".
var ErrCantGetTrack = errors.New("can't get track")

// Receiver is a Node that counts the traffic passing through it and forwards
// every packet to its child nodes.
type Receiver struct {
	Node

	// Deprecated: should be removed
	Media *Media `json:"-"`
	// Deprecated: should be removed
	ID byte `json:"-"` // Channel for RTSP, PayloadType for MPEG-TS

	Bytes   int `json:"bytes,omitempty"`
	Packets int `json:"packets,omitempty"`
}

// NewReceiver creates a Receiver for the given media and codec.
//
// The installed Input handler adds the payload length to Bytes, increments
// Packets and then passes the packet to the Input of every child node.
func NewReceiver(media *Media, codec *Codec) *Receiver {
	r := &Receiver{
		Node:  Node{id: NewID(), Codec: codec},
		Media: media,
	}
	r.Input = func(packet *Packet) {
		r.Bytes += len(packet.Payload)
		r.Packets++
		for _, child := range r.childs {
			child.Input(packet)
		}
	}
	return r
}

// WriteRTP passes the packet to the receiver's Input handler.
//
// Deprecated: should be removed
func (r *Receiver) WriteRTP(packet *rtp.Packet) {
	r.Input(packet)
}

// Senders reports whether the receiver has any children: it returns a slice
// holding a single empty placeholder Sender when at least one child is
// attached, and nil otherwise. The returned Sender is not one of the children.
//
// Deprecated: should be removed
func (r *Receiver) Senders() []*Sender {
	if len(r.childs) == 0 {
		return nil
	}
	return []*Sender{{}}
}

// Replace calls MoveNode with target's node as the first argument and r's node
// as the second.
// NOTE(review): presumably this moves target's children over to r - confirm
// against MoveNode.
//
// Deprecated: should be removed
func (r *Receiver) Replace(target *Receiver) {
	MoveNode(&target.Node, &r.Node)
}

// Close closes the underlying Node.
func (r *Receiver) Close() {
	r.Node.Close()
}

// Sender is a Node that queues incoming packets in a buffered channel and
// hands them to its Output handler from a separate goroutine (see Start).
// Packets that arrive while the queue is full are dropped and counted in Drops.
type Sender struct {
	Node

	// Deprecated:
	Media *Media `json:"-"`
	// Deprecated:
	Handler HandlerFunc `json:"-"`

	Bytes   int `json:"bytes,omitempty"`
	Packets int `json:"packets,omitempty"`
	Drops   int `json:"drops,omitempty"`

	buf  chan *Packet  // packet queue; nil after Close
	done chan struct{} // created by Start, closed when the worker goroutine exits
}

// NewSender creates a Sender for the given media and codec.
//
// The queue capacity depends on the codec: 4096 for RTP video, 64 for other
// video and 128 for everything else. The installed Input handler never blocks:
// it enqueues the packet or, when the queue is full or already closed, counts
// a drop. The installed Output handler calls s.Handler, so Handler must be set
// before packets are delivered unless Output is replaced.
func NewSender(media *Media, codec *Codec) *Sender {
	var bufSize uint16

	if GetKind(codec.Name) == KindVideo {
		if codec.IsRTP() {
			// in my tests 40Mbit/s 4K-video can generate up to 1500 items
			// for the h264.RTPDepay => RTPPay queue
			bufSize = 4096
		} else {
			bufSize = 64
		}
	} else {
		bufSize = 128
	}

	buf := make(chan *Packet, bufSize)
	s := &Sender{
		Node:  Node{id: NewID(), Codec: codec},
		Media: media,
		buf:   buf,
	}
	s.Input = func(packet *Packet) {
		// writing to nil chan - OK, writing to closed chan - panic
		s.mu.Lock()
		select {
		case s.buf <- packet:
			s.Bytes += len(packet.Payload)
			s.Packets++
		default:
			s.Drops++
		}
		s.mu.Unlock()
	}
	s.Output = func(packet *Packet) {
		s.Handler(packet)
	}
	return s
}

// HandleRTP attaches the sender to parent and starts its worker goroutine.
//
// Deprecated: should be removed
func (s *Sender) HandleRTP(parent *Receiver) {
	s.WithParent(parent)
	s.Start()
}

// Bind attaches the sender to parent without starting it.
//
// Deprecated: should be removed
func (s *Sender) Bind(parent *Receiver) {
	s.WithParent(parent)
}

// WithParent attaches the sender's node to the parent receiver's node and
// returns the sender so calls can be chained.
func (s *Sender) WithParent(parent *Receiver) *Sender {
	s.Node.WithParent(&parent.Node)
	return s
}

// Start launches the goroutine that reads packets from the queue and passes
// them to Output. It does nothing if the sender is already closed or already
// started. The goroutine exits, closing the done channel, once the queue has
// been closed and drained.
func (s *Sender) Start() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.buf == nil || s.done != nil {
		return
	}
	s.done = make(chan struct{})

	go func() {
		for packet := range s.buf {
			s.Output(packet)
		}
		close(s.done)
	}()
}

// Wait blocks until the worker goroutine started by Start has exited. It
// returns immediately if Start has not created the done channel.
func (s *Sender) Wait() {
	// test the snapshot, not the field, so the check and the receive agree
	if done := s.done; done != nil {
		<-done
	}
}

// State returns "closed" when the queue has been released, "new" when the
// sender has not been started yet, and "connected" otherwise.
func (s *Sender) State() string {
	if s.buf == nil {
		return "closed"
	}
	if s.done == nil {
		return "new"
	}
	return "connected"
}

// Close releases the packet queue, which ends the worker goroutine's loop, and
// closes the underlying Node. Calling Close more than once is safe: the queue
// is closed only the first time.
func (s *Sender) Close() {
	// Detach the queue under s.mu. Input performs its send while holding the
	// same lock, so after this point every send goes to a nil channel (a drop)
	// and none can hit the channel once it is closed.
	s.mu.Lock()
	buf := s.buf
	s.buf = nil
	s.mu.Unlock()

	if buf != nil {
		// closed after Node.Close returns; ends the for-range loop in Start
		defer close(buf)
	}

	s.Node.Close()
}

// MarshalJSON encodes the receiver as an object with its node id, codec,
// the ids of its child nodes and the byte/packet counters.
func (r *Receiver) MarshalJSON() ([]byte, error) {
	v := struct {
		ID      uint32   `json:"id"`
		Codec   *Codec   `json:"codec"`
		Childs  []uint32 `json:"childs,omitempty"`
		Bytes   int      `json:"bytes,omitempty"`
		Packets int      `json:"packets,omitempty"`
	}{
		ID:      r.Node.id,
		Codec:   r.Node.Codec,
		Bytes:   r.Bytes,
		Packets: r.Packets,
	}
	for _, child := range r.childs {
		v.Childs = append(v.Childs, child.id)
	}
	return json.Marshal(v)
}

// MarshalJSON encodes the sender as an object with its node id, codec, the id
// of its parent node (when attached) and the byte/packet/drop counters.
func (s *Sender) MarshalJSON() ([]byte, error) {
	v := struct {
		ID      uint32 `json:"id"`
		Codec   *Codec `json:"codec"`
		Parent  uint32 `json:"parent,omitempty"`
		Bytes   int    `json:"bytes,omitempty"`
		Packets int    `json:"packets,omitempty"`
		Drops   int    `json:"drops,omitempty"`
	}{
		ID:      s.Node.id,
		Codec:   s.Node.Codec,
		Bytes:   s.Bytes,
		Packets: s.Packets,
		Drops:   s.Drops,
	}
	if s.parent != nil {
		v.Parent = s.parent.id
	}
	return json.Marshal(v)
}