milvus_client.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package base
  2. import (
  3. "context"
  4. "encoding/json"
  5. "strings"
  6. "time"
  7. "go.uber.org/zap"
  8. "google.golang.org/grpc"
  9. "github.com/milvus-io/milvus/client/v2"
  10. "github.com/milvus-io/milvus/client/v2/entity"
  11. "github.com/milvus-io/milvus/pkg/log"
  12. )
  13. func LoggingUnaryInterceptor() grpc.UnaryClientInterceptor {
  14. return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  15. maxLogLength := 300
  16. _method := strings.Split(method, "/")
  17. _methodShotName := _method[len(_method)-1]
  18. // Marshal req to json str
  19. reqJSON, err := json.Marshal(req)
  20. if err != nil {
  21. log.Error("Failed to marshal request", zap.Error(err))
  22. reqJSON = []byte("could not marshal request")
  23. }
  24. reqStr := string(reqJSON)
  25. if len(reqStr) > maxLogLength {
  26. reqStr = reqStr[:maxLogLength] + "..."
  27. }
  28. // log before
  29. log.Info("Request", zap.String("method", _methodShotName), zap.Any("reqs", reqStr))
  30. // invoker
  31. start := time.Now()
  32. errResp := invoker(ctx, method, req, reply, cc, opts...)
  33. cost := time.Since(start)
  34. // Marshal reply to json str
  35. respJSON, err := json.Marshal(reply)
  36. if err != nil {
  37. log.Error("Failed to marshal response", zap.Error(err))
  38. respJSON = []byte("could not marshal response")
  39. }
  40. respStr := string(respJSON)
  41. if len(respStr) > maxLogLength {
  42. respStr = respStr[:maxLogLength] + "..."
  43. }
  44. // log after
  45. log.Info("Response", zap.String("method", _methodShotName), zap.Any("resp", respStr))
  46. log.Debug("Cost", zap.String("method", _methodShotName), zap.Duration("cost", cost))
  47. return errResp
  48. }
  49. }
  50. type MilvusClient struct {
  51. mClient *client.Client
  52. }
  53. func NewMilvusClient(ctx context.Context, cfg *client.ClientConfig) (*MilvusClient, error) {
  54. cfg.DialOptions = append(cfg.DialOptions, grpc.WithUnaryInterceptor(LoggingUnaryInterceptor()))
  55. mClient, err := client.New(ctx, cfg)
  56. return &MilvusClient{
  57. mClient,
  58. }, err
  59. }
  60. func (mc *MilvusClient) Close(ctx context.Context) error {
  61. err := mc.mClient.Close(ctx)
  62. return err
  63. }
  64. // -- database --
  65. // UsingDatabase list all database in milvus cluster.
  66. func (mc *MilvusClient) UsingDatabase(ctx context.Context, option client.UsingDatabaseOption) error {
  67. err := mc.mClient.UsingDatabase(ctx, option)
  68. return err
  69. }
  70. // ListDatabases list all database in milvus cluster.
  71. func (mc *MilvusClient) ListDatabases(ctx context.Context, option client.ListDatabaseOption, callOptions ...grpc.CallOption) ([]string, error) {
  72. databaseNames, err := mc.mClient.ListDatabase(ctx, option, callOptions...)
  73. return databaseNames, err
  74. }
  75. // CreateDatabase create database with the given name.
  76. func (mc *MilvusClient) CreateDatabase(ctx context.Context, option client.CreateDatabaseOption, callOptions ...grpc.CallOption) error {
  77. err := mc.mClient.CreateDatabase(ctx, option, callOptions...)
  78. return err
  79. }
  80. // DropDatabase drop database with the given db name.
  81. func (mc *MilvusClient) DropDatabase(ctx context.Context, option client.DropDatabaseOption, callOptions ...grpc.CallOption) error {
  82. err := mc.mClient.DropDatabase(ctx, option, callOptions...)
  83. return err
  84. }
  85. // -- collection --
  86. // CreateCollection Create Collection
  87. func (mc *MilvusClient) CreateCollection(ctx context.Context, option client.CreateCollectionOption, callOptions ...grpc.CallOption) error {
  88. err := mc.mClient.CreateCollection(ctx, option, callOptions...)
  89. return err
  90. }
  91. // ListCollections Create Collection
  92. func (mc *MilvusClient) ListCollections(ctx context.Context, option client.ListCollectionOption, callOptions ...grpc.CallOption) ([]string, error) {
  93. collectionNames, err := mc.mClient.ListCollections(ctx, option, callOptions...)
  94. return collectionNames, err
  95. }
  96. // DescribeCollection Describe collection
  97. func (mc *MilvusClient) DescribeCollection(ctx context.Context, option client.DescribeCollectionOption, callOptions ...grpc.CallOption) (*entity.Collection, error) {
  98. collection, err := mc.mClient.DescribeCollection(ctx, option, callOptions...)
  99. return collection, err
  100. }
  101. // HasCollection Has collection
  102. func (mc *MilvusClient) HasCollection(ctx context.Context, option client.HasCollectionOption, callOptions ...grpc.CallOption) (bool, error) {
  103. has, err := mc.mClient.HasCollection(ctx, option, callOptions...)
  104. return has, err
  105. }
  106. // DropCollection Drop Collection
  107. func (mc *MilvusClient) DropCollection(ctx context.Context, option client.DropCollectionOption, callOptions ...grpc.CallOption) error {
  108. err := mc.mClient.DropCollection(ctx, option, callOptions...)
  109. return err
  110. }
  111. // -- partition --
  112. // CreatePartition Create Partition
  113. func (mc *MilvusClient) CreatePartition(ctx context.Context, option client.CreatePartitionOption, callOptions ...grpc.CallOption) error {
  114. err := mc.mClient.CreatePartition(ctx, option, callOptions...)
  115. return err
  116. }
  117. // DropPartition Drop Partition
  118. func (mc *MilvusClient) DropPartition(ctx context.Context, option client.DropPartitionOption, callOptions ...grpc.CallOption) error {
  119. err := mc.mClient.DropPartition(ctx, option, callOptions...)
  120. return err
  121. }
  122. // HasPartition Has Partition
  123. func (mc *MilvusClient) HasPartition(ctx context.Context, option client.HasPartitionOption, callOptions ...grpc.CallOption) (bool, error) {
  124. has, err := mc.mClient.HasPartition(ctx, option, callOptions...)
  125. return has, err
  126. }
  127. // ListPartitions List Partitions
  128. func (mc *MilvusClient) ListPartitions(ctx context.Context, option client.ListPartitionsOption, callOptions ...grpc.CallOption) ([]string, error) {
  129. partitionNames, err := mc.mClient.ListPartitions(ctx, option, callOptions...)
  130. return partitionNames, err
  131. }
  132. // LoadPartitions Load Partitions into memory
  133. func (mc *MilvusClient) LoadPartitions(ctx context.Context, option client.LoadPartitionsOption, callOptions ...grpc.CallOption) (client.LoadTask, error) {
  134. loadTask, err := mc.mClient.LoadPartitions(ctx, option, callOptions...)
  135. return loadTask, err
  136. }
  137. // -- index --
  138. // CreateIndex Create Index
  139. func (mc *MilvusClient) CreateIndex(ctx context.Context, option client.CreateIndexOption, callOptions ...grpc.CallOption) (*client.CreateIndexTask, error) {
  140. createIndexTask, err := mc.mClient.CreateIndex(ctx, option, callOptions...)
  141. return createIndexTask, err
  142. }
  143. // ListIndexes List Indexes
  144. func (mc *MilvusClient) ListIndexes(ctx context.Context, option client.ListIndexOption, callOptions ...grpc.CallOption) ([]string, error) {
  145. indexes, err := mc.mClient.ListIndexes(ctx, option, callOptions...)
  146. return indexes, err
  147. }
  148. // DescribeIndex Describe Index
  149. func (mc *MilvusClient) DescribeIndex(ctx context.Context, option client.DescribeIndexOption, callOptions ...grpc.CallOption) (client.IndexDescription, error) {
  150. idxDesc, err := mc.mClient.DescribeIndex(ctx, option, callOptions...)
  151. return idxDesc, err
  152. }
  153. // DropIndex Drop Index
  154. func (mc *MilvusClient) DropIndex(ctx context.Context, option client.DropIndexOption, callOptions ...grpc.CallOption) error {
  155. err := mc.mClient.DropIndex(ctx, option, callOptions...)
  156. return err
  157. }
  158. // -- write --
  159. // Insert insert data
  160. func (mc *MilvusClient) Insert(ctx context.Context, option client.InsertOption, callOptions ...grpc.CallOption) (client.InsertResult, error) {
  161. insertRes, err := mc.mClient.Insert(ctx, option, callOptions...)
  162. if err == nil {
  163. log.Info("Insert", zap.Any("result", insertRes))
  164. }
  165. return insertRes, err
  166. }
  167. // Flush flush data
  168. func (mc *MilvusClient) Flush(ctx context.Context, option client.FlushOption, callOptions ...grpc.CallOption) (*client.FlushTask, error) {
  169. flushTask, err := mc.mClient.Flush(ctx, option, callOptions...)
  170. return flushTask, err
  171. }
  172. // Delete deletes data
  173. func (mc *MilvusClient) Delete(ctx context.Context, option client.DeleteOption, callOptions ...grpc.CallOption) (client.DeleteResult, error) {
  174. deleteRes, err := mc.mClient.Delete(ctx, option, callOptions...)
  175. return deleteRes, err
  176. }
  177. // Upsert upsert data
  178. func (mc *MilvusClient) Upsert(ctx context.Context, option client.UpsertOption, callOptions ...grpc.CallOption) (client.UpsertResult, error) {
  179. upsertRes, err := mc.mClient.Upsert(ctx, option, callOptions...)
  180. return upsertRes, err
  181. }
  182. // -- read --
  183. // LoadCollection Load Collection
  184. func (mc *MilvusClient) LoadCollection(ctx context.Context, option client.LoadCollectionOption, callOptions ...grpc.CallOption) (client.LoadTask, error) {
  185. loadTask, err := mc.mClient.LoadCollection(ctx, option, callOptions...)
  186. return loadTask, err
  187. }
  188. // Search search from collection
  189. func (mc *MilvusClient) Search(ctx context.Context, option client.SearchOption, callOptions ...grpc.CallOption) ([]client.ResultSet, error) {
  190. resultSets, err := mc.mClient.Search(ctx, option, callOptions...)
  191. return resultSets, err
  192. }
  193. // Query query from collection
  194. func (mc *MilvusClient) Query(ctx context.Context, option client.QueryOption, callOptions ...grpc.CallOption) (client.ResultSet, error) {
  195. resultSet, err := mc.mClient.Query(ctx, option, callOptions...)
  196. return resultSet, err
  197. }