tasks_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. // Copyright 2020 gorse Project Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package master
  15. import (
  16. "context"
  17. "runtime"
  18. "strconv"
  19. "time"
  20. "github.com/samber/lo"
  21. "github.com/zhenghaoz/gorse/config"
  22. "github.com/zhenghaoz/gorse/storage/cache"
  23. "github.com/zhenghaoz/gorse/storage/data"
  24. )
  25. func (s *MasterTestSuite) TestFindItemNeighborsBruteForce() {
  26. ctx := context.Background()
  27. // create config
  28. s.Config = &config.Config{}
  29. s.Config.Recommend.CacheSize = 3
  30. s.Config.Master.NumJobs = 4
  31. // collect similar
  32. items := []data.Item{
  33. {ItemId: "0", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "b", "c", "d"}, Comment: ""},
  34. {ItemId: "1", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  35. {ItemId: "2", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"b", "c", "d"}, Comment: ""},
  36. {ItemId: "3", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  37. {ItemId: "4", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{"b", "c"}, Comment: ""},
  38. {ItemId: "5", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  39. {ItemId: "6", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"c"}, Comment: ""},
  40. {ItemId: "7", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  41. {ItemId: "8", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "b", "c", "d", "e"}, Comment: ""},
  42. {ItemId: "9", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  43. }
  44. feedbacks := make([]data.Feedback, 0)
  45. for i := 0; i < 10; i++ {
  46. for j := 0; j <= i; j++ {
  47. if i%2 == 1 {
  48. feedbacks = append(feedbacks, data.Feedback{
  49. FeedbackKey: data.FeedbackKey{
  50. ItemId: strconv.Itoa(i),
  51. UserId: strconv.Itoa(j),
  52. FeedbackType: "FeedbackType",
  53. },
  54. Timestamp: time.Now(),
  55. })
  56. }
  57. }
  58. }
  59. var err error
  60. err = s.DataClient.BatchInsertItems(ctx, items)
  61. s.NoError(err)
  62. err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true)
  63. s.NoError(err)
  64. // insert hidden item
  65. err = s.DataClient.BatchInsertItems(ctx, []data.Item{{
  66. ItemId: "10",
  67. Labels: []string{"a", "b", "c", "d", "e"},
  68. IsHidden: true,
  69. }})
  70. s.NoError(err)
  71. for i := 0; i <= 10; i++ {
  72. err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{{
  73. FeedbackKey: data.FeedbackKey{UserId: strconv.Itoa(i), ItemId: "10", FeedbackType: "FeedbackType"},
  74. }}, true, true, true)
  75. s.NoError(err)
  76. }
  77. // load mock dataset
  78. dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
  79. s.NoError(err)
  80. s.rankingTrainSet = dataset
  81. // similar items (common users)
  82. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeRelated
  83. neighborTask := NewFindItemNeighborsTask(&s.Master)
  84. s.NoError(neighborTask.run(context.Background(), nil))
  85. similar, err := s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "9", []string{""}, 0, 100)
  86. s.NoError(err)
  87. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  88. // similar items in category (common users)
  89. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "9", []string{"*"}, 0, 100)
  90. s.NoError(err)
  91. s.Equal([]string{"7", "5", "1"}, cache.ConvertDocumentsToValues(similar))
  92. // similar items (common labels)
  93. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now()))
  94. s.NoError(err)
  95. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeSimilar
  96. neighborTask = NewFindItemNeighborsTask(&s.Master)
  97. s.NoError(neighborTask.run(context.Background(), nil))
  98. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "8", []string{""}, 0, 100)
  99. s.NoError(err)
  100. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  101. // similar items in category (common labels)
  102. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "8", []string{"*"}, 0, 100)
  103. s.NoError(err)
  104. s.Equal([]string{"0", "2", "6"}, cache.ConvertDocumentsToValues(similar))
  105. // similar items (auto)
  106. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now()))
  107. s.NoError(err)
  108. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "9"), time.Now()))
  109. s.NoError(err)
  110. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeAuto
  111. neighborTask = NewFindItemNeighborsTask(&s.Master)
  112. s.NoError(neighborTask.run(context.Background(), nil))
  113. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "8", []string{""}, 0, 100)
  114. s.NoError(err)
  115. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  116. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "9", []string{""}, 0, 100)
  117. s.NoError(err)
  118. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  119. }
  120. func (s *MasterTestSuite) TestFindItemNeighborsIVF() {
  121. // create mock master
  122. ctx := context.Background()
  123. // create config
  124. s.Config = &config.Config{}
  125. s.Config.Recommend.CacheSize = 3
  126. s.Config.Master.NumJobs = 4
  127. s.Config.Recommend.ItemNeighbors.EnableIndex = true
  128. s.Config.Recommend.ItemNeighbors.IndexRecall = 1
  129. s.Config.Recommend.ItemNeighbors.IndexFitEpoch = 10
  130. // collect similar
  131. items := []data.Item{
  132. {ItemId: "0", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "b", "c", "d"}, Comment: ""},
  133. {ItemId: "1", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  134. {ItemId: "2", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"b", "c", "d"}, Comment: ""},
  135. {ItemId: "3", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  136. {ItemId: "4", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{"b", "c"}, Comment: ""},
  137. {ItemId: "5", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  138. {ItemId: "6", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"c"}, Comment: ""},
  139. {ItemId: "7", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  140. {ItemId: "8", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "b", "c", "d", "e"}, Comment: ""},
  141. {ItemId: "9", IsHidden: false, Categories: nil, Timestamp: time.Now(), Labels: []string{}, Comment: ""},
  142. }
  143. feedbacks := make([]data.Feedback, 0)
  144. for i := 0; i < 10; i++ {
  145. for j := 0; j <= i; j++ {
  146. if i%2 == 1 {
  147. feedbacks = append(feedbacks, data.Feedback{
  148. FeedbackKey: data.FeedbackKey{
  149. ItemId: strconv.Itoa(i),
  150. UserId: strconv.Itoa(j),
  151. FeedbackType: "FeedbackType",
  152. },
  153. Timestamp: time.Now(),
  154. })
  155. }
  156. }
  157. }
  158. var err error
  159. err = s.DataClient.BatchInsertItems(ctx, items)
  160. s.NoError(err)
  161. err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true)
  162. s.NoError(err)
  163. // insert hidden item
  164. err = s.DataClient.BatchInsertItems(ctx, []data.Item{{
  165. ItemId: "10",
  166. Labels: []string{"a", "b", "c", "d", "e"},
  167. IsHidden: true,
  168. }})
  169. s.NoError(err)
  170. for i := 0; i <= 10; i++ {
  171. err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{{
  172. FeedbackKey: data.FeedbackKey{UserId: strconv.Itoa(i), ItemId: "10", FeedbackType: "FeedbackType"},
  173. }}, true, true, true)
  174. s.NoError(err)
  175. }
  176. // load mock dataset
  177. dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
  178. s.NoError(err)
  179. s.rankingTrainSet = dataset
  180. // similar items (common users)
  181. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeRelated
  182. neighborTask := NewFindItemNeighborsTask(&s.Master)
  183. s.NoError(neighborTask.run(context.Background(), nil))
  184. similar, err := s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "9", []string{""}, 0, 100)
  185. s.NoError(err)
  186. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  187. // similar items in category (common users)
  188. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "9", []string{"*"}, 0, 100)
  189. s.NoError(err)
  190. s.Equal([]string{"7", "5", "1"}, cache.ConvertDocumentsToValues(similar))
  191. // similar items (common labels)
  192. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now()))
  193. s.NoError(err)
  194. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeSimilar
  195. neighborTask = NewFindItemNeighborsTask(&s.Master)
  196. s.NoError(neighborTask.run(context.Background(), nil))
  197. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "8", []string{""}, 0, 100)
  198. s.NoError(err)
  199. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  200. // similar items in category (common labels)
  201. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "8", []string{"*"}, 0, 100)
  202. s.NoError(err)
  203. s.Equal([]string{"0", "2", "6"}, cache.ConvertDocumentsToValues(similar))
  204. // similar items (auto)
  205. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "8"), time.Now()))
  206. s.NoError(err)
  207. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "9"), time.Now()))
  208. s.NoError(err)
  209. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeAuto
  210. neighborTask = NewFindItemNeighborsTask(&s.Master)
  211. s.NoError(neighborTask.run(context.Background(), nil))
  212. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "8", []string{""}, 0, 100)
  213. s.NoError(err)
  214. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  215. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "9", []string{""}, 0, 100)
  216. s.NoError(err)
  217. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  218. }
  219. func (s *MasterTestSuite) TestFindItemNeighborsIVF_ZeroIDF() {
  220. ctx := context.Background()
  221. // create config
  222. s.Config = &config.Config{}
  223. s.Config.Recommend.CacheSize = 3
  224. s.Config.Master.NumJobs = 4
  225. s.Config.Recommend.ItemNeighbors.EnableIndex = true
  226. s.Config.Recommend.ItemNeighbors.IndexRecall = 1
  227. s.Config.Recommend.ItemNeighbors.IndexFitEpoch = 10
  228. // create dataset
  229. err := s.DataClient.BatchInsertItems(ctx, []data.Item{
  230. {ItemId: "0", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "a"}, Comment: ""},
  231. {ItemId: "1", IsHidden: false, Categories: []string{"*"}, Timestamp: time.Now(), Labels: []string{"a", "a"}, Comment: ""},
  232. })
  233. s.NoError(err)
  234. err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  235. {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "0"}},
  236. {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "1"}},
  237. }, true, true, true)
  238. s.NoError(err)
  239. dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
  240. s.NoError(err)
  241. s.rankingTrainSet = dataset
  242. // similar items (common users)
  243. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeRelated
  244. neighborTask := NewFindItemNeighborsTask(&s.Master)
  245. s.NoError(neighborTask.run(context.Background(), nil))
  246. similar, err := s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "0", []string{""}, 0, 100)
  247. s.NoError(err)
  248. s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar))
  249. // similar items (common labels)
  250. s.Config.Recommend.ItemNeighbors.NeighborType = config.NeighborTypeSimilar
  251. neighborTask = NewFindItemNeighborsTask(&s.Master)
  252. s.NoError(neighborTask.run(context.Background(), nil))
  253. similar, err = s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, "0", []string{""}, 0, 100)
  254. s.NoError(err)
  255. s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar))
  256. }
  257. func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
  258. ctx := context.Background()
  259. // create config
  260. s.Config = &config.Config{}
  261. s.Config.Recommend.CacheSize = 3
  262. s.Config.Master.NumJobs = 4
  263. // collect similar
  264. users := []data.User{
  265. {UserId: "0", Labels: []string{"a", "b", "c", "d"}, Subscribe: nil, Comment: ""},
  266. {UserId: "1", Labels: []string{}, Subscribe: nil, Comment: ""},
  267. {UserId: "2", Labels: []string{"b", "c", "d"}, Subscribe: nil, Comment: ""},
  268. {UserId: "3", Labels: []string{}, Subscribe: nil, Comment: ""},
  269. {UserId: "4", Labels: []string{"b", "c"}, Subscribe: nil, Comment: ""},
  270. {UserId: "5", Labels: []string{}, Subscribe: nil, Comment: ""},
  271. {UserId: "6", Labels: []string{"c"}, Subscribe: nil, Comment: ""},
  272. {UserId: "7", Labels: []string{}, Subscribe: nil, Comment: ""},
  273. {UserId: "8", Labels: []string{"a", "b", "c", "d", "e"}, Subscribe: nil, Comment: ""},
  274. {UserId: "9", Labels: []string{}, Subscribe: nil, Comment: ""},
  275. }
  276. feedbacks := make([]data.Feedback, 0)
  277. for i := 0; i < 10; i++ {
  278. for j := 0; j <= i; j++ {
  279. if i%2 == 1 {
  280. feedbacks = append(feedbacks, data.Feedback{
  281. FeedbackKey: data.FeedbackKey{
  282. ItemId: strconv.Itoa(j),
  283. UserId: strconv.Itoa(i),
  284. FeedbackType: "FeedbackType",
  285. },
  286. Timestamp: time.Now(),
  287. })
  288. }
  289. }
  290. }
  291. var err error
  292. err = s.DataClient.BatchInsertUsers(ctx, users)
  293. s.NoError(err)
  294. err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true)
  295. s.NoError(err)
  296. dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
  297. s.NoError(err)
  298. s.rankingTrainSet = dataset
  299. // similar items (common users)
  300. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated
  301. neighborTask := NewFindUserNeighborsTask(&s.Master)
  302. s.NoError(neighborTask.run(context.Background(), nil))
  303. similar, err := s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
  304. s.NoError(err)
  305. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  306. // similar items (common labels)
  307. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now()))
  308. s.NoError(err)
  309. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar
  310. neighborTask = NewFindUserNeighborsTask(&s.Master)
  311. s.NoError(neighborTask.run(context.Background(), nil))
  312. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
  313. s.NoError(err)
  314. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  315. // similar items (auto)
  316. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now()))
  317. s.NoError(err)
  318. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "9"), time.Now()))
  319. s.NoError(err)
  320. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto
  321. neighborTask = NewFindUserNeighborsTask(&s.Master)
  322. s.NoError(neighborTask.run(context.Background(), nil))
  323. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
  324. s.NoError(err)
  325. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  326. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
  327. s.NoError(err)
  328. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  329. }
  330. func (s *MasterTestSuite) TestFindUserNeighborsIVF() {
  331. ctx := context.Background()
  332. // create config
  333. s.Config = &config.Config{}
  334. s.Config.Recommend.CacheSize = 3
  335. s.Config.Master.NumJobs = 4
  336. s.Config.Recommend.UserNeighbors.EnableIndex = true
  337. s.Config.Recommend.UserNeighbors.IndexRecall = 1
  338. s.Config.Recommend.UserNeighbors.IndexFitEpoch = 10
  339. // collect similar
  340. users := []data.User{
  341. {UserId: "0", Labels: []string{"a", "b", "c", "d"}, Subscribe: nil, Comment: ""},
  342. {UserId: "1", Labels: []string{}, Subscribe: nil, Comment: ""},
  343. {UserId: "2", Labels: []string{"b", "c", "d"}, Subscribe: nil, Comment: ""},
  344. {UserId: "3", Labels: []string{}, Subscribe: nil, Comment: ""},
  345. {UserId: "4", Labels: []string{"b", "c"}, Subscribe: nil, Comment: ""},
  346. {UserId: "5", Labels: []string{}, Subscribe: nil, Comment: ""},
  347. {UserId: "6", Labels: []string{"c"}, Subscribe: nil, Comment: ""},
  348. {UserId: "7", Labels: []string{}, Subscribe: nil, Comment: ""},
  349. {UserId: "8", Labels: []string{"a", "b", "c", "d", "e"}, Subscribe: nil, Comment: ""},
  350. {UserId: "9", Labels: []string{}, Subscribe: nil, Comment: ""},
  351. }
  352. feedbacks := make([]data.Feedback, 0)
  353. for i := 0; i < 10; i++ {
  354. for j := 0; j <= i; j++ {
  355. if i%2 == 1 {
  356. feedbacks = append(feedbacks, data.Feedback{
  357. FeedbackKey: data.FeedbackKey{
  358. ItemId: strconv.Itoa(j),
  359. UserId: strconv.Itoa(i),
  360. FeedbackType: "FeedbackType",
  361. },
  362. Timestamp: time.Now(),
  363. })
  364. }
  365. }
  366. }
  367. var err error
  368. err = s.DataClient.BatchInsertUsers(ctx, users)
  369. s.NoError(err)
  370. err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true)
  371. s.NoError(err)
  372. dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
  373. s.NoError(err)
  374. s.rankingTrainSet = dataset
  375. // similar items (common users)
  376. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated
  377. neighborTask := NewFindUserNeighborsTask(&s.Master)
  378. s.NoError(neighborTask.run(context.Background(), nil))
  379. similar, err := s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
  380. s.NoError(err)
  381. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  382. // similar items (common labels)
  383. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now()))
  384. s.NoError(err)
  385. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar
  386. neighborTask = NewFindUserNeighborsTask(&s.Master)
  387. s.NoError(neighborTask.run(context.Background(), nil))
  388. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
  389. s.NoError(err)
  390. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  391. // similar items (auto)
  392. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "8"), time.Now()))
  393. s.NoError(err)
  394. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "9"), time.Now()))
  395. s.NoError(err)
  396. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeAuto
  397. neighborTask = NewFindUserNeighborsTask(&s.Master)
  398. s.NoError(neighborTask.run(context.Background(), nil))
  399. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "8", []string{""}, 0, 100)
  400. s.NoError(err)
  401. s.Equal([]string{"0", "2", "4"}, cache.ConvertDocumentsToValues(similar))
  402. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "9", []string{""}, 0, 100)
  403. s.NoError(err)
  404. s.Equal([]string{"7", "5", "3"}, cache.ConvertDocumentsToValues(similar))
  405. }
  406. func (s *MasterTestSuite) TestFindUserNeighborsIVF_ZeroIDF() {
  407. ctx := context.Background()
  408. // create config
  409. s.Config = &config.Config{}
  410. s.Config.Recommend.CacheSize = 3
  411. s.Config.Master.NumJobs = 4
  412. s.Config.Recommend.UserNeighbors.EnableIndex = true
  413. s.Config.Recommend.UserNeighbors.IndexRecall = 1
  414. s.Config.Recommend.UserNeighbors.IndexFitEpoch = 10
  415. // create dataset
  416. err := s.DataClient.BatchInsertUsers(ctx, []data.User{
  417. {UserId: "0", Labels: []string{"a", "a"}, Subscribe: nil, Comment: ""},
  418. {UserId: "1", Labels: []string{"a", "a"}, Subscribe: nil, Comment: ""},
  419. })
  420. s.NoError(err)
  421. err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  422. {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "0"}},
  423. {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "1", ItemId: "0"}},
  424. }, true, true, true)
  425. s.NoError(err)
  426. dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
  427. s.NoError(err)
  428. s.rankingTrainSet = dataset
  429. // similar users (common items)
  430. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeRelated
  431. neighborTask := NewFindUserNeighborsTask(&s.Master)
  432. s.NoError(neighborTask.run(context.Background(), nil))
  433. similar, err := s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "0", []string{""}, 0, 100)
  434. s.NoError(err)
  435. s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar))
  436. // similar users (common labels)
  437. s.Config.Recommend.UserNeighbors.NeighborType = config.NeighborTypeSimilar
  438. neighborTask = NewFindUserNeighborsTask(&s.Master)
  439. s.NoError(neighborTask.run(context.Background(), nil))
  440. similar, err = s.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, "0", []string{""}, 0, 100)
  441. s.NoError(err)
  442. s.Equal([]string{"1"}, cache.ConvertDocumentsToValues(similar))
  443. }
  444. func (s *MasterTestSuite) TestLoadDataFromDatabase() {
  445. ctx := context.Background()
  446. // create config
  447. s.Config = &config.Config{}
  448. s.Config.Recommend.CacheSize = 3
  449. s.Config.Recommend.DataSource.PositiveFeedbackTypes = []string{"positive"}
  450. s.Config.Recommend.DataSource.ReadFeedbackTypes = []string{"negative"}
  451. s.Config.Master.NumJobs = runtime.NumCPU()
  452. // insert items
  453. var items []data.Item
  454. for i := 0; i < 9; i++ {
  455. items = append(items, data.Item{
  456. ItemId: strconv.Itoa(i),
  457. Timestamp: time.Date(2000+i, 1, 1, 1, 1, 0, 0, time.UTC),
  458. Labels: []string{strconv.Itoa(i % 3), strconv.Itoa(i*10 + 10)},
  459. Categories: []string{strconv.Itoa(i % 3)},
  460. })
  461. }
  462. err := s.DataClient.BatchInsertItems(ctx, items)
  463. s.NoError(err)
  464. err = s.DataClient.BatchInsertItems(ctx, []data.Item{{
  465. ItemId: "9",
  466. Timestamp: time.Date(2020, 1, 1, 1, 1, 0, 0, time.UTC),
  467. IsHidden: true,
  468. }})
  469. s.NoError(err)
  470. // insert users
  471. var users []data.User
  472. for i := 0; i <= 10; i++ {
  473. users = append(users, data.User{
  474. UserId: strconv.Itoa(i),
  475. Labels: []string{strconv.Itoa(i % 5), strconv.Itoa(i*10 + 10)},
  476. })
  477. }
  478. err = s.DataClient.BatchInsertUsers(ctx, users)
  479. s.NoError(err)
  480. // insert feedback
  481. feedbacks := make([]data.Feedback, 0)
  482. for i := 0; i < 10; i++ {
  483. // positive feedback
  484. // item 0: user 0
  485. // ...
  486. // item 9: user 0 ... user 9
  487. for j := 0; j <= i; j++ {
  488. feedbacks = append(feedbacks, data.Feedback{
  489. FeedbackKey: data.FeedbackKey{
  490. ItemId: strconv.Itoa(i),
  491. UserId: strconv.Itoa(j),
  492. FeedbackType: "positive",
  493. },
  494. Timestamp: time.Now(),
  495. })
  496. }
  497. // negative feedback
  498. // item 0: user 1 .. user 10
  499. // ...
  500. // item 9: user 10
  501. for j := i + 1; j < 11; j++ {
  502. feedbacks = append(feedbacks, data.Feedback{
  503. FeedbackKey: data.FeedbackKey{
  504. ItemId: strconv.Itoa(i),
  505. UserId: strconv.Itoa(j),
  506. FeedbackType: "negative",
  507. },
  508. Timestamp: time.Now(),
  509. })
  510. }
  511. }
  512. err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, false, false, true)
  513. s.NoError(err)
  514. // load dataset
  515. err = s.runLoadDatasetTask()
  516. s.NoError(err)
  517. s.Equal(11, s.rankingTrainSet.UserCount())
  518. s.Equal(10, s.rankingTrainSet.ItemCount())
  519. s.Equal(11, s.rankingTestSet.UserCount())
  520. s.Equal(10, s.rankingTestSet.ItemCount())
  521. s.Equal(55, s.rankingTrainSet.Count()+s.rankingTestSet.Count())
  522. s.Equal(11, s.clickTrainSet.UserCount())
  523. s.Equal(10, s.clickTrainSet.ItemCount())
  524. s.Equal(11, s.clickTestSet.UserCount())
  525. s.Equal(10, s.clickTestSet.ItemCount())
  526. s.Equal(int32(3), s.clickTrainSet.Index.CountItemLabels())
  527. s.Equal(int32(5), s.clickTrainSet.Index.CountUserLabels())
  528. s.Equal(int32(3), s.clickTestSet.Index.CountItemLabels())
  529. s.Equal(int32(5), s.clickTestSet.Index.CountUserLabels())
  530. s.Equal(90, s.clickTrainSet.Count()+s.clickTestSet.Count())
  531. s.Equal(45, s.clickTrainSet.PositiveCount+s.clickTestSet.PositiveCount)
  532. s.Equal(45, s.clickTrainSet.NegativeCount+s.clickTestSet.NegativeCount)
  533. // check latest items
  534. latest, err := s.CacheClient.SearchDocuments(ctx, cache.LatestItems, "", []string{""}, 0, 100)
  535. s.NoError(err)
  536. s.Equal([]cache.Document{
  537. {Id: items[8].ItemId, Score: float64(items[8].Timestamp.Unix())},
  538. {Id: items[7].ItemId, Score: float64(items[7].Timestamp.Unix())},
  539. {Id: items[6].ItemId, Score: float64(items[6].Timestamp.Unix())},
  540. }, lo.Map(latest, func(document cache.Document, _ int) cache.Document {
  541. return cache.Document{Id: document.Id, Score: document.Score}
  542. }))
  543. latest, err = s.CacheClient.SearchDocuments(ctx, cache.LatestItems, "", []string{"2"}, 0, 100)
  544. s.NoError(err)
  545. s.Equal([]cache.Document{
  546. {Id: items[8].ItemId, Score: float64(items[8].Timestamp.Unix())},
  547. {Id: items[5].ItemId, Score: float64(items[5].Timestamp.Unix())},
  548. {Id: items[2].ItemId, Score: float64(items[2].Timestamp.Unix())},
  549. }, lo.Map(latest, func(document cache.Document, _ int) cache.Document {
  550. return cache.Document{Id: document.Id, Score: document.Score}
  551. }))
  552. // check popular items
  553. popular, err := s.CacheClient.SearchDocuments(ctx, cache.PopularItems, "", []string{""}, 0, 3)
  554. s.NoError(err)
  555. s.Equal([]cache.Document{
  556. {Id: items[8].ItemId, Score: 9},
  557. {Id: items[7].ItemId, Score: 8},
  558. {Id: items[6].ItemId, Score: 7},
  559. }, lo.Map(popular, func(document cache.Document, _ int) cache.Document {
  560. return cache.Document{Id: document.Id, Score: document.Score}
  561. }))
  562. popular, err = s.CacheClient.SearchDocuments(ctx, cache.PopularItems, "", []string{"2"}, 0, 3)
  563. s.NoError(err)
  564. s.Equal([]cache.Document{
  565. {Id: items[8].ItemId, Score: 9},
  566. {Id: items[5].ItemId, Score: 6},
  567. {Id: items[2].ItemId, Score: 3},
  568. }, lo.Map(popular, func(document cache.Document, _ int) cache.Document {
  569. return cache.Document{Id: document.Id, Score: document.Score}
  570. }))
  571. // check categories
  572. categories, err := s.CacheClient.GetSet(ctx, cache.ItemCategories)
  573. s.NoError(err)
  574. s.Equal([]string{"0", "1", "2"}, categories)
  575. }
  576. func (s *MasterTestSuite) TestCheckItemNeighborCacheTimeout() {
  577. s.Config = config.GetDefaultConfig()
  578. ctx := context.Background()
  579. // empty cache
  580. s.True(s.checkItemNeighborCacheTimeout("1", nil))
  581. err := s.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, "1", []cache.Document{
  582. {Id: "2", Score: 1, Categories: []string{""}},
  583. {Id: "3", Score: 2, Categories: []string{""}},
  584. {Id: "4", Score: 3, Categories: []string{""}},
  585. })
  586. s.NoError(err)
  587. // digest mismatch
  588. err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.ItemNeighborsDigest, "1"), "digest"))
  589. s.NoError(err)
  590. s.True(s.checkItemNeighborCacheTimeout("1", nil))
  591. // staled cache
  592. err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.ItemNeighborsDigest, "1"), s.Config.ItemNeighborDigest()))
  593. s.NoError(err)
  594. s.True(s.checkItemNeighborCacheTimeout("1", nil))
  595. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, "1"), time.Now().Add(-time.Minute)))
  596. s.NoError(err)
  597. s.True(s.checkItemNeighborCacheTimeout("1", nil))
  598. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateItemNeighborsTime, "1"), time.Now().Add(-time.Hour)))
  599. s.NoError(err)
  600. s.True(s.checkItemNeighborCacheTimeout("1", nil))
  601. // not staled cache
  602. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateItemNeighborsTime, "1"), time.Now()))
  603. s.NoError(err)
  604. s.False(s.checkItemNeighborCacheTimeout("1", nil))
  605. }
  606. func (s *MasterTestSuite) TestCheckUserNeighborCacheTimeout() {
  607. ctx := context.Background()
  608. s.Config = config.GetDefaultConfig()
  609. // empty cache
  610. s.True(s.checkUserNeighborCacheTimeout("1"))
  611. err := s.CacheClient.AddDocuments(ctx, cache.UserNeighbors, "1", []cache.Document{
  612. {Id: "1", Score: 1, Categories: []string{""}},
  613. {Id: "2", Score: 2, Categories: []string{""}},
  614. {Id: "3", Score: 3, Categories: []string{""}},
  615. })
  616. s.NoError(err)
  617. // digest mismatch
  618. err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), "digest"))
  619. s.NoError(err)
  620. s.True(s.checkUserNeighborCacheTimeout("1"))
  621. // staled cache
  622. err = s.CacheClient.Set(ctx, cache.String(cache.Key(cache.UserNeighborsDigest, "1"), s.Config.UserNeighborDigest()))
  623. s.NoError(err)
  624. s.True(s.checkUserNeighborCacheTimeout("1"))
  625. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "1"), time.Now().Add(-time.Minute)))
  626. s.NoError(err)
  627. s.True(s.checkUserNeighborCacheTimeout("1"))
  628. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now().Add(-time.Hour)))
  629. s.NoError(err)
  630. s.True(s.checkUserNeighborCacheTimeout("1"))
  631. // not staled cache
  632. err = s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserNeighborsTime, "1"), time.Now()))
  633. s.NoError(err)
  634. s.False(s.checkUserNeighborCacheTimeout("1"))
  635. }