Compare commits

..

15 Commits

29 changed files with 640 additions and 161 deletions

View File

@ -4,7 +4,7 @@ help: ## display this help
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST) @awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
serve: ## Run "intake serve" with live reload serve: ## Run "intake serve" with live reload
@air -build.cmd "go build -o tmp/intake" -build.bin tmp/intake -build.args_bin serve,--data-dir,tmp @air -build.cmd "go build -o tmp/intake" -build.bin tmp/intake -build.args_bin serve,--data-dir,tmp -build.include_ext "go,html,css"
test-data: ## Recreate test data in tmp/ test-data: ## Recreate test data in tmp/
@test/test_items.sh @test/test_items.sh

110
README.md
View File

@ -4,65 +4,6 @@ Intake is an arbitrary feed aggregator that generalizes the concept of a feed.
Rather than being restricted to parsing items out of an RSS feed, Intake provides a middle layer of executing arbitrary commands that conform to a JSON-based specification. Rather than being restricted to parsing items out of an RSS feed, Intake provides a middle layer of executing arbitrary commands that conform to a JSON-based specification.
An Intake source can parse an RSS feed, but it can also scrape a website without a feed, provide additional logic to filter or annotate feed items, or integrate with an API. An Intake source can parse an RSS feed, but it can also scrape a website without a feed, provide additional logic to filter or annotate feed items, or integrate with an API.
## Development
Parity with existing Python version
* [x] create sources
* [ ] rename sources
* fetch sources
* [x] create and delete items
* [x] update existing items
* [ ] support item TTL and TTD
* [x] on_create triggers
* [ ] on_delete triggers
* [x] dry-run
* item actions
* [x] create
* [x] edit
* [ ] rename
* [x] delete
* [x] execute
* [x] require items to declare action support
* [x] state files
* [x] source environment
* [ ] working directory set
* [x] update web UI credentials
* [ ] automatic crontab integration
* [ ] feed supports item TTS
* [x] data directory from envvars
* [ ] source-level tt{s,d,l}
* [ ] source batching
* channels
* [x] create
* [x] edit
* [ ] rename
* [x] delete
* feeds
* [x] show items
* [x] deactivate items
* [x] mass deactivate
* [ ] punt
* [x] trigger actions
* [x] add ad-hoc items
* [x] show/hide deactivated items
* [ ] show/hide tts items
* [x] sort by time ?? created
* [ ] paging
* [ ] NixOS module
* [ ] NixOS module demo
Additional features
* [ ] metric reporting
* [ ] on action failure, create an error item with logs
* [ ] first-party password handling instead of basic auth and htpasswd
* [ ] items gracefully add new fields and `action` keys
* [ ] arbitrary date punt
* [ ] HTTP edit item
* [ ] sort crontab entries
* [ ] TUI feed view
## Overview ## Overview
In Intake, a _source_ represents a single content feed of discrete _items_, such as a blog and its posts or a website and its pages. In Intake, a _source_ represents a single content feed of discrete _items_, such as a blog and its posts or a website and its pages.
@ -94,16 +35,25 @@ Any unspecified field is equivalent to the empty string, object, or 0, depending
| `body` | Optional | Body text of the item as raw HTML. This will be displayed in the item without further processing! Consider your sources' threat models against injection attacks. | `body` | Optional | Body text of the item as raw HTML. This will be displayed in the item without further processing! Consider your sources' threat models against injection attacks.
| `link` | Optional | A hyperlink associated with the item. | `link` | Optional | A hyperlink associated with the item.
| `time` | Optional | A Unix timestamp associated with the item, not necessarily when the item was created. Items sort by `time` when it is defined and fall back to `created`. Displayed in the item footer. | `time` | Optional | A Unix timestamp associated with the item, not necessarily when the item was created. Items sort by `time` when it is defined and fall back to `created`. Displayed in the item footer.
| `ttl` | Optional | The time-to-live of the item. An item with `ttl` defined is not deleted by feed updates as long as `created + ttl` is in the future, even if it is inactive.
| `ttd` | Optional | The time-to-die of the item. An item with `ttd` defined is deleted by feed updates if `created + ttd` is in the past, even if it is active.
| `tts` | Optional | The time-to-show of the item. An item with `tts` defined is hidden from feeds before the time `created + tts`.
| `action` | Optional | A JSON object with keys for all supported actions. No schema is imposed on the values. | `action` | Optional | A JSON object with keys for all supported actions. No schema is imposed on the values.
Existing items are updated with new values when a fetch or action produces them, with some exceptions: Existing items are updated with new values when a fetch or action produces them, with some exceptions:
* Automatic fields cannot be changed. * Automatic fields cannot be changed.
* If a field's previous value is non-empty and the new value is empty, the old value is kept. * Source-level settings for `ttl`, `ttd`, or `tts` override the item's values.
* Fields cannot be updated from a non-empty value to an empty value.
If a field's previous value is non-empty and the new value is empty, the old value is kept.
### Sources ### Sources
A source is identified by its name. A minimally functional source requires a `fetch` action that returns items. A source is identified by its name.
A minimally functional source requires a `fetch` action that returns items.
TTL, TTD, and TTS can be configured at the source level by setting the environment variables `INTAKE_TTL`, `INTAKE_TTS`, or `INTAKE_TTS` to an integer value.
These values override any `ttl`, `ttd`, or `tts` value returned by a fetch or action.
### Action API ### Action API
@ -136,6 +86,38 @@ The item does not need to declare support for `on_create`.
This action is not accessible through the web interface, so if you need to retry the action, you should create another action with the same command as `on_create`. This action is not accessible through the web interface, so if you need to retry the action, you should create another action with the same command as `on_create`.
If an item's `on_create` fails, the item is still created, but without any changes made by action. If an item's `on_create` fails, the item is still created, but without any changes made by action.
The special action `on_delete` is like `on_create`, except it runs right before an item is deleted. ### Web interface
It does not require explicit support and is not accessible in the web interface.
The output of `on_delete` is ignored; it is primarily for causing side effects like managing state. The `intake serve` command runs an HTTP server that gives access to the feed.
While the CLI can rely on normal filesystem access control to secure the database, this does not apply to HTTP.
Instead, the web interface can be locked behind a password set via `intake passwd`.
## Development
Parity features
* [x] web feed supports item TTS
* [ ] item punt
* [ ] web feed paging
* [ ] web fetch
* [ ] set a working directory for item actions
* [ ] crontab integration
* [ ] source batching
* [ ] add item from web
* [ ] Nix build
* [ ] NixOS module
* [ ] NixOS vm demo
* [ ] Nix flake templates
Future features
* [ ] on_delete triggers
* [ ] manual item edits, CLI
* [ ] manual item edits, web
* [ ] source-level TTS
* [ ] metric reporting
* [ ] on action failure, create an error item with logs
* [ ] items gracefully add new fields and `action` keys
* [ ] arbitrary date punt
* [ ] sort crontab entries
* [ ] TUI feed view

