// chatclient.go (5.6 KB)

  1. package chat
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "time"
  8. log "github.com/sirupsen/logrus"
  9. "golang.org/x/time/rate"
  10. "github.com/gorilla/websocket"
  11. "github.com/owncast/owncast/config"
  12. "github.com/owncast/owncast/core/chat/events"
  13. "github.com/owncast/owncast/core/user"
  14. "github.com/owncast/owncast/geoip"
  15. )
  16. // Client represents a single chat client.
  17. type Client struct {
  18. mu sync.RWMutex
  19. Id uint `json:"-"`
  20. accessToken string
  21. conn *websocket.Conn
  22. User *user.User `json:"user"`
  23. server *Server
  24. IPAddress string `json:"-"`
  25. // Buffered channel of outbound messages.
  26. send chan []byte
  27. rateLimiter *rate.Limiter
  28. timeoutTimer *time.Timer
  29. inTimeout bool
  30. Geo *geoip.GeoDetails `json:"geo"`
  31. MessageCount int `json:"messageCount"`
  32. UserAgent string `json:"userAgent"`
  33. ConnectedAt time.Time `json:"connectedAt"`
  34. }
  35. type chatClientEvent struct {
  36. data []byte
  37. client *Client
  38. }
  39. const (
  40. // Time allowed to write a message to the peer.
  41. writeWait = 10 * time.Second
  42. // Time allowed to read the next pong message from the peer.
  43. pongWait = 60 * time.Second
  44. // Send pings to peer with this period. Must be less than pongWait.
  45. pingPeriod = (pongWait * 9) / 10
  46. // Maximum message size allowed from peer.
  47. // Larger messages get thrown away.
  48. // Messages > *2 the socket gets closed.
  49. maxMessageSize = config.MaxSocketPayloadSize
  50. )
  51. var upgrader = websocket.Upgrader{
  52. ReadBufferSize: 1024,
  53. WriteBufferSize: 1024,
  54. // Override default origin check to allow all clients, even those that
  55. // do not match our server.
  56. CheckOrigin: func(r *http.Request) bool {
  57. return true
  58. },
  59. }
  60. var (
  61. newline = []byte{'\n'}
  62. space = []byte{' '}
  63. )
  64. func (c *Client) sendConnectedClientInfo() {
  65. payload := events.ConnectedClientInfo{
  66. Event: events.Event{
  67. Type: events.ConnectedUserInfo,
  68. },
  69. User: c.User,
  70. }
  71. payload.SetDefaults()
  72. c.sendPayload(payload)
  73. }
  74. func (c *Client) readPump() {
  75. // Allow 3 messages every two seconds.
  76. limit := rate.Every(2 * time.Second / 3)
  77. c.rateLimiter = rate.NewLimiter(limit, 1)
  78. defer func() {
  79. c.close()
  80. }()
  81. // If somebody is sending 2x the max message size they're likely a bad actor
  82. // and should be disconnected. Below we throw away messages > max size.
  83. c.conn.SetReadLimit(maxMessageSize * 2)
  84. _ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
  85. c.conn.SetPongHandler(func(string) error { _ = c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  86. for {
  87. _, message, err := c.conn.ReadMessage()
  88. if err != nil {
  89. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  90. c.close()
  91. }
  92. break
  93. }
  94. // Throw away messages greater than max message size.
  95. if len(message) > maxMessageSize {
  96. c.sendAction("Sorry, that message exceeded the maximum size and can't be delivered.")
  97. continue
  98. }
  99. // Check if this client is temporarily blocked from sending messages.
  100. if c.inTimeout {
  101. continue
  102. }
  103. // Guard against floods.
  104. if !c.passesRateLimit() {
  105. log.Warnln("Client", c.Id, c.User.DisplayName, "has exceeded the messaging rate limiting thresholds and messages are being rejected temporarily.")
  106. c.startChatRejectionTimeout()
  107. continue
  108. }
  109. message = bytes.TrimSpace(bytes.ReplaceAll(message, newline, space))
  110. c.handleEvent(message)
  111. }
  112. }
  113. func (c *Client) writePump() {
  114. ticker := time.NewTicker(pingPeriod)
  115. defer func() {
  116. ticker.Stop()
  117. _ = c.conn.Close()
  118. }()
  119. for {
  120. select {
  121. case message, ok := <-c.send:
  122. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  123. if !ok {
  124. // The server closed the channel.
  125. _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  126. return
  127. }
  128. w, err := c.conn.NextWriter(websocket.TextMessage)
  129. if err != nil {
  130. return
  131. }
  132. if _, err := w.Write(message); err != nil {
  133. log.Debugln(err)
  134. }
  135. // Optimization: Send multiple events in a single websocket message.
  136. // Add queued chat messages to the current websocket message.
  137. c.mu.RLock()
  138. n := len(c.send)
  139. for i := 0; i < n; i++ {
  140. _, _ = w.Write(newline)
  141. _, _ = w.Write(<-c.send)
  142. }
  143. c.mu.RUnlock()
  144. if err := w.Close(); err != nil {
  145. return
  146. }
  147. case <-ticker.C:
  148. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  149. if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  150. return
  151. }
  152. }
  153. }
  154. }
  155. func (c *Client) handleEvent(data []byte) {
  156. c.server.inbound <- chatClientEvent{data: data, client: c}
  157. }
  158. func (c *Client) close() {
  159. log.Traceln("client closed:", c.User.DisplayName, c.Id, c.IPAddress)
  160. c.mu.Lock()
  161. defer c.mu.Unlock()
  162. if c.send != nil {
  163. _ = c.conn.Close()
  164. c.server.unregister <- c.Id
  165. close(c.send)
  166. c.send = nil
  167. }
  168. }
  169. func (c *Client) passesRateLimit() bool {
  170. return c.rateLimiter.Allow() && !c.inTimeout
  171. }
  172. func (c *Client) startChatRejectionTimeout() {
  173. if c.timeoutTimer != nil {
  174. return
  175. }
  176. c.inTimeout = true
  177. c.timeoutTimer = time.NewTimer(10 * time.Second)
  178. go func(c *Client) {
  179. for range c.timeoutTimer.C {
  180. c.inTimeout = false
  181. c.timeoutTimer = nil
  182. }
  183. }(c)
  184. c.sendAction("You are temporarily blocked from sending chat messages due to perceived flooding.")
  185. }
  186. func (c *Client) sendPayload(payload interface{}) {
  187. var data []byte
  188. data, err := json.Marshal(payload)
  189. if err != nil {
  190. log.Errorln(err)
  191. return
  192. }
  193. c.mu.RLock()
  194. defer c.mu.RUnlock()
  195. if c.send != nil {
  196. c.send <- data
  197. }
  198. }
  199. func (c *Client) sendAction(message string) {
  200. clientMessage := events.ActionEvent{
  201. MessageEvent: events.MessageEvent{
  202. Body: message,
  203. },
  204. }
  205. clientMessage.SetDefaults()
  206. clientMessage.RenderBody()
  207. c.sendPayload(clientMessage.GetBroadcastPayload())
  208. }