// client_config.go

  package client

  import (
  	"context"
  	"fmt"
  	"math"
  	"net/url"
  	"regexp"
  	"strings"
  	"time"

  	"github.com/cockroachdb/errors"
  	"google.golang.org/grpc"
  	"google.golang.org/grpc/backoff"
  	"google.golang.org/grpc/keepalive"

  	"github.com/milvus-io/milvus/pkg/util/crypto"
  )
  // Internal feature-switch bits. Each constant is a distinct power of two of
  // type uint64, so several of them can be combined with bitwise OR.
  // NOTE(review): presumably stored in ClientConfig.flags through addFlags —
  // confirm against the callers.
  const (
  	disableDatabase      uint64 = 1 << iota // 1 << 0
  	disableJSON                             // 1 << 1
  	disableDynamicSchema                    // 1 << 2
  	disableParitionKey                      // 1 << 3; spelling of the identifier kept as is, it may be referenced elsewhere
  )
  // regexValidScheme reports whether an address starts with an explicit
  // "http://" or "https://" scheme. The match is case-sensitive.
  var regexValidScheme = regexp.MustCompile(`^https?:\/\/`)
  23. // DefaultGrpcOpts is GRPC options for milvus client.
  24. var DefaultGrpcOpts = []grpc.DialOption{
  25. grpc.WithBlock(),
  26. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  27. Time: 5 * time.Second,
  28. Timeout: 10 * time.Second,
  29. PermitWithoutStream: true,
  30. }),
  31. grpc.WithConnectParams(grpc.ConnectParams{
  32. Backoff: backoff.Config{
  33. BaseDelay: 100 * time.Millisecond,
  34. Multiplier: 1.6,
  35. Jitter: 0.2,
  36. MaxDelay: 3 * time.Second,
  37. },
  38. MinConnectTimeout: 3 * time.Second,
  39. }),
  40. grpc.WithDefaultCallOptions(
  41. grpc.MaxCallRecvMsgSize(math.MaxInt32), // math.MaxInt32 = 2147483647, 2GB - 1
  42. ),
  43. }
  44. // ClientConfig for milvus client.
  45. type ClientConfig struct {
  46. Address string // Remote address, "localhost:19530".
  47. Username string // Username for auth.
  48. Password string // Password for auth.
  49. DBName string // DBName for this client.
  50. EnableTLSAuth bool // Enable TLS Auth for transport security.
  51. APIKey string // API key
  52. DialOptions []grpc.DialOption // Dial options for GRPC.
  53. RetryRateLimit *RetryRateLimitOption // option for retry on rate limit inteceptor
  54. DisableConn bool
  55. metadataHeaders map[string]string
  56. identifier string // Identifier for this connection
  57. ServerVersion string // ServerVersion
  58. parsedAddress *url.URL
  59. flags uint64 // internal flags
  60. }
  // RetryRateLimitOption configures the retry-on-rate-limit interceptor built
  // by ClientConfig.getRetryOnRateLimitInterceptor.
  type RetryRateLimitOption struct {
  	MaxRetry   uint          // Passed to the interceptor as its retry limit.
  	MaxBackoff time.Duration // Passed to the interceptor as its backoff ceiling.
  }
  65. func (cfg *ClientConfig) parse() error {
  66. // Prepend default fake tcp:// scheme for remote address.
  67. address := cfg.Address
  68. if !regexValidScheme.MatchString(address) {
  69. address = fmt.Sprintf("tcp://%s", address)
  70. }
  71. remoteURL, err := url.Parse(address)
  72. if err != nil {
  73. return errors.Wrap(err, "milvus address parse fail")
  74. }
  75. // Remote Host should never be empty.
  76. if remoteURL.Host == "" {
  77. return errors.New("empty remote host of milvus address")
  78. }
  79. // Use DBName in remote url path.
  80. if cfg.DBName == "" {
  81. cfg.DBName = strings.TrimLeft(remoteURL.Path, "/")
  82. }
  83. // Always enable tls auth for https remote url.
  84. if remoteURL.Scheme == "https" {
  85. cfg.EnableTLSAuth = true
  86. }
  87. if remoteURL.Port() == "" && cfg.EnableTLSAuth {
  88. remoteURL.Host += ":443"
  89. }
  90. cfg.parsedAddress = remoteURL
  91. return nil
  92. }
  93. // Get parsed remote milvus address, should be called after parse was called.
  94. func (c *ClientConfig) getParsedAddress() string {
  95. return c.parsedAddress.Host
  96. }
  97. // useDatabase change the inner db name.
  98. func (c *ClientConfig) useDatabase(dbName string) {
  99. c.DBName = dbName
  100. }
  101. // useDatabase change the inner db name.
  102. func (c *ClientConfig) setIdentifier(identifier string) {
  103. c.identifier = identifier
  104. }
  105. func (c *ClientConfig) setServerInfo(serverInfo string) {
  106. c.ServerVersion = serverInfo
  107. }
  108. // parseAuthentication prepares authentication headers for grpc inteceptors based on the provided username, password or API key.
  109. func (c *ClientConfig) parseAuthentication() {
  110. c.metadataHeaders = make(map[string]string)
  111. if c.Username != "" || c.Password != "" {
  112. value := crypto.Base64Encode(fmt.Sprintf("%s:%s", c.Username, c.Password))
  113. c.metadataHeaders[authorizationHeader] = value
  114. }
  115. // API overwrites username & passwd
  116. if c.APIKey != "" {
  117. value := crypto.Base64Encode(c.APIKey)
  118. c.metadataHeaders[authorizationHeader] = value
  119. }
  120. }
  121. func (c *ClientConfig) getRetryOnRateLimitInterceptor() grpc.UnaryClientInterceptor {
  122. if c.RetryRateLimit == nil {
  123. c.RetryRateLimit = c.defaultRetryRateLimitOption()
  124. }
  125. return RetryOnRateLimitInterceptor(c.RetryRateLimit.MaxRetry, c.RetryRateLimit.MaxBackoff, func(ctx context.Context, attempt uint) time.Duration {
  126. return 10 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
  127. })
  128. }
  129. func (c *ClientConfig) defaultRetryRateLimitOption() *RetryRateLimitOption {
  130. return &RetryRateLimitOption{
  131. MaxRetry: 75,
  132. MaxBackoff: 3 * time.Second,
  133. }
  134. }
  135. // addFlags set internal flags
  136. func (c *ClientConfig) addFlags(flags uint64) {
  137. c.flags |= flags
  138. }
  139. // hasFlags check flags is set
  140. func (c *ClientConfig) hasFlags(flags uint64) bool {
  141. return (c.flags & flags) > 0
  142. }
  143. func (c *ClientConfig) resetFlags(flags uint64) {
  144. c.flags &= ^flags
  145. }