fileWriterReceiverService.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package transcoder
  2. import (
  3. "io"
  4. "net"
  5. "net/http"
  6. "os"
  7. "path/filepath"
  8. "strings"
  9. "github.com/owncast/owncast/config"
  10. "github.com/owncast/owncast/utils"
  11. log "github.com/sirupsen/logrus"
  12. )
// FileWriterReceiverServiceCallback is the set of callbacks fired when transcoder
// responses are written to disk. Each method receives the local path of the file
// that was just written.
type FileWriterReceiverServiceCallback interface {
	// SegmentWritten is fired for a written file whose path ends in ".ts".
	SegmentWritten(localFilePath string)
	// VariantPlaylistWritten is fired for a written file whose path ends in ".m3u8"
	// and that was not identified as the master playlist.
	VariantPlaylistWritten(localFilePath string)
	// MasterPlaylistWritten is fired for the written file whose relative path is
	// "hls/stream.m3u8".
	MasterPlaylistWritten(localFilePath string)
}
// FileWriterReceiverService accepts transcoder responses via HTTP and fires the callbacks.
// It is intended to be the middleman between the transcoder and the storage provider and allows
// the transcoder process to be completely isolated and even run remotely in the future, as long
// as it can send HTTP requests to this service with the results.
type FileWriterReceiverService struct {
	// callbacks is assigned by SetupFileWriterReceiverService and is notified
	// after each uploaded file has been written.
	callbacks FileWriterReceiverServiceCallback
}
  26. // SetupFileWriterReceiverService will start listening for transcoder responses.
  27. func (s *FileWriterReceiverService) SetupFileWriterReceiverService(callbacks FileWriterReceiverServiceCallback) {
  28. s.callbacks = callbacks
  29. httpServer := http.NewServeMux()
  30. httpServer.HandleFunc("/", s.uploadHandler)
  31. localListenerAddress := "127.0.0.1:0"
  32. listener, err := net.Listen("tcp", localListenerAddress)
  33. if err != nil {
  34. log.Fatalln("Unable to start internal video writing service", err)
  35. }
  36. listenerPort := strings.Split(listener.Addr().String(), ":")[1]
  37. config.InternalHLSListenerPort = listenerPort
  38. log.Traceln("Transcoder response service listening on: " + listenerPort)
  39. go func() {
  40. //nolint: gosec
  41. if err := http.Serve(listener, httpServer); err != nil {
  42. log.Fatalln("Unable to start internal video writing service", err)
  43. }
  44. }()
  45. }
  46. func (s *FileWriterReceiverService) uploadHandler(w http.ResponseWriter, r *http.Request) {
  47. if r.Method != "PUT" {
  48. http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
  49. return
  50. }
  51. path := r.URL.Path
  52. writePath := filepath.Join(config.HLSStoragePath, path)
  53. f, err := os.Create(writePath) //nolint: gosec
  54. if err != nil {
  55. returnError(err, w)
  56. return
  57. }
  58. defer f.Close()
  59. if _, err := io.Copy(f, r.Body); err != nil {
  60. returnError(err, w)
  61. return
  62. }
  63. s.fileWritten(writePath)
  64. w.WriteHeader(http.StatusOK)
  65. }
  66. func (s *FileWriterReceiverService) fileWritten(path string) {
  67. if utils.GetRelativePathFromAbsolutePath(path) == "hls/stream.m3u8" {
  68. s.callbacks.MasterPlaylistWritten(path)
  69. } else if strings.HasSuffix(path, ".ts") {
  70. s.callbacks.SegmentWritten(path)
  71. } else if strings.HasSuffix(path, ".m3u8") {
  72. s.callbacks.VariantPlaylistWritten(path)
  73. }
  74. }
  75. func returnError(err error, w http.ResponseWriter) {
  76. log.Debugln(err)
  77. http.Error(w, http.StatusText(http.StatusInternalServerError)+": "+err.Error(), http.StatusInternalServerError)
  78. }