rest.go 70 KB


  1. // Copyright 2020 gorse Project Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
import (
	"context"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/pprof"
	"strconv"
	"strings"
	"time"

	"github.com/araddon/dateparse"
	mapset "github.com/deckarep/golang-set/v2"
	restfulspec "github.com/emicklei/go-restful-openapi/v2"
	"github.com/emicklei/go-restful/v3"
	"github.com/google/uuid"
	"github.com/juju/errors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/samber/lo"
	"github.com/thoas/go-funk"
	"github.com/zhenghaoz/gorse/base/heap"
	"github.com/zhenghaoz/gorse/base/log"
	"github.com/zhenghaoz/gorse/config"
	"github.com/zhenghaoz/gorse/storage/cache"
	"github.com/zhenghaoz/gorse/storage/data"
	"go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful"
	"go.uber.org/zap"
	"google.golang.org/protobuf/proto"
	"modernc.org/mathutil"
)
// OpenAPI tags. Each route registered in CreateWebService attaches one of
// these values under restfulspec.KeyOpenAPITags so that the generated API
// document groups related endpoints together.
const (
	HealthAPITag         = "health"
	UsersAPITag          = "users"
	ItemsAPITag          = "items"
	FeedbackAPITag       = "feedback"
	RecommendationAPITag = "recommendation"
	MeasurementsAPITag   = "measurements"
	DetractedAPITag      = "deprecated" // attached to the /intermediate/recommend routes
)
// RestServer implements a REST-ful API server.
type RestServer struct {
	// Settings is embedded; the methods in this file read Config, DataClient
	// and CacheClient through it.
	*config.Settings
	// HttpHost and HttpPort form the listen address used by StartHttpServer.
	HttpHost string
	HttpPort int
	// DisableLog turns off the per-request log line written by LogFilter.
	DisableLog bool
	// WebService receives the routes registered by CreateWebService. It must
	// be non-nil before CreateWebService is called.
	WebService *restful.WebService
	// HttpServer is assigned by StartHttpServer right before it starts serving.
	HttpServer *http.Server
}
  61. // StartHttpServer starts the REST-ful API server.
  62. func (s *RestServer) StartHttpServer(container *restful.Container) {
  63. // register restful APIs
  64. s.CreateWebService()
  65. container.Add(s.WebService)
  66. // register swagger UI
  67. specConfig := restfulspec.Config{
  68. WebServices: []*restful.WebService{s.WebService},
  69. APIPath: "/apidocs.json",
  70. }
  71. container.Add(restfulspec.NewOpenAPIService(specConfig))
  72. swaggerFile = specConfig.APIPath
  73. container.Handle(apiDocsPath, http.HandlerFunc(handler))
  74. // register prometheus
  75. container.Handle("/metrics", promhttp.Handler())
  76. // register pprof
  77. container.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
  78. container.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
  79. container.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
  80. container.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
  81. container.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
  82. container.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
  83. container.Handle("/debug/pprof/block", pprof.Handler("block"))
  84. container.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine"))
  85. container.Handle("/debug/pprof/heap", pprof.Handler("heap"))
  86. container.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
  87. container.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
  88. // Add container filter to enable CORS
  89. cors := restful.CrossOriginResourceSharing{
  90. AllowedHeaders: []string{"Content-Type", "Accept", "X-API-Key"},
  91. AllowedDomains: s.Config.Master.HttpCorsDomains,
  92. AllowedMethods: s.Config.Master.HttpCorsMethods,
  93. CookiesAllowed: false,
  94. Container: container}
  95. container.Filter(cors.Filter)
  96. log.Logger().Info("start http server",
  97. zap.String("url", fmt.Sprintf("http://%s:%d", s.HttpHost, s.HttpPort)),
  98. zap.Strings("cors_methods", s.Config.Master.HttpCorsMethods),
  99. zap.Strings("cors_domains", s.Config.Master.HttpCorsDomains),
  100. )
  101. s.HttpServer = &http.Server{
  102. Addr: fmt.Sprintf("%s:%d", s.HttpHost, s.HttpPort),
  103. Handler: container,
  104. }
  105. if err := s.HttpServer.ListenAndServe(); err != http.ErrServerClosed {
  106. log.Logger().Fatal("failed to start http server", zap.Error(err))
  107. }
  108. }
  109. func (s *RestServer) LogFilter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
  110. // generate request id
  111. requestId := uuid.New().String()
  112. resp.AddHeader("X-Request-ID", requestId)
  113. start := time.Now()
  114. chain.ProcessFilter(req, resp)
  115. responseTime := time.Since(start)
  116. if !s.DisableLog && req.Request.URL.Path != "/api/dashboard/cluster" &&
  117. req.Request.URL.Path != "/api/dashboard/tasks" {
  118. log.ResponseLogger(resp).Info(fmt.Sprintf("%s %s", req.Request.Method, req.Request.URL),
  119. zap.Int("status_code", resp.StatusCode()),
  120. zap.Duration("response_time", responseTime))
  121. }
  122. }
  123. func (s *RestServer) AuthFilter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
  124. if strings.HasPrefix(req.SelectedRoute().Path(), "/api/health/") {
  125. // Health check APIs don't need API key,
  126. chain.ProcessFilter(req, resp)
  127. return
  128. }
  129. if s.Config.Server.APIKey == "" {
  130. chain.ProcessFilter(req, resp)
  131. return
  132. }
  133. apikey := req.HeaderParameter("X-API-Key")
  134. if apikey == s.Config.Server.APIKey {
  135. chain.ProcessFilter(req, resp)
  136. return
  137. }
  138. log.ResponseLogger(resp).Error("unauthorized",
  139. zap.String("api_key", s.Config.Server.APIKey),
  140. zap.String("X-API-Key", apikey))
  141. if err := resp.WriteError(http.StatusUnauthorized, fmt.Errorf("unauthorized")); err != nil {
  142. log.ResponseLogger(resp).Error("failed to write error", zap.Error(err))
  143. }
  144. }
  145. func (s *RestServer) MetricsFilter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
  146. startTime := time.Now()
  147. chain.ProcessFilter(req, resp)
  148. if req.SelectedRoute() != nil && resp.StatusCode() == http.StatusOK {
  149. routePath := req.SelectedRoutePath()
  150. if !strings.HasPrefix(routePath, "/api/dashboard") {
  151. RestAPIRequestSecondsVec.WithLabelValues(fmt.Sprintf("%s %s", req.Request.Method, routePath)).
  152. Observe(time.Since(startTime).Seconds())
  153. }
  154. }
  155. }
  156. // CreateWebService creates web service.
  157. func (s *RestServer) CreateWebService() {
  158. // Create a server
  159. ws := s.WebService
  160. ws.Path("/api/").
  161. Produces(restful.MIME_JSON).
  162. Filter(s.LogFilter).
  163. Filter(s.AuthFilter).
  164. Filter(s.MetricsFilter).
  165. Filter(otelrestful.OTelFilter("gorse"))
  166. /* Health check */
  167. ws.Route(ws.GET("/health/live").To(s.checkLive).
  168. Doc("Probe the liveness of this node. Return OK once the server starts.").
  169. Metadata(restfulspec.KeyOpenAPITags, []string{HealthAPITag}).
  170. Returns(http.StatusOK, "OK", HealthStatus{}).
  171. Writes(HealthStatus{}))
  172. ws.Route(ws.GET("/health/ready").To(s.checkReady).
  173. Doc("Probe the readiness of this node. Return OK if the server is able to handle requests.").
  174. Metadata(restfulspec.KeyOpenAPITags, []string{HealthAPITag}).
  175. Returns(http.StatusOK, "OK", HealthStatus{}).
  176. Writes(HealthStatus{}))
  177. // Insert a user
  178. ws.Route(ws.POST("/user").To(s.insertUser).
  179. Doc("Insert a user.").
  180. Metadata(restfulspec.KeyOpenAPITags, []string{UsersAPITag}).
  181. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  182. Reads(data.User{}).
  183. Returns(http.StatusOK, "OK", Success{}).
  184. Writes(Success{}))
  185. // Modify a user
  186. ws.Route(ws.PATCH("/user/{user-id}").To(s.modifyUser).
  187. Doc("Modify a user.").
  188. Metadata(restfulspec.KeyOpenAPITags, []string{UsersAPITag}).
  189. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  190. Param(ws.PathParameter("user-id", "ID of the user to modify").DataType("string")).
  191. Reads(data.UserPatch{}).
  192. Returns(http.StatusOK, "OK", Success{}).
  193. Writes(Success{}))
  194. // Get a user
  195. ws.Route(ws.GET("/user/{user-id}").To(s.getUser).
  196. Doc("Get a user.").
  197. Metadata(restfulspec.KeyOpenAPITags, []string{UsersAPITag}).
  198. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  199. Param(ws.PathParameter("user-id", "ID of the user to get").DataType("string")).
  200. Returns(http.StatusOK, "OK", data.User{}).
  201. Writes(data.User{}))
  202. // Insert users
  203. ws.Route(ws.POST("/users").To(s.insertUsers).
  204. Doc("Insert users.").
  205. Metadata(restfulspec.KeyOpenAPITags, []string{UsersAPITag}).
  206. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  207. Reads([]data.User{}).
  208. Returns(http.StatusOK, "OK", Success{}).
  209. Writes(Success{}))
  210. // Get users
  211. ws.Route(ws.GET("/users").To(s.getUsers).
  212. Doc("Get users.").
  213. Metadata(restfulspec.KeyOpenAPITags, []string{UsersAPITag}).
  214. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  215. Param(ws.QueryParameter("n", "Number of returned users").DataType("integer")).
  216. Param(ws.QueryParameter("cursor", "Cursor for the next page").DataType("string")).
  217. Returns(http.StatusOK, "OK", UserIterator{}).
  218. Writes(UserIterator{}))
  219. // Delete a user
  220. ws.Route(ws.DELETE("/user/{user-id}").To(s.deleteUser).
  221. Doc("Delete a user and his or her feedback.").
  222. Metadata(restfulspec.KeyOpenAPITags, []string{UsersAPITag}).
  223. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  224. Param(ws.PathParameter("user-id", "ID of the user to delete").DataType("string")).
  225. Returns(http.StatusOK, "OK", Success{}).
  226. Writes(Success{}))
  227. // Insert an item
  228. ws.Route(ws.POST("/item").To(s.insertItem).
  229. Doc("Insert an item. Overwrite if the item exists.").
  230. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  231. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  232. Reads(data.Item{}).
  233. Returns(http.StatusOK, "OK", Success{}).
  234. Writes(Success{}))
  235. // Modify an item
  236. ws.Route(ws.PATCH("/item/{item-id}").To(s.modifyItem).
  237. Doc("Modify an item.").
  238. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  239. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  240. Param(ws.PathParameter("item-id", "ID of the item to modify").DataType("string")).
  241. Reads(data.ItemPatch{}).
  242. Returns(http.StatusOK, "OK", Success{}).
  243. Writes(Success{}))
  244. // Get items
  245. ws.Route(ws.GET("/items").To(s.getItems).
  246. Doc("Get items.").
  247. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  248. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  249. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  250. Param(ws.QueryParameter("cursor", "Cursor for the next page").DataType("string")).
  251. Returns(http.StatusOK, "OK", ItemIterator{}).
  252. Writes(ItemIterator{}))
  253. // Get item
  254. ws.Route(ws.GET("/item/{item-id}").To(s.getItem).
  255. Doc("Get a item.").
  256. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  257. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  258. Param(ws.PathParameter("item-id", "ID of the item to get.").DataType("string")).
  259. Returns(http.StatusOK, "OK", data.Item{}).
  260. Writes(data.Item{}))
  261. // Insert items
  262. ws.Route(ws.POST("/items").To(s.insertItems).
  263. Doc("Insert items. Overwrite if items exist").
  264. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  265. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  266. Reads([]data.Item{}).
  267. Returns(http.StatusOK, "OK", Success{}).
  268. Writes(Success{}))
  269. // Delete item
  270. ws.Route(ws.DELETE("/item/{item-id}").To(s.deleteItem).
  271. Doc("Delete an item and its feedback.").
  272. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  273. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  274. Param(ws.PathParameter("item-id", "ID of the item to delete").DataType("string")).
  275. Returns(http.StatusOK, "OK", Success{}).
  276. Writes(Success{}))
  277. // Insert category
  278. ws.Route(ws.PUT("/item/{item-id}/category/{category}").To(s.insertItemCategory).
  279. Doc("Insert a category for a item.").
  280. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  281. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  282. Param(ws.PathParameter("item-id", "ID of the item to insert category").DataType("string")).
  283. Param(ws.PathParameter("category", "Category to insert").DataType("string")).
  284. Returns(http.StatusOK, "OK", Success{}).
  285. Writes(Success{}))
  286. // Delete category
  287. ws.Route(ws.DELETE("/item/{item-id}/category/{category}").To(s.deleteItemCategory).
  288. Doc("Delete a category from a item.").
  289. Metadata(restfulspec.KeyOpenAPITags, []string{ItemsAPITag}).
  290. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  291. Param(ws.PathParameter("item-id", "ID of the item to delete categoryßßß").DataType("string")).
  292. Param(ws.PathParameter("category", "Category to delete").DataType("string")).
  293. Returns(http.StatusOK, "OK", Success{}).
  294. Writes(Success{}))
  295. // Insert feedback
  296. ws.Route(ws.POST("/feedback").To(s.insertFeedback(false)).
  297. Doc("Insert feedbacks. Ignore insertion if feedback exists.").
  298. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  299. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  300. Reads([]data.Feedback{}).
  301. Returns(http.StatusOK, "OK", Success{}).
  302. Writes(Success{}))
  303. ws.Route(ws.PUT("/feedback").To(s.insertFeedback(true)).
  304. Doc("Insert feedbacks. Existed feedback will be overwritten.").
  305. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  306. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  307. Reads([]data.Feedback{}).
  308. Returns(http.StatusOK, "OK", Success{}).
  309. Writes(Success{}))
  310. // Get feedback
  311. ws.Route(ws.GET("/feedback").To(s.getFeedback).
  312. Doc("Get feedbacks.").
  313. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  314. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  315. Param(ws.QueryParameter("cursor", "Cursor for the next page").DataType("string")).
  316. Param(ws.QueryParameter("n", "Number of returned feedback").DataType("integer")).
  317. Returns(http.StatusOK, "OK", FeedbackIterator{}).
  318. Writes(FeedbackIterator{}))
  319. ws.Route(ws.GET("/feedback/{user-id}/{item-id}").To(s.getUserItemFeedback).
  320. Doc("Get feedbacks between a user and a item.").
  321. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  322. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  323. Param(ws.PathParameter("user-id", "User ID of returned feedbacks").DataType("string")).
  324. Param(ws.PathParameter("item-id", "Item ID of returned feedbacks").DataType("string")).
  325. Returns(http.StatusOK, "OK", []data.Feedback{}).
  326. Writes([]data.Feedback{}))
  327. ws.Route(ws.DELETE("/feedback/{user-id}/{item-id}").To(s.deleteUserItemFeedback).
  328. Doc("Delete feedbacks between a user and a item.").
  329. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  330. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  331. Param(ws.PathParameter("user-id", "User ID of returned feedbacks").DataType("string")).
  332. Param(ws.PathParameter("item-id", "Item ID of returned feedbacks").DataType("string")).
  333. Returns(http.StatusOK, "OK", []data.Feedback{}).
  334. Writes([]data.Feedback{}))
  335. ws.Route(ws.GET("/feedback/{feedback-type}").To(s.getTypedFeedback).
  336. Doc("Get feedbacks with feedback type.").
  337. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  338. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  339. Param(ws.PathParameter("feedback-type", "Type of returned feedbacks").DataType("string")).
  340. Param(ws.QueryParameter("cursor", "Cursor for the next page").DataType("string")).
  341. Param(ws.QueryParameter("n", "Number of returned feedbacks").DataType("integer")).
  342. Returns(http.StatusOK, "OK", FeedbackIterator{}).
  343. Writes(FeedbackIterator{}))
  344. ws.Route(ws.GET("/feedback/{feedback-type}/{user-id}/{item-id}").To(s.getTypedUserItemFeedback).
  345. Doc("Get feedbacks between a user and a item with feedback type.").
  346. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  347. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  348. Param(ws.PathParameter("feedback-type", "Type of returned feedbacks").DataType("string")).
  349. Param(ws.PathParameter("user-id", "User ID of returned feedbacks").DataType("string")).
  350. Param(ws.PathParameter("item-id", "Item ID of returned feedbacks").DataType("string")).
  351. Returns(http.StatusOK, "OK", data.Feedback{}).
  352. Writes(data.Feedback{}))
  353. ws.Route(ws.DELETE("/feedback/{feedback-type}/{user-id}/{item-id}").To(s.deleteTypedUserItemFeedback).
  354. Doc("Delete feedbacks between a user and a item with feedback type.").
  355. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  356. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  357. Param(ws.PathParameter("feedback-type", "Type of returned feedbacks").DataType("string")).
  358. Param(ws.PathParameter("user-id", "User ID of returned feedbacks").DataType("string")).
  359. Param(ws.PathParameter("item-id", "Item ID of returned feedbacks").DataType("string")).
  360. Returns(http.StatusOK, "OK", data.Feedback{}).
  361. Writes(data.Feedback{}))
  362. // Get feedback by user id
  363. ws.Route(ws.GET("/user/{user-id}/feedback/{feedback-type}").To(s.getTypedFeedbackByUser).
  364. Doc("Get feedbacks by user id with feedback type.").
  365. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  366. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  367. Param(ws.PathParameter("user-id", "User ID of returned feedbacks").DataType("string")).
  368. Param(ws.PathParameter("feedback-type", "Type of returned feedbacks").DataType("string")).
  369. Returns(http.StatusOK, "OK", []data.Feedback{}).
  370. Writes([]data.Feedback{}))
  371. ws.Route(ws.GET("/user/{user-id}/feedback").To(s.getFeedbackByUser).
  372. Doc("Get feedbacks by user id.").
  373. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  374. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  375. Param(ws.PathParameter("user-id", "User ID of returned feedbacks").DataType("string")).
  376. Returns(http.StatusOK, "OK", []data.Feedback{}).
  377. Writes([]data.Feedback{}))
  378. // Get feedback by item-id
  379. ws.Route(ws.GET("/item/{item-id}/feedback/{feedback-type}").To(s.getTypedFeedbackByItem).
  380. Doc("Get feedbacks by item id with feedback type.").
  381. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  382. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  383. Param(ws.PathParameter("item-id", "Item ID of returned feedbacks").DataType("string")).
  384. Param(ws.PathParameter("feedback-type", "Type of returned feedbacks").DataType("string")).
  385. Returns(http.StatusOK, "OK", []data.Feedback{}).
  386. Writes([]data.Feedback{}))
  387. ws.Route(ws.GET("/item/{item-id}/feedback/").To(s.getFeedbackByItem).
  388. Doc("Get feedbacks by item id.").
  389. Metadata(restfulspec.KeyOpenAPITags, []string{FeedbackAPITag}).
  390. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  391. Param(ws.PathParameter("item-id", "Item ID of returned feedbacks").DataType("string")).
  392. Returns(http.StatusOK, "OK", []data.Feedback{}).
  393. Writes([]data.Feedback{}))
  394. // Get collaborative filtering recommendation by user id
  395. ws.Route(ws.GET("/intermediate/recommend/{user-id}").To(s.getCollaborative).
  396. Doc("Get the collaborative filtering recommendation for a user").
  397. Metadata(restfulspec.KeyOpenAPITags, []string{DetractedAPITag}).
  398. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  399. Param(ws.PathParameter("user-id", "ID of the user to get recommendation").DataType("string")).
  400. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  401. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  402. Returns(http.StatusOK, "OK", []cache.Document{}).
  403. Writes([]cache.Document{}))
  404. ws.Route(ws.GET("/intermediate/recommend/{user-id}/{category}").To(s.getCollaborative).
  405. Doc("Get the collaborative filtering recommendation for a user").
  406. Metadata(restfulspec.KeyOpenAPITags, []string{DetractedAPITag}).
  407. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  408. Param(ws.PathParameter("user-id", "ID of the user to get recommendation").DataType("string")).
  409. Param(ws.PathParameter("category", "Category of returned items.").DataType("string")).
  410. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  411. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  412. Returns(http.StatusOK, "OK", []cache.Document{}).
  413. Writes([]cache.Document{}))
  414. // Get popular items
  415. ws.Route(ws.GET("/popular").To(s.getPopular).
  416. Doc("Get popular items.").
  417. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  418. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  419. Param(ws.QueryParameter("n", "Number of returned recommendations").DataType("integer")).
  420. Param(ws.QueryParameter("offset", "Offset of returned recommendations").DataType("integer")).
  421. Param(ws.QueryParameter("user-id", "Remove read items of a user").DataType("string")).
  422. Returns(http.StatusOK, "OK", []cache.Document{}).
  423. Writes([]cache.Document{}))
  424. ws.Route(ws.GET("/popular/{category}").To(s.getPopular).
  425. Doc("Get popular items in category.").
  426. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  427. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  428. Param(ws.PathParameter("category", "Category of returned items.").DataType("string")).
  429. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  430. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  431. Param(ws.QueryParameter("user-id", "Remove read items of a user").DataType("string")).
  432. Returns(http.StatusOK, "OK", []cache.Document{}).
  433. Writes([]cache.Document{}))
  434. // Get latest items
  435. ws.Route(ws.GET("/latest").To(s.getLatest).
  436. Doc("Get the latest items.").
  437. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  438. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  439. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  440. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  441. Param(ws.QueryParameter("user-id", "Remove read items of a user").DataType("string")).
  442. Returns(http.StatusOK, "OK", []cache.Document{}).
  443. Writes([]cache.Document{}))
  444. ws.Route(ws.GET("/latest/{category}").To(s.getLatest).
  445. Doc("Get the latest items in category.").
  446. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  447. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  448. Param(ws.PathParameter("category", "Category of returned items.").DataType("string")).
  449. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  450. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  451. Param(ws.QueryParameter("user-id", "Remove read items of a user").DataType("string")).
  452. Returns(http.StatusOK, "OK", []cache.Document{}).
  453. Writes([]cache.Document{}))
  454. // Get neighbors
  455. ws.Route(ws.GET("/item/{item-id}/neighbors/").To(s.getItemNeighbors).
  456. Doc("Get neighbors of a item").
  457. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  458. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  459. Param(ws.PathParameter("item-id", "ID of the item to get neighbors").DataType("string")).
  460. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  461. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  462. Returns(http.StatusOK, "OK", []cache.Document{}).
  463. Writes([]cache.Document{}))
  464. ws.Route(ws.GET("/item/{item-id}/neighbors/{category}").To(s.getItemNeighbors).
  465. Doc("Get neighbors of a item in category.").
  466. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  467. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  468. Param(ws.PathParameter("item-id", "ID of the item to get neighbors").DataType("string")).
  469. Param(ws.PathParameter("category", "Category of returned items").DataType("string")).
  470. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  471. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  472. Returns(http.StatusOK, "OK", []cache.Document{}).
  473. Writes([]cache.Document{}))
  474. ws.Route(ws.GET("/user/{user-id}/neighbors/").To(s.getUserNeighbors).
  475. Doc("Get neighbors of a user.").
  476. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  477. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  478. Param(ws.PathParameter("user-id", "ID of the user to get neighbors").DataType("string")).
  479. Param(ws.QueryParameter("n", "Number of returned users").DataType("integer")).
  480. Param(ws.QueryParameter("offset", "Offset of returned users").DataType("integer")).
  481. Returns(http.StatusOK, "OK", []cache.Document{}).
  482. Writes([]cache.Document{}))
  483. ws.Route(ws.GET("/recommend/{user-id}").To(s.getRecommend).
  484. Doc("Get recommendation for user.").
  485. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  486. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  487. Param(ws.PathParameter("user-id", "ID of the user to get recommendation").DataType("string")).
  488. Param(ws.QueryParameter("category", "Category of the returned items (support multi-categories filtering)").DataType("string")).
  489. Param(ws.QueryParameter("write-back-type", "Type of write back feedback").DataType("string")).
  490. Param(ws.QueryParameter("write-back-delay", "Timestamp delay of write back feedback (format 0h0m0s)").DataType("string")).
  491. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  492. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  493. Returns(http.StatusOK, "OK", []string{}).
  494. Writes([]string{}))
  495. ws.Route(ws.GET("/recommend/{user-id}/{category}").To(s.getRecommend).
  496. Doc("Get recommendation for user.").
  497. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  498. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  499. Param(ws.PathParameter("user-id", "ID of the user to get recommendation").DataType("string")).
  500. Param(ws.PathParameter("category", "Category of the returned items").DataType("string")).
  501. Param(ws.QueryParameter("write-back-type", "Type of write back feedback").DataType("string")).
  502. Param(ws.QueryParameter("write-back-delay", "Timestamp delay of write back feedback (format 0h0m0s)").DataType("string")).
  503. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  504. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  505. Returns(http.StatusOK, "OK", []string{}).
  506. Writes([]string{}))
  507. ws.Route(ws.POST("/session/recommend").To(s.sessionRecommend).
  508. Doc("Get recommendation for session.").
  509. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  510. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  511. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  512. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  513. Reads([]Feedback{}).
  514. Returns(http.StatusOK, "OK", []cache.Document{}).
  515. Writes([]cache.Document{}))
  516. ws.Route(ws.POST("/session/recommend/{category}").To(s.sessionRecommend).
  517. Doc("Get recommendation for session.").
  518. Metadata(restfulspec.KeyOpenAPITags, []string{RecommendationAPITag}).
  519. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  520. Param(ws.PathParameter("category", "Category of the returned items").DataType("string")).
  521. Param(ws.QueryParameter("n", "Number of returned items").DataType("integer")).
  522. Param(ws.QueryParameter("offset", "Offset of returned items").DataType("integer")).
  523. Reads([]Feedback{}).
  524. Returns(http.StatusOK, "OK", []cache.Document{}).
  525. Writes([]cache.Document{}))
  526. ws.Route(ws.GET("/measurements/{name}").To(s.getMeasurements).
  527. Doc("Get measurements.").
  528. Metadata(restfulspec.KeyOpenAPITags, []string{MeasurementsAPITag}).
  529. Param(ws.HeaderParameter("X-API-Key", "API key").DataType("string")).
  530. Param(ws.PathParameter("name", "Name of returned measurements").DataType("string")).
  531. Param(ws.QueryParameter("n", "Number of returned measurements").DataType("integer")).
  532. Returns(http.StatusOK, "OK", []cache.TimeSeriesPoint{}).
  533. Writes([]cache.TimeSeriesPoint{}))
  534. }
  535. // ParseInt parses integers from the query parameter.
  536. func ParseInt(request *restful.Request, name string, fallback int) (value int, err error) {
  537. valueString := request.QueryParameter(name)
  538. value, err = strconv.Atoi(valueString)
  539. if err != nil && valueString == "" {
  540. value = fallback
  541. err = nil
  542. }
  543. return
  544. }
  545. // ParseDuration parses duration from the query parameter.
  546. func ParseDuration(request *restful.Request, name string) (time.Duration, error) {
  547. valueString := request.QueryParameter(name)
  548. if valueString == "" {
  549. return 0, nil
  550. }
  551. return time.ParseDuration(valueString)
  552. }
  553. func (s *RestServer) searchDocuments(collection, subset, category string, isItem bool, request *restful.Request, response *restful.Response) {
  554. var (
  555. ctx = request.Request.Context()
  556. n int
  557. offset int
  558. userId string
  559. err error
  560. )
  561. // parse arguments
  562. if offset, err = ParseInt(request, "offset", 0); err != nil {
  563. BadRequest(response, err)
  564. return
  565. }
  566. if n, err = ParseInt(request, "n", s.Config.Server.DefaultN); err != nil {
  567. BadRequest(response, err)
  568. return
  569. }
  570. userId = request.QueryParameter("user-id")
  571. readItems := mapset.NewSet[string]()
  572. if userId != "" {
  573. feedback, err := s.DataClient.GetUserFeedback(ctx, userId, s.Config.Now())
  574. if err != nil {
  575. InternalServerError(response, err)
  576. return
  577. }
  578. for _, f := range feedback {
  579. readItems.Add(f.ItemId)
  580. }
  581. }
  582. end := offset + n
  583. if end > 0 && readItems.Cardinality() > 0 {
  584. end += readItems.Cardinality()
  585. }
  586. // Get the sorted list
  587. items, err := s.CacheClient.SearchDocuments(ctx, collection, subset, []string{category}, offset, end)
  588. if err != nil {
  589. InternalServerError(response, err)
  590. return
  591. }
  592. // Remove read items
  593. if userId != "" {
  594. prunedItems := make([]cache.Document, 0, len(items))
  595. for _, item := range items {
  596. if !readItems.Contains(item.Id) {
  597. prunedItems = append(prunedItems, item)
  598. }
  599. }
  600. items = prunedItems
  601. }
  602. // Send result
  603. if n > 0 && len(items) > n {
  604. items = items[:n]
  605. }
  606. Ok(response, items)
  607. }
  608. func (s *RestServer) getPopular(request *restful.Request, response *restful.Response) {
  609. category := request.PathParameter("category")
  610. log.ResponseLogger(response).Debug("get category popular items in category", zap.String("category", category))
  611. s.searchDocuments(cache.PopularItems, "", category, true, request, response)
  612. }
  613. func (s *RestServer) getLatest(request *restful.Request, response *restful.Response) {
  614. category := request.PathParameter("category")
  615. log.ResponseLogger(response).Debug("get category latest items in category", zap.String("category", category))
  616. s.searchDocuments(cache.LatestItems, "", category, true, request, response)
  617. }
  618. // get feedback by item-id with feedback type
  619. func (s *RestServer) getTypedFeedbackByItem(request *restful.Request, response *restful.Response) {
  620. ctx := context.Background()
  621. if request != nil && request.Request != nil {
  622. ctx = request.Request.Context()
  623. }
  624. feedbackType := request.PathParameter("feedback-type")
  625. itemId := request.PathParameter("item-id")
  626. feedback, err := s.DataClient.GetItemFeedback(ctx, itemId, feedbackType)
  627. if err != nil {
  628. InternalServerError(response, err)
  629. return
  630. }
  631. Ok(response, feedback)
  632. }
  633. // get feedback by item-id
  634. func (s *RestServer) getFeedbackByItem(request *restful.Request, response *restful.Response) {
  635. ctx := context.Background()
  636. if request != nil && request.Request != nil {
  637. ctx = request.Request.Context()
  638. }
  639. itemId := request.PathParameter("item-id")
  640. feedback, err := s.DataClient.GetItemFeedback(ctx, itemId)
  641. if err != nil {
  642. InternalServerError(response, err)
  643. return
  644. }
  645. Ok(response, feedback)
  646. }
  647. // getItemNeighbors gets neighbors of a item from database.
  648. func (s *RestServer) getItemNeighbors(request *restful.Request, response *restful.Response) {
  649. // Get item id
  650. itemId := request.PathParameter("item-id")
  651. category := request.PathParameter("category")
  652. s.searchDocuments(cache.ItemNeighbors, itemId, category, true, request, response)
  653. }
  654. // getUserNeighbors gets neighbors of a user from database.
  655. func (s *RestServer) getUserNeighbors(request *restful.Request, response *restful.Response) {
  656. // Get item id
  657. userId := request.PathParameter("user-id")
  658. s.searchDocuments(cache.UserNeighbors, userId, "", false, request, response)
  659. }
  660. // getCollaborative gets cached recommended items from database.
  661. func (s *RestServer) getCollaborative(request *restful.Request, response *restful.Response) {
  662. // Get user id
  663. userId := request.PathParameter("user-id")
  664. category := request.PathParameter("category")
  665. s.searchDocuments(cache.OfflineRecommend, userId, category, true, request, response)
  666. }
  667. // Recommend items to users.
  668. // 1. If there are recommendations in cache, return cached recommendations.
  669. // 2. If there are historical interactions of the users, return similar items.
  670. // 3. Otherwise, return fallback recommendation (popular/latest).
  671. func (s *RestServer) Recommend(ctx context.Context, response *restful.Response, userId string, categories []string, n int, recommenders ...Recommender) ([]string, error) {
  672. initStart := time.Now()
  673. // create context
  674. recommendCtx, err := s.createRecommendContext(ctx, userId, categories, n)
  675. if err != nil {
  676. return nil, errors.Trace(err)
  677. }
  678. // execute recommenders
  679. for _, recommender := range recommenders {
  680. err = recommender(recommendCtx)
  681. if err != nil {
  682. return nil, errors.Trace(err)
  683. }
  684. }
  685. // return recommendations
  686. if len(recommendCtx.results) > n {
  687. recommendCtx.results = recommendCtx.results[:n]
  688. }
  689. totalTime := time.Since(initStart)
  690. log.ResponseLogger(response).Info("complete recommendation",
  691. zap.Int("num_from_final", recommendCtx.numFromOffline),
  692. zap.Int("num_from_collaborative", recommendCtx.numFromCollaborative),
  693. zap.Int("num_from_item_based", recommendCtx.numFromItemBased),
  694. zap.Int("num_from_user_based", recommendCtx.numFromUserBased),
  695. zap.Int("num_from_latest", recommendCtx.numFromLatest),
  696. zap.Int("num_from_poplar", recommendCtx.numFromPopular),
  697. zap.Duration("total_time", totalTime),
  698. zap.Duration("load_final_recommend_time", recommendCtx.loadOfflineRecTime),
  699. zap.Duration("load_col_recommend_time", recommendCtx.loadColRecTime),
  700. zap.Duration("load_hist_time", recommendCtx.loadLoadHistTime),
  701. zap.Duration("item_based_recommend_time", recommendCtx.itemBasedTime),
  702. zap.Duration("user_based_recommend_time", recommendCtx.userBasedTime),
  703. zap.Duration("load_latest_time", recommendCtx.loadLatestTime),
  704. zap.Duration("load_popular_time", recommendCtx.loadPopularTime))
  705. return recommendCtx.results, nil
  706. }
  707. type recommendContext struct {
  708. context context.Context
  709. userId string
  710. categories []string
  711. userFeedback []data.Feedback
  712. n int
  713. results []string
  714. excludeSet mapset.Set[string]
  715. numPrevStage int
  716. numFromLatest int
  717. numFromPopular int
  718. numFromUserBased int
  719. numFromItemBased int
  720. numFromCollaborative int
  721. numFromOffline int
  722. loadOfflineRecTime time.Duration
  723. loadColRecTime time.Duration
  724. loadLoadHistTime time.Duration
  725. itemBasedTime time.Duration
  726. userBasedTime time.Duration
  727. loadLatestTime time.Duration
  728. loadPopularTime time.Duration
  729. }
  730. func (s *RestServer) createRecommendContext(ctx context.Context, userId string, categories []string, n int) (*recommendContext, error) {
  731. // pull historical feedback
  732. userFeedback, err := s.DataClient.GetUserFeedback(ctx, userId, s.Config.Now())
  733. if err != nil {
  734. return nil, errors.Trace(err)
  735. }
  736. excludeSet := mapset.NewSet[string]()
  737. for _, item := range userFeedback {
  738. if !s.Config.Recommend.Replacement.EnableReplacement {
  739. excludeSet.Add(item.ItemId)
  740. }
  741. }
  742. return &recommendContext{
  743. userId: userId,
  744. categories: categories,
  745. n: n,
  746. excludeSet: excludeSet,
  747. userFeedback: userFeedback,
  748. context: ctx,
  749. }, nil
  750. }
  751. type Recommender func(ctx *recommendContext) error
  752. func (s *RestServer) RecommendOffline(ctx *recommendContext) error {
  753. if len(ctx.results) < ctx.n {
  754. start := time.Now()
  755. recommendation, err := s.CacheClient.SearchDocuments(ctx.context, cache.OfflineRecommend, ctx.userId, ctx.categories, 0, s.Config.Recommend.CacheSize)
  756. if err != nil {
  757. return errors.Trace(err)
  758. }
  759. for _, item := range recommendation {
  760. if !ctx.excludeSet.Contains(item.Id) {
  761. ctx.results = append(ctx.results, item.Id)
  762. ctx.excludeSet.Add(item.Id)
  763. }
  764. }
  765. ctx.loadOfflineRecTime = time.Since(start)
  766. ctx.numFromOffline = len(ctx.results) - ctx.numPrevStage
  767. ctx.numPrevStage = len(ctx.results)
  768. }
  769. return nil
  770. }
  771. func (s *RestServer) RecommendCollaborative(ctx *recommendContext) error {
  772. if len(ctx.results) < ctx.n {
  773. start := time.Now()
  774. collaborativeRecommendation, err := s.CacheClient.SearchDocuments(ctx.context, cache.CollaborativeRecommend, ctx.userId, ctx.categories, 0, s.Config.Recommend.CacheSize)
  775. if err != nil {
  776. return errors.Trace(err)
  777. }
  778. for _, item := range collaborativeRecommendation {
  779. if !ctx.excludeSet.Contains(item.Id) {
  780. ctx.results = append(ctx.results, item.Id)
  781. ctx.excludeSet.Add(item.Id)
  782. }
  783. }
  784. ctx.loadColRecTime = time.Since(start)
  785. ctx.numFromCollaborative = len(ctx.results) - ctx.numPrevStage
  786. ctx.numPrevStage = len(ctx.results)
  787. }
  788. return nil
  789. }
  790. func (s *RestServer) RecommendUserBased(ctx *recommendContext) error {
  791. if len(ctx.results) < ctx.n {
  792. start := time.Now()
  793. candidates := make(map[string]float64)
  794. // load similar users
  795. similarUsers, err := s.CacheClient.SearchDocuments(ctx.context, cache.UserNeighbors, ctx.userId, []string{""}, 0, s.Config.Recommend.CacheSize)
  796. if err != nil {
  797. return errors.Trace(err)
  798. }
  799. for _, user := range similarUsers {
  800. // load historical feedback
  801. feedbacks, err := s.DataClient.GetUserFeedback(ctx.context, user.Id, s.Config.Now(), s.Config.Recommend.DataSource.PositiveFeedbackTypes...)
  802. if err != nil {
  803. return errors.Trace(err)
  804. }
  805. // add unseen items
  806. for _, feedback := range feedbacks {
  807. if !ctx.excludeSet.Contains(feedback.ItemId) {
  808. item, err := s.DataClient.GetItem(ctx.context, feedback.ItemId)
  809. if err != nil {
  810. return errors.Trace(err)
  811. }
  812. if funk.Equal(ctx.categories, []string{""}) || funk.Subset(ctx.categories, item.Categories) {
  813. candidates[feedback.ItemId] += user.Score
  814. }
  815. }
  816. }
  817. }
  818. // collect top k
  819. k := ctx.n - len(ctx.results)
  820. filter := heap.NewTopKFilter[string, float64](k)
  821. for id, score := range candidates {
  822. filter.Push(id, score)
  823. }
  824. ids, _ := filter.PopAll()
  825. ctx.results = append(ctx.results, ids...)
  826. ctx.excludeSet.Append(ids...)
  827. ctx.userBasedTime = time.Since(start)
  828. ctx.numFromUserBased = len(ctx.results) - ctx.numPrevStage
  829. ctx.numPrevStage = len(ctx.results)
  830. }
  831. return nil
  832. }
  833. func (s *RestServer) RecommendItemBased(ctx *recommendContext) error {
  834. if len(ctx.results) < ctx.n {
  835. start := time.Now()
  836. // truncate user feedback
  837. data.SortFeedbacks(ctx.userFeedback)
  838. userFeedback := make([]data.Feedback, 0, s.Config.Recommend.Online.NumFeedbackFallbackItemBased)
  839. for _, feedback := range ctx.userFeedback {
  840. if s.Config.Recommend.Online.NumFeedbackFallbackItemBased <= len(userFeedback) {
  841. break
  842. }
  843. if funk.ContainsString(s.Config.Recommend.DataSource.PositiveFeedbackTypes, feedback.FeedbackType) {
  844. userFeedback = append(userFeedback, feedback)
  845. }
  846. }
  847. // collect candidates
  848. candidates := make(map[string]float64)
  849. for _, feedback := range userFeedback {
  850. // load similar items
  851. similarItems, err := s.CacheClient.SearchDocuments(ctx.context, cache.ItemNeighbors, feedback.ItemId, ctx.categories, 0, s.Config.Recommend.CacheSize)
  852. if err != nil {
  853. return errors.Trace(err)
  854. }
  855. // add unseen items
  856. for _, item := range similarItems {
  857. if !ctx.excludeSet.Contains(item.Id) {
  858. candidates[item.Id] += item.Score
  859. }
  860. }
  861. }
  862. // collect top k
  863. k := ctx.n - len(ctx.results)
  864. filter := heap.NewTopKFilter[string, float64](k)
  865. for id, score := range candidates {
  866. filter.Push(id, score)
  867. }
  868. ids, _ := filter.PopAll()
  869. ctx.results = append(ctx.results, ids...)
  870. ctx.excludeSet.Append(ids...)
  871. ctx.itemBasedTime = time.Since(start)
  872. ctx.numFromItemBased = len(ctx.results) - ctx.numPrevStage
  873. ctx.numPrevStage = len(ctx.results)
  874. }
  875. return nil
  876. }
  877. func (s *RestServer) RecommendLatest(ctx *recommendContext) error {
  878. if len(ctx.results) < ctx.n {
  879. start := time.Now()
  880. items, err := s.CacheClient.SearchDocuments(ctx.context, cache.LatestItems, "", ctx.categories, 0, s.Config.Recommend.CacheSize)
  881. if err != nil {
  882. return errors.Trace(err)
  883. }
  884. for _, item := range items {
  885. if !ctx.excludeSet.Contains(item.Id) {
  886. ctx.results = append(ctx.results, item.Id)
  887. ctx.excludeSet.Add(item.Id)
  888. }
  889. }
  890. ctx.loadLatestTime = time.Since(start)
  891. ctx.numFromLatest = len(ctx.results) - ctx.numPrevStage
  892. ctx.numPrevStage = len(ctx.results)
  893. }
  894. return nil
  895. }
  896. func (s *RestServer) RecommendPopular(ctx *recommendContext) error {
  897. if len(ctx.results) < ctx.n {
  898. start := time.Now()
  899. items, err := s.CacheClient.SearchDocuments(ctx.context, cache.PopularItems, "", ctx.categories, 0, s.Config.Recommend.CacheSize)
  900. if err != nil {
  901. return errors.Trace(err)
  902. }
  903. for _, item := range items {
  904. if !ctx.excludeSet.Contains(item.Id) {
  905. ctx.results = append(ctx.results, item.Id)
  906. ctx.excludeSet.Add(item.Id)
  907. }
  908. }
  909. ctx.loadPopularTime = time.Since(start)
  910. ctx.numFromPopular = len(ctx.results) - ctx.numPrevStage
  911. ctx.numPrevStage = len(ctx.results)
  912. }
  913. return nil
  914. }
  915. func (s *RestServer) getRecommend(request *restful.Request, response *restful.Response) {
  916. ctx := context.Background()
  917. if request != nil && request.Request != nil {
  918. ctx = request.Request.Context()
  919. }
  920. // parse arguments
  921. userId := request.PathParameter("user-id")
  922. n, err := ParseInt(request, "n", s.Config.Server.DefaultN)
  923. if err != nil {
  924. BadRequest(response, err)
  925. return
  926. }
  927. categories := request.QueryParameters("category")
  928. if len(categories) == 0 {
  929. categories = []string{request.PathParameter("category")}
  930. }
  931. offset, err := ParseInt(request, "offset", 0)
  932. if err != nil {
  933. BadRequest(response, err)
  934. return
  935. }
  936. writeBackFeedback := request.QueryParameter("write-back-type")
  937. writeBackDelay, err := ParseDuration(request, "write-back-delay")
  938. if err != nil {
  939. BadRequest(response, err)
  940. return
  941. }
  942. // online recommendation
  943. recommenders := []Recommender{s.RecommendOffline}
  944. for _, recommender := range s.Config.Recommend.Online.FallbackRecommend {
  945. switch recommender {
  946. case "collaborative":
  947. recommenders = append(recommenders, s.RecommendCollaborative)
  948. case "item_based":
  949. recommenders = append(recommenders, s.RecommendItemBased)
  950. case "user_based":
  951. recommenders = append(recommenders, s.RecommendUserBased)
  952. case "latest":
  953. recommenders = append(recommenders, s.RecommendLatest)
  954. case "popular":
  955. recommenders = append(recommenders, s.RecommendPopular)
  956. default:
  957. InternalServerError(response, fmt.Errorf("unknown fallback recommendation method `%s`", recommender))
  958. return
  959. }
  960. }
  961. results, err := s.Recommend(ctx, response, userId, categories, offset+n, recommenders...)
  962. if err != nil {
  963. InternalServerError(response, err)
  964. return
  965. }
  966. results = results[mathutil.Min(offset, len(results)):]
  967. // write back
  968. if writeBackFeedback != "" {
  969. startTime := time.Now()
  970. for _, itemId := range results {
  971. // insert to data store
  972. feedback := data.Feedback{
  973. FeedbackKey: data.FeedbackKey{
  974. UserId: userId,
  975. ItemId: itemId,
  976. FeedbackType: writeBackFeedback,
  977. },
  978. Timestamp: startTime.Add(writeBackDelay),
  979. }
  980. err = s.DataClient.BatchInsertFeedback(ctx, []data.Feedback{feedback}, false, false, false)
  981. if err != nil {
  982. InternalServerError(response, err)
  983. return
  984. }
  985. }
  986. }
  987. // Send result
  988. Ok(response, results)
  989. }
  990. func (s *RestServer) sessionRecommend(request *restful.Request, response *restful.Response) {
  991. ctx := context.Background()
  992. if request != nil && request.Request != nil {
  993. ctx = request.Request.Context()
  994. }
  995. // parse arguments
  996. var feedbacks []Feedback
  997. if err := request.ReadEntity(&feedbacks); err != nil {
  998. BadRequest(response, err)
  999. return
  1000. }
  1001. n, err := ParseInt(request, "n", s.Config.Server.DefaultN)
  1002. if err != nil {
  1003. BadRequest(response, err)
  1004. return
  1005. }
  1006. category := request.PathParameter("category")
  1007. offset, err := ParseInt(request, "offset", 0)
  1008. if err != nil {
  1009. BadRequest(response, err)
  1010. return
  1011. }
  1012. // pre-process feedback
  1013. dataFeedback := make([]data.Feedback, len(feedbacks))
  1014. for i := range dataFeedback {
  1015. var err error
  1016. dataFeedback[i], err = feedbacks[i].ToDataFeedback()
  1017. if err != nil {
  1018. BadRequest(response, err)
  1019. return
  1020. }
  1021. }
  1022. data.SortFeedbacks(dataFeedback)
  1023. // item-based recommendation
  1024. var excludeSet = mapset.NewSet[string]()
  1025. var userFeedback []data.Feedback
  1026. for _, feedback := range dataFeedback {
  1027. excludeSet.Add(feedback.ItemId)
  1028. if funk.ContainsString(s.Config.Recommend.DataSource.PositiveFeedbackTypes, feedback.FeedbackType) {
  1029. userFeedback = append(userFeedback, feedback)
  1030. }
  1031. }
  1032. // collect candidates
  1033. candidates := make(map[string]float64)
  1034. usedFeedbackCount := 0
  1035. for _, feedback := range userFeedback {
  1036. // load similar items
  1037. similarItems, err := s.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, feedback.ItemId, []string{category}, 0, s.Config.Recommend.CacheSize)
  1038. if err != nil {
  1039. BadRequest(response, err)
  1040. return
  1041. }
  1042. // add unseen items
  1043. // similarItems = s.FilterOutHiddenScores(response, similarItems, "")
  1044. for _, item := range similarItems {
  1045. if !excludeSet.Contains(item.Id) {
  1046. candidates[item.Id] += item.Score
  1047. }
  1048. }
  1049. // finish recommendation if the number of used feedbacks is enough
  1050. if len(similarItems) > 0 {
  1051. usedFeedbackCount++
  1052. if usedFeedbackCount >= s.Config.Recommend.Online.NumFeedbackFallbackItemBased {
  1053. break
  1054. }
  1055. }
  1056. }
  1057. // collect top k
  1058. filter := heap.NewTopKFilter[string, float64](n + offset)
  1059. for id, score := range candidates {
  1060. filter.Push(id, score)
  1061. }
  1062. names, scores := filter.PopAll()
  1063. result := lo.Map(names, func(_ string, i int) cache.Document {
  1064. return cache.Document{
  1065. Id: names[i],
  1066. Score: scores[i],
  1067. }
  1068. })
  1069. if len(result) > offset {
  1070. result = result[offset:]
  1071. } else {
  1072. result = nil
  1073. }
  1074. result = result[:lo.Min([]int{len(result), n})]
  1075. // Send result
  1076. Ok(response, result)
  1077. }
  1078. // Success is the returned data structure for data insert operations.
  1079. type Success struct {
  1080. RowAffected int
  1081. }
  1082. func (s *RestServer) insertUser(request *restful.Request, response *restful.Response) {
  1083. ctx := context.Background()
  1084. if request != nil && request.Request != nil {
  1085. ctx = request.Request.Context()
  1086. }
  1087. temp := data.User{}
  1088. // get userInfo from request and put into temp
  1089. if err := request.ReadEntity(&temp); err != nil {
  1090. BadRequest(response, err)
  1091. return
  1092. }
  1093. // validate labels
  1094. if err := data.ValidateLabels(temp.Labels); err != nil {
  1095. BadRequest(response, err)
  1096. return
  1097. }
  1098. if err := s.DataClient.BatchInsertUsers(ctx, []data.User{temp}); err != nil {
  1099. InternalServerError(response, err)
  1100. return
  1101. }
  1102. // insert modify timestamp
  1103. if err := s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, temp.UserId), time.Now())); err != nil {
  1104. InternalServerError(response, err)
  1105. return
  1106. }
  1107. Ok(response, Success{RowAffected: 1})
  1108. }
  1109. func (s *RestServer) modifyUser(request *restful.Request, response *restful.Response) {
  1110. ctx := context.Background()
  1111. if request != nil && request.Request != nil {
  1112. ctx = request.Request.Context()
  1113. }
  1114. // get user id
  1115. userId := request.PathParameter("user-id")
  1116. // modify user
  1117. var patch data.UserPatch
  1118. if err := request.ReadEntity(&patch); err != nil {
  1119. BadRequest(response, err)
  1120. return
  1121. }
  1122. // validate labels
  1123. if err := data.ValidateLabels(patch.Labels); err != nil {
  1124. BadRequest(response, err)
  1125. return
  1126. }
  1127. if err := s.DataClient.ModifyUser(ctx, userId, patch); err != nil {
  1128. InternalServerError(response, err)
  1129. return
  1130. }
  1131. // insert modify timestamp
  1132. if err := s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyUserTime, userId), time.Now())); err != nil {
  1133. return
  1134. }
  1135. Ok(response, Success{RowAffected: 1})
  1136. }
  1137. func (s *RestServer) getUser(request *restful.Request, response *restful.Response) {
  1138. ctx := context.Background()
  1139. if request != nil && request.Request != nil {
  1140. ctx = request.Request.Context()
  1141. }
  1142. // get user id
  1143. userId := request.PathParameter("user-id")
  1144. // get user
  1145. user, err := s.DataClient.GetUser(ctx, userId)
  1146. if err != nil {
  1147. if errors.Is(err, errors.NotFound) {
  1148. PageNotFound(response, err)
  1149. } else {
  1150. InternalServerError(response, err)
  1151. }
  1152. return
  1153. }
  1154. Ok(response, user)
  1155. }
  1156. func (s *RestServer) insertUsers(request *restful.Request, response *restful.Response) {
  1157. ctx := context.Background()
  1158. if request != nil && request.Request != nil {
  1159. ctx = request.Request.Context()
  1160. }
  1161. var temp []data.User
  1162. // get param from request and put into temp
  1163. if err := request.ReadEntity(&temp); err != nil {
  1164. BadRequest(response, err)
  1165. return
  1166. }
  1167. // validate labels
  1168. for _, user := range temp {
  1169. if err := data.ValidateLabels(user.Labels); err != nil {
  1170. BadRequest(response, err)
  1171. return
  1172. }
  1173. }
  1174. // range temp and achieve user
  1175. if err := s.DataClient.BatchInsertUsers(ctx, temp); err != nil {
  1176. InternalServerError(response, err)
  1177. return
  1178. }
  1179. // insert modify timestamp
  1180. values := make([]cache.Value, len(temp))
  1181. for i, user := range temp {
  1182. values[i] = cache.Time(cache.Key(cache.LastModifyUserTime, user.UserId), time.Now())
  1183. }
  1184. if err := s.CacheClient.Set(ctx, values...); err != nil {
  1185. InternalServerError(response, err)
  1186. return
  1187. }
  1188. Ok(response, Success{RowAffected: len(temp)})
  1189. }
  1190. type UserIterator struct {
  1191. Cursor string
  1192. Users []data.User
  1193. }
  1194. func (s *RestServer) getUsers(request *restful.Request, response *restful.Response) {
  1195. ctx := context.Background()
  1196. if request != nil && request.Request != nil {
  1197. ctx = request.Request.Context()
  1198. }
  1199. cursor := request.QueryParameter("cursor")
  1200. n, err := ParseInt(request, "n", s.Config.Server.DefaultN)
  1201. if err != nil {
  1202. BadRequest(response, err)
  1203. return
  1204. }
  1205. // get all users
  1206. cursor, users, err := s.DataClient.GetUsers(ctx, cursor, n)
  1207. if err != nil {
  1208. InternalServerError(response, err)
  1209. return
  1210. }
  1211. Ok(response, UserIterator{Cursor: cursor, Users: users})
  1212. }
  1213. // delete a user by user-id
  1214. func (s *RestServer) deleteUser(request *restful.Request, response *restful.Response) {
  1215. ctx := context.Background()
  1216. if request != nil && request.Request != nil {
  1217. ctx = request.Request.Context()
  1218. }
  1219. // get user-id and put into temp
  1220. userId := request.PathParameter("user-id")
  1221. if err := s.DataClient.DeleteUser(ctx, userId); err != nil {
  1222. InternalServerError(response, err)
  1223. return
  1224. }
  1225. Ok(response, Success{RowAffected: 1})
  1226. }
  1227. // get feedback by user-id with feedback type
  1228. func (s *RestServer) getTypedFeedbackByUser(request *restful.Request, response *restful.Response) {
  1229. ctx := context.Background()
  1230. if request != nil && request.Request != nil {
  1231. ctx = request.Request.Context()
  1232. }
  1233. feedbackType := request.PathParameter("feedback-type")
  1234. userId := request.PathParameter("user-id")
  1235. feedback, err := s.DataClient.GetUserFeedback(ctx, userId, s.Config.Now(), feedbackType)
  1236. if err != nil {
  1237. InternalServerError(response, err)
  1238. return
  1239. }
  1240. Ok(response, feedback)
  1241. }
  1242. // get feedback by user-id
  1243. func (s *RestServer) getFeedbackByUser(request *restful.Request, response *restful.Response) {
  1244. ctx := context.Background()
  1245. if request != nil && request.Request != nil {
  1246. ctx = request.Request.Context()
  1247. }
  1248. userId := request.PathParameter("user-id")
  1249. feedback, err := s.DataClient.GetUserFeedback(ctx, userId, s.Config.Now())
  1250. if err != nil {
  1251. InternalServerError(response, err)
  1252. return
  1253. }
  1254. Ok(response, feedback)
  1255. }
  1256. // Item is the data structure for the item but stores the timestamp using string.
  1257. type Item struct {
  1258. ItemId string
  1259. IsHidden bool
  1260. Categories []string
  1261. Timestamp string
  1262. Labels any
  1263. Comment string
  1264. }
  1265. func (s *RestServer) batchInsertItems(ctx context.Context, response *restful.Response, temp []Item) {
  1266. var (
  1267. count int
  1268. items = make([]data.Item, 0, len(temp))
  1269. // popularScore = lo.Map(temp, func(item Item, i int) float64 {
  1270. // return s.PopularItemsCache.GetSortedScore(item.ItemId)
  1271. // })
  1272. loadExistedItemsTime time.Duration
  1273. parseTimesatmpTime time.Duration
  1274. insertItemsTime time.Duration
  1275. insertCacheTime time.Duration
  1276. )
  1277. // load existed items
  1278. start := time.Now()
  1279. existedItems, err := s.DataClient.BatchGetItems(ctx, lo.Map(temp, func(t Item, i int) string {
  1280. return t.ItemId
  1281. }))
  1282. if err != nil {
  1283. InternalServerError(response, err)
  1284. return
  1285. }
  1286. existedItemsSet := make(map[string]data.Item)
  1287. for _, item := range existedItems {
  1288. existedItemsSet[item.ItemId] = item
  1289. }
  1290. loadExistedItemsTime = time.Since(start)
  1291. start = time.Now()
  1292. for _, item := range temp {
  1293. // parse datetime
  1294. var timestamp time.Time
  1295. var err error
  1296. if item.Timestamp != "" {
  1297. if timestamp, err = dateparse.ParseAny(item.Timestamp); err != nil {
  1298. BadRequest(response, err)
  1299. return
  1300. }
  1301. }
  1302. items = append(items, data.Item{
  1303. ItemId: item.ItemId,
  1304. IsHidden: item.IsHidden,
  1305. Categories: item.Categories,
  1306. Timestamp: timestamp,
  1307. Labels: item.Labels,
  1308. Comment: item.Comment,
  1309. })
  1310. // insert to latest items cache
  1311. if err = s.CacheClient.AddDocuments(ctx, cache.LatestItems, "", []cache.Document{{
  1312. Id: item.ItemId,
  1313. Score: float64(timestamp.Unix()),
  1314. Categories: withWildCard(item.Categories),
  1315. Timestamp: time.Now(),
  1316. }}); err != nil {
  1317. InternalServerError(response, err)
  1318. return
  1319. }
  1320. // update items cache
  1321. if err = s.CacheClient.UpdateDocuments(ctx, cache.ItemCache, item.ItemId, cache.DocumentPatch{
  1322. Categories: withWildCard(item.Categories),
  1323. IsHidden: &item.IsHidden,
  1324. }); err != nil {
  1325. InternalServerError(response, err)
  1326. return
  1327. }
  1328. count++
  1329. }
  1330. parseTimesatmpTime = time.Since(start)
  1331. // insert items
  1332. start = time.Now()
  1333. if err = s.DataClient.BatchInsertItems(ctx, items); err != nil {
  1334. InternalServerError(response, err)
  1335. return
  1336. }
  1337. insertItemsTime = time.Since(start)
  1338. // insert modify timestamp
  1339. start = time.Now()
  1340. categories := mapset.NewSet[string]()
  1341. values := make([]cache.Value, len(items))
  1342. for i, item := range items {
  1343. values[i] = cache.Time(cache.Key(cache.LastModifyItemTime, item.ItemId), time.Now())
  1344. categories.Append(item.Categories...)
  1345. }
  1346. if err = s.CacheClient.Set(ctx, values...); err != nil {
  1347. InternalServerError(response, err)
  1348. return
  1349. }
  1350. // insert categories
  1351. if err = s.CacheClient.AddSet(ctx, cache.ItemCategories, categories.ToSlice()...); err != nil {
  1352. InternalServerError(response, err)
  1353. return
  1354. }
  1355. insertCacheTime = time.Since(start)
  1356. log.ResponseLogger(response).Info("batch insert items",
  1357. zap.Duration("load_existed_items_time", loadExistedItemsTime),
  1358. zap.Duration("parse_timestamp_time", parseTimesatmpTime),
  1359. zap.Duration("insert_items_time", insertItemsTime),
  1360. zap.Duration("insert_cache_time", insertCacheTime))
  1361. Ok(response, Success{RowAffected: count})
  1362. }
  1363. func (s *RestServer) insertItems(request *restful.Request, response *restful.Response) {
  1364. ctx := context.Background()
  1365. if request != nil && request.Request != nil {
  1366. ctx = request.Request.Context()
  1367. }
  1368. var items []Item
  1369. if err := request.ReadEntity(&items); err != nil {
  1370. BadRequest(response, err)
  1371. return
  1372. }
  1373. // validate labels
  1374. for _, user := range items {
  1375. if err := data.ValidateLabels(user.Labels); err != nil {
  1376. BadRequest(response, err)
  1377. return
  1378. }
  1379. }
  1380. // Insert items
  1381. s.batchInsertItems(ctx, response, items)
  1382. }
  1383. func (s *RestServer) insertItem(request *restful.Request, response *restful.Response) {
  1384. ctx := context.Background()
  1385. if request != nil && request.Request != nil {
  1386. ctx = request.Request.Context()
  1387. }
  1388. var item Item
  1389. var err error
  1390. if err = request.ReadEntity(&item); err != nil {
  1391. BadRequest(response, err)
  1392. return
  1393. }
  1394. // validate labels
  1395. if err := data.ValidateLabels(item.Labels); err != nil {
  1396. BadRequest(response, err)
  1397. return
  1398. }
  1399. s.batchInsertItems(ctx, response, []Item{item})
  1400. }
  1401. func (s *RestServer) modifyItem(request *restful.Request, response *restful.Response) {
  1402. ctx := context.Background()
  1403. if request != nil && request.Request != nil {
  1404. ctx = request.Request.Context()
  1405. }
  1406. itemId := request.PathParameter("item-id")
  1407. var patch data.ItemPatch
  1408. if err := request.ReadEntity(&patch); err != nil {
  1409. BadRequest(response, err)
  1410. return
  1411. }
  1412. // validate labels
  1413. if err := data.ValidateLabels(patch.Labels); err != nil {
  1414. BadRequest(response, err)
  1415. return
  1416. }
  1417. // remove hidden item from cache
  1418. if patch.IsHidden != nil {
  1419. if err := s.CacheClient.UpdateDocuments(ctx, cache.ItemCache, itemId, cache.DocumentPatch{IsHidden: patch.IsHidden}); err != nil {
  1420. InternalServerError(response, err)
  1421. return
  1422. }
  1423. }
  1424. // add item to latest items cache
  1425. if patch.Timestamp != nil {
  1426. if err := s.CacheClient.UpdateDocuments(ctx, []string{cache.LatestItems}, itemId, cache.DocumentPatch{Score: proto.Float64(float64(patch.Timestamp.Unix()))}); err != nil {
  1427. InternalServerError(response, err)
  1428. return
  1429. }
  1430. }
  1431. // update categories in cache
  1432. if patch.Categories != nil {
  1433. if err := s.CacheClient.UpdateDocuments(ctx, cache.ItemCache, itemId, cache.DocumentPatch{Categories: withWildCard(patch.Categories)}); err != nil {
  1434. InternalServerError(response, err)
  1435. return
  1436. }
  1437. }
  1438. // modify item
  1439. if err := s.DataClient.ModifyItem(ctx, itemId, patch); err != nil {
  1440. InternalServerError(response, err)
  1441. return
  1442. }
  1443. // insert modify timestamp
  1444. if err := s.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastModifyItemTime, itemId), time.Now())); err != nil {
  1445. return
  1446. }
  1447. Ok(response, Success{RowAffected: 1})
  1448. }
