diff --git a/server/handlers.go b/server/handlers.go index ba576eb..dd7991a 100644 --- a/server/handlers.go +++ b/server/handlers.go @@ -62,7 +62,7 @@ func StaticHandler(rw http.ResponseWriter, req *http.Request) { func StatusHandler(rw http.ResponseWriter, req *http.Request) { writeJSON(rw, map[string]interface{}{ - "running": handler(req).fetchRunning, + "running": handler(req).queueSize > 0, "stats": db(req).FeedStats(), }) } diff --git a/server/server.go b/server/server.go index a428a8d..b4605e2 100644 --- a/server/server.go +++ b/server/server.go @@ -5,23 +5,24 @@ import ( "github.com/nkanaev/yarr/storage" "log" "net/http" + "runtime" + "sync/atomic" + "time" ) type Handler struct { db *storage.Storage log *log.Logger - fetchRunning bool feedQueue chan storage.Feed - counter chan int - queueSize int + queueSize int32 } func New(db *storage.Storage, logger *log.Logger) *Handler { return &Handler{ db: db, log: logger, - feedQueue: make(chan storage.Feed), - counter: make(chan int), + feedQueue: make(chan storage.Feed, 1000), + queueSize: 0, } } @@ -43,35 +44,41 @@ func (h Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } func (h *Handler) startJobs() { - h.db.DeleteOldItems() - go func() { + delTicker := time.NewTicker(time.Hour * 24) + worker := func() { for { - feed := <-h.feedQueue - items := listItems(feed) - h.db.CreateItems(items) + select { + case feed := <-h.feedQueue: + items := listItems(feed) + h.db.CreateItems(items) + atomic.AddInt32(&h.queueSize, -1) + case <- delTicker.C: + h.db.DeleteOldItems() + } } - }() - go func() { - for { - val := <-h.counter - h.queueSize += val - } - }() + } + + num := runtime.NumCPU() - 1 + if num < 1 { + num = 1 + } + for i := 0; i < num; i++ { + go worker() + } + go h.db.DeleteOldItems() go h.db.SyncSearch() - h.fetchAllFeeds() -} -func (h *Handler) fetchFeed(feed storage.Feed) { - h.queueSize += 1 - h.feedQueue <- feed -} - -func (h *Handler) fetchAllFeeds() { + // fetch all feeds for _, feed := range h.db.ListFeeds() { h.fetchFeed(feed) } } +func (h *Handler) fetchFeed(feed storage.Feed) { + atomic.AddInt32(&h.queueSize, 1) + h.feedQueue <- feed +} + func Vars(req *http.Request) map[string]string { if rv := req.Context().Value(ctxVars); rv != nil { return rv.(map[string]string) diff --git a/template/index.html b/template/index.html index e1f2ac4..eaa90c4 100644 --- a/template/index.html +++ b/template/index.html @@ -196,7 +196,7 @@