hub.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "strconv"
  8. "time"
  9. "github.com/gorilla/websocket"
  10. )
  11. type Hub struct {
  12. clients map[*Client]bool // Registered clients.
  13. broadcast chan []byte // Inbound messages from the clients.
  14. register chan *Client // Register requests from the clients.
  15. unregister chan *Client // Unregister requests from clients.
  16. }
  17. func newHub() *Hub {
  18. return &Hub{
  19. broadcast: make(chan []byte, 10),
  20. register: make(chan *Client),
  21. unregister: make(chan *Client),
  22. clients: make(map[*Client]bool),
  23. }
  24. }
  25. func (h *Hub) _startTranslate(ctx context.Context) {
  26. h.broadcast <- []byte("welcome")
  27. if minicapSocketPath == "@minicap" {
  28. service.Start("minicap")
  29. }
  30. log.Printf("Receive images from %s", minicapSocketPath)
  31. retries := 0
  32. for {
  33. if retries > 10 {
  34. log.Printf("unix %s connect failed", minicapSocketPath)
  35. h.broadcast <- []byte("@minicapagent listen timeout")
  36. break
  37. }
  38. conn, err := net.Dial("unix", minicapSocketPath)
  39. if err != nil {
  40. retries++
  41. log.Printf("dial %s err: %v, wait 0.5s", minicapSocketPath, err)
  42. select {
  43. case <-ctx.Done():
  44. return
  45. case <-time.After(500 * time.Millisecond):
  46. }
  47. continue
  48. }
  49. retries = 0 // connected, reset retries
  50. if er := translateMinicap(conn, h.broadcast, ctx); er == nil {
  51. conn.Close()
  52. log.Println("transfer closed")
  53. break
  54. } else {
  55. conn.Close()
  56. log.Println("translateMinicap error:", er) //scrcpy read error, try to read again")
  57. }
  58. }
  59. }
  60. func (h *Hub) run() {
  61. var cancel context.CancelFunc
  62. var ctx context.Context
  63. for {
  64. select {
  65. case client := <-h.register:
  66. h.clients[client] = true
  67. log.Println("new broadcast client")
  68. h.broadcast <- []byte("rotation " + strconv.Itoa(deviceRotation))
  69. if len(h.clients) == 1 {
  70. ctx, cancel = context.WithCancel(context.Background())
  71. go h._startTranslate(ctx)
  72. }
  73. case client := <-h.unregister:
  74. if _, ok := h.clients[client]; ok {
  75. delete(h.clients, client)
  76. close(client.send)
  77. }
  78. if len(h.clients) == 0 {
  79. log.Println("All client quited, context stop minicap service")
  80. cancel()
  81. }
  82. case message := <-h.broadcast:
  83. for client := range h.clients {
  84. select {
  85. case client.send <- message:
  86. default:
  87. close(client.send)
  88. delete(h.clients, client)
  89. }
  90. }
  91. }
  92. }
  93. }
  94. // Client is a middleman between the websocket connection and the hub.
  95. type Client struct {
  96. hub *Hub
  97. conn *websocket.Conn // The websocket connection.
  98. send chan []byte // Buffered channel of outbound messages.
  99. }
  100. // writePump pumps messages from the hub to the websocket connection.
  101. //
  102. // A goroutine running writePump is started for each connection. The
  103. // application ensures that there is at most one writer to a connection by
  104. // executing all writes from this goroutine.
  105. func (c *Client) writePump() {
  106. ticker := time.NewTicker(time.Second * 10)
  107. defer func() {
  108. ticker.Stop()
  109. c.conn.Close()
  110. }()
  111. for {
  112. var err error
  113. select {
  114. case data, ok := <-c.send:
  115. c.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
  116. if !ok {
  117. // The hub closed the channel.
  118. c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  119. return
  120. }
  121. if string(data[:2]) == "\xff\xd8" || string(data[:4]) == "\x89PNG" { // jpg or png data
  122. err = c.conn.WriteMessage(websocket.BinaryMessage, data)
  123. } else {
  124. err = c.conn.WriteMessage(websocket.TextMessage, data)
  125. }
  126. case <-ticker.C:
  127. // err = c.conn.WriteMessage(websocket.PingMessage, nil)
  128. }
  129. if err != nil {
  130. log.Println(err)
  131. break
  132. }
  133. }
  134. }
  135. // readPump pumps messages from the websocket connection to the hub.
  136. //
  137. // The application runs readPump in a per-connection goroutine. The application
  138. // ensures that there is at most one reader on a connection by executing all
  139. // reads from this goroutine.
  140. func (c *Client) readPump() {
  141. defer func() {
  142. c.hub.unregister <- c
  143. c.conn.Close()
  144. }()
  145. // c.conn.SetReadLimit(maxMessageSize)
  146. // c.conn.SetReadDeadline(time.Now().Add(pongWait))
  147. // c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
  148. for {
  149. _, message, err := c.conn.ReadMessage()
  150. if err != nil {
  151. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  152. log.Printf("error: %v", err)
  153. }
  154. break
  155. }
  156. log.Println("websocket recv message", string(message))
  157. // message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  158. // c.hub.broadcast <- message
  159. }
  160. }
  161. func broadcastWebsocket() func(http.ResponseWriter, *http.Request) {
  162. hub := newHub()
  163. go hub.run() // start read images from unix:@minicap
  164. return func(w http.ResponseWriter, r *http.Request) {
  165. conn, err := upgrader.Upgrade(w, r, nil)
  166. if err != nil {
  167. log.Println(err)
  168. return
  169. }
  170. client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
  171. hub.register <- client
  172. done := make(chan bool)
  173. go client.writePump()
  174. go func() {
  175. client.readPump()
  176. done <- true
  177. }()
  178. go func() {
  179. ch := make(chan interface{})
  180. rotationPublisher.Register(ch)
  181. defer rotationPublisher.Unregister(ch)
  182. for {
  183. select {
  184. case <-done:
  185. return
  186. case r := <-ch:
  187. hub.broadcast <- []byte(fmt.Sprintf("rotation %v", r))
  188. }
  189. }
  190. }()
  191. }
  192. }