server.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright 2020 gorse Project Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. "math/rand"
  20. "time"
  21. "github.com/emicklei/go-restful/v3"
  22. "github.com/juju/errors"
  23. "github.com/zhenghaoz/gorse/base"
  24. "github.com/zhenghaoz/gorse/base/log"
  25. "github.com/zhenghaoz/gorse/cmd/version"
  26. "github.com/zhenghaoz/gorse/config"
  27. "github.com/zhenghaoz/gorse/protocol"
  28. "github.com/zhenghaoz/gorse/storage/cache"
  29. "github.com/zhenghaoz/gorse/storage/data"
  30. "go.opentelemetry.io/otel"
  31. "go.opentelemetry.io/otel/propagation"
  32. "go.uber.org/zap"
  33. "google.golang.org/grpc"
  34. "google.golang.org/grpc/credentials/insecure"
  35. )
  36. // Server manages states of a server node.
  37. type Server struct {
  38. RestServer
  39. traceConfig config.TracingConfig
  40. cachePath string
  41. cachePrefix string
  42. dataPath string
  43. dataPrefix string
  44. masterClient protocol.MasterClient
  45. serverName string
  46. masterHost string
  47. masterPort int
  48. testMode bool
  49. cacheFile string
  50. }
  51. // NewServer creates a server node.
  52. func NewServer(masterHost string, masterPort int, serverHost string, serverPort int, cacheFile string) *Server {
  53. s := &Server{
  54. masterHost: masterHost,
  55. masterPort: masterPort,
  56. cacheFile: cacheFile,
  57. RestServer: RestServer{
  58. Settings: config.NewSettings(),
  59. HttpHost: serverHost,
  60. HttpPort: serverPort,
  61. WebService: new(restful.WebService),
  62. },
  63. }
  64. return s
  65. }
  66. // Serve starts a server node.
  67. func (s *Server) Serve() {
  68. rand.Seed(time.Now().UTC().UnixNano())
  69. // open local store
  70. state, err := LoadLocalCache(s.cacheFile)
  71. if err != nil {
  72. if errors.Is(err, errors.NotFound) {
  73. log.Logger().Info("no cache file found, create a new one", zap.String("path", s.cacheFile))
  74. } else {
  75. log.Logger().Error("failed to connect local store", zap.Error(err),
  76. zap.String("path", s.cacheFile))
  77. }
  78. }
  79. if state.ServerName == "" {
  80. state.ServerName = base.GetRandomName(0)
  81. err = state.WriteLocalCache()
  82. if err != nil {
  83. log.Logger().Fatal("failed to write meta", zap.Error(err))
  84. }
  85. }
  86. s.serverName = state.ServerName
  87. log.Logger().Info("start server",
  88. zap.String("server_name", s.serverName),
  89. zap.String("server_host", s.HttpHost),
  90. zap.Int("server_port", s.HttpPort),
  91. zap.String("master_host", s.masterHost),
  92. zap.Int("master_port", s.masterPort))
  93. // connect to master
  94. conn, err := grpc.Dial(fmt.Sprintf("%v:%v", s.masterHost, s.masterPort), grpc.WithTransportCredentials(insecure.NewCredentials()))
  95. if err != nil {
  96. log.Logger().Fatal("failed to connect master", zap.Error(err))
  97. }
  98. s.masterClient = protocol.NewMasterClient(conn)
  99. go s.Sync()
  100. container := restful.NewContainer()
  101. s.StartHttpServer(container)
  102. }
  103. func (s *Server) Shutdown() {
  104. err := s.HttpServer.Shutdown(context.TODO())
  105. if err != nil {
  106. log.Logger().Fatal("failed to shutdown http server", zap.Error(err))
  107. }
  108. }
  109. // Sync this server to the master.
  110. func (s *Server) Sync() {
  111. defer base.CheckPanic()
  112. log.Logger().Info("start meta sync", zap.Duration("meta_timeout", s.Config.Master.MetaTimeout))
  113. for {
  114. var meta *protocol.Meta
  115. var err error
  116. if meta, err = s.masterClient.GetMeta(context.Background(),
  117. &protocol.NodeInfo{
  118. NodeType: protocol.NodeType_ServerNode,
  119. NodeName: s.serverName,
  120. HttpPort: int64(s.HttpPort),
  121. BinaryVersion: version.Version,
  122. }); err != nil {
  123. log.Logger().Error("failed to get meta", zap.Error(err))
  124. goto sleep
  125. }
  126. // load master config
  127. err = json.Unmarshal([]byte(meta.Config), &s.Config)
  128. if err != nil {
  129. log.Logger().Error("failed to parse master config", zap.Error(err))
  130. goto sleep
  131. }
  132. // connect to data store
  133. if s.dataPath != s.Config.Database.DataStore || s.dataPrefix != s.Config.Database.DataTablePrefix {
  134. log.Logger().Info("connect data store",
  135. zap.String("database", log.RedactDBURL(s.Config.Database.DataStore)))
  136. if s.DataClient, err = data.Open(s.Config.Database.DataStore, s.Config.Database.DataTablePrefix); err != nil {
  137. log.Logger().Error("failed to connect data store", zap.Error(err))
  138. goto sleep
  139. }
  140. s.dataPath = s.Config.Database.DataStore
  141. s.dataPrefix = s.Config.Database.DataTablePrefix
  142. }
  143. // connect to cache store
  144. if s.cachePath != s.Config.Database.CacheStore || s.cachePrefix != s.Config.Database.CacheTablePrefix {
  145. log.Logger().Info("connect cache store",
  146. zap.String("database", log.RedactDBURL(s.Config.Database.CacheStore)))
  147. if s.CacheClient, err = cache.Open(s.Config.Database.CacheStore, s.Config.Database.CacheTablePrefix); err != nil {
  148. log.Logger().Error("failed to connect cache store", zap.Error(err))
  149. goto sleep
  150. }
  151. s.cachePath = s.Config.Database.CacheStore
  152. s.cachePrefix = s.Config.Database.CacheTablePrefix
  153. }
  154. // create trace provider
  155. if !s.traceConfig.Equal(s.Config.Tracing) {
  156. log.Logger().Info("create trace provider", zap.Any("tracing_config", s.Config.Tracing))
  157. tp, err := s.Config.Tracing.NewTracerProvider()
  158. if err != nil {
  159. log.Logger().Fatal("failed to create trace provider", zap.Error(err))
  160. }
  161. otel.SetTracerProvider(tp)
  162. otel.SetErrorHandler(log.GetErrorHandler())
  163. otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
  164. s.traceConfig = s.Config.Tracing
  165. }
  166. sleep:
  167. if s.testMode {
  168. return
  169. }
  170. time.Sleep(s.Config.Master.MetaTimeout)
  171. }
  172. }