workerpool.go 1011 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. package inbox
  2. import (
  3. "runtime"
  4. "github.com/owncast/owncast/activitypub/apmodels"
  5. log "github.com/sirupsen/logrus"
  6. )
  7. // workerPoolSize defines the number of concurrent ActivityPub handlers.
  8. var workerPoolSize = runtime.GOMAXPROCS(0)
  9. // Job struct bundling the ActivityPub and the payload in one struct.
  10. type Job struct {
  11. request apmodels.InboxRequest
  12. }
  13. var queue chan Job
  14. // InitInboxWorkerPool starts n go routines that await ActivityPub jobs.
  15. func InitInboxWorkerPool() {
  16. queue = make(chan Job)
  17. // start workers
  18. for i := 1; i <= workerPoolSize; i++ {
  19. go worker(i, queue)
  20. }
  21. }
  22. // AddToQueue will queue up an outbound http request.
  23. func AddToQueue(req apmodels.InboxRequest) {
  24. log.Tracef("Queued request for ActivityPub inbox handler")
  25. queue <- Job{req}
  26. }
  27. func worker(workerID int, queue <-chan Job) {
  28. log.Debugf("Started ActivityPub worker %d", workerID)
  29. for job := range queue {
  30. handle(job.request)
  31. log.Tracef("Done with ActivityPub inbox handler using worker %d", workerID)
  32. }
  33. }