server.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package chat
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. "time"
  8. log "github.com/sirupsen/logrus"
  9. "github.com/gorilla/websocket"
  10. "github.com/owncast/owncast/config"
  11. "github.com/owncast/owncast/core/chat/events"
  12. "github.com/owncast/owncast/core/data"
  13. "github.com/owncast/owncast/core/user"
  14. "github.com/owncast/owncast/core/webhooks"
  15. "github.com/owncast/owncast/geoip"
  16. "github.com/owncast/owncast/utils"
  17. )
  18. var _server *Server
  19. // a map of user IDs and when they last were active.
  20. var _lastSeenCache = map[string]time.Time{}
  21. // Server represents an instance of the chat server.
  22. type Server struct {
  23. mu sync.RWMutex
  24. seq uint
  25. clients map[uint]*Client
  26. maxSocketConnectionLimit int64
  27. // send outbound message payload to all clients
  28. outbound chan []byte
  29. // receive inbound message payload from all clients
  30. inbound chan chatClientEvent
  31. // unregister requests from clients.
  32. unregister chan uint // the ChatClient id
  33. geoipClient *geoip.Client
  34. }
  35. // NewChat will return a new instance of the chat server.
  36. func NewChat() *Server {
  37. maximumConcurrentConnectionLimit := getMaximumConcurrentConnectionLimit()
  38. setSystemConcurrentConnectionLimit(maximumConcurrentConnectionLimit)
  39. server := &Server{
  40. clients: map[uint]*Client{},
  41. outbound: make(chan []byte),
  42. inbound: make(chan chatClientEvent),
  43. unregister: make(chan uint),
  44. maxSocketConnectionLimit: maximumConcurrentConnectionLimit,
  45. geoipClient: geoip.NewClient(),
  46. }
  47. return server
  48. }
  49. // Run will start the chat server.
  50. func (s *Server) Run() {
  51. for {
  52. select {
  53. case clientID := <-s.unregister:
  54. if _, ok := s.clients[clientID]; ok {
  55. s.mu.Lock()
  56. delete(s.clients, clientID)
  57. s.mu.Unlock()
  58. }
  59. case message := <-s.inbound:
  60. s.eventReceived(message)
  61. }
  62. }
  63. }
  64. // Addclient registers new connection as a User.
  65. func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken string, userAgent string, ipAddress string) *Client {
  66. client := &Client{
  67. server: s,
  68. conn: conn,
  69. User: user,
  70. IPAddress: ipAddress,
  71. accessToken: accessToken,
  72. send: make(chan []byte, 256),
  73. UserAgent: userAgent,
  74. ConnectedAt: time.Now(),
  75. }
  76. // Do not send user re-joined broadcast message if they've been active within 5 minutes.
  77. shouldSendJoinedMessages := data.GetChatJoinMessagesEnabled()
  78. if previouslyLastSeen, ok := _lastSeenCache[user.ID]; ok && time.Since(previouslyLastSeen) < time.Minute*5 {
  79. shouldSendJoinedMessages = false
  80. }
  81. s.mu.Lock()
  82. {
  83. client.Id = s.seq
  84. s.clients[client.Id] = client
  85. s.seq++
  86. _lastSeenCache[user.ID] = time.Now()
  87. }
  88. s.mu.Unlock()
  89. log.Traceln("Adding client", client.Id, "total count:", len(s.clients))
  90. go client.writePump()
  91. go client.readPump()
  92. client.sendConnectedClientInfo()
  93. if getStatus().Online {
  94. if shouldSendJoinedMessages {
  95. s.sendUserJoinedMessage(client)
  96. }
  97. s.sendWelcomeMessageToClient(client)
  98. }
  99. // Asynchronously, optionally, fetch GeoIP data.
  100. go func(client *Client) {
  101. client.Geo = s.geoipClient.GetGeoFromIP(ipAddress)
  102. }(client)
  103. return client
  104. }
  105. func (s *Server) sendUserJoinedMessage(c *Client) {
  106. userJoinedEvent := events.UserJoinedEvent{}
  107. userJoinedEvent.SetDefaults()
  108. userJoinedEvent.User = c.User
  109. userJoinedEvent.ClientID = c.Id
  110. if err := s.Broadcast(userJoinedEvent.GetBroadcastPayload()); err != nil {
  111. log.Errorln("error adding client to chat server", err)
  112. }
  113. // Send chat user joined webhook
  114. webhooks.SendChatEventUserJoined(userJoinedEvent)
  115. }
  116. // ClientClosed is fired when a client disconnects or connection is dropped.
  117. func (s *Server) ClientClosed(c *Client) {
  118. s.mu.Lock()
  119. defer s.mu.Unlock()
  120. c.close()
  121. if _, ok := s.clients[c.Id]; ok {
  122. log.Debugln("Deleting", c.Id)
  123. delete(s.clients, c.Id)
  124. }
  125. }
  126. // HandleClientConnection is fired when a single client connects to the websocket.
  127. func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request) {
  128. if data.GetChatDisabled() {
  129. _, _ = w.Write([]byte(events.ChatDisabled))
  130. return
  131. }
  132. ipAddress := utils.GetIPAddressFromRequest(r)
  133. // Check if this client's IP address is banned. If so send a rejection.
  134. if blocked, err := data.IsIPAddressBanned(ipAddress); blocked {
  135. log.Debugln("Client ip address has been blocked. Rejecting.")
  136. w.WriteHeader(http.StatusForbidden)
  137. return
  138. } else if err != nil {
  139. log.Errorln("error determining if IP address is blocked: ", err)
  140. }
  141. // Limit concurrent chat connections
  142. if int64(len(s.clients)) >= s.maxSocketConnectionLimit {
  143. log.Warnln("rejecting incoming client connection as it exceeds the max client count of", s.maxSocketConnectionLimit)
  144. _, _ = w.Write([]byte(events.ErrorMaxConnectionsExceeded))
  145. return
  146. }
  147. // To allow dev web environments to connect.
  148. upgrader.CheckOrigin = func(r *http.Request) bool {
  149. return true
  150. }
  151. conn, err := upgrader.Upgrade(w, r, nil)
  152. if err != nil {
  153. log.Debugln(err)
  154. return
  155. }
  156. accessToken := r.URL.Query().Get("accessToken")
  157. if accessToken == "" {
  158. log.Errorln("Access token is required")
  159. // Return HTTP status code
  160. _ = conn.Close()
  161. return
  162. }
  163. // A user is required to use the websocket
  164. user := user.GetUserByToken(accessToken)
  165. if user == nil {
  166. // Send error that registration is required
  167. _ = conn.WriteJSON(events.EventPayload{
  168. "type": events.ErrorNeedsRegistration,
  169. })
  170. _ = conn.Close()
  171. return
  172. }
  173. // User is disabled therefore we should disconnect.
  174. if user.DisabledAt != nil {
  175. log.Traceln("Disabled user", user.ID, user.DisplayName, "rejected")
  176. _ = conn.WriteJSON(events.EventPayload{
  177. "type": events.ErrorUserDisabled,
  178. })
  179. _ = conn.Close()
  180. return
  181. }
  182. userAgent := r.UserAgent()
  183. s.Addclient(conn, user, accessToken, userAgent, ipAddress)
  184. }
  185. // Broadcast sends message to all connected clients.
  186. func (s *Server) Broadcast(payload events.EventPayload) error {
  187. data, err := json.Marshal(payload)
  188. if err != nil {
  189. return err
  190. }
  191. s.mu.RLock()
  192. defer s.mu.RUnlock()
  193. for _, client := range s.clients {
  194. if client == nil {
  195. continue
  196. }
  197. select {
  198. case client.send <- data:
  199. default:
  200. go client.close()
  201. }
  202. }
  203. return nil
  204. }
  205. // Send will send a single payload to a single connected client.
  206. func (s *Server) Send(payload events.EventPayload, client *Client) {
  207. data, err := json.Marshal(payload)
  208. if err != nil {
  209. log.Errorln(err)
  210. return
  211. }
  212. client.send <- data
  213. }
  214. // DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
  215. func (s *Server) DisconnectClients(clients []*Client) {
  216. for _, client := range clients {
  217. log.Traceln("Disconnecting client", client.User.ID, "owned by", client.User.DisplayName)
  218. go func(client *Client) {
  219. event := events.UserDisabledEvent{}
  220. event.SetDefaults()
  221. // Send this disabled event specifically to this single connected client
  222. // to let them know they've been banned.
  223. _server.Send(event.GetBroadcastPayload(), client)
  224. // Give the socket time to send out the above message.
  225. // Unfortunately I don't know of any way to get a real callback to know when
  226. // the message was successfully sent, so give it a couple seconds.
  227. time.Sleep(2 * time.Second)
  228. // Forcefully disconnect if still valid.
  229. if client != nil {
  230. client.close()
  231. }
  232. }(client)
  233. }
  234. }
  235. // SendConnectedClientInfoToUser will find all the connected clients assigned to a user
  236. // and re-send each the connected client info.
  237. func SendConnectedClientInfoToUser(userID string) error {
  238. clients, err := GetClientsForUser(userID)
  239. if err != nil {
  240. return err
  241. }
  242. // Get an updated reference to the user.
  243. user := user.GetUserByID(userID)
  244. if user == nil {
  245. return fmt.Errorf("user not found")
  246. }
  247. if err != nil {
  248. return err
  249. }
  250. for _, client := range clients {
  251. // Update the client's reference to its user.
  252. client.User = user
  253. // Send the update to the client.
  254. client.sendConnectedClientInfo()
  255. }
  256. return nil
  257. }
  258. // SendActionToUser will send system action text to all connected clients
  259. // assigned to a user ID.
  260. func SendActionToUser(userID string, text string) error {
  261. clients, err := GetClientsForUser(userID)
  262. if err != nil {
  263. return err
  264. }
  265. for _, client := range clients {
  266. _server.sendActionToClient(client, text)
  267. }
  268. return nil
  269. }
  270. func (s *Server) eventReceived(event chatClientEvent) {
  271. c := event.client
  272. u := c.User
  273. // If established chat user only mode is enabled and the user is not old
  274. // enough then reject this event and send them an informative message.
  275. if u != nil && data.GetChatEstbalishedUsersOnlyMode() && time.Since(event.client.User.CreatedAt) < config.GetDefaults().ChatEstablishedUserModeTimeDuration && !u.IsModerator() {
  276. s.sendActionToClient(c, "You have not been an established chat participant long enough to take part in chat. Please enjoy the stream and try again later.")
  277. return
  278. }
  279. var typecheck map[string]interface{}
  280. if err := json.Unmarshal(event.data, &typecheck); err != nil {
  281. log.Debugln(err)
  282. }
  283. eventType := typecheck["type"]
  284. switch eventType {
  285. case events.MessageSent:
  286. s.userMessageSent(event)
  287. case events.UserNameChanged:
  288. s.userNameChanged(event)
  289. case events.UserColorChanged:
  290. s.userColorChanged(event)
  291. default:
  292. log.Debugln(logSanitize(fmt.Sprint(eventType)), "event not found:", logSanitize(fmt.Sprint(typecheck)))
  293. }
  294. }
  295. func (s *Server) sendWelcomeMessageToClient(c *Client) {
  296. // Add an artificial delay so people notice this message come in.
  297. time.Sleep(7 * time.Second)
  298. welcomeMessage := utils.RenderSimpleMarkdown(data.GetServerWelcomeMessage())
  299. if welcomeMessage != "" {
  300. s.sendSystemMessageToClient(c, welcomeMessage)
  301. }
  302. }
  303. func (s *Server) sendAllWelcomeMessage() {
  304. welcomeMessage := utils.RenderSimpleMarkdown(data.GetServerWelcomeMessage())
  305. if welcomeMessage != "" {
  306. clientMessage := events.SystemMessageEvent{
  307. Event: events.Event{},
  308. MessageEvent: events.MessageEvent{
  309. Body: welcomeMessage,
  310. },
  311. }
  312. clientMessage.SetDefaults()
  313. _ = s.Broadcast(clientMessage.GetBroadcastPayload())
  314. }
  315. }
  316. func (s *Server) sendSystemMessageToClient(c *Client, message string) {
  317. clientMessage := events.SystemMessageEvent{
  318. Event: events.Event{},
  319. MessageEvent: events.MessageEvent{
  320. Body: message,
  321. },
  322. }
  323. clientMessage.SetDefaults()
  324. clientMessage.RenderBody()
  325. s.Send(clientMessage.GetBroadcastPayload(), c)
  326. }
  327. func (s *Server) sendActionToClient(c *Client, message string) {
  328. clientMessage := events.ActionEvent{
  329. MessageEvent: events.MessageEvent{
  330. Body: message,
  331. },
  332. Event: events.Event{
  333. Type: events.ChatActionSent,
  334. },
  335. }
  336. clientMessage.SetDefaults()
  337. clientMessage.RenderBody()
  338. s.Send(clientMessage.GetBroadcastPayload(), c)
  339. }