View File

@ -109,7 +109,12 @@ func actionExecute(
log.Fatalf("error: failed to get action: %v", err) log.Fatalf("error: failed to get action: %v", err)
} }
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute) postProcess, err := core.GetSourcePostProcessor(db, source)
if err != nil {
log.Fatalf("error: failed to get source post-processor: %v", err)
}
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
log.Fatalf("error executing %s: %v", action, err) log.Fatalf("error executing %s: %v", action, err)
} }

View File

@ -27,6 +27,9 @@ if it doesn't exist, with a random id.`,
stringArg(cmd, "body"), stringArg(cmd, "body"),
stringArg(cmd, "link"), stringArg(cmd, "link"),
intArg(cmd, "time"), intArg(cmd, "time"),
intArg(cmd, "ttl"),
intArg(cmd, "ttd"),
intArg(cmd, "tts"),
stringArg(cmd, "action"), stringArg(cmd, "action"),
) )
}, },
@ -42,6 +45,9 @@ func init() {
itemAddCmd.Flags().StringP("body", "b", "", "Item body") itemAddCmd.Flags().StringP("body", "b", "", "Item body")
itemAddCmd.Flags().StringP("link", "l", "", "Item link") itemAddCmd.Flags().StringP("link", "l", "", "Item link")
itemAddCmd.Flags().IntP("time", "m", 0, "Item time as a Unix timestamp") itemAddCmd.Flags().IntP("time", "m", 0, "Item time as a Unix timestamp")
itemAddCmd.Flags().Int("ttl", 0, "Item time-to-live in seconds, relative to item creation date")
itemAddCmd.Flags().Int("ttd", 0, "Item time-to-die in seconds, relative to item creation date")
itemAddCmd.Flags().Int("tts", 0, "Item time-to-show in seconds, relative to item creation date")
itemAddCmd.Flags().StringP("action", "x", "", "Item action data as JSON") itemAddCmd.Flags().StringP("action", "x", "", "Item action data as JSON")
} }
@ -53,6 +59,9 @@ func itemAdd(
body string, body string,
link string, link string,
time int, time int,
ttl int,
ttd int,
tts int,
actions string, actions string,
) { ) {
// Default to "default" source // Default to "default" source
@ -85,6 +94,9 @@ func itemAdd(
Body: body, Body: body,
Link: link, Link: link,
Time: time, Time: time,
Ttl: ttl,
Ttd: ttd,
Tts: tts,
Action: itemActions, Action: itemActions,
}}); err != nil { }}); err != nil {
log.Fatalf("error: failed to add item: %s", err) log.Fatalf("error: failed to add item: %s", err)

View File

@ -29,7 +29,10 @@ func init() {
func migrate(listOnly bool) { func migrate(listOnly bool) {
db := openDb() db := openDb()
core.InitDatabase(db) err := core.InitDatabase(db)
if err != nil {
log.Fatalf("error: failed to init database: %v", err)
}
if listOnly { if listOnly {
pending, err := core.GetPendingMigrations(db) pending, err := core.GetPendingMigrations(db)
if err != nil { if err != nil {
@ -43,6 +46,9 @@ func migrate(listOnly bool) {
} }
} }
} else { } else {
core.MigrateDatabase(db) err = core.MigrateDatabase(db)
if err != nil {
log.Fatalf("error: failed to migrate database: %v", err)
}
} }
} }

View File

@ -58,7 +58,12 @@ func sourceFetch(source string, format string, dryRun bool) {
log.Fatalf("error: failed to get fetch action: %v", err) log.Fatalf("error: failed to get fetch action: %v", err)
} }
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute) postProcess, err := core.GetSourcePostProcessor(db, source)
if err != nil {
log.Fatalf("error: failed to get source post-processor: %v", err)
}
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
if err != nil { if err != nil {
log.Fatalf("error: failed to execute fetch: %v", err) log.Fatalf("error: failed to execute fetch: %v", err)
} }
@ -74,7 +79,7 @@ func sourceFetch(source string, format string, dryRun bool) {
return return
} }
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items) added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items, time.Now())
if err != nil { if err != nil {
log.Fatalf("error: failed to update: %v", err) log.Fatalf("error: failed to update: %v", err)
} }

View File

