1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501 |
- // Copyright 2021 gorse Project Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package master
- import (
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "os"
- "reflect"
- "strconv"
- "strings"
- "time"
- "github.com/araddon/dateparse"
- mapset "github.com/deckarep/golang-set/v2"
- restfulspec "github.com/emicklei/go-restful-openapi/v2"
- "github.com/emicklei/go-restful/v3"
- "github.com/gorilla/securecookie"
- _ "github.com/gorse-io/dashboard"
- "github.com/juju/errors"
- "github.com/mitchellh/mapstructure"
- "github.com/rakyll/statik/fs"
- "github.com/samber/lo"
- "github.com/zhenghaoz/gorse/base"
- "github.com/zhenghaoz/gorse/base/encoding"
- "github.com/zhenghaoz/gorse/base/log"
- "github.com/zhenghaoz/gorse/base/progress"
- "github.com/zhenghaoz/gorse/cmd/version"
- "github.com/zhenghaoz/gorse/config"
- "github.com/zhenghaoz/gorse/model/click"
- "github.com/zhenghaoz/gorse/model/ranking"
- "github.com/zhenghaoz/gorse/server"
- "github.com/zhenghaoz/gorse/storage/cache"
- "github.com/zhenghaoz/gorse/storage/data"
- "go.uber.org/zap"
- )
- func (m *Master) CreateWebService() {
- ws := m.WebService
- ws.Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON)
- ws.Path("/api/")
- ws.Filter(m.LoginFilter)
- ws.Route(ws.GET("/dashboard/cluster").To(m.getCluster).
- Doc("Get nodes in the cluster.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Returns(http.StatusOK, "OK", []Node{}).
- Writes([]Node{}))
- ws.Route(ws.GET("/dashboard/categories").To(m.getCategories).
- Doc("Get categories of items.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Returns(http.StatusOK, "OK", []string{}).
- Writes([]string{}))
- ws.Route(ws.GET("/dashboard/config").To(m.getConfig).
- Doc("Get config.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Returns(http.StatusOK, "OK", config.Config{}).
- Writes(config.Config{}))
- ws.Route(ws.GET("/dashboard/stats").To(m.getStats).
- Doc("Get global status.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.HeaderParameter("X-API-Key", "secret key for RESTful API")).
- Returns(http.StatusOK, "OK", Status{}).
- Writes(Status{}))
- ws.Route(ws.GET("/dashboard/tasks").To(m.getTasks).
- Doc("Get tasks.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.HeaderParameter("X-API-Key", "secret key for RESTful API")).
- Returns(http.StatusOK, "OK", []progress.Progress{}).
- Writes([]progress.Progress{}))
- ws.Route(ws.GET("/dashboard/rates").To(m.getRates).
- Doc("Get positive feedback rates.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.HeaderParameter("X-API-Key", "secret key for RESTful API")).
- Returns(http.StatusOK, "OK", map[string][]cache.TimeSeriesPoint{}).
- Writes(map[string][]cache.TimeSeriesPoint{}))
- // Get a user
- ws.Route(ws.GET("/dashboard/user/{user-id}").To(m.getUser).
- Doc("Get a user.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
- Returns(http.StatusOK, "OK", User{}).
- Writes(User{}))
- // Get a user feedback
- ws.Route(ws.GET("/dashboard/user/{user-id}/feedback/{feedback-type}").To(m.getTypedFeedbackByUser).
- Doc("Get feedback by user id with feedback type.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"feedback"}).
- Param(ws.HeaderParameter("X-API-Key", "secret key for RESTful API")).
- Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
- Param(ws.PathParameter("feedback-type", "feedback type").DataType("string")).
- Returns(http.StatusOK, "OK", []Feedback{}).
- Writes([]Feedback{}))
- // Get users
- ws.Route(ws.GET("/dashboard/users").To(m.getUsers).
- Doc("Get users.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.QueryParameter("n", "number of returned users").DataType("int")).
- Param(ws.QueryParameter("cursor", "cursor for next page").DataType("string")).
- Returns(http.StatusOK, "OK", UserIterator{}).
- Writes(UserIterator{}))
- // Get popular items
- ws.Route(ws.GET("/dashboard/popular/").To(m.getPopular).
- Doc("get popular items").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoredItem{}).
- Writes([]ScoredItem{}))
- ws.Route(ws.GET("/dashboard/popular/{category}").To(m.getPopular).
- Doc("get popular items").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("category", "category of items").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoredItem{}).
- Writes([]ScoredItem{}))
- // Get latest items
- ws.Route(ws.GET("/dashboard/latest/").To(m.getLatest).
- Doc("get latest items").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoredItem{}).
- Writes([]ScoredItem{}))
- ws.Route(ws.GET("/dashboard/latest/{category}").To(m.getLatest).
- Doc("get latest items").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("category", "category of items").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoredItem{}).
- Writes([]ScoredItem{}))
- ws.Route(ws.GET("/dashboard/recommend/{user-id}").To(m.getRecommend).
- Doc("Get recommendation for user.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Returns(http.StatusOK, "OK", []data.Item{}).
- Writes([]data.Item{}))
- ws.Route(ws.GET("/dashboard/recommend/{user-id}/{recommender}").To(m.getRecommend).
- Doc("Get recommendation for user.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
- Param(ws.PathParameter("recommender", "one of `final`, `collaborative`, `user_based` and `item_based`").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Returns(http.StatusOK, "OK", []data.Item{}).
- Writes([]data.Item{}))
- ws.Route(ws.GET("/dashboard/recommend/{user-id}/{recommender}/{category}").To(m.getRecommend).
- Doc("Get recommendation for user.").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
- Param(ws.PathParameter("recommender", "one of `final`, `collaborative`, `user_based` and `item_based`").DataType("string")).
- Param(ws.PathParameter("category", "category of items").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Returns(http.StatusOK, "OK", []data.Item{}).
- Writes([]data.Item{}))
- ws.Route(ws.GET("/dashboard/item/{item-id}/neighbors").To(m.getItemNeighbors).
- Doc("get neighbors of a item").
- Metadata(restfulspec.KeyOpenAPITags, []string{"recommendation"}).
- Param(ws.PathParameter("item-id", "identifier of the item").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoredItem{}).
- Writes([]ScoredItem{}))
- ws.Route(ws.GET("/dashboard/item/{item-id}/neighbors/{category}").To(m.getItemCategorizedNeighbors).
- Doc("get neighbors of a item").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("item-id", "identifier of the item").DataType("string")).
- Param(ws.PathParameter("category", "category of items").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned items").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoredItem{}).
- Writes([]ScoredItem{}))
- ws.Route(ws.GET("/dashboard/user/{user-id}/neighbors").To(m.getUserNeighbors).
- Doc("get neighbors of a user").
- Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
- Param(ws.PathParameter("user-id", "identifier of the user").DataType("string")).
- Param(ws.QueryParameter("n", "number of returned users").DataType("int")).
- Param(ws.QueryParameter("offset", "offset of the list").DataType("int")).
- Returns(http.StatusOK, "OK", []ScoreUser{}).
- Writes([]ScoreUser{}))
- }
- // SinglePageAppFileSystem is the file system for single page app.
- type SinglePageAppFileSystem struct {
- root http.FileSystem
- }
- // Open index.html if required file not exists.
- func (fs *SinglePageAppFileSystem) Open(name string) (http.File, error) {
- f, err := fs.root.Open(name)
- if os.IsNotExist(err) {
- return fs.root.Open("/index.html")
- }
- return f, err
- }
- func (m *Master) SetOneMode(workerScheduleHandler http.HandlerFunc) {
- m.workerScheduleHandler = workerScheduleHandler
- }
- func (m *Master) StartHttpServer() {
- m.CreateWebService()
- container := restful.NewContainer()
- container.Handle("/", http.HandlerFunc(m.dashboard))
- container.Handle("/login", http.HandlerFunc(m.login))
- container.Handle("/logout", http.HandlerFunc(m.logout))
- container.Handle("/api/purge", http.HandlerFunc(m.purge))
- container.Handle("/api/bulk/users", http.HandlerFunc(m.importExportUsers))
- container.Handle("/api/bulk/items", http.HandlerFunc(m.importExportItems))
- container.Handle("/api/bulk/feedback", http.HandlerFunc(m.importExportFeedback))
- if m.workerScheduleHandler == nil {
- container.Handle("/api/admin/schedule", http.HandlerFunc(m.scheduleAPIHandler))
- } else {
- container.Handle("/api/admin/schedule/master", http.HandlerFunc(m.scheduleAPIHandler))
- container.Handle("/api/admin/schedule/worker", m.workerScheduleHandler)
- }
- m.RestServer.StartHttpServer(container)
- }
- var (
- cookieHandler = securecookie.New(
- securecookie.GenerateRandomKey(64),
- securecookie.GenerateRandomKey(32))
- staticFileSystem http.FileSystem
- staticFileServer http.Handler
- )
- func init() {
- var err error
- staticFileSystem, err = fs.New()
- if err != nil {
- log.Logger().Fatal("failed to load statik files", zap.Error(err))
- }
- staticFileServer = http.FileServer(&SinglePageAppFileSystem{staticFileSystem})
- // Create temporary directory if not exist
- tempDir := os.TempDir()
- if err = os.MkdirAll(tempDir, 1777); err != nil {
- log.Logger().Fatal("failed to create temporary directory", zap.String("directory", tempDir), zap.Error(err))
- }
- }
- // Taken from https://github.com/mytrile/nocache
- var noCacheHeaders = map[string]string{
- "Expires": time.Unix(0, 0).Format(time.RFC1123),
- "Cache-Control": "no-cache, no-store, no-transform, must-revalidate, private, max-age=0",
- "Pragma": "no-cache",
- "X-Accel-Expires": "0",
- }
- var etagHeaders = []string{
- "ETag",
- "If-Modified-Since",
- "If-Match",
- "If-None-Match",
- "If-Range",
- "If-Unmodified-Since",
- }
- // noCache is a simple piece of middleware that sets a number of HTTP headers to prevent
- // a router (or subrouter) from being cached by an upstream proxy and/or client.
- //
- // As per http://wiki.nginx.org/HttpProxyModule - noCache sets:
- //
- // Expires: Thu, 01 Jan 1970 00:00:00 UTC
- // Cache-Control: no-cache, private, max-age=0
- // X-Accel-Expires: 0
- // Pragma: no-cache (for HTTP/1.0 proxies/clients)
- func noCache(h http.Handler) http.Handler {
- fn := func(w http.ResponseWriter, r *http.Request) {
- // Delete any ETag headers that may have been set
- for _, v := range etagHeaders {
- if r.Header.Get(v) != "" {
- r.Header.Del(v)
- }
- }
- // Set our noCache headers
- for k, v := range noCacheHeaders {
- w.Header().Set(k, v)
- }
- h.ServeHTTP(w, r)
- }
- return http.HandlerFunc(fn)
- }
- func (m *Master) dashboard(response http.ResponseWriter, request *http.Request) {
- _, err := staticFileSystem.Open(request.RequestURI)
- if request.RequestURI == "/" || os.IsNotExist(err) {
- if !m.checkLogin(request) {
- http.Redirect(response, request, "/login", http.StatusFound)
- log.Logger().Info(fmt.Sprintf("%s %s", request.Method, request.URL), zap.Int("status_code", http.StatusFound))
- return
- }
- noCache(staticFileServer).ServeHTTP(response, request)
- return
- }
- staticFileServer.ServeHTTP(response, request)
- }
- func (m *Master) checkToken(token string) (bool, error) {
- resp, err := http.Get(fmt.Sprintf("%s/auth/dashboard/%s", m.Config.Master.DashboardAuthServer, token))
- if err != nil {
- return false, errors.Trace(err)
- }
- if resp.StatusCode == http.StatusOK {
- return true, nil
- } else if resp.StatusCode == http.StatusUnauthorized {
- return false, nil
- } else {
- if message, err := io.ReadAll(resp.Body); err != nil {
- return false, errors.Trace(err)
- } else {
- return false, errors.New(string(message))
- }
- }
- }
- func (m *Master) login(response http.ResponseWriter, request *http.Request) {
- switch request.Method {
- case http.MethodGet:
- log.Logger().Info("GET /login", zap.Int("status_code", http.StatusOK))
- staticFileServer.ServeHTTP(response, request)
- case http.MethodPost:
- token := request.FormValue("token")
- name := request.FormValue("user_name")
- pass := request.FormValue("password")
- if m.Config.Master.DashboardAuthServer != "" {
- // check access token
- if isValid, err := m.checkToken(token); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- } else if !isValid {
- http.Redirect(response, request, "login?msg=incorrect", http.StatusFound)
- log.Logger().Info("POST /login", zap.Int("status_code", http.StatusUnauthorized))
- return
- }
- // save token to cache
- if encoded, err := cookieHandler.Encode("token", token); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- } else {
- cookie := &http.Cookie{
- Name: "token",
- Value: encoded,
- Path: "/",
- }
- http.SetCookie(response, cookie)
- http.Redirect(response, request, "/", http.StatusFound)
- log.Logger().Info("POST /login", zap.Int("status_code", http.StatusUnauthorized))
- return
- }
- } else if m.Config.Master.DashboardUserName != "" || m.Config.Master.DashboardPassword != "" {
- if name != m.Config.Master.DashboardUserName || pass != m.Config.Master.DashboardPassword {
- http.Redirect(response, request, "login?msg=incorrect", http.StatusFound)
- log.Logger().Info("POST /login", zap.Int("status_code", http.StatusUnauthorized))
- return
- }
- value := map[string]string{
- "user_name": name,
- "password": pass,
- }
- if encoded, err := cookieHandler.Encode("session", value); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- } else {
- cookie := &http.Cookie{
- Name: "session",
- Value: encoded,
- Path: "/",
- }
- http.SetCookie(response, cookie)
- http.Redirect(response, request, "/", http.StatusFound)
- log.Logger().Info("POST /login", zap.Int("status_code", http.StatusFound))
- return
- }
- } else {
- http.Redirect(response, request, "/", http.StatusFound)
- log.Logger().Info("POST /login", zap.Int("status_code", http.StatusFound))
- }
- default:
- server.BadRequest(restful.NewResponse(response), errors.New("unsupported method"))
- }
- }
- func (m *Master) logout(response http.ResponseWriter, request *http.Request) {
- cookie := &http.Cookie{
- Name: "session",
- Value: "",
- Path: "/",
- MaxAge: -1,
- }
- http.SetCookie(response, cookie)
- http.Redirect(response, request, "/login", http.StatusFound)
- log.Logger().Info(fmt.Sprintf("%s %s", request.Method, request.RequestURI), zap.Int("status_code", http.StatusFound))
- }
- func (m *Master) LoginFilter(req *restful.Request, resp *restful.Response, chain *restful.FilterChain) {
- if m.checkLogin(req.Request) {
- req.Request.Header.Set("X-API-Key", m.Config.Server.APIKey)
- chain.ProcessFilter(req, resp)
- } else if !strings.HasPrefix(req.SelectedRoutePath(), "/api/dashboard") {
- chain.ProcessFilter(req, resp)
- } else {
- if err := resp.WriteError(http.StatusUnauthorized, fmt.Errorf("unauthorized")); err != nil {
- log.ResponseLogger(resp).Error("failed to write error", zap.Error(err))
- }
- }
- }
- func (m *Master) checkLogin(request *http.Request) bool {
- if m.Config.Master.AdminAPIKey != "" && m.Config.Master.AdminAPIKey == request.Header.Get("X-Api-Key") {
- return true
- }
- if m.Config.Master.DashboardAuthServer != "" {
- if tokenCookie, err := request.Cookie("token"); err == nil {
- var token string
- if err = cookieHandler.Decode("token", tokenCookie.Value, &token); err == nil {
- if isValid, err := m.checkToken(token); err != nil {
- log.Logger().Error("failed to check access token", zap.Error(err))
- } else if isValid {
- return true
- }
- }
- }
- return false
- } else if m.Config.Master.DashboardUserName != "" || m.Config.Master.DashboardPassword != "" {
- if sessionCookie, err := request.Cookie("session"); err == nil {
- cookieValue := make(map[string]string)
- if err = cookieHandler.Decode("session", sessionCookie.Value, &cookieValue); err == nil {
- userName := cookieValue["user_name"]
- password := cookieValue["password"]
- if userName == m.Config.Master.DashboardUserName && password == m.Config.Master.DashboardPassword {
- return true
- }
- }
- }
- return false
- }
- return true
- }
- func (m *Master) getCategories(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- categories, err := m.CacheClient.GetSet(ctx, cache.ItemCategories)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- server.Ok(response, categories)
- }
- func (m *Master) getCluster(_ *restful.Request, response *restful.Response) {
- // collect nodes
- workers := make([]*Node, 0)
- servers := make([]*Node, 0)
- m.nodesInfoMutex.RLock()
- for _, info := range m.nodesInfo {
- switch info.Type {
- case WorkerNode:
- workers = append(workers, info)
- case ServerNode:
- servers = append(servers, info)
- }
- }
- m.nodesInfoMutex.RUnlock()
- // return nodes
- nodes := make([]*Node, 0)
- nodes = append(nodes, workers...)
- nodes = append(nodes, servers...)
- server.Ok(response, nodes)
- }
- func formatConfig(configMap map[string]interface{}) map[string]interface{} {
- return lo.MapValues(configMap, func(v interface{}, _ string) interface{} {
- switch value := v.(type) {
- case time.Duration:
- s := value.String()
- if strings.HasSuffix(s, "m0s") {
- s = s[:len(s)-2]
- }
- if strings.HasSuffix(s, "h0m") {
- s = s[:len(s)-2]
- }
- return s
- case map[string]interface{}:
- return formatConfig(value)
- default:
- return v
- }
- })
- }
- func (m *Master) getConfig(_ *restful.Request, response *restful.Response) {
- var configMap map[string]interface{}
- err := mapstructure.Decode(m.Config, &configMap)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- if m.Config.Master.DashboardRedacted {
- delete(configMap, "database")
- }
- server.Ok(response, formatConfig(configMap))
- }
- type Status struct {
- BinaryVersion string
- NumServers int
- NumWorkers int
- NumUsers int
- NumItems int
- NumUserLabels int
- NumItemLabels int
- NumTotalPosFeedback int
- NumValidPosFeedback int
- NumValidNegFeedback int
- PopularItemsUpdateTime time.Time
- LatestItemsUpdateTime time.Time
- MatchingModelFitTime time.Time
- MatchingModelScore ranking.Score
- RankingModelFitTime time.Time
- RankingModelScore click.Score
- UserNeighborIndexRecall float32
- ItemNeighborIndexRecall float32
- MatchingIndexRecall float32
- }
- func (m *Master) getStats(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- status := Status{BinaryVersion: version.Version}
- var err error
- // read number of users
- if status.NumUsers, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumUsers)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of users", zap.Error(err))
- }
- // read number of items
- if status.NumItems, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumItems)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of items", zap.Error(err))
- }
- // read number of user labels
- if status.NumUserLabels, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumUserLabels)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of user labels", zap.Error(err))
- }
- // read number of item labels
- if status.NumItemLabels, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumItemLabels)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of item labels", zap.Error(err))
- }
- // read number of total positive feedback
- if status.NumTotalPosFeedback, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumTotalPosFeedbacks)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of total positive feedbacks", zap.Error(err))
- }
- // read number of valid positive feedback
- if status.NumValidPosFeedback, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumValidPosFeedbacks)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of valid positive feedbacks", zap.Error(err))
- }
- // read number of valid negative feedback
- if status.NumValidNegFeedback, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.NumValidNegFeedbacks)).Integer(); err != nil {
- log.ResponseLogger(response).Warn("failed to get number of valid negative feedbacks", zap.Error(err))
- }
- // count the number of workers and servers
- m.nodesInfoMutex.Lock()
- for _, node := range m.nodesInfo {
- switch node.Type {
- case ServerNode:
- status.NumServers++
- case WorkerNode:
- status.NumWorkers++
- }
- }
- m.nodesInfoMutex.Unlock()
- // read popular items update time
- if status.PopularItemsUpdateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.LastUpdatePopularItemsTime)).Time(); err != nil {
- log.ResponseLogger(response).Warn("failed to get popular items update time", zap.Error(err))
- }
- // read the latest items update time
- if status.LatestItemsUpdateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.LastUpdateLatestItemsTime)).Time(); err != nil {
- log.ResponseLogger(response).Warn("failed to get latest items update time", zap.Error(err))
- }
- status.MatchingModelScore = m.rankingScore
- status.RankingModelScore = m.clickScore
- // read last fit matching model time
- if status.MatchingModelFitTime, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.LastFitMatchingModelTime)).Time(); err != nil {
- log.ResponseLogger(response).Warn("failed to get last fit matching model time", zap.Error(err))
- }
- // read last fit ranking model time
- if status.RankingModelFitTime, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.LastFitRankingModelTime)).Time(); err != nil {
- log.ResponseLogger(response).Warn("failed to get last fit ranking model time", zap.Error(err))
- }
- // read user neighbor index recall
- var temp string
- if m.Config.Recommend.UserNeighbors.EnableIndex {
- if temp, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.UserNeighborIndexRecall)).String(); err != nil {
- log.ResponseLogger(response).Warn("failed to get user neighbor index recall", zap.Error(err))
- } else {
- status.UserNeighborIndexRecall = encoding.ParseFloat32(temp)
- }
- }
- // read item neighbor index recall
- if m.Config.Recommend.ItemNeighbors.EnableIndex {
- if temp, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.ItemNeighborIndexRecall)).String(); err != nil {
- log.ResponseLogger(response).Warn("failed to get item neighbor index recall", zap.Error(err))
- } else {
- status.ItemNeighborIndexRecall = encoding.ParseFloat32(temp)
- }
- }
- // read matching index recall
- if m.Config.Recommend.Collaborative.EnableIndex {
- if temp, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.MatchingIndexRecall)).String(); err != nil {
- log.ResponseLogger(response).Warn("failed to get matching index recall", zap.Error(err))
- } else {
- status.MatchingIndexRecall = encoding.ParseFloat32(temp)
- }
- }
- server.Ok(response, status)
- }
- func (m *Master) getTasks(_ *restful.Request, response *restful.Response) {
- // List workers
- workers := mapset.NewSet[string]()
- m.nodesInfoMutex.RLock()
- for _, info := range m.nodesInfo {
- if info.Type == WorkerNode {
- workers.Add(info.Name)
- }
- }
- m.nodesInfoMutex.RUnlock()
- // List local progress
- progressList := m.tracer.List()
- // list remote progress
- m.remoteProgress.Range(func(key, value interface{}) bool {
- if workers.Contains(key.(string)) {
- progressList = append(progressList, value.([]progress.Progress)...)
- }
- return true
- })
- server.Ok(response, progressList)
- }
- func (m *Master) getRates(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- // Parse parameters
- n, err := server.ParseInt(request, "n", 100)
- if err != nil {
- server.BadRequest(response, err)
- return
- }
- measurements := make(map[string][]cache.TimeSeriesPoint, len(m.Config.Recommend.DataSource.PositiveFeedbackTypes))
- for _, feedbackType := range m.Config.Recommend.DataSource.PositiveFeedbackTypes {
- measurements[feedbackType], err = m.CacheClient.GetTimeSeriesPoints(ctx, cache.Key(PositiveFeedbackRate, feedbackType),
- time.Now().Add(-24*time.Hour*time.Duration(n)), time.Now())
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- }
- server.Ok(response, measurements)
- }
- type UserIterator struct {
- Cursor string
- Users []User
- }
- type User struct {
- data.User
- LastActiveTime time.Time
- LastUpdateTime time.Time
- }
- func (m *Master) getUser(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- // get user id
- userId := request.PathParameter("user-id")
- // get user
- user, err := m.DataClient.GetUser(ctx, userId)
- if err != nil {
- if errors.Is(err, errors.NotFound) {
- server.PageNotFound(response, err)
- } else {
- server.InternalServerError(response, err)
- }
- return
- }
- detail := User{User: user}
- if detail.LastActiveTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastModifyUserTime, user.UserId)).Time(); err != nil && !errors.Is(err, errors.NotFound) {
- server.InternalServerError(response, err)
- return
- }
- if detail.LastUpdateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, user.UserId)).Time(); err != nil && !errors.Is(err, errors.NotFound) {
- server.InternalServerError(response, err)
- return
- }
- server.Ok(response, detail)
- }
- func (m *Master) getUsers(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- // Authorize
- cursor := request.QueryParameter("cursor")
- n, err := server.ParseInt(request, "n", m.Config.Server.DefaultN)
- if err != nil {
- server.BadRequest(response, err)
- return
- }
- // get all users
- cursor, users, err := m.DataClient.GetUsers(ctx, cursor, n)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- details := make([]User, len(users))
- for i, user := range users {
- details[i].User = user
- if details[i].LastActiveTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastModifyUserTime, user.UserId)).Time(); err != nil && !errors.Is(err, errors.NotFound) {
- server.InternalServerError(response, err)
- return
- }
- if details[i].LastUpdateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.LastUpdateUserRecommendTime, user.UserId)).Time(); err != nil && !errors.Is(err, errors.NotFound) {
- server.InternalServerError(response, err)
- return
- }
- }
- server.Ok(response, UserIterator{Cursor: cursor, Users: details})
- }
- func (m *Master) getRecommend(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- // parse arguments
- recommender := request.PathParameter("recommender")
- userId := request.PathParameter("user-id")
- categories := []string{request.PathParameter("category")}
- n, err := server.ParseInt(request, "n", m.Config.Server.DefaultN)
- if err != nil {
- server.BadRequest(response, err)
- return
- }
- var results []string
- switch recommender {
- case "offline":
- results, err = m.Recommend(ctx, response, userId, categories, n, m.RecommendOffline)
- case "collaborative":
- results, err = m.Recommend(ctx, response, userId, categories, n, m.RecommendCollaborative)
- case "user_based":
- results, err = m.Recommend(ctx, response, userId, categories, n, m.RecommendUserBased)
- case "item_based":
- results, err = m.Recommend(ctx, response, userId, categories, n, m.RecommendItemBased)
- case "_":
- recommenders := []server.Recommender{m.RecommendOffline}
- for _, recommender := range m.Config.Recommend.Online.FallbackRecommend {
- switch recommender {
- case "collaborative":
- recommenders = append(recommenders, m.RecommendCollaborative)
- case "item_based":
- recommenders = append(recommenders, m.RecommendItemBased)
- case "user_based":
- recommenders = append(recommenders, m.RecommendUserBased)
- case "latest":
- recommenders = append(recommenders, m.RecommendLatest)
- case "popular":
- recommenders = append(recommenders, m.RecommendPopular)
- default:
- server.InternalServerError(response, fmt.Errorf("unknown fallback recommendation method `%s`", recommender))
- return
- }
- }
- results, err = m.Recommend(ctx, response, userId, categories, n, recommenders...)
- }
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- // Send result
- details := make([]data.Item, len(results))
- for i := range results {
- details[i], err = m.DataClient.GetItem(ctx, results[i])
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- }
- server.Ok(response, details)
- }
- type Feedback struct {
- FeedbackType string
- UserId string
- Item data.Item
- Timestamp time.Time
- Comment string
- }
- // get feedback by user-id with feedback type
- func (m *Master) getTypedFeedbackByUser(request *restful.Request, response *restful.Response) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- feedbackType := request.PathParameter("feedback-type")
- userId := request.PathParameter("user-id")
- feedback, err := m.DataClient.GetUserFeedback(ctx, userId, m.Config.Now(), feedbackType)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- details := make([]Feedback, len(feedback))
- for i := range feedback {
- details[i].FeedbackType = feedback[i].FeedbackType
- details[i].UserId = feedback[i].UserId
- details[i].Timestamp = feedback[i].Timestamp
- details[i].Comment = feedback[i].Comment
- details[i].Item, err = m.DataClient.GetItem(ctx, feedback[i].ItemId)
- if errors.Is(err, errors.NotFound) {
- details[i].Item = data.Item{ItemId: feedback[i].ItemId, Comment: "** This item doesn't exist in Gorse **"}
- } else if err != nil {
- server.InternalServerError(response, err)
- return
- }
- }
- server.Ok(response, details)
- }
- type ScoredItem struct {
- data.Item
- Score float64
- }
- type ScoreUser struct {
- data.User
- Score float64
- }
- func (m *Master) searchDocuments(collection, subset, category string, request *restful.Request, response *restful.Response, retType interface{}) {
- ctx := context.Background()
- if request != nil && request.Request != nil {
- ctx = request.Request.Context()
- }
- var n, offset int
- var err error
- // read arguments
- if offset, err = server.ParseInt(request, "offset", 0); err != nil {
- server.BadRequest(response, err)
- return
- }
- if n, err = server.ParseInt(request, "n", m.Config.Server.DefaultN); err != nil {
- server.BadRequest(response, err)
- return
- }
- // Get the popular list
- scores, err := m.CacheClient.SearchDocuments(ctx, collection, subset, []string{category}, offset, m.Config.Recommend.CacheSize)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- if n > 0 && len(scores) > n {
- scores = scores[:n]
- }
- // Send result
- switch retType.(type) {
- case data.Item:
- details := make([]ScoredItem, len(scores))
- for i := range scores {
- details[i].Score = scores[i].Score
- details[i].Item, err = m.DataClient.GetItem(ctx, scores[i].Id)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- }
- server.Ok(response, details)
- case data.User:
- details := make([]ScoreUser, len(scores))
- for i := range scores {
- details[i].Score = scores[i].Score
- details[i].User, err = m.DataClient.GetUser(ctx, scores[i].Id)
- if err != nil {
- server.InternalServerError(response, err)
- return
- }
- }
- server.Ok(response, details)
- default:
- log.ResponseLogger(response).Fatal("unknown return type", zap.Any("ret_type", reflect.TypeOf(retType)))
- }
- }
- // getPopular gets popular items from database.
- func (m *Master) getPopular(request *restful.Request, response *restful.Response) {
- category := request.PathParameter("category")
- m.searchDocuments(cache.PopularItems, "", category, request, response, data.Item{})
- }
- func (m *Master) getLatest(request *restful.Request, response *restful.Response) {
- category := request.PathParameter("category")
- m.searchDocuments(cache.LatestItems, "", category, request, response, data.Item{})
- }
- func (m *Master) getItemNeighbors(request *restful.Request, response *restful.Response) {
- itemId := request.PathParameter("item-id")
- m.searchDocuments(cache.ItemNeighbors, itemId, "", request, response, data.Item{})
- }
- func (m *Master) getItemCategorizedNeighbors(request *restful.Request, response *restful.Response) {
- itemId := request.PathParameter("item-id")
- category := request.PathParameter("category")
- m.searchDocuments(cache.ItemNeighbors, itemId, category, request, response, data.Item{})
- }
- func (m *Master) getUserNeighbors(request *restful.Request, response *restful.Response) {
- userId := request.PathParameter("user-id")
- m.searchDocuments(cache.UserNeighbors, userId, "", request, response, data.User{})
- }
- func (m *Master) importExportUsers(response http.ResponseWriter, request *http.Request) {
- ctx := context.Background()
- if request != nil {
- ctx = request.Context()
- }
- if !m.checkLogin(request) {
- resp := restful.NewResponse(response)
- err := resp.WriteErrorString(http.StatusUnauthorized, "unauthorized")
- if err != nil {
- server.InternalServerError(resp, err)
- return
- }
- return
- }
- switch request.Method {
- case http.MethodGet:
- var err error
- response.Header().Set("Content-Type", "text/csv")
- response.Header().Set("Content-Disposition", "attachment;filename=users.csv")
- // write header
- if _, err = response.Write([]byte("user_id,labels\r\n")); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- // write rows
- userChan, errChan := m.DataClient.GetUserStream(ctx, batchSize)
- for users := range userChan {
- for _, user := range users {
- labels, err := json.Marshal(user.Labels)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- if _, err = response.Write([]byte(fmt.Sprintf("%s,%s\r\n",
- base.Escape(user.UserId), base.Escape(string(labels))))); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- }
- }
- if err = <-errChan; err != nil {
- server.InternalServerError(restful.NewResponse(response), errors.Trace(err))
- return
- }
- case http.MethodPost:
- hasHeader := formValue(request, "has-header", "true") == "true"
- sep := formValue(request, "sep", ",")
- // field separator must be a single character
- if len(sep) != 1 {
- server.BadRequest(restful.NewResponse(response), fmt.Errorf("field separator must be a single character"))
- return
- }
- labelSep := formValue(request, "label-sep", "|")
- fmtString := formValue(request, "format", "ul")
- file, _, err := request.FormFile("file")
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- defer file.Close()
- m.importUsers(ctx, response, file, hasHeader, sep, labelSep, fmtString)
- }
- }
- func (m *Master) importUsers(ctx context.Context, response http.ResponseWriter, file io.Reader, hasHeader bool, sep, labelSep, fmtString string) {
- lineCount := 0
- timeStart := time.Now()
- users := make([]data.User, 0)
- err := base.ReadLines(bufio.NewScanner(file), sep, func(lineNumber int, splits []string) bool {
- var err error
- // skip header
- if hasHeader {
- hasHeader = false
- return true
- }
- splits, err = format(fmtString, "ul", splits, lineNumber)
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return false
- }
- // 1. user id
- if err = base.ValidateId(splits[0]); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid user id `%v` at line %d (%s)", splits[0], lineNumber, err.Error()))
- return false
- }
- user := data.User{UserId: splits[0]}
- // 2. labels
- if splits[1] != "" {
- var labels any
- if err = json.Unmarshal([]byte(splits[1]), &labels); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid labels `%v` at line %d (%s)", splits[1], lineNumber, err.Error()))
- return false
- }
- user.Labels = labels
- }
- users = append(users, user)
- // batch insert
- if len(users) == batchSize {
- err = m.DataClient.BatchInsertUsers(ctx, users)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return false
- }
- users = nil
- }
- lineCount++
- return true
- })
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- if len(users) > 0 {
- err = m.DataClient.BatchInsertUsers(ctx, users)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- }
- m.notifyDataImported()
- timeUsed := time.Since(timeStart)
- log.Logger().Info("complete import users",
- zap.Duration("time_used", timeUsed),
- zap.Int("num_users", lineCount))
- server.Ok(restful.NewResponse(response), server.Success{RowAffected: lineCount})
- }
- func (m *Master) importExportItems(response http.ResponseWriter, request *http.Request) {
- ctx := context.Background()
- if request != nil {
- ctx = request.Context()
- }
- if !m.checkLogin(request) {
- resp := restful.NewResponse(response)
- err := resp.WriteErrorString(http.StatusUnauthorized, "unauthorized")
- if err != nil {
- server.InternalServerError(resp, err)
- return
- }
- return
- }
- switch request.Method {
- case http.MethodGet:
- var err error
- response.Header().Set("Content-Type", "text/csv")
- response.Header().Set("Content-Disposition", "attachment;filename=items.csv")
- // write header
- if _, err = response.Write([]byte("item_id,is_hidden,categories,time_stamp,labels,description\r\n")); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- // write rows
- itemChan, errChan := m.DataClient.GetItemStream(ctx, batchSize, nil)
- for items := range itemChan {
- for _, item := range items {
- labels, err := json.Marshal(item.Labels)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- if _, err = response.Write([]byte(fmt.Sprintf("%s,%t,%s,%v,%s,%s\r\n",
- base.Escape(item.ItemId), item.IsHidden, base.Escape(strings.Join(item.Categories, "|")),
- item.Timestamp, base.Escape(string(labels)), base.Escape(item.Comment)))); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- }
- }
- if err = <-errChan; err != nil {
- server.InternalServerError(restful.NewResponse(response), errors.Trace(err))
- return
- }
- case http.MethodPost:
- hasHeader := formValue(request, "has-header", "true") == "true"
- sep := formValue(request, "sep", ",")
- // field separator must be a single character
- if len(sep) != 1 {
- server.BadRequest(restful.NewResponse(response), fmt.Errorf("field separator must be a single character"))
- return
- }
- labelSep := formValue(request, "label-sep", "|")
- fmtString := formValue(request, "format", "ihctld")
- file, _, err := request.FormFile("file")
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- defer file.Close()
- m.importItems(ctx, response, file, hasHeader, sep, labelSep, fmtString)
- default:
- writeError(response, http.StatusMethodNotAllowed, "method not allowed")
- }
- }
- func (m *Master) importItems(ctx context.Context, response http.ResponseWriter, file io.Reader, hasHeader bool, sep, labelSep, fmtString string) {
- lineCount := 0
- timeStart := time.Now()
- items := make([]data.Item, 0)
- err := base.ReadLines(bufio.NewScanner(file), sep, func(lineNumber int, splits []string) bool {
- var err error
- // skip header
- if hasHeader {
- hasHeader = false
- return true
- }
- splits, err = format(fmtString, "ihctld", splits, lineNumber)
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return false
- }
- // 1. item id
- if err = base.ValidateId(splits[0]); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid item id `%v` at line %d (%s)", splits[0], lineNumber, err.Error()))
- return false
- }
- item := data.Item{ItemId: splits[0]}
- // 2. hidden
- if splits[1] != "" {
- item.IsHidden, err = strconv.ParseBool(splits[1])
- if err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid hidden value `%v` at line %d (%s)", splits[1], lineNumber, err.Error()))
- return false
- }
- }
- // 3. categories
- if splits[2] != "" {
- item.Categories = strings.Split(splits[2], labelSep)
- for _, category := range item.Categories {
- if err = base.ValidateId(category); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid category `%v` at line %d (%s)", category, lineNumber, err.Error()))
- return false
- }
- }
- }
- // 4. timestamp
- if splits[3] != "" {
- item.Timestamp, err = dateparse.ParseAny(splits[3])
- if err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("failed to parse datetime `%v` at line %v", splits[1], lineNumber))
- return false
- }
- }
- // 5. labels
- if splits[4] != "" {
- var labels any
- if err = json.Unmarshal([]byte(splits[4]), &labels); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("failed to parse labels `%v` at line %v", splits[4], lineNumber))
- return false
- }
- item.Labels = labels
- }
- // 6. comment
- item.Comment = splits[5]
- items = append(items, item)
- // batch insert
- if len(items) == batchSize {
- err = m.DataClient.BatchInsertItems(ctx, items)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return false
- }
- items = nil
- }
- lineCount++
- return true
- })
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- if len(items) > 0 {
- err = m.DataClient.BatchInsertItems(ctx, items)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- }
- m.notifyDataImported()
- timeUsed := time.Since(timeStart)
- log.Logger().Info("complete import items",
- zap.Duration("time_used", timeUsed),
- zap.Int("num_items", lineCount))
- server.Ok(restful.NewResponse(response), server.Success{RowAffected: lineCount})
- }
- func format(inFmt, outFmt string, s []string, lineCount int) ([]string, error) {
- if len(s) < len(inFmt) {
- log.Logger().Error("number of fields mismatch",
- zap.Int("expect", len(inFmt)),
- zap.Int("actual", len(s)))
- return nil, fmt.Errorf("number of fields mismatch at line %v", lineCount)
- }
- if inFmt == outFmt {
- return s, nil
- }
- pool := make(map[uint8]string)
- for i := range inFmt {
- pool[inFmt[i]] = s[i]
- }
- out := make([]string, len(outFmt))
- for i, c := range outFmt {
- out[i] = pool[uint8(c)]
- }
- return out, nil
- }
- func formValue(request *http.Request, fieldName, defaultValue string) string {
- value := request.FormValue(fieldName)
- if value == "" {
- return defaultValue
- }
- return value
- }
- func (m *Master) importExportFeedback(response http.ResponseWriter, request *http.Request) {
- ctx := context.Background()
- if request != nil {
- ctx = request.Context()
- }
- if !m.checkLogin(request) {
- writeError(response, http.StatusUnauthorized, "unauthorized")
- return
- }
- switch request.Method {
- case http.MethodGet:
- var err error
- response.Header().Set("Content-Type", "text/csv")
- response.Header().Set("Content-Disposition", "attachment;filename=feedback.csv")
- // write header
- if _, err = response.Write([]byte("feedback_type,user_id,item_id,time_stamp\r\n")); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- // write rows
- feedbackChan, errChan := m.DataClient.GetFeedbackStream(ctx, batchSize, data.WithEndTime(*m.Config.Now()))
- for feedback := range feedbackChan {
- for _, v := range feedback {
- if _, err = response.Write([]byte(fmt.Sprintf("%s,%s,%s,%v\r\n",
- base.Escape(v.FeedbackType), base.Escape(v.UserId), base.Escape(v.ItemId), v.Timestamp))); err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- }
- }
- if err = <-errChan; err != nil {
- server.InternalServerError(restful.NewResponse(response), errors.Trace(err))
- return
- }
- case http.MethodPost:
- hasHeader := formValue(request, "has-header", "true") == "true"
- sep := formValue(request, "sep", ",")
- // field separator must be a single character
- if len(sep) != 1 {
- server.BadRequest(restful.NewResponse(response), fmt.Errorf("field separator must be a single character"))
- return
- }
- fmtString := formValue(request, "format", "fuit")
- // import items
- file, _, err := request.FormFile("file")
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- defer file.Close()
- m.importFeedback(ctx, response, file, hasHeader, sep, fmtString)
- default:
- writeError(response, http.StatusMethodNotAllowed, "method not allowed")
- }
- }
- func (m *Master) importFeedback(ctx context.Context, response http.ResponseWriter, file io.Reader, hasHeader bool, sep, fmtString string) {
- var err error
- scanner := bufio.NewScanner(file)
- lineCount := 0
- timeStart := time.Now()
- feedbacks := make([]data.Feedback, 0)
- err = base.ReadLines(scanner, sep, func(lineNumber int, splits []string) bool {
- if hasHeader {
- hasHeader = false
- return true
- }
- // reorder fields
- splits, err = format(fmtString, "fuit", splits, lineNumber)
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return false
- }
- feedback := data.Feedback{}
- // 1. feedback type
- feedback.FeedbackType = splits[0]
- if err = base.ValidateId(splits[0]); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid feedback type `%v` at line %d (%s)", splits[0], lineNumber, err.Error()))
- return false
- }
- // 2. user id
- if err = base.ValidateId(splits[1]); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid user id `%v` at line %d (%s)", splits[1], lineNumber, err.Error()))
- return false
- }
- feedback.UserId = splits[1]
- // 3. item id
- if err = base.ValidateId(splits[2]); err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("invalid item id `%v` at line %d (%s)", splits[2], lineNumber, err.Error()))
- return false
- }
- feedback.ItemId = splits[2]
- feedback.Timestamp, err = dateparse.ParseAny(splits[3])
- if err != nil {
- server.BadRequest(restful.NewResponse(response),
- fmt.Errorf("failed to parse datetime `%v` at line %d", splits[3], lineNumber))
- return false
- }
- feedbacks = append(feedbacks, feedback)
- // batch insert
- if len(feedbacks) == batchSize {
- // batch insert to data store
- err = m.DataClient.BatchInsertFeedback(ctx, feedbacks,
- m.Config.Server.AutoInsertUser,
- m.Config.Server.AutoInsertItem, true)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return false
- }
- feedbacks = nil
- }
- lineCount++
- return true
- })
- if err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- // insert to cache store
- if len(feedbacks) > 0 {
- // insert to data store
- err = m.DataClient.BatchInsertFeedback(ctx, feedbacks,
- m.Config.Server.AutoInsertUser,
- m.Config.Server.AutoInsertItem, true)
- if err != nil {
- server.InternalServerError(restful.NewResponse(response), err)
- return
- }
- }
- m.notifyDataImported()
- timeUsed := time.Since(timeStart)
- log.Logger().Info("complete import feedback",
- zap.Duration("time_used", timeUsed),
- zap.Int("num_items", lineCount))
- server.Ok(restful.NewResponse(response), server.Success{RowAffected: lineCount})
- }
- var checkList = mapset.NewSet("delete_users", "delete_items", "delete_feedback", "delete_cache")
- func (m *Master) purge(response http.ResponseWriter, request *http.Request) {
- // check method
- if request.Method != http.MethodPost {
- writeError(response, http.StatusMethodNotAllowed, "method not allowed")
- return
- }
- // check login
- if !m.checkLogin(request) {
- resp := restful.NewResponse(response)
- err := resp.WriteErrorString(http.StatusUnauthorized, "unauthorized")
- if err != nil {
- server.InternalServerError(resp, err)
- return
- }
- return
- }
- // check password
- if m.Config.Master.DashboardPassword == "" {
- writeError(response, http.StatusUnauthorized, "purge is not allowed without dashboard password")
- return
- }
- // check list
- if err := request.ParseForm(); err != nil {
- server.BadRequest(restful.NewResponse(response), err)
- return
- }
- checkedList := strings.Split(request.Form.Get("check_list"), ",")
- if !checkList.Equal(mapset.NewSet(checkedList...)) {
- writeError(response, http.StatusUnauthorized, "please confirm by checking all")
- return
- }
- // purge data
- if err := m.DataClient.Purge(); err != nil {
- writeError(response, http.StatusInternalServerError, err.Error())
- return
- }
- if err := m.CacheClient.Purge(); err != nil {
- writeError(response, http.StatusInternalServerError, err.Error())
- return
- }
- }
- func (m *Master) scheduleAPIHandler(writer http.ResponseWriter, request *http.Request) {
- if !m.checkAdmin(request) {
- writeError(writer, http.StatusUnauthorized, "unauthorized")
- return
- }
- switch request.Method {
- case http.MethodGet:
- writer.WriteHeader(http.StatusOK)
- bytes, err := json.Marshal(m.scheduleState)
- if err != nil {
- writeError(writer, http.StatusInternalServerError, err.Error())
- }
- if _, err = writer.Write(bytes); err != nil {
- writeError(writer, http.StatusInternalServerError, err.Error())
- }
- case http.MethodPost:
- s := request.FormValue("search_model")
- if s != "" {
- if searchModel, err := strconv.ParseBool(s); err != nil {
- writeError(writer, http.StatusBadRequest, err.Error())
- } else {
- m.scheduleState.SearchModel = searchModel
- }
- }
- m.triggerChan.Signal()
- default:
- writeError(writer, http.StatusMethodNotAllowed, "method not allowed")
- }
- }
- func writeError(response http.ResponseWriter, httpStatus int, message string) {
- log.Logger().Error(strings.ToLower(http.StatusText(httpStatus)), zap.String("error", message))
- response.Header().Set("Access-Control-Allow-Origin", "*")
- response.WriteHeader(httpStatus)
- if _, err := response.Write([]byte(message)); err != nil {
- log.Logger().Error("failed to write error", zap.Error(err))
- }
- }
- func (s *Master) checkAdmin(request *http.Request) bool {
- if s.Config.Master.AdminAPIKey == "" {
- return true
- }
- if request.FormValue("X-API-Key") == s.Config.Master.AdminAPIKey {
- return true
- }
- return false
- }
|