From 487852f07eb191ef56967b7b7d7f01537a55eabd Mon Sep 17 00:00:00 2001 From: Frédéric Guillot Date: Sun, 11 Nov 2018 15:32:48 -0800 Subject: Replace daemon and scheduler package with service package --- cli/cli.go | 33 +++++------ cli/daemon.go | 57 ++++++++++++++++++ daemon/daemon.go | 61 ------------------- daemon/doc.go | 10 ---- daemon/routes.go | 37 ------------ daemon/server.go | 94 ----------------------------- reader/browser/doc.go | 10 ++++ scheduler/doc.go | 10 ---- scheduler/scheduler.go | 44 -------------- scheduler/worker.go | 36 ------------ scheduler/worker_pool.go | 36 ------------ service/httpd/doc.go | 10 ++++ service/httpd/httpd.go | 130 +++++++++++++++++++++++++++++++++++++++++ service/scheduler/doc.go | 10 ++++ service/scheduler/scheduler.go | 46 +++++++++++++++ ui/handler.go | 4 +- ui/ui.go | 4 +- worker/doc.go | 10 ++++ worker/pool.go | 36 ++++++++++++ worker/worker.go | 32 ++++++++++ 20 files changed, 361 insertions(+), 349 deletions(-) create mode 100644 cli/daemon.go delete mode 100644 daemon/daemon.go delete mode 100644 daemon/doc.go delete mode 100644 daemon/routes.go delete mode 100644 daemon/server.go create mode 100644 reader/browser/doc.go delete mode 100644 scheduler/doc.go delete mode 100644 scheduler/scheduler.go delete mode 100644 scheduler/worker.go delete mode 100644 scheduler/worker_pool.go create mode 100644 service/httpd/doc.go create mode 100644 service/httpd/httpd.go create mode 100644 service/scheduler/doc.go create mode 100644 service/scheduler/scheduler.go create mode 100644 worker/doc.go create mode 100644 worker/pool.go create mode 100644 worker/worker.go diff --git a/cli/cli.go b/cli/cli.go index aa4d4b3..629367a 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -9,7 +9,6 @@ import ( "fmt" "miniflux.app/config" - "miniflux.app/daemon" "miniflux.app/database" "miniflux.app/logger" "miniflux.app/storage" @@ -17,27 +16,27 @@ import ( ) const ( - flagInfoHelp = "Show application information" - flagVersionHelp = "Show application version" - flagMigrateHelp = "Run SQL migrations" - flagFlsuhSessionsHelp = "Flush all sessions (disconnect users)" - flagCreateAdminHelp = "Create admin user" - flagResetPasswordHelp = "Reset user password" + flagInfoHelp = "Show application information" + flagVersionHelp = "Show application version" + flagMigrateHelp = "Run SQL migrations" + flagFlsuhSessionsHelp = "Flush all sessions (disconnect users)" + flagCreateAdminHelp = "Create admin user" + flagResetPasswordHelp = "Reset user password" flagResetFeedErrorsHelp = "Clear all feed errors for all users" - flagDebugModeHelp = "Show debug logs" + flagDebugModeHelp = "Show debug logs" ) // Parse parses command line arguments. func Parse() { var ( - flagInfo bool - flagVersion bool - flagMigrate bool - flagFlushSessions bool - flagCreateAdmin bool - flagResetPassword bool + flagInfo bool + flagVersion bool + flagMigrate bool + flagFlushSessions bool + flagCreateAdmin bool + flagResetPassword bool flagResetFeedErrors bool - flagDebugMode bool + flagDebugMode bool ) flag.BoolVar(&flagInfo, "info", false, flagInfoHelp) @@ -49,7 +48,7 @@ func Parse() { flag.BoolVar(&flagCreateAdmin, "create-admin", false, flagCreateAdminHelp) flag.BoolVar(&flagResetPassword, "reset-password", false, flagResetPasswordHelp) flag.BoolVar(&flagResetFeedErrors, "reset-feed-errors", false, flagResetFeedErrorsHelp) - flag.BoolVar(&flagDebugMode,"debug", false, flagDebugModeHelp) + flag.BoolVar(&flagDebugMode, "debug", false, flagDebugModeHelp) flag.Parse() cfg := config.NewConfig() @@ -111,5 +110,5 @@ func Parse() { createAdmin(store) } - daemon.Run(cfg, store) + startDaemon(cfg, store) } diff --git a/cli/daemon.go b/cli/daemon.go new file mode 100644 index 0000000..436729c --- /dev/null +++ b/cli/daemon.go @@ -0,0 +1,57 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +package cli // import "miniflux.app/cli" + +import ( + "context" + "os" + "os/signal" + "runtime" + "syscall" + "time" + + "miniflux.app/config" + "miniflux.app/logger" + "miniflux.app/reader/feed" + "miniflux.app/service/scheduler" + "miniflux.app/service/httpd" + "miniflux.app/storage" + "miniflux.app/worker" +) + +func startDaemon(cfg *config.Config, store *storage.Storage) { + logger.Info("Starting Miniflux...") + + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt) + signal.Notify(stop, syscall.SIGTERM) + + feedHandler := feed.NewFeedHandler(store) + pool := worker.NewPool(feedHandler, cfg.WorkerPoolSize()) + + go scheduler.Serve(cfg, store, pool) + go showProcessStatistics() + + httpServer := httpd.Serve(cfg, store, pool, feedHandler) + + <-stop + logger.Info("Shutting down the process...") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + httpServer.Shutdown(ctx) + logger.Info("Process gracefully stopped") +} + +func showProcessStatistics() { + for { + var m runtime.MemStats + runtime.ReadMemStats(&m) + logger.Debug("Sys=%vK, InUse=%vK, HeapInUse=%vK, StackSys=%vK, StackInUse=%vK, GoRoutines=%d, NumCPU=%d", + m.Sys/1024, (m.Sys-m.HeapReleased)/1024, m.HeapInuse/1024, m.StackSys/1024, m.StackInuse/1024, + runtime.NumGoroutine(), runtime.NumCPU()) + time.Sleep(30 * time.Second) + } +} \ No newline at end of file diff --git a/daemon/daemon.go b/daemon/daemon.go deleted file mode 100644 index a1fbf8e..0000000 --- a/daemon/daemon.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2018 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -package daemon // import "miniflux.app/daemon" - -import ( - "context" - "os" - "os/signal" - "runtime" - "syscall" - "time" - - "miniflux.app/config" - "miniflux.app/logger" - "miniflux.app/reader/feed" - "miniflux.app/scheduler" - "miniflux.app/storage" -) - -// Run starts the daemon. -func Run(cfg *config.Config, store *storage.Storage) { - logger.Info("Starting Miniflux...") - - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt) - signal.Notify(stop, syscall.SIGTERM) - - go func() { - for { - var m runtime.MemStats - runtime.ReadMemStats(&m) - logger.Debug("Sys=%vK, InUse=%vK, HeapInUse=%vK, StackSys=%vK, StackInUse=%vK, GoRoutines=%d, NumCPU=%d", - m.Sys/1024, (m.Sys-m.HeapReleased)/1024, m.HeapInuse/1024, m.StackSys/1024, m.StackInuse/1024, - runtime.NumGoroutine(), runtime.NumCPU()) - time.Sleep(30 * time.Second) - } - }() - - feedHandler := feed.NewFeedHandler(store) - pool := scheduler.NewWorkerPool(feedHandler, cfg.WorkerPoolSize()) - server := newServer(cfg, store, pool, feedHandler) - - scheduler.NewFeedScheduler( - store, - pool, - cfg.PollingFrequency(), - cfg.BatchSize(), - ) - - scheduler.NewCleanupScheduler(store, cfg.CleanupFrequency()) - - <-stop - logger.Info("Shutting down the server...") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - server.Shutdown(ctx) - logger.Info("Server gracefully stopped") -} diff --git a/daemon/doc.go b/daemon/doc.go deleted file mode 100644 index d530b3e..0000000 --- a/daemon/doc.go +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright 2018 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -/* - -Package daemon handles the main application process. - -*/ -package daemon // import "miniflux.app/daemon" diff --git a/daemon/routes.go b/daemon/routes.go deleted file mode 100644 index b7cfedc..0000000 --- a/daemon/routes.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2018 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -package daemon // import "miniflux.app/daemon" - -import ( - "miniflux.app/api" - "miniflux.app/config" - "miniflux.app/fever" - "miniflux.app/middleware" - "miniflux.app/reader/feed" - "miniflux.app/scheduler" - "miniflux.app/storage" - "miniflux.app/ui" - - "github.com/gorilla/mux" -) - -func routes(cfg *config.Config, store *storage.Storage, feedHandler *feed.Handler, pool *scheduler.WorkerPool) *mux.Router { - router := mux.NewRouter() - middleware := middleware.New(cfg, store, router) - - if cfg.BasePath() != "" { - router = router.PathPrefix(cfg.BasePath()).Subrouter() - } - - router.Use(middleware.ClientIP) - router.Use(middleware.HeaderConfig) - router.Use(middleware.Logging) - - fever.Serve(router, cfg, store) - api.Serve(router, store, feedHandler) - ui.Serve(router, cfg, store, pool, feedHandler) - - return router -} diff --git a/daemon/server.go b/daemon/server.go deleted file mode 100644 index 38afd0a..0000000 --- a/daemon/server.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2018 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -package daemon // import "miniflux.app/daemon" - -import ( - "crypto/tls" - "net/http" - "time" - - "miniflux.app/config" - "miniflux.app/logger" - "miniflux.app/reader/feed" - "miniflux.app/scheduler" - "miniflux.app/storage" - - "golang.org/x/crypto/acme/autocert" -) - -func newServer(cfg *config.Config, store *storage.Storage, pool *scheduler.WorkerPool, feedHandler *feed.Handler) *http.Server { - certFile := cfg.CertFile() - keyFile := cfg.KeyFile() - certDomain := cfg.CertDomain() - certCache := cfg.CertCache() - server := &http.Server{ - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 60 * time.Second, - Addr: cfg.ListenAddr(), - Handler: routes(cfg, store, feedHandler, pool), - } - - if certDomain != "" && certCache != "" { - cfg.IsHTTPS = true - server.Addr = ":https" - certManager := autocert.Manager{ - Cache: autocert.DirCache(certCache), - Prompt: autocert.AcceptTOS, - HostPolicy: autocert.HostWhitelist(certDomain), - } - - // Handle http-01 challenge. - s := &http.Server{ - Handler: certManager.HTTPHandler(nil), - Addr: ":http", - } - go s.ListenAndServe() - - go func() { - logger.Info(`Listening on "%s" by using auto-configured certificate for "%s"`, server.Addr, certDomain) - if err := server.Serve(certManager.Listener()); err != http.ErrServerClosed { - logger.Fatal(`Server failed to start: %v`, err) - } - }() - } else if certFile != "" && keyFile != "" { - cfg.IsHTTPS = true - - // See https://blog.cloudflare.com/exposing-go-on-the-internet/ - // And https://wiki.mozilla.org/Security/Server_Side_TLS - server.TLSConfig = &tls.Config{ - MinVersion: tls.VersionTLS12, - PreferServerCipherSuites: true, - CurvePreferences: []tls.CurveID{ - tls.CurveP256, - tls.X25519, - }, - CipherSuites: []uint16{ - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - }, - } - - go func() { - logger.Info(`Listening on "%s" by using certificate "%s" and key "%s"`, server.Addr, certFile, keyFile) - if err := server.ListenAndServeTLS(certFile, keyFile); err != http.ErrServerClosed { - logger.Fatal(`Server failed to start: %v`, err) - } - }() - } else { - go func() { - logger.Info(`Listening on "%s" without TLS`, server.Addr) - if err := server.ListenAndServe(); err != http.ErrServerClosed { - logger.Fatal(`Server failed to start: %v`, err) - } - }() - } - - return server -} diff --git a/reader/browser/doc.go b/reader/browser/doc.go new file mode 100644 index 0000000..381a799 --- /dev/null +++ b/reader/browser/doc.go @@ -0,0 +1,10 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +/* + +Package browser handles website crawling. + +*/ +package browser // import "miniflux.app/reader/browser" diff --git a/scheduler/doc.go b/scheduler/doc.go deleted file mode 100644 index 74a6f20..0000000 --- a/scheduler/doc.go +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright 2018 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -/* - -Package scheduler implements the application internal scheduler. - -*/ -package scheduler // import "miniflux.app/scheduler" diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go deleted file mode 100644 index f0a8134..0000000 --- a/scheduler/scheduler.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2017 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -package scheduler // import "miniflux.app/scheduler" - -import ( - "time" - - "miniflux.app/logger" - "miniflux.app/storage" -) - -// NewFeedScheduler starts a new scheduler that push jobs to a pool of workers. -func NewFeedScheduler(store *storage.Storage, workerPool *WorkerPool, frequency, batchSize int) { - go func() { - c := time.Tick(time.Duration(frequency) * time.Minute) - for range c { - jobs, err := store.NewBatch(batchSize) - if err != nil { - logger.Error("[FeedScheduler] %v", err) - } else { - logger.Debug("[FeedScheduler] Pushing %d jobs", len(jobs)) - workerPool.Push(jobs) - } - } - }() -} - -// NewCleanupScheduler starts a new scheduler that clean old sessions and archive read items. -func NewCleanupScheduler(store *storage.Storage, frequency int) { - go func() { - c := time.Tick(time.Duration(frequency) * time.Hour) - for range c { - nbSessions := store.CleanOldSessions() - nbUserSessions := store.CleanOldUserSessions() - logger.Info("[CleanupScheduler] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions) - - if err := store.ArchiveEntries(); err != nil { - logger.Error("[CleanupScheduler] %v", err) - } - } - }() -} diff --git a/scheduler/worker.go b/scheduler/worker.go deleted file mode 100644 index 68efed7..0000000 --- a/scheduler/worker.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2017 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -package scheduler // import "miniflux.app/scheduler" - -import ( - "time" - - "miniflux.app/logger" - "miniflux.app/model" - "miniflux.app/reader/feed" -) - -// Worker refreshes a feed in the background. -type Worker struct { - id int - feedHandler *feed.Handler -} - -// Run wait for a job and refresh the given feed. -func (w *Worker) Run(c chan model.Job) { - logger.Info("[Worker] #%d started", w.id) - - for { - job := <-c - logger.Debug("[Worker #%d] got userID=%d, feedID=%d", w.id, job.UserID, job.FeedID) - - err := w.feedHandler.RefreshFeed(job.UserID, job.FeedID) - if err != nil { - logger.Error("[Worker] %v", err) - } - - time.Sleep(time.Millisecond * 1000) - } -} diff --git a/scheduler/worker_pool.go b/scheduler/worker_pool.go deleted file mode 100644 index ce5a453..0000000 --- a/scheduler/worker_pool.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2017 Frédéric Guillot. All rights reserved. -// Use of this source code is governed by the Apache 2.0 -// license that can be found in the LICENSE file. - -package scheduler // import "miniflux.app/scheduler" - -import ( - "miniflux.app/model" - "miniflux.app/reader/feed" -) - -// WorkerPool handle a pool of workers. -type WorkerPool struct { - queue chan model.Job -} - -// Push send a list of jobs to the queue. -func (w *WorkerPool) Push(jobs model.JobList) { - for _, job := range jobs { - w.queue <- job - } -} - -// NewWorkerPool creates a pool of background workers. -func NewWorkerPool(feedHandler *feed.Handler, nbWorkers int) *WorkerPool { - workerPool := &WorkerPool{ - queue: make(chan model.Job), - } - - for i := 0; i < nbWorkers; i++ { - worker := &Worker{id: i, feedHandler: feedHandler} - go worker.Run(workerPool.queue) - } - - return workerPool -} diff --git a/service/httpd/doc.go b/service/httpd/doc.go new file mode 100644 index 0000000..8436b2b --- /dev/null +++ b/service/httpd/doc.go @@ -0,0 +1,10 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +/* + +Package httpd implements the HTTP service. + +*/ +package httpd // import "miniflux.app/service/httpd" diff --git a/service/httpd/httpd.go b/service/httpd/httpd.go new file mode 100644 index 0000000..4ab7cfd --- /dev/null +++ b/service/httpd/httpd.go @@ -0,0 +1,130 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +package httpd // import "miniflux.app/service/httpd" + +import ( + "crypto/tls" + "net/http" + "time" + + "miniflux.app/api" + "miniflux.app/config" + "miniflux.app/fever" + "miniflux.app/logger" + "miniflux.app/middleware" + "miniflux.app/reader/feed" + "miniflux.app/storage" + "miniflux.app/ui" + "miniflux.app/worker" + + "github.com/gorilla/mux" + "golang.org/x/crypto/acme/autocert" +) + +// Serve starts a new HTTP server. +func Serve(cfg *config.Config, store *storage.Storage, pool *worker.Pool, feedHandler *feed.Handler) *http.Server { + certFile := cfg.CertFile() + keyFile := cfg.KeyFile() + certDomain := cfg.CertDomain() + certCache := cfg.CertCache() + server := &http.Server{ + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 60 * time.Second, + Addr: cfg.ListenAddr(), + Handler: setupHandler(cfg, store, feedHandler, pool), + } + + if certDomain != "" && certCache != "" { + cfg.IsHTTPS = true + startAutoCertTLSServer(server, certDomain, certCache) + } else if certFile != "" && keyFile != "" { + cfg.IsHTTPS = true + startTLSServer(server, certFile, keyFile) + } else { + startHTTPServer(server) + } + + return server +} + +func startAutoCertTLSServer(server *http.Server, certDomain, certCache string) { + server.Addr = ":https" + certManager := autocert.Manager{ + Cache: autocert.DirCache(certCache), + Prompt: autocert.AcceptTOS, + HostPolicy: autocert.HostWhitelist(certDomain), + } + + // Handle http-01 challenge. + s := &http.Server{ + Handler: certManager.HTTPHandler(nil), + Addr: ":http", + } + go s.ListenAndServe() + + go func() { + logger.Info(`Listening on %q by using auto-configured certificate for %q`, server.Addr, certDomain) + if err := server.Serve(certManager.Listener()); err != http.ErrServerClosed { + logger.Fatal(`Server failed to start: %v`, err) + } + }() +} + +func startTLSServer(server *http.Server, certFile, keyFile string) { + // See https://blog.cloudflare.com/exposing-go-on-the-internet/ + // And https://wiki.mozilla.org/Security/Server_Side_TLS + server.TLSConfig = &tls.Config{ + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + CurvePreferences: []tls.CurveID{ + tls.CurveP256, + tls.X25519, + }, + CipherSuites: []uint16{ + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + }, + } + + go func() { + logger.Info(`Listening on %q by using certificate %q and key %q`, server.Addr, certFile, keyFile) + if err := server.ListenAndServeTLS(certFile, keyFile); err != http.ErrServerClosed { + logger.Fatal(`Server failed to start: %v`, err) + } + }() +} + +func startHTTPServer(server *http.Server) { + go func() { + logger.Info(`Listening on %q without TLS`, server.Addr) + if err := server.ListenAndServe(); err != http.ErrServerClosed { + logger.Fatal(`Server failed to start: %v`, err) + } + }() +} + +func setupHandler(cfg *config.Config, store *storage.Storage, feedHandler *feed.Handler, pool *worker.Pool) *mux.Router { + router := mux.NewRouter() + middleware := middleware.New(cfg, store, router) + + if cfg.BasePath() != "" { + router = router.PathPrefix(cfg.BasePath()).Subrouter() + } + + router.Use(middleware.ClientIP) + router.Use(middleware.HeaderConfig) + router.Use(middleware.Logging) + + fever.Serve(router, cfg, store) + api.Serve(router, store, feedHandler) + ui.Serve(router, cfg, store, pool, feedHandler) + + return router +} diff --git a/service/scheduler/doc.go b/service/scheduler/doc.go new file mode 100644 index 0000000..94615ad --- /dev/null +++ b/service/scheduler/doc.go @@ -0,0 +1,10 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +/* + +Package scheduler implements the scheduler service. + +*/ +package scheduler // import "miniflux.app/service/scheduler" diff --git a/service/scheduler/scheduler.go b/service/scheduler/scheduler.go new file mode 100644 index 0000000..31c63e7 --- /dev/null +++ b/service/scheduler/scheduler.go @@ -0,0 +1,46 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +package scheduler // import "miniflux.app/service/scheduler" + +import ( + "time" + + "miniflux.app/config" + "miniflux.app/logger" + "miniflux.app/storage" + "miniflux.app/worker" +) + +// Serve starts the internal scheduler. +func Serve(cfg *config.Config, store *storage.Storage, pool *worker.Pool) { + go feedScheduler(store, pool, cfg.PollingFrequency(), cfg.BatchSize()) + go cleanupScheduler(store, cfg.CleanupFrequency()) +} + +func feedScheduler(store *storage.Storage, pool *worker.Pool, frequency, batchSize int) { + c := time.Tick(time.Duration(frequency) * time.Minute) + for range c { + jobs, err := store.NewBatch(batchSize) + if err != nil { + logger.Error("[Scheduler:Feed] %v", err) + } else { + logger.Debug("[Scheduler:Feed] Pushing %d jobs", len(jobs)) + pool.Push(jobs) + } + } +} + +func cleanupScheduler(store *storage.Storage, frequency int) { + c := time.Tick(time.Duration(frequency) * time.Hour) + for range c { + nbSessions := store.CleanOldSessions() + nbUserSessions := store.CleanOldUserSessions() + logger.Info("[Scheduler:Cleanup] Cleaned %d sessions and %d user sessions", nbSessions, nbUserSessions) + + if err := store.ArchiveEntries(); err != nil { + logger.Error("[Scheduler:Cleanup] %v", err) + } + } +} diff --git a/ui/handler.go b/ui/handler.go index 39fe560..2bccf61 100644 --- a/ui/handler.go +++ b/ui/handler.go @@ -7,9 +7,9 @@ package ui // import "miniflux.app/ui" import ( "miniflux.app/config" "miniflux.app/reader/feed" - "miniflux.app/scheduler" "miniflux.app/storage" "miniflux.app/template" + "miniflux.app/worker" "github.com/gorilla/mux" ) @@ -19,6 +19,6 @@ type handler struct { cfg *config.Config store *storage.Storage tpl *template.Engine - pool *scheduler.WorkerPool + pool *worker.Pool feedHandler *feed.Handler } diff --git a/ui/ui.go b/ui/ui.go index 3577636..0a41042 100644 --- a/ui/ui.go +++ b/ui/ui.go @@ -9,15 +9,15 @@ import ( "miniflux.app/config" "miniflux.app/reader/feed" - "miniflux.app/scheduler" "miniflux.app/storage" "miniflux.app/template" + "miniflux.app/worker" "github.com/gorilla/mux" ) // Serve declares all routes for the user interface. -func Serve(router *mux.Router, cfg *config.Config, store *storage.Storage, pool *scheduler.WorkerPool, feedHandler *feed.Handler) { +func Serve(router *mux.Router, cfg *config.Config, store *storage.Storage, pool *worker.Pool, feedHandler *feed.Handler) { middleware := newMiddleware(router, cfg, store) handler := &handler{router, cfg, store, template.NewEngine(cfg, router), pool, feedHandler} diff --git a/worker/doc.go b/worker/doc.go new file mode 100644 index 0000000..816f8ad --- /dev/null +++ b/worker/doc.go @@ -0,0 +1,10 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +/* + +Package worker implements the background workers. + +*/ +package worker // import "miniflux.app/worker" diff --git a/worker/pool.go b/worker/pool.go new file mode 100644 index 0000000..909e815 --- /dev/null +++ b/worker/pool.go @@ -0,0 +1,36 @@ +// Copyright 2018 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +package worker // import "miniflux.app/worker" + +import ( + "miniflux.app/model" + "miniflux.app/reader/feed" +) + +// Pool handles a pool of workers. +type Pool struct { + queue chan model.Job +} + +// Push send a list of jobs to the queue. +func (p *Pool) Push(jobs model.JobList) { + for _, job := range jobs { + p.queue <- job + } +} + +// NewPool creates a pool of background workers. +func NewPool(feedHandler *feed.Handler, nbWorkers int) *Pool { + workerPool := &Pool{ + queue: make(chan model.Job), + } + + for i := 0; i < nbWorkers; i++ { + worker := &Worker{id: i, feedHandler: feedHandler} + go worker.Run(workerPool.queue) + } + + return workerPool +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..beff209 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,32 @@ +// Copyright 2017 Frédéric Guillot. All rights reserved. +// Use of this source code is governed by the Apache 2.0 +// license that can be found in the LICENSE file. + +package worker // import "miniflux.app/worker" + +import ( + "miniflux.app/logger" + "miniflux.app/model" + "miniflux.app/reader/feed" +) + +// Worker refreshes a feed in the background. +type Worker struct { + id int + feedHandler *feed.Handler +} + +// Run wait for a job and refresh the given feed. +func (w *Worker) Run(c chan model.Job) { + logger.Info("[Worker] #%d started", w.id) + + for { + job := <-c + logger.Debug("[Worker #%d] got userID=%d, feedID=%d", w.id, job.UserID, job.FeedID) + + err := w.feedHandler.RefreshFeed(job.UserID, job.FeedID) + if err != nil { + logger.Error("[Worker] %v", err) + } + } +} -- cgit v1.2.3