streamState.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package core
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "os"
  8. "path/filepath"
  9. "time"
  10. log "github.com/sirupsen/logrus"
  11. "github.com/owncast/owncast/config"
  12. "github.com/owncast/owncast/core/chat"
  13. "github.com/owncast/owncast/core/data"
  14. "github.com/owncast/owncast/core/rtmp"
  15. "github.com/owncast/owncast/core/transcoder"
  16. "github.com/owncast/owncast/core/webhooks"
  17. "github.com/owncast/owncast/models"
  18. "github.com/owncast/owncast/static"
  19. "github.com/owncast/owncast/utils"
  20. "github.com/grafov/m3u8"
  21. )
  22. // After the stream goes offline this timer fires a full cleanup after N min.
  23. var _offlineCleanupTimer *time.Timer
  24. // While a stream takes place cleanup old HLS content every N min.
  25. var _onlineCleanupTicker *time.Ticker
  26. var _currentBroadcast *models.CurrentBroadcast
  27. // setStreamAsConnected sets the stream as connected.
  28. func setStreamAsConnected(rtmpOut *io.PipeReader) {
  29. now := utils.NullTime{Time: time.Now(), Valid: true}
  30. _stats.StreamConnected = true
  31. _stats.LastDisconnectTime = nil
  32. _stats.LastConnectTime = &now
  33. _stats.SessionMaxViewerCount = 0
  34. _currentBroadcast = &models.CurrentBroadcast{
  35. LatencyLevel: data.GetStreamLatencyLevel(),
  36. OutputSettings: data.GetStreamOutputVariants(),
  37. }
  38. StopOfflineCleanupTimer()
  39. startOnlineCleanupTimer()
  40. if _yp != nil {
  41. go _yp.Start()
  42. }
  43. segmentPath := config.HLSStoragePath
  44. if err := setupStorage(); err != nil {
  45. log.Fatalln("failed to setup the storage", err)
  46. }
  47. go func() {
  48. _transcoder = transcoder.NewTranscoder()
  49. _transcoder.TranscoderCompleted = func(error) {
  50. SetStreamAsDisconnected()
  51. _transcoder = nil
  52. _currentBroadcast = nil
  53. }
  54. _transcoder.SetStdin(rtmpOut)
  55. _transcoder.Start()
  56. }()
  57. go webhooks.SendStreamStatusEvent(models.StreamStarted)
  58. transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings))
  59. _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
  60. chat.SendAllWelcomeMessage()
  61. }
  62. // SetStreamAsDisconnected sets the stream as disconnected.
  63. func SetStreamAsDisconnected() {
  64. _ = chat.SendSystemAction("The stream is ending.", true)
  65. now := utils.NullTime{Time: time.Now(), Valid: true}
  66. _stats.StreamConnected = false
  67. _stats.LastDisconnectTime = &now
  68. _stats.LastConnectTime = nil
  69. _broadcaster = nil
  70. offlineFileData := static.GetOfflineSegment()
  71. offlineFilename := "offline.ts"
  72. offlineTmpFile, err := ioutil.TempFile(os.TempDir(), offlineFilename)
  73. if err != nil {
  74. log.Errorln("unable to create temp file for offline video segment")
  75. }
  76. if _, err = offlineTmpFile.Write(offlineFileData); err != nil {
  77. log.Errorln("unable to write offline segment to disk", err)
  78. }
  79. offlineFilePath := offlineTmpFile.Name()
  80. transcoder.StopThumbnailGenerator()
  81. rtmp.Disconnect()
  82. if _yp != nil {
  83. _yp.Stop()
  84. }
  85. // If there is no current broadcast available the previous stream
  86. // likely failed for some reason. Don't try to append to it.
  87. // Just transition to offline.
  88. if _currentBroadcast == nil {
  89. stopOnlineCleanupTimer()
  90. transitionToOfflineVideoStreamContent()
  91. return
  92. }
  93. for index := range _currentBroadcast.OutputSettings {
  94. playlistFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/stream.m3u8"), index)
  95. segmentFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/%s"), index, offlineFilename)
  96. if err := utils.Copy(offlineFilePath, segmentFilePath); err != nil {
  97. log.Warnln(err)
  98. }
  99. if _, err := _storage.Save(segmentFilePath, 0); err != nil {
  100. log.Warnln(err)
  101. }
  102. if utils.DoesFileExists(playlistFilePath) {
  103. f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm) //nolint
  104. if err != nil {
  105. log.Errorln(err)
  106. }
  107. defer f.Close()
  108. playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true)
  109. if err != nil {
  110. log.Fatalln(err)
  111. }
  112. variantPlaylist := playlist.(*m3u8.MediaPlaylist)
  113. if len(variantPlaylist.Segments) > data.GetStreamLatencyLevel().SegmentCount {
  114. variantPlaylist.Segments = variantPlaylist.Segments[:len(variantPlaylist.Segments)]
  115. }
  116. if err := variantPlaylist.Append(offlineFilename, 8.0, ""); err != nil {
  117. log.Fatalln(err)
  118. }
  119. if err := variantPlaylist.SetDiscontinuity(); err != nil {
  120. log.Fatalln(err)
  121. }
  122. if _, err := f.WriteAt(variantPlaylist.Encode().Bytes(), 0); err != nil {
  123. log.Errorln(err)
  124. }
  125. } else {
  126. p, err := m3u8.NewMediaPlaylist(1, 1)
  127. if err != nil {
  128. log.Errorln(err)
  129. }
  130. // If "offline" content gets changed then change the duration below
  131. if err := p.Append(offlineFilename, 8.0, ""); err != nil {
  132. log.Errorln(err)
  133. }
  134. p.Close()
  135. f, err := os.Create(playlistFilePath)
  136. if err != nil {
  137. log.Errorln(err)
  138. }
  139. defer f.Close()
  140. if _, err := f.Write(p.Encode().Bytes()); err != nil {
  141. log.Errorln(err)
  142. }
  143. }
  144. if _, err := _storage.Save(playlistFilePath, 0); err != nil {
  145. log.Warnln(err)
  146. }
  147. }
  148. StartOfflineCleanupTimer()
  149. stopOnlineCleanupTimer()
  150. saveStats()
  151. go webhooks.SendStreamStatusEvent(models.StreamStopped)
  152. }
  153. // StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.
  154. func StartOfflineCleanupTimer() {
  155. _offlineCleanupTimer = time.NewTimer(5 * time.Minute)
  156. go func() {
  157. for range _offlineCleanupTimer.C {
  158. // Set video to offline state
  159. resetDirectories()
  160. transitionToOfflineVideoStreamContent()
  161. }
  162. }()
  163. }
  164. // StopOfflineCleanupTimer will stop the previous cleanup timer.
  165. func StopOfflineCleanupTimer() {
  166. if _offlineCleanupTimer != nil {
  167. _offlineCleanupTimer.Stop()
  168. }
  169. }
  170. func startOnlineCleanupTimer() {
  171. _onlineCleanupTicker = time.NewTicker(1 * time.Minute)
  172. go func() {
  173. for range _onlineCleanupTicker.C {
  174. transcoder.CleanupOldContent(config.HLSStoragePath)
  175. }
  176. }()
  177. }
  178. func stopOnlineCleanupTimer() {
  179. if _onlineCleanupTicker != nil {
  180. _onlineCleanupTicker.Stop()
  181. }
  182. }