rtmp.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package rtmp
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "time"
  7. "github.com/nareix/joy5/format/flv"
  8. "github.com/nareix/joy5/format/flv/flvio"
  9. log "github.com/sirupsen/logrus"
  10. "github.com/nareix/joy5/format/rtmp"
  11. "github.com/owncast/owncast/config"
  12. "github.com/owncast/owncast/core/data"
  13. "github.com/owncast/owncast/models"
  14. )
  15. var _hasInboundRTMPConnection = false
  16. var (
  17. _pipe *io.PipeWriter
  18. _rtmpConnection net.Conn
  19. )
  20. var (
  21. _setStreamAsConnected func(*io.PipeReader)
  22. _setBroadcaster func(models.Broadcaster)
  23. )
  24. // Start starts the rtmp service, listening on specified RTMP port.
  25. func Start(setStreamAsConnected func(*io.PipeReader), setBroadcaster func(models.Broadcaster)) {
  26. _setStreamAsConnected = setStreamAsConnected
  27. _setBroadcaster = setBroadcaster
  28. port := data.GetRTMPPortNumber()
  29. s := rtmp.NewServer()
  30. var lis net.Listener
  31. var err error
  32. if lis, err = net.Listen("tcp", fmt.Sprintf(":%d", port)); err != nil {
  33. log.Fatal(err)
  34. }
  35. s.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) {
  36. es := rtmp.EventString[e]
  37. log.Traceln("RTMP", nc.LocalAddr(), nc.RemoteAddr(), es)
  38. }
  39. s.HandleConn = HandleConn
  40. if err != nil {
  41. log.Panicln(err)
  42. }
  43. log.Tracef("RTMP server is listening for incoming stream on port: %d", port)
  44. for {
  45. nc, err := lis.Accept()
  46. if err != nil {
  47. time.Sleep(time.Second)
  48. continue
  49. }
  50. go s.HandleNetConn(nc)
  51. }
  52. }
  53. // HandleConn is fired when an inbound RTMP connection takes place.
  54. func HandleConn(c *rtmp.Conn, nc net.Conn) {
  55. c.LogTagEvent = func(isRead bool, t flvio.Tag) {
  56. if t.Type == flvio.TAG_AMF0 {
  57. log.Tracef("%+v\n", t.DebugFields())
  58. setCurrentBroadcasterInfo(t, nc.RemoteAddr().String())
  59. }
  60. }
  61. if _hasInboundRTMPConnection {
  62. log.Errorln("stream already running; can not overtake an existing stream from", nc.RemoteAddr().String())
  63. _ = nc.Close()
  64. return
  65. }
  66. accessGranted := false
  67. validStreamingKeys := data.GetStreamKeys()
  68. // If a stream key override was specified then use that instead.
  69. if config.TemporaryStreamKey != "" {
  70. validStreamingKeys = []models.StreamKey{{Key: config.TemporaryStreamKey}}
  71. }
  72. for _, key := range validStreamingKeys {
  73. if secretMatch(key.Key, c.URL.Path) {
  74. accessGranted = true
  75. break
  76. }
  77. }
  78. if !accessGranted {
  79. log.Errorln("invalid streaming key; rejecting incoming stream from", nc.RemoteAddr().String())
  80. _ = nc.Close()
  81. return
  82. }
  83. rtmpOut, rtmpIn := io.Pipe()
  84. _pipe = rtmpIn
  85. log.Infoln("Inbound stream connected from", nc.RemoteAddr().String())
  86. _setStreamAsConnected(rtmpOut)
  87. _hasInboundRTMPConnection = true
  88. _rtmpConnection = nc
  89. w := flv.NewMuxer(rtmpIn)
  90. for {
  91. if !_hasInboundRTMPConnection {
  92. break
  93. }
  94. // If we don't get a readable packet in 10 seconds give up and disconnect
  95. if err := _rtmpConnection.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
  96. log.Debugln(err)
  97. }
  98. pkt, err := c.ReadPacket()
  99. // Broadcaster disconnected
  100. if err == io.EOF {
  101. handleDisconnect(nc)
  102. return
  103. }
  104. // Read timeout. Disconnect.
  105. if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
  106. log.Debugln("Timeout reading the inbound stream from the broadcaster. Assuming that they disconnected and ending the stream.")
  107. handleDisconnect(nc)
  108. return
  109. }
  110. if err := w.WritePacket(pkt); err != nil {
  111. log.Errorln("unable to write rtmp packet", err)
  112. handleDisconnect(nc)
  113. return
  114. }
  115. }
  116. }
  117. func handleDisconnect(conn net.Conn) {
  118. if !_hasInboundRTMPConnection {
  119. return
  120. }
  121. log.Infoln("Inbound stream disconnected.")
  122. _ = conn.Close()
  123. _ = _pipe.Close()
  124. _hasInboundRTMPConnection = false
  125. }
  126. // Disconnect will force disconnect the current inbound RTMP connection.
  127. func Disconnect() {
  128. if _rtmpConnection == nil {
  129. return
  130. }
  131. log.Traceln("Inbound stream disconnect requested.")
  132. handleDisconnect(_rtmpConnection)
  133. }