s3Storage.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package storageproviders
  2. import (
  3. "bufio"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "strings"
  8. "github.com/owncast/owncast/core/data"
  9. "github.com/owncast/owncast/core/playlist"
  10. "github.com/owncast/owncast/utils"
  11. log "github.com/sirupsen/logrus"
  12. "github.com/aws/aws-sdk-go/aws"
  13. "github.com/aws/aws-sdk-go/aws/credentials"
  14. "github.com/aws/aws-sdk-go/aws/session"
  15. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  16. "github.com/owncast/owncast/config"
  17. "github.com/grafov/m3u8"
  18. )
  19. // S3Storage is the s3 implementation of a storage provider.
  20. type S3Storage struct {
  21. sess *session.Session
  22. host string
  23. s3Endpoint string
  24. s3ServingEndpoint string
  25. s3Region string
  26. s3Bucket string
  27. s3AccessKey string
  28. s3Secret string
  29. s3ACL string
  30. s3ForcePathStyle bool
  31. // If we try to upload a playlist but it is not yet on disk
  32. // then keep a reference to it here.
  33. queuedPlaylistUpdates map[string]string
  34. uploader *s3manager.Uploader
  35. }
  36. // NewS3Storage returns a new S3Storage instance.
  37. func NewS3Storage() *S3Storage {
  38. return &S3Storage{
  39. queuedPlaylistUpdates: make(map[string]string),
  40. }
  41. }
  42. // Setup sets up the s3 storage for saving the video to s3.
  43. func (s *S3Storage) Setup() error {
  44. log.Trace("Setting up S3 for external storage of video...")
  45. s3Config := data.GetS3Config()
  46. if s3Config.ServingEndpoint != "" {
  47. s.host = s3Config.ServingEndpoint
  48. } else {
  49. s.host = fmt.Sprintf("%s/%s", s3Config.Endpoint, s3Config.Bucket)
  50. }
  51. s.s3Endpoint = s3Config.Endpoint
  52. s.s3ServingEndpoint = s3Config.ServingEndpoint
  53. s.s3Region = s3Config.Region
  54. s.s3Bucket = s3Config.Bucket
  55. s.s3AccessKey = s3Config.AccessKey
  56. s.s3Secret = s3Config.Secret
  57. s.s3ACL = s3Config.ACL
  58. s.s3ForcePathStyle = s3Config.ForcePathStyle
  59. s.sess = s.connectAWS()
  60. s.uploader = s3manager.NewUploader(s.sess)
  61. return nil
  62. }
  63. // SegmentWritten is called when a single segment of video is written.
  64. func (s *S3Storage) SegmentWritten(localFilePath string) {
  65. index := utils.GetIndexFromFilePath(localFilePath)
  66. performanceMonitorKey := "s3upload-" + index
  67. utils.StartPerformanceMonitor(performanceMonitorKey)
  68. // Upload the segment
  69. if _, err := s.Save(localFilePath, 0); err != nil {
  70. log.Errorln(err)
  71. return
  72. }
  73. averagePerformance := utils.GetAveragePerformance(performanceMonitorKey)
  74. // Warn the user about long-running save operations
  75. if averagePerformance != 0 {
  76. if averagePerformance > float64(data.GetStreamLatencyLevel().SecondsPerSegment)*0.9 {
  77. log.Warnln("Possible slow uploads: average upload S3 save duration", averagePerformance, "s. troubleshoot this issue by visiting https://owncast.online/docs/troubleshooting/")
  78. }
  79. }
  80. // Upload the variant playlist for this segment
  81. // so the segments and the HLS playlist referencing
  82. // them are in sync.
  83. playlistPath := filepath.Join(filepath.Dir(localFilePath), "stream.m3u8")
  84. if _, err := s.Save(playlistPath, 0); err != nil {
  85. s.queuedPlaylistUpdates[playlistPath] = playlistPath
  86. if pErr, ok := err.(*os.PathError); ok {
  87. log.Debugln(pErr.Path, "does not yet exist locally when trying to upload to S3 storage.")
  88. return
  89. }
  90. }
  91. }
  92. // VariantPlaylistWritten is called when a variant hls playlist is written.
  93. func (s *S3Storage) VariantPlaylistWritten(localFilePath string) {
  94. // We are uploading the variant playlist after uploading the segment
  95. // to make sure we're not referring to files in a playlist that don't
  96. // yet exist. See SegmentWritten.
  97. if _, ok := s.queuedPlaylistUpdates[localFilePath]; ok {
  98. if _, err := s.Save(localFilePath, 0); err != nil {
  99. log.Errorln(err)
  100. s.queuedPlaylistUpdates[localFilePath] = localFilePath
  101. }
  102. delete(s.queuedPlaylistUpdates, localFilePath)
  103. }
  104. }
  105. // MasterPlaylistWritten is called when the master hls playlist is written.
  106. func (s *S3Storage) MasterPlaylistWritten(localFilePath string) {
  107. // Rewrite the playlist to use absolute remote S3 URLs
  108. if err := s.rewriteRemotePlaylist(localFilePath); err != nil {
  109. log.Warnln(err)
  110. }
  111. }
  112. // Save saves the file to the s3 bucket.
  113. func (s *S3Storage) Save(filePath string, retryCount int) (string, error) {
  114. file, err := os.Open(filePath) // nolint
  115. if err != nil {
  116. return "", err
  117. }
  118. defer file.Close()
  119. // Convert the local path to the variant/file path by stripping the local storage location.
  120. normalizedPath := strings.TrimPrefix(filePath, config.HLSStoragePath)
  121. // Build the remote path by adding the "hls" path prefix.
  122. remotePath := strings.Join([]string{"hls", normalizedPath}, "")
  123. maxAgeSeconds := utils.GetCacheDurationSecondsForPath(filePath)
  124. cacheControlHeader := fmt.Sprintf("max-age=%d", maxAgeSeconds)
  125. uploadInput := &s3manager.UploadInput{
  126. Bucket: aws.String(s.s3Bucket), // Bucket to be used
  127. Key: aws.String(remotePath), // Name of the file to be saved
  128. Body: file, // File
  129. CacheControl: &cacheControlHeader,
  130. }
  131. if s.s3ACL != "" {
  132. uploadInput.ACL = aws.String(s.s3ACL)
  133. } else {
  134. // Default ACL
  135. uploadInput.ACL = aws.String("public-read")
  136. }
  137. response, err := s.uploader.Upload(uploadInput)
  138. if err != nil {
  139. log.Traceln("error uploading:", filePath, err.Error())
  140. if retryCount < 4 {
  141. log.Traceln("Retrying...")
  142. return s.Save(filePath, retryCount+1)
  143. }
  144. log.Warnln("Giving up on", filePath, err)
  145. return "", fmt.Errorf("Giving up on %s", filePath)
  146. }
  147. return response.Location, nil
  148. }
  149. func (s *S3Storage) connectAWS() *session.Session {
  150. creds := credentials.NewStaticCredentials(s.s3AccessKey, s.s3Secret, "")
  151. _, err := creds.Get()
  152. if err != nil {
  153. log.Panicln(err)
  154. }
  155. sess, err := session.NewSession(
  156. &aws.Config{
  157. Region: aws.String(s.s3Region),
  158. Credentials: creds,
  159. Endpoint: aws.String(s.s3Endpoint),
  160. S3ForcePathStyle: aws.Bool(s.s3ForcePathStyle),
  161. },
  162. )
  163. if err != nil {
  164. log.Panicln(err)
  165. }
  166. return sess
  167. }
  168. // rewriteRemotePlaylist will take a local playlist and rewrite it to have absolute URLs to remote locations.
  169. func (s *S3Storage) rewriteRemotePlaylist(filePath string) error {
  170. f, err := os.Open(filePath) // nolint
  171. if err != nil {
  172. log.Fatalln(err)
  173. }
  174. p := m3u8.NewMasterPlaylist()
  175. if err := p.DecodeFrom(bufio.NewReader(f), false); err != nil {
  176. log.Warnln(err)
  177. }
  178. for _, item := range p.Variants {
  179. item.URI = s.host + filepath.Join("/hls", item.URI)
  180. }
  181. publicPath := filepath.Join(config.HLSStoragePath, filepath.Base(filePath))
  182. newPlaylist := p.String()
  183. return playlist.WritePlaylist(newPlaylist, publicPath)
  184. }