webhookRepository.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package storage
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "github.com/owncast/owncast/models"
  7. "github.com/owncast/owncast/storage/data"
  8. "github.com/pkg/errors"
  9. log "github.com/sirupsen/logrus"
  10. )
  11. type WebhookRepository struct {
  12. datastore *data.Store
  13. }
  14. func NewWebhookRepository(datastore *data.Store) *WebhookRepository {
  15. return &WebhookRepository{datastore: datastore}
  16. }
  17. var temporaryGlobalWebhooksInstance *WebhookRepository
  18. // GetWebhookRepository returns the shared instance of the owncast datastore.
  19. func GetWebhookRepository() *WebhookRepository {
  20. if temporaryGlobalWebhooksInstance == nil {
  21. temporaryGlobalWebhooksInstance = NewWebhookRepository(data.GetDatastore())
  22. }
  23. return temporaryGlobalWebhooksInstance
  24. }
  25. // InsertWebhook will add a new webhook to the database.
  26. func (w *WebhookRepository) InsertWebhook(url string, events []models.EventType) (int, error) {
  27. log.Traceln("Adding new webhook")
  28. eventsString := strings.Join(events, ",")
  29. tx, err := w.datastore.DB.Begin()
  30. if err != nil {
  31. return 0, err
  32. }
  33. stmt, err := tx.Prepare("INSERT INTO webhooks(url, events) values(?, ?)")
  34. if err != nil {
  35. return 0, err
  36. }
  37. defer stmt.Close()
  38. insertResult, err := stmt.Exec(url, eventsString)
  39. if err != nil {
  40. return 0, err
  41. }
  42. if err = tx.Commit(); err != nil {
  43. return 0, err
  44. }
  45. newID, err := insertResult.LastInsertId()
  46. if err != nil {
  47. return 0, err
  48. }
  49. return int(newID), err
  50. }
  51. // DeleteWebhook will delete a webhook from the database.
  52. func (w *WebhookRepository) DeleteWebhook(id int) error {
  53. log.Traceln("Deleting webhook")
  54. tx, err := w.datastore.DB.Begin()
  55. if err != nil {
  56. return err
  57. }
  58. stmt, err := tx.Prepare("DELETE FROM webhooks WHERE id = ?")
  59. if err != nil {
  60. return err
  61. }
  62. defer stmt.Close()
  63. result, err := stmt.Exec(id)
  64. if err != nil {
  65. return err
  66. }
  67. if rowsDeleted, _ := result.RowsAffected(); rowsDeleted == 0 {
  68. _ = tx.Rollback()
  69. return errors.New(fmt.Sprint(id) + " not found")
  70. }
  71. if err = tx.Commit(); err != nil {
  72. return err
  73. }
  74. return nil
  75. }
  76. // GetWebhooksForEvent will return all of the webhooks that want to be notified about an event type.
  77. func (w *WebhookRepository) GetWebhooksForEvent(event models.EventType) []models.Webhook {
  78. webhooks := make([]models.Webhook, 0)
  79. query := `SELECT * FROM (
  80. WITH RECURSIVE split(id, url, event, rest) AS (
  81. SELECT id, url, '', events || ',' FROM webhooks
  82. UNION ALL
  83. SELECT id, url,
  84. substr(rest, 0, instr(rest, ',')),
  85. substr(rest, instr(rest, ',')+1)
  86. FROM split
  87. WHERE rest <> '')
  88. SELECT id, url, event
  89. FROM split
  90. WHERE event <> ''
  91. ) AS webhook WHERE event IS "` + event + `"`
  92. rows, err := w.datastore.DB.Query(query)
  93. if err != nil || rows.Err() != nil {
  94. log.Fatal(err)
  95. }
  96. defer rows.Close()
  97. for rows.Next() {
  98. var id int
  99. var url string
  100. if err := rows.Scan(&id, &url, &event); err != nil {
  101. log.Debugln(err)
  102. log.Error("There is a problem with the database.")
  103. break
  104. }
  105. singleWebhook := models.Webhook{
  106. ID: id,
  107. URL: url,
  108. }
  109. webhooks = append(webhooks, singleWebhook)
  110. }
  111. return webhooks
  112. }
  113. // GetWebhooks will return all the webhooks.
  114. func (w *WebhookRepository) GetWebhooks() ([]models.Webhook, error) { //nolint
  115. webhooks := make([]models.Webhook, 0)
  116. query := "SELECT * FROM webhooks"
  117. rows, err := w.datastore.DB.Query(query)
  118. if err != nil {
  119. return webhooks, err
  120. }
  121. defer rows.Close()
  122. for rows.Next() {
  123. var id int
  124. var url string
  125. var events string
  126. var timestampString string
  127. var lastUsedString *string
  128. if err := rows.Scan(&id, &url, &events, &timestampString, &lastUsedString); err != nil {
  129. log.Error("There is a problem reading the database.", err)
  130. return webhooks, err
  131. }
  132. timestamp, err := time.Parse(time.RFC3339, timestampString)
  133. if err != nil {
  134. return webhooks, err
  135. }
  136. var lastUsed *time.Time
  137. if lastUsedString != nil {
  138. lastUsedTime, _ := time.Parse(time.RFC3339, *lastUsedString)
  139. lastUsed = &lastUsedTime
  140. }
  141. singleWebhook := models.Webhook{
  142. ID: id,
  143. URL: url,
  144. Events: strings.Split(events, ","),
  145. Timestamp: timestamp,
  146. LastUsed: lastUsed,
  147. }
  148. webhooks = append(webhooks, singleWebhook)
  149. }
  150. if err := rows.Err(); err != nil {
  151. return webhooks, err
  152. }
  153. return webhooks, nil
  154. }
  155. // SetWebhookAsUsed will update the last used time for a webhook.
  156. func (w *WebhookRepository) SetWebhookAsUsed(webhook models.Webhook) error {
  157. tx, err := w.datastore.DB.Begin()
  158. if err != nil {
  159. return err
  160. }
  161. stmt, err := tx.Prepare("UPDATE webhooks SET last_used = CURRENT_TIMESTAMP WHERE id = ?")
  162. if err != nil {
  163. return err
  164. }
  165. defer stmt.Close()
  166. if _, err := stmt.Exec(webhook.ID); err != nil {
  167. return err
  168. }
  169. if err = tx.Commit(); err != nil {
  170. return err
  171. }
  172. return nil
  173. }