// ItemIterator is the iterator for items. It is the response body written by
// getItems: one page of items plus the cursor returned by the data store
// alongside that page.
type ItemIterator struct {
	// Cursor is the cursor returned by DataClient.GetItems for this page.
	Cursor string
	// Items holds the items of this page.
	Items []data.Item
}
  1454. func (s *RestServer) getItems(request *restful.Request, response *restful.Response) {
  1455. ctx := context.Background()
  1456. if request != nil && request.Request != nil {
  1457. ctx = request.Request.Context()
  1458. }
  1459. cursor := request.QueryParameter("cursor")
  1460. n, err := ParseInt(request, "n", s.Config.Server.DefaultN)
  1461. if err != nil {
  1462. BadRequest(response, err)
  1463. return
  1464. }
  1465. cursor, items, err := s.DataClient.GetItems(ctx, cursor, n, nil)
  1466. if err != nil {
  1467. InternalServerError(response, err)
  1468. return
  1469. }
  1470. Ok(response, ItemIterator{Cursor: cursor, Items: items})
  1471. }
  1472. func (s *RestServer) getItem(request *restful.Request, response *restful.Response) {
  1473. ctx := context.Background()
  1474. if request != nil && request.Request != nil {
  1475. ctx = request.Request.Context()
  1476. }
  1477. // Get item id
  1478. itemId := request.PathParameter("item-id")
  1479. // Get item
  1480. item, err := s.DataClient.GetItem(ctx, itemId)
  1481. if err != nil {
  1482. if errors.Is(err, errors.NotFound) {
  1483. PageNotFound(response, err)
  1484. } else {
  1485. InternalServerError(response, err)
  1486. }
  1487. return
  1488. }
  1489. Ok(response, item)
  1490. }
  1491. func (s *RestServer) deleteItem(request *restful.Request, response *restful.Response) {
  1492. ctx := context.Background()
  1493. if request != nil && request.Request != nil {
  1494. ctx = request.Request.Context()
  1495. }
  1496. itemId := request.PathParameter("item-id")
  1497. // delete item from database
  1498. if err := s.DataClient.DeleteItem(ctx, itemId); err != nil {
  1499. InternalServerError(response, err)
  1500. return
  1501. }
  1502. // delete item from cache
  1503. if err := s.CacheClient.DeleteDocuments(ctx, cache.ItemCache, cache.DocumentCondition{Id: &itemId}); err != nil {
  1504. InternalServerError(response, err)
  1505. return
  1506. }
  1507. Ok(response, Success{RowAffected: 1})
  1508. }
  1509. func (s *RestServer) insertItemCategory(request *restful.Request, response *restful.Response) {
  1510. ctx := context.Background()
  1511. if request != nil && request.Request != nil {
  1512. ctx = request.Request.Context()
  1513. }
  1514. // fetch item id and category
  1515. itemId := request.PathParameter("item-id")
  1516. category := request.PathParameter("category")
  1517. // fetch item
  1518. item, err := s.DataClient.GetItem(ctx, itemId)
  1519. if err != nil {
  1520. InternalServerError(response, err)
  1521. return
  1522. }
  1523. if !funk.ContainsString(item.Categories, category) {
  1524. item.Categories = append(item.Categories, category)
  1525. }
  1526. // insert category to database
  1527. if err = s.DataClient.BatchInsertItems(ctx, []data.Item{item}); err != nil {
  1528. InternalServerError(response, err)
  1529. return
  1530. }
  1531. // insert category to cache
  1532. if err = s.CacheClient.UpdateDocuments(ctx, cache.ItemCache, itemId, cache.DocumentPatch{Categories: withWildCard(item.Categories)}); err != nil {
  1533. InternalServerError(response, err)
  1534. return
  1535. }
  1536. Ok(response, Success{RowAffected: 1})
  1537. }
  1538. func (s *RestServer) deleteItemCategory(request *restful.Request, response *restful.Response) {
  1539. ctx := context.Background()
  1540. if request != nil && request.Request != nil {
  1541. ctx = request.Request.Context()
  1542. }
  1543. // fetch item id and category
  1544. itemId := request.PathParameter("item-id")
  1545. category := request.PathParameter("category")
  1546. // fetch item
  1547. item, err := s.DataClient.GetItem(ctx, itemId)
  1548. if err != nil {
  1549. InternalServerError(response, err)
  1550. return
  1551. }
  1552. categories := make([]string, 0, len(item.Categories))
  1553. for _, cat := range item.Categories {
  1554. if cat != category {
  1555. categories = append(categories, cat)
  1556. }
  1557. }
  1558. item.Categories = categories
  1559. // delete category from cache
  1560. if err = s.CacheClient.UpdateDocuments(ctx, cache.ItemCache, itemId, cache.DocumentPatch{Categories: withWildCard(categories)}); err != nil {
  1561. InternalServerError(response, err)
  1562. return
  1563. }
  1564. // delete category from database
  1565. if err = s.DataClient.BatchInsertItems(ctx, []data.Item{item}); err != nil {
  1566. InternalServerError(response, err)
  1567. return
  1568. }
  1569. Ok(response, Success{RowAffected: 1})
  1570. }
  1571. // Feedback is the data structure for the feedback but stores the timestamp using string.
  1572. type Feedback struct {
  1573. data.FeedbackKey
  1574. Timestamp string
  1575. Comment string
  1576. }
  1577. func (f Feedback) ToDataFeedback() (data.Feedback, error) {
  1578. var feedback data.Feedback
  1579. feedback.FeedbackKey = f.FeedbackKey
  1580. feedback.Comment = f.Comment
  1581. if f.Timestamp != "" {
  1582. var err error
  1583. feedback.Timestamp, err = dateparse.ParseAny(f.Timestamp)
  1584. if err != nil {
  1585. return data.Feedback{}, err
  1586. }
  1587. }
  1588. return feedback, nil
  1589. }
  1590. func (s *RestServer) insertFeedback(overwrite bool) func(request *restful.Request, response *restful.Response) {
  1591. return func(request *restful.Request, response *restful.Response) {
  1592. ctx := context.Background()
  1593. if request != nil && request.Request != nil {
  1594. ctx = request.Request.Context()
  1595. }
  1596. // add ratings
  1597. var feedbackLiterTime []Feedback
  1598. if err := request.ReadEntity(&feedbackLiterTime); err != nil {
  1599. BadRequest(response, err)
  1600. return
  1601. }
  1602. // parse datetime
  1603. var err error
  1604. feedback := make([]data.Feedback, len(feedbackLiterTime))
  1605. users := mapset.NewSet[string]()
  1606. items := mapset.NewSet[string]()
  1607. for i := range feedback {
  1608. users.Add(feedbackLiterTime[i].UserId)
  1609. items.Add(feedbackLiterTime[i].ItemId)
  1610. feedback[i], err = feedbackLiterTime[i].ToDataFeedback()
  1611. if err != nil {
  1612. BadRequest(response, err)
  1613. return
  1614. }
  1615. }
  1616. // insert feedback to data store
  1617. err = s.DataClient.BatchInsertFeedback(ctx, feedback,
  1618. s.Config.Server.AutoInsertUser,
  1619. s.Config.Server.AutoInsertItem, overwrite)
  1620. if err != nil {
  1621. InternalServerError(response, err)
  1622. return
  1623. }
  1624. values := make([]cache.Value, 0, users.Cardinality()+items.Cardinality())
  1625. for _, userId := range users.ToSlice() {
  1626. values = append(values, cache.Time(cache.Key(cache.LastModifyUserTime, userId), time.Now()))
  1627. }
  1628. for _, itemId := range items.ToSlice() {
  1629. values = append(values, cache.Time(cache.Key(cache.LastModifyItemTime, itemId), time.Now()))
  1630. }
  1631. if err = s.CacheClient.Set(ctx, values...); err != nil {
  1632. InternalServerError(response, err)
  1633. return
  1634. }
  1635. log.ResponseLogger(response).Info("Insert feedback successfully", zap.Int("num_feedback", len(feedback)))
  1636. Ok(response, Success{RowAffected: len(feedback)})
  1637. }
  1638. }
