zipper.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package yomo
  import (
  	"context"
  	"fmt"
  	"log/slog"
  	"net"
  	"strings"

  	"github.com/yomorun/yomo/core"
  	"github.com/yomorun/yomo/core/frame"
  	"github.com/yomorun/yomo/pkg/config"
  )
  // Zipper orchestrates yomo. It comes in two flavours: the Upstream Zipper,
  // which connects to multiple downstream zippers, and the Downstream Zipper
  // (referred to simply as Zipper), which accepts connections from an
  // `Upstream Zipper`, a `Source` or a `Stream Function`.
  type Zipper interface {
  	// Close closes the zipper.
  	Close() error

  	// ListenAndServe starts the zipper as a server. RunZipper passes the
  	// listening address ("host:port") as the string argument.
  	ListenAndServe(context.Context, string) error

  	// Logger returns the logger used by the zipper.
  	Logger() *slog.Logger
  }
  22. // RunZipper run a zipper from a config file.
  23. func RunZipper(ctx context.Context, configPath string) error {
  24. conf, err := config.ParseConfigFile(configPath)
  25. if err != nil {
  26. return err
  27. }
  28. // listening address.
  29. listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)
  30. options := []ZipperOption{}
  31. if _, ok := conf.Auth["type"]; ok {
  32. if tokenString, ok := conf.Auth["token"]; ok {
  33. options = append(options, WithAuth("token", tokenString))
  34. }
  35. }
  36. zipper, err := NewZipper(conf.Name, conf.Mesh, options...)
  37. if err != nil {
  38. return err
  39. }
  40. zipper.Logger().Info("using config file", "file_path", configPath)
  41. return zipper.ListenAndServe(ctx, listenAddr)
  42. }
  43. // NewZipper returns a zipper.
  44. func NewZipper(name string, meshConfig map[string]config.Mesh, options ...ZipperOption) (Zipper, error) {
  45. opts := &zipperOptions{}
  46. for _, o := range options {
  47. o(opts)
  48. }
  49. server := core.NewServer(name, opts.serverOption...)
  50. // add downstreams to server.
  51. for meshName, meshConf := range meshConfig {
  52. if meshName == "" || meshName == name {
  53. continue
  54. }
  55. addr := fmt.Sprintf("%s:%d", meshConf.Host, meshConf.Port)
  56. clientOptions := []core.ClientOption{
  57. core.WithCredential(meshConf.Credential),
  58. core.WithNonBlockWrite(),
  59. core.WithReConnect(),
  60. core.WithLogger(server.Logger().With("downstream_name", meshName, "downstream_addr", addr)),
  61. }
  62. clientOptions = append(clientOptions, opts.clientOption...)
  63. downstream := &downstream{
  64. localName: meshName,
  65. client: core.NewClient(name, addr, core.ClientTypeUpstreamZipper, clientOptions...),
  66. }
  67. server.Logger().Info("add downstream", "downstream_id", downstream.ID(), "downstream_name", downstream.LocalName(), "downstream_addr", addr)
  68. server.AddDownstreamServer(downstream)
  69. }
  70. // watch signal.
  71. go waitSignalForShutdownServer(server)
  72. return server, nil
  73. }
  74. func statsToLogger(server *core.Server) {
  75. logger := server.Logger()
  76. logger.Info(
  77. "stats",
  78. "zipper_name", server.Name(),
  79. "connector", server.StatsFunctions(),
  80. "downstreams", server.Downstreams(),
  81. "data_frame_received_num", server.StatsCounter(),
  82. )
  83. }
  84. type downstream struct {
  85. localName string
  86. client *core.Client
  87. }
  88. func (d *downstream) Close() error { return d.client.Close() }
  89. func (d *downstream) Connect(ctx context.Context) error { return d.client.Connect(ctx) }
  90. func (d *downstream) ID() string { return d.client.ClientID() }
  91. func (d *downstream) LocalName() string { return d.localName }
  92. func (d *downstream) RemoteName() string { return d.client.Name() }
  93. func (d *downstream) WriteFrame(f frame.Frame) error { return d.client.WriteFrame(f) }