From a2bfd1682be3917f87f876b6d4436f6581915139 Mon Sep 17 00:00:00 2001
From: Nazar Kanaev
Date: Wed, 24 Mar 2021 12:07:15 +0000
Subject: [PATCH] rewrite worker

---
Reviewer notes (this section is commentary, not part of the diff; the
diffstat and the worker.go hunk header below are updated to match):

* Line breaks and indentation were restored; the patch had been flattened
  to a few long lines and was no longer a well-formed diff.
* worker.go, refresher: the mutex is now released with defer. Before,
  the early return on an empty feed list left reflock held, so every
  later refresh blocked forever.
* worker.go, FeedsPending: the counter is read with atomic.LoadInt32,
  matching the atomic store and add in refresher.
* worker.go, SetRefreshRate: a negative rate is treated like zero
  instead of reaching time.NewTicker, which panics on it.
* Doc comments were added to the new worker API.
* The "index" line of worker.go still names the original post-image blob.

 src/server/routes.go |   4 +-
 src/server/server.go |   7 +-
 src/worker/worker.go | 242 +++++++++++++++++++++++--------------------
 3 files changed, 136 insertions(+), 117 deletions(-)

diff --git a/src/server/routes.go b/src/server/routes.go
index 7b5ac02..453033c 100644
--- a/src/server/routes.go
+++ b/src/server/routes.go
@@ -116,7 +116,7 @@ func (s *Server) handleFolder(c *router.Context) {
 
 func (s *Server) handleFeedRefresh(c *router.Context) {
 	if c.Req.Method == "POST" {
-		s.worker.FetchAllFeeds()
+		s.worker.RefreshFeeds()
 		c.Out.WriteHeader(http.StatusOK)
 	} else {
 		c.Out.WriteHeader(http.StatusMethodNotAllowed)
@@ -342,7 +342,7 @@ func (s *Server) handleOPMLImport(c *router.Context) {
 			}
 		}
 
-		s.worker.FetchAllFeeds()
+		s.worker.RefreshFeeds()
 		c.Out.WriteHeader(http.StatusOK)
 	} else {
 		c.Out.WriteHeader(http.StatusMethodNotAllowed)
diff --git a/src/server/server.go b/src/server/server.go
index cc1f958..708aded 100644
--- a/src/server/server.go
+++ b/src/server/server.go
@@ -39,7 +39,12 @@ func (h *Server) GetAddr() string {
 }
 
 func (s *Server) Start() {
-	s.worker.Start()
+	refreshRate := s.db.GetSettingsValueInt64("refresh_rate")
+	s.worker.StartFeedCleaner()
+	s.worker.SetRefreshRate(refreshRate)
+	if refreshRate > 0 {
+		s.worker.RefreshFeeds()
+	}
 
 	httpserver := &http.Server{Addr: s.Addr, Handler: s.handler()}
 
diff --git a/src/worker/worker.go b/src/worker/worker.go
index 58587cf..3bd2c04 100644
--- a/src/worker/worker.go
+++ b/src/worker/worker.go
@@ -1,132 +1,146 @@
 package worker
 
 import (
-	"github.com/nkanaev/yarr/src/storage"
 	"log"
-	"runtime"
+	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/nkanaev/yarr/src/storage"
 )
 
+// Worker refreshes feeds in the background and writes the results to db.
 type Worker struct {
-	db *storage.Storage
-
-	feedQueue   chan storage.Feed
-	queueSize   *int32
-	refreshRate chan int64
+	db      *storage.Storage
+	pending *int32
+	refresh *time.Ticker
+	reflock sync.Mutex
+	stopper chan bool
 }
 
+// NewWorker returns a Worker that uses db and has no feeds pending.
 func NewWorker(db *storage.Storage) *Worker {
-	queueSize := int32(0)
-	return &Worker{
-		db:          db,
-		feedQueue:   make(chan storage.Feed, 3000),
-		queueSize:   &queueSize,
-		refreshRate: make(chan int64),
-	}
-}
-
-func (w *Worker) Start() {
-	delTicker := time.NewTicker(time.Hour * 24)
-
-	syncSearchChannel := make(chan bool, 10)
-	var syncSearchTimer *time.Timer // TODO: should this be atomic?
-
-	syncSearch := func() {
-		if syncSearchTimer == nil {
-			syncSearchTimer = time.AfterFunc(time.Second*2, func() {
-				syncSearchChannel <- true
-			})
-		} else {
-			syncSearchTimer.Reset(time.Second * 2)
-		}
-	}
-
-	worker := func() {
-		for {
-			select {
-			case feed := <-w.feedQueue:
-				items, err := listItems(feed, w.db)
-				atomic.AddInt32(w.queueSize, -1)
-				if err != nil {
-					log.Printf("Failed to fetch %s (%d): %s", feed.FeedLink, feed.Id, err)
-					w.db.SetFeedError(feed.Id, err)
-					continue
-				}
-				w.db.CreateItems(items)
-				syncSearch()
-				if !feed.HasIcon {
-					icon, err := FindFavicon(feed.Link, feed.FeedLink)
-					if icon != nil {
-						w.db.UpdateFeedIcon(feed.Id, icon)
-					}
-					if err != nil {
-						log.Printf("Failed to search favicon for %s (%s): %s", feed.Link, feed.FeedLink, err)
-					}
-				}
-			case <-delTicker.C:
-				w.db.DeleteOldItems()
-			case <-syncSearchChannel:
-				w.db.SyncSearch()
-			}
-		}
-	}
-
-	num := runtime.NumCPU() - 1
-	if num < 1 {
-		num = 1
-	}
-	for i := 0; i < num; i++ {
-		go worker()
-	}
-	go w.db.DeleteOldItems()
-	go w.db.SyncSearch()
-
-	go func() {
-		var refreshTicker *time.Ticker
-		refreshTick := make(<-chan time.Time)
-		for {
-			select {
-			case <-refreshTick:
-				w.FetchAllFeeds()
-			case val := <-w.refreshRate:
-				if refreshTicker != nil {
-					refreshTicker.Stop()
-					if val == 0 {
-						refreshTick = make(<-chan time.Time)
-					}
-				}
-				if val > 0 {
-					refreshTicker = time.NewTicker(time.Duration(val) * time.Minute)
-					refreshTick = refreshTicker.C
-				}
-			}
-		}
-	}()
-	refreshRate := w.db.GetSettingsValueInt64("refresh_rate")
-	w.refreshRate <- refreshRate
-	if refreshRate > 0 {
-		w.FetchAllFeeds()
-	}
-}
-
-func (w *Worker) FetchAllFeeds() {
-	log.Print("Refreshing all feeds")
-	w.db.ResetFeedErrors()
-	for _, feed := range w.db.ListFeeds() {
-		w.fetchFeed(feed)
-	}
-}
-
-func (w *Worker) fetchFeed(feed storage.Feed) {
-	atomic.AddInt32(w.queueSize, 1)
-	w.feedQueue <- feed
+	pending := int32(0)
+	return &Worker{db: db, pending: &pending}
 }
 
+// FeedsPending reports how many feeds of the running refresh are not stored yet.
 func (w *Worker) FeedsPending() int32 {
-	return *w.queueSize
+	return atomic.LoadInt32(w.pending)
 }
 
-func (w *Worker) SetRefreshRate(val int64) {
-	w.refreshRate <- val
+// StartFeedCleaner calls db.DeleteOldItems once right away and then every
+// 24 hours. The ticker goroutine runs until the process exits.
+func (w *Worker) StartFeedCleaner() {
+	go w.db.DeleteOldItems()
+	ticker := time.NewTicker(time.Hour * 24)
+	go func() {
+		for {
+			<-ticker.C
+			w.db.DeleteOldItems()
+		}
+	}()
+}
+
+// SetRefreshRate stops the running auto-refresh loop, if any, and starts a
+// new one that calls RefreshFeeds every minute minutes. A value of zero or
+// less only stops the loop.
+func (w *Worker) SetRefreshRate(minute int64) {
+	if w.stopper != nil {
+		w.refresh.Stop()
+		w.refresh = nil
+		w.stopper <- true
+		w.stopper = nil
+	}
+
+	// time.NewTicker panics on a non-positive interval.
+	if minute <= 0 {
+		return
+	}
+
+	w.stopper = make(chan bool)
+	w.refresh = time.NewTicker(time.Minute * time.Duration(minute))
+
+	go func(fire <-chan time.Time, stop <-chan bool, m int64) {
+		log.Printf("auto-refresh %dm: starting", m)
+		for {
+			select {
+			case <-fire:
+				log.Printf("auto-refresh %dm: firing", m)
+				w.RefreshFeeds()
+			case <-stop:
+				log.Printf("auto-refresh %dm: stopping", m)
+				return
+			}
+		}
+	}(w.refresh.C, w.stopper, minute)
+}
+
+// RefreshFeeds starts a refresh of all feeds in a new goroutine and returns
+// without waiting for it.
+func (w *Worker) RefreshFeeds() {
+	log.Print("Refreshing feeds")
+	go w.refresher()
+}
+
+// refresher fetches every feed with four workers and stores the results.
+// reflock lets only one run proceed at a time; later runs wait their turn.
+func (w *Worker) refresher() {
+	w.reflock.Lock()
+	// Deferred so the early return below releases the lock as well.
+	defer w.reflock.Unlock()
+
+	w.db.ResetFeedErrors()
+
+	feeds := w.db.ListFeeds()
+	if len(feeds) == 0 {
+		return
+	}
+
+	atomic.StoreInt32(w.pending, int32(len(feeds)))
+
+	srcqueue := make(chan storage.Feed, len(feeds))
+	dstqueue := make(chan []storage.Item)
+
+	// hardcoded to 4 workers ;)
+	go w.worker(srcqueue, dstqueue)
+	go w.worker(srcqueue, dstqueue)
+	go w.worker(srcqueue, dstqueue)
+	go w.worker(srcqueue, dstqueue)
+
+	for _, feed := range feeds {
+		srcqueue <- feed
+	}
+	for i := 0; i < len(feeds); i++ {
+		w.db.CreateItems(<-dstqueue)
+		atomic.AddInt32(w.pending, -1)
+	}
+	close(srcqueue)
+	close(dstqueue)
+
+	w.db.SyncSearch()
+	log.Printf("Finished refreshing %d feeds", len(feeds))
+}
+
+// worker fetches each feed from srcqueue and sends its items to dstqueue.
+// It sends once per feed even when the fetch fails, so refresher can count.
+func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) {
+	for feed := range srcqueue {
+		items, err := listItems(feed, w.db)
+		if err != nil {
+			w.db.SetFeedError(feed.Id, err)
+		}
+		dstqueue <- items
+
+		// TODO: move somewhere else
+		if !feed.HasIcon {
+			icon, err := FindFavicon(feed.Link, feed.FeedLink)
+			if icon != nil {
+				w.db.UpdateFeedIcon(feed.Id, icon)
+			}
+			if err != nil {
+				log.Printf("Failed to find favicon for %s (%s): %s", feed.FeedLink, feed.Link, err)
+			}
+		}
+	}
 }