Compare commits
No commits in common. "8d9dcec0387e4f2e436ea1135d69a7335faafc1f" and "cafa1a6665500713828398eaa36e5a6f800d3425" have entirely different histories.
8d9dcec038
...
cafa1a6665
@ -106,7 +106,7 @@ Parity features
|
|||||||
* [ ] crontab integration
|
* [ ] crontab integration
|
||||||
* [ ] source batching
|
* [ ] source batching
|
||||||
* [x] add item from web
|
* [x] add item from web
|
||||||
* [x] Nix build
|
* [ ] Nix build
|
||||||
* [ ] NixOS module
|
* [ ] NixOS module
|
||||||
* [ ] NixOS vm demo
|
* [ ] NixOS vm demo
|
||||||
* [ ] Nix flake templates
|
* [ ] Nix flake templates
|
||||||
@ -118,7 +118,7 @@ Future features
|
|||||||
* [ ] manual item edits, web
|
* [ ] manual item edits, web
|
||||||
* [ ] source-level TTS
|
* [ ] source-level TTS
|
||||||
* [ ] metric reporting
|
* [ ] metric reporting
|
||||||
* [x] on action failure, create an error item with logs
|
* [ ] on action failure, create an error item with logs
|
||||||
* [ ] items gracefully add new fields and `action` keys
|
* [ ] items gracefully add new fields and `action` keys
|
||||||
* [ ] arbitrary date punt
|
* [ ] arbitrary date punt
|
||||||
* [ ] sort crontab entries
|
* [ ] sort crontab entries
|
||||||
|
@ -99,9 +99,8 @@ func actionExecute(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
|
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(db, errItem)
|
|
||||||
log.Fatalf("error executing %s: %v", action, err)
|
log.Fatalf("error executing %s: %v", action, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
@ -68,7 +70,11 @@ func itemAdd(
|
|||||||
}
|
}
|
||||||
// Default id to random hex string
|
// Default id to random hex string
|
||||||
if id == "" {
|
if id == "" {
|
||||||
id = core.RandomHex(16)
|
bytes := make([]byte, 16)
|
||||||
|
if _, err := rand.Read(bytes); err != nil {
|
||||||
|
log.Fatalf("error: failed to generate id: %v", err)
|
||||||
|
}
|
||||||
|
id = hex.EncodeToString(bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
var itemActions core.Actions
|
var itemActions core.Actions
|
||||||
|
@ -48,9 +48,8 @@ func sourceFetch(source string, format string, dryRun bool) {
|
|||||||
log.Fatalf("error: failed to load data for %s: %v", source, err)
|
log.Fatalf("error: failed to load data for %s: %v", source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
|
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(db, errItem)
|
|
||||||
log.Fatalf("error: failed to execute fetch: %v", err)
|
log.Fatalf("error: failed to execute fetch: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ func init() {
|
|||||||
func sourceTest(env []string, format string, cmd []string) {
|
func sourceTest(env []string, format string, cmd []string) {
|
||||||
formatter := formatAs(format)
|
formatter := formatAs(format)
|
||||||
|
|
||||||
items, state, _, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil)
|
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil)
|
||||||
log.Printf("returned %d items", len(items))
|
log.Printf("returned %d items", len(items))
|
||||||
log.Printf("wrote %d bytes of state", len(state))
|
log.Printf("wrote %d bytes of state", len(state))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1,22 +0,0 @@
|
|||||||
package core
|
|
||||||
|
|
||||||
import "log"
|
|
||||||
|
|
||||||
func AddErrorItem(db DB, item Item) {
|
|
||||||
exists, err := SourceExists(db, "default")
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error: could not ensure default source: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !exists {
|
|
||||||
if err = AddSource(db, "default"); err != nil {
|
|
||||||
log.Printf("error: could not add default source: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
item.Source = "default"
|
|
||||||
err = AddItems(db, []Item{item})
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error: could not add error item: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
137
core/execute.go
137
core/execute.go
@ -4,6 +4,7 @@ import (
|
|||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@ -28,22 +29,6 @@ func readPipe(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Two-stage error item helper, one stage to capture the immutable parameters and a second
|
|
||||||
// to include the variable error and logs.
|
|
||||||
func makeMakeErrItem(source string, argv []string) func(err error, logs []string) (item Item) {
|
|
||||||
return func(err error, logs []string) (item Item) {
|
|
||||||
item.Source = "default"
|
|
||||||
item.Id = RandomHex(16)
|
|
||||||
item.Title = fmt.Sprintf("Failed to execute for %s", source)
|
|
||||||
log := "no logs"
|
|
||||||
if len(logs) > 0 {
|
|
||||||
log = strings.Join(logs, "\n")
|
|
||||||
}
|
|
||||||
item.Body = fmt.Sprintf("<p><i>executing:</i> %#v</p><p><i>error:</i> %s</p><pre>%s</pre>", argv, err.Error(), log)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Execute(
|
func Execute(
|
||||||
source string,
|
source string,
|
||||||
argv []string,
|
argv []string,
|
||||||
@ -55,30 +40,20 @@ func Execute(
|
|||||||
) (
|
) (
|
||||||
items []Item,
|
items []Item,
|
||||||
newState []byte,
|
newState []byte,
|
||||||
errItem Item,
|
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
log.Printf("executing %v", argv)
|
log.Printf("executing %v", argv)
|
||||||
|
|
||||||
makeErrorItem := makeMakeErrItem(source, argv)
|
|
||||||
var logs []string
|
|
||||||
|
|
||||||
if source == "" {
|
|
||||||
err = fmt.Errorf("empty source")
|
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(argv) == 0 {
|
if len(argv) == 0 {
|
||||||
err = fmt.Errorf("empty argv for %s", source)
|
return nil, nil, errors.New("empty argv")
|
||||||
errItem = makeErrorItem(err, logs)
|
}
|
||||||
return
|
if source == "" {
|
||||||
|
return nil, nil, errors.New("empty source")
|
||||||
}
|
}
|
||||||
|
|
||||||
stateFile, err := os.CreateTemp("", "intake_state_*")
|
stateFile, err := os.CreateTemp("", "intake_state_*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to create temp state file: %v", err)
|
return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err)
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := os.Remove(stateFile.Name()); err != nil {
|
if err := os.Remove(stateFile.Name()); err != nil {
|
||||||
@ -88,9 +63,7 @@ func Execute(
|
|||||||
|
|
||||||
_, err = stateFile.Write(state)
|
_, err = stateFile.Write(state)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to write state file: %v", err)
|
return nil, nil, fmt.Errorf("error: failed to write state file: %v", err)
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
env = append(env, "STATE_PATH="+stateFile.Name())
|
env = append(env, "STATE_PATH="+stateFile.Name())
|
||||||
@ -104,21 +77,15 @@ func Execute(
|
|||||||
// Open pipes to the command
|
// Open pipes to the command
|
||||||
stdin, err := cmd.StdinPipe()
|
stdin, err := cmd.StdinPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to open stdin pipe: %v", err)
|
return nil, nil, err
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
stdout, err := cmd.StdoutPipe()
|
stdout, err := cmd.StdoutPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to open stdout pipe: %v", err)
|
return nil, nil, err
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
stderr, err := cmd.StderrPipe()
|
stderr, err := cmd.StderrPipe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to open stderr pipe: %v", err)
|
return nil, nil, err
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cout := make(chan []byte)
|
cout := make(chan []byte)
|
||||||
@ -133,22 +100,12 @@ func Execute(
|
|||||||
// Kick off the command
|
// Kick off the command
|
||||||
err = cmd.Start()
|
err = cmd.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to start execution: %v", err)
|
return nil, nil, err
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write any input to stdin and close it
|
// Write any input to stdin and close it
|
||||||
if _, err = io.WriteString(stdin, input); err != nil {
|
io.WriteString(stdin, input)
|
||||||
err = fmt.Errorf("failed to write to stdin: %v", err)
|
stdin.Close()
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = stdin.Close(); err != nil {
|
|
||||||
err = fmt.Errorf("failed to close stdin: %v", err)
|
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect outputs until std{out,err} close
|
// Collect outputs until std{out,err} close
|
||||||
parseError := false
|
parseError := false
|
||||||
@ -161,9 +118,7 @@ monitor:
|
|||||||
var item Item
|
var item Item
|
||||||
err := json.Unmarshal(data, &item)
|
err := json.Unmarshal(data, &item)
|
||||||
if err != nil || item.Id == "" {
|
if err != nil || item.Id == "" {
|
||||||
msg := fmt.Sprintf("[%s: stdout] %s", source, strings.TrimSpace(string(data)))
|
log.Printf("[%s: stdout] %s", source, strings.TrimSpace(string(data)))
|
||||||
log.Print(msg)
|
|
||||||
logs = append(logs, msg)
|
|
||||||
parseError = true
|
parseError = true
|
||||||
} else {
|
} else {
|
||||||
if postProcess != nil {
|
if postProcess != nil {
|
||||||
@ -172,16 +127,12 @@ monitor:
|
|||||||
item.Active = true // These fields aren't up to
|
item.Active = true // These fields aren't up to
|
||||||
item.Created = 0 // the action to set and
|
item.Created = 0 // the action to set and
|
||||||
item.Source = source // shouldn't be overrideable
|
item.Source = source // shouldn't be overrideable
|
||||||
msg := fmt.Sprintf("[%s: item] %s", source, item.Id)
|
log.Printf("[%s: item] %s\n", source, item.Id)
|
||||||
log.Print(msg)
|
|
||||||
logs = append(logs, msg)
|
|
||||||
items = append(items, item)
|
items = append(items, item)
|
||||||
}
|
}
|
||||||
|
|
||||||
case data := <-cerr:
|
case data := <-cerr:
|
||||||
msg := fmt.Sprintf("[%s: stderr] %s", source, strings.TrimSpace(string(data)))
|
log.Printf("[%s: stderr] %s\n", source, data)
|
||||||
log.Print(msg)
|
|
||||||
logs = append(logs, msg)
|
|
||||||
|
|
||||||
case <-coutDone:
|
case <-coutDone:
|
||||||
stdoutDone = true
|
stdoutDone = true
|
||||||
@ -199,35 +150,24 @@ monitor:
|
|||||||
|
|
||||||
err = cmd.Wait()
|
err = cmd.Wait()
|
||||||
if ctx.Err() == context.DeadlineExceeded {
|
if ctx.Err() == context.DeadlineExceeded {
|
||||||
err = fmt.Errorf("timed out after %v", timeout)
|
log.Printf("Timed out after %v\n", timeout)
|
||||||
errItem = makeErrorItem(err, logs)
|
return nil, nil, err
|
||||||
log.Printf("error: %v", err)
|
|
||||||
return nil, nil, errItem, err
|
|
||||||
} else if exiterr, ok := err.(*exec.ExitError); ok {
|
} else if exiterr, ok := err.(*exec.ExitError); ok {
|
||||||
err = fmt.Errorf("%s failed with exit code %d: %v", argv[0], exiterr.ExitCode(), exiterr)
|
log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode())
|
||||||
errItem = makeErrorItem(err, logs)
|
return nil, nil, err
|
||||||
log.Printf("error: %v", err)
|
|
||||||
return nil, nil, errItem, err
|
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
err = fmt.Errorf("error: %s failed with error: %s", argv[0], err)
|
log.Printf("error: %s failed with error: %s\n", argv[0], err)
|
||||||
errItem = makeErrorItem(err, logs)
|
return nil, nil, err
|
||||||
log.Printf("error: %v", err)
|
|
||||||
return nil, nil, errItem, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if parseError {
|
if parseError {
|
||||||
err = fmt.Errorf("could not parse an item")
|
log.Printf("error: could not parse item\n")
|
||||||
errItem = makeErrorItem(err, logs)
|
return nil, nil, errors.New("invalid JSON")
|
||||||
log.Printf("error: %v", err)
|
|
||||||
return nil, nil, errItem, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
newState, err = os.ReadFile(stateFile.Name())
|
newState, err = os.ReadFile(stateFile.Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("error: failed to read state file: %v", err)
|
return nil, nil, fmt.Errorf("error: failed to read state file: %v", err)
|
||||||
errItem = makeErrorItem(err, logs)
|
|
||||||
log.Printf("error: %v", err)
|
|
||||||
return nil, nil, errItem, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
@ -242,34 +182,21 @@ func ExecuteItemAction(
|
|||||||
state []byte,
|
state []byte,
|
||||||
timeout time.Duration,
|
timeout time.Duration,
|
||||||
postProcess func(item Item) Item,
|
postProcess func(item Item) Item,
|
||||||
) (
|
) (Item, []byte, error) {
|
||||||
newItem Item,
|
|
||||||
newState []byte,
|
|
||||||
errItem Item,
|
|
||||||
err error,
|
|
||||||
) {
|
|
||||||
makeErrorItem := makeMakeErrItem(item.Source, argv)
|
|
||||||
|
|
||||||
itemJson, err := json.Marshal(item)
|
itemJson, err := json.Marshal(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to serialize item: %v", err)
|
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
|
||||||
errItem = makeErrorItem(err, nil)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res, newState, errItem, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
|
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
|
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
|
||||||
errItem = makeErrorItem(err, nil)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if len(res) != 1 {
|
if len(res) != 1 {
|
||||||
err = fmt.Errorf("expected action to produce exactly one item, got %d", len(res))
|
return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res))
|
||||||
errItem = makeErrorItem(err, nil)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
newItem = res[0]
|
newItem := res[0]
|
||||||
BackfillItem(&newItem, &item)
|
BackfillItem(&newItem, &item)
|
||||||
|
|
||||||
return
|
return newItem, newState, nil
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"strings"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -26,7 +25,7 @@ func TestExecute(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
execute := func(argv []string) ([]Item, error) {
|
execute := func(argv []string) ([]Item, error) {
|
||||||
item, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
||||||
return item, err
|
return item, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,7 +48,7 @@ func TestExecute(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Timeout", func(t *testing.T) {
|
t.Run("Timeout", func(t *testing.T) {
|
||||||
res, _, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
|
res, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
|
||||||
assertNotNil(t, err)
|
assertNotNil(t, err)
|
||||||
assertLen(t, res, 0)
|
assertLen(t, res, 0)
|
||||||
})
|
})
|
||||||
@ -64,7 +63,7 @@ func TestExecute(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ReadFromStdin", func(t *testing.T) {
|
t.Run("ReadFromStdin", func(t *testing.T) {
|
||||||
res, _, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
|
res, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if res[0].Id != "bar" {
|
if res[0].Id != "bar" {
|
||||||
@ -73,7 +72,7 @@ func TestExecute(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("SetEnv", func(t *testing.T) {
|
t.Run("SetEnv", func(t *testing.T) {
|
||||||
res, _, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
|
res, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if res[0].Id != "baz" {
|
if res[0].Id != "baz" {
|
||||||
@ -161,7 +160,7 @@ func TestExecute(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("ReadState", func(t *testing.T) {
|
t.Run("ReadState", func(t *testing.T) {
|
||||||
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
|
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
|
||||||
res, _, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
|
res, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if res[0].Title != "Hello world" {
|
if res[0].Title != "Hello world" {
|
||||||
@ -171,7 +170,7 @@ func TestExecute(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("WriteState", func(t *testing.T) {
|
t.Run("WriteState", func(t *testing.T) {
|
||||||
argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
|
argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
|
||||||
res, newState, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if string(newState) != "Hello world" {
|
if string(newState) != "Hello world" {
|
||||||
@ -181,7 +180,7 @@ func TestExecute(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("PostprocessSetTtl", func(t *testing.T) {
|
t.Run("PostprocessSetTtl", func(t *testing.T) {
|
||||||
argv := []string{"jq", "-cn", `{id: "foo"}`}
|
argv := []string{"jq", "-cn", `{id: "foo"}`}
|
||||||
res, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
|
res, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
|
||||||
item.Ttl = 123456
|
item.Ttl = 123456
|
||||||
return item
|
return item
|
||||||
})
|
})
|
||||||
@ -191,22 +190,4 @@ func TestExecute(t *testing.T) {
|
|||||||
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
|
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ErrorItem", func(t *testing.T) {
|
|
||||||
argv := []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World; printf '{"whoops": "my bad"}'`}
|
|
||||||
_, _, errItem, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
|
|
||||||
assertNotNil(t, err)
|
|
||||||
if errItem.Id == "" {
|
|
||||||
t.Fatal("missing erritem id")
|
|
||||||
}
|
|
||||||
if errItem.Source != "default" {
|
|
||||||
t.Fatalf("unexpected erritem source: expected default, got %s", errItem.Source)
|
|
||||||
}
|
|
||||||
if !strings.Contains(errItem.Body, "Hello") || !strings.Contains(errItem.Body, "World") {
|
|
||||||
t.Fatal("missing stderr from erritem")
|
|
||||||
}
|
|
||||||
if !strings.Contains(errItem.Body, "whoops") {
|
|
||||||
t.Fatal("missing stdout from erritem")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
11
core/item.go
11
core/item.go
@ -5,7 +5,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -106,13 +105,3 @@ var AvailableFormats = map[string]string{
|
|||||||
"json": "Full item JSON",
|
"json": "Full item JSON",
|
||||||
"short": "Item source and id",
|
"short": "Item source and id",
|
||||||
}
|
}
|
||||||
|
|
||||||
const hexDigits = "0123456789abcdef"
|
|
||||||
|
|
||||||
func RandomHex(n int) string {
|
|
||||||
bytes := make([]byte, n)
|
|
||||||
for i := range bytes {
|
|
||||||
bytes[i] = hexDigits[rand.Intn(len(hexDigits))]
|
|
||||||
}
|
|
||||||
return string(bytes)
|
|
||||||
}
|
|
||||||
|
@ -228,10 +228,8 @@ func updateWithFetchedItemsTx(
|
|||||||
var updatedNewItems []Item
|
var updatedNewItems []Item
|
||||||
for _, item := range newItems {
|
for _, item := range newItems {
|
||||||
var updatedItem Item
|
var updatedItem Item
|
||||||
var errItem Item
|
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
|
||||||
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AddErrorItem(db, errItem)
|
|
||||||
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
|
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
|
||||||
}
|
}
|
||||||
updatedNewItems = append(updatedNewItems, updatedItem)
|
updatedNewItems = append(updatedNewItems, updatedItem)
|
||||||
|
@ -147,9 +147,9 @@ func TestOnCreateAction(t *testing.T) {
|
|||||||
|
|
||||||
execute := func(argv []string) []Item {
|
execute := func(argv []string) []Item {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
items, _, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
|
items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error executing test fetch: %v", err)
|
t.Fatal("unexpected error executing test fetch")
|
||||||
}
|
}
|
||||||
if len(items) != 1 {
|
if len(items) != 1 {
|
||||||
t.Fatalf("expected only one item, got %d", len(items))
|
t.Fatalf("expected only one item, got %d", len(items))
|
||||||
|
@ -18,17 +18,11 @@
|
|||||||
perSystem =
|
perSystem =
|
||||||
{
|
{
|
||||||
pkgs,
|
pkgs,
|
||||||
self',
|
|
||||||
...
|
...
|
||||||
}:
|
}:
|
||||||
{
|
{
|
||||||
formatter = pkgs.nixfmt-rfc-style;
|
formatter = pkgs.nixfmt-rfc-style;
|
||||||
|
|
||||||
packages = {
|
|
||||||
intake = pkgs.callPackage ./package.nix { };
|
|
||||||
default = self'.packages.intake;
|
|
||||||
};
|
|
||||||
|
|
||||||
devShells.default = pkgs.mkShell {
|
devShells.default = pkgs.mkShell {
|
||||||
packages = [
|
packages = [
|
||||||
pkgs.go
|
pkgs.go
|
||||||
|
31
package.nix
31
package.nix
@ -1,31 +0,0 @@
|
|||||||
{
|
|
||||||
buildGoModule,
|
|
||||||
jq,
|
|
||||||
lib,
|
|
||||||
}:
|
|
||||||
let
|
|
||||||
inherit (lib.fileset)
|
|
||||||
toSource
|
|
||||||
unions
|
|
||||||
;
|
|
||||||
in
|
|
||||||
buildGoModule {
|
|
||||||
pname = "intake";
|
|
||||||
version = "2.0.0";
|
|
||||||
src = toSource {
|
|
||||||
root = ./.;
|
|
||||||
fileset = unions [
|
|
||||||
./cmd
|
|
||||||
./core
|
|
||||||
./go.mod
|
|
||||||
./go.sum
|
|
||||||
./main.go
|
|
||||||
./web
|
|
||||||
];
|
|
||||||
};
|
|
||||||
vendorHash = "sha256-sv4rlURaTatxJ5atYWYzD2O4EiuPp66i0RY5lQzhBKU=";
|
|
||||||
|
|
||||||
nativeBuildInputs = [ jq ];
|
|
||||||
|
|
||||||
meta = {};
|
|
||||||
}
|
|
@ -6,7 +6,6 @@ rm tmp/intake.db* || true
|
|||||||
export INTAKE_DATA_DIR="tmp"
|
export INTAKE_DATA_DIR="tmp"
|
||||||
tmp/intake migrate
|
tmp/intake migrate
|
||||||
|
|
||||||
# testing item display format
|
|
||||||
tmp/intake source add -s feedtest
|
tmp/intake source add -s feedtest
|
||||||
tmp/intake item add -s feedtest --id "this-item-has-no-title"
|
tmp/intake item add -s feedtest --id "this-item-has-no-title"
|
||||||
tmp/intake item add -s feedtest --id a --title "This item has only a title"
|
tmp/intake item add -s feedtest --id a --title "This item has only a title"
|
||||||
@ -29,24 +28,18 @@ tmp/intake item add -s feedtest --id q --title "TTS -10d" --tts "-864000"
|
|||||||
|
|
||||||
tmp/intake action add -s feedtest -a fetch -- jq -cn '{id: "0", title: "Returned by fetch"}'
|
tmp/intake action add -s feedtest -a fetch -- jq -cn '{id: "0", title: "Returned by fetch"}'
|
||||||
|
|
||||||
# testing actions that modify items
|
|
||||||
tmp/intake source add -s spook
|
tmp/intake source add -s spook
|
||||||
tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"'
|
tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"'
|
||||||
tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}'
|
tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}'
|
||||||
|
|
||||||
tmp/intake channel add -c home -s feedtest
|
|
||||||
tmp/intake channel add -c home -s spook
|
|
||||||
|
|
||||||
# testing empty feeds
|
|
||||||
tmp/intake source add -s nothing
|
tmp/intake source add -s nothing
|
||||||
tmp/intake action add -s nothing -a fetch -- true
|
tmp/intake action add -s nothing -a fetch -- true
|
||||||
|
|
||||||
|
tmp/intake channel add -c all -s feedtest
|
||||||
|
tmp/intake channel add -c all -s spook
|
||||||
|
|
||||||
tmp/intake channel add -c none -s nothing
|
tmp/intake channel add -c none -s nothing
|
||||||
|
|
||||||
# testing error items
|
|
||||||
tmp/intake source add -s gonnafail
|
|
||||||
tmp/intake action add -s gonnafail -a fetch -- sh -c 'echo Three... 1>&2; echo Two... 1>&2; echo One... 1>&2; echo BOOM!'
|
|
||||||
|
|
||||||
# testing feed paging
|
|
||||||
tmp/intake source add -s page
|
tmp/intake source add -s page
|
||||||
tmp/intake item add -s page --id 1 --title "Item 1" --body "This is the body of item 1"
|
tmp/intake item add -s page --id 1 --title "Item 1" --body "This is the body of item 1"
|
||||||
for i in $(seq 2 211); do
|
for i in $(seq 2 211); do
|
||||||
@ -54,6 +47,5 @@ for i in $(seq 2 211); do
|
|||||||
done
|
done
|
||||||
tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212"
|
tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212"
|
||||||
|
|
||||||
# default password, comment out to test no password
|
|
||||||
echo "hello" | tmp/intake passwd --stdin
|
echo "hello" | tmp/intake passwd --stdin
|
||||||
echo "hello" | tmp/intake passwd --stdin --verify
|
echo "hello" | tmp/intake passwd --stdin --verify
|
||||||
|
11
web/item.go
11
web/item.go
@ -1,6 +1,8 @@
|
|||||||
package web
|
package web
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -85,9 +87,8 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
|
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(env.db, errItem)
|
|
||||||
http.Error(writer, err.Error(), 500)
|
http.Error(writer, err.Error(), 500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -156,7 +157,11 @@ func (env *Env) addItem(writer http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
id := core.RandomHex(16)
|
bytes := make([]byte, 16)
|
||||||
|
if _, err := rand.Read(bytes); err != nil {
|
||||||
|
http.Error(writer, fmt.Sprintf("error: failed to generate id: %v", err), 500)
|
||||||
|
}
|
||||||
|
id := hex.EncodeToString(bytes)
|
||||||
|
|
||||||
title := req.PostForm.Get("title")
|
title := req.PostForm.Get("title")
|
||||||
link := req.PostForm.Get("link")
|
link := req.PostForm.Get("link")
|
||||||
|
@ -15,13 +15,13 @@ import (
|
|||||||
var AuthCookieName string = "intake_auth"
|
var AuthCookieName string = "intake_auth"
|
||||||
var AuthDuration time.Duration = time.Hour * 24 * 7
|
var AuthDuration time.Duration = time.Hour * 24 * 7
|
||||||
|
|
||||||
func newSession(db core.DB) (session string, err error) {
|
func newSession(db core.DB) (string, error) {
|
||||||
bytes := make([]byte, 32)
|
bytes := make([]byte, 32)
|
||||||
_, err = rand.Read(bytes)
|
_, err := rand.Read(bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
session = fmt.Sprintf("%x", bytes)
|
session := fmt.Sprintf("%x", bytes)
|
||||||
expires := int(time.Now().Add(AuthDuration).Unix())
|
expires := int(time.Now().Add(AuthDuration).Unix())
|
||||||
_, err = db.Exec(`
|
_, err = db.Exec(`
|
||||||
insert into sessions (id, expires)
|
insert into sessions (id, expires)
|
||||||
|
@ -55,9 +55,8 @@ func (env *Env) fetchSource(writer http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
|
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(env.db, errItem)
|
|
||||||
http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500)
|
http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user