diff --git a/src/assets/javascripts/app.js b/src/assets/javascripts/app.js index b81e949..9e3c567 100644 --- a/src/assets/javascripts/app.js +++ b/src/assets/javascripts/app.js @@ -633,10 +633,7 @@ var vm = new Vue({ fetchAllFeeds: function() { if (this.loading.feeds) return api.feeds.refresh().then(function() { - // TODO: this is hacky. come up with something decent - setTimeout(function() { - vm.refreshStats() - }, 1000) + vm.refreshStats() }) }, computeStats: function() { diff --git a/src/worker/worker.go b/src/worker/worker.go index c516bf8..f264737 100644 --- a/src/worker/worker.go +++ b/src/worker/worker.go @@ -9,6 +9,8 @@ import ( "github.com/nkanaev/yarr/src/storage" ) +const NUM_WORKERS = 4 + type Worker struct { db *storage.Storage pending *int32 @@ -89,30 +91,34 @@ func (w *Worker) SetRefreshRate(minute int64) { } func (w *Worker) RefreshFeeds() { - log.Print("Refreshing feeds") - go w.refresher() -} - -func (w *Worker) refresher() { w.reflock.Lock() + defer w.reflock.Unlock() - w.db.ResetFeedErrors() - - feeds := w.db.ListFeeds() - if len(feeds) == 0 { + if *w.pending > 0 { + log.Print("Refreshing already in progress") return } + feeds := w.db.ListFeeds() + if len(feeds) == 0 { + log.Print("Nothing to refresh") + return + } + + log.Print("Refreshing feeds") atomic.StoreInt32(w.pending, int32(len(feeds))) + go w.refresher(feeds) +} + +func (w *Worker) refresher(feeds []storage.Feed) { + w.db.ResetFeedErrors() srcqueue := make(chan storage.Feed, len(feeds)) dstqueue := make(chan []storage.Item) - // hardcoded to 4 workers ;) - go w.worker(srcqueue, dstqueue) - go w.worker(srcqueue, dstqueue) - go w.worker(srcqueue, dstqueue) - go w.worker(srcqueue, dstqueue) + for i := 0; i < NUM_WORKERS; i++ { + go w.worker(srcqueue, dstqueue) + } for _, feed := range feeds { srcqueue <- feed @@ -120,14 +126,12 @@ func (w *Worker) refresher() { for i := 0; i < len(feeds); i++ { w.db.CreateItems(<-dstqueue) atomic.AddInt32(w.pending, -1) + w.db.SyncSearch() } close(srcqueue) close(dstqueue) - w.db.SyncSearch() log.Printf("Finished refreshing %d feeds", len(feeds)) - - w.reflock.Unlock() } func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) {