package worker import ( "log" "sync" "sync/atomic" "time" "github.com/nkanaev/yarr/src/storage" ) const NUM_WORKERS = 4 type Worker struct { db *storage.Storage pending *int32 refresh *time.Ticker reflock sync.Mutex stopper chan bool } func NewWorker(db *storage.Storage) *Worker { pending := int32(0) return &Worker{db: db, pending: &pending} } func (w *Worker) FeedsPending() int32 { return *w.pending } func (w *Worker) StartFeedCleaner() { go w.db.DeleteOldItems() ticker := time.NewTicker(time.Hour * 24) go func() { for { <-ticker.C w.db.DeleteOldItems() } }() } func (w *Worker) FindFeedFavicon(feed storage.Feed) { icon, err := findFavicon(feed.Link, feed.FeedLink) if err != nil { log.Printf("Failed to find favicon for %s (%s): %s", feed.FeedLink, feed.Link, err) } if icon != nil { w.db.UpdateFeed(feed.Id, storage.UpdateFeedParams{Icon: storage.SetNullable(icon)}) } } func (w *Worker) SetRefreshRate(minute int64) { if w.stopper != nil { w.refresh.Stop() w.refresh = nil w.stopper <- true w.stopper = nil } if minute == 0 { return } w.stopper = make(chan bool) w.refresh = time.NewTicker(time.Minute * time.Duration(minute)) go func(fire <-chan time.Time, stop <-chan bool, m int64) { log.Printf("auto-refresh %dm: starting", m) for { select { case <-fire: log.Printf("auto-refresh %dm: firing", m) w.RefreshFeeds() case <-stop: log.Printf("auto-refresh %dm: stopping", m) return } } }(w.refresh.C, w.stopper, minute) } func (w *Worker) RefreshFeeds() { w.reflock.Lock() defer w.reflock.Unlock() if *w.pending > 0 { log.Print("Refreshing already in progress") return } feeds := w.db.ListFeeds() if len(feeds) == 0 { log.Print("Nothing to refresh") return } log.Print("Refreshing feeds") atomic.StoreInt32(w.pending, int32(len(feeds))) go w.refresher(feeds) } func (w *Worker) refresher(feeds []storage.Feed) { // w.db.ResetFeedErrors() srcqueue := make(chan storage.Feed, len(feeds)) dstqueue := make(chan []storage.Item) for range NUM_WORKERS { go w.worker(srcqueue, dstqueue) } for _, feed := range feeds { srcqueue <- feed } for range feeds { items := <-dstqueue if len(items) > 0 { w.db.CreateItems(items) } atomic.AddInt32(w.pending, -1) } close(srcqueue) close(dstqueue) log.Printf("Finished refreshing %d feeds", len(feeds)) } func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) { for feed := range srcqueue { empty := "" w.db.UpdateFeedState(feed.Id, storage.UpdateFeedStateParams{LastError: &empty}) items, err := listItems(feed, w.db) if err != nil { errMsg := err.Error() w.db.UpdateFeedState(feed.Id, storage.UpdateFeedStateParams{LastError: &errMsg}) } if len(items) > 0 && !feed.HasIcon { w.FindFeedFavicon(feed) } dstqueue <- items } }