rewrite background jobs

This commit is contained in:
Nazar Kanaev 2020-07-30 20:17:05 +01:00
parent 2392a72856
commit fb0833618f
3 changed files with 34 additions and 27 deletions

View File

@ -62,7 +62,7 @@ func StaticHandler(rw http.ResponseWriter, req *http.Request) {
func StatusHandler(rw http.ResponseWriter, req *http.Request) { func StatusHandler(rw http.ResponseWriter, req *http.Request) {
writeJSON(rw, map[string]interface{}{ writeJSON(rw, map[string]interface{}{
"running": handler(req).fetchRunning, "running": handler(req).queueSize > 0,
"stats": db(req).FeedStats(), "stats": db(req).FeedStats(),
}) })
} }

View File

@ -5,23 +5,24 @@ import (
"github.com/nkanaev/yarr/storage" "github.com/nkanaev/yarr/storage"
"log" "log"
"net/http" "net/http"
"runtime"
"sync/atomic"
"time"
) )
type Handler struct { type Handler struct {
db *storage.Storage db *storage.Storage
log *log.Logger log *log.Logger
fetchRunning bool
feedQueue chan storage.Feed feedQueue chan storage.Feed
counter chan int queueSize int32
queueSize int
} }
func New(db *storage.Storage, logger *log.Logger) *Handler { func New(db *storage.Storage, logger *log.Logger) *Handler {
return &Handler{ return &Handler{
db: db, db: db,
log: logger, log: logger,
feedQueue: make(chan storage.Feed), feedQueue: make(chan storage.Feed, 1000),
counter: make(chan int), queueSize: 0,
} }
} }
@ -43,35 +44,41 @@ func (h Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
} }
func (h *Handler) startJobs() { func (h *Handler) startJobs() {
h.db.DeleteOldItems() delTicker := time.NewTicker(time.Hour * 24)
go func() { worker := func() {
for { for {
feed := <-h.feedQueue select {
items := listItems(feed) case feed := <-h.feedQueue:
h.db.CreateItems(items) items := listItems(feed)
h.db.CreateItems(items)
atomic.AddInt32(&h.queueSize, -1)
case <- delTicker.C:
h.db.DeleteOldItems()
}
} }
}() }
go func() {
for { num := runtime.NumCPU() - 1
val := <-h.counter if num < 1 {
h.queueSize += val num = 1
} }
}() for i := 0; i < num; i++ {
go worker()
}
go h.db.DeleteOldItems()
go h.db.SyncSearch() go h.db.SyncSearch()
h.fetchAllFeeds()
}
func (h *Handler) fetchFeed(feed storage.Feed) { // fetch all feeds
h.queueSize += 1
h.feedQueue <- feed
}
func (h *Handler) fetchAllFeeds() {
for _, feed := range h.db.ListFeeds() { for _, feed := range h.db.ListFeeds() {
h.fetchFeed(feed) h.fetchFeed(feed)
} }
} }
func (h *Handler) fetchFeed(feed storage.Feed) {
atomic.AddInt32(&h.queueSize, 1)
h.feedQueue <- feed
}
func Vars(req *http.Request) map[string]string { func Vars(req *http.Request) map[string]string {
if rv := req.Context().Value(ctxVars); rv != nil { if rv := req.Context().Value(ctxVars); rv != nil {
return rv.(map[string]string) return rv.(map[string]string)

View File

@ -196,7 +196,7 @@
<div class="mt-4" v-if="feedNewChoice.length"> <div class="mt-4" v-if="feedNewChoice.length">
<p class="mb-2"> <p class="mb-2">
Multiple feeds found. Choose one below: Multiple feeds found. Choose one below:
<a href="#" class="float-right" @click.prevent="resetFeedChoice()">cancel</a> <a href="#" class="float-right text-decoration-none" @click.prevent="resetFeedChoice()">cancel</a>
</p> </p>
<label class="selectgroup" v-for="choice in feedNewChoice"> <label class="selectgroup" v-for="choice in feedNewChoice">
<input type="radio" name="feedToAdd" :value="choice.url" v-model="feedNewChoiceSelected"> <input type="radio" name="feedToAdd" :value="choice.url" v-model="feedNewChoiceSelected">