streamState.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package core
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. log "github.com/sirupsen/logrus"
  7. "github.com/owncast/owncast/activitypub"
  8. "github.com/owncast/owncast/config"
  9. "github.com/owncast/owncast/core/chat"
  10. "github.com/owncast/owncast/core/data"
  11. "github.com/owncast/owncast/core/rtmp"
  12. "github.com/owncast/owncast/core/transcoder"
  13. "github.com/owncast/owncast/core/webhooks"
  14. "github.com/owncast/owncast/models"
  15. "github.com/owncast/owncast/notifications"
  16. "github.com/owncast/owncast/utils"
  17. )
  18. // After the stream goes offline this timer fires a full cleanup after N min.
  19. var _offlineCleanupTimer *time.Timer
  20. // While a stream takes place cleanup old HLS content every N min.
  21. var _onlineCleanupTicker *time.Ticker
  22. var _currentBroadcast *models.CurrentBroadcast
  23. var _onlineTimerCancelFunc context.CancelFunc
  24. var _lastNotified *time.Time
  25. // setStreamAsConnected sets the stream as connected.
  26. func setStreamAsConnected(rtmpOut *io.PipeReader) {
  27. now := utils.NullTime{Time: time.Now(), Valid: true}
  28. _stats.StreamConnected = true
  29. _stats.LastDisconnectTime = nil
  30. _stats.LastConnectTime = &now
  31. _stats.SessionMaxViewerCount = 0
  32. _currentBroadcast = &models.CurrentBroadcast{
  33. LatencyLevel: data.GetStreamLatencyLevel(),
  34. OutputSettings: data.GetStreamOutputVariants(),
  35. }
  36. StopOfflineCleanupTimer()
  37. startOnlineCleanupTimer()
  38. if _yp != nil {
  39. go _yp.Start()
  40. }
  41. segmentPath := config.HLSStoragePath
  42. if err := setupStorage(); err != nil {
  43. log.Fatalln("failed to setup the storage", err)
  44. }
  45. go func() {
  46. _transcoder = transcoder.NewTranscoder()
  47. _transcoder.TranscoderCompleted = func(error) {
  48. SetStreamAsDisconnected()
  49. _transcoder = nil
  50. _currentBroadcast = nil
  51. }
  52. _transcoder.SetStdin(rtmpOut)
  53. _transcoder.Start(true)
  54. }()
  55. go webhooks.SendStreamStatusEvent(models.StreamStarted)
  56. selectedThumbnailVideoQualityIndex, isVideoPassthrough := data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings)
  57. transcoder.StartThumbnailGenerator(segmentPath, selectedThumbnailVideoQualityIndex, isVideoPassthrough)
  58. _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
  59. chat.SendAllWelcomeMessage()
  60. // Send delayed notification messages.
  61. _onlineTimerCancelFunc = startLiveStreamNotificationsTimer()
  62. }
  63. // SetStreamAsDisconnected sets the stream as disconnected.
  64. func SetStreamAsDisconnected() {
  65. _ = chat.SendSystemAction("The stream is ending.", true)
  66. now := utils.NullTime{Time: time.Now(), Valid: true}
  67. if _onlineTimerCancelFunc != nil {
  68. _onlineTimerCancelFunc()
  69. }
  70. _stats.StreamConnected = false
  71. _stats.LastDisconnectTime = &now
  72. _stats.LastConnectTime = nil
  73. _broadcaster = nil
  74. offlineFilename := "offline-v2.ts"
  75. offlineFilePath, err := saveOfflineClipToDisk(offlineFilename)
  76. if err != nil {
  77. log.Errorln(err)
  78. return
  79. }
  80. transcoder.StopThumbnailGenerator()
  81. rtmp.Disconnect()
  82. if _yp != nil {
  83. _yp.Stop()
  84. }
  85. // If there is no current broadcast available the previous stream
  86. // likely failed for some reason. Don't try to append to it.
  87. // Just transition to offline.
  88. if _currentBroadcast == nil {
  89. stopOnlineCleanupTimer()
  90. transitionToOfflineVideoStreamContent()
  91. log.Errorln("unexpected nil _currentBroadcast")
  92. return
  93. }
  94. for index := range _currentBroadcast.OutputSettings {
  95. makeVariantIndexOffline(index, offlineFilePath, offlineFilename)
  96. }
  97. StartOfflineCleanupTimer()
  98. stopOnlineCleanupTimer()
  99. saveStats()
  100. go webhooks.SendStreamStatusEvent(models.StreamStopped)
  101. }
  102. // StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.
  103. func StartOfflineCleanupTimer() {
  104. _offlineCleanupTimer = time.NewTimer(5 * time.Minute)
  105. go func() {
  106. for range _offlineCleanupTimer.C {
  107. // Set video to offline state
  108. resetDirectories()
  109. transitionToOfflineVideoStreamContent()
  110. }
  111. }()
  112. }
  113. // StopOfflineCleanupTimer will stop the previous cleanup timer.
  114. func StopOfflineCleanupTimer() {
  115. if _offlineCleanupTimer != nil {
  116. _offlineCleanupTimer.Stop()
  117. }
  118. }
  119. func startOnlineCleanupTimer() {
  120. _onlineCleanupTicker = time.NewTicker(1 * time.Minute)
  121. go func() {
  122. for range _onlineCleanupTicker.C {
  123. if err := _storage.Cleanup(); err != nil {
  124. log.Errorln(err)
  125. }
  126. }
  127. }()
  128. }
  129. func stopOnlineCleanupTimer() {
  130. if _onlineCleanupTicker != nil {
  131. _onlineCleanupTicker.Stop()
  132. }
  133. }
  134. func startLiveStreamNotificationsTimer() context.CancelFunc {
  135. // Send delayed notification messages.
  136. c, cancelFunc := context.WithCancel(context.Background())
  137. _onlineTimerCancelFunc = cancelFunc
  138. go func(c context.Context) {
  139. select {
  140. case <-time.After(time.Minute * 2.0):
  141. if _lastNotified != nil && time.Since(*_lastNotified) < 10*time.Minute {
  142. return
  143. }
  144. // Send Fediverse message.
  145. if data.GetFederationEnabled() {
  146. log.Traceln("Sending Federated Go Live message.")
  147. if err := activitypub.SendLive(); err != nil {
  148. log.Errorln(err)
  149. }
  150. }
  151. // Send notification to those who have registered for them.
  152. if notifier, err := notifications.New(data.GetDatastore()); err != nil {
  153. log.Errorln(err)
  154. } else {
  155. notifier.Notify()
  156. }
  157. now := time.Now()
  158. _lastNotified = &now
  159. case <-c.Done():
  160. }
  161. }(c)
  162. return cancelFunc
  163. }