index.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package client
  17. import (
  18. "context"
  19. "fmt"
  20. "time"
  21. "google.golang.org/grpc"
  22. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  23. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  24. "github.com/milvus-io/milvus/client/v2/entity"
  25. "github.com/milvus-io/milvus/client/v2/index"
  26. "github.com/milvus-io/milvus/pkg/util/merr"
  27. )
  28. type CreateIndexTask struct {
  29. client *Client
  30. collectionName string
  31. fieldName string
  32. indexName string
  33. interval time.Duration
  34. }
  35. func (t *CreateIndexTask) Await(ctx context.Context) error {
  36. timer := time.NewTimer(t.interval)
  37. defer timer.Stop()
  38. for {
  39. select {
  40. case <-timer.C:
  41. finished := false
  42. err := t.client.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  43. resp, err := milvusService.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
  44. CollectionName: t.collectionName,
  45. FieldName: t.fieldName,
  46. IndexName: t.indexName,
  47. })
  48. err = merr.CheckRPCCall(resp, err)
  49. if err != nil {
  50. return err
  51. }
  52. for _, info := range resp.GetIndexDescriptions() {
  53. if (t.indexName == "" && info.GetFieldName() == t.fieldName) || t.indexName == info.GetIndexName() {
  54. switch info.GetState() {
  55. case commonpb.IndexState_Finished:
  56. finished = true
  57. return nil
  58. case commonpb.IndexState_Failed:
  59. return fmt.Errorf("create index failed, reason: %s", info.GetIndexStateFailReason())
  60. }
  61. }
  62. }
  63. return nil
  64. })
  65. if err != nil {
  66. return err
  67. }
  68. if finished {
  69. return nil
  70. }
  71. timer.Reset(t.interval)
  72. case <-ctx.Done():
  73. return ctx.Err()
  74. }
  75. }
  76. }
  77. func (c *Client) CreateIndex(ctx context.Context, option CreateIndexOption, callOptions ...grpc.CallOption) (*CreateIndexTask, error) {
  78. req := option.Request()
  79. var task *CreateIndexTask
  80. err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  81. resp, err := milvusService.CreateIndex(ctx, req, callOptions...)
  82. if err = merr.CheckRPCCall(resp, err); err != nil {
  83. return err
  84. }
  85. task = &CreateIndexTask{
  86. client: c,
  87. collectionName: req.GetCollectionName(),
  88. fieldName: req.GetFieldName(),
  89. indexName: req.GetIndexName(),
  90. interval: time.Millisecond * 100,
  91. }
  92. return nil
  93. })
  94. return task, err
  95. }
  96. func (c *Client) ListIndexes(ctx context.Context, opt ListIndexOption, callOptions ...grpc.CallOption) ([]string, error) {
  97. req := opt.Request()
  98. var indexes []string
  99. err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  100. resp, err := milvusService.DescribeIndex(ctx, req, callOptions...)
  101. if err = merr.CheckRPCCall(resp, err); err != nil {
  102. return err
  103. }
  104. for _, idxDef := range resp.GetIndexDescriptions() {
  105. if opt.Matches(idxDef) {
  106. indexes = append(indexes, idxDef.GetIndexName())
  107. }
  108. }
  109. return nil
  110. })
  111. return indexes, err
  112. }
  113. type IndexDescription struct {
  114. index.Index
  115. State index.IndexState
  116. PendingIndexRows int64
  117. TotalRows int64
  118. IndexedRows int64
  119. }
  120. func (c *Client) DescribeIndex(ctx context.Context, opt DescribeIndexOption, callOptions ...grpc.CallOption) (IndexDescription, error) {
  121. req := opt.Request()
  122. var idx IndexDescription
  123. err := c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  124. resp, err := milvusService.DescribeIndex(ctx, req, callOptions...)
  125. if err = merr.CheckRPCCall(resp, err); err != nil {
  126. return err
  127. }
  128. if len(resp.GetIndexDescriptions()) == 0 {
  129. return merr.WrapErrIndexNotFound(req.GetIndexName())
  130. }
  131. for _, idxDef := range resp.GetIndexDescriptions() {
  132. if idxDef.GetIndexName() == req.GetIndexName() {
  133. idx = IndexDescription{
  134. Index: index.NewGenericIndex(idxDef.GetIndexName(), entity.KvPairsMap(idxDef.GetParams())),
  135. State: index.IndexState(idxDef.GetState()),
  136. PendingIndexRows: idxDef.GetPendingIndexRows(),
  137. IndexedRows: idxDef.GetIndexedRows(),
  138. TotalRows: idxDef.GetTotalRows(),
  139. }
  140. }
  141. }
  142. return nil
  143. })
  144. return idx, err
  145. }
  146. func (c *Client) DropIndex(ctx context.Context, opt DropIndexOption, callOptions ...grpc.CallOption) error {
  147. req := opt.Request()
  148. return c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  149. resp, err := milvusService.DropIndex(ctx, req, callOptions...)
  150. return merr.CheckRPCCall(resp, err)
  151. })
  152. }