streamState.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package core
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. log "github.com/sirupsen/logrus"
  7. "github.com/owncast/owncast/activitypub"
  8. "github.com/owncast/owncast/config"
  9. "github.com/owncast/owncast/core/chat"
  10. "github.com/owncast/owncast/core/data"
  11. "github.com/owncast/owncast/core/rtmp"
  12. "github.com/owncast/owncast/core/transcoder"
  13. "github.com/owncast/owncast/core/webhooks"
  14. "github.com/owncast/owncast/models"
  15. "github.com/owncast/owncast/notifications"
  16. "github.com/owncast/owncast/utils"
  17. )
  18. // After the stream goes offline this timer fires a full cleanup after N min.
  19. var _offlineCleanupTimer *time.Timer
  20. // While a stream takes place cleanup old HLS content every N min.
  21. var _onlineCleanupTicker *time.Ticker
  22. var _currentBroadcast *models.CurrentBroadcast
  23. var _onlineTimerCancelFunc context.CancelFunc
  24. var _lastNotified *time.Time
  25. // setStreamAsConnected sets the stream as connected.
  26. func setStreamAsConnected(rtmpOut *io.PipeReader) {
  27. now := utils.NullTime{Time: time.Now(), Valid: true}
  28. _stats.StreamConnected = true
  29. _stats.LastDisconnectTime = nil
  30. _stats.LastConnectTime = &now
  31. _stats.SessionMaxViewerCount = 0
  32. _currentBroadcast = &models.CurrentBroadcast{
  33. LatencyLevel: data.GetStreamLatencyLevel(),
  34. OutputSettings: data.GetStreamOutputVariants(),
  35. }
  36. StopOfflineCleanupTimer()
  37. startOnlineCleanupTimer()
  38. if _yp != nil {
  39. go _yp.Start()
  40. }
  41. segmentPath := config.HLSStoragePath
  42. if err := setupStorage(); err != nil {
  43. log.Fatalln("failed to setup the storage", err)
  44. }
  45. go func() {
  46. _transcoder = transcoder.NewTranscoder()
  47. _transcoder.TranscoderCompleted = func(error) {
  48. SetStreamAsDisconnected()
  49. _transcoder = nil
  50. _currentBroadcast = nil
  51. }
  52. _transcoder.SetStdin(rtmpOut)
  53. _transcoder.Start()
  54. }()
  55. go webhooks.SendStreamStatusEvent(models.StreamStarted)
  56. transcoder.StartThumbnailGenerator(segmentPath, data.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings))
  57. _ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
  58. chat.SendAllWelcomeMessage()
  59. // Send delayed notification messages.
  60. _onlineTimerCancelFunc = startLiveStreamNotificationsTimer()
  61. }
  62. // SetStreamAsDisconnected sets the stream as disconnected.
  63. func SetStreamAsDisconnected() {
  64. _ = chat.SendSystemAction("The stream is ending.", true)
  65. now := utils.NullTime{Time: time.Now(), Valid: true}
  66. if _onlineTimerCancelFunc != nil {
  67. _onlineTimerCancelFunc()
  68. }
  69. _stats.StreamConnected = false
  70. _stats.LastDisconnectTime = &now
  71. _stats.LastConnectTime = nil
  72. _broadcaster = nil
  73. offlineFilename := "offline.ts"
  74. offlineFilePath, err := saveOfflineClipToDisk(offlineFilename)
  75. if err != nil {
  76. log.Errorln(err)
  77. return
  78. }
  79. transcoder.StopThumbnailGenerator()
  80. rtmp.Disconnect()
  81. if _yp != nil {
  82. _yp.Stop()
  83. }
  84. // If there is no current broadcast available the previous stream
  85. // likely failed for some reason. Don't try to append to it.
  86. // Just transition to offline.
  87. if _currentBroadcast == nil {
  88. stopOnlineCleanupTimer()
  89. transitionToOfflineVideoStreamContent()
  90. log.Errorln("unexpected nil _currentBroadcast")
  91. return
  92. }
  93. for index := range _currentBroadcast.OutputSettings {
  94. makeVariantIndexOffline(index, offlineFilePath, offlineFilename)
  95. }
  96. StartOfflineCleanupTimer()
  97. stopOnlineCleanupTimer()
  98. saveStats()
  99. go webhooks.SendStreamStatusEvent(models.StreamStopped)
  100. }
  101. // StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.
  102. func StartOfflineCleanupTimer() {
  103. _offlineCleanupTimer = time.NewTimer(5 * time.Minute)
  104. go func() {
  105. for range _offlineCleanupTimer.C {
  106. // Set video to offline state
  107. resetDirectories()
  108. transitionToOfflineVideoStreamContent()
  109. }
  110. }()
  111. }
  112. // StopOfflineCleanupTimer will stop the previous cleanup timer.
  113. func StopOfflineCleanupTimer() {
  114. if _offlineCleanupTimer != nil {
  115. _offlineCleanupTimer.Stop()
  116. }
  117. }
  118. func startOnlineCleanupTimer() {
  119. _onlineCleanupTicker = time.NewTicker(1 * time.Minute)
  120. go func() {
  121. for range _onlineCleanupTicker.C {
  122. transcoder.CleanupOldContent(config.HLSStoragePath)
  123. }
  124. }()
  125. }
  126. func stopOnlineCleanupTimer() {
  127. if _onlineCleanupTicker != nil {
  128. _onlineCleanupTicker.Stop()
  129. }
  130. }
  131. func startLiveStreamNotificationsTimer() context.CancelFunc {
  132. // Send delayed notification messages.
  133. c, cancelFunc := context.WithCancel(context.Background())
  134. _onlineTimerCancelFunc = cancelFunc
  135. go func(c context.Context) {
  136. select {
  137. case <-time.After(time.Minute * 2.0):
  138. if _lastNotified != nil && time.Since(*_lastNotified) < 10*time.Minute {
  139. return
  140. }
  141. // Send Fediverse message.
  142. if data.GetFederationEnabled() {
  143. log.Traceln("Sending Federated Go Live message.")
  144. if err := activitypub.SendLive(); err != nil {
  145. log.Errorln(err)
  146. }
  147. }
  148. // Send notification to those who have registered for them.
  149. if notifier, err := notifications.New(data.GetDatastore()); err != nil {
  150. log.Errorln(err)
  151. } else {
  152. notifier.Notify()
  153. }
  154. now := time.Now()
  155. _lastNotified = &now
  156. case <-c.Done():
  157. }
  158. }(c)
  159. return cancelFunc
  160. }