123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- package chat
- import (
- "context"
- "database/sql"
- "strings"
- "time"
- "github.com/owncast/owncast/core/chat/events"
- "github.com/owncast/owncast/core/data"
- "github.com/owncast/owncast/core/user"
- "github.com/owncast/owncast/models"
- log "github.com/sirupsen/logrus"
- )
- var _datastore *data.Datastore
- const (
- maxBacklogHours = 2 // Keep backlog max hours worth of messages
- maxBacklogNumber = 50 // Return max number of messages in history request
- )
- func setupPersistence() {
- _datastore = data.GetDatastore()
- data.CreateMessagesTable(_datastore.DB)
- data.CreateBanIPTable(_datastore.DB)
- chatDataPruner := time.NewTicker(5 * time.Minute)
- go func() {
- runPruner()
- for range chatDataPruner.C {
- runPruner()
- }
- }()
- }
- // SaveUserMessage will save a single chat event to the messages database.
- func SaveUserMessage(event events.UserMessageEvent) {
- saveEvent(event.ID, &event.User.ID, event.Body, event.Type, event.HiddenAt, event.Timestamp, nil, nil, nil, nil)
- }
- func saveFederatedAction(event events.FediverseEngagementEvent) {
- saveEvent(event.ID, nil, event.Body, event.Type, nil, event.Timestamp, event.Image, &event.Link, &event.UserAccountName, nil)
- }
- // nolint: unparam
- func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) {
- defer func() {
- _historyCache = nil
- }()
- tx, err := _datastore.DB.Begin()
- if err != nil {
- log.Errorln("error saving", eventType, err)
- return
- }
- defer tx.Rollback() // nolint
- stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
- if err != nil {
- log.Errorln("error saving", eventType, err)
- return
- }
- defer stmt.Close()
- if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil {
- log.Errorln("error saving", eventType, err)
- return
- }
- if err = tx.Commit(); err != nil {
- log.Errorln("error saving", eventType, err)
- return
- }
- }
- func makeUserMessageEventFromRowData(row rowData) events.UserMessageEvent {
- scopes := ""
- if row.userScopes != nil {
- scopes = *row.userScopes
- }
- previousUsernames := []string{}
- if row.previousUsernames != nil {
- previousUsernames = strings.Split(*row.previousUsernames, ",")
- }
- displayName := ""
- if row.userDisplayName != nil {
- displayName = *row.userDisplayName
- }
- displayColor := 0
- if row.userDisplayColor != nil {
- displayColor = *row.userDisplayColor
- }
- createdAt := time.Time{}
- if row.userCreatedAt != nil {
- createdAt = *row.userCreatedAt
- }
- isBot := (row.userType != nil && *row.userType == "API")
- scopeSlice := strings.Split(scopes, ",")
- u := user.User{
- ID: *row.userID,
- DisplayName: displayName,
- DisplayColor: displayColor,
- CreatedAt: createdAt,
- DisabledAt: row.userDisabledAt,
- NameChangedAt: row.userNameChangedAt,
- PreviousNames: previousUsernames,
- AuthenticatedAt: row.userAuthenticatedAt,
- Authenticated: row.userAuthenticatedAt != nil,
- Scopes: scopeSlice,
- IsBot: isBot,
- }
- message := events.UserMessageEvent{
- Event: events.Event{
- Type: row.eventType,
- ID: row.id,
- Timestamp: row.timestamp,
- },
- UserEvent: events.UserEvent{
- User: &u,
- HiddenAt: row.hiddenAt,
- },
- MessageEvent: events.MessageEvent{
- Body: row.body,
- RawBody: row.body,
- },
- }
- return message
- }
- func makeSystemMessageChatEventFromRowData(row rowData) events.SystemMessageEvent {
- message := events.SystemMessageEvent{
- Event: events.Event{
- Type: row.eventType,
- ID: row.id,
- Timestamp: row.timestamp,
- },
- MessageEvent: events.MessageEvent{
- Body: row.body,
- RawBody: row.body,
- },
- }
- return message
- }
- func makeActionMessageChatEventFromRowData(row rowData) events.ActionEvent {
- message := events.ActionEvent{
- Event: events.Event{
- Type: row.eventType,
- ID: row.id,
- Timestamp: row.timestamp,
- },
- MessageEvent: events.MessageEvent{
- Body: row.body,
- RawBody: row.body,
- },
- }
- return message
- }
- func makeFederatedActionChatEventFromRowData(row rowData) events.FediverseEngagementEvent {
- message := events.FediverseEngagementEvent{
- Event: events.Event{
- Type: row.eventType,
- ID: row.id,
- Timestamp: row.timestamp,
- },
- MessageEvent: events.MessageEvent{
- Body: row.body,
- RawBody: row.body,
- },
- Image: row.image,
- Link: *row.link,
- UserAccountName: *row.title,
- }
- return message
- }
- type rowData struct {
- id string
- userID *string
- body string
- eventType models.EventType
- hiddenAt *time.Time
- timestamp time.Time
- title *string
- subtitle *string
- image *string
- link *string
- userDisplayName *string
- userDisplayColor *int
- userCreatedAt *time.Time
- userDisabledAt *time.Time
- previousUsernames *string
- userNameChangedAt *time.Time
- userAuthenticatedAt *time.Time
- userScopes *string
- userType *string
- }
- func getChat(rows *sql.Rows) ([]interface{}, error) {
- history := make([]interface{}, 0)
- for rows.Next() {
- row := rowData{}
- // Convert a database row into a chat event
- if err := rows.Scan(
- &row.id,
- &row.userID,
- &row.body,
- &row.title,
- &row.subtitle,
- &row.image,
- &row.link,
- &row.eventType,
- &row.hiddenAt,
- &row.timestamp,
- &row.userDisplayName,
- &row.userDisplayColor,
- &row.userCreatedAt,
- &row.userDisabledAt,
- &row.previousUsernames,
- &row.userNameChangedAt,
- &row.userAuthenticatedAt,
- &row.userScopes,
- &row.userType,
- ); err != nil {
- return nil, err
- }
- var message interface{}
- switch row.eventType {
- case events.MessageSent:
- message = makeUserMessageEventFromRowData(row)
- case events.SystemMessageSent:
- message = makeSystemMessageChatEventFromRowData(row)
- case events.ChatActionSent:
- message = makeActionMessageChatEventFromRowData(row)
- case events.FediverseEngagementFollow:
- message = makeFederatedActionChatEventFromRowData(row)
- case events.FediverseEngagementLike:
- message = makeFederatedActionChatEventFromRowData(row)
- case events.FediverseEngagementRepost:
- message = makeFederatedActionChatEventFromRowData(row)
- }
- history = append(history, message)
- }
- return history, nil
- }
- var _historyCache *[]interface{}
- // GetChatModerationHistory will return all the chat messages suitable for moderation purposes.
- func GetChatModerationHistory() []interface{} {
- if _historyCache != nil {
- return *_historyCache
- }
- tx, err := _datastore.DB.Begin()
- if err != nil {
- log.Errorln("error fetching chat moderation history", err)
- return nil
- }
- defer tx.Rollback() // nolint
- // Get all messages regardless of visibility
- query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC"
- stmt, err := tx.Prepare(query)
- if err != nil {
- log.Errorln("error fetching chat moderation history", err)
- return nil
- }
- rows, err := stmt.Query()
- if err != nil {
- log.Errorln("error fetching chat moderation history", err)
- return nil
- }
- defer stmt.Close()
- defer rows.Close()
- result, err := getChat(rows)
- if err != nil {
- log.Errorln(err)
- log.Errorln("There is a problem enumerating chat message rows. Please report this:", query)
- return nil
- }
- _historyCache = &result
- if err = tx.Commit(); err != nil {
- log.Errorln("error fetching chat moderation history", err)
- return nil
- }
- return result
- }
- // GetChatHistory will return all the chat messages suitable for returning as user-facing chat history.
- func GetChatHistory() []interface{} {
- tx, err := _datastore.DB.Begin()
- if err != nil {
- log.Errorln("error fetching chat history", err)
- return nil
- }
- defer tx.Rollback() // nolint
- // Get all visible messages
- query := "SELECT messages.id, messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.authenticated_at, users.scopes, users.type FROM users JOIN messages ON users.id = messages.user_id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT ?"
- stmt, err := tx.Prepare(query)
- if err != nil {
- log.Errorln("error fetching chat history", err)
- return nil
- }
- rows, err := stmt.Query(maxBacklogNumber)
- if err != nil {
- log.Errorln("error fetching chat history", err)
- return nil
- }
- defer stmt.Close()
- defer rows.Close()
- m, err := getChat(rows)
- if err != nil {
- log.Errorln(err)
- log.Errorln("There is a problem enumerating chat message rows. Please report this:", query)
- return nil
- }
- if err = tx.Commit(); err != nil {
- log.Errorln("error fetching chat history", err)
- return nil
- }
- // Invert order of messages
- for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 {
- m[i], m[j] = m[j], m[i]
- }
- return m
- }
- // GetMessagesFromUser returns chat messages that were sent by a specific user.
- func GetMessagesFromUser(userID string) ([]events.UserMessageEvent, error) {
- query, err := _datastore.GetQueries().GetMessagesFromUser(context.Background(), sql.NullString{String: userID, Valid: true})
- if err != nil {
- return nil, err
- }
- results := make([]events.UserMessageEvent, len(query))
- for i, row := range query {
- results[i] = events.UserMessageEvent{
- Event: events.Event{
- Timestamp: row.Timestamp.Time,
- ID: row.ID,
- },
- MessageEvent: events.MessageEvent{
- Body: row.Body.String,
- },
- }
- }
- return results, nil
- }
- // SetMessageVisibilityForUserID will bulk change the visibility of messages for a user
- // and then send out visibility changed events to chat clients.
- func SetMessageVisibilityForUserID(userID string, visible bool) error {
- defer func() {
- _historyCache = nil
- }()
- tx, err := _datastore.DB.Begin()
- if err != nil {
- log.Errorln("error while setting message visibility", err)
- return nil
- }
- defer tx.Rollback() // nolint
- query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS ?"
- stmt, err := tx.Prepare(query)
- if err != nil {
- log.Errorln("error while setting message visibility", err)
- return nil
- }
- rows, err := stmt.Query(userID)
- if err != nil {
- log.Errorln("error while setting message visibility", err)
- return nil
- }
- defer stmt.Close()
- defer rows.Close()
- // Get a list of IDs to send to the connected clients to hide
- ids := make([]string, 0)
- messages, err := getChat(rows)
- if err != nil {
- log.Errorln(err)
- log.Errorln("There is a problem enumerating chat message rows. Please report this:", query)
- return nil
- }
- if len(messages) == 0 {
- return nil
- }
- for _, message := range messages {
- ids = append(ids, message.(events.UserMessageEvent).ID)
- }
- if err = tx.Commit(); err != nil {
- log.Errorln("error while setting message visibility ", err)
- return nil
- }
- // Tell the clients to hide/show these messages.
- return SetMessagesVisibility(ids, visible)
- }
- func saveMessageVisibility(messageIDs []string, visible bool) error {
- defer func() {
- _historyCache = nil
- }()
- _datastore.DbLock.Lock()
- defer _datastore.DbLock.Unlock()
- tx, err := _datastore.DB.Begin()
- if err != nil {
- return err
- }
- // nolint:gosec
- stmt, err := tx.Prepare("UPDATE messages SET hidden_at=? WHERE id IN (?" + strings.Repeat(",?", len(messageIDs)-1) + ")")
- if err != nil {
- return err
- }
- defer stmt.Close()
- var hiddenAt *time.Time
- if !visible {
- now := time.Now()
- hiddenAt = &now
- } else {
- hiddenAt = nil
- }
- args := make([]interface{}, len(messageIDs)+1)
- args[0] = hiddenAt
- for i, id := range messageIDs {
- args[i+1] = id
- }
- if _, err = stmt.Exec(args...); err != nil {
- return err
- }
- if err = tx.Commit(); err != nil {
- return err
- }
- return nil
- }
|