rewrite worker

This commit is contained in:
Nazar Kanaev 2021-03-24 12:07:15 +00:00
parent 1f393faf79
commit a2bfd1682b
3 changed files with 121 additions and 117 deletions

View File

@ -116,7 +116,7 @@ func (s *Server) handleFolder(c *router.Context) {
func (s *Server) handleFeedRefresh(c *router.Context) {
if c.Req.Method == "POST" {
s.worker.FetchAllFeeds()
s.worker.RefreshFeeds()
c.Out.WriteHeader(http.StatusOK)
} else {
c.Out.WriteHeader(http.StatusMethodNotAllowed)
@ -342,7 +342,7 @@ func (s *Server) handleOPMLImport(c *router.Context) {
}
}
s.worker.FetchAllFeeds()
s.worker.RefreshFeeds()
c.Out.WriteHeader(http.StatusOK)
} else {
c.Out.WriteHeader(http.StatusMethodNotAllowed)

View File

@ -39,7 +39,12 @@ func (h *Server) GetAddr() string {
}
func (s *Server) Start() {
s.worker.Start()
refreshRate := s.db.GetSettingsValueInt64("refresh_rate")
s.worker.StartFeedCleaner()
s.worker.SetRefreshRate(refreshRate)
if refreshRate > 0 {
s.worker.RefreshFeeds()
}
httpserver := &http.Server{Addr: s.Addr, Handler: s.handler()}

View File

@ -1,132 +1,131 @@
package worker
import (
"github.com/nkanaev/yarr/src/storage"
"log"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/nkanaev/yarr/src/storage"
)
// Worker coordinates background feed maintenance: fetching feed items,
// cleaning old ones, and periodic auto-refresh.
//
// NOTE(review): this listing appears to merge fields from two revisions
// (a queue-based design: feedQueue/queueSize/refreshRate, and a
// ticker-based design: pending/refresh/reflock/stopper) — verify which
// set survives in the final file.
type Worker struct {
	db          *storage.Storage  // persistent storage backend used for all feed/item operations
	feedQueue   chan storage.Feed // feeds waiting to be fetched (see fetchFeed)
	queueSize   *int32            // count of queued feeds, updated via sync/atomic
	refreshRate chan int64        // publishes new auto-refresh intervals (minutes; 0 disables)
	pending     *int32            // count of feeds awaiting refresh in the current cycle, updated via sync/atomic
	refresh     *time.Ticker      // drives the auto-refresh loop
	reflock     sync.Mutex        // serializes refresh cycles (see refresher)
	stopper     chan bool         // signals the auto-refresh goroutine to exit
}
func NewWorker(db *storage.Storage) *Worker {
queueSize := int32(0)
return &Worker{
db: db,
feedQueue: make(chan storage.Feed, 3000),
queueSize: &queueSize,
refreshRate: make(chan int64),
}
pending := int32(0)
return &Worker{db: db, pending: &pending}
}
func (w *Worker) Start() {
delTicker := time.NewTicker(time.Hour * 24)
syncSearchChannel := make(chan bool, 10)
var syncSearchTimer *time.Timer // TODO: should this be atomic?
syncSearch := func() {
if syncSearchTimer == nil {
syncSearchTimer = time.AfterFunc(time.Second*2, func() {
syncSearchChannel <- true
})
} else {
syncSearchTimer.Reset(time.Second * 2)
}
// FeedsPending reports how many feeds are still waiting to be refreshed
// in the current refresh cycle.
func (w *Worker) FeedsPending() int32 {
	// pending is written with sync/atomic elsewhere (StoreInt32/AddInt32
	// in refresher); the original plain dereference here was a data race.
	// Read it atomically so this is safe to call from other goroutines.
	return atomic.LoadInt32(w.pending)
}
worker := func() {
// StartFeedCleaner purges old items once right away, then repeats the
// purge every 24 hours. Both the initial purge and the recurring loop
// run in background goroutines; this method returns immediately.
func (w *Worker) StartFeedCleaner() {
	go w.db.DeleteOldItems()
	go func() {
		cleanup := time.NewTicker(24 * time.Hour)
		for range cleanup.C {
			w.db.DeleteOldItems()
		}
	}()
}
func (w *Worker) SetRefreshRate(minute int64) {
if w.stopper != nil {
w.refresh.Stop()
w.refresh = nil
w.stopper <- true
w.stopper = nil
}
if minute == 0 {
return
}
w.stopper = make(chan bool)
w.refresh = time.NewTicker(time.Minute * time.Duration(minute))
go func(fire <-chan time.Time, stop <-chan bool, m int64) {
log.Printf("auto-refresh %dm: starting", m)
for {
select {
case feed := <-w.feedQueue:
items, err := listItems(feed, w.db)
atomic.AddInt32(w.queueSize, -1)
if err != nil {
log.Printf("Failed to fetch %s (%d): %s", feed.FeedLink, feed.Id, err)
w.db.SetFeedError(feed.Id, err)
continue
case <-fire:
log.Printf("auto-refresh %dm: firing", m)
w.RefreshFeeds()
case <-stop:
log.Printf("auto-refresh %dm: stopping", m)
return
}
w.db.CreateItems(items)
syncSearch()
}
}(w.refresh.C, w.stopper, minute)
}
// RefreshFeeds kicks off an asynchronous refresh of all feeds.
// It returns immediately; the actual work happens in refresher,
// which serializes concurrent calls via w.reflock.
func (w *Worker) RefreshFeeds() {
	log.Print("Refreshing feeds")
	go w.refresher()
}
// refresher runs one full refresh cycle: it resets stored feed errors,
// fans the feed list out to a fixed pool of fetch workers, persists the
// fetched items, and rebuilds the search index. reflock guarantees that
// only one cycle runs at a time.
func (w *Worker) refresher() {
	w.reflock.Lock()
	// BUG FIX: the original unlocked only at the end of the function, so
	// the early return below (empty feed list) kept the mutex held
	// forever and deadlocked every subsequent refresh. defer makes the
	// unlock unconditional.
	defer w.reflock.Unlock()

	w.db.ResetFeedErrors()

	feeds := w.db.ListFeeds()
	if len(feeds) == 0 {
		return
	}
	atomic.StoreInt32(w.pending, int32(len(feeds)))

	srcqueue := make(chan storage.Feed, len(feeds))
	dstqueue := make(chan []storage.Item)

	// hardcoded to 4 workers ;)
	for i := 0; i < 4; i++ {
		go w.worker(srcqueue, dstqueue)
	}

	// srcqueue is buffered to hold every feed, so these sends never block.
	for _, feed := range feeds {
		srcqueue <- feed
	}
	// Closing after the sends lets the worker goroutines exit as soon as
	// they drain the queue (instead of lingering until the cycle ends).
	close(srcqueue)

	// Exactly one result per feed arrives on dstqueue.
	for i := 0; i < len(feeds); i++ {
		w.db.CreateItems(<-dstqueue)
		atomic.AddInt32(w.pending, -1)
	}
	close(dstqueue)

	w.db.SyncSearch()
	log.Printf("Finished refreshing %d feeds", len(feeds))
}
func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) {
for feed := range srcqueue {
items, err := listItems(feed, w.db)
if err != nil {
w.db.SetFeedError(feed.Id, err)
}
dstqueue <- items
// TODO: move somewhere else
if !feed.HasIcon {
icon, err := FindFavicon(feed.Link, feed.FeedLink)
if icon != nil {
w.db.UpdateFeedIcon(feed.Id, icon)
}
if err != nil {
log.Printf("Failed to search favicon for %s (%s): %s", feed.Link, feed.FeedLink, err)
}
}
case <-delTicker.C:
w.db.DeleteOldItems()
case <-syncSearchChannel:
w.db.SyncSearch()
log.Printf("Failed to find favicon for %s (%s): %s", feed.FeedLink, feed.Link, err)
}
}
}
num := runtime.NumCPU() - 1
if num < 1 {
num = 1
}
for i := 0; i < num; i++ {
go worker()
}
go w.db.DeleteOldItems()
go w.db.SyncSearch()
go func() {
var refreshTicker *time.Ticker
refreshTick := make(<-chan time.Time)
for {
select {
case <-refreshTick:
w.FetchAllFeeds()
case val := <-w.refreshRate:
if refreshTicker != nil {
refreshTicker.Stop()
if val == 0 {
refreshTick = make(<-chan time.Time)
}
}
if val > 0 {
refreshTicker = time.NewTicker(time.Duration(val) * time.Minute)
refreshTick = refreshTicker.C
}
}
}
}()
refreshRate := w.db.GetSettingsValueInt64("refresh_rate")
w.refreshRate <- refreshRate
if refreshRate > 0 {
w.FetchAllFeeds()
}
}
// FetchAllFeeds clears any previously recorded per-feed errors and
// enqueues every stored feed for fetching.
func (w *Worker) FetchAllFeeds() {
	log.Print("Refreshing all feeds")
	w.db.ResetFeedErrors()
	feeds := w.db.ListFeeds()
	for i := range feeds {
		w.fetchFeed(feeds[i])
	}
}
// fetchFeed enqueues a single feed for fetching. The queue-size counter
// is bumped before the send so FeedsPending already reflects the feed
// even if the (buffered) channel send momentarily blocks.
func (w *Worker) fetchFeed(feed storage.Feed) {
	atomic.AddInt32(w.queueSize, 1)
	w.feedQueue <- feed
}
// FeedsPending reports how many feeds are currently queued for fetching.
func (w *Worker) FeedsPending() int32 {
	// queueSize is modified with atomic.AddInt32 in fetchFeed and the
	// fetch loop; the original plain dereference was a data race. Load
	// it atomically so concurrent callers see a consistent value.
	return atomic.LoadInt32(w.queueSize)
}
// SetRefreshRate publishes a new auto-refresh interval to the refresh
// loop goroutine. val is in minutes; 0 disables auto-refresh.
// NOTE(review): refreshRate is unbuffered, so this blocks until the
// loop started in Start is receiving — confirm Start has run first.
func (w *Worker) SetRefreshRate(val int64) {
	w.refreshRate <- val
}