diff options
Diffstat (limited to 'scheduler')
-rw-r--r-- | scheduler/scheduler.go | 24 | ||||
-rw-r--r-- | scheduler/worker_pool.go | 8 |
2 files changed, 19 insertions, 13 deletions
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ab87e99..acfb401 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -5,20 +5,24 @@ package scheduler import ( - "github.com/miniflux/miniflux2/storage" "log" "time" + + "github.com/miniflux/miniflux2/storage" ) -// NewScheduler starts a new scheduler to push jobs to a pool of workers. +// NewScheduler starts a new scheduler that push jobs to a pool of workers. func NewScheduler(store *storage.Storage, workerPool *WorkerPool, frequency, batchSize int) { - c := time.Tick(time.Duration(frequency) * time.Minute) - for now := range c { - jobs := store.GetJobs(batchSize) - log.Printf("[Scheduler:%v] => Pushing %d jobs\n", now, len(jobs)) - - for _, job := range jobs { - workerPool.Push(job) + go func() { + c := time.Tick(time.Duration(frequency) * time.Minute) + for now := range c { + jobs, err := store.NewBatch(batchSize) + if err != nil { + log.Println("[Scheduler]", err) + } else { + log.Printf("[Scheduler:%v] => Pushing %d jobs\n", now, len(jobs)) + workerPool.Push(jobs) + } } - } + }() } diff --git a/scheduler/worker_pool.go b/scheduler/worker_pool.go index b753f89..c4a8372 100644 --- a/scheduler/worker_pool.go +++ b/scheduler/worker_pool.go @@ -14,9 +14,11 @@ type WorkerPool struct { queue chan model.Job } -// Push send a job on the queue. -func (w *WorkerPool) Push(job model.Job) { - w.queue <- job +// Push send a list of jobs to the queue. +func (w *WorkerPool) Push(jobs model.JobList) { + for _, job := range jobs { + w.queue <- job + } } // NewWorkerPool creates a pool of background workers. |