client.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package mqtt
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "io"
  7. "net"
  8. "time"
  9. )
  // Timeout is the I/O deadline that Connect, Subscribe, Publish and Read each
  // set on the connection before doing any work.
  const Timeout = 5 * time.Second
  // Client is a small MQTT client that speaks over one network connection.
  type Client struct {
  	// conn is the transport every packet is written to and read from.
  	conn net.Conn

  	// mid holds the most recently used packet identifier; Subscribe and
  	// Publish increment it before building their packet.
  	mid uint16
  }
  15. func NewClient(conn net.Conn) *Client {
  16. return &Client{conn: conn, mid: 2}
  17. }
  18. func (c *Client) Connect(clientID, username, password string) (err error) {
  19. if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
  20. return
  21. }
  22. msg := NewConnect(clientID, username, password)
  23. if _, err = c.conn.Write(msg.b); err != nil {
  24. return
  25. }
  26. b := make([]byte, 4)
  27. if _, err = io.ReadFull(c.conn, b); err != nil {
  28. return
  29. }
  30. if !bytes.Equal(b, []byte{CONNACK, 2, 0, 0}) {
  31. return errors.New("wrong login")
  32. }
  33. return
  34. }
  35. func (c *Client) Subscribe(topic string) (err error) {
  36. if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
  37. return
  38. }
  39. c.mid++
  40. msg := NewSubscribe(c.mid, topic, 1)
  41. _, err = c.conn.Write(msg.b)
  42. return
  43. }
  44. func (c *Client) Publish(topic string, payload []byte) (err error) {
  45. if err = c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
  46. return
  47. }
  48. c.mid++
  49. msg := NewPublishQOS1(c.mid, topic, payload)
  50. _, err = c.conn.Write(msg.b)
  51. return
  52. }
  53. func (c *Client) Read() (string, []byte, error) {
  54. if err := c.conn.SetDeadline(time.Now().Add(Timeout)); err != nil {
  55. return "", nil, err
  56. }
  57. b := make([]byte, 1)
  58. if _, err := io.ReadFull(c.conn, b); err != nil {
  59. return "", nil, err
  60. }
  61. size, err := ReadLen(c.conn)
  62. if err != nil {
  63. return "", nil, err
  64. }
  65. b0 := b[0]
  66. b = make([]byte, size)
  67. if _, err = io.ReadFull(c.conn, b); err != nil {
  68. return "", nil, err
  69. }
  70. if b0&0xF0 != PUBLISH {
  71. return "", nil, nil
  72. }
  73. i := binary.BigEndian.Uint16(b)
  74. if uint32(i) > size {
  75. return "", nil, errors.New("wrong topic size")
  76. }
  77. b = b[2:]
  78. if qos := (b0 >> 1) & 0b11; qos == 0 {
  79. return string(b[:i]), b[i:], nil
  80. }
  81. // response with packet ID
  82. _, _ = c.conn.Write([]byte{PUBACK, 2, b[i], b[i+1]})
  83. return string(b[2:i]), b[i+2:], nil
  84. }
  85. func (c *Client) Close() error {
  86. // TODO: Teardown
  87. return c.conn.Close()
  88. }