read.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package client
  17. import (
  18. "context"
  19. "github.com/cockroachdb/errors"
  20. "google.golang.org/grpc"
  21. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  22. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  23. "github.com/milvus-io/milvus/client/v2/column"
  24. "github.com/milvus-io/milvus/client/v2/entity"
  25. "github.com/milvus-io/milvus/pkg/util/merr"
  26. )
  27. func (c *Client) Search(ctx context.Context, option SearchOption, callOptions ...grpc.CallOption) ([]ResultSet, error) {
  28. req := option.Request()
  29. collection, err := c.getCollection(ctx, req.GetCollectionName())
  30. if err != nil {
  31. return nil, err
  32. }
  33. var resultSets []ResultSet
  34. err = c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  35. resp, err := milvusService.Search(ctx, req, callOptions...)
  36. err = merr.CheckRPCCall(resp, err)
  37. if err != nil {
  38. return err
  39. }
  40. resultSets, err = c.handleSearchResult(collection.Schema, req.GetOutputFields(), int(req.GetNq()), resp)
  41. return err
  42. })
  43. return resultSets, err
  44. }
  45. func (c *Client) handleSearchResult(schema *entity.Schema, outputFields []string, nq int, resp *milvuspb.SearchResults) ([]ResultSet, error) {
  46. sr := make([]ResultSet, 0, nq)
  47. results := resp.GetResults()
  48. offset := 0
  49. fieldDataList := results.GetFieldsData()
  50. gb := results.GetGroupByFieldValue()
  51. for i := 0; i < int(results.GetNumQueries()); i++ {
  52. rc := int(results.GetTopks()[i]) // result entry count for current query
  53. entry := ResultSet{
  54. ResultCount: rc,
  55. Scores: results.GetScores()[offset : offset+rc],
  56. sch: schema,
  57. }
  58. entry.IDs, entry.Err = column.IDColumns(schema, results.GetIds(), offset, offset+rc)
  59. if entry.Err != nil {
  60. offset += rc
  61. continue
  62. }
  63. // parse group-by values
  64. if gb != nil {
  65. entry.GroupByValue, entry.Err = column.FieldDataColumn(gb, offset, offset+rc)
  66. if entry.Err != nil {
  67. offset += rc
  68. continue
  69. }
  70. }
  71. entry.Fields, entry.Err = c.parseSearchResult(schema, outputFields, fieldDataList, i, offset, offset+rc)
  72. sr = append(sr, entry)
  73. offset += rc
  74. }
  75. return sr, nil
  76. }
  77. func (c *Client) parseSearchResult(sch *entity.Schema, outputFields []string, fieldDataList []*schemapb.FieldData, _, from, to int) ([]column.Column, error) {
  78. var wildcard bool
  79. outputFields, wildcard = expandWildcard(sch, outputFields)
  80. // duplicated name will have only one column now
  81. outputSet := make(map[string]struct{})
  82. for _, output := range outputFields {
  83. outputSet[output] = struct{}{}
  84. }
  85. // fields := make(map[string]*schemapb.FieldData)
  86. columns := make([]column.Column, 0, len(outputFields))
  87. var dynamicColumn *column.ColumnJSONBytes
  88. for _, fieldData := range fieldDataList {
  89. col, err := column.FieldDataColumn(fieldData, from, to)
  90. if err != nil {
  91. return nil, err
  92. }
  93. if fieldData.GetIsDynamic() {
  94. var ok bool
  95. dynamicColumn, ok = col.(*column.ColumnJSONBytes)
  96. if !ok {
  97. return nil, errors.New("dynamic field not json")
  98. }
  99. // return json column only explicitly specified in output fields and not in wildcard mode
  100. if _, ok := outputSet[fieldData.GetFieldName()]; !ok && !wildcard {
  101. continue
  102. }
  103. }
  104. // remove processed field
  105. delete(outputSet, fieldData.GetFieldName())
  106. columns = append(columns, col)
  107. }
  108. if len(outputSet) > 0 && dynamicColumn == nil {
  109. var extraFields []string
  110. for output := range outputSet {
  111. extraFields = append(extraFields, output)
  112. }
  113. return nil, errors.Newf("extra output fields %v found and result does not dynamic field", extraFields)
  114. }
  115. // add dynamic column for extra fields
  116. for outputField := range outputSet {
  117. column := column.NewColumnDynamic(dynamicColumn, outputField)
  118. columns = append(columns, column)
  119. }
  120. return columns, nil
  121. }
  122. func (c *Client) Query(ctx context.Context, option QueryOption, callOptions ...grpc.CallOption) (ResultSet, error) {
  123. req := option.Request()
  124. var resultSet ResultSet
  125. collection, err := c.getCollection(ctx, req.GetCollectionName())
  126. if err != nil {
  127. return resultSet, err
  128. }
  129. err = c.callService(func(milvusService milvuspb.MilvusServiceClient) error {
  130. resp, err := milvusService.Query(ctx, req, callOptions...)
  131. err = merr.CheckRPCCall(resp, err)
  132. if err != nil {
  133. return err
  134. }
  135. columns, err := c.parseSearchResult(collection.Schema, resp.GetOutputFields(), resp.GetFieldsData(), 0, 0, -1)
  136. if err != nil {
  137. return err
  138. }
  139. resultSet = ResultSet{
  140. Fields: columns,
  141. }
  142. if len(columns) > 0 {
  143. resultSet.ResultCount = columns[0].Len()
  144. }
  145. return nil
  146. })
  147. return resultSet, err
  148. }
  149. func expandWildcard(schema *entity.Schema, outputFields []string) ([]string, bool) {
  150. wildcard := false
  151. for _, outputField := range outputFields {
  152. if outputField == "*" {
  153. wildcard = true
  154. }
  155. }
  156. if !wildcard {
  157. return outputFields, false
  158. }
  159. set := make(map[string]struct{})
  160. result := make([]string, 0, len(schema.Fields))
  161. for _, field := range schema.Fields {
  162. result = append(result, field.Name)
  163. set[field.Name] = struct{}{}
  164. }
  165. // add dynamic fields output
  166. for _, output := range outputFields {
  167. if output == "*" {
  168. continue
  169. }
  170. _, ok := set[output]
  171. if !ok {
  172. result = append(result, output)
  173. }
  174. }
  175. return result, true
  176. }