// viewers.go

  1. package metrics
  2. import (
  3. "time"
  4. "github.com/nakabonne/tstorage"
  5. "github.com/owncast/owncast/core"
  6. "github.com/owncast/owncast/core/chat"
  7. "github.com/owncast/owncast/core/data"
  8. log "github.com/sirupsen/logrus"
  9. )
  10. var storage tstorage.Storage
  11. func startViewerCollectionMetrics() {
  12. storage, _ = tstorage.NewStorage(
  13. tstorage.WithTimestampPrecision(tstorage.Seconds),
  14. tstorage.WithDataPath("./data/metrics"),
  15. )
  16. defer storage.Close()
  17. collectViewerCount()
  18. for range time.Tick(viewerMetricsPollingInterval) {
  19. collectViewerCount()
  20. collectChatClientCount()
  21. }
  22. }
  23. func collectViewerCount() {
  24. // Don't collect metrics for viewers if there's no stream active.
  25. if !core.GetStatus().Online {
  26. return
  27. }
  28. count := core.GetStatus().ViewerCount
  29. // Save active viewer count to our Prometheus collector.
  30. activeViewerCount.Set(float64(count))
  31. // Insert active viewer count into our on-disk time series storage.
  32. if err := storage.InsertRows([]tstorage.Row{
  33. {
  34. Metric: activeViewerCountKey,
  35. DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)},
  36. },
  37. }); err != nil {
  38. log.Errorln(err)
  39. }
  40. }
  41. func collectChatClientCount() {
  42. count := len(chat.GetClients())
  43. activeChatClientCount.Set(float64(count))
  44. // Total message count
  45. cmc := data.GetMessagesCount()
  46. // Insert message count into Prometheus collector.
  47. currentChatMessageCount.Set(float64(cmc))
  48. // Total user count
  49. uc := data.GetUsersCount()
  50. // Insert user count into Prometheus collector.
  51. chatUserCount.Set(float64(uc))
  52. // Insert active chat user count into our on-disk time series storage.
  53. if err := storage.InsertRows([]tstorage.Row{
  54. {
  55. Metric: activeChatClientCountKey,
  56. DataPoint: tstorage.DataPoint{Timestamp: time.Now().Unix(), Value: float64(count)},
  57. },
  58. }); err != nil {
  59. log.Errorln(err)
  60. }
  61. }
  62. // GetViewersOverTime will return a window of viewer counts over time.
  63. func GetViewersOverTime(start, end time.Time) []TimestampedValue {
  64. p, err := storage.Select(activeViewerCountKey, nil, start.Unix(), end.Unix())
  65. if err != nil && err != tstorage.ErrNoDataPoints {
  66. log.Errorln(err)
  67. }
  68. datapoints := makeTimestampedValuesFromDatapoints(p)
  69. return datapoints
  70. }
  71. // GetChatClientCountOverTime will return a window of connected chat clients over time.
  72. func GetChatClientCountOverTime(start, end time.Time) []TimestampedValue {
  73. p, err := storage.Select(activeChatClientCountKey, nil, start.Unix(), end.Unix())
  74. if err != nil && err != tstorage.ErrNoDataPoints {
  75. log.Errorln(err)
  76. }
  77. datapoints := makeTimestampedValuesFromDatapoints(p)
  78. return datapoints
  79. }