stats.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package core
  2. import (
  3. "math"
  4. "sync"
  5. "time"
  6. log "github.com/sirupsen/logrus"
  7. "github.com/owncast/owncast/models"
  8. "github.com/owncast/owncast/services/geoip"
  9. "github.com/owncast/owncast/storage/configrepository"
  10. )
  11. var (
  12. l = &sync.RWMutex{}
  13. _activeViewerPurgeTimeout = time.Second * 15
  14. _geoIPClient = geoip.NewClient()
  15. )
  16. func setupStats() error {
  17. s := getSavedStats()
  18. _stats = &s
  19. statsSaveTimer := time.NewTicker(1 * time.Minute)
  20. go func() {
  21. for range statsSaveTimer.C {
  22. saveStats()
  23. }
  24. }()
  25. viewerCountPruneTimer := time.NewTicker(5 * time.Second)
  26. go func() {
  27. for range viewerCountPruneTimer.C {
  28. pruneViewerCount()
  29. }
  30. }()
  31. return nil
  32. }
  33. // IsStreamConnected checks if the stream is connected or not.
  34. func IsStreamConnected() bool {
  35. if !_stats.StreamConnected {
  36. return false
  37. }
  38. configRepository := configrepository.Get()
  39. // Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available.
  40. // So account for that with an artificial buffer of four segments.
  41. timeSinceLastConnected := time.Since(_stats.LastConnectTime.Time).Seconds()
  42. waitTime := math.Max(float64(configRepository.GetStreamLatencyLevel().SecondsPerSegment)*3.0, 7)
  43. if timeSinceLastConnected < waitTime {
  44. return false
  45. }
  46. return _stats.StreamConnected
  47. }
  48. // RemoveChatClient removes a client from the active clients record.
  49. func RemoveChatClient(clientID string) {
  50. log.Trace("Removing the client:", clientID)
  51. l.Lock()
  52. delete(_stats.ChatClients, clientID)
  53. l.Unlock()
  54. }
  55. // SetViewerActive sets a client as active and connected.
  56. func SetViewerActive(viewer *models.Viewer) {
  57. // Don't update viewer counts if a live stream session is not active.
  58. if !_stats.StreamConnected {
  59. return
  60. }
  61. l.Lock()
  62. defer l.Unlock()
  63. // Asynchronously, optionally, fetch GeoIP configRepository.
  64. go func(viewer *models.Viewer) {
  65. viewer.Geo = _geoIPClient.GetGeoFromIP(viewer.IPAddress)
  66. }(viewer)
  67. if _, exists := _stats.Viewers[viewer.ClientID]; exists {
  68. _stats.Viewers[viewer.ClientID].LastSeen = time.Now()
  69. } else {
  70. _stats.Viewers[viewer.ClientID] = viewer
  71. }
  72. _stats.SessionMaxViewerCount = int(math.Max(float64(len(_stats.Viewers)), float64(_stats.SessionMaxViewerCount)))
  73. _stats.OverallMaxViewerCount = int(math.Max(float64(_stats.SessionMaxViewerCount), float64(_stats.OverallMaxViewerCount)))
  74. }
  75. // GetActiveViewers will return the active viewers.
  76. func GetActiveViewers() map[string]*models.Viewer {
  77. return _stats.Viewers
  78. }
  79. func pruneViewerCount() {
  80. viewers := make(map[string]*models.Viewer)
  81. l.Lock()
  82. defer l.Unlock()
  83. for viewerID, viewer := range _stats.Viewers {
  84. viewerLastSeenTime := _stats.Viewers[viewerID].LastSeen
  85. if time.Since(viewerLastSeenTime) < _activeViewerPurgeTimeout {
  86. viewers[viewerID] = viewer
  87. }
  88. }
  89. _stats.Viewers = viewers
  90. }
  91. func saveStats() {
  92. configRepository := configrepository.Get()
  93. if err := configRepository.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil {
  94. log.Errorln("error saving viewer count", err)
  95. }
  96. if err := configRepository.SetPeakSessionViewerCount(_stats.SessionMaxViewerCount); err != nil {
  97. log.Errorln("error saving viewer count", err)
  98. }
  99. if _stats.LastDisconnectTime != nil && _stats.LastDisconnectTime.Valid {
  100. if err := configRepository.SetLastDisconnectTime(_stats.LastDisconnectTime.Time); err != nil {
  101. log.Errorln("error saving disconnect time", err)
  102. }
  103. }
  104. }
  105. func getSavedStats() models.Stats {
  106. configRepository := configrepository.Get()
  107. savedLastDisconnectTime, _ := configRepository.GetLastDisconnectTime()
  108. result := models.Stats{
  109. ChatClients: make(map[string]models.Client),
  110. Viewers: make(map[string]*models.Viewer),
  111. SessionMaxViewerCount: configRepository.GetPeakSessionViewerCount(),
  112. OverallMaxViewerCount: configRepository.GetPeakOverallViewerCount(),
  113. LastDisconnectTime: savedLastDisconnectTime,
  114. }
  115. // If the stats were saved > 5min ago then ignore the
  116. // peak session count value, since the session is over.
  117. if result.LastDisconnectTime == nil || !result.LastDisconnectTime.Valid || time.Since(result.LastDisconnectTime.Time).Minutes() > 5 {
  118. result.SessionMaxViewerCount = 0
  119. }
  120. return result
  121. }