source.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package yomo
  2. import (
  3. "context"
  4. "github.com/yomorun/yomo/core"
  5. "github.com/yomorun/yomo/core/frame"
  6. "github.com/yomorun/yomo/pkg/id"
  7. )
  8. // Source is responsible for sending data to yomo.
  9. type Source interface {
  10. // Close will close the connection to YoMo-Zipper.
  11. Close() error
  12. // Connect to YoMo-Zipper.
  13. Connect() error
  14. // Write the data to directed downstream.
  15. Write(tag uint32, data []byte) error
  16. // WriteWithTarget writes data to sfn instance with specified target.
  17. WriteWithTarget(tag uint32, data []byte, target string) error
  18. // SetErrorHandler set the error handler function when server error occurs
  19. SetErrorHandler(fn func(err error))
  20. }
  21. // YoMo-Source
  22. type yomoSource struct {
  23. name string
  24. zipperAddr string
  25. client *core.Client
  26. }
  27. var _ Source = &yomoSource{}
  28. // NewSource create a yomo-source
  29. func NewSource(name, zipperAddr string, opts ...SourceOption) Source {
  30. clientOpts := make([]core.ClientOption, len(opts))
  31. for k, v := range opts {
  32. clientOpts[k] = core.ClientOption(v)
  33. }
  34. client := core.NewClient(name, zipperAddr, core.ClientTypeSource, clientOpts...)
  35. client.Logger = client.Logger.With(
  36. "component", core.ClientTypeSource.String(),
  37. "source_id", client.ClientID(),
  38. "source_name", client.Name(),
  39. "zipper_addr", zipperAddr,
  40. )
  41. return &yomoSource{
  42. name: name,
  43. zipperAddr: zipperAddr,
  44. client: client,
  45. }
  46. }
  47. // Close will close the connection to YoMo-Zipper.
  48. func (s *yomoSource) Close() error {
  49. _ = s.client.Close()
  50. s.client.Logger.Debug("the source is closed")
  51. return nil
  52. }
  53. // Connect to YoMo-Zipper.
  54. func (s *yomoSource) Connect() error {
  55. return s.client.Connect(context.Background())
  56. }
  57. // Write writes data with specified tag.
  58. func (s *yomoSource) Write(tag uint32, data []byte) error {
  59. if err := frame.IsReservedTag(tag); err != nil {
  60. return err
  61. }
  62. md := core.NewMetadata(s.client.ClientID(), id.New())
  63. mdBytes, err := md.Encode()
  64. // metadata
  65. if err != nil {
  66. return err
  67. }
  68. f := &frame.DataFrame{
  69. Tag: tag,
  70. Metadata: mdBytes,
  71. Payload: data,
  72. }
  73. s.client.Logger.Debug("source write", "tag", tag, "dataLen", len(data))
  74. return s.client.WriteFrame(f)
  75. }
  76. // WritePayload writes `yomo.Payload` with specified tag.
  77. func (s *yomoSource) WriteWithTarget(tag uint32, data []byte, target string) error {
  78. if err := frame.IsReservedTag(tag); err != nil {
  79. return err
  80. }
  81. md := core.NewMetadata(s.client.ClientID(), id.New())
  82. if target != "" {
  83. core.SetMetadataTarget(md, target)
  84. }
  85. mdBytes, err := md.Encode()
  86. if err != nil {
  87. return err
  88. }
  89. f := &frame.DataFrame{
  90. Tag: tag,
  91. Metadata: mdBytes,
  92. Payload: data,
  93. }
  94. s.client.Logger.Debug("source write with target", "tag", tag, "data", data, "target", target)
  95. return s.client.WriteFrame(f)
  96. }
  97. // SetErrorHandler set the error handler function when server error occurs
  98. func (s *yomoSource) SetErrorHandler(fn func(err error)) {
  99. s.client.SetErrorHandler(fn)
  100. }