worker_test.go 43 KB


  1. // Copyright 2020 gorse Project Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package worker
  15. import (
  16. "bytes"
  17. "context"
  18. "encoding/json"
  19. "fmt"
  20. "io"
  21. "math/rand"
  22. "net"
  23. "net/http"
  24. "net/http/httptest"
  25. "strconv"
  26. "testing"
  27. "time"
  28. "github.com/bits-and-blooms/bitset"
  29. mapset "github.com/deckarep/golang-set/v2"
  30. "github.com/samber/lo"
  31. "github.com/stretchr/testify/assert"
  32. "github.com/stretchr/testify/suite"
  33. "github.com/thoas/go-funk"
  34. "github.com/zhenghaoz/gorse/base"
  35. "github.com/zhenghaoz/gorse/base/parallel"
  36. "github.com/zhenghaoz/gorse/base/progress"
  37. "github.com/zhenghaoz/gorse/config"
  38. "github.com/zhenghaoz/gorse/model"
  39. "github.com/zhenghaoz/gorse/model/click"
  40. "github.com/zhenghaoz/gorse/model/ranking"
  41. "github.com/zhenghaoz/gorse/protocol"
  42. "github.com/zhenghaoz/gorse/storage/cache"
  43. "github.com/zhenghaoz/gorse/storage/data"
  44. "google.golang.org/grpc"
  45. "google.golang.org/grpc/credentials/insecure"
  46. "google.golang.org/protobuf/proto"
  47. )
  48. type WorkerTestSuite struct {
  49. suite.Suite
  50. Worker
  51. }
  52. func (suite *WorkerTestSuite) SetupSuite() {
  53. // open database
  54. var err error
  55. suite.tracer = progress.NewTracer("test")
  56. suite.Settings = config.NewSettings()
  57. suite.DataClient, err = data.Open(fmt.Sprintf("sqlite://%s/data.db", suite.T().TempDir()), "")
  58. suite.NoError(err)
  59. suite.CacheClient, err = cache.Open(fmt.Sprintf("sqlite://%s/cache.db", suite.T().TempDir()), "")
  60. suite.NoError(err)
  61. // init database
  62. err = suite.DataClient.Init()
  63. suite.NoError(err)
  64. err = suite.CacheClient.Init()
  65. suite.NoError(err)
  66. }
  67. func (suite *WorkerTestSuite) TearDownSuite() {
  68. err := suite.DataClient.Close()
  69. suite.NoError(err)
  70. err = suite.CacheClient.Close()
  71. suite.NoError(err)
  72. }
  73. func (suite *WorkerTestSuite) SetupTest() {
  74. err := suite.DataClient.Purge()
  75. suite.NoError(err)
  76. err = suite.CacheClient.Purge()
  77. suite.NoError(err)
  78. // configuration
  79. suite.Config = config.GetDefaultConfig()
  80. suite.jobs = 1
  81. // reset random generator
  82. suite.randGenerator = rand.New(rand.NewSource(0))
  83. // reset index
  84. suite.rankingIndex = nil
  85. }
  86. func (suite *WorkerTestSuite) TestPullUsers() {
  87. ctx := context.Background()
  88. // create user index
  89. err := suite.DataClient.BatchInsertUsers(ctx, []data.User{
  90. {UserId: "1"},
  91. {UserId: "2"},
  92. {UserId: "3"},
  93. {UserId: "4"},
  94. {UserId: "5"},
  95. {UserId: "6"},
  96. {UserId: "7"},
  97. {UserId: "8"},
  98. })
  99. suite.NoError(err)
  100. // create nodes
  101. nodes := []string{"a", "b", "c"}
  102. users, err := suite.pullUsers(nodes, "b")
  103. suite.NoError(err)
  104. suite.Equal([]data.User{{UserId: "1"}, {UserId: "3"}, {UserId: "6"}}, users)
  105. _, err = suite.pullUsers(nodes, "d")
  106. suite.Error(err)
  107. }
  108. func (suite *WorkerTestSuite) TestCheckRecommendCacheTimeout() {
  109. ctx := context.Background()
  110. // empty cache
  111. suite.True(suite.checkRecommendCacheTimeout(ctx, "0", nil))
  112. err := suite.CacheClient.AddDocuments(ctx, cache.OfflineRecommend, "0", []cache.Document{{Id: "0", Score: 0, Categories: []string{""}}})
  113. suite.NoError(err)
  114. // digest mismatch
  115. suite.True(suite.checkRecommendCacheTimeout(ctx, "0", nil))
  116. err = suite.CacheClient.Set(ctx, cache.String(cache.Key(cache.OfflineRecommendDigest, "0"), suite.Config.OfflineRecommendDigest()))
  117. suite.NoError(err)
  118. err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "0"), time.Now().Add(-time.Hour)))
  119. suite.NoError(err)
  120. suite.True(suite.checkRecommendCacheTimeout(ctx, "0", nil))
  121. err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().Add(-time.Hour*100)))
  122. suite.NoError(err)
  123. suite.True(suite.checkRecommendCacheTimeout(ctx, "0", nil))
  124. err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().Add(time.Hour*100)))
  125. suite.NoError(err)
  126. suite.False(suite.checkRecommendCacheTimeout(ctx, "0", nil))
  127. err = suite.CacheClient.DeleteDocuments(ctx, []string{cache.OfflineRecommend}, cache.DocumentCondition{Subset: proto.String("0")})
  128. suite.NoError(err)
  129. suite.True(suite.checkRecommendCacheTimeout(ctx, "0", nil))
  130. }
  131. type mockMatrixFactorizationForRecommend struct {
  132. ranking.BaseMatrixFactorization
  133. }
  134. func (m *mockMatrixFactorizationForRecommend) Complexity() int {
  135. panic("implement me")
  136. }
  137. func newMockMatrixFactorizationForRecommend(numUsers, numItems int) *mockMatrixFactorizationForRecommend {
  138. m := new(mockMatrixFactorizationForRecommend)
  139. m.UserIndex = base.NewMapIndex()
  140. m.ItemIndex = base.NewMapIndex()
  141. for i := 0; i < numUsers; i++ {
  142. m.UserIndex.Add(strconv.Itoa(i))
  143. }
  144. for i := 0; i < numItems; i++ {
  145. m.ItemIndex.Add(strconv.Itoa(i))
  146. }
  147. m.UserPredictable = bitset.New(uint(numUsers)).Complement()
  148. m.ItemPredictable = bitset.New(uint(numItems)).Complement()
  149. return m
  150. }
  151. func (m *mockMatrixFactorizationForRecommend) GetUserFactor(_ int32) []float32 {
  152. return []float32{1}
  153. }
  154. func (m *mockMatrixFactorizationForRecommend) GetItemFactor(itemId int32) []float32 {
  155. return []float32{float32(itemId)}
  156. }
  157. func (m *mockMatrixFactorizationForRecommend) Invalid() bool {
  158. return false
  159. }
  160. func (m *mockMatrixFactorizationForRecommend) Fit(_ context.Context, _, _ *ranking.DataSet, _ *ranking.FitConfig) ranking.Score {
  161. panic("implement me")
  162. }
  163. func (m *mockMatrixFactorizationForRecommend) Predict(_, itemId string) float32 {
  164. itemIndex, err := strconv.Atoi(itemId)
  165. if err != nil {
  166. panic(err)
  167. }
  168. return float32(itemIndex)
  169. }
  170. func (m *mockMatrixFactorizationForRecommend) InternalPredict(_, itemId int32) float32 {
  171. return float32(itemId)
  172. }
  173. func (m *mockMatrixFactorizationForRecommend) Clear() {
  174. // do nothing
  175. }
  176. func (m *mockMatrixFactorizationForRecommend) GetParamsGrid(_ bool) model.ParamsGrid {
  177. panic("don't call me")
  178. }
  179. func (suite *WorkerTestSuite) TestRecommendMatrixFactorizationBruteForce() {
  180. ctx := context.Background()
  181. suite.Config.Recommend.Offline.EnableColRecommend = true
  182. suite.Config.Recommend.Collaborative.EnableIndex = false
  183. // insert feedbacks
  184. now := time.Now()
  185. err := suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  186. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "9"}, Timestamp: now.Add(-time.Hour)},
  187. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "8"}, Timestamp: now.Add(-time.Hour)},
  188. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "7"}, Timestamp: now.Add(-time.Hour)},
  189. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "6"}, Timestamp: now.Add(-time.Hour)},
  190. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "5"}, Timestamp: now.Add(-time.Hour)},
  191. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "4"}, Timestamp: now.Add(-time.Hour)},
  192. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "3"}, Timestamp: now.Add(time.Hour)},
  193. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "2"}, Timestamp: now.Add(time.Hour)},
  194. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "1"}, Timestamp: now.Add(time.Hour)},
  195. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "0"}, Timestamp: now.Add(time.Hour)},
  196. }, true, true, true)
  197. suite.NoError(err)
  198. // insert hidden items and categorized items
  199. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{
  200. {ItemId: "10", IsHidden: true},
  201. {ItemId: "11", IsHidden: true},
  202. {ItemId: "3", Categories: []string{"*"}},
  203. {ItemId: "1", Categories: []string{"*"}},
  204. })
  205. suite.NoError(err)
  206. // create mock model
  207. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 12)
  208. suite.Recommend([]data.User{{UserId: "0"}})
  209. // read recommend time
  210. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  211. suite.NoError(err)
  212. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, -1)
  213. suite.NoError(err)
  214. suite.Equal([]cache.Document{
  215. {Id: "3", Score: 3, Categories: []string{"", "*"}, Timestamp: recommendTime},
  216. {Id: "2", Score: 2, Categories: []string{""}, Timestamp: recommendTime},
  217. {Id: "1", Score: 1, Categories: []string{"", "*"}, Timestamp: recommendTime},
  218. {Id: "0", Score: 0, Categories: []string{""}, Timestamp: recommendTime},
  219. }, recommends)
  220. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{"*"}, 0, -1)
  221. suite.NoError(err)
  222. suite.Equal([]cache.Document{
  223. {Id: "3", Score: 3, Categories: []string{"", "*"}, Timestamp: recommendTime},
  224. {Id: "1", Score: 1, Categories: []string{"", "*"}, Timestamp: recommendTime},
  225. }, recommends)
  226. }
  227. func (suite *WorkerTestSuite) TestRecommendMatrixFactorizationHNSW() {
  228. ctx := context.Background()
  229. suite.Config.Recommend.Offline.EnableColRecommend = true
  230. suite.Config.Recommend.Collaborative.EnableIndex = true
  231. // insert feedbacks
  232. now := time.Now()
  233. err := suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  234. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "9"}, Timestamp: now.Add(-time.Hour)},
  235. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "8"}, Timestamp: now.Add(-time.Hour)},
  236. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "7"}, Timestamp: now.Add(-time.Hour)},
  237. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "6"}, Timestamp: now.Add(-time.Hour)},
  238. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "5"}, Timestamp: now.Add(-time.Hour)},
  239. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "4"}, Timestamp: now.Add(-time.Hour)},
  240. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "3"}, Timestamp: now.Add(time.Hour)},
  241. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "2"}, Timestamp: now.Add(time.Hour)},
  242. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "1"}, Timestamp: now.Add(time.Hour)},
  243. {FeedbackKey: data.FeedbackKey{FeedbackType: "click", UserId: "0", ItemId: "0"}, Timestamp: now.Add(time.Hour)},
  244. }, true, true, true)
  245. suite.NoError(err)
  246. // insert hidden items and categorized items
  247. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{
  248. {ItemId: "10", IsHidden: true},
  249. {ItemId: "11", IsHidden: true},
  250. {ItemId: "3", Categories: []string{"*"}},
  251. {ItemId: "1", Categories: []string{"*"}},
  252. })
  253. suite.NoError(err)
  254. // create mock model
  255. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 12)
  256. suite.Recommend([]data.User{{UserId: "0"}})
  257. // read recommend time
  258. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  259. suite.NoError(err)
  260. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, -1)
  261. suite.NoError(err)
  262. suite.Equal([]cache.Document{
  263. {Id: "3", Score: 3, Categories: []string{"", "*"}, Timestamp: recommendTime},
  264. {Id: "2", Score: 2, Categories: []string{""}, Timestamp: recommendTime},
  265. {Id: "1", Score: 1, Categories: []string{"", "*"}, Timestamp: recommendTime},
  266. {Id: "0", Score: 0, Categories: []string{""}, Timestamp: recommendTime},
  267. }, recommends)
  268. }
  269. func (suite *WorkerTestSuite) TestRecommendItemBased() {
  270. ctx := context.Background()
  271. suite.Config.Recommend.Offline.EnableColRecommend = false
  272. suite.Config.Recommend.Offline.EnableItemBasedRecommend = true
  273. // insert feedback
  274. err := suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  275. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "0", ItemId: "21"}},
  276. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "0", ItemId: "22"}},
  277. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "0", ItemId: "23"}},
  278. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "0", ItemId: "24"}},
  279. }, true, true, true)
  280. suite.NoError(err)
  281. // insert similar items
  282. err = suite.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, "21", []cache.Document{
  283. {Id: "22", Score: 100000, Categories: []string{"", "*"}},
  284. {Id: "25", Score: 1000000, Categories: []string{""}},
  285. {Id: "29", Score: 1, Categories: []string{""}},
  286. })
  287. suite.NoError(err)
  288. err = suite.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, "22", []cache.Document{
  289. {Id: "23", Score: 100000, Categories: []string{"", "*"}},
  290. {Id: "25", Score: 1000000, Categories: []string{""}},
  291. {Id: "28", Score: 1, Categories: []string{"", "*"}},
  292. {Id: "29", Score: 1, Categories: []string{""}},
  293. })
  294. suite.NoError(err)
  295. err = suite.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, "23", []cache.Document{
  296. {Id: "24", Score: 100000, Categories: []string{"", "*"}},
  297. {Id: "25", Score: 1000000, Categories: []string{""}},
  298. {Id: "27", Score: 1, Categories: []string{""}},
  299. {Id: "28", Score: 1, Categories: []string{"", "*"}},
  300. {Id: "29", Score: 1, Categories: []string{""}},
  301. })
  302. suite.NoError(err)
  303. err = suite.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, "24", []cache.Document{
  304. {Id: "21", Score: 100000, Categories: []string{""}},
  305. {Id: "25", Score: 1000000, Categories: []string{""}},
  306. {Id: "26", Score: 1, Categories: []string{"", "*"}},
  307. {Id: "27", Score: 1, Categories: []string{""}},
  308. {Id: "28", Score: 1, Categories: []string{"", "*"}},
  309. {Id: "29", Score: 1, Categories: []string{""}},
  310. })
  311. suite.NoError(err)
  312. // insert items
  313. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "21"}, {ItemId: "22"}, {ItemId: "23"}, {ItemId: "24"},
  314. {ItemId: "25"}, {ItemId: "26"}, {ItemId: "27"}, {ItemId: "28"}, {ItemId: "29"}})
  315. suite.NoError(err)
  316. // insert hidden items
  317. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "25", IsHidden: true}})
  318. suite.NoError(err)
  319. // insert categorized items
  320. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "26", Categories: []string{"*"}}, {ItemId: "28", Categories: []string{"*"}}})
  321. suite.NoError(err)
  322. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 10)
  323. suite.Recommend([]data.User{{UserId: "0"}})
  324. // read recommend time
  325. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  326. suite.NoError(err)
  327. // read recommend result
  328. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 3)
  329. suite.NoError(err)
  330. suite.Equal([]cache.Document{
  331. {Id: "29", Score: 29, Categories: []string{""}, Timestamp: recommendTime},
  332. {Id: "28", Score: 28, Categories: []string{"", "*"}, Timestamp: recommendTime},
  333. {Id: "27", Score: 27, Categories: []string{""}, Timestamp: recommendTime},
  334. }, recommends)
  335. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{"*"}, 0, 3)
  336. suite.NoError(err)
  337. suite.Equal([]cache.Document{
  338. {Id: "28", Score: 28, Categories: []string{"", "*"}, Timestamp: recommendTime},
  339. {Id: "26", Score: 26, Categories: []string{"", "*"}, Timestamp: recommendTime},
  340. }, recommends)
  341. }
  342. func (suite *WorkerTestSuite) TestRecommendUserBased() {
  343. ctx := context.Background()
  344. suite.Config.Recommend.Offline.EnableColRecommend = false
  345. suite.Config.Recommend.Offline.EnableUserBasedRecommend = true
  346. // insert similar users
  347. err := suite.CacheClient.AddDocuments(ctx, cache.UserNeighbors, "0", []cache.Document{
  348. {Id: "1", Score: 2, Categories: []string{""}},
  349. {Id: "2", Score: 1.5, Categories: []string{""}},
  350. {Id: "3", Score: 1, Categories: []string{""}},
  351. })
  352. suite.NoError(err)
  353. // insert feedback
  354. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  355. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "1", ItemId: "10"}},
  356. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "1", ItemId: "11"}},
  357. }, true, true, true)
  358. suite.NoError(err)
  359. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  360. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "2", ItemId: "10"}},
  361. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "2", ItemId: "12"}},
  362. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "2", ItemId: "48"}},
  363. }, true, true, true)
  364. suite.NoError(err)
  365. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  366. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "3", ItemId: "10"}},
  367. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "3", ItemId: "13"}},
  368. {FeedbackKey: data.FeedbackKey{FeedbackType: "a", UserId: "3", ItemId: "48"}},
  369. }, true, true, true)
  370. suite.NoError(err)
  371. // insert hidden items
  372. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "10", IsHidden: true}})
  373. suite.NoError(err)
  374. // insert categorized items
  375. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{
  376. {ItemId: "12", Categories: []string{"*"}},
  377. {ItemId: "48", Categories: []string{"*"}},
  378. })
  379. suite.NoError(err)
  380. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 10)
  381. suite.Recommend([]data.User{{UserId: "0"}})
  382. // read recommend time
  383. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  384. suite.NoError(err)
  385. // read recommend result
  386. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 3)
  387. suite.NoError(err)
  388. suite.Equal([]cache.Document{
  389. {Id: "48", Score: 48, Categories: []string{"", "*"}, Timestamp: recommendTime},
  390. {Id: "13", Score: 13, Categories: []string{""}, Timestamp: recommendTime},
  391. {Id: "12", Score: 12, Categories: []string{"", "*"}, Timestamp: recommendTime},
  392. }, recommends)
  393. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{"*"}, 0, 3)
  394. suite.NoError(err)
  395. suite.Equal([]cache.Document{
  396. {Id: "48", Score: 48, Categories: []string{"", "*"}, Timestamp: recommendTime},
  397. {Id: "12", Score: 12, Categories: []string{"", "*"}, Timestamp: recommendTime},
  398. }, recommends)
  399. }
  400. func (suite *WorkerTestSuite) TestRecommendPopular() {
  401. ctx := context.Background()
  402. suite.Config.Recommend.Offline.EnableColRecommend = false
  403. suite.Config.Recommend.Offline.EnablePopularRecommend = true
  404. // insert popular items
  405. err := suite.CacheClient.AddDocuments(ctx, cache.PopularItems, "", []cache.Document{
  406. {Id: "11", Score: 11, Categories: []string{""}},
  407. {Id: "10", Score: 10, Categories: []string{""}},
  408. {Id: "9", Score: 9, Categories: []string{""}},
  409. {Id: "8", Score: 8, Categories: []string{""}},
  410. {Id: "20", Score: 20, Categories: []string{"*"}},
  411. {Id: "19", Score: 19, Categories: []string{"*"}},
  412. {Id: "18", Score: 18, Categories: []string{"*"}},
  413. })
  414. suite.NoError(err)
  415. // insert items
  416. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{
  417. {ItemId: "11"}, {ItemId: "10"}, {ItemId: "9"}, {ItemId: "8"},
  418. {ItemId: "20", Categories: []string{"*"}},
  419. {ItemId: "19", Categories: []string{"*"}},
  420. {ItemId: "18", Categories: []string{"*"}},
  421. })
  422. suite.NoError(err)
  423. // insert hidden items
  424. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "11", IsHidden: true}})
  425. suite.NoError(err)
  426. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 10)
  427. suite.Recommend([]data.User{{UserId: "0"}})
  428. // read recommend time
  429. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  430. suite.NoError(err)
  431. // read recommend result
  432. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, -1)
  433. suite.NoError(err)
  434. suite.Equal([]cache.Document{
  435. {Id: "10", Score: 10, Categories: []string{""}, Timestamp: recommendTime},
  436. {Id: "9", Score: 9, Categories: []string{""}, Timestamp: recommendTime},
  437. {Id: "8", Score: 8, Categories: []string{""}, Timestamp: recommendTime},
  438. }, recommends)
  439. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{"*"}, 0, -1)
  440. suite.NoError(err)
  441. suite.Equal([]cache.Document{
  442. {Id: "20", Score: 20, Categories: []string{"*"}, Timestamp: recommendTime},
  443. {Id: "19", Score: 19, Categories: []string{"*"}, Timestamp: recommendTime},
  444. {Id: "18", Score: 18, Categories: []string{"*"}, Timestamp: recommendTime},
  445. }, recommends)
  446. }
  447. func (suite *WorkerTestSuite) TestRecommendLatest() {
  448. // create mock worker
  449. ctx := context.Background()
  450. suite.Config.Recommend.Offline.EnableColRecommend = false
  451. suite.Config.Recommend.Offline.EnableLatestRecommend = true
  452. // insert latest items
  453. err := suite.CacheClient.AddDocuments(ctx, cache.LatestItems, "", []cache.Document{
  454. {Id: "11", Score: 11, Categories: []string{""}},
  455. {Id: "10", Score: 10, Categories: []string{""}},
  456. {Id: "9", Score: 9, Categories: []string{""}},
  457. {Id: "8", Score: 8, Categories: []string{""}},
  458. {Id: "20", Score: 20, Categories: []string{"*"}},
  459. {Id: "19", Score: 19, Categories: []string{"*"}},
  460. {Id: "18", Score: 18, Categories: []string{"*"}},
  461. })
  462. suite.NoError(err)
  463. // insert items
  464. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{
  465. {ItemId: "11"}, {ItemId: "10"}, {ItemId: "9"}, {ItemId: "8"},
  466. {ItemId: "20", Categories: []string{"*"}},
  467. {ItemId: "19", Categories: []string{"*"}},
  468. {ItemId: "18", Categories: []string{"*"}},
  469. })
  470. suite.NoError(err)
  471. // insert hidden items
  472. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "11", IsHidden: true}})
  473. suite.NoError(err)
  474. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 10)
  475. suite.Recommend([]data.User{{UserId: "0"}})
  476. // read recommend time
  477. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  478. suite.NoError(err)
  479. // read recommend result
  480. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, -1)
  481. suite.NoError(err)
  482. suite.Equal([]cache.Document{
  483. {Id: "10", Score: 10, Categories: []string{""}, Timestamp: recommendTime},
  484. {Id: "9", Score: 9, Categories: []string{""}, Timestamp: recommendTime},
  485. {Id: "8", Score: 8, Categories: []string{""}, Timestamp: recommendTime},
  486. }, recommends)
  487. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{"*"}, 0, -1)
  488. suite.NoError(err)
  489. suite.Equal([]cache.Document{
  490. {Id: "20", Score: 20, Categories: []string{"*"}, Timestamp: recommendTime},
  491. {Id: "19", Score: 19, Categories: []string{"*"}, Timestamp: recommendTime},
  492. {Id: "18", Score: 18, Categories: []string{"*"}, Timestamp: recommendTime},
  493. }, recommends)
  494. }
  495. func (suite *WorkerTestSuite) TestRecommendColdStart() {
  496. ctx := context.Background()
  497. suite.Config.Recommend.Offline.EnableColRecommend = true
  498. suite.Config.Recommend.Offline.EnableLatestRecommend = true
  499. // insert latest items
  500. err := suite.CacheClient.AddDocuments(ctx, cache.LatestItems, "", []cache.Document{
  501. {Id: "11", Score: 11, Categories: []string{""}},
  502. {Id: "10", Score: 10, Categories: []string{""}},
  503. {Id: "9", Score: 9, Categories: []string{""}},
  504. {Id: "8", Score: 8, Categories: []string{""}},
  505. {Id: "20", Score: 20, Categories: []string{"*"}},
  506. {Id: "19", Score: 19, Categories: []string{"*"}},
  507. {Id: "18", Score: 18, Categories: []string{"*"}},
  508. })
  509. suite.NoError(err)
  510. // insert items
  511. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{
  512. {ItemId: "11"}, {ItemId: "10"}, {ItemId: "9"}, {ItemId: "8"},
  513. {ItemId: "20", Categories: []string{"*"}},
  514. {ItemId: "19", Categories: []string{"*"}},
  515. {ItemId: "18", Categories: []string{"*"}},
  516. })
  517. suite.NoError(err)
  518. // insert hidden items
  519. err = suite.DataClient.BatchInsertItems(ctx, []data.Item{{ItemId: "11", IsHidden: true}})
  520. suite.NoError(err)
  521. // ranking model not exist
  522. m := newMockMatrixFactorizationForRecommend(10, 100)
  523. suite.Recommend([]data.User{{UserId: "0"}})
  524. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, -1)
  525. suite.NoError(err)
  526. suite.Equal([]string{"10", "9", "8"}, lo.Map(recommends, func(d cache.Document, _ int) string { return d.Id }))
  527. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{"*"}, 0, -1)
  528. suite.NoError(err)
  529. suite.Equal([]string{"20", "19", "18"}, lo.Map(recommends, func(d cache.Document, _ int) string { return d.Id }))
  530. // user not predictable
  531. suite.RankingModel = m
  532. suite.Recommend([]data.User{{UserId: "100"}})
  533. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "100", []string{""}, 0, -1)
  534. suite.NoError(err)
  535. suite.Equal([]string{"10", "9", "8"}, lo.Map(recommends, func(d cache.Document, _ int) string { return d.Id }))
  536. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "100", []string{"*"}, 0, -1)
  537. suite.NoError(err)
  538. suite.Equal([]string{"20", "19", "18"}, lo.Map(recommends, func(d cache.Document, _ int) string { return d.Id }))
  539. }
  540. func (suite *WorkerTestSuite) TestMergeAndShuffle() {
  541. scores := suite.mergeAndShuffle([][]string{{"1", "2", "3"}, {"1", "3", "5"}})
  542. suite.ElementsMatch([]string{"1", "2", "3", "5"}, lo.Map(scores, func(d cache.Document, _ int) string { return d.Id }))
  543. }
  544. func (suite *WorkerTestSuite) TestExploreRecommend() {
  545. ctx := context.Background()
  546. suite.Config.Recommend.Offline.ExploreRecommend = map[string]float64{"popular": 0.3, "latest": 0.3}
  547. // insert popular items
  548. err := suite.CacheClient.AddDocuments(ctx, cache.PopularItems, "", []cache.Document{{Id: "popular", Score: 0, Categories: []string{""}, Timestamp: time.Now()}})
  549. suite.NoError(err)
  550. // insert latest items
  551. err = suite.CacheClient.AddDocuments(ctx, cache.LatestItems, "", []cache.Document{{Id: "latest", Score: 0, Categories: []string{""}, Timestamp: time.Now()}})
  552. suite.NoError(err)
  553. recommend, err := suite.exploreRecommend([]cache.Document{
  554. {Id: "8", Score: 8},
  555. {Id: "7", Score: 7},
  556. {Id: "6", Score: 6},
  557. {Id: "5", Score: 5},
  558. {Id: "4", Score: 4},
  559. {Id: "3", Score: 3},
  560. {Id: "2", Score: 2},
  561. {Id: "1", Score: 1},
  562. }, mapset.NewSet[string](), "")
  563. suite.NoError(err)
  564. items := lo.Map(recommend, func(d cache.Document, _ int) string { return d.Id })
  565. suite.Contains(items, "latest")
  566. suite.Contains(items, "popular")
  567. items = funk.FilterString(items, func(item string) bool {
  568. return item != "latest" && item != "popular"
  569. })
  570. suite.IsDecreasing(items)
  571. scores := lo.Map(recommend, func(d cache.Document, _ int) float64 { return d.Score })
  572. suite.IsDecreasing(scores)
  573. suite.Equal(8, len(recommend))
  574. }
  575. func marshal(t *testing.T, v interface{}) string {
  576. s, err := json.Marshal(v)
  577. assert.NoError(t, err)
  578. return string(s)
  579. }
  580. func newRankingDataset() (*ranking.DataSet, *ranking.DataSet) {
  581. dataset := &ranking.DataSet{
  582. UserIndex: base.NewMapIndex(),
  583. ItemIndex: base.NewMapIndex(),
  584. }
  585. return dataset, dataset
  586. }
  587. func newClickDataset() (*click.Dataset, *click.Dataset) {
  588. dataset := &click.Dataset{
  589. Index: click.NewUnifiedMapIndexBuilder().Build(),
  590. }
  591. return dataset, dataset
  592. }
  593. type mockMaster struct {
  594. protocol.UnimplementedMasterServer
  595. addr chan string
  596. grpcServer *grpc.Server
  597. cacheFilePath string
  598. dataFilePath string
  599. meta *protocol.Meta
  600. rankingModel []byte
  601. clickModel []byte
  602. userIndex []byte
  603. }
  604. func newMockMaster(t *testing.T) *mockMaster {
  605. cfg := config.GetDefaultConfig()
  606. cfg.Database.DataStore = fmt.Sprintf("sqlite://%s/data.db", t.TempDir())
  607. cfg.Database.CacheStore = fmt.Sprintf("sqlite://%s/cache.db", t.TempDir())
  608. // create click model
  609. train, test := newClickDataset()
  610. fm := click.NewFM(click.FMClassification, model.Params{model.NEpochs: 0})
  611. fm.Fit(context.Background(), train, test, nil)
  612. clickModelBuffer := bytes.NewBuffer(nil)
  613. err := click.MarshalModel(clickModelBuffer, fm)
  614. assert.NoError(t, err)
  615. // create ranking model
  616. trainSet, testSet := newRankingDataset()
  617. bpr := ranking.NewBPR(model.Params{model.NEpochs: 0})
  618. bpr.Fit(context.Background(), trainSet, testSet, nil)
  619. rankingModelBuffer := bytes.NewBuffer(nil)
  620. err = ranking.MarshalModel(rankingModelBuffer, bpr)
  621. assert.NoError(t, err)
  622. // create user index
  623. userIndexBuffer := bytes.NewBuffer(nil)
  624. err = base.MarshalIndex(userIndexBuffer, base.NewMapIndex())
  625. assert.NoError(t, err)
  626. return &mockMaster{
  627. addr: make(chan string),
  628. meta: &protocol.Meta{
  629. Config: marshal(t, cfg),
  630. ClickModelVersion: 1,
  631. RankingModelVersion: 2,
  632. },
  633. cacheFilePath: cfg.Database.CacheStore,
  634. dataFilePath: cfg.Database.DataStore,
  635. userIndex: userIndexBuffer.Bytes(),
  636. clickModel: clickModelBuffer.Bytes(),
  637. rankingModel: rankingModelBuffer.Bytes(),
  638. }
  639. }
  640. func (m *mockMaster) GetMeta(_ context.Context, _ *protocol.NodeInfo) (*protocol.Meta, error) {
  641. return m.meta, nil
  642. }
  643. func (m *mockMaster) GetRankingModel(_ *protocol.VersionInfo, sender protocol.Master_GetRankingModelServer) error {
  644. return sender.Send(&protocol.Fragment{Data: m.rankingModel})
  645. }
  646. func (m *mockMaster) GetClickModel(_ *protocol.VersionInfo, sender protocol.Master_GetClickModelServer) error {
  647. return sender.Send(&protocol.Fragment{Data: m.clickModel})
  648. }
  649. func (m *mockMaster) Start(t *testing.T) {
  650. listen, err := net.Listen("tcp", "localhost:0")
  651. assert.NoError(t, err)
  652. m.addr <- listen.Addr().String()
  653. var opts []grpc.ServerOption
  654. m.grpcServer = grpc.NewServer(opts...)
  655. protocol.RegisterMasterServer(m.grpcServer, m)
  656. err = m.grpcServer.Serve(listen)
  657. assert.NoError(t, err)
  658. }
  659. func (m *mockMaster) Stop() {
  660. m.grpcServer.Stop()
  661. }
  662. func TestWorker_Sync(t *testing.T) {
  663. master := newMockMaster(t)
  664. go master.Start(t)
  665. address := <-master.addr
  666. conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
  667. assert.NoError(t, err)
  668. serv := &Worker{
  669. Settings: config.NewSettings(),
  670. testMode: true,
  671. masterClient: protocol.NewMasterClient(conn),
  672. syncedChan: parallel.NewConditionChannel(),
  673. ticker: time.NewTicker(time.Minute),
  674. }
  675. // This clause is used to test race condition.
  676. done := make(chan struct{})
  677. go func() {
  678. for {
  679. select {
  680. case <-done:
  681. return
  682. default:
  683. p, _ := serv.Config.Recommend.Offline.GetExploreRecommend("popular")
  684. assert.Zero(t, p)
  685. }
  686. }
  687. }()
  688. serv.Sync()
  689. assert.Equal(t, master.dataFilePath, serv.dataPath)
  690. assert.Equal(t, master.cacheFilePath, serv.cachePath)
  691. assert.NoError(t, serv.DataClient.Close())
  692. assert.NoError(t, serv.CacheClient.Close())
  693. assert.Equal(t, int64(1), serv.latestClickModelVersion)
  694. assert.Equal(t, int64(2), serv.latestRankingModelVersion)
  695. assert.Zero(t, serv.ClickModelVersion)
  696. assert.Zero(t, serv.RankingModelVersion)
  697. serv.Pull()
  698. assert.Equal(t, int64(1), serv.ClickModelVersion)
  699. assert.Equal(t, int64(2), serv.RankingModelVersion)
  700. master.Stop()
  701. done <- struct{}{}
  702. }
  703. func TestWorker_SyncRecommend(t *testing.T) {
  704. cfg := config.GetDefaultConfig()
  705. cfg.Recommend.Offline.ExploreRecommend = map[string]float64{"popular": 0.5}
  706. master := newMockMaster(t)
  707. master.meta.Config = marshal(t, cfg)
  708. go master.Start(t)
  709. address := <-master.addr
  710. conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
  711. assert.NoError(t, err)
  712. worker := &Worker{
  713. Settings: config.NewSettings(),
  714. jobs: 1,
  715. testMode: true,
  716. masterClient: protocol.NewMasterClient(conn),
  717. syncedChan: parallel.NewConditionChannel(),
  718. ticker: time.NewTicker(time.Minute),
  719. }
  720. worker.Sync()
  721. stopSync := make(chan struct{})
  722. go func() {
  723. for {
  724. select {
  725. case <-stopSync:
  726. return
  727. default:
  728. worker.Sync()
  729. }
  730. }
  731. }()
  732. stopRecommend := make(chan struct{})
  733. go func() {
  734. for {
  735. select {
  736. case <-stopRecommend:
  737. return
  738. default:
  739. worker.Settings.Config.OfflineRecommendDigest()
  740. }
  741. }
  742. }()
  743. time.Sleep(time.Second)
  744. stopSync <- struct{}{}
  745. stopRecommend <- struct{}{}
  746. master.Stop()
  747. }
  748. type mockFactorizationMachine struct {
  749. click.BaseFactorizationMachine
  750. }
  751. func (m mockFactorizationMachine) Complexity() int {
  752. panic("implement me")
  753. }
  754. func (m mockFactorizationMachine) Bytes() int {
  755. panic("implement me")
  756. }
  757. func (m mockFactorizationMachine) GetParamsGrid(_ bool) model.ParamsGrid {
  758. panic("implement me")
  759. }
  760. func (m mockFactorizationMachine) Clear() {
  761. panic("implement me")
  762. }
  763. func (m mockFactorizationMachine) Invalid() bool {
  764. return false
  765. }
  766. func (m mockFactorizationMachine) Predict(_, itemId string, _, _ []click.Feature) float32 {
  767. score, err := strconv.Atoi(itemId)
  768. if err != nil {
  769. panic(err)
  770. }
  771. return float32(score)
  772. }
  773. func (m mockFactorizationMachine) InternalPredict(_ []int32, _ []float32) float32 {
  774. panic("implement me")
  775. }
  776. func (m mockFactorizationMachine) Fit(_ context.Context, _, _ *click.Dataset, _ *click.FitConfig) click.Score {
  777. panic("implement me")
  778. }
  779. func (m mockFactorizationMachine) Marshal(_ io.Writer) error {
  780. panic("implement me")
  781. }
  782. func (suite *WorkerTestSuite) TestRankByCollaborativeFiltering() {
  783. ctx := context.Background()
  784. // insert a user
  785. err := suite.DataClient.BatchInsertUsers(ctx, []data.User{{UserId: "1"}})
  786. suite.NoError(err)
  787. // insert items
  788. itemCache := make(map[string]data.Item)
  789. for i := 1; i <= 5; i++ {
  790. itemCache[strconv.Itoa(i)] = data.Item{ItemId: strconv.Itoa(i)}
  791. }
  792. // rank items
  793. suite.RankingModel = newMockMatrixFactorizationForRecommend(10, 10)
  794. result, err := suite.rankByCollaborativeFiltering("1", [][]string{{"1", "2", "3", "4", "5"}})
  795. suite.NoError(err)
  796. suite.Equal([]string{"5", "4", "3", "2", "1"}, lo.Map(result, func(d cache.Document, _ int) string {
  797. return d.Id
  798. }))
  799. suite.IsDecreasing(lo.Map(result, func(d cache.Document, _ int) float64 {
  800. return d.Score
  801. }))
  802. }
  803. func (suite *WorkerTestSuite) TestRankByClickTroughRate() {
  804. ctx := context.Background()
  805. // insert a user
  806. err := suite.DataClient.BatchInsertUsers(ctx, []data.User{{UserId: "1"}})
  807. suite.NoError(err)
  808. // insert items
  809. itemCache := NewItemCache()
  810. for i := 1; i <= 5; i++ {
  811. itemCache.Set(strconv.Itoa(i), data.Item{ItemId: strconv.Itoa(i)})
  812. }
  813. // rank items
  814. result, err := suite.rankByClickTroughRate(&data.User{UserId: "1"}, [][]string{{"1", "2", "3", "4", "5"}}, itemCache, new(mockFactorizationMachine))
  815. suite.NoError(err)
  816. suite.Equal([]string{"5", "4", "3", "2", "1"}, lo.Map(result, func(d cache.Document, _ int) string {
  817. return d.Id
  818. }))
  819. suite.IsDecreasing(lo.Map(result, func(d cache.Document, _ int) float64 {
  820. return d.Score
  821. }))
  822. }
  823. func (suite *WorkerTestSuite) TestReplacement_ClickThroughRate() {
  824. ctx := context.Background()
  825. suite.Config.Recommend.DataSource.PositiveFeedbackTypes = []string{"p"}
  826. suite.Config.Recommend.DataSource.ReadFeedbackTypes = []string{"n"}
  827. suite.Config.Recommend.Offline.EnableColRecommend = false
  828. suite.Config.Recommend.Offline.EnablePopularRecommend = true
  829. suite.Config.Recommend.Replacement.EnableReplacement = true
  830. suite.Config.Recommend.Offline.EnableClickThroughPrediction = true
  831. // 1. Insert historical items into empty recommendation.
  832. // insert items
  833. err := suite.DataClient.BatchInsertItems(ctx, []data.Item{
  834. {ItemId: "10"}, {ItemId: "9"}, {ItemId: "8"}, {ItemId: "7"}, {ItemId: "6"}, {ItemId: "5"},
  835. })
  836. suite.NoError(err)
  837. // insert feedback
  838. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  839. {FeedbackKey: data.FeedbackKey{FeedbackType: "p", UserId: "0", ItemId: "10"}},
  840. {FeedbackKey: data.FeedbackKey{FeedbackType: "n", UserId: "0", ItemId: "9"}},
  841. {FeedbackKey: data.FeedbackKey{FeedbackType: "i", UserId: "0", ItemId: "8"}},
  842. }, true, false, true)
  843. suite.NoError(err)
  844. suite.rankers = []click.FactorizationMachine{new(mockFactorizationMachine)}
  845. suite.Recommend([]data.User{{UserId: "0"}})
  846. // read recommend time
  847. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  848. suite.NoError(err)
  849. // read recommend result
  850. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 3)
  851. suite.NoError(err)
  852. suite.Equal([]cache.Document{
  853. {Id: "10", Score: 10, Categories: []string{""}, Timestamp: recommendTime},
  854. {Id: "9", Score: 9, Categories: []string{""}, Timestamp: recommendTime},
  855. }, recommends)
  856. // 2. Insert historical items into non-empty recommendation.
  857. err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().AddDate(-1, 0, 0)))
  858. suite.NoError(err)
  859. // insert popular items
  860. err = suite.CacheClient.AddDocuments(ctx, cache.PopularItems, "", []cache.Document{
  861. {Id: "7", Score: 10, Categories: []string{""}},
  862. {Id: "6", Score: 9, Categories: []string{""}},
  863. {Id: "5", Score: 8, Categories: []string{""}},
  864. })
  865. suite.NoError(err)
  866. // insert feedback
  867. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  868. {FeedbackKey: data.FeedbackKey{FeedbackType: "p", UserId: "0", ItemId: "10"}},
  869. {FeedbackKey: data.FeedbackKey{FeedbackType: "n", UserId: "0", ItemId: "9"}},
  870. {FeedbackKey: data.FeedbackKey{FeedbackType: "i", UserId: "0", ItemId: "8"}},
  871. }, true, false, true)
  872. suite.NoError(err)
  873. suite.Recommend([]data.User{{UserId: "0"}})
  874. // read recommend time
  875. recommendTime, err = suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  876. suite.NoError(err)
  877. // read recommend result
  878. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 3)
  879. suite.NoError(err)
  880. suite.Equal([]cache.Document{
  881. {Id: "10", Score: 9, Categories: []string{""}, Timestamp: recommendTime},
  882. {Id: "9", Score: 7.4, Categories: []string{""}, Timestamp: recommendTime},
  883. {Id: "7", Score: 7, Categories: []string{""}, Timestamp: recommendTime},
  884. }, recommends)
  885. }
  886. func (suite *WorkerTestSuite) TestReplacement_CollaborativeFiltering() {
  887. ctx := context.Background()
  888. suite.Config.Recommend.DataSource.PositiveFeedbackTypes = []string{"p"}
  889. suite.Config.Recommend.DataSource.ReadFeedbackTypes = []string{"n"}
  890. suite.Config.Recommend.Offline.EnableColRecommend = false
  891. suite.Config.Recommend.Offline.EnablePopularRecommend = true
  892. suite.Config.Recommend.Replacement.EnableReplacement = true
  893. // 1. Insert historical items into empty recommendation.
  894. // insert items
  895. err := suite.DataClient.BatchInsertItems(ctx, []data.Item{
  896. {ItemId: "10"}, {ItemId: "9"}, {ItemId: "8"}, {ItemId: "7"}, {ItemId: "6"}, {ItemId: "5"},
  897. })
  898. suite.NoError(err)
  899. // insert feedback
  900. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  901. {FeedbackKey: data.FeedbackKey{FeedbackType: "p", UserId: "0", ItemId: "10"}},
  902. {FeedbackKey: data.FeedbackKey{FeedbackType: "n", UserId: "0", ItemId: "9"}},
  903. {FeedbackKey: data.FeedbackKey{FeedbackType: "i", UserId: "0", ItemId: "8"}},
  904. }, true, false, true)
  905. suite.NoError(err)
  906. suite.RankingModel = newMockMatrixFactorizationForRecommend(1, 10)
  907. suite.Recommend([]data.User{{UserId: "0"}})
  908. // read recommend time
  909. recommendTime, err := suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  910. suite.NoError(err)
  911. // read recommend result
  912. recommends, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 3)
  913. suite.NoError(err)
  914. suite.Equal([]cache.Document{
  915. {Id: "10", Score: 10, Categories: []string{""}, Timestamp: recommendTime},
  916. {Id: "9", Score: 9, Categories: []string{""}, Timestamp: recommendTime},
  917. }, recommends)
  918. // 2. Insert historical items into non-empty recommendation.
  919. err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().AddDate(-1, 0, 0)))
  920. suite.NoError(err)
  921. // insert popular items
  922. err = suite.CacheClient.AddDocuments(ctx, cache.PopularItems, "", []cache.Document{
  923. {Id: "7", Score: 10, Categories: []string{""}},
  924. {Id: "6", Score: 9, Categories: []string{""}},
  925. {Id: "5", Score: 8, Categories: []string{""}}})
  926. suite.NoError(err)
  927. // insert feedback
  928. err = suite.DataClient.BatchInsertFeedback(ctx, []data.Feedback{
  929. {FeedbackKey: data.FeedbackKey{FeedbackType: "p", UserId: "0", ItemId: "10"}},
  930. {FeedbackKey: data.FeedbackKey{FeedbackType: "n", UserId: "0", ItemId: "9"}},
  931. {FeedbackKey: data.FeedbackKey{FeedbackType: "i", UserId: "0", ItemId: "8"}},
  932. }, true, false, true)
  933. suite.NoError(err)
  934. suite.Recommend([]data.User{{UserId: "0"}})
  935. // read recommend time
  936. recommendTime, err = suite.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, "0")).Time()
  937. suite.NoError(err)
  938. // read recommend result
  939. recommends, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 3)
  940. suite.NoError(err)
  941. suite.Equal([]cache.Document{
  942. {Id: "10", Score: 9, Categories: []string{""}, Timestamp: recommendTime},
  943. {Id: "9", Score: 7.4, Categories: []string{""}, Timestamp: recommendTime},
  944. {Id: "7", Score: 7, Categories: []string{""}, Timestamp: recommendTime},
  945. }, recommends)
  946. }
  947. func (suite *WorkerTestSuite) TestUserActivity() {
  948. ctx := context.Background()
  949. err := suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "0"), time.Now().AddDate(0, 0, -1)))
  950. suite.NoError(err)
  951. err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, "1"), time.Now().AddDate(0, 0, -10)))
  952. suite.NoError(err)
  953. err = suite.CacheClient.AddDocuments(ctx, cache.OfflineRecommend, "0", []cache.Document{{Id: "0", Score: 1, Categories: []string{""}}})
  954. suite.NoError(err)
  955. err = suite.CacheClient.AddDocuments(ctx, cache.OfflineRecommend, "1", []cache.Document{{Id: "1", Score: 1, Categories: []string{""}}})
  956. suite.NoError(err)
  957. err = suite.CacheClient.AddDocuments(ctx, cache.OfflineRecommend, "2", []cache.Document{{Id: "2", Score: 1, Categories: []string{""}}})
  958. suite.NoError(err)
  959. suite.True(suite.checkUserActiveTime(ctx, "0"))
  960. suite.True(suite.checkUserActiveTime(ctx, "1"))
  961. suite.True(suite.checkUserActiveTime(ctx, "2"))
  962. docs, err := suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 1)
  963. suite.NoError(err)
  964. suite.NotEmpty(docs)
  965. docs, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "1", []string{""}, 0, 1)
  966. suite.NoError(err)
  967. suite.NotEmpty(docs)
  968. docs, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "2", []string{""}, 0, 1)
  969. suite.NoError(err)
  970. suite.NotEmpty(docs)
  971. suite.Config.Recommend.ActiveUserTTL = 5
  972. suite.True(suite.checkUserActiveTime(ctx, "0"))
  973. suite.False(suite.checkUserActiveTime(ctx, "1"))
  974. suite.True(suite.checkUserActiveTime(ctx, "2"))
  975. docs, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "0", []string{""}, 0, 1)
  976. suite.NoError(err)
  977. suite.NotEmpty(docs)
  978. docs, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "1", []string{""}, 0, 1)
  979. suite.NoError(err)
  980. suite.Empty(docs)
  981. docs, err = suite.CacheClient.SearchDocuments(ctx, cache.OfflineRecommend, "2", []string{""}, 0, 1)
  982. suite.NoError(err)
  983. suite.NotEmpty(docs)
  984. }
  985. func (suite *WorkerTestSuite) TestHealth() {
  986. // ready
  987. req := httptest.NewRequest("GET", "https://example.com/", nil)
  988. w := httptest.NewRecorder()
  989. suite.checkLive(w, req)
  990. suite.Equal(http.StatusOK, w.Code)
  991. suite.Equal(marshal(suite.T(), HealthStatus{
  992. DataStoreError: nil,
  993. CacheStoreError: nil,
  994. DataStoreConnected: true,
  995. CacheStoreConnected: true,
  996. }), w.Body.String())
  997. // not ready
  998. dataClient, cacheClient := suite.DataClient, suite.CacheClient
  999. suite.DataClient, suite.CacheClient = data.NoDatabase{}, cache.NoDatabase{}
  1000. w = httptest.NewRecorder()
  1001. suite.checkLive(w, req)
  1002. suite.Equal(http.StatusOK, w.Code)
  1003. suite.Equal(marshal(suite.T(), HealthStatus{
  1004. DataStoreError: data.ErrNoDatabase,
  1005. CacheStoreError: cache.ErrNoDatabase,
  1006. DataStoreConnected: false,
  1007. CacheStoreConnected: false,
  1008. }), w.Body.String())
  1009. suite.DataClient, suite.CacheClient = dataClient, cacheClient
  1010. }
  1011. func TestWorker(t *testing.T) {
  1012. suite.Run(t, new(WorkerTestSuite))
  1013. }