null_data_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. // Licensed to the LF AI & Data foundation under one
  2. // or more contributor license agreements. See the NOTICE file
  3. // distributed with this work for additional information
  4. // regarding copyright ownership. The ASF licenses this file
  5. // to you under the Apache License, Version 2.0 (the
  6. // "License"); you may not use this file except in compliance
  7. // with the License. You may obtain a copy of the License at
  8. //
  9. // http://www.apache.org/licenses/LICENSE-2.0
  10. //
  11. // Unless required by applicable law or agreed to in writing, software
  12. // distributed under the License is distributed on an "AS IS" BASIS,
  13. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. // See the License for the specific language governing permissions and
  15. // limitations under the License.
  16. package hellomilvus
  17. import (
  18. "context"
  19. "fmt"
  20. "testing"
  21. "time"
  22. "github.com/stretchr/testify/suite"
  23. "go.uber.org/zap"
  24. "google.golang.org/protobuf/proto"
  25. "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
  26. "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
  27. "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
  28. "github.com/milvus-io/milvus/pkg/common"
  29. "github.com/milvus-io/milvus/pkg/log"
  30. "github.com/milvus-io/milvus/pkg/util/funcutil"
  31. "github.com/milvus-io/milvus/pkg/util/merr"
  32. "github.com/milvus-io/milvus/pkg/util/metric"
  33. "github.com/milvus-io/milvus/tests/integration"
  34. )
  35. type NullDataSuite struct {
  36. integration.MiniClusterSuite
  37. indexType string
  38. metricType string
  39. vecType schemapb.DataType
  40. }
  41. func getTargetFieldData(fieldName string, fieldDatas []*schemapb.FieldData) *schemapb.FieldData {
  42. var actual *schemapb.FieldData
  43. for _, result := range fieldDatas {
  44. if result.FieldName == fieldName {
  45. actual = result
  46. break
  47. }
  48. }
  49. return actual
  50. }
  51. func (s *NullDataSuite) checkNullableFieldData(fieldName string, fieldDatas []*schemapb.FieldData, start int64) {
  52. actual := getTargetFieldData(fieldName, fieldDatas)
  53. fieldData := actual.GetScalars().GetLongData().Data
  54. validData := actual.GetValidData()
  55. s.Equal(len(validData), len(fieldData))
  56. for i, ans := range actual.GetScalars().GetLongData().Data {
  57. if ans < start {
  58. s.False(validData[i])
  59. } else {
  60. s.True(validData[i])
  61. }
  62. }
  63. }
  64. func (s *NullDataSuite) run() {
  65. ctx, cancel := context.WithCancel(context.Background())
  66. defer cancel()
  67. c := s.Cluster
  68. const (
  69. dim = 128
  70. dbName = ""
  71. rowNum = 100
  72. start = 1000
  73. )
  74. collectionName := "TestNullData" + funcutil.GenRandomStr()
  75. schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, s.vecType)
  76. nullableFid := &schemapb.FieldSchema{
  77. FieldID: 102,
  78. Name: "nullableFid",
  79. Description: "",
  80. DataType: schemapb.DataType_Int64,
  81. TypeParams: nil,
  82. IndexParams: nil,
  83. AutoID: false,
  84. Nullable: true,
  85. }
  86. schema.Fields = append(schema.Fields, nullableFid)
  87. marshaledSchema, err := proto.Marshal(schema)
  88. s.NoError(err)
  89. createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
  90. DbName: dbName,
  91. CollectionName: collectionName,
  92. Schema: marshaledSchema,
  93. ShardsNum: common.DefaultShardsNum,
  94. })
  95. s.NoError(err)
  96. if createCollectionStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  97. log.Warn("createCollectionStatus fail reason", zap.String("reason", createCollectionStatus.GetReason()))
  98. }
  99. s.Equal(createCollectionStatus.GetErrorCode(), commonpb.ErrorCode_Success)
  100. log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
  101. showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
  102. s.NoError(err)
  103. s.Equal(showCollectionsResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  104. log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
  105. fieldsData := make([]*schemapb.FieldData, 0)
  106. fieldsData = append(fieldsData, integration.NewInt64FieldDataWithStart(integration.Int64Field, rowNum, start))
  107. var fVecColumn *schemapb.FieldData
  108. if s.vecType == schemapb.DataType_SparseFloatVector {
  109. fVecColumn = integration.NewSparseFloatVectorFieldData(integration.SparseFloatVecField, rowNum)
  110. } else {
  111. fVecColumn = integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
  112. }
  113. fieldsData = append(fieldsData, fVecColumn)
  114. nullableFidData := integration.NewInt64FieldDataNullableWithStart(nullableFid.GetName(), rowNum, start)
  115. fieldsData = append(fieldsData, nullableFidData)
  116. hashKeys := integration.GenerateHashKeys(rowNum)
  117. insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
  118. DbName: dbName,
  119. CollectionName: collectionName,
  120. FieldsData: fieldsData,
  121. HashKeys: hashKeys,
  122. NumRows: uint32(rowNum),
  123. })
  124. s.NoError(err)
  125. s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  126. // flush
  127. flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
  128. DbName: dbName,
  129. CollectionNames: []string{collectionName},
  130. })
  131. s.NoError(err)
  132. segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
  133. ids := segmentIDs.GetData()
  134. s.Require().NotEmpty(segmentIDs)
  135. s.Require().True(has)
  136. flushTs, has := flushResp.GetCollFlushTs()[collectionName]
  137. s.True(has)
  138. segments, err := c.MetaWatcher.ShowSegments()
  139. s.NoError(err)
  140. s.NotEmpty(segments)
  141. for _, segment := range segments {
  142. log.Info("ShowSegments result", zap.String("segment", segment.String()))
  143. }
  144. s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
  145. // create index
  146. createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
  147. CollectionName: collectionName,
  148. FieldName: fVecColumn.FieldName,
  149. IndexName: "_default",
  150. ExtraParams: integration.ConstructIndexParam(dim, s.indexType, s.metricType),
  151. })
  152. if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  153. log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
  154. }
  155. s.NoError(err)
  156. s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
  157. s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName)
  158. desCollResp, err := c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
  159. CollectionName: collectionName,
  160. })
  161. s.NoError(err)
  162. s.Equal(desCollResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  163. compactResp, err := c.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
  164. CollectionID: desCollResp.GetCollectionID(),
  165. })
  166. s.NoError(err)
  167. s.Equal(compactResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  168. compacted := func() bool {
  169. resp, err := c.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
  170. CompactionID: compactResp.GetCompactionID(),
  171. })
  172. if err != nil {
  173. return false
  174. }
  175. return resp.GetState() == commonpb.CompactionState_Completed
  176. }
  177. for !compacted() {
  178. time.Sleep(3 * time.Second)
  179. }
  180. // load
  181. loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
  182. DbName: dbName,
  183. CollectionName: collectionName,
  184. })
  185. s.NoError(err)
  186. if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  187. log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
  188. }
  189. s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
  190. s.WaitForLoad(ctx, collectionName)
  191. // search
  192. expr := fmt.Sprintf("%s > 0", integration.Int64Field)
  193. nq := 10
  194. topk := 10
  195. roundDecimal := -1
  196. params := integration.GetSearchParams(s.indexType, s.metricType)
  197. searchReq := integration.ConstructSearchRequest("", collectionName, expr,
  198. fVecColumn.FieldName, s.vecType, []string{"nullableFid"}, s.metricType, params, nq, dim, topk, roundDecimal)
  199. searchResult, err := c.Proxy.Search(ctx, searchReq)
  200. err = merr.CheckRPCCall(searchResult, err)
  201. s.NoError(err)
  202. s.checkNullableFieldData(nullableFid.GetName(), searchResult.GetResults().GetFieldsData(), start)
  203. queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
  204. DbName: dbName,
  205. CollectionName: collectionName,
  206. Expr: expr,
  207. OutputFields: []string{"nullableFid"},
  208. })
  209. if queryResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
  210. log.Warn("searchResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason()))
  211. }
  212. s.NoError(err)
  213. s.Equal(commonpb.ErrorCode_Success, queryResult.GetStatus().GetErrorCode())
  214. s.checkNullableFieldData(nullableFid.GetName(), queryResult.GetFieldsData(), start)
  215. fieldsData[2] = integration.NewInt64FieldDataNullableWithStart(nullableFid.GetName(), rowNum, start)
  216. fieldsDataForUpsert := make([]*schemapb.FieldData, 0)
  217. fieldsDataForUpsert = append(fieldsDataForUpsert, integration.NewInt64FieldDataWithStart(integration.Int64Field, rowNum, start))
  218. fieldsDataForUpsert = append(fieldsDataForUpsert, fVecColumn)
  219. nullableFidDataForUpsert := &schemapb.FieldData{
  220. Type: schemapb.DataType_Int64,
  221. FieldName: nullableFid.GetName(),
  222. Field: &schemapb.FieldData_Scalars{
  223. Scalars: &schemapb.ScalarField{
  224. Data: &schemapb.ScalarField_LongData{
  225. LongData: &schemapb.LongArray{
  226. Data: []int64{},
  227. },
  228. },
  229. },
  230. },
  231. ValidData: make([]bool, rowNum),
  232. }
  233. fieldsDataForUpsert = append(fieldsDataForUpsert, nullableFidDataForUpsert)
  234. insertResult, err = c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
  235. DbName: dbName,
  236. CollectionName: collectionName,
  237. FieldsData: fieldsData,
  238. HashKeys: hashKeys,
  239. NumRows: uint32(rowNum),
  240. })
  241. s.NoError(err)
  242. s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  243. upsertResult, err := c.Proxy.Upsert(ctx, &milvuspb.UpsertRequest{
  244. DbName: dbName,
  245. CollectionName: collectionName,
  246. FieldsData: fieldsDataForUpsert,
  247. HashKeys: hashKeys,
  248. NumRows: uint32(rowNum),
  249. })
  250. s.NoError(err)
  251. s.Equal(upsertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  252. // create index
  253. createIndexStatus, err = c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
  254. CollectionName: collectionName,
  255. FieldName: fVecColumn.FieldName,
  256. IndexName: "_default",
  257. ExtraParams: integration.ConstructIndexParam(dim, s.indexType, s.metricType),
  258. })
  259. if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  260. log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
  261. }
  262. s.NoError(err)
  263. s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
  264. s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName)
  265. desCollResp, err = c.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
  266. CollectionName: collectionName,
  267. })
  268. s.NoError(err)
  269. s.Equal(desCollResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  270. compactResp, err = c.Proxy.ManualCompaction(ctx, &milvuspb.ManualCompactionRequest{
  271. CollectionID: desCollResp.GetCollectionID(),
  272. })
  273. s.NoError(err)
  274. s.Equal(compactResp.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
  275. compacted = func() bool {
  276. resp, err := c.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
  277. CompactionID: compactResp.GetCompactionID(),
  278. })
  279. if err != nil {
  280. return false
  281. }
  282. return resp.GetState() == commonpb.CompactionState_Completed
  283. }
  284. for !compacted() {
  285. time.Sleep(3 * time.Second)
  286. }
  287. // load
  288. loadStatus, err = c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
  289. DbName: dbName,
  290. CollectionName: collectionName,
  291. })
  292. s.NoError(err)
  293. if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success {
  294. log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
  295. }
  296. s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
  297. s.WaitForLoad(ctx, collectionName)
  298. // flush
  299. flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
  300. DbName: dbName,
  301. CollectionNames: []string{collectionName},
  302. })
  303. s.NoError(err)
  304. segmentIDs, has = flushResp.GetCollSegIDs()[collectionName]
  305. ids = segmentIDs.GetData()
  306. s.Require().NotEmpty(segmentIDs)
  307. s.Require().True(has)
  308. flushTs, has = flushResp.GetCollFlushTs()[collectionName]
  309. s.True(has)
  310. segments, err = c.MetaWatcher.ShowSegments()
  311. s.NoError(err)
  312. s.NotEmpty(segments)
  313. for _, segment := range segments {
  314. log.Info("ShowSegments result", zap.String("segment", segment.String()))
  315. }
  316. s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
  317. // search
  318. searchResult, err = c.Proxy.Search(ctx, searchReq)
  319. err = merr.CheckRPCCall(searchResult, err)
  320. s.NoError(err)
  321. s.checkNullableFieldData(nullableFid.GetName(), searchResult.GetResults().GetFieldsData(), start)
  322. queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{
  323. DbName: dbName,
  324. CollectionName: collectionName,
  325. Expr: expr,
  326. OutputFields: []string{"nullableFid"},
  327. })
  328. if queryResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
  329. log.Warn("searchResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason()))
  330. }
  331. s.NoError(err)
  332. s.Equal(commonpb.ErrorCode_Success, queryResult.GetStatus().GetErrorCode())
  333. s.checkNullableFieldData(nullableFid.GetName(), queryResult.GetFieldsData(), start)
  334. // // expr will not select null data
  335. // exprResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{
  336. // DbName: dbName,
  337. // CollectionName: collectionName,
  338. // Expr: "nullableFid in [0,1000]",
  339. // OutputFields: []string{"nullableFid"},
  340. // })
  341. // if exprResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
  342. // log.Warn("searchResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason()))
  343. // }
  344. // s.NoError(err)
  345. // s.Equal(commonpb.ErrorCode_Success, queryResult.GetStatus().GetErrorCode())
  346. // target := getTargetFieldData(nullableFid.Name, exprResult.GetFieldsData())
  347. // s.Equal(len(target.GetScalars().GetLongData().GetData()), 1)
  348. // s.Equal(len(target.GetValidData()), 1)
  349. deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
  350. DbName: dbName,
  351. CollectionName: collectionName,
  352. Expr: integration.Int64Field + " in [1, 2]",
  353. })
  354. if deleteResult.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
  355. log.Warn("deleteResult fail reason", zap.String("reason", deleteResult.GetStatus().GetReason()))
  356. }
  357. s.NoError(err)
  358. s.Equal(commonpb.ErrorCode_Success, deleteResult.GetStatus().GetErrorCode())
  359. status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
  360. CollectionName: collectionName,
  361. })
  362. err = merr.CheckRPCCall(status, err)
  363. s.NoError(err)
  364. status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{
  365. CollectionName: collectionName,
  366. })
  367. err = merr.CheckRPCCall(status, err)
  368. s.NoError(err)
  369. log.Info("TestNullData succeed")
  370. }
  371. func (s *NullDataSuite) TestNullData_basic() {
  372. s.indexType = integration.IndexFaissIvfFlat
  373. s.metricType = metric.L2
  374. s.vecType = schemapb.DataType_FloatVector
  375. s.run()
  376. }
  377. func TestNullData(t *testing.T) {
  378. suite.Run(t, new(NullDataSuite))
  379. }