mongodb.go 21 KB


  1. // Copyright 2021 gorse Project Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package data
  15. import (
  16. "context"
  17. "encoding/base64"
  18. "encoding/json"
  19. "time"
  20. mapset "github.com/deckarep/golang-set/v2"
  21. "github.com/juju/errors"
  22. "github.com/zhenghaoz/gorse/storage"
  23. "go.mongodb.org/mongo-driver/bson"
  24. "go.mongodb.org/mongo-driver/bson/primitive"
  25. "go.mongodb.org/mongo-driver/mongo"
  26. "go.mongodb.org/mongo-driver/mongo/options"
  27. )
  28. func feedbackKeyFromString(s string) (*FeedbackKey, error) {
  29. var feedbackKey FeedbackKey
  30. err := json.Unmarshal([]byte(s), &feedbackKey)
  31. return &feedbackKey, err
  32. }
  33. func (k *FeedbackKey) toString() (string, error) {
  34. b, err := json.Marshal(k)
  35. return string(b), err
  36. }
  37. func unpack(o any) any {
  38. if o == nil {
  39. return nil
  40. }
  41. switch p := o.(type) {
  42. case primitive.A:
  43. return []any(p)
  44. case primitive.D:
  45. m := make(map[string]any)
  46. for _, e := range p {
  47. m[e.Key] = unpack(e.Value)
  48. }
  49. return m
  50. default:
  51. return p
  52. }
  53. }
  54. // MongoDB is the data storage based on MongoDB.
  55. type MongoDB struct {
  56. storage.TablePrefix
  57. client *mongo.Client
  58. dbName string
  59. }
  60. // Init collections and indices in MongoDB.
  61. func (db *MongoDB) Init() error {
  62. ctx := context.Background()
  63. d := db.client.Database(db.dbName)
  64. // list collections
  65. var hasUsers, hasItems, hasFeedback bool
  66. collections, err := d.ListCollectionNames(ctx, bson.M{})
  67. if err != nil {
  68. return errors.Trace(err)
  69. }
  70. for _, collectionName := range collections {
  71. switch collectionName {
  72. case db.UsersTable():
  73. hasUsers = true
  74. case db.ItemsTable():
  75. hasItems = true
  76. case db.FeedbackTable():
  77. hasFeedback = true
  78. }
  79. }
  80. // create collections
  81. if !hasUsers {
  82. if err = d.CreateCollection(ctx, db.UsersTable()); err != nil {
  83. return errors.Trace(err)
  84. }
  85. }
  86. if !hasItems {
  87. if err = d.CreateCollection(ctx, db.ItemsTable()); err != nil {
  88. return errors.Trace(err)
  89. }
  90. }
  91. if !hasFeedback {
  92. if err = d.CreateCollection(ctx, db.FeedbackTable()); err != nil {
  93. return errors.Trace(err)
  94. }
  95. }
  96. // create index
  97. _, err = d.Collection(db.UsersTable()).Indexes().CreateOne(ctx, mongo.IndexModel{
  98. Keys: bson.M{
  99. "userid": 1,
  100. },
  101. Options: options.Index().SetUnique(true),
  102. })
  103. if err != nil {
  104. return errors.Trace(err)
  105. }
  106. _, err = d.Collection(db.ItemsTable()).Indexes().CreateOne(ctx, mongo.IndexModel{
  107. Keys: bson.M{
  108. "itemid": 1,
  109. },
  110. Options: options.Index().SetUnique(true),
  111. })
  112. if err != nil {
  113. return errors.Trace(err)
  114. }
  115. _, err = d.Collection(db.FeedbackTable()).Indexes().CreateOne(ctx, mongo.IndexModel{
  116. Keys: bson.M{
  117. "feedbackkey": 1,
  118. },
  119. Options: options.Index().SetUnique(true),
  120. })
  121. if err != nil {
  122. return errors.Trace(err)
  123. }
  124. _, err = d.Collection(db.FeedbackTable()).Indexes().CreateOne(ctx, mongo.IndexModel{
  125. Keys: bson.M{
  126. "feedbackkey.userid": 1,
  127. },
  128. })
  129. if err != nil {
  130. return errors.Trace(err)
  131. }
  132. _, err = d.Collection(db.FeedbackTable()).Indexes().CreateOne(ctx, mongo.IndexModel{
  133. Keys: bson.M{
  134. "feedbackkey.itemid": 1,
  135. },
  136. })
  137. if err != nil {
  138. return errors.Trace(err)
  139. }
  140. return nil
  141. }
  142. func (db *MongoDB) Ping() error {
  143. return db.client.Ping(context.Background(), nil)
  144. }
  145. // Close connection to MongoDB.
  146. func (db *MongoDB) Close() error {
  147. return db.client.Disconnect(context.Background())
  148. }
  149. func (db *MongoDB) Purge() error {
  150. tables := []string{db.ItemsTable(), db.FeedbackTable(), db.UsersTable()}
  151. for _, tableName := range tables {
  152. c := db.client.Database(db.dbName).Collection(tableName)
  153. _, err := c.DeleteMany(context.Background(), bson.D{})
  154. if err != nil {
  155. return errors.Trace(err)
  156. }
  157. }
  158. return nil
  159. }
  160. // BatchInsertItems insert items into MongoDB.
  161. func (db *MongoDB) BatchInsertItems(ctx context.Context, items []Item) error {
  162. if len(items) == 0 {
  163. return nil
  164. }
  165. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  166. var models []mongo.WriteModel
  167. for _, item := range items {
  168. models = append(models, mongo.NewUpdateOneModel().
  169. SetUpsert(true).
  170. SetFilter(bson.M{"itemid": bson.M{"$eq": item.ItemId}}).
  171. SetUpdate(bson.M{"$set": item}))
  172. }
  173. _, err := c.BulkWrite(ctx, models)
  174. return errors.Trace(err)
  175. }
  176. func (db *MongoDB) BatchGetItems(ctx context.Context, itemIds []string) ([]Item, error) {
  177. if len(itemIds) == 0 {
  178. return nil, nil
  179. }
  180. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  181. r, err := c.Find(ctx, bson.M{"itemid": bson.M{"$in": itemIds}})
  182. if err != nil {
  183. return nil, errors.Trace(err)
  184. }
  185. items := make([]Item, 0)
  186. defer r.Close(ctx)
  187. for r.Next(ctx) {
  188. var item Item
  189. if err = r.Decode(&item); err != nil {
  190. return nil, errors.Trace(err)
  191. }
  192. item.Labels = unpack(item.Labels)
  193. items = append(items, item)
  194. }
  195. return items, nil
  196. }
  197. // ModifyItem modify an item in MongoDB.
  198. func (db *MongoDB) ModifyItem(ctx context.Context, itemId string, patch ItemPatch) error {
  199. // create update
  200. update := bson.M{}
  201. if patch.IsHidden != nil {
  202. update["ishidden"] = patch.IsHidden
  203. }
  204. if patch.Categories != nil {
  205. update["categories"] = patch.Categories
  206. }
  207. if patch.Comment != nil {
  208. update["comment"] = patch.Comment
  209. }
  210. if patch.Labels != nil {
  211. update["labels"] = patch.Labels
  212. }
  213. if patch.Timestamp != nil {
  214. update["timestamp"] = patch.Timestamp
  215. }
  216. // execute
  217. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  218. _, err := c.UpdateOne(ctx, bson.M{"itemid": bson.M{"$eq": itemId}}, bson.M{"$set": update})
  219. return errors.Trace(err)
  220. }
  221. // DeleteItem deletes a item from MongoDB.
  222. func (db *MongoDB) DeleteItem(ctx context.Context, itemId string) error {
  223. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  224. _, err := c.DeleteOne(ctx, bson.M{"itemid": itemId})
  225. if err != nil {
  226. return errors.Trace(err)
  227. }
  228. c = db.client.Database(db.dbName).Collection(db.FeedbackTable())
  229. _, err = c.DeleteMany(ctx, bson.M{
  230. "feedbackkey.itemid": bson.M{"$eq": itemId},
  231. })
  232. return errors.Trace(err)
  233. }
  234. // GetItem returns a item from MongoDB.
  235. func (db *MongoDB) GetItem(ctx context.Context, itemId string) (item Item, err error) {
  236. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  237. r := c.FindOne(ctx, bson.M{"itemid": itemId})
  238. if r.Err() == mongo.ErrNoDocuments {
  239. err = errors.Annotate(ErrItemNotExist, itemId)
  240. return
  241. }
  242. err = r.Decode(&item)
  243. item.Labels = unpack(item.Labels)
  244. return
  245. }
  246. // GetItems returns items from MongoDB.
  247. func (db *MongoDB) GetItems(ctx context.Context, cursor string, n int, timeLimit *time.Time) (string, []Item, error) {
  248. buf, err := base64.StdEncoding.DecodeString(cursor)
  249. if err != nil {
  250. return "", nil, errors.Trace(err)
  251. }
  252. cursorItem := string(buf)
  253. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  254. opt := options.Find()
  255. opt.SetLimit(int64(n))
  256. opt.SetSort(bson.D{{"itemid", 1}})
  257. filter := bson.M{"itemid": bson.M{"$gt": cursorItem}}
  258. if timeLimit != nil {
  259. filter["timestamp"] = bson.M{"$gt": *timeLimit}
  260. }
  261. r, err := c.Find(ctx, filter, opt)
  262. if err != nil {
  263. return "", nil, err
  264. }
  265. items := make([]Item, 0)
  266. defer r.Close(ctx)
  267. for r.Next(ctx) {
  268. var item Item
  269. if err = r.Decode(&item); err != nil {
  270. return "", nil, err
  271. }
  272. item.Labels = unpack(item.Labels)
  273. items = append(items, item)
  274. }
  275. if len(items) == n {
  276. cursor = items[n-1].ItemId
  277. } else {
  278. cursor = ""
  279. }
  280. return base64.StdEncoding.EncodeToString([]byte(cursor)), items, nil
  281. }
  282. // GetItemStream read items from MongoDB by stream.
  283. func (db *MongoDB) GetItemStream(ctx context.Context, batchSize int, timeLimit *time.Time) (chan []Item, chan error) {
  284. itemChan := make(chan []Item, bufSize)
  285. errChan := make(chan error, 1)
  286. go func() {
  287. defer close(itemChan)
  288. defer close(errChan)
  289. // send query
  290. ctx := context.Background()
  291. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  292. opt := options.Find()
  293. filter := bson.M{}
  294. if timeLimit != nil {
  295. filter["timestamp"] = bson.M{"$gt": *timeLimit}
  296. }
  297. r, err := c.Find(ctx, filter, opt)
  298. if err != nil {
  299. errChan <- errors.Trace(err)
  300. return
  301. }
  302. // fetch result
  303. items := make([]Item, 0, batchSize)
  304. defer r.Close(ctx)
  305. for r.Next(ctx) {
  306. var item Item
  307. if err = r.Decode(&item); err != nil {
  308. errChan <- errors.Trace(err)
  309. return
  310. }
  311. item.Labels = unpack(item.Labels)
  312. items = append(items, item)
  313. if len(items) == batchSize {
  314. itemChan <- items
  315. items = make([]Item, 0, batchSize)
  316. }
  317. }
  318. if len(items) > 0 {
  319. itemChan <- items
  320. }
  321. errChan <- nil
  322. }()
  323. return itemChan, errChan
  324. }
  325. // GetItemFeedback returns feedback of a item from MongoDB.
  326. func (db *MongoDB) GetItemFeedback(ctx context.Context, itemId string, feedbackTypes ...string) ([]Feedback, error) {
  327. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  328. var r *mongo.Cursor
  329. var err error
  330. filter := bson.M{
  331. "feedbackkey.itemid": bson.M{"$eq": itemId},
  332. "timestamp": bson.M{"$lte": time.Now()},
  333. }
  334. if len(feedbackTypes) > 0 {
  335. filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
  336. }
  337. r, err = c.Find(ctx, filter)
  338. if err != nil {
  339. return nil, err
  340. }
  341. feedbacks := make([]Feedback, 0)
  342. defer r.Close(ctx)
  343. for r.Next(ctx) {
  344. var feedback Feedback
  345. if err = r.Decode(&feedback); err != nil {
  346. return nil, err
  347. }
  348. feedbacks = append(feedbacks, feedback)
  349. }
  350. return feedbacks, nil
  351. }
  352. // BatchInsertUsers inserts a user into MongoDB.
  353. func (db *MongoDB) BatchInsertUsers(ctx context.Context, users []User) error {
  354. if len(users) == 0 {
  355. return nil
  356. }
  357. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  358. var models []mongo.WriteModel
  359. for _, user := range users {
  360. models = append(models, mongo.NewUpdateOneModel().
  361. SetUpsert(true).
  362. SetFilter(bson.M{"userid": bson.M{"$eq": user.UserId}}).
  363. SetUpdate(bson.M{"$set": user}))
  364. }
  365. _, err := c.BulkWrite(ctx, models)
  366. return errors.Trace(err)
  367. }
  368. // ModifyUser modify a user in MongoDB.
  369. func (db *MongoDB) ModifyUser(ctx context.Context, userId string, patch UserPatch) error {
  370. // create patch
  371. update := bson.M{}
  372. if patch.Labels != nil {
  373. update["labels"] = patch.Labels
  374. }
  375. if patch.Comment != nil {
  376. update["comment"] = patch.Comment
  377. }
  378. if patch.Subscribe != nil {
  379. update["subscribe"] = patch.Subscribe
  380. }
  381. // execute
  382. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  383. _, err := c.UpdateOne(ctx, bson.M{"userid": bson.M{"$eq": userId}}, bson.M{"$set": update})
  384. return errors.Trace(err)
  385. }
  386. // DeleteUser deletes a user from MongoDB.
  387. func (db *MongoDB) DeleteUser(ctx context.Context, userId string) error {
  388. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  389. _, err := c.DeleteOne(ctx, bson.M{"userid": userId})
  390. if err != nil {
  391. return errors.Trace(err)
  392. }
  393. c = db.client.Database(db.dbName).Collection(db.FeedbackTable())
  394. _, err = c.DeleteMany(ctx, bson.M{
  395. "feedbackkey.userid": bson.M{"$eq": userId},
  396. })
  397. return errors.Trace(err)
  398. }
  399. // GetUser returns a user from MongoDB.
  400. func (db *MongoDB) GetUser(ctx context.Context, userId string) (user User, err error) {
  401. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  402. r := c.FindOne(ctx, bson.M{"userid": userId})
  403. if r.Err() == mongo.ErrNoDocuments {
  404. err = errors.Annotate(ErrUserNotExist, userId)
  405. return
  406. }
  407. err = r.Decode(&user)
  408. user.Labels = unpack(user.Labels)
  409. return
  410. }
  411. // GetUsers returns users from MongoDB.
  412. func (db *MongoDB) GetUsers(ctx context.Context, cursor string, n int) (string, []User, error) {
  413. buf, err := base64.StdEncoding.DecodeString(cursor)
  414. if err != nil {
  415. return "", nil, errors.Trace(err)
  416. }
  417. cursorUser := string(buf)
  418. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  419. opt := options.Find()
  420. opt.SetLimit(int64(n))
  421. opt.SetSort(bson.D{{"userid", 1}})
  422. r, err := c.Find(ctx, bson.M{"userid": bson.M{"$gt": cursorUser}}, opt)
  423. if err != nil {
  424. return "", nil, err
  425. }
  426. users := make([]User, 0)
  427. defer r.Close(ctx)
  428. for r.Next(ctx) {
  429. var user User
  430. if err = r.Decode(&user); err != nil {
  431. return "", nil, err
  432. }
  433. user.Labels = unpack(user.Labels)
  434. users = append(users, user)
  435. }
  436. if len(users) == n {
  437. cursor = users[n-1].UserId
  438. } else {
  439. cursor = ""
  440. }
  441. return base64.StdEncoding.EncodeToString([]byte(cursor)), users, nil
  442. }
  443. // GetUserStream reads users from MongoDB by stream.
  444. func (db *MongoDB) GetUserStream(ctx context.Context, batchSize int) (chan []User, chan error) {
  445. userChan := make(chan []User, bufSize)
  446. errChan := make(chan error, 1)
  447. go func() {
  448. defer close(userChan)
  449. defer close(errChan)
  450. // send query
  451. ctx := context.Background()
  452. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  453. opt := options.Find()
  454. r, err := c.Find(ctx, bson.M{}, opt)
  455. if err != nil {
  456. errChan <- errors.Trace(err)
  457. return
  458. }
  459. users := make([]User, 0, batchSize)
  460. defer r.Close(ctx)
  461. for r.Next(ctx) {
  462. var user User
  463. if err = r.Decode(&user); err != nil {
  464. errChan <- errors.Trace(err)
  465. return
  466. }
  467. user.Labels = unpack(user.Labels)
  468. users = append(users, user)
  469. if len(users) == batchSize {
  470. userChan <- users
  471. users = make([]User, 0, batchSize)
  472. }
  473. }
  474. if len(users) > 0 {
  475. userChan <- users
  476. }
  477. errChan <- nil
  478. }()
  479. return userChan, errChan
  480. }
  481. // GetUserFeedback returns feedback of a user from MongoDB.
  482. func (db *MongoDB) GetUserFeedback(ctx context.Context, userId string, endTime *time.Time, feedbackTypes ...string) ([]Feedback, error) {
  483. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  484. var r *mongo.Cursor
  485. var err error
  486. filter := bson.M{
  487. "feedbackkey.userid": bson.M{"$eq": userId},
  488. }
  489. if endTime != nil {
  490. filter["timestamp"] = bson.M{"$lte": endTime}
  491. }
  492. if len(feedbackTypes) > 0 {
  493. filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
  494. }
  495. r, err = c.Find(ctx, filter)
  496. if err != nil {
  497. return nil, err
  498. }
  499. feedbacks := make([]Feedback, 0)
  500. defer r.Close(ctx)
  501. for r.Next(ctx) {
  502. var feedback Feedback
  503. if err = r.Decode(&feedback); err != nil {
  504. return nil, err
  505. }
  506. feedbacks = append(feedbacks, feedback)
  507. }
  508. return feedbacks, nil
  509. }
  510. // BatchInsertFeedback returns multiple feedback into MongoDB.
  511. func (db *MongoDB) BatchInsertFeedback(ctx context.Context, feedback []Feedback, insertUser, insertItem, overwrite bool) error {
  512. // skip empty list
  513. if len(feedback) == 0 {
  514. return nil
  515. }
  516. // collect users and items
  517. users := mapset.NewSet[string]()
  518. items := mapset.NewSet[string]()
  519. for _, v := range feedback {
  520. users.Add(v.UserId)
  521. items.Add(v.ItemId)
  522. }
  523. // insert users
  524. userList := users.ToSlice()
  525. if insertUser {
  526. var models []mongo.WriteModel
  527. for _, userId := range userList {
  528. models = append(models, mongo.NewUpdateOneModel().
  529. SetUpsert(true).
  530. SetFilter(bson.M{"userid": bson.M{"$eq": userId}}).
  531. SetUpdate(bson.M{"$setOnInsert": User{UserId: userId}}))
  532. }
  533. c := db.client.Database(db.dbName).Collection(db.UsersTable())
  534. _, err := c.BulkWrite(ctx, models)
  535. if err != nil {
  536. return errors.Trace(err)
  537. }
  538. } else {
  539. for _, userId := range userList {
  540. _, err := db.GetUser(ctx, userId)
  541. if err != nil {
  542. if errors.Is(err, errors.NotFound) {
  543. users.Remove(userId)
  544. continue
  545. }
  546. return errors.Trace(err)
  547. }
  548. }
  549. }
  550. // insert items
  551. itemList := items.ToSlice()
  552. if insertItem {
  553. var models []mongo.WriteModel
  554. for _, itemId := range itemList {
  555. models = append(models, mongo.NewUpdateOneModel().
  556. SetUpsert(true).
  557. SetFilter(bson.M{"itemid": bson.M{"$eq": itemId}}).
  558. SetUpdate(bson.M{"$setOnInsert": Item{ItemId: itemId}}))
  559. }
  560. c := db.client.Database(db.dbName).Collection(db.ItemsTable())
  561. _, err := c.BulkWrite(ctx, models)
  562. if err != nil {
  563. return errors.Trace(err)
  564. }
  565. } else {
  566. for _, itemId := range itemList {
  567. _, err := db.GetItem(ctx, itemId)
  568. if err != nil {
  569. if errors.Is(err, errors.NotFound) {
  570. items.Remove(itemId)
  571. continue
  572. }
  573. return errors.Trace(err)
  574. }
  575. }
  576. }
  577. // insert feedback
  578. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  579. var models []mongo.WriteModel
  580. for _, f := range feedback {
  581. if users.Contains(f.UserId) && items.Contains(f.ItemId) {
  582. model := mongo.NewUpdateOneModel().
  583. SetUpsert(true).
  584. SetFilter(bson.M{
  585. "feedbackkey": f.FeedbackKey,
  586. })
  587. if overwrite {
  588. model.SetUpdate(bson.M{"$set": f})
  589. } else {
  590. model.SetUpdate(bson.M{"$setOnInsert": f})
  591. }
  592. models = append(models, model)
  593. }
  594. }
  595. if len(models) == 0 {
  596. return nil
  597. }
  598. _, err := c.BulkWrite(ctx, models)
  599. return errors.Trace(err)
  600. }
  601. // GetFeedback returns multiple feedback from MongoDB.
  602. func (db *MongoDB) GetFeedback(ctx context.Context, cursor string, n int, beginTime, endTime *time.Time, feedbackTypes ...string) (string, []Feedback, error) {
  603. buf, err := base64.StdEncoding.DecodeString(cursor)
  604. if err != nil {
  605. return "", nil, errors.Trace(err)
  606. }
  607. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  608. opt := options.Find()
  609. opt.SetLimit(int64(n))
  610. opt.SetSort(bson.D{{"feedbackkey", 1}})
  611. filter := make(bson.M)
  612. // pass cursor to filter
  613. if len(buf) > 0 {
  614. feedbackKey, err := feedbackKeyFromString(string(buf))
  615. if err != nil {
  616. return "", nil, err
  617. }
  618. filter["feedbackkey"] = bson.M{"$gt": feedbackKey}
  619. }
  620. // pass feedback type to filter
  621. if len(feedbackTypes) > 0 {
  622. filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
  623. }
  624. // pass time limit to filter
  625. timestampConditions := bson.M{}
  626. if beginTime != nil {
  627. timestampConditions["$gt"] = *beginTime
  628. }
  629. if endTime != nil {
  630. timestampConditions["$lte"] = *endTime
  631. }
  632. filter["timestamp"] = timestampConditions
  633. r, err := c.Find(ctx, filter, opt)
  634. if err != nil {
  635. return "", nil, err
  636. }
  637. feedbacks := make([]Feedback, 0)
  638. defer r.Close(ctx)
  639. for r.Next(ctx) {
  640. var feedback Feedback
  641. if err = r.Decode(&feedback); err != nil {
  642. return "", nil, err
  643. }
  644. feedbacks = append(feedbacks, feedback)
  645. }
  646. if len(feedbacks) == n {
  647. cursor, err = feedbacks[n-1].toString()
  648. if err != nil {
  649. return "", nil, err
  650. }
  651. } else {
  652. cursor = ""
  653. }
  654. return base64.StdEncoding.EncodeToString([]byte(cursor)), feedbacks, nil
  655. }
  656. // GetFeedbackStream reads feedback from MongoDB by stream.
  657. func (db *MongoDB) GetFeedbackStream(ctx context.Context, batchSize int, scanOptions ...ScanOption) (chan []Feedback, chan error) {
  658. scan := NewScanOptions(scanOptions...)
  659. feedbackChan := make(chan []Feedback, bufSize)
  660. errChan := make(chan error, 1)
  661. go func() {
  662. defer close(feedbackChan)
  663. defer close(errChan)
  664. // send query
  665. ctx := context.Background()
  666. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  667. opt := options.Find()
  668. filter := make(bson.M)
  669. // pass feedback type to filter
  670. if len(scan.FeedbackTypes) > 0 {
  671. filter["feedbackkey.feedbacktype"] = bson.M{"$in": scan.FeedbackTypes}
  672. }
  673. // pass time limit to filter
  674. if scan.BeginTime != nil || scan.EndTime != nil {
  675. timestampConditions := bson.M{}
  676. if scan.BeginTime != nil {
  677. timestampConditions["$gt"] = *scan.BeginTime
  678. }
  679. if scan.EndTime != nil {
  680. timestampConditions["$lte"] = *scan.EndTime
  681. }
  682. filter["timestamp"] = timestampConditions
  683. }
  684. // pass user id to filter
  685. if scan.BeginUserId != nil || scan.EndUserId != nil {
  686. userIdConditions := bson.M{}
  687. if scan.BeginUserId != nil {
  688. userIdConditions["$gte"] = *scan.BeginUserId
  689. }
  690. if scan.EndUserId != nil {
  691. userIdConditions["$lte"] = *scan.EndUserId
  692. }
  693. filter["feedbackkey.userid"] = userIdConditions
  694. }
  695. r, err := c.Find(ctx, filter, opt)
  696. if err != nil {
  697. errChan <- errors.Trace(err)
  698. return
  699. }
  700. feedbacks := make([]Feedback, 0, batchSize)
  701. defer r.Close(ctx)
  702. for r.Next(ctx) {
  703. var feedback Feedback
  704. if err = r.Decode(&feedback); err != nil {
  705. errChan <- errors.Trace(err)
  706. return
  707. }
  708. feedbacks = append(feedbacks, feedback)
  709. if len(feedbacks) == batchSize {
  710. feedbackChan <- feedbacks
  711. feedbacks = make([]Feedback, 0, batchSize)
  712. }
  713. }
  714. if len(feedbacks) > 0 {
  715. feedbackChan <- feedbacks
  716. }
  717. errChan <- nil
  718. }()
  719. return feedbackChan, errChan
  720. }
  721. // GetUserItemFeedback returns a feedback return the user id and item id from MongoDB.
  722. func (db *MongoDB) GetUserItemFeedback(ctx context.Context, userId, itemId string, feedbackTypes ...string) ([]Feedback, error) {
  723. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  724. var filter = bson.M{
  725. "feedbackkey.userid": bson.M{"$eq": userId},
  726. "feedbackkey.itemid": bson.M{"$eq": itemId},
  727. }
  728. if len(feedbackTypes) > 0 {
  729. filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
  730. }
  731. r, err := c.Find(ctx, filter)
  732. if err != nil {
  733. return nil, err
  734. }
  735. feedbacks := make([]Feedback, 0)
  736. defer r.Close(ctx)
  737. for r.Next(ctx) {
  738. var feedback Feedback
  739. if err = r.Decode(&feedback); err != nil {
  740. return nil, err
  741. }
  742. feedbacks = append(feedbacks, feedback)
  743. }
  744. return feedbacks, nil
  745. }
  746. // DeleteUserItemFeedback deletes a feedback return the user id and item id from MongoDB.
  747. func (db *MongoDB) DeleteUserItemFeedback(ctx context.Context, userId, itemId string, feedbackTypes ...string) (int, error) {
  748. c := db.client.Database(db.dbName).Collection(db.FeedbackTable())
  749. var filter = bson.M{
  750. "feedbackkey.userid": bson.M{"$eq": userId},
  751. "feedbackkey.itemid": bson.M{"$eq": itemId},
  752. }
  753. if len(feedbackTypes) > 0 {
  754. filter["feedbackkey.feedbacktype"] = bson.M{"$in": feedbackTypes}
  755. }
  756. r, err := c.DeleteMany(ctx, filter)
  757. if err != nil {
  758. return 0, err
  759. }
  760. return int(r.DeletedCount), nil
  761. }