webhooks_test.go 8.5 KB


  1. package webhooks
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "net/http"
  7. "net/http/httptest"
  8. "os"
  9. "sync"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/owncast/owncast/core/data"
  14. "github.com/owncast/owncast/models"
  15. jsonpatch "gopkg.in/evanphx/json-patch.v5"
  16. )
  17. func TestMain(m *testing.M) {
  18. dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db")
  19. if err != nil {
  20. panic(err)
  21. }
  22. dbFile.Close()
  23. defer os.Remove(dbFile.Name())
  24. if err := data.SetupPersistence(dbFile.Name()); err != nil {
  25. panic(err)
  26. }
  27. InitWorkerPool()
  28. defer close(queue)
  29. m.Run()
  30. }
  31. // Because the other tests use `sendEventToWebhooks` with a `WaitGroup` to know when the test completes,
  32. // this test ensures that `SendToWebhooks` without a `WaitGroup` doesn't panic.
  33. func TestPublicSend(t *testing.T) {
  34. // Send enough events to be sure at least one worker delivers a second event.
  35. const eventsCount = webhookWorkerPoolSize + 1
  36. var wg sync.WaitGroup
  37. wg.Add(eventsCount)
  38. svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  39. wg.Done()
  40. }))
  41. defer svr.Close()
  42. hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
  43. if err != nil {
  44. t.Fatal(err)
  45. }
  46. defer func() {
  47. if err := data.DeleteWebhook(hook); err != nil {
  48. t.Error(err)
  49. }
  50. }()
  51. for i := 0; i < eventsCount; i++ {
  52. wh := WebhookEvent{
  53. EventData: struct{}{},
  54. Type: models.MessageSent,
  55. }
  56. SendEventToWebhooks(wh)
  57. }
  58. wg.Wait()
  59. }
  60. // Make sure that events are only sent to interested endpoints.
  61. func TestRouting(t *testing.T) {
  62. eventTypes := []models.EventType{models.PING, models.PONG}
  63. calls := map[models.EventType]int{}
  64. var lock sync.Mutex
  65. svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  66. if len(r.URL.Path) < 1 || r.URL.Path[0] != '/' {
  67. t.Fatalf("Got unexpected path %v", r.URL.Path)
  68. }
  69. pathType := r.URL.Path[1:]
  70. var body WebhookEvent
  71. if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
  72. t.Fatal(err)
  73. }
  74. if body.Type != pathType {
  75. t.Fatalf("Got %v payload on %v endpoint", body.Type, pathType)
  76. }
  77. lock.Lock()
  78. defer lock.Unlock()
  79. calls[pathType] += 1
  80. }))
  81. defer svr.Close()
  82. for _, eventType := range eventTypes {
  83. hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
  84. if err != nil {
  85. t.Fatal(err)
  86. }
  87. defer func() {
  88. if err := data.DeleteWebhook(hook); err != nil {
  89. t.Error(err)
  90. }
  91. }()
  92. }
  93. var wg sync.WaitGroup
  94. for _, eventType := range eventTypes {
  95. wh := WebhookEvent{
  96. EventData: struct{}{},
  97. Type: eventType,
  98. }
  99. sendEventToWebhooks(wh, &wg)
  100. }
  101. wg.Wait()
  102. for _, eventType := range eventTypes {
  103. if calls[eventType] != 1 {
  104. t.Errorf("Expected %v to be called exactly once but it was called %v times", eventType, calls[eventType])
  105. }
  106. }
  107. }
  108. // Make sure that events are sent to all interested endpoints.
  109. func TestMultiple(t *testing.T) {
  110. const times = 2
  111. var calls uint32
  112. svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  113. atomic.AddUint32(&calls, 1)
  114. }))
  115. defer svr.Close()
  116. for i := 0; i < times; i++ {
  117. hook, err := data.InsertWebhook(fmt.Sprintf("%v/%v", svr.URL, i), []models.EventType{models.MessageSent})
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. defer func() {
  122. if err := data.DeleteWebhook(hook); err != nil {
  123. t.Error(err)
  124. }
  125. }()
  126. }
  127. var wg sync.WaitGroup
  128. wh := WebhookEvent{
  129. EventData: struct{}{},
  130. Type: models.MessageSent,
  131. }
  132. sendEventToWebhooks(wh, &wg)
  133. wg.Wait()
  134. if atomic.LoadUint32(&calls) != times {
  135. t.Errorf("Expected event to be sent exactly %v times but it was sent %v times", times, atomic.LoadUint32(&calls))
  136. }
  137. }
  138. // Make sure when a webhook is used its last used timestamp is updated.
  139. func TestTimestamps(t *testing.T) {
  140. const tolerance = time.Second
  141. start := time.Now()
  142. eventTypes := []models.EventType{models.PING, models.PONG}
  143. handlerIds := []int{0, 0}
  144. handlers := []*models.Webhook{nil, nil}
  145. svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  146. }))
  147. defer svr.Close()
  148. for i, eventType := range eventTypes {
  149. hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
  150. if err != nil {
  151. t.Fatal(err)
  152. }
  153. handlerIds[i] = hook
  154. defer func() {
  155. if err := data.DeleteWebhook(hook); err != nil {
  156. t.Error(err)
  157. }
  158. }()
  159. }
  160. var wg sync.WaitGroup
  161. wh := WebhookEvent{
  162. EventData: struct{}{},
  163. Type: eventTypes[0],
  164. }
  165. sendEventToWebhooks(wh, &wg)
  166. wg.Wait()
  167. hooks, err := data.GetWebhooks()
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. for h, hook := range hooks {
  172. for i, handlerId := range handlerIds {
  173. if hook.ID == handlerId {
  174. handlers[i] = &hooks[h]
  175. }
  176. }
  177. }
  178. if handlers[0] == nil {
  179. t.Fatal("First handler was not found in registered handlers")
  180. }
  181. if handlers[1] == nil {
  182. t.Fatal("Second handler was not found in registered handlers")
  183. }
  184. end := time.Now()
  185. if handlers[0].Timestamp.Add(tolerance).Before(start) {
  186. t.Errorf("First handler timestamp %v should not be before start of test %v", handlers[0].Timestamp, start)
  187. }
  188. if handlers[0].Timestamp.Add(tolerance).Before(handlers[1].Timestamp) {
  189. t.Errorf("Second handler timestamp %v should not be before first handler timestamp %v", handlers[1].Timestamp, handlers[0].Timestamp)
  190. }
  191. if end.Add(tolerance).Before(handlers[1].Timestamp) {
  192. t.Errorf("End of test %v should not be before second handler timestamp %v", end, handlers[1].Timestamp)
  193. }
  194. if handlers[0].LastUsed == nil {
  195. t.Error("First handler last used should have been set")
  196. } else if handlers[0].LastUsed.Add(tolerance).Before(handlers[1].Timestamp) {
  197. t.Errorf("First handler last used %v should not be before second handler timestamp %v", handlers[0].LastUsed, handlers[1].Timestamp)
  198. } else if end.Add(tolerance).Before(*handlers[0].LastUsed) {
  199. t.Errorf("End of test %v should not be before first handler last used %v", end, handlers[0].LastUsed)
  200. }
  201. if handlers[1].LastUsed != nil {
  202. t.Error("Second handler last used should not have been set")
  203. }
  204. }
  205. // Make sure up to the expected number of events can be fired in parallel.
  206. func TestParallel(t *testing.T) {
  207. var calls uint32
  208. var wgStart sync.WaitGroup
  209. finished := make(chan int)
  210. wgStart.Add(webhookWorkerPoolSize)
  211. svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  212. myId := atomic.AddUint32(&calls, 1)
  213. // We made it to the pool size + 1 event, so we're done with the test.
  214. if myId == webhookWorkerPoolSize+1 {
  215. close(finished)
  216. return
  217. }
  218. // Wait until all the handlers are started.
  219. wgStart.Done()
  220. wgStart.Wait()
  221. // The first handler just returns so the pool size + 1 event can be handled.
  222. if myId != 1 {
  223. // The other handlers will wait for pool size + 1.
  224. _ = <-finished
  225. }
  226. }))
  227. defer svr.Close()
  228. hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
  229. if err != nil {
  230. t.Fatal(err)
  231. }
  232. defer func() {
  233. if err := data.DeleteWebhook(hook); err != nil {
  234. t.Error(err)
  235. }
  236. }()
  237. var wgMessages sync.WaitGroup
  238. for i := 0; i < webhookWorkerPoolSize+1; i++ {
  239. wh := WebhookEvent{
  240. EventData: struct{}{},
  241. Type: models.MessageSent,
  242. }
  243. sendEventToWebhooks(wh, &wgMessages)
  244. }
  245. wgMessages.Wait()
  246. }
  247. // Send an event, capture it, and verify that it has the expected payload.
  248. func checkPayload(t *testing.T, eventType models.EventType, send func(), expectedJson string) {
  249. eventChannel := make(chan WebhookEvent)
  250. // Set up a server.
  251. svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  252. data := WebhookEvent{}
  253. json.NewDecoder(r.Body).Decode(&data)
  254. eventChannel <- data
  255. }))
  256. defer svr.Close()
  257. // Subscribe to the webhook.
  258. hook, err := data.InsertWebhook(svr.URL, []models.EventType{eventType})
  259. if err != nil {
  260. t.Fatal(err)
  261. }
  262. defer func() {
  263. if err := data.DeleteWebhook(hook); err != nil {
  264. t.Error(err)
  265. }
  266. }()
  267. // Send and capture the event.
  268. send()
  269. event := <-eventChannel
  270. if event.Type != eventType {
  271. t.Errorf("Got event type %v but expected %v", event.Type, eventType)
  272. }
  273. // Compare.
  274. payloadJson, err := json.MarshalIndent(event.EventData, "", " ")
  275. if err != nil {
  276. t.Fatal(err)
  277. }
  278. t.Logf("Actual payload:\n%s", payloadJson)
  279. if !jsonpatch.Equal(payloadJson, []byte(expectedJson)) {
  280. diff, err := jsonpatch.CreateMergePatch(payloadJson, []byte(expectedJson))
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. var out bytes.Buffer
  285. if err := json.Indent(&out, diff, "", " "); err != nil {
  286. t.Fatal(err)
  287. }
  288. t.Errorf("Expected difference from actual payload:\n%s", out.Bytes())
  289. }
  290. }