// FeedbackIterator is the iterator for feedback. It is the response body
// written by getFeedback and getTypedFeedback: one page of feedback plus the
// cursor returned by the data store alongside that page.
type FeedbackIterator struct {
	// Cursor is the cursor returned by DataClient.GetFeedback for this page.
	Cursor string
	// Feedback holds the feedback of this page.
	Feedback []data.Feedback
}
  1644. func (s *RestServer) getFeedback(request *restful.Request, response *restful.Response) {
  1645. ctx := context.Background()
  1646. if request != nil && request.Request != nil {
  1647. ctx = request.Request.Context()
  1648. }
  1649. // Parse parameters
  1650. cursor := request.QueryParameter("cursor")
  1651. n, err := ParseInt(request, "n", s.Config.Server.DefaultN)
  1652. if err != nil {
  1653. BadRequest(response, err)
  1654. return
  1655. }
  1656. cursor, feedback, err := s.DataClient.GetFeedback(ctx, cursor, n, nil, s.Config.Now())
  1657. if err != nil {
  1658. InternalServerError(response, err)
  1659. return
  1660. }
  1661. Ok(response, FeedbackIterator{Cursor: cursor, Feedback: feedback})
  1662. }
  1663. func (s *RestServer) getTypedFeedback(request *restful.Request, response *restful.Response) {
  1664. ctx := context.Background()
  1665. if request != nil && request.Request != nil {
  1666. ctx = request.Request.Context()
  1667. }
  1668. // Parse parameters
  1669. feedbackType := request.PathParameter("feedback-type")
  1670. cursor := request.QueryParameter("cursor")
  1671. n, err := ParseInt(request, "n", s.Config.Server.DefaultN)
  1672. if err != nil {
  1673. BadRequest(response, err)
  1674. return
  1675. }
  1676. cursor, feedback, err := s.DataClient.GetFeedback(ctx, cursor, n, nil, s.Config.Now(), feedbackType)
  1677. if err != nil {
  1678. InternalServerError(response, err)
  1679. return
  1680. }
  1681. Ok(response, FeedbackIterator{Cursor: cursor, Feedback: feedback})
  1682. }
  1683. func (s *RestServer) getUserItemFeedback(request *restful.Request, response *restful.Response) {
  1684. ctx := context.Background()
  1685. if request != nil && request.Request != nil {
  1686. ctx = request.Request.Context()
  1687. }
  1688. // Parse parameters
  1689. userId := request.PathParameter("user-id")
  1690. itemId := request.PathParameter("item-id")
  1691. if feedback, err := s.DataClient.GetUserItemFeedback(ctx, userId, itemId); err != nil {
  1692. InternalServerError(response, err)
  1693. } else {
  1694. Ok(response, feedback)
  1695. }
  1696. }
  1697. func (s *RestServer) deleteUserItemFeedback(request *restful.Request, response *restful.Response) {
  1698. ctx := context.Background()
  1699. if request != nil && request.Request != nil {
  1700. ctx = request.Request.Context()
  1701. }
  1702. // Parse parameters
  1703. userId := request.PathParameter("user-id")
  1704. itemId := request.PathParameter("item-id")
  1705. if deleteCount, err := s.DataClient.DeleteUserItemFeedback(ctx, userId, itemId); err != nil {
  1706. InternalServerError(response, err)
  1707. } else {
  1708. Ok(response, Success{RowAffected: deleteCount})
  1709. }
  1710. }
  1711. func (s *RestServer) getTypedUserItemFeedback(request *restful.Request, response *restful.Response) {
  1712. ctx := context.Background()
  1713. if request != nil && request.Request != nil {
  1714. ctx = request.Request.Context()
  1715. }
  1716. // Parse parameters
  1717. feedbackType := request.PathParameter("feedback-type")
  1718. userId := request.PathParameter("user-id")
  1719. itemId := request.PathParameter("item-id")
  1720. if feedback, err := s.DataClient.GetUserItemFeedback(ctx, userId, itemId, feedbackType); err != nil {
  1721. InternalServerError(response, err)
  1722. } else if feedbackType == "" {
  1723. Text(response, "{}")
  1724. } else {
  1725. Ok(response, feedback[0])
  1726. }
  1727. }
  1728. func (s *RestServer) deleteTypedUserItemFeedback(request *restful.Request, response *restful.Response) {
  1729. ctx := context.Background()
  1730. if request != nil && request.Request != nil {
  1731. ctx = request.Request.Context()
  1732. }
  1733. // Parse parameters
  1734. feedbackType := request.PathParameter("feedback-type")
  1735. userId := request.PathParameter("user-id")
  1736. itemId := request.PathParameter("item-id")
  1737. if deleteCount, err := s.DataClient.DeleteUserItemFeedback(ctx, userId, itemId, feedbackType); err != nil {
  1738. InternalServerError(response, err)
  1739. } else {
  1740. Ok(response, Success{deleteCount})
  1741. }
  1742. }
  1743. type HealthStatus struct {
  1744. Ready bool
  1745. DataStoreError error
  1746. CacheStoreError error
  1747. DataStoreConnected bool
  1748. CacheStoreConnected bool
  1749. }
  1750. func (s *RestServer) checkHealth() HealthStatus {
  1751. healthStatus := HealthStatus{}
  1752. healthStatus.DataStoreError = s.DataClient.Ping()
  1753. healthStatus.CacheStoreError = s.CacheClient.Ping()
  1754. healthStatus.DataStoreConnected = healthStatus.DataStoreError == nil
  1755. healthStatus.CacheStoreConnected = healthStatus.CacheStoreError == nil
  1756. healthStatus.Ready = healthStatus.DataStoreConnected && healthStatus.CacheStoreConnected
  1757. return healthStatus
  1758. }
  1759. func (s *RestServer) checkReady(_ *restful.Request, response *restful.Response) {
  1760. healthStatus := s.checkHealth()
  1761. if healthStatus.Ready {
  1762. Ok(response, healthStatus)
  1763. } else {
  1764. errReason, err := json.Marshal(healthStatus)
  1765. if err != nil {
  1766. Error(response, http.StatusInternalServerError, err)
  1767. } else {
  1768. Error(response, http.StatusServiceUnavailable, errors.New(string(errReason)))
  1769. }
  1770. }
  1771. }
  1772. func (s *RestServer) checkLive(_ *restful.Request, response *restful.Response) {
  1773. healthStatus := s.checkHealth()
  1774. Ok(response, healthStatus)
  1775. }
  1776. func (s *RestServer) getMeasurements(request *restful.Request, response *restful.Response) {
  1777. ctx := context.Background()
  1778. if request != nil && request.Request != nil {
  1779. ctx = request.Request.Context()
  1780. }
  1781. // Parse parameters
  1782. name := request.PathParameter("name")
  1783. n, err := ParseInt(request, "n", 100)
  1784. if err != nil {
  1785. BadRequest(response, err)
  1786. return
  1787. }
  1788. measurements, err := s.CacheClient.GetTimeSeriesPoints(ctx, name, time.Now().Add(-24*time.Hour*time.Duration(n)), time.Now())
  1789. if err != nil {
  1790. InternalServerError(response, err)
  1791. return
  1792. }
  1793. Ok(response, measurements)
  1794. }
  1795. // BadRequest returns a bad request error.
  1796. func BadRequest(response *restful.Response, err error) {
  1797. response.Header().Set("Access-Control-Allow-Origin", "*")
  1798. log.ResponseLogger(response).Error("bad request", zap.Error(err))
  1799. if err = response.WriteError(http.StatusBadRequest, err); err != nil {
  1800. log.ResponseLogger(response).Error("failed to write error", zap.Error(err))
  1801. }
  1802. }
  1803. // InternalServerError returns a internal server error.
  1804. func InternalServerError(response *restful.Response, err error) {
  1805. response.Header().Set("Access-Control-Allow-Origin", "*")
  1806. log.ResponseLogger(response).Error("internal server error", zap.Error(err))
  1807. if err = response.WriteError(http.StatusInternalServerError, err); err != nil {
  1808. log.ResponseLogger(response).Error("failed to write error", zap.Error(err))
  1809. }
  1810. }
  1811. // PageNotFound returns a not found error.
  1812. func PageNotFound(response *restful.Response, err error) {
  1813. response.Header().Set("Access-Control-Allow-Origin", "*")
  1814. if err := response.WriteError(http.StatusNotFound, err); err != nil {
  1815. log.ResponseLogger(response).Error("failed to write error", zap.Error(err))
  1816. }
  1817. }
  1818. // Ok sends the content as JSON to the client.
  1819. func Ok(response *restful.Response, content interface{}) {
  1820. response.Header().Set("Access-Control-Allow-Origin", "*")
  1821. if err := response.WriteAsJson(content); err != nil {
  1822. log.ResponseLogger(response).Error("failed to write json", zap.Error(err))
  1823. }
  1824. }
  1825. func Error(response *restful.Response, httpStatus int, responseError error) {
  1826. response.Header().Set("Access-Control-Allow-Origin", "*")
  1827. if err := response.WriteError(httpStatus, responseError); err != nil {
  1828. log.ResponseLogger(response).Error("failed to write error", zap.Error(err))
  1829. }
  1830. }
  1831. // Text returns a plain text.
  1832. func Text(response *restful.Response, content string) {
  1833. response.Header().Set("Access-Control-Allow-Origin", "*")
  1834. if _, err := response.Write([]byte(content)); err != nil {
  1835. log.ResponseLogger(response).Error("failed to write text", zap.Error(err))
  1836. }
  1837. }
  1838. func withWildCard(categories []string) []string {
  1839. result := make([]string, len(categories), len(categories)+1)
  1840. copy(result, categories)
  1841. result = append(result, "")
  1842. return result
  1843. }