diff --git a/src/server/handlers.go b/src/server/routes.go similarity index 96% rename from src/server/handlers.go rename to src/server/routes.go index 7f798ed..163d9b6 100644 --- a/src/server/handlers.go +++ b/src/server/routes.go @@ -7,6 +7,7 @@ import ( "github.com/nkanaev/yarr/src/router" "github.com/nkanaev/yarr/src/storage" "github.com/nkanaev/yarr/src/opml" + "github.com/nkanaev/yarr/src/worker" "io/ioutil" "log" "math" @@ -67,7 +68,7 @@ func (s *Server) handleStatic(c *router.Context) { func (s *Server) handleStatus(c *router.Context) { c.JSON(http.StatusOK, map[string]interface{}{ - "running": *s.queueSize, + "running": s.worker.FeedsPending(), "stats": s.db.FeedStats(), }) } @@ -122,7 +123,7 @@ func (s *Server) handleFolder(c *router.Context) { func (s *Server) handleFeedRefresh(c *router.Context) { if c.Req.Method == "POST" { - s.fetchAllFeeds() + s.worker.FetchAllFeeds() c.Out.WriteHeader(http.StatusOK) } else { c.Out.WriteHeader(http.StatusMethodNotAllowed) @@ -161,7 +162,7 @@ func (s *Server) handleFeedList(c *router.Context) { return } - feed, sources, err := discoverFeed(form.Url) + feed, sources, err := worker.DiscoverFeed(form.Url) if err != nil { log.Print(err) c.JSON(http.StatusOK, map[string]string{"status": "notfound"}) @@ -176,9 +177,9 @@ func (s *Server) handleFeedList(c *router.Context) { feed.FeedLink, form.FolderID, ) - s.db.CreateItems(convertItems(feed.Items, *storedFeed)) + s.db.CreateItems(worker.ConvertItems(feed.Items, *storedFeed)) - icon, err := findFavicon(storedFeed.Link, storedFeed.FeedLink) + icon, err := worker.FindFavicon(storedFeed.Link, storedFeed.FeedLink) if icon != nil { s.db.UpdateFeedIcon(storedFeed.Id, icon) } @@ -316,7 +317,7 @@ func (s *Server) handleSettings(c *router.Context) { } if s.db.UpdateSettings(settings) { if _, ok := settings["refresh_rate"]; ok { - s.refreshRate <- s.db.GetSettingsValueInt64("refresh_rate") + s.worker.SetRefreshRate(s.db.GetSettingsValueInt64("refresh_rate")) } c.Out.WriteHeader(http.StatusOK) } else { @@ -347,7 +348,7 @@ func (s *Server) handleOPMLImport(c *router.Context) { } } } - s.fetchAllFeeds() + s.worker.FetchAllFeeds() c.Out.WriteHeader(http.StatusOK) } else { c.Out.WriteHeader(http.StatusMethodNotAllowed) diff --git a/src/server/server.go b/src/server/server.go index 674f5cf..1fed691 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -3,11 +3,9 @@ package server import ( "log" "net/http" - "runtime" - "sync/atomic" - "time" "github.com/nkanaev/yarr/src/storage" + "github.com/nkanaev/yarr/src/worker" ) var BasePath string = "" @@ -15,9 +13,7 @@ var BasePath string = "" type Server struct { Addr string db *storage.Storage - feedQueue chan storage.Feed - queueSize *int32 - refreshRate chan int64 + worker *worker.Worker // auth Username string Password string @@ -27,13 +23,10 @@ type Server struct { } func NewServer(db *storage.Storage, addr string) *Server { - queueSize := int32(0) return &Server{ - db: db, - feedQueue: make(chan storage.Feed, 3000), - queueSize: &queueSize, - Addr: addr, - refreshRate: make(chan int64), + db: db, + Addr: addr, + worker: worker.NewWorker(db), } } @@ -46,7 +39,7 @@ func (h *Server) GetAddr() string { } func (s *Server) Start() { - s.startJobs() + s.worker.Start() httpserver := &http.Server{Addr: s.Addr, Handler: s.handler()} @@ -102,103 +95,6 @@ func (h Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } */ -func (h *Server) startJobs() { - delTicker := time.NewTicker(time.Hour * 24) - - syncSearchChannel := make(chan bool, 10) - var syncSearchTimer *time.Timer // TODO: should this be atomic? - - syncSearch := func() { - if syncSearchTimer == nil { - syncSearchTimer = time.AfterFunc(time.Second*2, func() { - syncSearchChannel <- true - }) - } else { - syncSearchTimer.Reset(time.Second * 2) - } - } - - worker := func() { - for { - select { - case feed := <-h.feedQueue: - items, err := listItems(feed, h.db) - atomic.AddInt32(h.queueSize, -1) - if err != nil { - log.Printf("Failed to fetch %s (%d): %s", feed.FeedLink, feed.Id, err) - h.db.SetFeedError(feed.Id, err) - continue - } - h.db.CreateItems(items) - syncSearch() - if !feed.HasIcon { - icon, err := findFavicon(feed.Link, feed.FeedLink) - if icon != nil { - h.db.UpdateFeedIcon(feed.Id, icon) - } - if err != nil { - log.Printf("Failed to search favicon for %s (%s): %s", feed.Link, feed.FeedLink, err) - } - } - case <-delTicker.C: - h.db.DeleteOldItems() - case <-syncSearchChannel: - h.db.SyncSearch() - } - } - } - - num := runtime.NumCPU() - 1 - if num < 1 { - num = 1 - } - for i := 0; i < num; i++ { - go worker() - } - go h.db.DeleteOldItems() - go h.db.SyncSearch() - - go func() { - var refreshTicker *time.Ticker - refreshTick := make(<-chan time.Time) - for { - select { - case <-refreshTick: - h.fetchAllFeeds() - case val := <-h.refreshRate: - if refreshTicker != nil { - refreshTicker.Stop() - if val == 0 { - refreshTick = make(<-chan time.Time) - } - } - if val > 0 { - refreshTicker = time.NewTicker(time.Duration(val) * time.Minute) - refreshTick = refreshTicker.C - } - } - } - }() - refreshRate := h.db.GetSettingsValueInt64("refresh_rate") - h.refreshRate <- refreshRate - if refreshRate > 0 { - h.fetchAllFeeds() - } -} - func (h Server) requiresAuth() bool { return h.Username != "" && h.Password != "" } - -func (h *Server) fetchAllFeeds() { - log.Print("Refreshing all feeds") - h.db.ResetFeedErrors() - for _, feed := range h.db.ListFeeds() { - h.fetchFeed(feed) - } -} - -func (h *Server) fetchFeed(feed storage.Feed) { - atomic.AddInt32(h.queueSize, 1) - h.feedQueue <- feed -} diff --git a/src/server/crawler.go b/src/worker/crawler.go similarity index 96% rename from src/server/crawler.go rename to src/worker/crawler.go index 946f798..a5fb2a2 100644 --- a/src/server/crawler.go +++ b/src/worker/crawler.go @@ -1,4 +1,4 @@ -package server +package worker import ( "bytes" @@ -106,7 +106,7 @@ func searchFeedLinks(html []byte, siteurl string) ([]FeedSource, error) { return sources, nil } -func discoverFeed(candidateUrl string) (*gofeed.Feed, *[]FeedSource, error) { +func DiscoverFeed(candidateUrl string) (*gofeed.Feed, *[]FeedSource, error) { // Query URL res, err := defaultClient.get(candidateUrl) if err != nil { @@ -153,12 +153,12 @@ func discoverFeed(candidateUrl string) (*gofeed.Feed, *[]FeedSource, error) { if sources[0].Url == candidateUrl { return nil, nil, errors.New("Recursion!") } - return discoverFeed(sources[0].Url) + return DiscoverFeed(sources[0].Url) } return nil, &sources, nil } -func findFavicon(websiteUrl, feedUrl string) (*[]byte, error) { +func FindFavicon(websiteUrl, feedUrl string) (*[]byte, error) { candidateUrls := make([]string, 0) favicon := func(link string) string { @@ -227,7 +227,7 @@ func findFavicon(websiteUrl, feedUrl string) (*[]byte, error) { return nil, nil } -func convertItems(items []*gofeed.Item, feed storage.Feed) []storage.Item { +func ConvertItems(items []*gofeed.Item, feed storage.Feed) []storage.Item { result := make([]storage.Item, len(items)) for i, item := range items { imageURL := "" @@ -300,7 +300,7 @@ func listItems(f storage.Feed, db *storage.Storage) ([]storage.Item, error) { if err != nil { return nil, err } - return convertItems(feed.Items, f), nil + return ConvertItems(feed.Items, f), nil } func init() { diff --git a/src/worker/worker.go b/src/worker/worker.go new file mode 100644 index 0000000..5c030d8 --- /dev/null +++ b/src/worker/worker.go @@ -0,0 +1,132 @@ +package worker + +import ( + "github.com/nkanaev/yarr/src/storage" + "log" + "sync/atomic" + "runtime" + "time" +) + +type Worker struct { + db *storage.Storage + + feedQueue chan storage.Feed + queueSize *int32 + refreshRate chan int64 +} + +func NewWorker(db *storage.Storage) *Worker { + queueSize := int32(0) + return &Worker{ + db: db, + feedQueue: make(chan storage.Feed, 3000), + queueSize: &queueSize, + refreshRate: make(chan int64), + } +} + +func (w *Worker) Start() { + delTicker := time.NewTicker(time.Hour * 24) + + syncSearchChannel := make(chan bool, 10) + var syncSearchTimer *time.Timer // TODO: should this be atomic? + + syncSearch := func() { + if syncSearchTimer == nil { + syncSearchTimer = time.AfterFunc(time.Second*2, func() { + syncSearchChannel <- true + }) + } else { + syncSearchTimer.Reset(time.Second * 2) + } + } + + worker := func() { + for { + select { + case feed := <-w.feedQueue: + items, err := listItems(feed, w.db) + atomic.AddInt32(w.queueSize, -1) + if err != nil { + log.Printf("Failed to fetch %s (%d): %s", feed.FeedLink, feed.Id, err) + w.db.SetFeedError(feed.Id, err) + continue + } + w.db.CreateItems(items) + syncSearch() + if !feed.HasIcon { + icon, err := FindFavicon(feed.Link, feed.FeedLink) + if icon != nil { + w.db.UpdateFeedIcon(feed.Id, icon) + } + if err != nil { + log.Printf("Failed to search favicon for %s (%s): %s", feed.Link, feed.FeedLink, err) + } + } + case <-delTicker.C: + w.db.DeleteOldItems() + case <-syncSearchChannel: + w.db.SyncSearch() + } + } + } + + num := runtime.NumCPU() - 1 + if num < 1 { + num = 1 + } + for i := 0; i < num; i++ { + go worker() + } + go w.db.DeleteOldItems() + go w.db.SyncSearch() + + go func() { + var refreshTicker *time.Ticker + refreshTick := make(<-chan time.Time) + for { + select { + case <-refreshTick: + w.FetchAllFeeds() + case val := <-w.refreshRate: + if refreshTicker != nil { + refreshTicker.Stop() + if val == 0 { + refreshTick = make(<-chan time.Time) + } + } + if val > 0 { + refreshTicker = time.NewTicker(time.Duration(val) * time.Minute) + refreshTick = refreshTicker.C + } + } + } + }() + refreshRate := w.db.GetSettingsValueInt64("refresh_rate") + w.refreshRate <- refreshRate + if refreshRate > 0 { + w.FetchAllFeeds() + } +} + +func (w *Worker) FetchAllFeeds() { + log.Print("Refreshing all feeds") + w.db.ResetFeedErrors() + for _, feed := range w.db.ListFeeds() { + w.fetchFeed(feed) + } +} + +func (w *Worker) fetchFeed(feed storage.Feed) { + atomic.AddInt32(w.queueSize, 1) + w.feedQueue <- feed +} + +func (w *Worker) FeedsPending() int32 { + return *w.queueSize +} + +func (w *Worker) SetRefreshRate(val int64) { + w.refreshRate <- val +}