@ -14,6 +14,9 @@ var sourceTestCmd = &cobra.Command{
Short: "Test a fetch action", Short: "Test a fetch action",
Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action. Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action.
Source-level configuration that is normally set via environment variable,
such as INTAKE_TTL, will not be applied by --env.
%s`, makeFormatHelpText()), %s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args)) sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args))
@ -30,7 +33,7 @@ func init() {
func sourceTest(env []string, format string, cmd []string) { func sourceTest(env []string, format string, cmd []string) {
formatter := formatAs(format) formatter := formatAs(format)
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute) items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil)
log.Printf("returned %d items", len(items)) log.Printf("returned %d items", len(items))
log.Printf("wrote %d bytes of state", len(state)) log.Printf("wrote %d bytes of state", len(state))
if err != nil { if err != nil {

View File

@ -45,8 +45,8 @@ func TestChannel(t *testing.T) {
// Items on both sources appear in the channel // Items on both sources appear in the channel
if err := AddItems(db, []Item{ if err := AddItems(db, []Item{
{"one", "a", 0, true, "", "", "", "", 0, nil}, {Source: "one", Id: "a"},
{"two", "b", 0, true, "", "", "", "", 0, nil}, {Source: "two", Id: "b"},
}); err != nil { }); err != nil {
t.Fatalf("failed to add items to one: %v", err) t.Fatalf("failed to add items to one: %v", err)
} }

View File

@ -93,8 +93,8 @@ func TestDeleteSourceCascade(t *testing.T) {
t.Fatalf("failed to add source2: %v", err) t.Fatalf("failed to add source2: %v", err)
} }
if err := AddItems(db, []Item{ if err := AddItems(db, []Item{
{"source1", "item1", 0, true, "", "", "", "", 0, nil}, {Source: "source1", Id: "item1"},
{"source2", "item2", 0, true, "", "", "", "", 0, nil}, {Source: "source2", Id: "item2"},
}); err != nil { }); err != nil {
t.Fatalf("failed to add items: %v", err) t.Fatalf("failed to add items: %v", err)
} }
@ -118,7 +118,7 @@ func TestDeleteSourceCascade(t *testing.T) {
t.Fatalf("Expected only 1 item after source delete, got %d", len(items)) t.Fatalf("Expected only 1 item after source delete, got %d", len(items))
} }
err = AddItems(db, []Item{{"source1", "item3", 0, true, "", "", "", "", 0, nil}}) err = AddItems(db, []Item{{Source: "source1", Id: "item3"}})
if err == nil { if err == nil {
t.Fatal("Unexpected success adding item for nonexistent source") t.Fatal("Unexpected success adding item for nonexistent source")
} }

View File

@ -14,7 +14,13 @@ import (
"time" "time"
) )
func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) { func readStdout(
stdout io.ReadCloser,
source string,
postProcess func(item Item) Item,
items chan Item,
cparse chan bool,
) {
var item Item var item Item
parseError := false parseError := false
scanout := bufio.NewScanner(stdout) scanout := bufio.NewScanner(stdout)
@ -25,6 +31,9 @@ func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse cha
log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data))) log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data)))
parseError = true parseError = true
} else { } else {
if postProcess != nil {
item = postProcess(item)
}
item.Active = true // These fields aren't up to item.Active = true // These fields aren't up to
item.Created = 0 // the action to set and item.Created = 0 // the action to set and
item.Source = source // shouldn't be overrideable item.Source = source // shouldn't be overrideable
@ -58,6 +67,7 @@ func Execute(
state []byte, state []byte,
input string, input string,
timeout time.Duration, timeout time.Duration,
postProcess func(item Item) Item,
) ([]Item, []byte, error) { ) ([]Item, []byte, error) {
log.Printf("executing %v", argv) log.Printf("executing %v", argv)
@ -119,7 +129,7 @@ func Execute(
// Routines handling the process i/o // Routines handling the process i/o
go writeStdin(stdin, input) go writeStdin(stdin, input)
go readStdout(stdout, source, cout, cparse) go readStdout(stdout, source, postProcess, cout, cparse)
go readStderr(stderr, source, cerr) go readStderr(stderr, source, cerr)
// Kick off the command // Kick off the command
@ -165,13 +175,14 @@ func ExecuteItemAction(
env []string, env []string,
state []byte, state []byte,
timeout time.Duration, timeout time.Duration,
postProcess func(item Item) Item,
) (Item, []byte, error) { ) (Item, []byte, error) {
itemJson, err := json.Marshal(item) itemJson, err := json.Marshal(item)
if err != nil { if err != nil {
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
} }
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout) res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
if err != nil { if err != nil {
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
} }

View File

@ -7,22 +7,25 @@ import (
func TestExecute(t *testing.T) { func TestExecute(t *testing.T) {
assertLen := func(items []Item, length int) { assertLen := func(items []Item, length int) {
t.Helper()
if len(items) != length { if len(items) != length {
t.Fatalf("Expected %d items, got %d", length, len(items)) t.Fatalf("Expected %d items, got %d", length, len(items))
} }
} }
assertNil := func(err error) { assertNil := func(err error) {
t.Helper()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
assertNotNil := func(err error) { assertNotNil := func(err error) {
t.Helper()
if err == nil { if err == nil {
t.Fatal("expected err") t.Fatal("expected err")
} }
} }
execute := func(argv []string) ([]Item, error) { execute := func(argv []string) ([]Item, error) {
item, _, err := Execute("_", argv, nil, nil, "", time.Minute) item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
return item, err return item, err
} }
@ -40,7 +43,7 @@ func TestExecute(t *testing.T) {
assertLen(res, 0) assertLen(res, 0)
// Timeout // Timeout
res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond) res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
assertNotNil(err) assertNotNil(err)
assertLen(res, 0) assertLen(res, 0)
@ -53,7 +56,7 @@ func TestExecute(t *testing.T) {
} }
// Read from stdin // Read from stdin
res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute) res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "bar" { if res[0].Id != "bar" {
@ -61,7 +64,7 @@ func TestExecute(t *testing.T) {
} }
// Set env // Set env
res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute) res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "baz" { if res[0].Id != "baz" {
@ -133,7 +136,7 @@ func TestExecute(t *testing.T) {
// Read state // Read state
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute) res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Title != "Hello world" { if res[0].Title != "Hello world" {
@ -142,10 +145,22 @@ func TestExecute(t *testing.T) {
// Write state // Write state
argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute) res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if string(newState) != "Hello world" { if string(newState) != "Hello world" {
t.Fatalf("expected 'Hello world' from write state, got %s", string(newState)) t.Fatalf("expected 'Hello world' from write state, got %s", string(newState))
} }
// Postprocessing function
argv = []string{"jq", "-cn", `{id: "foo"}`}
res, _, err = Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
item.Ttl = 123456
return item
})
assertNil(err)
assertLen(res, 1)
if res[0].Ttl != 123456 {
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
}
} }

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"time"
) )
type Actions map[string]json.RawMessage type Actions map[string]json.RawMessage
@ -27,11 +28,37 @@ type Item struct {
Body string `json:"body"` Body string `json:"body"`
Link string `json:"link"` Link string `json:"link"`
Time int `json:"time"` Time int `json:"time"`
Ttl int `json:"ttl"`
Ttd int `json:"ttd"`
Tts int `json:"tts"`
Action Actions `json:"action"` Action Actions `json:"action"`
} }
func (item Item) TtlTime() time.Time {
return time.Unix(int64(item.Created)+int64(item.Ttl), 0)
}
func (item Item) TtdTime() time.Time {
return time.Unix(int64(item.Created)+int64(item.Ttd), 0)
}
func (item Item) TtsTime() time.Time {
return time.Unix(int64(item.Created)+int64(item.Tts), 0)
}
func (item Item) Visible() bool {
now := time.Now() // TODO pass this value in
return item.Active && now.After(item.TtsTime())
}
// Whether an item that no longer appears in a fetch can be deleted. // Whether an item that no longer appears in a fetch can be deleted.
func (item Item) Deletable() bool { func (item Item) Deletable(now time.Time) bool {
if item.Ttl != 0 && item.TtlTime().After(now) {
return false
}
if item.Ttd != 0 && item.TtdTime().Before(now) {
return true
}
return !item.Active return !item.Active
} }

View File

@ -3,6 +3,7 @@ package core
import ( import (
"encoding/json" "encoding/json"
"testing" "testing"
"time"
) )
func TestItemFormatsExist(t *testing.T) { func TestItemFormatsExist(t *testing.T) {
@ -49,3 +50,35 @@ func TestItemRoundTrip(t *testing.T) {
t.Fatalf("items are not equal, err %v", err) t.Fatalf("items are not equal, err %v", err)
} }
} }
func TestItemLifecycleTimes(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "_"); err != nil {
t.Fatal(err)
}
now := time.Now()
// active item with expired ttd is deleted
add, del, err := UpdateWithFetchedItems(db, "_", nil, []Item{{Source: "_", Id: "_", Ttd: 1}}, now)
if add != 1 || del != 0 || err != nil {
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{}, now.Add(time.Second*5))
if add != 0 || del != 1 || err != nil {
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
}
// inactive item with live ttl is kept
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{{Source: "_", Id: "_", Ttl: 60}}, now)
if add != 1 || del != 0 || err != nil {
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
}
if _, err = DeactivateItem(db, "_", "_"); err != nil {
t.Fatal(err)
}
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{}, now.Add(time.Second*5))
if add != 0 || del != 0 || err != nil {
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
}
}

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"time"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
@ -12,8 +13,8 @@ import (
func AddItems(db DB, items []Item) error { func AddItems(db DB, items []Item) error {
return db.Transact(func(tx DB) error { return db.Transact(func(tx DB) error {
stmt, err := tx.Prepare(` stmt, err := tx.Prepare(`
insert into items (source, id, active, title, author, body, link, time, action) insert into items (source, id, active, title, author, body, link, time, ttl, ttd, tts, action)
values (?, ?, ?, ?, ?, ?, ?, ?, jsonb(?)) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, jsonb(?))
`) `)
if err != nil { if err != nil {
return fmt.Errorf("failed to prepare insert: %v", err) return fmt.Errorf("failed to prepare insert: %v", err)
@ -23,7 +24,9 @@ func AddItems(db DB, items []Item) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err) return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err)
} }
_, err = stmt.Exec(item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time, actions) _, err = stmt.Exec(
item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time, item.Ttl, item.Ttd, item.Tts, actions,
)
if err != nil { if err != nil {
return fmt.Errorf("failed to insert %s/%s: %v", item.Source, item.Id, err) return fmt.Errorf("failed to insert %s/%s: %v", item.Source, item.Id, err)
} }
@ -54,6 +57,15 @@ func BackfillItem(new *Item, old *Item) {
if new.Title == "" { if new.Title == "" {
new.Title = old.Title new.Title = old.Title
} }
if new.Ttl == 0 {
new.Tts = old.Tts
}
if new.Ttd == 0 {
new.Ttd = old.Ttd
}
if new.Tts == 0 {
new.Tts = old.Tts
}
} }
func UpdateItems(db DB, items []Item) error { func UpdateItems(db DB, items []Item) error {
@ -66,6 +78,9 @@ func UpdateItems(db DB, items []Item) error {
body = ?, body = ?,
link = ?, link = ?,
time = ?, time = ?,
ttl = ?,
ttd = ?,
tts = ?,
action = jsonb(?) action = jsonb(?)
where source = ? where source = ?
and id = ? and id = ?
@ -78,7 +93,9 @@ func UpdateItems(db DB, items []Item) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err) return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err)
} }
_, err = stmt.Exec(item.Title, item.Author, item.Body, item.Link, item.Time, actions, item.Source, item.Id) _, err = stmt.Exec(
item.Title, item.Author, item.Body, item.Link, item.Time, item.Ttl, item.Ttd, item.Tts, actions, item.Source, item.Id,
)
if err != nil { if err != nil {
return err return err
} }
@ -132,7 +149,21 @@ func getItems(db DB, query string, args ...any) ([]Item, error) {
var items []Item var items []Item
for rows.Next() { for rows.Next() {
var item Item var item Item
err = rows.Scan(&item.Source, &item.Id, &item.Created, &item.Active, &item.Title, &item.Author, &item.Body, &item.Link, &item.Time, &item.Action) err = rows.Scan(
&item.Source,
&item.Id,
&item.Created,
&item.Active,
&item.Title,
&item.Author,
&item.Body,
&item.Link,
&item.Time,
&item.Ttl,
&item.Ttd,
&item.Tts,
&item.Action,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -146,7 +177,7 @@ func getItems(db DB, query string, args ...any) ([]Item, error) {
func GetItem(db DB, source string, id string) (Item, error) { func GetItem(db DB, source string, id string) (Item, error) {
items, err := getItems(db, ` items, err := getItems(db, `
select source, id, created, active, title, author, body, link, time, json(action) select source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
from items from items
where source = ? where source = ?
and id = ? and id = ?
@ -164,7 +195,7 @@ func GetItem(db DB, source string, id string) (Item, error) {
func GetAllActiveItems(db DB) ([]Item, error) { func GetAllActiveItems(db DB) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
from items from items
where active <> 0 where active <> 0
order by case when time = 0 then created else time end, id order by case when time = 0 then created else time end, id
@ -174,28 +205,30 @@ func GetAllActiveItems(db DB) ([]Item, error) {
func GetAllItems(db DB) ([]Item, error) { func GetAllItems(db DB) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
from items from items
order by case when time = 0 then created else time end, id order by case when time = 0 then created else time end, id
`) `)
} }
func GetActiveItemsForSource(db DB, source string) ([]Item, error) { func GetActiveItemsForSource(db DB, source string) ([]Item, error) {
now := int(time.Now().Unix()) // TODO pass this value in
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
from items from items
where where
source = ? source = ?
and active <> 0 and active <> 0
and created + tts < ?
order by case when time = 0 then created else time end, id order by case when time = 0 then created else time end, id
`, source) `, source, now)
} }
func GetAllItemsForSource(db DB, source string) ([]Item, error) { func GetAllItemsForSource(db DB, source string) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
from items from items
where where
source = ? source = ?
@ -204,22 +237,24 @@ func GetAllItemsForSource(db DB, source string) ([]Item, error) {
} }
func GetActiveItemsForChannel(db DB, channel string) ([]Item, error) { func GetActiveItemsForChannel(db DB, channel string) ([]Item, error) {
now := int(time.Now().Unix()) // TODO pass this value in
return getItems(db, ` return getItems(db, `
select select
i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, json(i.action) i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, i.ttl, i.ttd, i.tts, json(i.action)
from items i from items i
join channels c on i.source = c.source join channels c on i.source = c.source
where where
c.name = ? c.name = ?
and i.active <> 0 and i.active <> 0
and i.created + i.tts < ?
order by case when i.time = 0 then i.created else i.time end, i.id order by case when i.time = 0 then i.created else i.time end, i.id
`, channel) `, channel, now)
} }
func GetAllItemsForChannel(db DB, channel string) ([]Item, error) { func GetAllItemsForChannel(db DB, channel string) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, json(i.action) i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, i.ttl, i.ttd, i.tts, json(i.action)
from items i from items i
join channels c on i.source = c.source join channels c on i.source = c.source
where where

View File

@ -8,6 +8,7 @@ import (
) )
func AssertItemIs(t *testing.T, item Item, expected string) { func AssertItemIs(t *testing.T, item Item, expected string) {
t.Helper()
actual := fmt.Sprintf( actual := fmt.Sprintf(
"%s/%s/%t/%s/%s/%s/%s/%d", "%s/%s/%t/%s/%s/%s/%s/%d",
item.Source, item.Source,
@ -31,8 +32,8 @@ func TestAddItem(t *testing.T) {
} }
if err := AddItems(db, []Item{ if err := AddItems(db, []Item{
{"test", "one", 0, true, "", "", "", "", 0, nil}, {Source: "test", Id: "one", Active: true},
{"test", "two", 0, true, "title", "author", "body", "link", 123456, nil}, {"test", "two", 0, true, "title", "author", "body", "link", 123456, 1, 2, 3, nil},
}); err != nil { }); err != nil {
t.Fatalf("failed to add items: %v", err) t.Fatalf("failed to add items: %v", err)
} }

View File

@ -22,6 +22,15 @@ func SetPassword(db DB, password string) error {
return nil return nil
} }
func HasPassword(db DB) (bool, error) {
var i int
err := db.QueryRow("select count(*) from password").Scan(&i)
if err != nil {
return false, err
}
return i > 0, nil
}
func CheckPassword(db DB, password string) (bool, error) { func CheckPassword(db DB, password string) (bool, error) {
var hash string var hash string
err := db.QueryRow("select hash from password limit 1").Scan(&hash) err := db.QueryRow("select hash from password limit 1").Scan(&hash)

View File

@ -1,6 +1,8 @@
package core package core
import ( import (
"database/sql"
"errors"
"fmt" "fmt"
"log" "log"
"time" "time"
@ -68,22 +70,78 @@ func SetState(db DB, source string, state []byte) error {
return err return err
} }
func getSourceTtx(db DB, source string, env string) (int, error) {
row := db.QueryRow(`
select value
from envs
where source = ?
and name = ?
`, source, env)
var ttx int
if err := row.Scan(&ttx); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}
return ttx, nil
}
func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) {
ttl, err := getSourceTtx(db, source, "INTAKE_TTL")
if err != nil {
return nil, err
}
ttd, err := getSourceTtx(db, source, "INTAKE_TTD")
if err != nil {
return nil, err
}
tts, err := getSourceTtx(db, source, "INTAKE_TTS")
if err != nil {
return nil, err
}
return func(item Item) Item {
if ttl != 0 {
item.Ttl = ttl
}
if ttd != 0 {
item.Ttd = ttd
}
if tts != 0 {
item.Tts = tts
}
return item
}, nil
}
// Given the results of a fetch, add new items, update existing items, and delete expired items. // Given the results of a fetch, add new items, update existing items, and delete expired items.
// //
// Returns the number of new and deleted items on success. // Returns the number of new and deleted items on success.
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) { func UpdateWithFetchedItems(
db DB,
source string,
state []byte,
items []Item,
now time.Time,
) (int, int, error) {
var new int var new int
var del int var del int
var err error var err error
err = db.Transact(func(tx DB) error { err = db.Transact(func(tx DB) error {
new, del, err = updateWithFetchedItemsTx(tx, source, state, items) new, del, err = updateWithFetchedItemsTx(tx, source, state, items, now)
return err return err
}) })
return new, del, err return new, del, err
} }
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction. // Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) { func updateWithFetchedItemsTx(
db DB,
source string,
state []byte,
items []Item,
now time.Time,
) (int, int, error) {
// Get the existing items // Get the existing items
existingItems, err := GetAllItemsForSource(db, source) existingItems, err := GetAllItemsForSource(db, source)
if err != nil { if err != nil {
@ -125,6 +183,11 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err) return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
} }
postProcess, err := GetSourcePostProcessor(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
}
// If the source has an on-create trigger, run it for each new item // If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch // On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create") onCreateArgv, err := GetArgvForAction(db, source, "on_create")
@ -132,7 +195,7 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
var updatedNewItems []Item var updatedNewItems []Item
for _, item := range newItems { for _, item := range newItems {
var updatedItem Item var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute) updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
} }
@ -154,7 +217,7 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
// Check expired items for deletion // Check expired items for deletion
idsToDelete := map[string]bool{} idsToDelete := map[string]bool{}
for _, item := range existingItems { for _, item := range existingItems {
if expiredIds[item.Id] && item.Deletable() { if expiredIds[item.Id] && item.Deletable(now) {
idsToDelete[item.Id] = true idsToDelete[item.Id] = true
} }
} }

View File

@ -50,20 +50,24 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
if err := AddSource(db, "test"); err != nil { if err := AddSource(db, "test"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
update := func(items []Item) (int, int, error) {
t.Helper()
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
}
a := Item{Source: "test", Id: "a"} a := Item{Source: "test", Id: "a"}
add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a}) add, del, err := update([]Item{a})
if add != 1 || del != 0 || err != nil { if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a}) add, del, err = update([]Item{a})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
b := Item{Source: "test", Id: "b"} b := Item{Source: "test", Id: "b"}
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b}) add, del, err = update([]Item{a, b})
if add != 1 || del != 0 || err != nil { if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
@ -71,17 +75,17 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
if _, err = DeactivateItem(db, "test", "a"); err != nil { if _, err = DeactivateItem(db, "test", "a"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b}) add, del, err = update([]Item{a, b})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b}) add, del, err = update([]Item{b})
if add != 0 || del != 1 || err != nil { if add != 0 || del != 1 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b}) add, del, err = update([]Item{b})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
@ -97,7 +101,7 @@ func TestUpdateSourceTransaction(t *testing.T) {
b := Item{Source: "s", Id: "b"} b := Item{Source: "s", Id: "b"}
// Add and deactivate a so it will be deleted on next fetch without it // Add and deactivate a so it will be deleted on next fetch without it
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil { if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}, time.Now()); add != 1 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err) t.Fatalf("expected 1 add, got %d and err %v", add, err)
} }
if _, err := DeactivateItem(db, "s", "a"); err != nil { if _, err := DeactivateItem(db, "s", "a"); err != nil {
@ -114,7 +118,7 @@ func TestUpdateSourceTransaction(t *testing.T) {
return nil return nil
}, },
} }
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}) add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}, time.Now())
if add != 0 || del != 0 || err == nil { if add != 0 || del != 0 || err == nil {
t.Fatalf("expected failure, got %d %d %v", add, del, err) t.Fatalf("expected failure, got %d %d %v", add, del, err)
} }
@ -142,7 +146,8 @@ func TestOnCreateAction(t *testing.T) {
} }
execute := func(argv []string) []Item { execute := func(argv []string) []Item {
items, _, err := Execute("test", argv, nil, nil, "", time.Minute) t.Helper()
items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
if err != nil { if err != nil {
t.Fatal("unexpected error executing test fetch") t.Fatal("unexpected error executing test fetch")
} }
@ -152,13 +157,20 @@ func TestOnCreateAction(t *testing.T) {
return items return items
} }
update := func(items []Item) (int, int, error) {
t.Helper()
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
}
onCreate := func(argv []string) { onCreate := func(argv []string) {
t.Helper()
if err := UpdateAction(db, "test", "on_create", argv); err != nil { if err := UpdateAction(db, "test", "on_create", argv); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
getItem := func(id string) Item { getItem := func(id string) Item {
t.Helper()
item, err := GetItem(db, "test", id) item, err := GetItem(db, "test", id)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -169,7 +181,7 @@ func TestOnCreateAction(t *testing.T) {
// Noop on_create works // Noop on_create works
onCreate([]string{"tee"}) onCreate([]string{"tee"})
items := execute([]string{"jq", "-cn", `{id: "one"}`}) items := execute([]string{"jq", "-cn", `{id: "one"}`})
add, _, err := UpdateWithFetchedItems(db, "test", nil, items) add, _, err := update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with noop oncreate") t.Fatal("failed update with noop oncreate")
} }
@ -185,7 +197,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "Hello, World" { if items[0].Title != "Hello, World" {
t.Fatal("unexpected title") t.Fatal("unexpected title")
} }
add, _, err = UpdateWithFetchedItems(db, "test", nil, items) add, _, err = update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with alter oncreate") t.Fatal("failed update with alter oncreate")
} }
@ -200,7 +212,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "" { if items[0].Link != "" {
t.Fatal("unexpected link") t.Fatal("unexpected link")
} }
add, _, err = UpdateWithFetchedItems(db, "test", nil, items) add, _, err = update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with augment oncreate") t.Fatal("failed update with augment oncreate")
} }
@ -215,7 +227,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "gopher://go.dev" { if items[0].Link != "gopher://go.dev" {
t.Fatal("missing link") t.Fatal("missing link")
} }
add, _, err = UpdateWithFetchedItems(db, "test", nil, items) add, _, err = update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with attempted deletion oncreate") t.Fatal("failed update with attempted deletion oncreate")
} }
@ -226,7 +238,7 @@ func TestOnCreateAction(t *testing.T) {
// item is created if on_create fails // item is created if on_create fails
onCreate([]string{"false"}) onCreate([]string{"false"})
items = execute([]string{"jq", "-cn", `{id: "five"}`}) items = execute([]string{"jq", "-cn", `{id: "five"}`})
add, _, err = UpdateWithFetchedItems(db, "test", nil, items) add, _, err = update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with failing oncreate") t.Fatal("failed update with failing oncreate")
} }
@ -240,7 +252,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "before" { if items[0].Title != "before" {
t.Fatal("unexpected title") t.Fatal("unexpected title")
} }
add, _, err = UpdateWithFetchedItems(db, "test", nil, items) add, _, err = update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with bad exit code oncreate") t.Fatal("failed update with bad exit code oncreate")
} }
@ -251,7 +263,7 @@ func TestOnCreateAction(t *testing.T) {
// on_create can't change id, active, or created // on_create can't change id, active, or created
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
items = execute([]string{"jq", "-cn", `{id: "seven"}`}) items = execute([]string{"jq", "-cn", `{id: "seven"}`})
add, _, err = UpdateWithFetchedItems(db, "test", nil, items) add, _, err = update(items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with invalid field changes oncreate") t.Fatal("failed update with invalid field changes oncreate")
} }
@ -287,3 +299,26 @@ func TestSourceState(t *testing.T) {
t.Fatalf("expected hello, world, got %s", state) t.Fatalf("expected hello, world, got %s", state)
} }
} }
func TestSourceTtx(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
if err := SetEnvs(db, "s", []string{
"INTAKE_TTL=30",
"INTAKE_TTD=60",
"INTAKE_TTS=90",
}); err != nil {
t.Fatal(err)
}
postProcess, err := GetSourcePostProcessor(db, "s")
if err != nil {
t.Fatal(err)
}
before := Item{Source: "s", Id: "i"}
after := postProcess(before)
if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 {
t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts)
}
}

View File

@ -27,6 +27,9 @@ create table items(
body text, body text,
link text, link text,
time int, time int,
ttl int,
ttd int,
tts int,
action blob, action blob,
primary key (source, id), primary key (source, id),
foreign key (source) references sources (name) on delete cascade foreign key (source) references sources (name) on delete cascade
@ -42,3 +45,8 @@ create table password(
hash text, hash text,
unique (id) on conflict replace unique (id) on conflict replace
) strict; ) strict;
create table sessions(
id text not null,
expires int default 0,
primary key (id)
) strict;

View File

@ -8,18 +8,34 @@ tmp/intake migrate
tmp/intake source add -s feedtest tmp/intake source add -s feedtest
tmp/intake item add -s feedtest --id "this-item-has-no-title" tmp/intake item add -s feedtest --id "this-item-has-no-title"
tmp/intake item add -s feedtest --title "This item has only a title" tmp/intake item add -s feedtest --id a --title "This item has only a title"
tmp/intake item add -s feedtest --title "Title and body" --body "This is the item body" tmp/intake item add -s feedtest --id b --title "Title and body" --body "This is the item body"
tmp/intake item add -s feedtest --title "Title and link" --link "#" tmp/intake item add -s feedtest --id c --title "Title and link" --link "#"
tmp/intake item add -s feedtest --title "Title, link, body" --link "#" --body "This is the body" tmp/intake item add -s feedtest --id d --title "Title, link, body" --link "#" --body "This is the body"
tmp/intake item add -s feedtest --title "<b>HTML title</b>" --link "#" --body "<i>HTML body</i>" tmp/intake item add -s feedtest --id e --title "<b>HTML title</b>" --link "#" --body "<i>HTML body</i>"
tmp/intake item add -s feedtest --title "Title and author" --author "Authorname" tmp/intake item add -s feedtest --id f --title "Title and author" --author "Authorname"
tmp/intake item add -s feedtest --title "Title, author, time" --author "Authorname" --time 1700000000 tmp/intake item add -s feedtest --id g --title "Title, author, time" --author "Authorname" --time 1700000000
tmp/intake item add -s feedtest --title "Title, time" --time 1737780324 tmp/intake item add -s feedtest --id h --title "Title, time" --time 1737780324
tmp/intake item add -s feedtest --title "Title, author, body" --author "Authorname" --body "Hello body!" tmp/intake item add -s feedtest --id i --title "Title, author, body" --author "Authorname" --body "Hello body!"
tmp/intake item add -s feedtest --title "Title, author, time, body" --author "Authorname" --time 1700000000 --body "Hello body!" tmp/intake item add -s feedtest --id j --title "Title, author, time, body" --author "Authorname" --time 1700000000 --body "Hello body!"
tmp/intake item add -s feedtest --title "Title, time, body" --time 1737780324 --body "Hello, body!" tmp/intake item add -s feedtest --id k --title "Title, time, body" --time 1737780324 --body "Hello, body!"
tmp/intake item add -s feedtest --id l --title "TTL 30s" --ttl 30
tmp/intake item add -s feedtest --id m --title "TTL 10d" --ttl 864000
tmp/intake item add -s feedtest --id n --title "TTD 30s" --ttd 30
tmp/intake item add -s feedtest --id o --title "TTS 30s" --tts 30
tmp/intake item add -s feedtest --id p --title "TTS 10d" --tts 864000
tmp/intake item add -s feedtest --id q --title "TTS -10d" --tts "-864000"
tmp/intake source add -s spook tmp/intake source add -s spook
tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"' tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"'
tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}' tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}'
tmp/intake source add -s nothing
tmp/intake channel add -c all -s feedtest
tmp/intake channel add -c all -s spook
tmp/intake channel add -c none -s nothing
echo "hello" | tmp/intake passwd --stdin
echo "hello" | tmp/intake passwd --stdin --verify

View File

@ -2,7 +2,7 @@
{{ define "content" -}} {{ define "content" -}}
<article class="center"> <article class="center">
<span class="item-title"> <span class="feed-controls">
<a href="/">Home</a> <a href="/">Home</a>
[<a href="?inactive=0">Active</a> | <a href="?inactive=1">All</a>] [<a href="?inactive=0">Active</a> | <a href="?inactive=1">All</a>]
</span> </span>
@ -23,7 +23,7 @@
{{ else }} {{ else }}
<article class="center"> <article class="center">
<span class="item-title">Feed is empty</span> <span class="feed-controls">Feed is empty</span>
</article> </article>
{{ end }} {{ end }}
{{/* end if .Items */}} {{/* end if .Items */}}

View File

@ -3,7 +3,7 @@
{{ define "content" -}} {{ define "content" -}}
<article> <article>
<details open> <details open>
<summary><span class="item-title">Channels</span></summary> <summary><span class="feed-controls">Channels</span></summary>
{{ if .Channels }} {{ if .Channels }}
{{ range .Channels }} {{ range .Channels }}
<p><a href="/channel/{{ .Name }}"> <p><a href="/channel/{{ .Name }}">
@ -22,7 +22,7 @@
<article> <article>
<details> <details>
<summary><span class="item-title">Sources</span></summary> <summary><span class="feed-controls">Sources</span></summary>
{{ if .Sources }} {{ if .Sources }}
<table class="intake-sources"> <table class="intake-sources">
{{ range .Sources }} {{ range .Sources }}

View File

@ -3,6 +3,7 @@ package html
import ( import (
"embed" "embed"
"encoding/json" "encoding/json"
"fmt"
"html/template" "html/template"
"io" "io"
"log" "log"
@ -15,11 +16,36 @@ func rawHtml(str string) template.HTML {
return template.HTML(str) return template.HTML(str)
} }
func tsToDate(t int) string { func dateFormat(tm time.Time) string {
tm := time.Unix(int64(t), 0).UTC()
return tm.Format(time.DateTime) return tm.Format(time.DateTime)
} }
func tsToDate(t int) time.Time {
return time.Unix(int64(t), 0).UTC()
}
func until(tm time.Time) string {
now := time.Now() // TODO pass this value in
dur := tm.Sub(now).Round(time.Second)
var format string
if dur < 0 {
format = "%s ago"
dur = dur.Abs()
} else {
format = "in %s"
}
var length string
if dur.Hours() > 24 {
length = fmt.Sprintf("%dd", int(dur.Hours()/24))
} else {
length = dur.String()
}
return fmt.Sprintf(format, length)
}
func massDeactivateVals(items []core.Item) string { func massDeactivateVals(items []core.Item) string {
var shorts []string var shorts []string
for _, item := range items { for _, item := range items {
@ -37,7 +63,9 @@ func massDeactivateVals(items []core.Item) string {
var funcs = template.FuncMap{ var funcs = template.FuncMap{
"raw": rawHtml, "raw": rawHtml,
"dateFormat": dateFormat,
"tsToDate": tsToDate, "tsToDate": tsToDate,
"until": until,
"massDeacVars": massDeactivateVals, "massDeacVars": massDeactivateVals,
} }
@ -71,8 +99,10 @@ type HomeData struct {
Sources []SourceData Sources []SourceData
} }
func Home(writer io.Writer, data HomeData) error { func Home(writer io.Writer, data HomeData) {
return home.Execute(writer, data) if err := home.Execute(writer, data); err != nil {
log.Printf("error: failed to render home: %v", err)
}
} }
var feed = load("feed.html", "item.html") var feed = load("feed.html", "item.html")
@ -81,8 +111,10 @@ type FeedData struct {
Items []core.Item Items []core.Item
} }
func Feed(writer io.Writer, data FeedData) error { func Feed(writer io.Writer, data FeedData) {
return feed.Execute(writer, data) if err := feed.Execute(writer, data); err != nil {
log.Printf("error: failed to render feed: %v", err)
}
} }
var item = load("itemPage.html", "item.html") var item = load("itemPage.html", "item.html")
@ -92,6 +124,22 @@ type ItemData struct {
Open bool Open bool
} }
func Item(writer io.Writer, data ItemData) error { func Item(writer io.Writer, data ItemData) {
return item.Execute(writer, data) if err := item.Execute(writer, data); err != nil {
log.Printf("error: failed to render item: %v", err)
}
}
var login = load("login.html")
type LoginData struct {
Error string
}
func Login(writer io.Writer, data LoginData) error {
err := login.Execute(writer, data)
if err != nil {
log.Printf("render error: %v", err)
}
return err
} }

View File

@ -10,9 +10,19 @@ article {
word-break: break-word; word-break: break-word;
display: flow-root; display: flow-root;
} }
.feed-controls {
font-size: 1.2em;
}
.item-title { .item-title {
font-size: 1.4em; font-size: 1.4em;
} }
.item-title::before, details[open] summary .item-title::before {
content: "= ";
opacity: 0.3;
}
summary .item-title::before {
content: "> ";
}
.item-button { .item-button {
font-size: 1em; font-size: 1em;
float:right; float:right;

View File

@ -28,7 +28,7 @@
<span class="item-title">{{ or .Title .Id | raw }}</span> <span class="item-title">{{ or .Title .Id | raw }}</span>
{{- end }} {{- end }}
{{ define "item-class" -}}{{ if not .Active }}strikethru {{ end }}{{ if not .Active }}fade{{ end }}{{- end}} {{ define "item-class" -}}{{ if not .Active }}strikethru {{ end }}{{ if not .Visible }}fade{{ end }}{{- end}}
{{ define "item" -}} {{ define "item" -}}
<article <article
@ -56,14 +56,17 @@
{{ if or .Author .Time }} {{ if or .Author .Time }}
<span class="item-info"> <span class="item-info">
{{ .Author }} {{ .Author }}
{{ .Time | tsToDate }} <span title="{{ .Time | tsToDate | until }}">{{ .Time | tsToDate | dateFormat }}</span>
</span><br> </span><br>
{{ end -}} {{ end -}}
{{- /* source/id/created footer line */ -}} {{- /* source/id/created footer line */ -}}
<span class="item-info"> <span class="item-info">
<a href="/item/{{ .Source }}/{{ .Id }}">{{ .Source }}/{{ .Id }}</a> <a href="/item/{{ .Source }}/{{ .Id }}">{{ .Source }}/{{ .Id }}</a>
{{ .Created | tsToDate }} <span title="{{ .Created | tsToDate | until }}">{{ .Created | tsToDate | dateFormat }}</span>
{{ if .Ttl }}<span title="TTL {{ .TtlTime | dateFormat }}, {{ .TtlTime | until }}">[L]</span>{{ end }}
{{ if .Ttd }}<span title="TTD {{ .TtdTime | dateFormat }}, {{ .TtdTime | until }}">[D]</span>{{ end }}
{{ if .Tts }}<span title="TTS {{ .TtsTime | dateFormat }}, {{ .TtsTime | until }}">[S]</span>{{ end }}
</span> </span>
</article> </article>
{{ end -}} {{ end -}}

16
web/html/login.html Normal file
View File

@ -0,0 +1,16 @@
{{ define "title" }}Intake - Login{{ end }}
{{ define "content" -}}
<article class="center">
<form method="post" action="/login">
<p>
<input name="password" type="password"/>
</p>
<button
hx-post="/login"
hx-target="#errors"
hx-swap="innerHTML"
>Submit</button>
</form>
<p id="errors">{{ .Error }}</p>
{{ end }}

View File

@ -72,7 +72,12 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return return
} }
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute) postProcess, err := core.GetSourcePostProcessor(env.db, source)
if err != nil {
log.Fatalf("error: failed to get source post-processor: %v", err)
}
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
return return

123
web/login.go Normal file
View File

@ -0,0 +1,123 @@
package web
import (
"crypto/rand"
"database/sql"
"errors"
"fmt"
"log"
"net/http"
"time"
"github.com/Jaculabilis/intake/core"
"github.com/Jaculabilis/intake/web/html"
)
var AuthCookieName string = "intake_auth"
var AuthDuration time.Duration = time.Hour * 24 * 7
func newSession(db core.DB) (string, error) {
bytes := make([]byte, 32)
_, err := rand.Read(bytes)
if err != nil {
return "", err
}
session := fmt.Sprintf("%x", bytes)
expires := int(time.Now().Add(AuthDuration).Unix())
_, err = db.Exec(`
insert into sessions (id, expires)
values (?, ?)
`, session, expires)
if err != nil {
return "", err
}
return session, nil
}
func checkSession(db core.DB, session string) (bool, error) {
row := db.QueryRow(`
select expires
from sessions
where id = ?
`, session)
var expires int
if err := row.Scan(&expires); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err
}
expiration := time.Unix(int64(expires), 0).UTC()
if time.Now().After(expiration) {
return false, nil
}
return true, nil
}
func renderLoginWithErrorMessage(writer http.ResponseWriter, req *http.Request, message string) {
// If an htmx interaction caused the auth error, refresh the page to get the login rendered
if req.Header.Get("HX-Request") != "" {
writer.Header()["HX-Refresh"] = []string{"true"}
writer.WriteHeader(http.StatusForbidden)
return
}
data := html.LoginData{Error: message}
if err := html.Login(writer, data); err != nil {
log.Printf("render error: %v", err)
}
}
func (env *Env) authed(handler http.HandlerFunc) http.HandlerFunc {
return func(writer http.ResponseWriter, req *http.Request) {
required, err := core.HasPassword(env.db)
if err != nil {
renderLoginWithErrorMessage(writer, req, fmt.Sprintf("error: failed to check for password: %v", err))
return
}
if required {
cookie, err := req.Cookie(AuthCookieName)
if errors.Is(err, http.ErrNoCookie) {
renderLoginWithErrorMessage(writer, req, "Your session is expired or invalid")
return
}
if valid, err := checkSession(env.db, cookie.Value); !valid || err != nil {
renderLoginWithErrorMessage(writer, req, "Your session is expired or invalid")
return
}
}
handler(writer, req)
}
}
func (env *Env) login(writer http.ResponseWriter, req *http.Request) {
if err := req.ParseForm(); err != nil {
http.Error(writer, fmt.Sprintf("error: failed to parse form: %v", err), http.StatusOK)
return
}
password := req.PostForm.Get("password")
pass, err := core.CheckPassword(env.db, password)
if err != nil {
http.Error(writer, fmt.Sprintf("error: failed to check password: %v", err), http.StatusOK)
return
}
if !pass {
http.Error(writer, "Incorrect password", http.StatusOK)
return
}
session, err := newSession(env.db)
if err != nil {
http.Error(writer, fmt.Sprintf("error: failed to start session: %v", err), http.StatusOK)
return
}
cookie := http.Cookie{
Name: AuthCookieName,
Value: session,
}
http.SetCookie(writer, &cookie)
writer.Header()["HX-Refresh"] = []string{"true"}
writer.WriteHeader(http.StatusNoContent)
}

View File

@ -19,23 +19,31 @@ func logged(handler http.HandlerFunc) http.HandlerFunc {
} }
} }
func handleFunc(pattern string, handler http.HandlerFunc) { func handleFunc(
http.HandleFunc(pattern, logged(handler)) pattern string,
handler http.HandlerFunc,
middlewares ...func(http.HandlerFunc) http.HandlerFunc,
) {
for _, middleware := range middlewares {
handler = middleware(handler)
}
http.HandleFunc(pattern, handler)
} }
func RunServer(db core.DB, addr string, port string) { func RunServer(db core.DB, addr string, port string) {
env := &Env{db} env := &Env{db}
bind := net.JoinHostPort(addr, port) bind := net.JoinHostPort(addr, port)
handleFunc("GET /", env.getRoot) handleFunc("GET /", env.getRoot, env.authed, logged)
handleFunc("GET /style.css", env.getStyle) handleFunc("GET /style.css", env.getStyle, logged)
handleFunc("GET /htmx.org@2.0.4.js", env.getScript) handleFunc("GET /htmx.org@2.0.4.js", env.getScript, logged)
handleFunc("GET /source/{source}", env.getSource) handleFunc("POST /login", env.login, logged)
handleFunc("GET /channel/{channel}", env.getChannel) handleFunc("GET /source/{source}", env.getSource, env.authed, logged)
handleFunc("GET /item/{source}/{id}", env.getItem) handleFunc("GET /channel/{channel}", env.getChannel, env.authed, logged)
handleFunc("DELETE /item/{source}/{id}", env.deleteItem) handleFunc("GET /item/{source}/{id}", env.getItem, env.authed, logged)
handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction) handleFunc("DELETE /item/{source}/{id}", env.deleteItem, env.authed, logged)
handleFunc("POST /mass-deactivate", env.massDeactivate) handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction, env.authed, logged)
handleFunc("POST /mass-deactivate", env.massDeactivate, env.authed, logged)
log.Fatal(http.ListenAndServe(bind, nil)) log.Fatal(http.ListenAndServe(bind, nil))
} }