producer.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package image
  2. import (
  3. "errors"
  4. "io"
  5. "net/http"
  6. "github.com/AlexxIT/go2rtc/pkg/core"
  7. "github.com/AlexxIT/go2rtc/pkg/tcp"
  8. "github.com/pion/rtp"
  9. )
  10. type Producer struct {
  11. core.Connection
  12. closed bool
  13. res *http.Response
  14. }
  15. func Open(res *http.Response) (*Producer, error) {
  16. return &Producer{
  17. Connection: core.Connection{
  18. ID: core.NewID(),
  19. FormatName: "image",
  20. Protocol: "http",
  21. RemoteAddr: res.Request.URL.Host,
  22. Transport: res.Body,
  23. Medias: []*core.Media{
  24. {
  25. Kind: core.KindVideo,
  26. Direction: core.DirectionRecvonly,
  27. Codecs: []*core.Codec{
  28. {
  29. Name: core.CodecJPEG,
  30. ClockRate: 90000,
  31. PayloadType: core.PayloadTypeRAW,
  32. },
  33. },
  34. },
  35. },
  36. },
  37. res: res,
  38. }, nil
  39. }
  40. func (c *Producer) Start() error {
  41. body, err := io.ReadAll(c.res.Body)
  42. if err != nil {
  43. return err
  44. }
  45. pkt := &rtp.Packet{
  46. Header: rtp.Header{Timestamp: core.Now90000()},
  47. Payload: body,
  48. }
  49. c.Receivers[0].WriteRTP(pkt)
  50. c.Recv += len(body)
  51. req := c.res.Request
  52. for !c.closed {
  53. res, err := tcp.Do(req)
  54. if err != nil {
  55. return err
  56. }
  57. if res.StatusCode != http.StatusOK {
  58. return errors.New("wrong status: " + res.Status)
  59. }
  60. body, err = io.ReadAll(res.Body)
  61. if err != nil {
  62. return err
  63. }
  64. c.Recv += len(body)
  65. pkt = &rtp.Packet{
  66. Header: rtp.Header{Timestamp: core.Now90000()},
  67. Payload: body,
  68. }
  69. c.Receivers[0].WriteRTP(pkt)
  70. }
  71. return nil
  72. }
  73. func (c *Producer) Stop() error {
  74. c.closed = true
  75. return c.Connection.Stop()
  76. }