chatclient.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package chat
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "net/http"
  6. "sync"
  7. "time"
  8. log "github.com/sirupsen/logrus"
  9. "golang.org/x/time/rate"
  10. "github.com/gorilla/websocket"
  11. "github.com/owncast/owncast/models"
  12. "github.com/owncast/owncast/services/config"
  13. "github.com/owncast/owncast/services/geoip"
  14. )
  15. // Client represents a single chat client.
  16. type Client struct {
  17. ConnectedAt time.Time `json:"connectedAt"`
  18. timeoutTimer *time.Timer
  19. rateLimiter *rate.Limiter
  20. conn *websocket.Conn
  21. User *models.User `json:"user"`
  22. server *Server
  23. Geo *geoip.GeoDetails `json:"geo"`
  24. // Buffered channel of outbound messages.
  25. send chan []byte
  26. accessToken string
  27. IPAddress string `json:"-"`
  28. UserAgent string `json:"userAgent"`
  29. MessageCount int `json:"messageCount"`
  30. Id uint `json:"-"`
  31. mu sync.RWMutex
  32. inTimeout bool
  33. }
  34. type chatClientEvent struct {
  35. client *Client
  36. data []byte
  37. }
  38. const (
  39. // Time allowed to write a message to the peer.
  40. writeWait = 10 * time.Second
  41. // Time allowed to read the next pong message from the peer.
  42. pongWait = 60 * time.Second
  43. // Send pings to peer with this period. Must be less than pongWait.
  44. pingPeriod = (pongWait * 9) / 10
  45. // Maximum message size allowed from peer.
  46. // Larger messages get thrown away.
  47. // Messages > *2 the socket gets closed.
  48. maxMessageSize = config.MaxSocketPayloadSize
  49. )
  50. var upgrader = websocket.Upgrader{
  51. ReadBufferSize: 1024,
  52. WriteBufferSize: 1024,
  53. // Override default origin check to allow all clients, even those that
  54. // do not match our server.
  55. CheckOrigin: func(r *http.Request) bool {
  56. return true
  57. },
  58. }
  59. var (
  60. newline = []byte{'\n'}
  61. space = []byte{' '}
  62. )
  63. func (c *Client) sendConnectedClientInfo() {
  64. payload := models.ConnectedClientInfo{
  65. Event: models.Event{
  66. Type: models.ConnectedUserInfo,
  67. },
  68. User: c.User,
  69. }
  70. payload.SetDefaults()
  71. c.sendPayload(payload)
  72. }
  73. func (c *Client) readPump() {
  74. // Allow 3 messages every two seconds.
  75. limit := rate.Every(2 * time.Second / 3)
  76. c.rateLimiter = rate.NewLimiter(limit, 1)
  77. defer func() {
  78. c.close()
  79. }()
  80. // If somebody is sending 2x the max message size they're likely a bad actor
  81. // and should be disconnected. Below we throw away messages > max size.
  82. c.conn.SetReadLimit(maxMessageSize * 2)
  83. _ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
  84. c.conn.SetPongHandler(func(string) error { _ = c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  85. for {
  86. _, message, err := c.conn.ReadMessage()
  87. if err != nil {
  88. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  89. c.close()
  90. }
  91. break
  92. }
  93. // Throw away messages greater than max message size.
  94. if len(message) > maxMessageSize {
  95. c.sendAction("Sorry, that message exceeded the maximum size and can't be delivered.")
  96. continue
  97. }
  98. // Check if this client is temporarily blocked from sending messages.
  99. if c.inTimeout {
  100. continue
  101. }
  102. // Guard against floods.
  103. if !c.passesRateLimit() {
  104. log.Warnln("Client", c.Id, c.User.DisplayName, "has exceeded the messaging rate limiting thresholds and messages are being rejected temporarily.")
  105. c.startChatRejectionTimeout()
  106. continue
  107. }
  108. message = bytes.TrimSpace(bytes.ReplaceAll(message, newline, space))
  109. c.handleEvent(message)
  110. }
  111. }
  112. func (c *Client) writePump() {
  113. ticker := time.NewTicker(pingPeriod)
  114. defer func() {
  115. ticker.Stop()
  116. _ = c.conn.Close()
  117. }()
  118. for {
  119. select {
  120. case message, ok := <-c.send:
  121. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  122. if !ok {
  123. // The server closed the channel.
  124. _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  125. return
  126. }
  127. w, err := c.conn.NextWriter(websocket.TextMessage)
  128. if err != nil {
  129. return
  130. }
  131. if _, err := w.Write(message); err != nil {
  132. log.Debugln(err)
  133. }
  134. // Optimization: Send multiple events in a single websocket message.
  135. // Add queued chat messages to the current websocket message.
  136. c.mu.RLock()
  137. n := len(c.send)
  138. for i := 0; i < n; i++ {
  139. _, _ = w.Write(newline)
  140. _, _ = w.Write(<-c.send)
  141. }
  142. c.mu.RUnlock()
  143. if err := w.Close(); err != nil {
  144. return
  145. }
  146. case <-ticker.C:
  147. _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  148. if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  149. return
  150. }
  151. }
  152. }
  153. }
  154. func (c *Client) handleEvent(data []byte) {
  155. c.server.inbound <- chatClientEvent{data: data, client: c}
  156. }
  157. func (c *Client) close() {
  158. log.Traceln("client closed:", c.User.DisplayName, c.Id, c.IPAddress)
  159. c.mu.Lock()
  160. defer c.mu.Unlock()
  161. if c.send != nil {
  162. _ = c.conn.Close()
  163. c.server.unregister <- c.Id
  164. close(c.send)
  165. c.send = nil
  166. }
  167. }
  168. func (c *Client) passesRateLimit() bool {
  169. return c.rateLimiter.Allow() && !c.inTimeout
  170. }
  171. func (c *Client) startChatRejectionTimeout() {
  172. if c.timeoutTimer != nil {
  173. return
  174. }
  175. c.inTimeout = true
  176. c.timeoutTimer = time.NewTimer(10 * time.Second)
  177. go func(c *Client) {
  178. for range c.timeoutTimer.C {
  179. c.inTimeout = false
  180. c.timeoutTimer = nil
  181. }
  182. }(c)
  183. c.sendAction("You are temporarily blocked from sending chat messages due to perceived flooding.")
  184. }
  185. func (c *Client) sendPayload(payload interface{}) {
  186. var data []byte
  187. data, err := json.Marshal(payload)
  188. if err != nil {
  189. log.Errorln(err)
  190. return
  191. }
  192. c.mu.RLock()
  193. defer c.mu.RUnlock()
  194. if c.send != nil {
  195. c.send <- data
  196. }
  197. }
  198. func (c *Client) sendAction(message string) {
  199. clientMessage := ActionEvent{
  200. MessageEvent: MessageEvent{
  201. Body: message,
  202. },
  203. }
  204. clientMessage.SetDefaults()
  205. clientMessage.RenderBody()
  206. c.sendPayload(clientMessage.GetBroadcastPayload())
  207. }