Compare commits
15 Commits
ff943704c4
...
bc9f2847a1
Author | SHA1 | Date | |
---|---|---|---|
bc9f2847a1 | |||
fd7d9b00b3 | |||
d4c93265a2 | |||
c08458a5cb | |||
2acd6f397f | |||
7cbf48a9b1 | |||
b4db1d4f11 | |||
6c6bde42a1 | |||
2894493d34 | |||
fee7c9225b | |||
1ee91c1abe | |||
c0d8e8ae31 | |||
f7dd694b67 | |||
d6a49593b7 | |||
cb7bd6e3ba |
2
Makefile
2
Makefile
@ -4,7 +4,7 @@ help: ## display this help
|
||||
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
|
||||
|
||||
serve: ## Run "intake serve" with live reload
|
||||
@air -build.cmd "go build -o tmp/intake" -build.bin tmp/intake -build.args_bin serve,--data-dir,tmp
|
||||
@air -build.cmd "go build -o tmp/intake" -build.bin tmp/intake -build.args_bin serve,--data-dir,tmp -build.include_ext "go,html,css"
|
||||
|
||||
test-data: ## Recreate test data in tmp/
|
||||
@test/test_items.sh
|
110
README.md
110
README.md
@ -4,65 +4,6 @@ Intake is an arbitrary feed aggregator that generalizes the concept of a feed.
|
||||
Rather than being restricted to parsing items out of an RSS feed, Intake provides a middle layer of executing arbitrary commands that conform to a JSON-based specification.
|
||||
An Intake source can parse an RSS feed, but it can also scrape a website without a feed, provide additional logic to filter or annotate feed items, or integrate with an API.
|
||||
|
||||
## Development
|
||||
|
||||
Parity with existing Python version
|
||||
|
||||
* [x] create sources
|
||||
* [ ] rename sources
|
||||
* fetch sources
|
||||
* [x] create and delete items
|
||||
* [x] update existing items
|
||||
* [ ] support item TTL and TTD
|
||||
* [x] on_create triggers
|
||||
* [ ] on_delete triggers
|
||||
* [x] dry-run
|
||||
* item actions
|
||||
* [x] create
|
||||
* [x] edit
|
||||
* [ ] rename
|
||||
* [x] delete
|
||||
* [x] execute
|
||||
* [x] require items to declare action support
|
||||
* [x] state files
|
||||
* [x] source environment
|
||||
* [ ] working directory set
|
||||
* [x] update web UI credentials
|
||||
* [ ] automatic crontab integration
|
||||
* [ ] feed supports item TTS
|
||||
* [x] data directory from envvars
|
||||
* [ ] source-level tt{s,d,l}
|
||||
* [ ] source batching
|
||||
* channels
|
||||
* [x] create
|
||||
* [x] edit
|
||||
* [ ] rename
|
||||
* [x] delete
|
||||
* feeds
|
||||
* [x] show items
|
||||
* [x] deactivate items
|
||||
* [x] mass deactivate
|
||||
* [ ] punt
|
||||
* [x] trigger actions
|
||||
* [x] add ad-hoc items
|
||||
* [x] show/hide deactivated items
|
||||
* [ ] show/hide tts items
|
||||
* [x] sort by time ?? created
|
||||
* [ ] paging
|
||||
* [ ] NixOS module
|
||||
* [ ] NixOS module demo
|
||||
|
||||
Additional features
|
||||
|
||||
* [ ] metric reporting
|
||||
* [ ] on action failure, create an error item with logs
|
||||
* [ ] first-party password handling instead of basic auth and htpasswd
|
||||
* [ ] items gracefully add new fields and `action` keys
|
||||
* [ ] arbitrary date punt
|
||||
* [ ] HTTP edit item
|
||||
* [ ] sort crontab entries
|
||||
* [ ] TUI feed view
|
||||
|
||||
## Overview
|
||||
|
||||
In Intake, a _source_ represents a single content feed of discrete _items_, such as a blog and its posts or a website and its pages.
|
||||
@ -94,16 +35,25 @@ Any unspecified field is equivalent to the empty string, object, or 0, depending
|
||||
| `body` | Optional | Body text of the item as raw HTML. This will be displayed in the item without further processing! Consider your sources' threat models against injection attacks.
|
||||
| `link` | Optional | A hyperlink associated with the item.
|
||||
| `time` | Optional | A Unix timestamp associated with the item, not necessarily when the item was created. Items sort by `time` when it is defined and fall back to `created`. Displayed in the item footer.
|
||||
| `ttl` | Optional | The time-to-live of the item. An item with `ttl` defined is not deleted by feed updates as long as `created + ttl` is in the future, even if it is inactive.
|
||||
| `ttd` | Optional | The time-to-die of the item. An item with `ttd` defined is deleted by feed updates if `created + ttd` is in the past, even if it is active.
|
||||
| `tts` | Optional | The time-to-show of the item. An item with `tts` defined is hidden from feeds before the time `created + tts`.
|
||||
| `action` | Optional | A JSON object with keys for all supported actions. No schema is imposed on the values.
|
||||
|
||||
Existing items are updated with new values when a fetch or action produces them, with some exceptions:
|
||||
|
||||
* Automatic fields cannot be changed.
|
||||
* If a field's previous value is non-empty and the new value is empty, the old value is kept.
|
||||
* Source-level settings for `ttl`, `ttd`, or `tts` override the item's values.
|
||||
* Fields cannot be updated from a non-empty value to an empty value.
|
||||
If a field's previous value is non-empty and the new value is empty, the old value is kept.
|
||||
|
||||
### Sources
|
||||
|
||||
A source is identified by its name. A minimally functional source requires a `fetch` action that returns items.
|
||||
A source is identified by its name.
|
||||
A minimally functional source requires a `fetch` action that returns items.
|
||||
|
||||
TTL, TTD, and TTS can be configured at the source level by setting the environment variables `INTAKE_TTL`, `INTAKE_TTS`, or `INTAKE_TTS` to an integer value.
|
||||
These values override any `ttl`, `ttd`, or `tts` value returned by a fetch or action.
|
||||
|
||||
### Action API
|
||||
|
||||
@ -136,6 +86,38 @@ The item does not need to declare support for `on_create`.
|
||||
This action is not accessible through the web interface, so if you need to retry the action, you should create another action with the same command as `on_create`.
|
||||
If an item's `on_create` fails, the item is still created, but without any changes made by action.
|
||||
|
||||
The special action `on_delete` is like `on_create`, except it runs right before an item is deleted.
|
||||
It does not require explicit support and is not accessible in the web interface.
|
||||
The output of `on_delete` is ignored; it is primarily for causing side effects like managing state.
|
||||
### Web interface
|
||||
|
||||
The `intake serve` command runs an HTTP server that gives access to the feed.
|
||||
While the CLI can rely on normal filesystem access control to secure the database, this does not apply to HTTP.
|
||||
Instead, the web interface can be locked behind a password set via `intake passwd`.
|
||||
|
||||
## Development
|
||||
|
||||
Parity features
|
||||
|
||||
* [x] web feed supports item TTS
|
||||
* [ ] item punt
|
||||
* [ ] web feed paging
|
||||
* [ ] web fetch
|
||||
* [ ] set a working directory for item actions
|
||||
* [ ] crontab integration
|
||||
* [ ] source batching
|
||||
* [ ] add item from web
|
||||
* [ ] Nix build
|
||||
* [ ] NixOS module
|
||||
* [ ] NixOS vm demo
|
||||
* [ ] Nix flake templates
|
||||
|
||||
Future features
|
||||
|
||||
* [ ] on_delete triggers
|
||||
* [ ] manual item edits, CLI
|
||||
* [ ] manual item edits, web
|
||||
* [ ] source-level TTS
|
||||
* [ ] metric reporting
|
||||
* [ ] on action failure, create an error item with logs
|
||||
* [ ] items gracefully add new fields and `action` keys
|
||||
* [ ] arbitrary date punt
|
||||
* [ ] sort crontab entries
|
||||
* [ ] TUI feed view
|
||||
|
@ -109,7 +109,12 @@ func actionExecute(
|
||||
log.Fatalf("error: failed to get action: %v", err)
|
||||
}
|
||||
|
||||
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute)
|
||||
postProcess, err := core.GetSourcePostProcessor(db, source)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to get source post-processor: %v", err)
|
||||
}
|
||||
|
||||
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
|
||||
if err != nil {
|
||||
log.Fatalf("error executing %s: %v", action, err)
|
||||
}
|
||||
|
@ -27,6 +27,9 @@ if it doesn't exist, with a random id.`,
|
||||
stringArg(cmd, "body"),
|
||||
stringArg(cmd, "link"),
|
||||
intArg(cmd, "time"),
|
||||
intArg(cmd, "ttl"),
|
||||
intArg(cmd, "ttd"),
|
||||
intArg(cmd, "tts"),
|
||||
stringArg(cmd, "action"),
|
||||
)
|
||||
},
|
||||
@ -42,6 +45,9 @@ func init() {
|
||||
itemAddCmd.Flags().StringP("body", "b", "", "Item body")
|
||||
itemAddCmd.Flags().StringP("link", "l", "", "Item link")
|
||||
itemAddCmd.Flags().IntP("time", "m", 0, "Item time as a Unix timestamp")
|
||||
itemAddCmd.Flags().Int("ttl", 0, "Item time-to-live in seconds, relative to item creation date")
|
||||
itemAddCmd.Flags().Int("ttd", 0, "Item time-to-die in seconds, relative to item creation date")
|
||||
itemAddCmd.Flags().Int("tts", 0, "Item time-to-show in seconds, relative to item creation date")
|
||||
itemAddCmd.Flags().StringP("action", "x", "", "Item action data as JSON")
|
||||
}
|
||||
|
||||
@ -53,6 +59,9 @@ func itemAdd(
|
||||
body string,
|
||||
link string,
|
||||
time int,
|
||||
ttl int,
|
||||
ttd int,
|
||||
tts int,
|
||||
actions string,
|
||||
) {
|
||||
// Default to "default" source
|
||||
@ -85,6 +94,9 @@ func itemAdd(
|
||||
Body: body,
|
||||
Link: link,
|
||||
Time: time,
|
||||
Ttl: ttl,
|
||||
Ttd: ttd,
|
||||
Tts: tts,
|
||||
Action: itemActions,
|
||||
}}); err != nil {
|
||||
log.Fatalf("error: failed to add item: %s", err)
|
||||
|
@ -29,7 +29,10 @@ func init() {
|
||||
func migrate(listOnly bool) {
|
||||
db := openDb()
|
||||
|
||||
core.InitDatabase(db)
|
||||
err := core.InitDatabase(db)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to init database: %v", err)
|
||||
}
|
||||
if listOnly {
|
||||
pending, err := core.GetPendingMigrations(db)
|
||||
if err != nil {
|
||||
@ -43,6 +46,9 @@ func migrate(listOnly bool) {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
core.MigrateDatabase(db)
|
||||
err = core.MigrateDatabase(db)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to migrate database: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -58,7 +58,12 @@ func sourceFetch(source string, format string, dryRun bool) {
|
||||
log.Fatalf("error: failed to get fetch action: %v", err)
|
||||
}
|
||||
|
||||
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute)
|
||||
postProcess, err := core.GetSourcePostProcessor(db, source)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to get source post-processor: %v", err)
|
||||
}
|
||||
|
||||
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to execute fetch: %v", err)
|
||||
}
|
||||
@ -74,7 +79,7 @@ func sourceFetch(source string, format string, dryRun bool) {
|
||||
return
|
||||
}
|
||||
|
||||
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items)
|
||||
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items, time.Now())
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to update: %v", err)
|
||||
}
|
||||
|
@ -14,6 +14,9 @@ var sourceTestCmd = &cobra.Command{
|
||||
Short: "Test a fetch action",
|
||||
Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action.
|
||||
|
||||
Source-level configuration that is normally set via environment variable,
|
||||
such as INTAKE_TTL, will not be applied by --env.
|
||||
|
||||
%s`, makeFormatHelpText()),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args))
|
||||
@ -30,7 +33,7 @@ func init() {
|
||||
func sourceTest(env []string, format string, cmd []string) {
|
||||
formatter := formatAs(format)
|
||||
|
||||
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute)
|
||||
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil)
|
||||
log.Printf("returned %d items", len(items))
|
||||
log.Printf("wrote %d bytes of state", len(state))
|
||||
if err != nil {
|
||||
|
@ -45,8 +45,8 @@ func TestChannel(t *testing.T) {
|
||||
|
||||
// Items on both sources appear in the channel
|
||||
if err := AddItems(db, []Item{
|
||||
{"one", "a", 0, true, "", "", "", "", 0, nil},
|
||||
{"two", "b", 0, true, "", "", "", "", 0, nil},
|
||||
{Source: "one", Id: "a"},
|
||||
{Source: "two", Id: "b"},
|
||||
}); err != nil {
|
||||
t.Fatalf("failed to add items to one: %v", err)
|
||||
}
|
||||
|
@ -93,8 +93,8 @@ func TestDeleteSourceCascade(t *testing.T) {
|
||||
t.Fatalf("failed to add source2: %v", err)
|
||||
}
|
||||
if err := AddItems(db, []Item{
|
||||
{"source1", "item1", 0, true, "", "", "", "", 0, nil},
|
||||
{"source2", "item2", 0, true, "", "", "", "", 0, nil},
|
||||
{Source: "source1", Id: "item1"},
|
||||
{Source: "source2", Id: "item2"},
|
||||
}); err != nil {
|
||||
t.Fatalf("failed to add items: %v", err)
|
||||
}
|
||||
@ -118,7 +118,7 @@ func TestDeleteSourceCascade(t *testing.T) {
|
||||
t.Fatalf("Expected only 1 item after source delete, got %d", len(items))
|
||||
}
|
||||
|
||||
err = AddItems(db, []Item{{"source1", "item3", 0, true, "", "", "", "", 0, nil}})
|
||||
err = AddItems(db, []Item{{Source: "source1", Id: "item3"}})
|
||||
if err == nil {
|
||||
t.Fatal("Unexpected success adding item for nonexistent source")
|
||||
}
|
||||
|
@ -14,7 +14,13 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) {
|
||||
func readStdout(
|
||||
stdout io.ReadCloser,
|
||||
source string,
|
||||
postProcess func(item Item) Item,
|
||||
items chan Item,
|
||||
cparse chan bool,
|
||||
) {
|
||||
var item Item
|
||||
parseError := false
|
||||
scanout := bufio.NewScanner(stdout)
|
||||
@ -25,6 +31,9 @@ func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse cha
|
||||
log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data)))
|
||||
parseError = true
|
||||
} else {
|
||||
if postProcess != nil {
|
||||
item = postProcess(item)
|
||||
}
|
||||
item.Active = true // These fields aren't up to
|
||||
item.Created = 0 // the action to set and
|
||||
item.Source = source // shouldn't be overrideable
|
||||
@ -58,6 +67,7 @@ func Execute(
|
||||
state []byte,
|
||||
input string,
|
||||
timeout time.Duration,
|
||||
postProcess func(item Item) Item,
|
||||
) ([]Item, []byte, error) {
|
||||
log.Printf("executing %v", argv)
|
||||
|
||||
@ -119,7 +129,7 @@ func Execute(
|
||||
|
||||
// Routines handling the process i/o
|
||||
go writeStdin(stdin, input)
|
||||
go readStdout(stdout, source, cout, cparse)
|
||||
go readStdout(stdout, source, postProcess, cout, cparse)
|
||||
go readStderr(stderr, source, cerr)
|
||||
|
||||
// Kick off the command
|
||||
@ -165,13 +175,14 @@ func ExecuteItemAction(
|
||||
env []string,
|
||||
state []byte,
|
||||
timeout time.Duration,
|
||||
postProcess func(item Item) Item,
|
||||
) (Item, []byte, error) {
|
||||
itemJson, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
|
||||
}
|
||||
|
||||
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout)
|
||||
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
|
||||
if err != nil {
|
||||
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
|
||||
}
|
||||
|
@ -7,22 +7,25 @@ import (
|
||||
|
||||
func TestExecute(t *testing.T) {
|
||||
assertLen := func(items []Item, length int) {
|
||||
t.Helper()
|
||||
if len(items) != length {
|
||||
t.Fatalf("Expected %d items, got %d", length, len(items))
|
||||
}
|
||||
}
|
||||
assertNil := func(err error) {
|
||||
t.Helper()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
assertNotNil := func(err error) {
|
||||
t.Helper()
|
||||
if err == nil {
|
||||
t.Fatal("expected err")
|
||||
}
|
||||
}
|
||||
execute := func(argv []string) ([]Item, error) {
|
||||
item, _, err := Execute("_", argv, nil, nil, "", time.Minute)
|
||||
item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
||||
return item, err
|
||||
}
|
||||
|
||||
@ -40,7 +43,7 @@ func TestExecute(t *testing.T) {
|
||||
assertLen(res, 0)
|
||||
|
||||
// Timeout
|
||||
res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond)
|
||||
res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
|
||||
assertNotNil(err)
|
||||
assertLen(res, 0)
|
||||
|
||||
@ -53,7 +56,7 @@ func TestExecute(t *testing.T) {
|
||||
}
|
||||
|
||||
// Read from stdin
|
||||
res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute)
|
||||
res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
|
||||
assertNil(err)
|
||||
assertLen(res, 1)
|
||||
if res[0].Id != "bar" {
|
||||
@ -61,7 +64,7 @@ func TestExecute(t *testing.T) {
|
||||
}
|
||||
|
||||
// Set env
|
||||
res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute)
|
||||
res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
|
||||
assertNil(err)
|
||||
assertLen(res, 1)
|
||||
if res[0].Id != "baz" {
|
||||
@ -133,7 +136,7 @@ func TestExecute(t *testing.T) {
|
||||
|
||||
// Read state
|
||||
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
|
||||
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute)
|
||||
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
|
||||
assertNil(err)
|
||||
assertLen(res, 1)
|
||||
if res[0].Title != "Hello world" {
|
||||
@ -142,10 +145,22 @@ func TestExecute(t *testing.T) {
|
||||
|
||||
// Write state
|
||||
argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
|
||||
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute)
|
||||
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
||||
assertNil(err)
|
||||
assertLen(res, 1)
|
||||
if string(newState) != "Hello world" {
|
||||
t.Fatalf("expected 'Hello world' from write state, got %s", string(newState))
|
||||
}
|
||||
|
||||
// Postprocessing function
|
||||
argv = []string{"jq", "-cn", `{id: "foo"}`}
|
||||
res, _, err = Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
|
||||
item.Ttl = 123456
|
||||
return item
|
||||
})
|
||||
assertNil(err)
|
||||
assertLen(res, 1)
|
||||
if res[0].Ttl != 123456 {
|
||||
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
|
||||
}
|
||||
}
|
||||
|
29
core/item.go
29
core/item.go
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Actions map[string]json.RawMessage
|
||||
@ -27,11 +28,37 @@ type Item struct {
|
||||
Body string `json:"body"`
|
||||
Link string `json:"link"`
|
||||
Time int `json:"time"`
|
||||
Ttl int `json:"ttl"`
|
||||
Ttd int `json:"ttd"`
|
||||
Tts int `json:"tts"`
|
||||
Action Actions `json:"action"`
|
||||
}
|
||||
|
||||
func (item Item) TtlTime() time.Time {
|
||||
return time.Unix(int64(item.Created)+int64(item.Ttl), 0)
|
||||
}
|
||||
|
||||
func (item Item) TtdTime() time.Time {
|
||||
return time.Unix(int64(item.Created)+int64(item.Ttd), 0)
|
||||
}
|
||||
|
||||
func (item Item) TtsTime() time.Time {
|
||||
return time.Unix(int64(item.Created)+int64(item.Tts), 0)
|
||||
}
|
||||
|
||||
func (item Item) Visible() bool {
|
||||
now := time.Now() // TODO pass this value in
|
||||
return item.Active && now.After(item.TtsTime())
|
||||
}
|
||||
|
||||
// Whether an item that no longer appears in a fetch can be deleted.
|
||||
func (item Item) Deletable() bool {
|
||||
func (item Item) Deletable(now time.Time) bool {
|
||||
if item.Ttl != 0 && item.TtlTime().After(now) {
|
||||
return false
|
||||
}
|
||||
if item.Ttd != 0 && item.TtdTime().Before(now) {
|
||||
return true
|
||||
}
|
||||
return !item.Active
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package core
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestItemFormatsExist(t *testing.T) {
|
||||
@ -49,3 +50,35 @@ func TestItemRoundTrip(t *testing.T) {
|
||||
t.Fatalf("items are not equal, err %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemLifecycleTimes(t *testing.T) {
|
||||
db := EphemeralDb(t)
|
||||
if err := AddSource(db, "_"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// active item with expired ttd is deleted
|
||||
add, del, err := UpdateWithFetchedItems(db, "_", nil, []Item{{Source: "_", Id: "_", Ttd: 1}}, now)
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{}, now.Add(time.Second*5))
|
||||
if add != 0 || del != 1 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
|
||||
// inactive item with live ttl is kept
|
||||
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{{Source: "_", Id: "_", Ttl: 60}}, now)
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
if _, err = DeactivateItem(db, "_", "_"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{}, now.Add(time.Second*5))
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
@ -12,8 +13,8 @@ import (
|
||||
func AddItems(db DB, items []Item) error {
|
||||
return db.Transact(func(tx DB) error {
|
||||
stmt, err := tx.Prepare(`
|
||||
insert into items (source, id, active, title, author, body, link, time, action)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, jsonb(?))
|
||||
insert into items (source, id, active, title, author, body, link, time, ttl, ttd, tts, action)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, jsonb(?))
|
||||
`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare insert: %v", err)
|
||||
@ -23,7 +24,9 @@ func AddItems(db DB, items []Item) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err)
|
||||
}
|
||||
_, err = stmt.Exec(item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time, actions)
|
||||
_, err = stmt.Exec(
|
||||
item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time, item.Ttl, item.Ttd, item.Tts, actions,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert %s/%s: %v", item.Source, item.Id, err)
|
||||
}
|
||||
@ -54,6 +57,15 @@ func BackfillItem(new *Item, old *Item) {
|
||||
if new.Title == "" {
|
||||
new.Title = old.Title
|
||||
}
|
||||
if new.Ttl == 0 {
|
||||
new.Tts = old.Tts
|
||||
}
|
||||
if new.Ttd == 0 {
|
||||
new.Ttd = old.Ttd
|
||||
}
|
||||
if new.Tts == 0 {
|
||||
new.Tts = old.Tts
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateItems(db DB, items []Item) error {
|
||||
@ -66,6 +78,9 @@ func UpdateItems(db DB, items []Item) error {
|
||||
body = ?,
|
||||
link = ?,
|
||||
time = ?,
|
||||
ttl = ?,
|
||||
ttd = ?,
|
||||
tts = ?,
|
||||
action = jsonb(?)
|
||||
where source = ?
|
||||
and id = ?
|
||||
@ -78,7 +93,9 @@ func UpdateItems(db DB, items []Item) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err)
|
||||
}
|
||||
_, err = stmt.Exec(item.Title, item.Author, item.Body, item.Link, item.Time, actions, item.Source, item.Id)
|
||||
_, err = stmt.Exec(
|
||||
item.Title, item.Author, item.Body, item.Link, item.Time, item.Ttl, item.Ttd, item.Tts, actions, item.Source, item.Id,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -132,7 +149,21 @@ func getItems(db DB, query string, args ...any) ([]Item, error) {
|
||||
var items []Item
|
||||
for rows.Next() {
|
||||
var item Item
|
||||
err = rows.Scan(&item.Source, &item.Id, &item.Created, &item.Active, &item.Title, &item.Author, &item.Body, &item.Link, &item.Time, &item.Action)
|
||||
err = rows.Scan(
|
||||
&item.Source,
|
||||
&item.Id,
|
||||
&item.Created,
|
||||
&item.Active,
|
||||
&item.Title,
|
||||
&item.Author,
|
||||
&item.Body,
|
||||
&item.Link,
|
||||
&item.Time,
|
||||
&item.Ttl,
|
||||
&item.Ttd,
|
||||
&item.Tts,
|
||||
&item.Action,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -146,7 +177,7 @@ func getItems(db DB, query string, args ...any) ([]Item, error) {
|
||||
|
||||
func GetItem(db DB, source string, id string) (Item, error) {
|
||||
items, err := getItems(db, `
|
||||
select source, id, created, active, title, author, body, link, time, json(action)
|
||||
select source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
|
||||
from items
|
||||
where source = ?
|
||||
and id = ?
|
||||
@ -164,7 +195,7 @@ func GetItem(db DB, source string, id string) (Item, error) {
|
||||
func GetAllActiveItems(db DB) ([]Item, error) {
|
||||
return getItems(db, `
|
||||
select
|
||||
source, id, created, active, title, author, body, link, time, json(action)
|
||||
source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
|
||||
from items
|
||||
where active <> 0
|
||||
order by case when time = 0 then created else time end, id
|
||||
@ -174,28 +205,30 @@ func GetAllActiveItems(db DB) ([]Item, error) {
|
||||
func GetAllItems(db DB) ([]Item, error) {
|
||||
return getItems(db, `
|
||||
select
|
||||
source, id, created, active, title, author, body, link, time, json(action)
|
||||
source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
|
||||
from items
|
||||
order by case when time = 0 then created else time end, id
|
||||
`)
|
||||
}
|
||||
|
||||
func GetActiveItemsForSource(db DB, source string) ([]Item, error) {
|
||||
now := int(time.Now().Unix()) // TODO pass this value in
|
||||
return getItems(db, `
|
||||
select
|
||||
source, id, created, active, title, author, body, link, time, json(action)
|
||||
source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
|
||||
from items
|
||||
where
|
||||
source = ?
|
||||
and active <> 0
|
||||
and created + tts < ?
|
||||
order by case when time = 0 then created else time end, id
|
||||
`, source)
|
||||
`, source, now)
|
||||
}
|
||||
|
||||
func GetAllItemsForSource(db DB, source string) ([]Item, error) {
|
||||
return getItems(db, `
|
||||
select
|
||||
source, id, created, active, title, author, body, link, time, json(action)
|
||||
source, id, created, active, title, author, body, link, time, ttl, ttd, tts, json(action)
|
||||
from items
|
||||
where
|
||||
source = ?
|
||||
@ -204,22 +237,24 @@ func GetAllItemsForSource(db DB, source string) ([]Item, error) {
|
||||
}
|
||||
|
||||
func GetActiveItemsForChannel(db DB, channel string) ([]Item, error) {
|
||||
now := int(time.Now().Unix()) // TODO pass this value in
|
||||
return getItems(db, `
|
||||
select
|
||||
i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, json(i.action)
|
||||
i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, i.ttl, i.ttd, i.tts, json(i.action)
|
||||
from items i
|
||||
join channels c on i.source = c.source
|
||||
where
|
||||
c.name = ?
|
||||
and i.active <> 0
|
||||
and i.created + i.tts < ?
|
||||
order by case when i.time = 0 then i.created else i.time end, i.id
|
||||
`, channel)
|
||||
`, channel, now)
|
||||
}
|
||||
|
||||
func GetAllItemsForChannel(db DB, channel string) ([]Item, error) {
|
||||
return getItems(db, `
|
||||
select
|
||||
i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, json(i.action)
|
||||
i.source, i.id, i.created, i.active, i.title, i.author, i.body, i.link, i.time, i.ttl, i.ttd, i.tts, json(i.action)
|
||||
from items i
|
||||
join channels c on i.source = c.source
|
||||
where
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
)
|
||||
|
||||
func AssertItemIs(t *testing.T, item Item, expected string) {
|
||||
t.Helper()
|
||||
actual := fmt.Sprintf(
|
||||
"%s/%s/%t/%s/%s/%s/%s/%d",
|
||||
item.Source,
|
||||
@ -31,8 +32,8 @@ func TestAddItem(t *testing.T) {
|
||||
}
|
||||
|
||||
if err := AddItems(db, []Item{
|
||||
{"test", "one", 0, true, "", "", "", "", 0, nil},
|
||||
{"test", "two", 0, true, "title", "author", "body", "link", 123456, nil},
|
||||
{Source: "test", Id: "one", Active: true},
|
||||
{"test", "two", 0, true, "title", "author", "body", "link", 123456, 1, 2, 3, nil},
|
||||
}); err != nil {
|
||||
t.Fatalf("failed to add items: %v", err)
|
||||
}
|
||||
|
@ -22,6 +22,15 @@ func SetPassword(db DB, password string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func HasPassword(db DB) (bool, error) {
|
||||
var i int
|
||||
err := db.QueryRow("select count(*) from password").Scan(&i)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return i > 0, nil
|
||||
}
|
||||
|
||||
func CheckPassword(db DB, password string) (bool, error) {
|
||||
var hash string
|
||||
err := db.QueryRow("select hash from password limit 1").Scan(&hash)
|
||||
|
@ -1,6 +1,8 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
@ -68,22 +70,78 @@ func SetState(db DB, source string, state []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func getSourceTtx(db DB, source string, env string) (int, error) {
|
||||
row := db.QueryRow(`
|
||||
select value
|
||||
from envs
|
||||
where source = ?
|
||||
and name = ?
|
||||
`, source, env)
|
||||
var ttx int
|
||||
if err := row.Scan(&ttx); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return 0, nil
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
return ttx, nil
|
||||
}
|
||||
|
||||
func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) {
|
||||
ttl, err := getSourceTtx(db, source, "INTAKE_TTL")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ttd, err := getSourceTtx(db, source, "INTAKE_TTD")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tts, err := getSourceTtx(db, source, "INTAKE_TTS")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return func(item Item) Item {
|
||||
if ttl != 0 {
|
||||
item.Ttl = ttl
|
||||
}
|
||||
if ttd != 0 {
|
||||
item.Ttd = ttd
|
||||
}
|
||||
if tts != 0 {
|
||||
item.Tts = tts
|
||||
}
|
||||
return item
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Given the results of a fetch, add new items, update existing items, and delete expired items.
|
||||
//
|
||||
// Returns the number of new and deleted items on success.
|
||||
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
|
||||
func UpdateWithFetchedItems(
|
||||
db DB,
|
||||
source string,
|
||||
state []byte,
|
||||
items []Item,
|
||||
now time.Time,
|
||||
) (int, int, error) {
|
||||
var new int
|
||||
var del int
|
||||
var err error
|
||||
err = db.Transact(func(tx DB) error {
|
||||
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
|
||||
new, del, err = updateWithFetchedItemsTx(tx, source, state, items, now)
|
||||
return err
|
||||
})
|
||||
return new, del, err
|
||||
}
|
||||
|
||||
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
|
||||
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
|
||||
func updateWithFetchedItemsTx(
|
||||
db DB,
|
||||
source string,
|
||||
state []byte,
|
||||
items []Item,
|
||||
now time.Time,
|
||||
) (int, int, error) {
|
||||
// Get the existing items
|
||||
existingItems, err := GetAllItemsForSource(db, source)
|
||||
if err != nil {
|
||||
@ -125,6 +183,11 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
|
||||
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
|
||||
}
|
||||
|
||||
postProcess, err := GetSourcePostProcessor(db, source)
|
||||
if err != nil {
|
||||
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
|
||||
}
|
||||
|
||||
// If the source has an on-create trigger, run it for each new item
|
||||
// On-create errors are ignored to avoid failing the fetch
|
||||
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
|
||||
@ -132,7 +195,7 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
|
||||
var updatedNewItems []Item
|
||||
for _, item := range newItems {
|
||||
var updatedItem Item
|
||||
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute)
|
||||
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
|
||||
if err != nil {
|
||||
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
|
||||
}
|
||||
@ -154,7 +217,7 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
|
||||
// Check expired items for deletion
|
||||
idsToDelete := map[string]bool{}
|
||||
for _, item := range existingItems {
|
||||
if expiredIds[item.Id] && item.Deletable() {
|
||||
if expiredIds[item.Id] && item.Deletable(now) {
|
||||
idsToDelete[item.Id] = true
|
||||
}
|
||||
}
|
||||
|
@ -50,20 +50,24 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
|
||||
if err := AddSource(db, "test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
update := func(items []Item) (int, int, error) {
|
||||
t.Helper()
|
||||
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
|
||||
}
|
||||
|
||||
a := Item{Source: "test", Id: "a"}
|
||||
add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a})
|
||||
add, del, err := update([]Item{a})
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a})
|
||||
add, del, err = update([]Item{a})
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
b := Item{Source: "test", Id: "b"}
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
|
||||
add, del, err = update([]Item{a, b})
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
@ -71,17 +75,17 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
|
||||
if _, err = DeactivateItem(db, "test", "a"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
|
||||
add, del, err = update([]Item{a, b})
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
|
||||
add, del, err = update([]Item{b})
|
||||
if add != 0 || del != 1 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
|
||||
add, del, err = update([]Item{b})
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
@ -97,7 +101,7 @@ func TestUpdateSourceTransaction(t *testing.T) {
|
||||
b := Item{Source: "s", Id: "b"}
|
||||
|
||||
// Add and deactivate a so it will be deleted on next fetch without it
|
||||
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil {
|
||||
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}, time.Now()); add != 1 || err != nil {
|
||||
t.Fatalf("expected 1 add, got %d and err %v", add, err)
|
||||
}
|
||||
if _, err := DeactivateItem(db, "s", "a"); err != nil {
|
||||
@ -114,7 +118,7 @@ func TestUpdateSourceTransaction(t *testing.T) {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b})
|
||||
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}, time.Now())
|
||||
if add != 0 || del != 0 || err == nil {
|
||||
t.Fatalf("expected failure, got %d %d %v", add, del, err)
|
||||
}
|
||||
@ -142,7 +146,8 @@ func TestOnCreateAction(t *testing.T) {
|
||||
}
|
||||
|
||||
execute := func(argv []string) []Item {
|
||||
items, _, err := Execute("test", argv, nil, nil, "", time.Minute)
|
||||
t.Helper()
|
||||
items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
|
||||
if err != nil {
|
||||
t.Fatal("unexpected error executing test fetch")
|
||||
}
|
||||
@ -152,13 +157,20 @@ func TestOnCreateAction(t *testing.T) {
|
||||
return items
|
||||
}
|
||||
|
||||
update := func(items []Item) (int, int, error) {
|
||||
t.Helper()
|
||||
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
|
||||
}
|
||||
|
||||
onCreate := func(argv []string) {
|
||||
t.Helper()
|
||||
if err := UpdateAction(db, "test", "on_create", argv); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
getItem := func(id string) Item {
|
||||
t.Helper()
|
||||
item, err := GetItem(db, "test", id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -169,7 +181,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
// Noop on_create works
|
||||
onCreate([]string{"tee"})
|
||||
items := execute([]string{"jq", "-cn", `{id: "one"}`})
|
||||
add, _, err := UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err := update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with noop oncreate")
|
||||
}
|
||||
@ -185,7 +197,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Title != "Hello, World" {
|
||||
t.Fatal("unexpected title")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with alter oncreate")
|
||||
}
|
||||
@ -200,7 +212,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Link != "" {
|
||||
t.Fatal("unexpected link")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with augment oncreate")
|
||||
}
|
||||
@ -215,7 +227,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Link != "gopher://go.dev" {
|
||||
t.Fatal("missing link")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with attempted deletion oncreate")
|
||||
}
|
||||
@ -226,7 +238,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
// item is created if on_create fails
|
||||
onCreate([]string{"false"})
|
||||
items = execute([]string{"jq", "-cn", `{id: "five"}`})
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with failing oncreate")
|
||||
}
|
||||
@ -240,7 +252,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Title != "before" {
|
||||
t.Fatal("unexpected title")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with bad exit code oncreate")
|
||||
}
|
||||
@ -251,7 +263,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
// on_create can't change id, active, or created
|
||||
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "seven"}`})
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with invalid field changes oncreate")
|
||||
}
|
||||
@ -287,3 +299,26 @@ func TestSourceState(t *testing.T) {
|
||||
t.Fatalf("expected hello, world, got %s", state)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSourceTtx(t *testing.T) {
|
||||
db := EphemeralDb(t)
|
||||
if err := AddSource(db, "s"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := SetEnvs(db, "s", []string{
|
||||
"INTAKE_TTL=30",
|
||||
"INTAKE_TTD=60",
|
||||
"INTAKE_TTS=90",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
postProcess, err := GetSourcePostProcessor(db, "s")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
before := Item{Source: "s", Id: "i"}
|
||||
after := postProcess(before)
|
||||
if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 {
|
||||
t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts)
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,9 @@ create table items(
|
||||
body text,
|
||||
link text,
|
||||
time int,
|
||||
ttl int,
|
||||
ttd int,
|
||||
tts int,
|
||||
action blob,
|
||||
primary key (source, id),
|
||||
foreign key (source) references sources (name) on delete cascade
|
||||
@ -42,3 +45,8 @@ create table password(
|
||||
hash text,
|
||||
unique (id) on conflict replace
|
||||
) strict;
|
||||
create table sessions(
|
||||
id text not null,
|
||||
expires int default 0,
|
||||
primary key (id)
|
||||
) strict;
|
||||
|
@ -8,18 +8,34 @@ tmp/intake migrate
|
||||
|
||||
tmp/intake source add -s feedtest
|
||||
tmp/intake item add -s feedtest --id "this-item-has-no-title"
|
||||
tmp/intake item add -s feedtest --title "This item has only a title"
|
||||
tmp/intake item add -s feedtest --title "Title and body" --body "This is the item body"
|
||||
tmp/intake item add -s feedtest --title "Title and link" --link "#"
|
||||
tmp/intake item add -s feedtest --title "Title, link, body" --link "#" --body "This is the body"
|
||||
tmp/intake item add -s feedtest --title "<b>HTML title</b>" --link "#" --body "<i>HTML body</i>"
|
||||
tmp/intake item add -s feedtest --title "Title and author" --author "Authorname"
|
||||
tmp/intake item add -s feedtest --title "Title, author, time" --author "Authorname" --time 1700000000
|
||||
tmp/intake item add -s feedtest --title "Title, time" --time 1737780324
|
||||
tmp/intake item add -s feedtest --title "Title, author, body" --author "Authorname" --body "Hello body!"
|
||||
tmp/intake item add -s feedtest --title "Title, author, time, body" --author "Authorname" --time 1700000000 --body "Hello body!"
|
||||
tmp/intake item add -s feedtest --title "Title, time, body" --time 1737780324 --body "Hello, body!"
|
||||
tmp/intake item add -s feedtest --id a --title "This item has only a title"
|
||||
tmp/intake item add -s feedtest --id b --title "Title and body" --body "This is the item body"
|
||||
tmp/intake item add -s feedtest --id c --title "Title and link" --link "#"
|
||||
tmp/intake item add -s feedtest --id d --title "Title, link, body" --link "#" --body "This is the body"
|
||||
tmp/intake item add -s feedtest --id e --title "<b>HTML title</b>" --link "#" --body "<i>HTML body</i>"
|
||||
tmp/intake item add -s feedtest --id f --title "Title and author" --author "Authorname"
|
||||
tmp/intake item add -s feedtest --id g --title "Title, author, time" --author "Authorname" --time 1700000000
|
||||
tmp/intake item add -s feedtest --id h --title "Title, time" --time 1737780324
|
||||
tmp/intake item add -s feedtest --id i --title "Title, author, body" --author "Authorname" --body "Hello body!"
|
||||
tmp/intake item add -s feedtest --id j --title "Title, author, time, body" --author "Authorname" --time 1700000000 --body "Hello body!"
|
||||
tmp/intake item add -s feedtest --id k --title "Title, time, body" --time 1737780324 --body "Hello, body!"
|
||||
tmp/intake item add -s feedtest --id l --title "TTL 30s" --ttl 30
|
||||
tmp/intake item add -s feedtest --id m --title "TTL 10d" --ttl 864000
|
||||
tmp/intake item add -s feedtest --id n --title "TTD 30s" --ttd 30
|
||||
tmp/intake item add -s feedtest --id o --title "TTS 30s" --tts 30
|
||||
tmp/intake item add -s feedtest --id p --title "TTS 10d" --tts 864000
|
||||
tmp/intake item add -s feedtest --id q --title "TTS -10d" --tts "-864000"
|
||||
|
||||
tmp/intake source add -s spook
|
||||
tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"'
|
||||
tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}'
|
||||
|
||||
tmp/intake source add -s nothing
|
||||
|
||||
tmp/intake channel add -c all -s feedtest
|
||||
tmp/intake channel add -c all -s spook
|
||||
|
||||
tmp/intake channel add -c none -s nothing
|
||||
|
||||
echo "hello" | tmp/intake passwd --stdin
|
||||
echo "hello" | tmp/intake passwd --stdin --verify
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
{{ define "content" -}}
|
||||
<article class="center">
|
||||
<span class="item-title">
|
||||
<span class="feed-controls">
|
||||
<a href="/">Home</a>
|
||||
[<a href="?inactive=0">Active</a> | <a href="?inactive=1">All</a>]
|
||||
</span>
|
||||
@ -23,7 +23,7 @@
|
||||
|
||||
{{ else }}
|
||||
<article class="center">
|
||||
<span class="item-title">Feed is empty</span>
|
||||
<span class="feed-controls">Feed is empty</span>
|
||||
</article>
|
||||
{{ end }}
|
||||
{{/* end if .Items */}}
|
||||
|
@ -3,7 +3,7 @@
|
||||
{{ define "content" -}}
|
||||
<article>
|
||||
<details open>
|
||||
<summary><span class="item-title">Channels</span></summary>
|
||||
<summary><span class="feed-controls">Channels</span></summary>
|
||||
{{ if .Channels }}
|
||||
{{ range .Channels }}
|
||||
<p><a href="/channel/{{ .Name }}">
|
||||
@ -22,7 +22,7 @@
|
||||
|
||||
<article>
|
||||
<details>
|
||||
<summary><span class="item-title">Sources</span></summary>
|
||||
<summary><span class="feed-controls">Sources</span></summary>
|
||||
{{ if .Sources }}
|
||||
<table class="intake-sources">
|
||||
{{ range .Sources }}
|
||||
|
@ -3,6 +3,7 @@ package html
|
||||
import (
|
||||
"embed"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"io"
|
||||
"log"
|
||||
@ -15,11 +16,36 @@ func rawHtml(str string) template.HTML {
|
||||
return template.HTML(str)
|
||||
}
|
||||
|
||||
func tsToDate(t int) string {
|
||||
tm := time.Unix(int64(t), 0).UTC()
|
||||
func dateFormat(tm time.Time) string {
|
||||
return tm.Format(time.DateTime)
|
||||
}
|
||||
|
||||
func tsToDate(t int) time.Time {
|
||||
return time.Unix(int64(t), 0).UTC()
|
||||
}
|
||||
|
||||
func until(tm time.Time) string {
|
||||
now := time.Now() // TODO pass this value in
|
||||
dur := tm.Sub(now).Round(time.Second)
|
||||
|
||||
var format string
|
||||
if dur < 0 {
|
||||
format = "%s ago"
|
||||
dur = dur.Abs()
|
||||
} else {
|
||||
format = "in %s"
|
||||
}
|
||||
|
||||
var length string
|
||||
if dur.Hours() > 24 {
|
||||
length = fmt.Sprintf("%dd", int(dur.Hours()/24))
|
||||
} else {
|
||||
length = dur.String()
|
||||
}
|
||||
|
||||
return fmt.Sprintf(format, length)
|
||||
}
|
||||
|
||||
func massDeactivateVals(items []core.Item) string {
|
||||
var shorts []string
|
||||
for _, item := range items {
|
||||
@ -37,7 +63,9 @@ func massDeactivateVals(items []core.Item) string {
|
||||
|
||||
var funcs = template.FuncMap{
|
||||
"raw": rawHtml,
|
||||
"dateFormat": dateFormat,
|
||||
"tsToDate": tsToDate,
|
||||
"until": until,
|
||||
"massDeacVars": massDeactivateVals,
|
||||
}
|
||||
|
||||
@ -71,8 +99,10 @@ type HomeData struct {
|
||||
Sources []SourceData
|
||||
}
|
||||
|
||||
func Home(writer io.Writer, data HomeData) error {
|
||||
return home.Execute(writer, data)
|
||||
func Home(writer io.Writer, data HomeData) {
|
||||
if err := home.Execute(writer, data); err != nil {
|
||||
log.Printf("error: failed to render home: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var feed = load("feed.html", "item.html")
|
||||
@ -81,8 +111,10 @@ type FeedData struct {
|
||||
Items []core.Item
|
||||
}
|
||||
|
||||
func Feed(writer io.Writer, data FeedData) error {
|
||||
return feed.Execute(writer, data)
|
||||
func Feed(writer io.Writer, data FeedData) {
|
||||
if err := feed.Execute(writer, data); err != nil {
|
||||
log.Printf("error: failed to render feed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var item = load("itemPage.html", "item.html")
|
||||
@ -92,6 +124,22 @@ type ItemData struct {
|
||||
Open bool
|
||||
}
|
||||
|
||||
func Item(writer io.Writer, data ItemData) error {
|
||||
return item.Execute(writer, data)
|
||||
func Item(writer io.Writer, data ItemData) {
|
||||
if err := item.Execute(writer, data); err != nil {
|
||||
log.Printf("error: failed to render item: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var login = load("login.html")
|
||||
|
||||
type LoginData struct {
|
||||
Error string
|
||||
}
|
||||
|
||||
func Login(writer io.Writer, data LoginData) error {
|
||||
err := login.Execute(writer, data)
|
||||
if err != nil {
|
||||
log.Printf("render error: %v", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -10,9 +10,19 @@ article {
|
||||
word-break: break-word;
|
||||
display: flow-root;
|
||||
}
|
||||
.feed-controls {
|
||||
font-size: 1.2em;
|
||||
}
|
||||
.item-title {
|
||||
font-size: 1.4em;
|
||||
}
|
||||
.item-title::before, details[open] summary .item-title::before {
|
||||
content: "= ";
|
||||
opacity: 0.3;
|
||||
}
|
||||
summary .item-title::before {
|
||||
content: "> ";
|
||||
}
|
||||
.item-button {
|
||||
font-size: 1em;
|
||||
float:right;
|
||||
|
@ -28,7 +28,7 @@
|
||||
<span class="item-title">{{ or .Title .Id | raw }}</span>
|
||||
{{- end }}
|
||||
|
||||
{{ define "item-class" -}}{{ if not .Active }}strikethru {{ end }}{{ if not .Active }}fade{{ end }}{{- end}}
|
||||
{{ define "item-class" -}}{{ if not .Active }}strikethru {{ end }}{{ if not .Visible }}fade{{ end }}{{- end}}
|
||||
|
||||
{{ define "item" -}}
|
||||
<article
|
||||
@ -56,14 +56,17 @@
|
||||
{{ if or .Author .Time }}
|
||||
<span class="item-info">
|
||||
{{ .Author }}
|
||||
{{ .Time | tsToDate }}
|
||||
<span title="{{ .Time | tsToDate | until }}">{{ .Time | tsToDate | dateFormat }}</span>
|
||||
</span><br>
|
||||
{{ end -}}
|
||||
|
||||
{{- /* source/id/created footer line */ -}}
|
||||
<span class="item-info">
|
||||
<a href="/item/{{ .Source }}/{{ .Id }}">{{ .Source }}/{{ .Id }}</a>
|
||||
{{ .Created | tsToDate }}
|
||||
<span title="{{ .Created | tsToDate | until }}">{{ .Created | tsToDate | dateFormat }}</span>
|
||||
{{ if .Ttl }}<span title="TTL {{ .TtlTime | dateFormat }}, {{ .TtlTime | until }}">[L]</span>{{ end }}
|
||||
{{ if .Ttd }}<span title="TTD {{ .TtdTime | dateFormat }}, {{ .TtdTime | until }}">[D]</span>{{ end }}
|
||||
{{ if .Tts }}<span title="TTS {{ .TtsTime | dateFormat }}, {{ .TtsTime | until }}">[S]</span>{{ end }}
|
||||
</span>
|
||||
</article>
|
||||
{{ end -}}
|
||||
|
16
web/html/login.html
Normal file
16
web/html/login.html
Normal file
@ -0,0 +1,16 @@
|
||||
{{ define "title" }}Intake - Login{{ end }}
|
||||
|
||||
{{ define "content" -}}
|
||||
<article class="center">
|
||||
<form method="post" action="/login">
|
||||
<p>
|
||||
<input name="password" type="password"/>
|
||||
</p>
|
||||
<button
|
||||
hx-post="/login"
|
||||
hx-target="#errors"
|
||||
hx-swap="innerHTML"
|
||||
>Submit</button>
|
||||
</form>
|
||||
<p id="errors">{{ .Error }}</p>
|
||||
{{ end }}
|
@ -72,7 +72,12 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute)
|
||||
postProcess, err := core.GetSourcePostProcessor(env.db, source)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to get source post-processor: %v", err)
|
||||
}
|
||||
|
||||
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
|
||||
if err != nil {
|
||||
http.Error(writer, err.Error(), 500)
|
||||
return
|
||||
|
123
web/login.go
Normal file
123
web/login.go
Normal file
@ -0,0 +1,123 @@
|
||||
package web
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/Jaculabilis/intake/core"
|
||||
"github.com/Jaculabilis/intake/web/html"
|
||||
)
|
||||
|
||||
var AuthCookieName string = "intake_auth"
|
||||
var AuthDuration time.Duration = time.Hour * 24 * 7
|
||||
|
||||
func newSession(db core.DB) (string, error) {
|
||||
bytes := make([]byte, 32)
|
||||
_, err := rand.Read(bytes)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
session := fmt.Sprintf("%x", bytes)
|
||||
expires := int(time.Now().Add(AuthDuration).Unix())
|
||||
_, err = db.Exec(`
|
||||
insert into sessions (id, expires)
|
||||
values (?, ?)
|
||||
`, session, expires)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func checkSession(db core.DB, session string) (bool, error) {
|
||||
row := db.QueryRow(`
|
||||
select expires
|
||||
from sessions
|
||||
where id = ?
|
||||
`, session)
|
||||
var expires int
|
||||
if err := row.Scan(&expires); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
expiration := time.Unix(int64(expires), 0).UTC()
|
||||
if time.Now().After(expiration) {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func renderLoginWithErrorMessage(writer http.ResponseWriter, req *http.Request, message string) {
|
||||
// If an htmx interaction caused the auth error, refresh the page to get the login rendered
|
||||
if req.Header.Get("HX-Request") != "" {
|
||||
writer.Header()["HX-Refresh"] = []string{"true"}
|
||||
writer.WriteHeader(http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
|
||||
data := html.LoginData{Error: message}
|
||||
if err := html.Login(writer, data); err != nil {
|
||||
log.Printf("render error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Env) authed(handler http.HandlerFunc) http.HandlerFunc {
|
||||
return func(writer http.ResponseWriter, req *http.Request) {
|
||||
required, err := core.HasPassword(env.db)
|
||||
if err != nil {
|
||||
renderLoginWithErrorMessage(writer, req, fmt.Sprintf("error: failed to check for password: %v", err))
|
||||
return
|
||||
}
|
||||
if required {
|
||||
cookie, err := req.Cookie(AuthCookieName)
|
||||
if errors.Is(err, http.ErrNoCookie) {
|
||||
renderLoginWithErrorMessage(writer, req, "Your session is expired or invalid")
|
||||
return
|
||||
}
|
||||
if valid, err := checkSession(env.db, cookie.Value); !valid || err != nil {
|
||||
renderLoginWithErrorMessage(writer, req, "Your session is expired or invalid")
|
||||
return
|
||||
}
|
||||
}
|
||||
handler(writer, req)
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Env) login(writer http.ResponseWriter, req *http.Request) {
|
||||
if err := req.ParseForm(); err != nil {
|
||||
http.Error(writer, fmt.Sprintf("error: failed to parse form: %v", err), http.StatusOK)
|
||||
return
|
||||
}
|
||||
password := req.PostForm.Get("password")
|
||||
|
||||
pass, err := core.CheckPassword(env.db, password)
|
||||
if err != nil {
|
||||
http.Error(writer, fmt.Sprintf("error: failed to check password: %v", err), http.StatusOK)
|
||||
return
|
||||
}
|
||||
if !pass {
|
||||
http.Error(writer, "Incorrect password", http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
session, err := newSession(env.db)
|
||||
if err != nil {
|
||||
http.Error(writer, fmt.Sprintf("error: failed to start session: %v", err), http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
||||
cookie := http.Cookie{
|
||||
Name: AuthCookieName,
|
||||
Value: session,
|
||||
}
|
||||
http.SetCookie(writer, &cookie)
|
||||
writer.Header()["HX-Refresh"] = []string{"true"}
|
||||
writer.WriteHeader(http.StatusNoContent)
|
||||
}
|
30
web/main.go
30
web/main.go
@ -19,23 +19,31 @@ func logged(handler http.HandlerFunc) http.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func handleFunc(pattern string, handler http.HandlerFunc) {
|
||||
http.HandleFunc(pattern, logged(handler))
|
||||
func handleFunc(
|
||||
pattern string,
|
||||
handler http.HandlerFunc,
|
||||
middlewares ...func(http.HandlerFunc) http.HandlerFunc,
|
||||
) {
|
||||
for _, middleware := range middlewares {
|
||||
handler = middleware(handler)
|
||||
}
|
||||
http.HandleFunc(pattern, handler)
|
||||
}
|
||||
|
||||
func RunServer(db core.DB, addr string, port string) {
|
||||
env := &Env{db}
|
||||
bind := net.JoinHostPort(addr, port)
|
||||
|
||||
handleFunc("GET /", env.getRoot)
|
||||
handleFunc("GET /style.css", env.getStyle)
|
||||
handleFunc("GET /htmx.org@2.0.4.js", env.getScript)
|
||||
handleFunc("GET /source/{source}", env.getSource)
|
||||
handleFunc("GET /channel/{channel}", env.getChannel)
|
||||
handleFunc("GET /item/{source}/{id}", env.getItem)
|
||||
handleFunc("DELETE /item/{source}/{id}", env.deleteItem)
|
||||
handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction)
|
||||
handleFunc("POST /mass-deactivate", env.massDeactivate)
|
||||
handleFunc("GET /", env.getRoot, env.authed, logged)
|
||||
handleFunc("GET /style.css", env.getStyle, logged)
|
||||
handleFunc("GET /htmx.org@2.0.4.js", env.getScript, logged)
|
||||
handleFunc("POST /login", env.login, logged)
|
||||
handleFunc("GET /source/{source}", env.getSource, env.authed, logged)
|
||||
handleFunc("GET /channel/{channel}", env.getChannel, env.authed, logged)
|
||||
handleFunc("GET /item/{source}/{id}", env.getItem, env.authed, logged)
|
||||
handleFunc("DELETE /item/{source}/{id}", env.deleteItem, env.authed, logged)
|
||||
handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction, env.authed, logged)
|
||||
handleFunc("POST /mass-deactivate", env.massDeactivate, env.authed, logged)
|
||||
|
||||
log.Fatal(http.ListenAndServe(bind, nil))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user