extract worker & crawler from server

Nazar Kanaev 2021-03-17 10:36:18 +00:00
parent 682b0c1729
commit dfb32d4ebe
4 changed files with 152 additions and 123 deletions
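
The net effect: the background fetch pool, the periodic-refresh scheduler, and the crawler helpers move out of src/server into a new src/worker package, and Server shrinks to HTTP handlers plus a *worker.Worker handle. A minimal sketch of the exported API this leaves behind, assuming an already-opened *storage.Storage (its constructor is not part of this diff):

    package demo

    import (
    	"fmt"

    	"github.com/nkanaev/yarr/src/storage"
    	"github.com/nkanaev/yarr/src/worker"
    )

    // demo drives the worker the same way the server now does.
    func demo(db *storage.Storage) {
    	w := worker.NewWorker(db) // buffered feed queue + refresh-rate channel
    	w.Start()                 // fetch pool, cleanup ticker, refresh scheduler

    	w.SetRefreshRate(60) // minutes; 0 parks the periodic refresh
    	w.FetchAllFeeds()    // what a POST to the refresh endpoint now triggers

    	// queue depth, which the status handler reports as "running"
    	fmt.Println("feeds pending:", w.FeedsPending())
    }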

@@ -7,6 +7,7 @@ import (
 	"github.com/nkanaev/yarr/src/router"
 	"github.com/nkanaev/yarr/src/storage"
 	"github.com/nkanaev/yarr/src/opml"
+	"github.com/nkanaev/yarr/src/worker"
 	"io/ioutil"
 	"log"
 	"math"
@@ -67,7 +68,7 @@ func (s *Server) handleStatic(c *router.Context) {
 func (s *Server) handleStatus(c *router.Context) {
 	c.JSON(http.StatusOK, map[string]interface{}{
-		"running": *s.queueSize,
+		"running": s.worker.FeedsPending(),
 		"stats": s.db.FeedStats(),
 	})
 }
@@ -122,7 +123,7 @@ func (s *Server) handleFolder(c *router.Context) {
 func (s *Server) handleFeedRefresh(c *router.Context) {
 	if c.Req.Method == "POST" {
-		s.fetchAllFeeds()
+		s.worker.FetchAllFeeds()
 		c.Out.WriteHeader(http.StatusOK)
 	} else {
 		c.Out.WriteHeader(http.StatusMethodNotAllowed)
@@ -161,7 +162,7 @@ func (s *Server) handleFeedList(c *router.Context) {
 		return
 	}
-	feed, sources, err := discoverFeed(form.Url)
+	feed, sources, err := worker.DiscoverFeed(form.Url)
 	if err != nil {
 		log.Print(err)
 		c.JSON(http.StatusOK, map[string]string{"status": "notfound"})
@@ -176,9 +177,9 @@
 			feed.FeedLink,
 			form.FolderID,
 		)
-		s.db.CreateItems(convertItems(feed.Items, *storedFeed))
-		icon, err := findFavicon(storedFeed.Link, storedFeed.FeedLink)
+		s.db.CreateItems(worker.ConvertItems(feed.Items, *storedFeed))
+		icon, err := worker.FindFavicon(storedFeed.Link, storedFeed.FeedLink)
 		if icon != nil {
 			s.db.UpdateFeedIcon(storedFeed.Id, icon)
 		}
@@ -316,7 +317,7 @@ func (s *Server) handleSettings(c *router.Context) {
 	}
 	if s.db.UpdateSettings(settings) {
 		if _, ok := settings["refresh_rate"]; ok {
-			s.refreshRate <- s.db.GetSettingsValueInt64("refresh_rate")
+			s.worker.SetRefreshRate(s.db.GetSettingsValueInt64("refresh_rate"))
 		}
 		c.Out.WriteHeader(http.StatusOK)
 	} else {
@@ -347,7 +348,7 @@ func (s *Server) handleOPMLImport(c *router.Context) {
 				}
 			}
 		}
-		s.fetchAllFeeds()
+		s.worker.FetchAllFeeds()
 		c.Out.WriteHeader(http.StatusOK)
 	} else {
 		c.Out.WriteHeader(http.StatusMethodNotAllowed)
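
Taken together, the handler changes reduce the server's dependency on the background machinery to a small surface. A hypothetical interface capturing what the handlers actually call (no such interface exists in the code; the server holds the concrete *worker.Worker):

    package demo

    // hypothetical: the handler-facing surface after this commit
    type feedWorker interface {
    	FeedsPending() int32
    	FetchAllFeeds()
    	SetRefreshRate(int64)
    }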

@@ -3,11 +3,9 @@ package server
 import (
 	"log"
 	"net/http"
-	"runtime"
-	"sync/atomic"
-	"time"

 	"github.com/nkanaev/yarr/src/storage"
+	"github.com/nkanaev/yarr/src/worker"
 )

 var BasePath string = ""
@@ -15,9 +13,7 @@ var BasePath string = ""
 type Server struct {
 	Addr string
 	db *storage.Storage
-	feedQueue chan storage.Feed
-	queueSize *int32
-	refreshRate chan int64
+	worker *worker.Worker
 	// auth
 	Username string
 	Password string
@@ -27,13 +23,10 @@ type Server struct {
 }

 func NewServer(db *storage.Storage, addr string) *Server {
-	queueSize := int32(0)
 	return &Server{
 		db: db,
-		feedQueue: make(chan storage.Feed, 3000),
-		queueSize: &queueSize,
-		Addr: addr,
-		refreshRate: make(chan int64),
+		Addr: addr,
+		worker: worker.NewWorker(db),
 	}
 }
@@ -46,7 +39,7 @@ func (h *Server) GetAddr() string {
 }

 func (s *Server) Start() {
-	s.startJobs()
+	s.worker.Start()

 	httpserver := &http.Server{Addr: s.Addr, Handler: s.handler()}
@@ -102,103 +95,6 @@ func (h Server) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
 }
 */

-func (h *Server) startJobs() {
-	delTicker := time.NewTicker(time.Hour * 24)
-
-	syncSearchChannel := make(chan bool, 10)
-	var syncSearchTimer *time.Timer // TODO: should this be atomic?
-	syncSearch := func() {
-		if syncSearchTimer == nil {
-			syncSearchTimer = time.AfterFunc(time.Second*2, func() {
-				syncSearchChannel <- true
-			})
-		} else {
-			syncSearchTimer.Reset(time.Second * 2)
-		}
-	}
-
-	worker := func() {
-		for {
-			select {
-			case feed := <-h.feedQueue:
-				items, err := listItems(feed, h.db)
-				atomic.AddInt32(h.queueSize, -1)
-				if err != nil {
-					log.Printf("Failed to fetch %s (%d): %s", feed.FeedLink, feed.Id, err)
-					h.db.SetFeedError(feed.Id, err)
-					continue
-				}
-				h.db.CreateItems(items)
-				syncSearch()
-				if !feed.HasIcon {
-					icon, err := findFavicon(feed.Link, feed.FeedLink)
-					if icon != nil {
-						h.db.UpdateFeedIcon(feed.Id, icon)
-					}
-					if err != nil {
-						log.Printf("Failed to search favicon for %s (%s): %s", feed.Link, feed.FeedLink, err)
-					}
-				}
-			case <-delTicker.C:
-				h.db.DeleteOldItems()
-			case <-syncSearchChannel:
-				h.db.SyncSearch()
-			}
-		}
-	}
-
-	num := runtime.NumCPU() - 1
-	if num < 1 {
-		num = 1
-	}
-	for i := 0; i < num; i++ {
-		go worker()
-	}
-
-	go h.db.DeleteOldItems()
-	go h.db.SyncSearch()
-
-	go func() {
-		var refreshTicker *time.Ticker
-		refreshTick := make(<-chan time.Time)
-		for {
-			select {
-			case <-refreshTick:
-				h.fetchAllFeeds()
-			case val := <-h.refreshRate:
-				if refreshTicker != nil {
-					refreshTicker.Stop()
-					if val == 0 {
-						refreshTick = make(<-chan time.Time)
-					}
-				}
-				if val > 0 {
-					refreshTicker = time.NewTicker(time.Duration(val) * time.Minute)
-					refreshTick = refreshTicker.C
-				}
-			}
-		}
-	}()
-
-	refreshRate := h.db.GetSettingsValueInt64("refresh_rate")
-	h.refreshRate <- refreshRate
-	if refreshRate > 0 {
-		h.fetchAllFeeds()
-	}
-}

 func (h Server) requiresAuth() bool {
 	return h.Username != "" && h.Password != ""
 }

-func (h *Server) fetchAllFeeds() {
-	log.Print("Refreshing all feeds")
-	h.db.ResetFeedErrors()
-	for _, feed := range h.db.ListFeeds() {
-		h.fetchFeed(feed)
-	}
-}
-
-func (h *Server) fetchFeed(feed storage.Feed) {
-	atomic.AddInt32(h.queueSize, 1)
-	h.feedQueue <- feed
-}

@@ -1,4 +1,4 @@
-package server
+package worker

 import (
 	"bytes"
@@ -106,7 +106,7 @@ func searchFeedLinks(html []byte, siteurl string) ([]FeedSource, error) {
 	return sources, nil
 }

-func discoverFeed(candidateUrl string) (*gofeed.Feed, *[]FeedSource, error) {
+func DiscoverFeed(candidateUrl string) (*gofeed.Feed, *[]FeedSource, error) {
 	// Query URL
 	res, err := defaultClient.get(candidateUrl)
 	if err != nil {
@@ -153,12 +153,12 @@ func discoverFeed(candidateUrl string) (*gofeed.Feed, *[]FeedSource, error) {
 		if sources[0].Url == candidateUrl {
 			return nil, nil, errors.New("Recursion!")
 		}
-		return discoverFeed(sources[0].Url)
+		return DiscoverFeed(sources[0].Url)
 	}
 	return nil, &sources, nil
 }

-func findFavicon(websiteUrl, feedUrl string) (*[]byte, error) {
+func FindFavicon(websiteUrl, feedUrl string) (*[]byte, error) {
 	candidateUrls := make([]string, 0)

 	favicon := func(link string) string {
@@ -227,7 +227,7 @@ func findFavicon(websiteUrl, feedUrl string) (*[]byte, error) {
 	return nil, nil
 }

-func convertItems(items []*gofeed.Item, feed storage.Feed) []storage.Item {
+func ConvertItems(items []*gofeed.Item, feed storage.Feed) []storage.Item {
 	result := make([]storage.Item, len(items))
 	for i, item := range items {
 		imageURL := ""
@@ -300,7 +300,7 @@ func listItems(f storage.Feed, db *storage.Storage) ([]storage.Item, error) {
 	if err != nil {
 		return nil, err
 	}
-	return convertItems(feed.Items, f), nil
+	return ConvertItems(feed.Items, f), nil
 }

 func init() {
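
With the move to package worker, the crawler's entry points become exported. A short sketch of calling them from outside the server, mirroring the feed-creation flow in handleFeedList; the form parsing and storage writes from the handler are omitted:

    package demo

    import (
    	"log"

    	"github.com/nkanaev/yarr/src/worker"
    )

    // inspect resolves a URL the way the feed-add handler does: either a
    // parsed feed comes back, or a list of candidate sources found in the
    // page's HTML.
    func inspect(url string) {
    	feed, sources, err := worker.DiscoverFeed(url)
    	if err != nil {
    		log.Fatal(err) // the handler reports this case as "notfound"
    	}
    	if feed == nil {
    		for _, src := range *sources {
    			log.Println("candidate:", src.Url)
    		}
    		return
    	}
    	log.Println("feed:", feed.FeedLink)
    	if icon, err := worker.FindFavicon(feed.Link, feed.FeedLink); err == nil && icon != nil {
    		log.Printf("favicon: %d bytes", len(*icon))
    	}
    	// worker.ConvertItems(feed.Items, storedFeed) then maps the entries
    	// to storage.Item values, as the handler does after saving the feed.
    }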

src/worker/worker.go (new file, 132 additions)

@@ -0,0 +1,132 @@
+package worker
+
+import (
+	"log"
+	"runtime"
+	"sync/atomic"
+	"time"
+
+	"github.com/nkanaev/yarr/src/storage"
+)
+
+type Worker struct {
+	db          *storage.Storage
+	feedQueue   chan storage.Feed
+	queueSize   *int32
+	refreshRate chan int64
+}
+
+func NewWorker(db *storage.Storage) *Worker {
+	queueSize := int32(0)
+	return &Worker{
+		db:          db,
+		feedQueue:   make(chan storage.Feed, 3000),
+		queueSize:   &queueSize,
+		refreshRate: make(chan int64),
+	}
+}
+
+func (w *Worker) Start() {
+	delTicker := time.NewTicker(time.Hour * 24)
+
+	syncSearchChannel := make(chan bool, 10)
+	var syncSearchTimer *time.Timer // TODO: should this be atomic?
+	syncSearch := func() {
+		if syncSearchTimer == nil {
+			syncSearchTimer = time.AfterFunc(time.Second*2, func() {
+				syncSearchChannel <- true
+			})
+		} else {
+			syncSearchTimer.Reset(time.Second * 2)
+		}
+	}
+
+	worker := func() {
+		for {
+			select {
+			case feed := <-w.feedQueue:
+				items, err := listItems(feed, w.db)
+				atomic.AddInt32(w.queueSize, -1)
+				if err != nil {
+					log.Printf("Failed to fetch %s (%d): %s", feed.FeedLink, feed.Id, err)
+					w.db.SetFeedError(feed.Id, err)
+					continue
+				}
+				w.db.CreateItems(items)
+				syncSearch()
+				if !feed.HasIcon {
+					icon, err := FindFavicon(feed.Link, feed.FeedLink)
+					if icon != nil {
+						w.db.UpdateFeedIcon(feed.Id, icon)
+					}
+					if err != nil {
+						log.Printf("Failed to search favicon for %s (%s): %s", feed.Link, feed.FeedLink, err)
+					}
+				}
+			case <-delTicker.C:
+				w.db.DeleteOldItems()
+			case <-syncSearchChannel:
+				w.db.SyncSearch()
+			}
+		}
+	}
+
+	num := runtime.NumCPU() - 1
+	if num < 1 {
+		num = 1
+	}
+	for i := 0; i < num; i++ {
+		go worker()
+	}
+
+	go w.db.DeleteOldItems()
+	go w.db.SyncSearch()
+
+	go func() {
+		var refreshTicker *time.Ticker
+		refreshTick := make(<-chan time.Time)
+		for {
+			select {
+			case <-refreshTick:
+				w.FetchAllFeeds()
+			case val := <-w.refreshRate:
+				if refreshTicker != nil {
+					refreshTicker.Stop()
+					if val == 0 {
+						refreshTick = make(<-chan time.Time)
+					}
+				}
+				if val > 0 {
+					refreshTicker = time.NewTicker(time.Duration(val) * time.Minute)
+					refreshTick = refreshTicker.C
+				}
+			}
+		}
+	}()
+
+	refreshRate := w.db.GetSettingsValueInt64("refresh_rate")
+	w.refreshRate <- refreshRate
+	if refreshRate > 0 {
+		w.FetchAllFeeds()
+	}
+}
+
+func (w *Worker) FetchAllFeeds() {
+	log.Print("Refreshing all feeds")
+	w.db.ResetFeedErrors()
+	for _, feed := range w.db.ListFeeds() {
+		w.fetchFeed(feed)
+	}
+}
+
+func (w *Worker) fetchFeed(feed storage.Feed) {
+	atomic.AddInt32(w.queueSize, 1)
+	w.feedQueue <- feed
+}
+
+func (w *Worker) FeedsPending() int32 {
+	return atomic.LoadInt32(w.queueSize) // atomic load: the pool decrements this counter concurrently
+}
+
+func (w *Worker) SetRefreshRate(val int64) {
+	w.refreshRate <- val
+}
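
One detail worth noting in Start: the scheduler parks its refresh case on make(<-chan time.Time), a receive-only channel that never delivers a value, so that select arm blocks until a positive rate installs a real ticker channel. A standalone sketch of the same pattern, using seconds instead of minutes so it can be watched; none of the yarr types are involved:

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	rate := make(chan int64)
    	go func() {
    		var ticker *time.Ticker
    		tick := make(<-chan time.Time) // never fires: the case stays parked
    		for {
    			select {
    			case <-tick:
    				fmt.Println("refresh")
    			case val := <-rate:
    				if ticker != nil {
    					ticker.Stop()
    					if val == 0 {
    						tick = make(<-chan time.Time) // park again
    					}
    				}
    				if val > 0 {
    					ticker = time.NewTicker(time.Duration(val) * time.Second)
    					tick = ticker.C
    				}
    			}
    		}
    	}()

    	rate <- 1                           // enable: tick once a second
    	time.Sleep(2500 * time.Millisecond) // two refreshes print
    	rate <- 0                           // park the scheduler
    	time.Sleep(time.Second)             // silence
    }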