consumer.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package mpegts
  2. import (
  3. "io"
  4. "github.com/AlexxIT/go2rtc/pkg/aac"
  5. "github.com/AlexxIT/go2rtc/pkg/core"
  6. "github.com/AlexxIT/go2rtc/pkg/h264"
  7. "github.com/AlexxIT/go2rtc/pkg/h265"
  8. "github.com/pion/rtp"
  9. )
  10. type Consumer struct {
  11. core.Connection
  12. muxer *Muxer
  13. wr *core.WriteBuffer
  14. }
  15. func NewConsumer() *Consumer {
  16. medias := []*core.Media{
  17. {
  18. Kind: core.KindVideo,
  19. Direction: core.DirectionSendonly,
  20. Codecs: []*core.Codec{
  21. {Name: core.CodecH264},
  22. {Name: core.CodecH265},
  23. },
  24. },
  25. {
  26. Kind: core.KindAudio,
  27. Direction: core.DirectionSendonly,
  28. Codecs: []*core.Codec{
  29. {Name: core.CodecAAC},
  30. },
  31. },
  32. }
  33. wr := core.NewWriteBuffer(nil)
  34. return &Consumer{
  35. core.Connection{
  36. ID: core.NewID(),
  37. FormatName: "mpegts",
  38. Medias: medias,
  39. Transport: wr,
  40. },
  41. NewMuxer(),
  42. wr,
  43. }
  44. }
  45. func (c *Consumer) AddTrack(media *core.Media, codec *core.Codec, track *core.Receiver) error {
  46. sender := core.NewSender(media, track.Codec)
  47. switch track.Codec.Name {
  48. case core.CodecH264:
  49. pid := c.muxer.AddTrack(StreamTypeH264)
  50. sender.Handler = func(pkt *rtp.Packet) {
  51. b := c.muxer.GetPayload(pid, pkt.Timestamp, pkt.Payload)
  52. if n, err := c.wr.Write(b); err == nil {
  53. c.Send += n
  54. }
  55. }
  56. if track.Codec.IsRTP() {
  57. sender.Handler = h264.RTPDepay(track.Codec, sender.Handler)
  58. } else {
  59. sender.Handler = h264.RepairAVCC(track.Codec, sender.Handler)
  60. }
  61. case core.CodecH265:
  62. pid := c.muxer.AddTrack(StreamTypeH265)
  63. sender.Handler = func(pkt *rtp.Packet) {
  64. b := c.muxer.GetPayload(pid, pkt.Timestamp, pkt.Payload)
  65. if n, err := c.wr.Write(b); err == nil {
  66. c.Send += n
  67. }
  68. }
  69. if track.Codec.IsRTP() {
  70. sender.Handler = h265.RTPDepay(track.Codec, sender.Handler)
  71. }
  72. case core.CodecAAC:
  73. pid := c.muxer.AddTrack(StreamTypeAAC)
  74. // convert timestamp to 90000Hz clock
  75. dt := 90000 / float64(track.Codec.ClockRate)
  76. sender.Handler = func(pkt *rtp.Packet) {
  77. pts := uint32(float64(pkt.Timestamp) * dt)
  78. b := c.muxer.GetPayload(pid, pts, pkt.Payload)
  79. if n, err := c.wr.Write(b); err == nil {
  80. c.Send += n
  81. }
  82. }
  83. if track.Codec.IsRTP() {
  84. sender.Handler = aac.RTPToADTS(track.Codec, sender.Handler)
  85. } else {
  86. sender.Handler = aac.EncodeToADTS(track.Codec, sender.Handler)
  87. }
  88. }
  89. sender.HandleRTP(track)
  90. c.Senders = append(c.Senders, sender)
  91. return nil
  92. }
  93. func (c *Consumer) WriteTo(wr io.Writer) (int64, error) {
  94. b := c.muxer.GetHeader()
  95. if _, err := wr.Write(b); err != nil {
  96. return 0, err
  97. }
  98. return c.wr.WriteTo(wr)
  99. }
  100. //func TimestampFromRTP(rtp *rtp.Packet, codec *core.Codec) {
  101. // if codec.ClockRate == ClockRate {
  102. // return
  103. // }
  104. // rtp.Timestamp = uint32(float64(rtp.Timestamp) / float64(codec.ClockRate) * ClockRate)
  105. //}