123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- // Copyright 2021 gorse Project Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package task
- import (
- "runtime"
- "sort"
- "sync"
- "github.com/samber/lo"
- "github.com/zhenghaoz/gorse/base/log"
- "go.uber.org/zap"
- "modernc.org/mathutil"
- )
- type JobsAllocator struct {
- numJobs int // the max number of jobs
- taskName string // its task name in scheduler
- scheduler *JobsScheduler
- }
- func NewConstantJobsAllocator(num int) *JobsAllocator {
- return &JobsAllocator{
- numJobs: num,
- }
- }
- func (allocator *JobsAllocator) MaxJobs() int {
- if allocator == nil || allocator.numJobs < 1 {
- // Return 1 for invalid allocator
- return 1
- }
- return allocator.numJobs
- }
- func (allocator *JobsAllocator) AvailableJobs() int {
- if allocator == nil || allocator.numJobs < 1 {
- // Return 1 for invalid allocator
- return 1
- } else if allocator.scheduler != nil {
- // Use jobs scheduler
- return allocator.scheduler.allocateJobsForTask(allocator.taskName, true)
- }
- return allocator.numJobs
- }
- // Init jobs allocation. This method is used to request allocation of jobs for the first time.
- func (allocator *JobsAllocator) Init() {
- if allocator.scheduler != nil {
- allocator.scheduler.allocateJobsForTask(allocator.taskName, true)
- }
- }
- // taskInfo represents a task in JobsScheduler.
- type taskInfo struct {
- name string // name of the task
- priority int // high priority tasks are allocated first
- privileged bool // privileged tasks are allocated first
- jobs int // number of jobs allocated to the task
- previous int // previous number of jobs allocated to the task
- }
- // JobsScheduler allocates jobs to multiple tasks.
- type JobsScheduler struct {
- *sync.Cond
- numJobs int // number of jobs
- freeJobs int // number of free jobs
- tasks map[string]*taskInfo
- }
- // NewJobsScheduler creates a JobsScheduler with num jobs.
- func NewJobsScheduler(num int) *JobsScheduler {
- if num <= 0 {
- // Use all cores if num is less than 1.
- num = runtime.NumCPU()
- }
- return &JobsScheduler{
- Cond: sync.NewCond(&sync.Mutex{}),
- numJobs: num,
- freeJobs: num,
- tasks: make(map[string]*taskInfo),
- }
- }
- // Register a task in the JobsScheduler. Registered tasks will be ignored and return false.
- func (s *JobsScheduler) Register(taskName string, priority int, privileged bool) bool {
- s.L.Lock()
- defer s.L.Unlock()
- if _, exits := s.tasks[taskName]; !exits {
- s.tasks[taskName] = &taskInfo{name: taskName, priority: priority, privileged: privileged}
- return true
- } else {
- return false
- }
- }
- // Unregister a task from the JobsScheduler.
- func (s *JobsScheduler) Unregister(taskName string) {
- s.L.Lock()
- defer s.L.Unlock()
- if task, exits := s.tasks[taskName]; exits {
- // Return allocated jobs.
- s.freeJobs += task.jobs
- delete(s.tasks, taskName)
- s.Broadcast()
- }
- }
- func (s *JobsScheduler) GetJobsAllocator(taskName string) *JobsAllocator {
- return &JobsAllocator{
- numJobs: s.numJobs,
- taskName: taskName,
- scheduler: s,
- }
- }
- func (s *JobsScheduler) allocateJobsForTask(taskName string, block bool) int {
- // Find current task and return the jobs temporarily.
- s.L.Lock()
- currentTask, exist := s.tasks[taskName]
- if !exist {
- panic("task not found")
- }
- s.freeJobs += currentTask.jobs
- currentTask.jobs = 0
- s.L.Unlock()
- s.L.Lock()
- defer s.L.Unlock()
- for {
- s.allocateJobsForAll()
- if currentTask.jobs == 0 && block {
- if currentTask.previous > 0 {
- log.Logger().Debug("suspend task", zap.String("task", currentTask.name))
- s.Broadcast()
- }
- s.Wait()
- } else {
- if currentTask.previous == 0 {
- log.Logger().Debug("resume task", zap.String("task", currentTask.name))
- }
- return currentTask.jobs
- }
- }
- }
- func (s *JobsScheduler) allocateJobsForAll() {
- // Separate privileged tasks and normal tasks
- privileged := make([]*taskInfo, 0)
- normal := make([]*taskInfo, 0)
- for _, task := range s.tasks {
- if task.privileged {
- privileged = append(privileged, task)
- } else {
- normal = append(normal, task)
- }
- }
- var tasks []*taskInfo
- if len(privileged) > 0 {
- tasks = privileged
- } else {
- tasks = normal
- }
- // allocate jobs
- sort.Slice(tasks, func(i, j int) bool {
- return tasks[i].priority > tasks[j].priority
- })
- for i, task := range tasks {
- if s.freeJobs == 0 {
- return
- }
- targetJobs := s.numJobs/len(tasks) + lo.If(i < s.numJobs%len(tasks), 1).Else(0)
- targetJobs = mathutil.Min(targetJobs, s.freeJobs)
- if task.jobs < targetJobs {
- if task.previous != targetJobs {
- log.Logger().Debug("reallocate jobs for task",
- zap.String("task", task.name),
- zap.Int("previous_jobs", task.previous),
- zap.Int("target_jobs", targetJobs))
- }
- s.freeJobs -= targetJobs - task.jobs
- task.jobs = targetJobs
- task.previous = task.jobs
- }
- }
- }
|