// persistence.go (12 KB)
  1. package chat
  2. import (
  3. "context"
  4. "database/sql"
  5. "strings"
  6. "time"
  7. "github.com/owncast/owncast/core/chat/events"
  8. "github.com/owncast/owncast/core/data"
  9. "github.com/owncast/owncast/core/user"
  10. "github.com/owncast/owncast/models"
  11. log "github.com/sirupsen/logrus"
  12. )
  13. var _datastore *data.Datastore
  14. const (
  15. maxBacklogHours = 2 // Keep backlog max hours worth of messages
  16. maxBacklogNumber = 50 // Return max number of messages in history request
  17. )
  18. func setupPersistence() {
  19. _datastore = data.GetDatastore()
  20. data.CreateMessagesTable(_datastore.DB)
  21. data.CreateBanIPTable(_datastore.DB)
  22. chatDataPruner := time.NewTicker(5 * time.Minute)
  23. go func() {
  24. runPruner()
  25. for range chatDataPruner.C {
  26. runPruner()
  27. }
  28. }()
  29. }
  30. // SaveUserMessage will save a single chat event to the messages database.
  31. func SaveUserMessage(event events.UserMessageEvent) {
  32. saveEvent(event.ID, &event.User.ID, event.Body, event.Type, event.HiddenAt, event.Timestamp, nil, nil, nil, nil)
  33. }
  34. func saveFederatedAction(event events.FediverseEngagementEvent) {
  35. saveEvent(event.ID, nil, event.Body, event.Type, nil, event.Timestamp, event.Image, &event.Link, &event.UserAccountName, nil)
  36. }
  37. // nolint: unparam
  38. func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) {
  39. defer func() {
  40. _historyCache = nil
  41. }()
  42. tx, err := _datastore.DB.Begin()
  43. if err != nil {
  44. log.Errorln("error saving", eventType, err)
  45. return
  46. }
  47. defer tx.Rollback() // nolint
  48. stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
  49. if err != nil {
  50. log.Errorln("error saving", eventType, err)
  51. return
  52. }
  53. defer stmt.Close()
  54. if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil {
  55. log.Errorln("error saving", eventType, err)
  56. return
  57. }
  58. if err = tx.Commit(); err != nil {
  59. log.Errorln("error saving", eventType, err)
  60. return
  61. }
  62. }
  63. func makeUserMessageEventFromRowData(row rowData) events.UserMessageEvent {
  64. scopes := ""
  65. if row.userScopes != nil {
  66. scopes = *row.userScopes
  67. }
  68. previousUsernames := []string{}
  69. if row.previousUsernames != nil {
  70. previousUsernames = strings.Split(*row.previousUsernames, ",")
  71. }
  72. displayName := ""
  73. if row.userDisplayName != nil {
  74. displayName = *row.userDisplayName
  75. }
  76. displayColor := 0
  77. if row.userDisplayColor != nil {
  78. displayColor = *row.userDisplayColor
  79. }
  80. createdAt := time.Time{}
  81. if row.userCreatedAt != nil {
  82. createdAt = *row.userCreatedAt
  83. }
  84. isBot := (row.userType != nil && *row.userType == "API")
  85. scopeSlice := strings.Split(scopes, ",")
  86. u := user.User{
  87. ID: *row.userID,
  88. DisplayName: displayName,
  89. DisplayColor: displayColor,
  90. CreatedAt: createdAt,
  91. DisabledAt: row.userDisabledAt,
  92. NameChangedAt: row.userNameChangedAt,
  93. PreviousNames: previousUsernames,
  94. AuthenticatedAt: row.userAuthenticatedAt,
  95. Authenticated: row.userAuthenticatedAt != nil,
  96. Scopes: scopeSlice,
  97. IsBot: isBot,
  98. }
  99. message := events.UserMessageEvent{
  100. Event: events.Event{
  101. Type: row.eventType,
  102. ID: row.id,
  103. Timestamp: row.timestamp,
  104. },
  105. UserEvent: events.UserEvent{
  106. User: &u,
  107. HiddenAt: row.hiddenAt,
  108. },
  109. MessageEvent: events.MessageEvent{
  110. Body: row.body,
  111. RawBody: row.body,
  112. },
  113. }
  114. return message
  115. }
  116. func makeSystemMessageChatEventFromRowData(row rowData) events.SystemMessageEvent {
  117. message := events.SystemMessageEvent{
  118. Event: events.Event{
  119. Type: row.eventType,
  120. ID: row.id,
  121. Timestamp: row.timestamp,
  122. },
  123. MessageEvent: events.MessageEvent{
  124. Body: row.body,
  125. RawBody: row.body,
  126. },
  127. }
  128. return message
  129. }
  130. func makeActionMessageChatEventFromRowData(row rowData) events.ActionEvent {
  131. message := events.ActionEvent{
  132. Event: events.Event{
  133. Type: row.eventType,
  134. ID: row.id,
  135. Timestamp: row.timestamp,
  136. },
  137. MessageEvent: events.MessageEvent{
  138. Body: row.body,
  139. RawBody: row.body,
  140. },
  141. }
  142. return message
  143. }
  144. func makeFederatedActionChatEventFromRowData(row rowData) events.FediverseEngagementEvent {
  145. message := events.FediverseEngagementEvent{
  146. Event: events.Event{
  147. Type: row.eventType,
  148. ID: row.id,
  149. Timestamp: row.timestamp,
  150. },
  151. MessageEvent: events.MessageEvent{
  152. Body: row.body,
  153. RawBody: row.body,
  154. },
  155. Image: row.image,
  156. Link: *row.link,
  157. UserAccountName: *row.title,
  158. }
  159. return message
  160. }
  161. type rowData struct {
  162. id string
  163. userID *string
  164. body string
  165. eventType models.EventType
  166. hiddenAt *time.Time
  167. timestamp time.Time
  168. title *string
  169. subtitle *string
  170. image *string
  171. link *string
  172. userDisplayName *string
  173. userDisplayColor *int
  174. userCreatedAt *time.Time
  175. userDisabledAt *time.Time
  176. previousUsernames *string
  177. userNameChangedAt *time.Time
  178. userAuthenticatedAt *time.Time
  179. userScopes *string
  180. userType *string
  181. }
  182. func getChat(rows *sql.Rows) ([]interface{}, error) {
  183. history := make([]interface{}, 0)
  184. for rows.Next() {
  185. row := rowData{}
  186. // Convert a database row into a chat event
  187. if err := rows.Scan(
  188. &row.id,
  189. &row.userID,
  190. &row.body,
  191. &row.title,
  192. &row.subtitle,
  193. &row.image,
  194. &row.link,
  195. &row.eventType,
  196. &row.hiddenAt,
  197. &row.timestamp,
  198. &row.userDisplayName,
  199. &row.userDisplayColor,
  200. &row.userCreatedAt,
  201. &row.userDisabledAt,
  202. &row.previousUsernames,
  203. &row.userNameChangedAt,
  204. &row.userAuthenticatedAt,
  205. &row.userScopes,
  206. &row.userType,
  207. ); err != nil {
  208. return nil, err
  209. }
  210. var message interface{}
  211. switch row.eventType {
  212. case events.MessageSent:
  213. message = makeUserMessageEventFromRowData(row)
  214. case events.SystemMessageSent:
  215. message = makeSystemMessageChatEventFromRowData(row)
  216. case events.ChatActionSent:
  217. message = makeActionMessageChatEventFromRowData(row)
  218. case events.FediverseEngagementFollow:
  219. message = makeFederatedActionChatEventFromRowData(row)
  220. case events.FediverseEngagementLike:
  221. message = makeFederatedActionChatEventFromRowData(row)
  222. case events.FediverseEngagementRepost:
  223. message = makeFederatedActionChatEventFromRowData(row)
  224. }
  225. history = append(history, message)
  226. }
  227. return history, nil
  228. }
  229. var _historyCache *[]interface{}
  230. // GetChatModerationHistory will return all the chat messages suitable for moderation purposes.
  231. func GetChatModerationHistory() []interface{} {
  232. if _historyCache != nil {
  233. return *_historyCache
  234. }
  235. tx, err := _datastore.DB.Begin()
  236. if err != nil {
  237. log.Errorln("error fetching chat moderation history", err)
  238. return nil
  239. }
  240. defer tx.Rollback() // nolint
  241. // Get all messages regardless of visibility
  242. query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC"
  243. stmt, err := tx.Prepare(query)
  244. if err != nil {
  245. log.Errorln("error fetching chat moderation history", err)
  246. return nil
  247. }
  248. rows, err := stmt.Query()
  249. if err != nil {
  250. log.Errorln("error fetching chat moderation history", err)
  251. return nil
  252. }
  253. defer stmt.Close()
  254. defer rows.Close()
  255. result, err := getChat(rows)
  256. if err != nil {
  257. log.Errorln(err)
  258. log.Errorln("There is a problem enumerating chat message rows. Please report this:", query)
  259. return nil
  260. }
  261. _historyCache = &result
  262. if err = tx.Commit(); err != nil {
  263. log.Errorln("error fetching chat moderation history", err)
  264. return nil
  265. }
  266. return result
  267. }
  268. // GetChatHistory will return all the chat messages suitable for returning as user-facing chat history.
  269. func GetChatHistory() []interface{} {
  270. tx, err := _datastore.DB.Begin()
  271. if err != nil {
  272. log.Errorln("error fetching chat history", err)
  273. return nil
  274. }
  275. defer tx.Rollback() // nolint
  276. // Get all visible messages
  277. query := "SELECT messages.id, messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.authenticated_at, users.scopes, users.type FROM users JOIN messages ON users.id = messages.user_id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT ?"
  278. stmt, err := tx.Prepare(query)
  279. if err != nil {
  280. log.Errorln("error fetching chat history", err)
  281. return nil
  282. }
  283. rows, err := stmt.Query(maxBacklogNumber)
  284. if err != nil {
  285. log.Errorln("error fetching chat history", err)
  286. return nil
  287. }
  288. defer stmt.Close()
  289. defer rows.Close()
  290. m, err := getChat(rows)
  291. if err != nil {
  292. log.Errorln(err)
  293. log.Errorln("There is a problem enumerating chat message rows. Please report this:", query)
  294. return nil
  295. }
  296. if err = tx.Commit(); err != nil {
  297. log.Errorln("error fetching chat history", err)
  298. return nil
  299. }
  300. // Invert order of messages
  301. for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 {
  302. m[i], m[j] = m[j], m[i]
  303. }
  304. return m
  305. }
  306. // GetMessagesFromUser returns chat messages that were sent by a specific user.
  307. func GetMessagesFromUser(userID string) ([]events.UserMessageEvent, error) {
  308. query, err := _datastore.GetQueries().GetMessagesFromUser(context.Background(), sql.NullString{String: userID, Valid: true})
  309. if err != nil {
  310. return nil, err
  311. }
  312. results := make([]events.UserMessageEvent, len(query))
  313. for i, row := range query {
  314. results[i] = events.UserMessageEvent{
  315. Event: events.Event{
  316. Timestamp: row.Timestamp.Time,
  317. ID: row.ID,
  318. },
  319. MessageEvent: events.MessageEvent{
  320. Body: row.Body.String,
  321. },
  322. }
  323. }
  324. return results, nil
  325. }
  326. // SetMessageVisibilityForUserID will bulk change the visibility of messages for a user
  327. // and then send out visibility changed events to chat clients.
  328. func SetMessageVisibilityForUserID(userID string, visible bool) error {
  329. defer func() {
  330. _historyCache = nil
  331. }()
  332. tx, err := _datastore.DB.Begin()
  333. if err != nil {
  334. log.Errorln("error while setting message visibility", err)
  335. return nil
  336. }
  337. defer tx.Rollback() // nolint
  338. query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS ?"
  339. stmt, err := tx.Prepare(query)
  340. if err != nil {
  341. log.Errorln("error while setting message visibility", err)
  342. return nil
  343. }
  344. rows, err := stmt.Query(userID)
  345. if err != nil {
  346. log.Errorln("error while setting message visibility", err)
  347. return nil
  348. }
  349. defer stmt.Close()
  350. defer rows.Close()
  351. // Get a list of IDs to send to the connected clients to hide
  352. ids := make([]string, 0)
  353. messages, err := getChat(rows)
  354. if err != nil {
  355. log.Errorln(err)
  356. log.Errorln("There is a problem enumerating chat message rows. Please report this:", query)
  357. return nil
  358. }
  359. if len(messages) == 0 {
  360. return nil
  361. }
  362. for _, message := range messages {
  363. ids = append(ids, message.(events.UserMessageEvent).ID)
  364. }
  365. if err = tx.Commit(); err != nil {
  366. log.Errorln("error while setting message visibility ", err)
  367. return nil
  368. }
  369. // Tell the clients to hide/show these messages.
  370. return SetMessagesVisibility(ids, visible)
  371. }
  372. func saveMessageVisibility(messageIDs []string, visible bool) error {
  373. defer func() {
  374. _historyCache = nil
  375. }()
  376. _datastore.DbLock.Lock()
  377. defer _datastore.DbLock.Unlock()
  378. tx, err := _datastore.DB.Begin()
  379. if err != nil {
  380. return err
  381. }
  382. // nolint:gosec
  383. stmt, err := tx.Prepare("UPDATE messages SET hidden_at=? WHERE id IN (?" + strings.Repeat(",?", len(messageIDs)-1) + ")")
  384. if err != nil {
  385. return err
  386. }
  387. defer stmt.Close()
  388. var hiddenAt *time.Time
  389. if !visible {
  390. now := time.Now()
  391. hiddenAt = &now
  392. } else {
  393. hiddenAt = nil
  394. }
  395. args := make([]interface{}, len(messageIDs)+1)
  396. args[0] = hiddenAt
  397. for i, id := range messageIDs {
  398. args[i+1] = id
  399. }
  400. if _, err = stmt.Exec(args...); err != nil {
  401. return err
  402. }
  403. if err = tx.Commit(); err != nil {
  404. return err
  405. }
  406. return nil
  407. }