mirror of
https://github.com/nkanaev/yarr.git
synced 2026-06-09 18:03:19 +00:00
refactor feedstate + swap implementation
This commit is contained in:
@@ -56,17 +56,14 @@ type FeverFavicon struct {
|
||||
func writeFeverJSON(c *router.Context, data map[string]any, lastRefreshed int64) {
|
||||
data["api_version"] = 3
|
||||
data["auth"] = 1
|
||||
// TODO: remove duplicates
|
||||
data["last_refreshed_on_time"] = lastRefreshed
|
||||
c.JSON(http.StatusOK, data)
|
||||
}
|
||||
|
||||
func getLastRefreshedOnTime(httpStates map[int64]storage.HTTPState) int64 {
|
||||
if len(httpStates) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func getLastRefreshedOnTime(feedStates []storage.FeedState) int64 {
|
||||
var lastRefreshed int64
|
||||
for _, state := range httpStates {
|
||||
for _, state := range feedStates {
|
||||
if state.LastRefreshed.Unix() > lastRefreshed {
|
||||
lastRefreshed = state.LastRefreshed.Unix()
|
||||
}
|
||||
@@ -123,10 +120,11 @@ func (s *Server) handleFever(c *router.Context) {
|
||||
case formHasValue(c.Req.Form, "mark"):
|
||||
s.feverMarkHandler(c)
|
||||
default:
|
||||
states, _ := s.db.ListFeedStates()
|
||||
c.JSON(http.StatusOK, map[string]any{
|
||||
"api_version": 3,
|
||||
"auth": 1,
|
||||
"last_refreshed_on_time": getLastRefreshedOnTime(s.db.ListHTTPStates()),
|
||||
"last_refreshed_on_time": getLastRefreshedOnTime(states),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -168,20 +166,25 @@ func (s *Server) feverGroupsHandler(c *router.Context) {
|
||||
for i, folder := range folders {
|
||||
groups[i] = &FeverGroup{ID: folder.Id, Title: folder.Title}
|
||||
}
|
||||
states, _ := s.db.ListFeedStates()
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"groups": groups,
|
||||
"feeds_groups": feedGroups(s.db),
|
||||
}, getLastRefreshedOnTime(s.db.ListHTTPStates()))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
func (s *Server) feverFeedsHandler(c *router.Context) {
|
||||
feeds := s.db.ListFeeds()
|
||||
httpStates := s.db.ListHTTPStates()
|
||||
states, _ := s.db.ListFeedStates()
|
||||
statesMap := make(map[int64]storage.FeedState)
|
||||
for _, state := range states {
|
||||
statesMap[state.FeedID] = state
|
||||
}
|
||||
|
||||
feverFeeds := make([]*FeverFeed, len(feeds))
|
||||
for i, feed := range feeds {
|
||||
var lastUpdated int64
|
||||
if state, ok := httpStates[feed.Id]; ok {
|
||||
if state, ok := statesMap[feed.Id]; ok {
|
||||
lastUpdated = state.LastRefreshed.Unix()
|
||||
}
|
||||
feverFeeds[i] = &FeverFeed{
|
||||
@@ -197,7 +200,7 @@ func (s *Server) feverFeedsHandler(c *router.Context) {
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"feeds": feverFeeds,
|
||||
"feeds_groups": feedGroups(s.db),
|
||||
}, getLastRefreshedOnTime(httpStates))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
func (s *Server) feverFaviconsHandler(c *router.Context) {
|
||||
@@ -216,9 +219,10 @@ func (s *Server) feverFaviconsHandler(c *router.Context) {
|
||||
favicons[i] = &FeverFavicon{ID: feed.Id, Data: data}
|
||||
}
|
||||
|
||||
states, _ := s.db.ListFeedStates()
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"favicons": favicons,
|
||||
}, getLastRefreshedOnTime(s.db.ListHTTPStates()))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
// for memory pressure reasons, we only return a limited number of items
|
||||
@@ -280,16 +284,18 @@ func (s *Server) feverItemsHandler(c *router.Context) {
|
||||
|
||||
totalItems := s.db.CountItems()
|
||||
|
||||
states, _ := s.db.ListFeedStates()
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"items": feverItems,
|
||||
"total_items": totalItems,
|
||||
}, getLastRefreshedOnTime(s.db.ListHTTPStates()))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
func (s *Server) feverLinksHandler(c *router.Context) {
|
||||
states, _ := s.db.ListFeedStates()
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"links": make([]any, 0),
|
||||
}, getLastRefreshedOnTime(s.db.ListHTTPStates()))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
func (s *Server) feverUnreadItemIDsHandler(c *router.Context) {
|
||||
@@ -309,9 +315,10 @@ func (s *Server) feverUnreadItemIDsHandler(c *router.Context) {
|
||||
}
|
||||
itemFilter.After = &items[len(items)-1].Id
|
||||
}
|
||||
states, _ := s.db.ListFeedStates()
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"unread_item_ids": joinInts(itemIds),
|
||||
}, getLastRefreshedOnTime(s.db.ListHTTPStates()))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
func (s *Server) feverSavedItemIDsHandler(c *router.Context) {
|
||||
@@ -331,9 +338,10 @@ func (s *Server) feverSavedItemIDsHandler(c *router.Context) {
|
||||
}
|
||||
itemFilter.After = &items[len(items)-1].Id
|
||||
}
|
||||
states, _ := s.db.ListFeedStates()
|
||||
writeFeverJSON(c, map[string]any{
|
||||
"saved_item_ids": joinInts(itemIds),
|
||||
}, getLastRefreshedOnTime(s.db.ListHTTPStates()))
|
||||
}, getLastRefreshedOnTime(states))
|
||||
}
|
||||
|
||||
func (s *Server) feverMarkHandler(c *router.Context) {
|
||||
|
||||
@@ -162,7 +162,15 @@ func (s *Server) handleFeedRefresh(c *router.Context) {
|
||||
}
|
||||
|
||||
func (s *Server) handleFeedErrors(c *router.Context) {
|
||||
errors := s.db.GetFeedErrors()
|
||||
errors := make(map[int64]string)
|
||||
states, err := s.db.ListFeedStates()
|
||||
if err == nil {
|
||||
for _, state := range states {
|
||||
if state.LastError != "" {
|
||||
errors[state.FeedID] = state.LastError
|
||||
}
|
||||
}
|
||||
}
|
||||
c.JSON(http.StatusOK, errors)
|
||||
}
|
||||
|
||||
|
||||
@@ -185,42 +185,3 @@ func (s *Storage) GetFeed(id int64) *Feed {
|
||||
}
|
||||
return &f
|
||||
}
|
||||
|
||||
func (s *Storage) ResetFeedErrors() {
|
||||
if _, err := s.db.Exec(`delete from feed_errors`); err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) SetFeedError(feedID int64, lastError error) {
|
||||
_, err := s.db.Exec(`
|
||||
insert into feed_errors (feed_id, error)
|
||||
values (:feed_id, :error)
|
||||
on conflict (feed_id) do update set error = excluded.error`,
|
||||
sql.Named("feed_id", feedID),
|
||||
sql.Named("error", lastError.Error()),
|
||||
)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Storage) GetFeedErrors() map[int64]string {
|
||||
errors := make(map[int64]string)
|
||||
|
||||
rows, err := s.db.Query(`select feed_id, error from feed_errors`)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return errors
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var error string
|
||||
if err = rows.Scan(&id, &error); err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
errors[id] = error
|
||||
}
|
||||
return errors
|
||||
}
|
||||
|
||||
@@ -8,14 +8,19 @@ import (
|
||||
type FeedState struct {
|
||||
FeedID int64
|
||||
LastRefreshed time.Time
|
||||
LastError *string
|
||||
LastError string
|
||||
HTTPLastModified string
|
||||
HTTPEtag string
|
||||
}
|
||||
|
||||
func (s *Storage) ListFeedStates() ([]FeedState, error) {
|
||||
rows, err := s.db.Query(`
|
||||
select feed_id, last_refreshed, last_modified, etag, last_error
|
||||
select
|
||||
feed_id
|
||||
, last_refreshed
|
||||
, last_error
|
||||
, http_lmod
|
||||
, http_etag
|
||||
from feed_states
|
||||
`)
|
||||
if err != nil {
|
||||
@@ -29,9 +34,9 @@ func (s *Storage) ListFeedStates() ([]FeedState, error) {
|
||||
err := rows.Scan(
|
||||
&state.FeedID,
|
||||
&state.LastRefreshed,
|
||||
&state.LastError,
|
||||
&state.HTTPLastModified,
|
||||
&state.HTTPEtag,
|
||||
&state.LastError,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -44,14 +49,19 @@ func (s *Storage) ListFeedStates() ([]FeedState, error) {
|
||||
func (s *Storage) GetFeedState(feedID int64) (*FeedState, error) {
|
||||
var state FeedState
|
||||
err := s.db.QueryRow(`
|
||||
select feed_id, last_refreshed, last_modified, etag, last_error
|
||||
select
|
||||
feed_id
|
||||
, last_refreshed
|
||||
, last_error
|
||||
, http_lmod
|
||||
, http_etag
|
||||
from feed_states where feed_id = :id
|
||||
`, sql.Named("id", feedID)).Scan(
|
||||
&state.FeedID,
|
||||
&state.LastRefreshed,
|
||||
&state.LastError,
|
||||
&state.HTTPLastModified,
|
||||
&state.HTTPEtag,
|
||||
&state.LastError,
|
||||
)
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, nil
|
||||
@@ -79,28 +89,28 @@ func (s *Storage) UpdateFeedState(feedID int64, params UpdateFeedStateParams) (b
|
||||
insert into feed_states (
|
||||
feed_id
|
||||
, last_refreshed
|
||||
, last_modified
|
||||
, etag
|
||||
, last_error
|
||||
, http_lmod
|
||||
, http_etag
|
||||
)
|
||||
values (
|
||||
:id
|
||||
, coalesce(:refreshed, 0)
|
||||
, coalesce(:last_modified, '')
|
||||
, coalesce(:etag, '')
|
||||
, coalesce(:last_refreshed, 0)
|
||||
, coalesce(:last_error, '')
|
||||
, coalesce(:http_lmod, '')
|
||||
, coalesce(:http_etag, '')
|
||||
)
|
||||
on conflict (feed_id) do update set
|
||||
last_refreshed = coalesce(:refreshed, last_refreshed),
|
||||
last_modified = coalesce(:last_modified, last_modified),
|
||||
etag = coalesce(:etag, etag),
|
||||
last_error = coalesce(:last_error, last_error)
|
||||
last_refreshed = coalesce(:last_refreshed, last_refreshed),
|
||||
last_error = coalesce(:last_error, last_modified),
|
||||
http_lmod = coalesce(:http_lmod, http_lmod),
|
||||
http_etag = coalesce(:http_etag, http_etag)
|
||||
`,
|
||||
sql.Named("id", feedID),
|
||||
sql.Named("refreshed", params.LastRefreshed),
|
||||
sql.Named("last_modified", params.HTTPLastModified),
|
||||
sql.Named("etag", params.HTTPEtag),
|
||||
sql.Named("last_refreshed", params.LastRefreshed),
|
||||
sql.Named("last_error", params.LastError),
|
||||
sql.Named("http_lmod", params.HTTPLastModified),
|
||||
sql.Named("http_etag", params.HTTPEtag),
|
||||
)
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
||||
@@ -39,7 +39,7 @@ func TestUpdateFeedState_Full(t *testing.T) {
|
||||
if !state.LastRefreshed.Equal(now) {
|
||||
t.Errorf("expected %v, got %v", now, state.LastRefreshed)
|
||||
}
|
||||
if state.LastError == nil || *state.LastError != errMsg {
|
||||
if state.LastError != errMsg {
|
||||
t.Errorf("expected %s, got %v", errMsg, state.LastError)
|
||||
}
|
||||
if state.HTTPLastModified != lmod {
|
||||
@@ -70,7 +70,7 @@ func TestUpdateFeedState_Partial(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if state.LastError == nil || *state.LastError != newErr {
|
||||
if state.LastError != newErr {
|
||||
t.Errorf("expected %s, got %v", newErr, state.LastError)
|
||||
}
|
||||
if state.HTTPEtag != etag {
|
||||
@@ -98,8 +98,8 @@ func TestUpdateFeedState_ClearError(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if state.LastError == nil || *state.LastError != "" {
|
||||
t.Errorf("expected empty string error, got %v", state.LastError)
|
||||
if state.LastError != "" {
|
||||
t.Errorf("expected empty error string, got %v", state.LastError)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,9 +111,8 @@ func TestListFeedStates(t *testing.T) {
|
||||
f2 := s.CreateFeed(CreateFeedParams{Title: "F2", FeedLink: "L2"})
|
||||
|
||||
errMsg := "fail"
|
||||
etag := "e"
|
||||
s.UpdateFeedState(f1.Id, UpdateFeedStateParams{LastError: &errMsg})
|
||||
s.UpdateFeedState(f2.Id, UpdateFeedStateParams{HTTPEtag: &etag})
|
||||
s.UpdateFeedState(f2.Id, UpdateFeedStateParams{HTTPEtag: ptr("e")})
|
||||
|
||||
states, err := s.ListFeedStates()
|
||||
if err != nil {
|
||||
@@ -124,3 +123,7 @@ func TestListFeedStates(t *testing.T) {
|
||||
t.Errorf("expected 2 states, got %d", len(states))
|
||||
}
|
||||
}
|
||||
|
||||
func ptr[T any](v T) *T {
|
||||
return &v
|
||||
}
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type HTTPState struct {
|
||||
FeedID int64
|
||||
LastRefreshed time.Time
|
||||
|
||||
LastModified string
|
||||
Etag string
|
||||
}
|
||||
|
||||
func (s *Storage) ListHTTPStates() map[int64]HTTPState {
|
||||
result := make(map[int64]HTTPState)
|
||||
rows, err := s.db.Query(`select feed_id, last_refreshed, last_modified, etag from http_states`)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return result
|
||||
}
|
||||
for rows.Next() {
|
||||
var state HTTPState
|
||||
err = rows.Scan(
|
||||
&state.FeedID,
|
||||
&state.LastRefreshed,
|
||||
&state.LastModified,
|
||||
&state.Etag,
|
||||
)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return result
|
||||
}
|
||||
result[state.FeedID] = state
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *Storage) GetHTTPState(feedID int64) *HTTPState {
|
||||
row := s.db.QueryRow(`
|
||||
select feed_id, last_refreshed, last_modified, etag
|
||||
from http_states where feed_id = :feed_id
|
||||
`, sql.Named("feed_id", feedID))
|
||||
|
||||
if row == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var state HTTPState
|
||||
row.Scan(
|
||||
&state.FeedID,
|
||||
&state.LastRefreshed,
|
||||
&state.LastModified,
|
||||
&state.Etag,
|
||||
)
|
||||
return &state
|
||||
}
|
||||
|
||||
func (s *Storage) SetHTTPState(feedID int64, lastModified, etag string) {
|
||||
_, err := s.db.Exec(`
|
||||
insert into http_states (feed_id, last_modified, etag, last_refreshed)
|
||||
values (:feed_id, :last_modified, :etag, datetime())
|
||||
on conflict (feed_id) do update set last_modified = :last_modified, etag = :etag, last_refreshed = datetime()`,
|
||||
sql.Named("feed_id", feedID),
|
||||
sql.Named("last_modified", lastModified),
|
||||
sql.Named("etag", etag),
|
||||
)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}
|
||||
@@ -350,20 +350,27 @@ func m12_remove_feed_sizes(tx *sql.Tx) error {
|
||||
func m13_consolidate_feed_states(tx *sql.Tx) error {
|
||||
sql := `
|
||||
create table feed_states (
|
||||
feed_id references feeds(id) on delete cascade unique,
|
||||
last_refreshed datetime not null default 0,
|
||||
last_modified string not null default '',
|
||||
etag string not null default '',
|
||||
last_error string not null default ''
|
||||
feed_id references feeds(id) on delete cascade unique
|
||||
, last_refreshed datetime not null default 0
|
||||
, last_error string not null default ''
|
||||
|
||||
, http_lmod string not null default ''
|
||||
, http_etag string not null default ''
|
||||
);
|
||||
|
||||
insert into feed_states (feed_id, last_refreshed, last_modified, etag, last_error)
|
||||
insert into feed_states (
|
||||
feed_id
|
||||
, last_refreshed
|
||||
, last_error
|
||||
, http_lmod
|
||||
, http_etag
|
||||
)
|
||||
select
|
||||
f.id,
|
||||
coalesce(h.last_refreshed, 0),
|
||||
coalesce(e.error, '')
|
||||
coalesce(h.last_modified, ''),
|
||||
coalesce(h.etag, ''),
|
||||
coalesce(e.error, '')
|
||||
from feeds f
|
||||
left join http_states h on f.id = h.feed_id
|
||||
left join feed_errors e on f.id = e.feed_id
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nkanaev/yarr/src/content/scraper"
|
||||
"github.com/nkanaev/yarr/src/parser"
|
||||
@@ -162,9 +163,9 @@ func ConvertItems(items []parser.Item, feed storage.Feed) []storage.Item {
|
||||
func listItems(f storage.Feed, db *storage.Storage) ([]storage.Item, error) {
|
||||
lmod := ""
|
||||
etag := ""
|
||||
if state := db.GetHTTPState(f.Id); state != nil {
|
||||
lmod = state.LastModified
|
||||
etag = state.Etag
|
||||
if state, _ := db.GetFeedState(f.Id); state != nil {
|
||||
lmod = state.HTTPLastModified
|
||||
etag = state.HTTPEtag
|
||||
}
|
||||
|
||||
res, err := client.getConditional(f.FeedLink, lmod, etag)
|
||||
@@ -190,8 +191,13 @@ func listItems(f storage.Feed, db *storage.Storage) ([]storage.Item, error) {
|
||||
|
||||
lmod = res.Header.Get("Last-Modified")
|
||||
etag = res.Header.Get("Etag")
|
||||
now := time.Now().UTC()
|
||||
if lmod != "" || etag != "" {
|
||||
db.SetHTTPState(f.Id, lmod, etag)
|
||||
db.UpdateFeedState(f.Id, storage.UpdateFeedStateParams{
|
||||
HTTPLastModified: &lmod,
|
||||
HTTPEtag: &etag,
|
||||
LastRefreshed: &now,
|
||||
})
|
||||
}
|
||||
return ConvertItems(feed.Items, f), nil
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ func (w *Worker) RefreshFeeds() {
|
||||
}
|
||||
|
||||
func (w *Worker) refresher(feeds []storage.Feed) {
|
||||
w.db.ResetFeedErrors()
|
||||
// w.db.ResetFeedErrors()
|
||||
|
||||
srcqueue := make(chan storage.Feed, len(feeds))
|
||||
dstqueue := make(chan []storage.Item)
|
||||
@@ -136,9 +136,13 @@ func (w *Worker) refresher(feeds []storage.Feed) {
|
||||
|
||||
func (w *Worker) worker(srcqueue <-chan storage.Feed, dstqueue chan<- []storage.Item) {
|
||||
for feed := range srcqueue {
|
||||
empty := ""
|
||||
w.db.UpdateFeedState(feed.Id, storage.UpdateFeedStateParams{LastError: &empty})
|
||||
|
||||
items, err := listItems(feed, w.db)
|
||||
if err != nil {
|
||||
w.db.SetFeedError(feed.Id, err)
|
||||
errMsg := err.Error()
|
||||
w.db.UpdateFeedState(feed.Id, storage.UpdateFeedStateParams{LastError: &errMsg})
|
||||
}
|
||||
dstqueue <- items
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user