Compare commits

...

14 Commits

36 changed files with 900 additions and 365 deletions

View File

@ -113,7 +113,8 @@ To execute an action, Intake executes the command specified by that action's `ar
The process's environment is as follows: The process's environment is as follows:
* `intake`'s environment is inherited. * `intake`'s environment is inherited.
* `STATE_PATH` is set to the absolute path of a file containing the source's persistent state. * Each environment variable defined in the source is set.
* `STATE_PATH` is set to the absolute path of a file that the source can use for persistent state. This file can be used for any data in any format. Changes to the state file are only saved if the action succeeds.
When an action receives an item as input, that item's JSON representation is written to that action's `stdin`. When an action receives an item as input, that item's JSON representation is written to that action's `stdin`.
When an action outputs an item, it should write the item's JSON representation to `stdout` on one line. When an action outputs an item, it should write the item's JSON representation to `stdout` on one line.

View File

@ -13,28 +13,25 @@ var actionAddCmd = &cobra.Command{
Long: `Add an action to a source. Long: `Add an action to a source.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
actionAdd(getArgv(cmd, args)) actionAdd(stringArg(cmd, "source"), stringArg(cmd, "action"), getArgv(cmd, args))
}, },
} }
var actionAddSource string
var actionAddAction string
func init() { func init() {
actionCmd.AddCommand(actionAddCmd) actionCmd.AddCommand(actionAddCmd)
actionAddCmd.Flags().StringVarP(&actionAddSource, "source", "s", "", "Source to add action") actionAddCmd.Flags().StringP("source", "s", "", "Source to add action")
actionAddCmd.MarkFlagRequired("source") actionAddCmd.MarkFlagRequired("source")
actionAddCmd.Flags().StringVarP(&actionAddAction, "action", "a", "", "Action name") actionAddCmd.Flags().StringP("action", "a", "", "Action name")
actionAddCmd.MarkFlagRequired("action") actionAddCmd.MarkFlagRequired("action")
} }
func actionAdd(argv []string) { func actionAdd(source string, action string, argv []string) {
if actionAddSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
if actionAddAction == "" { if action == "" {
log.Fatal("error: --action is empty") log.Fatal("error: --action is empty")
} }
if len(argv) == 0 { if len(argv) == 0 {
@ -43,10 +40,10 @@ func actionAdd(argv []string) {
db := openAndMigrateDb() db := openAndMigrateDb()
err := core.AddAction(db, actionAddSource, actionAddAction, argv) err := core.AddAction(db, source, action, argv)
if err != nil { if err != nil {
log.Fatalf("error: failed to add action: %v", err) log.Fatalf("error: failed to add action: %v", err)
} }
log.Printf("Added action %s to source %s", actionAddAction, actionAddSource) log.Printf("Added action %s to source %s", action, source)
} }

View File

@ -14,37 +14,34 @@ var actionDeleteCmd = &cobra.Command{
Long: `Delete an action from a source. Long: `Delete an action from a source.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
actionDelete() actionDelete(stringArg(cmd, "source"), stringArg(cmd, "action"))
}, },
} }
var actionDeleteSource string
var actionDeleteAction string
func init() { func init() {
actionCmd.AddCommand(actionDeleteCmd) actionCmd.AddCommand(actionDeleteCmd)
actionDeleteCmd.Flags().StringVarP(&actionDeleteSource, "source", "s", "", "Source to add action") actionDeleteCmd.Flags().StringP("source", "s", "", "Source to add action")
actionDeleteCmd.MarkFlagRequired("source") actionDeleteCmd.MarkFlagRequired("source")
actionDeleteCmd.Flags().StringVarP(&actionDeleteAction, "action", "a", "", "Action name") actionDeleteCmd.Flags().StringP("action", "a", "", "Action name")
actionDeleteCmd.MarkFlagRequired("action") actionDeleteCmd.MarkFlagRequired("action")
} }
func actionDelete() { func actionDelete(source string, action string) {
if actionDeleteSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
if actionDeleteAction == "" { if action == "" {
log.Fatal("error: --action is empty") log.Fatal("error: --action is empty")
} }
db := openAndMigrateDb() db := openAndMigrateDb()
err := core.DeleteAction(db, actionDeleteSource, actionDeleteAction) err := core.DeleteAction(db, source, action)
if err != nil { if err != nil {
log.Fatalf("error: failed to delete action: %v", err) log.Fatalf("error: failed to delete action: %v", err)
} }
log.Printf("Deleted action %s from source %s", actionDeleteAction, actionDeleteSource) log.Printf("Deleted action %s from source %s", action, source)
} }

View File

@ -13,28 +13,25 @@ var actionEditCmd = &cobra.Command{
Long: `Edit an action on a source. Long: `Edit an action on a source.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
actionEdit(getArgv(cmd, args)) actionEdit(stringArg(cmd, "source"), stringArg(cmd, "action"), getArgv(cmd, args))
}, },
} }
var actionEditSource string
var actionEditAction string
func init() { func init() {
actionCmd.AddCommand(actionEditCmd) actionCmd.AddCommand(actionEditCmd)
actionEditCmd.Flags().StringVarP(&actionEditSource, "source", "s", "", "Source to edit action") actionEditCmd.Flags().StringP("source", "s", "", "Source to edit action")
actionEditCmd.MarkFlagRequired("source") actionEditCmd.MarkFlagRequired("source")
actionEditCmd.Flags().StringVarP(&actionEditAction, "action", "a", "", "Action name") actionEditCmd.Flags().StringP("action", "a", "", "Action name")
actionEditCmd.MarkFlagRequired("action") actionEditCmd.MarkFlagRequired("action")
} }
func actionEdit(argv []string) { func actionEdit(source string, action string, argv []string) {
if actionEditSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
if actionEditAction == "" { if action == "" {
log.Fatal("error: --action is empty") log.Fatal("error: --action is empty")
} }
if len(argv) == 0 { if len(argv) == 0 {
@ -43,10 +40,10 @@ func actionEdit(argv []string) {
db := openAndMigrateDb() db := openAndMigrateDb()
err := core.UpdateAction(db, actionEditSource, actionEditAction, argv) err := core.UpdateAction(db, source, action, argv)
if err != nil { if err != nil {
log.Fatalf("error: failed to update action: %v", err) log.Fatalf("error: failed to update action: %v", err)
} }
log.Printf("Updated action %s on source %s", actionEditAction, actionEditSource) log.Printf("Updated action %s on source %s", action, source)
} }

View File

@ -1,9 +1,9 @@
package cmd package cmd
import ( import (
"encoding/json"
"fmt" "fmt"
"log" "log"
"slices"
"time" "time"
"github.com/Jaculabilis/intake/core" "github.com/Jaculabilis/intake/core"
@ -26,87 +26,95 @@ In a dry run, the item will be printed in the chosen format and not updated.
%s`, makeFormatHelpText()), %s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
actionExecute() actionExecute(
stringArg(cmd, "source"),
stringArg(cmd, "action"),
stringArg(cmd, "item"),
stringArg(cmd, "format"),
boolArg(cmd, "dry-run"),
boolArg(cmd, "diff"),
boolArg(cmd, "force"),
)
}, },
} }
var actionExecuteSource string
var actionExecuteAction string
var actionExecuteItem string
var actionExecuteFormat string
var actionExecuteDryRun bool
var actionExecuteDiff bool
var actionExecuteForce bool
func init() { func init() {
actionCmd.AddCommand(actionExecuteCmd) actionCmd.AddCommand(actionExecuteCmd)
actionExecuteCmd.PersistentFlags().StringVarP(&actionExecuteSource, "source", "s", "", "Source of the item") actionExecuteCmd.PersistentFlags().StringP("source", "s", "", "Source of the item")
actionExecuteCmd.MarkFlagRequired("source") actionExecuteCmd.MarkFlagRequired("source")
actionExecuteCmd.PersistentFlags().StringVarP(&actionExecuteItem, "item", "i", "", "Item to run action on") actionExecuteCmd.PersistentFlags().StringP("item", "i", "", "Item to run action on")
actionExecuteCmd.MarkFlagRequired("item") actionExecuteCmd.MarkFlagRequired("item")
actionExecuteCmd.PersistentFlags().StringVarP(&actionExecuteAction, "action", "a", "", "Action to run") actionExecuteCmd.PersistentFlags().StringP("action", "a", "", "Action to run")
actionExecuteCmd.MarkFlagRequired("action") actionExecuteCmd.MarkFlagRequired("action")
actionExecuteCmd.Flags().StringVarP(&actionExecuteFormat, "format", "f", "headlines", "Feed format for returned items") actionExecuteCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items")
actionExecuteCmd.Flags().BoolVar(&actionExecuteDryRun, "dry-run", false, "Instead of updating the item, print it") actionExecuteCmd.Flags().Bool("dry-run", false, "Instead of updating the item, print it")
actionExecuteCmd.Flags().BoolVar(&actionExecuteDiff, "diff", false, "Show which fields of the item changed") actionExecuteCmd.Flags().Bool("diff", false, "Show which fields of the item changed")
actionExecuteCmd.Flags().BoolVar(&actionExecuteForce, "force", false, "Execute the action even if the item does not support it") actionExecuteCmd.Flags().Bool("force", false, "Execute the action even if the item does not support it")
} }
func actionExecute() { func actionExecute(
formatter := formatAs(actionExecuteFormat) source string,
action string,
itemId string,
format string,
dryRun bool,
diff bool,
force bool,
) {
formatter := formatAs(format)
if actionExecuteSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
if actionExecuteAction == "" { if action == "" {
log.Fatal("error: --action is empty") log.Fatal("error: --action is empty")
} }
if actionExecuteItem == "" { if itemId == "" {
log.Fatal("error: --item is empty") log.Fatal("error: --item is empty")
} }
db := openAndMigrateDb() db := openAndMigrateDb()
item, err := core.GetItem(db, actionExecuteSource, actionExecuteItem) state, err := core.GetState(db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
envs, err := core.GetEnvs(db, source)
if err != nil {
log.Fatalf("error: failed to get envs for %s: %v", source, err)
}
item, err := core.GetItem(db, source, itemId)
if err != nil { if err != nil {
log.Fatalf("error: failed to get item: %v", err) log.Fatalf("error: failed to get item: %v", err)
} }
if item.Action[actionExecuteAction] == nil { if item.Action[action] == nil {
if actionExecuteForce { if force {
log.Printf("warning: force-executing %s on %s/%s", actionExecuteAction, actionExecuteSource, actionExecuteItem) log.Printf("warning: force-executing %s on %s/%s", action, source, itemId)
} else { } else {
log.Fatalf("error: %s/%s does not support %s", actionExecuteSource, actionExecuteItem, actionExecuteAction) log.Fatalf("error: %s/%s does not support %s", source, itemId, action)
} }
} }
argv, err := core.GetArgvForAction(db, actionExecuteSource, actionExecuteAction) argv, err := core.GetArgvForAction(db, source, action)
if err != nil { if err != nil {
log.Fatalf("error: failed to get action: %v", err) log.Fatalf("error: failed to get action: %v", err)
} }
itemJson, err := json.Marshal(item) newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute)
if err != nil { if err != nil {
log.Fatalf("error: failed to serialize item: %v", err) log.Fatalf("error executing %s: %v", action, err)
} }
res, err := core.Execute(actionExecuteSource, argv, nil, string(itemJson), time.Minute) if diff {
if err != nil {
log.Fatalf("error: failed to execute action: %v", err)
}
if len(res) != 1 {
log.Fatalf("error: expected action to produce exactly one item, got %d", len(res))
}
newItem := res[0]
core.BackfillItem(&newItem, &item)
if actionExecuteDiff {
if item.Title != newItem.Title { if item.Title != newItem.Title {
log.Printf("title: %s => %s", item.Title, newItem.Title) log.Printf("title: %s => %s", item.Title, newItem.Title)
} }
@ -125,14 +133,25 @@ func actionExecute() {
if core.ItemsAreEqual(item, newItem) { if core.ItemsAreEqual(item, newItem) {
log.Printf("no changes\n") log.Printf("no changes\n")
} }
if !slices.Equal(state, newState) {
log.Printf("state changed (%d => %d bytes)", len(state), len(newState))
}
} }
if actionExecuteDryRun { if dryRun {
fmt.Println(formatter(res[0])) fmt.Println(formatter(newItem))
return return
} }
if err = core.UpdateItems(db, []core.Item{newItem}); err != nil { if err = db.Transact(func(tx core.DB) error {
log.Fatalf("error: failed to update item: %v", err) if _err := core.UpdateItems(tx, []core.Item{newItem}); err != nil {
return fmt.Errorf("failed to update item: %v", _err)
}
if _err := core.SetState(tx, source, newState); err != nil {
return fmt.Errorf("failed to set state for %s: %v", source, _err)
}
return nil
}); err != nil {
log.Fatalf("error: %v", err)
} }
} }

View File

@ -16,41 +16,38 @@ var actionListCmd = &cobra.Command{
Long: `List actions on a source. Long: `List actions on a source.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
actionList() actionList(stringArg(cmd, "source"), boolArg(cmd, "argv"))
}, },
} }
var actionListSource string
var actionListArgv bool
func init() { func init() {
actionCmd.AddCommand(actionListCmd) actionCmd.AddCommand(actionListCmd)
actionListCmd.Flags().StringVarP(&actionListSource, "source", "s", "", "Source to list actions") actionListCmd.Flags().StringP("source", "s", "", "Source to list actions")
actionListCmd.MarkFlagRequired("source") actionListCmd.MarkFlagRequired("source")
actionListCmd.Flags().BoolVarP(&actionListArgv, "argv", "a", false, "Include action command") actionListCmd.Flags().BoolP("argv", "a", false, "Include action command")
} }
func actionList() { func actionList(source string, argv bool) {
if actionListSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
db := openAndMigrateDb() db := openAndMigrateDb()
actions, err := core.GetActionsForSource(db, actionListSource) actions, err := core.GetActionsForSource(db, source)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
slices.SortFunc(actions, actionSort) slices.SortFunc(actions, actionSort)
if actionListArgv { if argv {
actionArgv := make(map[string][]string) actionArgv := make(map[string][]string)
for _, name := range actions { for _, name := range actions {
argv, err := core.GetArgvForAction(db, actionListSource, name) argv, err := core.GetArgvForAction(db, source, name)
if err != nil { if err != nil {
log.Fatalf("error: could not get argv for source %s action %s: %v", actionListSource, name, err) log.Fatalf("error: could not get argv for source %s action %s: %v", source, name, err)
} }
actionArgv[name] = argv actionArgv[name] = argv
} }

53
cmd/args.go Normal file
View File

@ -0,0 +1,53 @@
package cmd
import (
"log"
"github.com/spf13/cobra"
)
// Get the value of a bool flag.
func boolArg(cmd *cobra.Command, name string) bool {
b, err := cmd.Flags().GetBool(name)
if err != nil {
log.Fatal(err)
}
return b
}
// Get the value of an int flag.
func intArg(cmd *cobra.Command, name string) int {
i, err := cmd.Flags().GetInt(name)
if err != nil {
log.Fatal(err)
}
return i
}
// Get the value of a string flag.
func stringArg(cmd *cobra.Command, name string) string {
str, err := cmd.Flags().GetString(name)
if err != nil {
log.Fatal(err)
}
return str
}
// Get the value of a string array flag.
func stringArrayArg(cmd *cobra.Command, name string) []string {
s, err := cmd.Flags().GetStringArray(name)
if err != nil {
log.Fatal(err)
}
return s
}
// Get the argv after the -- separator.
func getArgv(cmd *cobra.Command, args []string) []string {
lenAtDash := cmd.Flags().ArgsLenAtDash()
if lenAtDash == -1 {
return nil
} else {
return args[lenAtDash:]
}
}

View File

@ -16,45 +16,50 @@ The default format is "headlines".
%s`, makeFormatHelpText()), %s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
feed() feed(
stringArg(cmd, "format"),
stringArg(cmd, "source"),
stringArg(cmd, "channel"),
boolArg(cmd, "all"),
)
}, },
} }
var feedFormat string
var feedSource string
var feedChannel string
var feedShowInactive bool
func init() { func init() {
rootCmd.AddCommand(feedCmd) rootCmd.AddCommand(feedCmd)
feedCmd.Flags().StringVarP(&feedFormat, "format", "f", "headlines", "Feed format") feedCmd.Flags().StringP("format", "f", "headlines", "Feed format")
feedCmd.Flags().StringVarP(&feedSource, "source", "s", "", "Limit to items from source") feedCmd.Flags().StringP("source", "s", "", "Limit to items from source")
feedCmd.Flags().StringVarP(&feedChannel, "channel", "c", "", "Limit to items from channel") feedCmd.Flags().StringP("channel", "c", "", "Limit to items from channel")
feedCmd.MarkFlagsMutuallyExclusive("source", "channel") feedCmd.MarkFlagsMutuallyExclusive("source", "channel")
feedCmd.Flags().BoolVar(&feedShowInactive, "all", false, "Show inactive items") feedCmd.Flags().Bool("all", false, "Show inactive items")
} }
func feed() { func feed(
formatter := formatAs(feedFormat) format string,
source string,
channel string,
showInactive bool,
) {
formatter := formatAs(format)
db := openAndMigrateDb() db := openAndMigrateDb()
var items []core.Item var items []core.Item
var err error var err error
if feedSource != "" { if source != "" {
if feedShowInactive { if showInactive {
items, err = core.GetAllItemsForSource(db, feedSource) items, err = core.GetAllItemsForSource(db, source)
} else { } else {
items, err = core.GetActiveItemsForSource(db, feedSource) items, err = core.GetActiveItemsForSource(db, source)
} }
if err != nil { if err != nil {
log.Fatalf("error: failed to fetch items from %s:, %v", feedSource, err) log.Fatalf("error: failed to fetch items from %s:, %v", source, err)
} }
} else if feedChannel != "" { } else if channel != "" {
log.Fatal("error: unimplemented") log.Fatal("error: unimplemented")
} else { } else {
if feedShowInactive { if showInactive {
items, err = core.GetAllItems(db) items, err = core.GetAllItems(db)
} else { } else {
items, err = core.GetAllActiveItems(db) items, err = core.GetAllActiveItems(db)

View File

@ -19,49 +19,58 @@ var itemAddCmd = &cobra.Command{
By default, the item is created in the "default" source, which is created By default, the item is created in the "default" source, which is created
if it doesn't exist, with a random id.`, if it doesn't exist, with a random id.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
itemAdd() itemAdd(
stringArg(cmd, "source"),
stringArg(cmd, "id"),
stringArg(cmd, "title"),
stringArg(cmd, "author"),
stringArg(cmd, "body"),
stringArg(cmd, "link"),
intArg(cmd, "time"),
stringArg(cmd, "action"),
)
}, },
} }
var addItemSource string
var addItemId string
var addItemTitle string
var addItemAuthor string
var addItemBody string
var addItemLink string
var addItemTime int
var addItemActions string
func init() { func init() {
itemCmd.AddCommand(itemAddCmd) itemCmd.AddCommand(itemAddCmd)
itemAddCmd.Flags().StringVarP(&addItemSource, "source", "s", "", "Source in which to create the item (default: default)") itemAddCmd.Flags().StringP("source", "s", "", "Source in which to create the item (default: default)")
itemAddCmd.Flags().StringVarP(&addItemId, "id", "i", "", "Item id (default: random hex)") itemAddCmd.Flags().StringP("id", "i", "", "Item id (default: random hex)")
itemAddCmd.Flags().StringVarP(&addItemTitle, "title", "t", "", "Item title") itemAddCmd.Flags().StringP("title", "t", "", "Item title")
itemAddCmd.Flags().StringVarP(&addItemAuthor, "author", "a", "", "Item author") itemAddCmd.Flags().StringP("author", "a", "", "Item author")
itemAddCmd.Flags().StringVarP(&addItemBody, "body", "b", "", "Item body") itemAddCmd.Flags().StringP("body", "b", "", "Item body")
itemAddCmd.Flags().StringVarP(&addItemLink, "link", "l", "", "Item link") itemAddCmd.Flags().StringP("link", "l", "", "Item link")
itemAddCmd.Flags().IntVarP(&addItemTime, "time", "m", 0, "Item time as a Unix timestamp") itemAddCmd.Flags().IntP("time", "m", 0, "Item time as a Unix timestamp")
itemAddCmd.Flags().StringVarP(&addItemActions, "action", "x", "", "Item time as a Unix timestamp") itemAddCmd.Flags().StringP("action", "x", "", "Item action data as JSON")
} }
func itemAdd() { func itemAdd(
source string,
id string,
title string,
author string,
body string,
link string,
time int,
actions string,
) {
// Default to "default" source // Default to "default" source
if addItemSource == "" { if source == "" {
addItemSource = "default" source = "default"
} }
// Default id to random hex string // Default id to random hex string
if addItemId == "" { if id == "" {
bytes := make([]byte, 16) bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil { if _, err := rand.Read(bytes); err != nil {
log.Fatalf("error: failed to generate id: %v", err) log.Fatalf("error: failed to generate id: %v", err)
} }
addItemId = hex.EncodeToString(bytes) id = hex.EncodeToString(bytes)
} }
var actions core.Actions var itemActions core.Actions
if addItemActions != "" { if actions != "" {
if err := json.Unmarshal([]byte(addItemActions), &actions); err != nil { if err := json.Unmarshal([]byte(actions), &itemActions); err != nil {
log.Fatalf("error: could not parse actions: %v", err) log.Fatalf("error: could not parse actions: %v", err)
} }
} }
@ -69,17 +78,17 @@ func itemAdd() {
db := openAndMigrateDb() db := openAndMigrateDb()
if err := core.AddItems(db, []core.Item{{ if err := core.AddItems(db, []core.Item{{
Source: addItemSource, Source: source,
Id: addItemId, Id: id,
Title: addItemTitle, Title: title,
Author: addItemAuthor, Author: author,
Body: addItemBody, Body: body,
Link: addItemLink, Link: link,
Time: addItemTime, Time: time,
Action: actions, Action: itemActions,
}}); err != nil { }}); err != nil {
log.Fatalf("error: failed to add item: %s", err) log.Fatalf("error: failed to add item: %s", err)
} }
log.Printf("Added %s/%s\n", addItemSource, addItemId) log.Printf("Added %s/%s\n", source, id)
} }

View File

@ -17,30 +17,27 @@ var itemDeactivateCmd = &cobra.Command{
Deactivation is idempotent.`, Deactivation is idempotent.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
itemDeactivate() itemDeactivate(stringArg(cmd, "source"), stringArg(cmd, "item"))
}, },
} }
var deacSource string
var deacItem string
func init() { func init() {
itemCmd.AddCommand(itemDeactivateCmd) itemCmd.AddCommand(itemDeactivateCmd)
itemDeactivateCmd.Flags().StringVarP(&deacSource, "source", "s", "", "Source of the item") itemDeactivateCmd.Flags().StringP("source", "s", "", "Source of the item")
itemDeactivateCmd.MarkFlagRequired("source") itemDeactivateCmd.MarkFlagRequired("source")
itemDeactivateCmd.Flags().StringVarP(&deacItem, "item", "i", "", "Item id") itemDeactivateCmd.Flags().StringP("item", "i", "", "Item id")
itemDeactivateCmd.MarkFlagRequired("item") itemDeactivateCmd.MarkFlagRequired("item")
} }
func itemDeactivate() { func itemDeactivate(source string, item string) {
db := openAndMigrateDb() db := openAndMigrateDb()
active, err := core.DeactivateItem(db, deacSource, deacItem) active, err := core.DeactivateItem(db, source, item)
if err != nil { if err != nil {
log.Fatalf("Failed to deactivate item: %s", err) log.Fatalf("Failed to deactivate item: %s", err)
} }
if active { if active {
fmt.Printf("Deactivated %s/%s\n", deacSource, deacItem) fmt.Printf("Deactivated %s/%s\n", source, item)
} }
} }

View File

@ -16,23 +16,21 @@ var migrateCmd = &cobra.Command{
Note that the database will be created if it does not exist, even with --list.`, Note that the database will be created if it does not exist, even with --list.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
migrate() migrate(boolArg(cmd, "list"))
}, },
} }
var migrateListOnly bool
func init() { func init() {
rootCmd.AddCommand(migrateCmd) rootCmd.AddCommand(migrateCmd)
migrateCmd.Flags().BoolVarP(&migrateListOnly, "list", "l", false, "Show the list of migrations") migrateCmd.Flags().BoolP("list", "l", false, "Show the list of migrations")
} }
func migrate() { func migrate(listOnly bool) {
db := openDb() db := openDb()
core.InitDatabase(db) core.InitDatabase(db)
if migrateListOnly { if listOnly {
pending, err := core.GetPendingMigrations(db) pending, err := core.GetPendingMigrations(db)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View File

@ -51,7 +51,7 @@ func getDbPath() string {
} }
// Attempt to open the specified database and exit with an error if it fails. // Attempt to open the specified database and exit with an error if it fails.
func openDb() *core.DB { func openDb() core.DB {
dbPath := getDbPath() dbPath := getDbPath()
db, err := core.OpenDb(dbPath) db, err := core.OpenDb(dbPath)
if err != nil { if err != nil {
@ -61,7 +61,7 @@ func openDb() *core.DB {
} }
// Attempt to open and migrate the specified database and exit with an error if it fails. // Attempt to open and migrate the specified database and exit with an error if it fails.
func openAndMigrateDb() *core.DB { func openAndMigrateDb() core.DB {
db := openDb() db := openDb()
if err := core.InitDatabase(db); err != nil { if err := core.InitDatabase(db); err != nil {
log.Fatalf("error: failed to init database: %v", err) log.Fatalf("error: failed to init database: %v", err)
@ -72,15 +72,6 @@ func openAndMigrateDb() *core.DB {
return db return db
} }
func getArgv(cmd *cobra.Command, args []string) []string {
lenAtDash := cmd.Flags().ArgsLenAtDash()
if lenAtDash == -1 {
return nil
} else {
return args[lenAtDash:]
}
}
// Sort "fetch" action ahead of other actions // Sort "fetch" action ahead of other actions
func actionSort(a string, b string) int { func actionSort(a string, b string) int {
if a == "fetch" { if a == "fetch" {

View File

@ -11,21 +11,18 @@ var serveCmd = &cobra.Command{
Long: `Serve the intake web interface. Long: `Serve the intake web interface.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
serve() serve(stringArg(cmd, "addr"), stringArg(cmd, "port"))
}, },
} }
var serveAddr string
var servePort string
func init() { func init() {
rootCmd.AddCommand(serveCmd) rootCmd.AddCommand(serveCmd)
serveCmd.Flags().StringVarP(&serveAddr, "addr", "a", "localhost", "Address to bind to") serveCmd.Flags().StringP("addr", "a", "localhost", "Address to bind to")
serveCmd.Flags().StringVarP(&servePort, "port", "p", "8081", "Port to bind to") serveCmd.Flags().StringP("port", "p", "8081", "Port to bind to")
} }
func serve() { func serve(addr string, port string) {
db := openAndMigrateDb() db := openAndMigrateDb()
web.RunServer(db, serveAddr, servePort) web.RunServer(db, addr, port)
} }

View File

@ -13,29 +13,27 @@ var sourceAddCmd = &cobra.Command{
Long: `Create a source. Long: `Create a source.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceAdd() sourceAdd(stringArg(cmd, "source"))
}, },
} }
var sourceAddSource string
func init() { func init() {
sourceCmd.AddCommand(sourceAddCmd) sourceCmd.AddCommand(sourceAddCmd)
sourceAddCmd.Flags().StringVarP(&sourceAddSource, "source", "s", "", "Source name") sourceAddCmd.Flags().StringP("source", "s", "", "Source name")
sourceAddCmd.MarkFlagRequired("source") sourceAddCmd.MarkFlagRequired("source")
} }
func sourceAdd() { func sourceAdd(source string) {
if sourceAddSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
db := openAndMigrateDb() db := openAndMigrateDb()
if err := core.AddSource(db, sourceAddSource); err != nil { if err := core.AddSource(db, source); err != nil {
log.Fatalf("error: failed to add source: %v", err) log.Fatalf("error: failed to add source: %v", err)
} }
log.Printf("Added source %s", sourceAddSource) log.Printf("Added source %s", source)
} }

View File

@ -14,28 +14,26 @@ var sourceDeleteCmd = &cobra.Command{
Long: `Delete a source. Long: `Delete a source.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceDelete() sourceDelete(stringArg(cmd, "source"))
}, },
} }
var sourceDeleteSource string
func init() { func init() {
sourceCmd.AddCommand(sourceDeleteCmd) sourceCmd.AddCommand(sourceDeleteCmd)
sourceDeleteCmd.Flags().StringVarP(&sourceDeleteSource, "source", "s", "", "Source to delete") sourceDeleteCmd.Flags().StringP("source", "s", "", "Source to delete")
} }
func sourceDelete() { func sourceDelete(source string) {
if sourceDeleteSource == "" { if source == "" {
log.Fatal("error: --source is empty") log.Fatal("error: --source is empty")
} }
db := openAndMigrateDb() db := openAndMigrateDb()
if err := core.DeleteSource(db, sourceDeleteSource); err != nil { if err := core.DeleteSource(db, source); err != nil {
log.Fatalf("error: failed to delete source: %v", err) log.Fatalf("error: failed to delete source: %v", err)
} }
log.Printf("Deleted source %s", sourceDeleteSource) log.Printf("Deleted source %s", source)
} }

View File

@ -1,21 +0,0 @@
package cmd
import (
"log"
"github.com/spf13/cobra"
)
var sourceEditCmd = &cobra.Command{
Use: "edit",
Short: "Edit a source",
Long: `
`,
Run: func(cmd *cobra.Command, args []string) {
log.Fatal("not implemented")
},
}
func init() {
sourceCmd.AddCommand(sourceEditCmd)
}

53
cmd/sourceEnv.go Normal file
View File

@ -0,0 +1,53 @@
package cmd
import (
"fmt"
"log"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra"
)
var sourceEnvCmd = &cobra.Command{
Use: "env",
Short: "Manage source environment variables",
Long: `Add, edit, list, or delete environment variables.
When --set is not specified, list the environment for the source.
--set KEY=VALUE will add or edit an environment variable to be set in all
action executions.
--set KEY= will delete the environment variable from the source.
`,
Run: func(cmd *cobra.Command, args []string) {
sourceEnv(stringArg(cmd, "source"), stringArrayArg(cmd, "set"))
},
}
func init() {
sourceCmd.AddCommand(sourceEnvCmd)
sourceEnvCmd.Flags().StringP("source", "s", "", "Source to edit")
sourceEnvCmd.MarkFlagRequired("source")
sourceEnvCmd.Flags().StringArray("set", nil, "Set or modify environment variable")
}
func sourceEnv(source string, env []string) {
db := openAndMigrateDb()
if len(env) == 0 {
envs, err := core.GetEnvs(db, source)
if err != nil {
log.Fatalf("failed to get envs: %v", err)
}
for _, env := range envs {
fmt.Println(env)
}
}
if err := core.SetEnvs(db, source, env); err != nil {
log.Fatalf("failed to set envs: %v", err)
}
}

View File

@ -3,6 +3,7 @@ package cmd
import ( import (
"fmt" "fmt"
"log" "log"
"slices"
"time" "time"
"github.com/Jaculabilis/intake/core" "github.com/Jaculabilis/intake/core"
@ -23,50 +24,59 @@ the source will not be updated with the fetch result.
%s`, makeFormatHelpText()), %s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceFetch() sourceFetch(stringArg(cmd, "source"), stringArg(cmd, "format"), boolArg(cmd, "dry-run"))
}, },
} }
var sourceFetchSource string
var sourceFetchFormat string
var sourceFetchDryRun bool
func init() { func init() {
sourceCmd.AddCommand(sourceFetchCmd) sourceCmd.AddCommand(sourceFetchCmd)
sourceFetchCmd.Flags().StringVarP(&sourceFetchSource, "source", "s", "", "Source name to fetch (required)") sourceFetchCmd.Flags().StringP("source", "s", "", "Source name to fetch (required)")
sourceFetchCmd.MarkFlagRequired("source") sourceFetchCmd.MarkFlagRequired("source")
sourceFetchCmd.Flags().StringVarP(&sourceFetchFormat, "format", "f", "headlines", "Feed format for returned items.") sourceFetchCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
sourceFetchCmd.Flags().BoolVar(&sourceFetchDryRun, "dry-run", false, "Instead of updating the source, print the fetched items") sourceFetchCmd.Flags().Bool("dry-run", false, "Instead of updating the source, print the fetched items")
} }
func sourceFetch() { func sourceFetch(source string, format string, dryRun bool) {
formatter := formatAs(sourceFetchFormat) formatter := formatAs(format)
db := openAndMigrateDb() db := openAndMigrateDb()
argv, err := core.GetArgvForAction(db, sourceFetchSource, "fetch") state, err := core.GetState(db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
envs, err := core.GetEnvs(db, source)
if err != nil {
log.Fatalf("error: failed to get envs for %s: %v", source, err)
}
argv, err := core.GetArgvForAction(db, source, "fetch")
if err != nil { if err != nil {
log.Fatalf("error: failed to get fetch action: %v", err) log.Fatalf("error: failed to get fetch action: %v", err)
} }
items, err := core.Execute(sourceFetchSource, argv, nil, "", time.Minute) items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute)
if err != nil { if err != nil {
log.Fatalf("error: failed to execute fetch: %v", err) log.Fatalf("error: failed to execute fetch: %v", err)
} }
if sourceFetchDryRun { if dryRun {
log.Printf("Fetch returned %d items", len(items)) log.Printf("Fetch returned %d items", len(items))
if !slices.Equal(state, newState) {
log.Printf("State update (%d => %d bytes)", len(state), len(newState))
}
for _, item := range items { for _, item := range items {
fmt.Println(formatter(item)) fmt.Println(formatter(item))
} }
return return
} }
added, deleted, err := core.UpdateWithFetchedItems(db, sourceFetchSource, items) added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items)
if err != nil { if err != nil {
log.Fatalf("error: failed to update: %v", err) log.Fatalf("error: failed to update: %v", err)
} }
log.Printf("%s added %d items, updated %d items, and deleted %d items", sourceFetchSource, added, len(items)-added, deleted) log.Printf("%s added %d items, updated %d items, and deleted %d items", source, added, len(items)-added, deleted)
} }

View File

@ -16,19 +16,17 @@ var sourceListCmd = &cobra.Command{
Long: `Print the list of sources. Long: `Print the list of sources.
`, `,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceList() sourceList(boolArg(cmd, "actions"))
}, },
} }
var sourceListShowActions bool
func init() { func init() {
sourceCmd.AddCommand(sourceListCmd) sourceCmd.AddCommand(sourceListCmd)
sourceListCmd.Flags().BoolVarP(&sourceListShowActions, "actions", "a", false, "Include source actions") sourceListCmd.Flags().BoolP("actions", "a", false, "Include source actions")
} }
func sourceList() { func sourceList(showActions bool) {
db := openAndMigrateDb() db := openAndMigrateDb()
names, err := core.GetSources(db) names, err := core.GetSources(db)
@ -37,7 +35,7 @@ func sourceList() {
} }
slices.Sort(names) slices.Sort(names)
if sourceListShowActions { if showActions {
sourceActions := make(map[string][]string) sourceActions := make(map[string][]string)
for _, name := range names { for _, name := range names {
actions, err := core.GetActionsForSource(db, name) actions, err := core.GetActionsForSource(db, name)

View File

@ -16,30 +16,23 @@ var sourceTestCmd = &cobra.Command{
%s`, makeFormatHelpText()), %s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
l := cmd.Flags().ArgsLenAtDash() sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args))
if l == -1 {
sourceTest(nil)
} else {
sourceTest(args[l:])
}
}, },
} }
var sourceTestEnv []string
var sourceTestFormat string
func init() { func init() {
sourceCmd.AddCommand(sourceTestCmd) sourceCmd.AddCommand(sourceTestCmd)
sourceTestCmd.Flags().StringArrayVarP(&sourceTestEnv, "env", "e", nil, "Environment variables to set, in the form KEY=VAL") sourceTestCmd.Flags().StringArrayP("env", "e", nil, "Environment variables to set, in the form KEY=VAL")
sourceTestCmd.Flags().StringVarP(&sourceTestFormat, "format", "f", "headlines", "Feed format for returned items.") sourceTestCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
} }
func sourceTest(cmd []string) { func sourceTest(env []string, format string, cmd []string) {
formatter := formatAs(sourceTestFormat) formatter := formatAs(format)
items, err := core.Execute("", cmd, sourceTestEnv, "", time.Minute) items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute)
log.Printf("Returned %d items", len(items)) log.Printf("returned %d items", len(items))
log.Printf("wrote %d bytes of state", len(state))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -6,6 +6,7 @@ import (
"database/sql/driver" "database/sql/driver"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"io" "io"
"log" "log"
"os" "os"
@ -25,7 +26,7 @@ func (a *argList) Scan(value interface{}) error {
return json.Unmarshal([]byte(value.(string)), a) return json.Unmarshal([]byte(value.(string)), a)
} }
func AddAction(db *DB, source string, name string, argv []string) error { func AddAction(db DB, source string, name string, argv []string) error {
_, err := db.Exec(` _, err := db.Exec(`
insert into actions (source, name, argv) insert into actions (source, name, argv)
values (?, ?, jsonb(?)) values (?, ?, jsonb(?))
@ -33,7 +34,7 @@ func AddAction(db *DB, source string, name string, argv []string) error {
return err return err
} }
func UpdateAction(db *DB, source string, name string, argv []string) error { func UpdateAction(db DB, source string, name string, argv []string) error {
_, err := db.Exec(` _, err := db.Exec(`
update actions update actions
set argv = jsonb(?) set argv = jsonb(?)
@ -42,7 +43,7 @@ func UpdateAction(db *DB, source string, name string, argv []string) error {
return err return err
} }
func GetActionsForSource(db *DB, source string) ([]string, error) { func GetActionsForSource(db DB, source string) ([]string, error) {
rows, err := db.Query(` rows, err := db.Query(`
select name select name
from actions from actions
@ -63,7 +64,7 @@ func GetActionsForSource(db *DB, source string) ([]string, error) {
return names, nil return names, nil
} }
func GetArgvForAction(db *DB, source string, name string) ([]string, error) { func GetArgvForAction(db DB, source string, name string) ([]string, error) {
rows := db.QueryRow(` rows := db.QueryRow(`
select json(argv) select json(argv)
from actions from actions
@ -77,7 +78,7 @@ func GetArgvForAction(db *DB, source string, name string) ([]string, error) {
return argv, nil return argv, nil
} }
func DeleteAction(db *DB, source string, name string) error { func DeleteAction(db DB, source string, name string) error {
_, err := db.Exec(` _, err := db.Exec(`
delete from actions delete from actions
where source = ? and name = ? where source = ? and name = ?
@ -126,19 +127,35 @@ func Execute(
source string, source string,
argv []string, argv []string,
env []string, env []string,
state []byte,
input string, input string,
timeout time.Duration, timeout time.Duration,
) ([]Item, error) { ) ([]Item, []byte, error) {
log.Printf("Executing %v", argv) log.Printf("executing %v", argv)
if len(argv) == 0 { if len(argv) == 0 {
return nil, errors.New("empty argv") return nil, nil, errors.New("empty argv")
} }
if source == "" { if source == "" {
return nil, errors.New("empty source") return nil, nil, errors.New("empty source")
} }
env = append(env, "STATE_PATH=") stateFile, err := os.CreateTemp("", "intake_state_*")
if err != nil {
return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err)
}
defer func() {
if err := os.Remove(stateFile.Name()); err != nil {
log.Printf("error: failed to delete %s", stateFile.Name())
}
}()
_, err = stateFile.Write(state)
if err != nil {
return nil, nil, fmt.Errorf("error: failed to write state file: %v", err)
}
env = append(env, "STATE_PATH="+stateFile.Name())
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
@ -149,15 +166,15 @@ func Execute(
// Open pipes to the command // Open pipes to the command
stdin, err := cmd.StdinPipe() stdin, err := cmd.StdinPipe()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
stderr, err := cmd.StderrPipe() stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
cout := make(chan Item) cout := make(chan Item)
@ -180,7 +197,7 @@ func Execute(
// Kick off the command // Kick off the command
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
// Block until std{out,err} close // Block until std{out,err} close
@ -190,19 +207,51 @@ func Execute(
err = cmd.Wait() err = cmd.Wait()
if ctx.Err() == context.DeadlineExceeded { if ctx.Err() == context.DeadlineExceeded {
log.Printf("Timed out after %v\n", timeout) log.Printf("Timed out after %v\n", timeout)
return nil, err return nil, nil, err
} else if exiterr, ok := err.(*exec.ExitError); ok { } else if exiterr, ok := err.(*exec.ExitError); ok {
log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode())
return nil, err return nil, nil, err
} else if err != nil { } else if err != nil {
log.Printf("error: %s failed with error: %s\n", argv[0], err) log.Printf("error: %s failed with error: %s\n", argv[0], err)
return nil, err return nil, nil, err
} }
if parseError { if parseError {
log.Printf("error: could not parse item\n") log.Printf("error: could not parse item\n")
return nil, errors.New("invalid JSON") return nil, nil, errors.New("invalid JSON")
} }
return items, nil newState, err := os.ReadFile(stateFile.Name())
if err != nil {
return nil, nil, fmt.Errorf("error: failed to read state file: %v", err)
}
return items, newState, nil
}
// Execute an action that takes an item as input and returns the item modified.
// This is basically just a wrapper over [Execute] that handles the input and backfilling.
func ExecuteItemAction(
item Item,
argv []string,
env []string,
state []byte,
timeout time.Duration,
) (Item, []byte, error) {
itemJson, err := json.Marshal(item)
if err != nil {
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
}
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout)
if err != nil {
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
}
if len(res) != 1 {
return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res))
}
newItem := res[0]
BackfillItem(&newItem, &item)
return newItem, newState, nil
} }

View File

@ -72,7 +72,8 @@ func TestExecute(t *testing.T) {
} }
} }
execute := func(argv []string) ([]Item, error) { execute := func(argv []string) ([]Item, error) {
return Execute("_", argv, nil, "", time.Minute) item, _, err := Execute("_", argv, nil, nil, "", time.Minute)
return item, err
} }
res, err := execute([]string{"true"}) res, err := execute([]string{"true"})
@ -89,7 +90,7 @@ func TestExecute(t *testing.T) {
assertLen(res, 0) assertLen(res, 0)
// Timeout // Timeout
res, err = Execute("_", []string{"sleep", "10"}, nil, "", time.Millisecond) res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond)
assertNotNil(err) assertNotNil(err)
assertLen(res, 0) assertLen(res, 0)
@ -102,7 +103,7 @@ func TestExecute(t *testing.T) {
} }
// Read from stdin // Read from stdin
res, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, "bar", time.Minute) res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "bar" { if res[0].Id != "bar" {
@ -110,7 +111,7 @@ func TestExecute(t *testing.T) {
} }
// Set env // Set env
res, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, "", time.Minute) res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "baz" { if res[0].Id != "baz" {
@ -179,4 +180,22 @@ func TestExecute(t *testing.T) {
if res[0].Action["hello"] == nil { if res[0].Action["hello"] == nil {
t.Fatal("missing hello action") t.Fatal("missing hello action")
} }
// Read state
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute)
assertNil(err)
assertLen(res, 1)
if res[0].Title != "Hello world" {
t.Fatalf("expected 'Hello world' from read state, got '%s'", res[0].Title)
}
// Write state
argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute)
assertNil(err)
assertLen(res, 1)
if string(newState) != "Hello world" {
t.Fatalf("expected 'Hello world' from write state, got %s", string(newState))
}
} }

View File

@ -7,24 +7,36 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
type DB struct { type DB interface {
Query(query string, args ...any) (*sql.Rows, error)
QueryRow(query string, args ...any) *sql.Row
Exec(query string, args ...any) (sql.Result, error)
Prepare(query string) (*sql.Stmt, error)
Transact(func(DB) error) error
}
type RoRwDb struct {
ro *sql.DB ro *sql.DB
rw *sql.DB rw *sql.DB
} }
func (db *DB) Query(query string, args ...any) (*sql.Rows, error) { func (db *RoRwDb) Query(query string, args ...any) (*sql.Rows, error) {
return db.ro.Query(query, args...) return db.ro.Query(query, args...)
} }
func (db *DB) QueryRow(query string, args ...any) *sql.Row { func (db *RoRwDb) QueryRow(query string, args ...any) *sql.Row {
return db.ro.QueryRow(query, args...) return db.ro.QueryRow(query, args...)
} }
func (db *DB) Exec(query string, args ...any) (sql.Result, error) { func (db *RoRwDb) Exec(query string, args ...any) (sql.Result, error) {
return db.rw.Exec(query, args...) return db.rw.Exec(query, args...)
} }
func (db *DB) Transact(transaction func(*sql.Tx) error) error { func (db *RoRwDb) Prepare(query string) (*sql.Stmt, error) {
return db.rw.Prepare(query)
}
func (db *RoRwDb) Transact(transaction func(DB) error) error {
tx, err := db.rw.Begin() tx, err := db.rw.Begin()
if err != nil { if err != nil {
return err return err
@ -34,7 +46,7 @@ func (db *DB) Transact(transaction func(*sql.Tx) error) error {
if err != nil { if err != nil {
return err return err
} }
if err = transaction(tx); err != nil { if err = transaction(&TxDb{tx}); err != nil {
return err return err
} }
if err = tx.Commit(); err != nil { if err = tx.Commit(); err != nil {
@ -43,6 +55,30 @@ func (db *DB) Transact(transaction func(*sql.Tx) error) error {
return nil return nil
} }
type TxDb struct {
*sql.Tx
}
func (tx *TxDb) Query(query string, args ...any) (*sql.Rows, error) {
return tx.Tx.Query(query, args...)
}
func (tx *TxDb) QueryRow(query string, args ...any) *sql.Row {
return tx.Tx.QueryRow(query, args...)
}
func (tx *TxDb) Exec(query string, args ...any) (sql.Result, error) {
return tx.Tx.Exec(query, args...)
}
func (tx *TxDb) Prepare(query string) (*sql.Stmt, error) {
return tx.Tx.Prepare(query)
}
func (tx *TxDb) Transact(transaction func(DB) error) error {
return transaction(tx)
}
func defaultPragma(db *sql.DB) (sql.Result, error) { func defaultPragma(db *sql.DB) (sql.Result, error) {
return db.Exec(` return db.Exec(`
pragma journal_mode = WAL; pragma journal_mode = WAL;
@ -55,7 +91,7 @@ func defaultPragma(db *sql.DB) (sql.Result, error) {
`) `)
} }
func OpenDb(dataSourceName string) (*DB, error) { func OpenDb(dataSourceName string) (DB, error) {
ro, err := sql.Open("sqlite3", dataSourceName) ro, err := sql.Open("sqlite3", dataSourceName)
if err != nil { if err != nil {
defer ro.Close() defer ro.Close()
@ -82,7 +118,7 @@ func OpenDb(dataSourceName string) (*DB, error) {
return nil, err return nil, err
} }
wrapper := new(DB) wrapper := new(RoRwDb)
wrapper.ro = ro wrapper.ro = ro
wrapper.rw = rw wrapper.rw = rw
return wrapper, nil return wrapper, nil

View File

@ -2,11 +2,87 @@ package core
import ( import (
"database/sql" "database/sql"
"errors"
"strings"
"testing" "testing"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
type FailureDb struct {
db DB
queryError func(string, ...any) error
execError func(string, ...any) error
}
func (f *FailureDb) Query(query string, args ...any) (*sql.Rows, error) {
if f.queryError != nil {
if err := f.queryError(query, args...); err != nil {
return nil, err
}
}
return f.db.Query(query, args...)
}
func (f *FailureDb) QueryRow(query string, args ...any) *sql.Row {
return f.db.QueryRow(query, args...)
}
func (f *FailureDb) Exec(query string, args ...any) (sql.Result, error) {
if f.execError != nil {
if err := f.execError(query, args...); err != nil {
return nil, err
}
}
return f.db.Exec(query, args...)
}
func (f *FailureDb) Prepare(query string) (*sql.Stmt, error) {
return f.db.Prepare(query)
}
func (f *FailureDb) Transact(txFunc func(DB) error) error {
return f.db.Transact(func(tx DB) error {
ftx := FailureDb{
db: tx,
queryError: f.queryError,
execError: f.execError,
}
return txFunc(&ftx)
})
}
func TestFailureDb(t *testing.T) {
db := EphemeralDb(t)
fdb := FailureDb{
db: db,
queryError: func(q string, a ...any) error {
if strings.Contains(q, "2") {
return errors.New("oopsie")
}
return nil
},
}
if _, err := fdb.Query("select 1"); err != nil {
t.Fatal(err)
}
if _, err := fdb.Query("select 2"); err == nil {
t.Fatal("expected error")
}
if err := fdb.Transact(func(tx DB) error {
if _, err := tx.Query("select 1"); err != nil {
t.Fatal(err)
}
_, err := tx.Query("select 2")
return err
}); err == nil {
t.Fatal("expected error from inside transaction")
}
}
func TestDeleteSourceCascade(t *testing.T) { func TestDeleteSourceCascade(t *testing.T) {
db := EphemeralDb(t) db := EphemeralDb(t)
@ -55,7 +131,7 @@ func TestTransaction(t *testing.T) {
} }
// A transaction that should succeed // A transaction that should succeed
err := db.Transact(func(tx *sql.Tx) error { err := db.Transact(func(tx DB) error {
if _, err := tx.Exec("insert into planets (name) values (?)", "mercury"); err != nil { if _, err := tx.Exec("insert into planets (name) values (?)", "mercury"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -86,7 +162,7 @@ func TestTransaction(t *testing.T) {
} }
// A transaction that should fail // A transaction that should fail
err = db.Transact(func(tx *sql.Tx) error { err = db.Transact(func(tx DB) error {
if _, err := tx.Exec("insert into planets (name) values (?)", "earth"); err != nil { if _, err := tx.Exec("insert into planets (name) values (?)", "earth"); err != nil {
t.Fatal(err) t.Fatal(err)
} }

56
core/env.go Normal file
View File

@ -0,0 +1,56 @@
package core
import (
"fmt"
"strings"
)
func GetEnvs(db DB, source string) ([]string, error) {
rows, err := db.Query(`
select name, value
from envs
where source = ?
`, source)
if err != nil {
return nil, err
}
var envs []string
for rows.Next() {
var name string
var value string
if err := rows.Scan(&name, &value); err != nil {
return nil, err
}
envs = append(envs, fmt.Sprintf("%s=%s", name, value))
}
return envs, nil
}
func SetEnvs(db DB, source string, envs []string) error {
return db.Transact(func(tx DB) error {
for _, env := range envs {
parts := strings.SplitN(env, "=", 2)
if len(parts) != 2 {
return fmt.Errorf("invalid env format: %s", env)
}
if parts[1] == "" {
_, err := tx.Exec(`
delete from envs
where source = ? and name = ?
`, source, parts[0])
if err != nil {
return fmt.Errorf("failed to clear source %s env %s: %v", source, parts[0], err)
}
} else {
_, err := tx.Exec(`
insert into envs (source, name, value)
values (?, ?, ?)
`, source, parts[0], parts[1])
if err != nil {
return fmt.Errorf("failed to set source %s env %s = %s: %v", source, parts[0], parts[1], err)
}
}
}
return nil
})
}

91
core/env_test.go Normal file
View File

@ -0,0 +1,91 @@
package core
import (
"slices"
"testing"
)
func TestEnvs(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "_"); err != nil {
t.Fatal(err)
}
// Insert env
if err := SetEnvs(db, "_", []string{"ONE=hello"}); err != nil {
t.Fatal(err)
}
envs, err := GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 1 {
t.Fatal("expected 1 env")
}
if envs[0] != "ONE=hello" {
t.Fatalf("Expected ONE=hello, got %s", envs[0])
}
// Insert env with = in value
if err := SetEnvs(db, "_", []string{"TWO=world=true"}); err != nil {
t.Fatal(err)
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 2 {
t.Fatal("expected 2 envs")
}
slices.Sort(envs) // ONE > TWO
if envs[1] != "TWO=world=true" {
t.Fatalf("Expected TWO=world=true, got %s", envs[1])
}
// Replace env
if err := SetEnvs(db, "_", []string{"TWO=goodbye"}); err != nil {
t.Fatal(err)
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 2 {
t.Fatal("expected 2 envs")
}
slices.Sort(envs) // ONE > TWO
if envs[1] != "TWO=goodbye" {
t.Fatalf("Expected TWO=goodbye, got %s", envs[1])
}
// Insert is transactional on error
if err := SetEnvs(db, "_", []string{"THREE=crowd", "FOUR"}); err == nil {
t.Fatal("expected bad env insert to fail")
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 2 {
t.Fatal("expected 2 envs after failed insert")
}
slices.Sort(envs) // ONE > TWO
if envs[0] != "ONE=hello" || envs[1] != "TWO=goodbye" {
t.Fatalf("Expected ONE=hello and TWO=goodbye, got %v", envs)
}
// Delete env
if err := SetEnvs(db, "_", []string{"ONE="}); err != nil {
t.Fatal(err)
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 1 {
t.Fatal("expected 1 env after deletion")
}
if envs[0] != "TWO=goodbye" {
t.Fatalf("Expected TWO=goodbye, got %s", envs[0])
}
}

View File

@ -11,7 +11,7 @@ import (
var migrations embed.FS var migrations embed.FS
// Idempotently initialize the database. Safe to call unconditionally. // Idempotently initialize the database. Safe to call unconditionally.
func InitDatabase(db *DB) error { func InitDatabase(db DB) error {
rows, err := db.Query(` rows, err := db.Query(`
select exists ( select exists (
select 1 select 1
@ -41,7 +41,7 @@ func InitDatabase(db *DB) error {
} }
// Get a map of migration names to whether the migration has been applied. // Get a map of migration names to whether the migration has been applied.
func GetPendingMigrations(db *DB) (map[string]bool, error) { func GetPendingMigrations(db DB) (map[string]bool, error) {
allMigrations, err := migrations.ReadDir("sql") allMigrations, err := migrations.ReadDir("sql")
if err != nil { if err != nil {
return nil, err return nil, err
@ -69,7 +69,7 @@ func GetPendingMigrations(db *DB) (map[string]bool, error) {
} }
// Apply a migration by name. // Apply a migration by name.
func ApplyMigration(db *DB, name string) error { func ApplyMigration(db DB, name string) error {
data, err := migrations.ReadFile("sql/" + name) data, err := migrations.ReadFile("sql/" + name)
if err != nil { if err != nil {
log.Fatalf("Missing migration %s", name) log.Fatalf("Missing migration %s", name)
@ -84,7 +84,7 @@ func ApplyMigration(db *DB, name string) error {
} }
// Apply all pending migrations. // Apply all pending migrations.
func MigrateDatabase(db *DB) error { func MigrateDatabase(db DB) error {
pending, err := GetPendingMigrations(db) pending, err := GetPendingMigrations(db)
if err != nil { if err != nil {
return err return err

View File

@ -7,7 +7,7 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
func EphemeralDb(t *testing.T) *DB { func EphemeralDb(t *testing.T) DB {
// We don't use OpenDb here because you can't open two connections to the same memory mem // We don't use OpenDb here because you can't open two connections to the same memory mem
mem, err := sql.Open("sqlite3", ":memory:") mem, err := sql.Open("sqlite3", ":memory:")
if err != nil { if err != nil {
@ -16,7 +16,7 @@ func EphemeralDb(t *testing.T) *DB {
if _, err = defaultPragma(mem); err != nil { if _, err = defaultPragma(mem); err != nil {
t.Fatal(err) t.Fatal(err)
} }
db := new(DB) db := new(RoRwDb)
db.ro = mem db.ro = mem
db.rw = mem db.rw = mem
if err = InitDatabase(db); err != nil { if err = InitDatabase(db); err != nil {
@ -33,7 +33,7 @@ func TestInitIdempotency(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
db := new(DB) db := new(RoRwDb)
db.ro = mem db.ro = mem
db.rw = mem db.rw = mem
if err = InitDatabase(db); err != nil { if err = InitDatabase(db); err != nil {

View File

@ -11,7 +11,7 @@ import (
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
func AddSource(db *DB, name string) error { func AddSource(db DB, name string) error {
_, err := db.Exec(` _, err := db.Exec(`
insert into sources (name) insert into sources (name)
values (?) values (?)
@ -20,7 +20,7 @@ func AddSource(db *DB, name string) error {
return err return err
} }
func GetSources(db *DB) ([]string, error) { func GetSources(db DB) ([]string, error) {
rows, err := db.Query(` rows, err := db.Query(`
select name select name
from sources from sources
@ -39,7 +39,7 @@ func GetSources(db *DB) ([]string, error) {
return names, nil return names, nil
} }
func DeleteSource(db *DB, name string) error { func DeleteSource(db DB, name string) error {
_, err := db.Exec(` _, err := db.Exec(`
delete from sources delete from sources
where name = ? where name = ?
@ -48,8 +48,8 @@ func DeleteSource(db *DB, name string) error {
return err return err
} }
func AddItems(db *DB, items []Item) error { func AddItems(db DB, items []Item) error {
return db.Transact(func(tx *sql.Tx) error { return db.Transact(func(tx DB) error {
stmt, err := tx.Prepare(` stmt, err := tx.Prepare(`
insert into items (source, id, active, title, author, body, link, time, action) insert into items (source, id, active, title, author, body, link, time, action)
values (?, ?, ?, ?, ?, ?, ?, ?, jsonb(?)) values (?, ?, ?, ?, ?, ?, ?, ?, jsonb(?))
@ -95,8 +95,8 @@ func BackfillItem(new *Item, old *Item) {
} }
} }
func UpdateItems(db *DB, items []Item) error { func UpdateItems(db DB, items []Item) error {
return db.Transact(func(tx *sql.Tx) error { return db.Transact(func(tx DB) error {
stmt, err := tx.Prepare(` stmt, err := tx.Prepare(`
update items update items
set set
@ -127,7 +127,7 @@ func UpdateItems(db *DB, items []Item) error {
} }
// Deactivate an item, returning its previous active state. // Deactivate an item, returning its previous active state.
func DeactivateItem(db *DB, source string, id string) (bool, error) { func DeactivateItem(db DB, source string, id string) (bool, error) {
row := db.QueryRow(` row := db.QueryRow(`
select active select active
from items from items
@ -150,7 +150,7 @@ func DeactivateItem(db *DB, source string, id string) (bool, error) {
return active, nil return active, nil
} }
func DeleteItem(db *DB, source string, id string) (int64, error) { func DeleteItem(db DB, source string, id string) (int64, error) {
res, err := db.Exec(` res, err := db.Exec(`
delete from items delete from items
where source = ? where source = ?
@ -162,7 +162,7 @@ func DeleteItem(db *DB, source string, id string) (int64, error) {
return res.RowsAffected() return res.RowsAffected()
} }
func getItems(db *DB, query string, args ...any) ([]Item, error) { func getItems(db DB, query string, args ...any) ([]Item, error) {
rows, err := db.Query(query, args...) rows, err := db.Query(query, args...)
if err != nil { if err != nil {
return nil, err return nil, err
@ -179,7 +179,7 @@ func getItems(db *DB, query string, args ...any) ([]Item, error) {
return items, nil return items, nil
} }
func GetItem(db *DB, source string, id string) (Item, error) { func GetItem(db DB, source string, id string) (Item, error) {
items, err := getItems(db, ` items, err := getItems(db, `
select source, id, created, active, title, author, body, link, time, json(action) select source, id, created, active, title, author, body, link, time, json(action)
from items from items
@ -196,7 +196,7 @@ func GetItem(db *DB, source string, id string) (Item, error) {
return items[0], nil return items[0], nil
} }
func GetAllActiveItems(db *DB) ([]Item, error) { func GetAllActiveItems(db DB) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, json(action)
@ -206,7 +206,7 @@ func GetAllActiveItems(db *DB) ([]Item, error) {
`) `)
} }
func GetAllItems(db *DB) ([]Item, error) { func GetAllItems(db DB) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, json(action)
@ -215,7 +215,7 @@ func GetAllItems(db *DB) ([]Item, error) {
`) `)
} }
func GetActiveItemsForSource(db *DB, source string) ([]Item, error) { func GetActiveItemsForSource(db DB, source string) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, json(action)
@ -227,7 +227,7 @@ func GetActiveItemsForSource(db *DB, source string) ([]Item, error) {
`, source) `, source)
} }
func GetAllItemsForSource(db *DB, source string) ([]Item, error) { func GetAllItemsForSource(db DB, source string) ([]Item, error) {
return getItems(db, ` return getItems(db, `
select select
source, id, created, active, title, author, body, link, time, json(action) source, id, created, active, title, author, body, link, time, json(action)
@ -238,10 +238,34 @@ func GetAllItemsForSource(db *DB, source string) ([]Item, error) {
`, source) `, source)
} }
func GetState(db DB, source string) ([]byte, error) {
row := db.QueryRow("select state from sources where name = ?", source)
var state []byte
err := row.Scan(&state)
return state, err
}
func SetState(db DB, source string, state []byte) error {
_, err := db.Exec("update sources set state = ? where name = ?", state, source)
return err
}
// Given the results of a fetch, add new items, update existing items, and delete expired items. // Given the results of a fetch, add new items, update existing items, and delete expired items.
// //
// Returns the number of new and deleted items on success. // Returns the number of new and deleted items on success.
func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) { func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
var new int
var del int
var err error
err = db.Transact(func(tx DB) error {
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
return err
})
return new, del, err
}
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
// Get the existing items // Get the existing items
existingItems, err := GetAllItemsForSource(db, source) existingItems, err := GetAllItemsForSource(db, source)
if err != nil { if err != nil {
@ -278,26 +302,22 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
return 0, 0, err return 0, 0, err
} }
envs, err := GetEnvs(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
}
// If the source has an on-create trigger, run it for each new item // If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch // On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create") onCreateArgv, err := GetArgvForAction(db, source, "on_create")
if err == nil { if err == nil && len(onCreateArgv) > 0 {
var updatedNewItems []Item var updatedNewItems []Item
for _, item := range newItems { for _, item := range newItems {
itemJson, err := json.Marshal(item) var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute)
if err != nil { if err != nil {
log.Fatalf("error: failed to serialize item: %v", err) log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
} }
res, err := Execute(source, onCreateArgv, nil, string(itemJson), time.Minute)
if err != nil {
log.Printf("error: failed to execute on_create for %s/%s: %v", item.Source, item.Id, err)
continue
}
if len(res) != 1 {
log.Printf("error: expected on_create for %s/%s to produce exactly one item, got %d", item.Source, item.Id, len(res))
}
updatedItem := res[0]
BackfillItem(&updatedItem, &item)
updatedNewItems = append(updatedNewItems, updatedItem) updatedNewItems = append(updatedNewItems, updatedItem)
} }
UpdateItems(db, updatedNewItems) UpdateItems(db, updatedNewItems)
@ -328,5 +348,9 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
} }
} }
if err = SetState(db, source, state); err != nil {
return 0, 0, err
}
return len(newItems), len(idsToDelete), nil return len(newItems), len(idsToDelete), nil
} }

View File

@ -1,8 +1,10 @@
package core package core
import ( import (
"errors"
"fmt" "fmt"
"slices" "slices"
"strings"
"testing" "testing"
"time" "time"
@ -128,18 +130,18 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
} }
a := Item{Source: "test", Id: "a"} a := Item{Source: "test", Id: "a"}
add, del, err := UpdateWithFetchedItems(db, "test", []Item{a}) add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 1 || del != 0 || err != nil { if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
b := Item{Source: "test", Id: "b"} b := Item{Source: "test", Id: "b"}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 1 || del != 0 || err != nil { if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
@ -147,22 +149,67 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
if _, err = DeactivateItem(db, "test", "a"); err != nil { if _, err = DeactivateItem(db, "test", "a"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 1 || err != nil { if add != 0 || del != 1 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
} }
func TestUpdateSourceTransaction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
a := Item{Source: "s", Id: "a"}
b := Item{Source: "s", Id: "b"}
// Add and deactivate a so it will be deleted on next fetch without it
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
if _, err := DeactivateItem(db, "s", "a"); err != nil {
t.Fatal(err)
}
// Add b and cause a to be deleted, but the delete throws an error
fdb := &FailureDb{
db: db,
execError: func(q string, a ...any) error {
if strings.Contains(q, "delete from") {
return errors.New("no deletes!")
}
return nil
},
}
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b})
if add != 0 || del != 0 || err == nil {
t.Fatalf("expected failure, got %d %d %v", add, del, err)
}
// Failure should not add b
items, err := GetAllItemsForSource(db, "s")
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("should only have one item, got %d", len((items)))
}
if items[0].Id != "a" {
t.Fatalf("expected only item to still be a, got %s", items[0].Id)
}
}
func TestOnCreateAction(t *testing.T) { func TestOnCreateAction(t *testing.T) {
db := EphemeralDb(t) db := EphemeralDb(t)
if err := AddSource(db, "test"); err != nil { if err := AddSource(db, "test"); err != nil {
@ -173,7 +220,7 @@ func TestOnCreateAction(t *testing.T) {
} }
execute := func(argv []string) []Item { execute := func(argv []string) []Item {
items, err := Execute("test", argv, nil, "", time.Minute) items, _, err := Execute("test", argv, nil, nil, "", time.Minute)
if err != nil { if err != nil {
t.Fatal("unexpected error executing test fetch") t.Fatal("unexpected error executing test fetch")
} }
@ -200,7 +247,7 @@ func TestOnCreateAction(t *testing.T) {
// Noop on_create works // Noop on_create works
onCreate([]string{"tee"}) onCreate([]string{"tee"})
items := execute([]string{"jq", "-cn", `{id: "one"}`}) items := execute([]string{"jq", "-cn", `{id: "one"}`})
add, _, err := UpdateWithFetchedItems(db, "test", items) add, _, err := UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with noop oncreate") t.Fatal("failed update with noop oncreate")
} }
@ -216,7 +263,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "Hello, World" { if items[0].Title != "Hello, World" {
t.Fatal("unexpected title") t.Fatal("unexpected title")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with alter oncreate") t.Fatal("failed update with alter oncreate")
} }
@ -231,7 +278,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "" { if items[0].Link != "" {
t.Fatal("unexpected link") t.Fatal("unexpected link")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with augment oncreate") t.Fatal("failed update with augment oncreate")
} }
@ -246,7 +293,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "gopher://go.dev" { if items[0].Link != "gopher://go.dev" {
t.Fatal("missing link") t.Fatal("missing link")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with attempted deletion oncreate") t.Fatal("failed update with attempted deletion oncreate")
} }
@ -257,7 +304,7 @@ func TestOnCreateAction(t *testing.T) {
// item is created if on_create fails // item is created if on_create fails
onCreate([]string{"false"}) onCreate([]string{"false"})
items = execute([]string{"jq", "-cn", `{id: "five"}`}) items = execute([]string{"jq", "-cn", `{id: "five"}`})
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with failing oncreate") t.Fatal("failed update with failing oncreate")
} }
@ -271,7 +318,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "before" { if items[0].Title != "before" {
t.Fatal("unexpected title") t.Fatal("unexpected title")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with bad exit code oncreate") t.Fatal("failed update with bad exit code oncreate")
} }
@ -282,7 +329,7 @@ func TestOnCreateAction(t *testing.T) {
// on_create can't change id, active, or created // on_create can't change id, active, or created
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
items = execute([]string{"jq", "-cn", `{id: "seven"}`}) items = execute([]string{"jq", "-cn", `{id: "seven"}`})
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with invalid field changes oncreate") t.Fatal("failed update with invalid field changes oncreate")
} }
@ -291,3 +338,30 @@ func TestOnCreateAction(t *testing.T) {
t.Fatal("unexpected changes to id, active, or created fields") t.Fatal("unexpected changes to id, active, or created fields")
} }
} }
func TestSourceState(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
state, err := GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if len(state) != 0 {
t.Fatal("expected no state on a fresh source")
}
if err = SetState(db, "s", []byte("hello, world")); err != nil {
t.Fatal(err)
}
state, err = GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if string(state) != "hello, world" {
t.Fatalf("expected hello, world, got %s", state)
}
}

View File

@ -1,5 +1,6 @@
create table sources( create table sources(
name text not null, name text not null,
state blob,
primary key (name) primary key (name)
) strict; ) strict;
create table actions( create table actions(
@ -9,6 +10,13 @@ create table actions(
primary key (source, name), primary key (source, name),
foreign key (source) references sources (name) on delete cascade foreign key (source) references sources (name) on delete cascade
) strict; ) strict;
create table envs(
source text not null,
name text not null,
value text not null,
unique (source, name) on conflict replace,
foreign key (source) references sources (name) on delete cascade
) strict;
create table items( create table items(
source text not null, source text not null,
id text not null, id text not null,

View File

@ -4,7 +4,7 @@
<article class="center"> <article class="center">
<span class="item-title"> <span class="item-title">
<a href="/">Home</a> <a href="/">Home</a>
[<a href="#">Active</a> | <a href="#">All</a>] [<a href="?inactive=0">Active</a> | <a href="?inactive=1">All</a>]
</span> </span>
</article> </article>

View File

@ -1,7 +1,7 @@
package web package web
import ( import (
"encoding/json" "fmt"
"log" "log"
"net/http" "net/http"
"strings" "strings"
@ -17,7 +17,7 @@ func (env *Env) getItem(writer http.ResponseWriter, req *http.Request) {
item, err := core.GetItem(env.db, source, id) item, err := core.GetItem(env.db, source, id)
if err != nil { if err != nil {
writer.Write([]byte(err.Error())) http.Error(writer, err.Error(), 500)
return return
} }
html.Item(writer, html.ItemData{Item: item}) html.Item(writer, html.ItemData{Item: item})
@ -29,12 +29,12 @@ func (env *Env) deleteItem(writer http.ResponseWriter, req *http.Request) {
_, err := core.DeactivateItem(env.db, source, id) _, err := core.DeactivateItem(env.db, source, id)
if err != nil { if err != nil {
writer.Write([]byte(err.Error())) http.Error(writer, err.Error(), 500)
return return
} }
item, err := core.GetItem(env.db, source, id) item, err := core.GetItem(env.db, source, id)
if err != nil { if err != nil {
writer.Write([]byte(err.Error())) http.Error(writer, err.Error(), 500)
return return
} }
html.Item(writer, html.ItemData{Item: item}) html.Item(writer, html.ItemData{Item: item})
@ -45,6 +45,16 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
id := req.PathValue("id") id := req.PathValue("id")
action := req.PathValue("action") action := req.PathValue("action")
state, err := core.GetState(env.db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
envs, err := core.GetEnvs(env.db, source)
if err != nil {
log.Fatalf("error: failed to get envs for %s: %v", source, err)
}
item, err := core.GetItem(env.db, source, id) item, err := core.GetItem(env.db, source, id)
if err != nil { if err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
@ -62,27 +72,22 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return return
} }
itemJson, err := json.Marshal(item) newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute)
if err != nil { if err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
return return
} }
res, err := core.Execute(source, argv, nil, string(itemJson), time.Minute) if err = env.db.Transact(func(tx core.DB) error {
if err != nil { if _err := core.UpdateItems(tx, []core.Item{newItem}); err != nil {
return fmt.Errorf("failed to update item: %v", _err)
}
if _err := core.SetState(tx, source, newState); err != nil {
return fmt.Errorf("failed to set state for %s: %v", source, _err)
}
return nil
}); err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
return
}
if len(res) != 1 {
http.Error(writer, "not exactly one item", 500)
return
}
newItem := res[0]
core.BackfillItem(&newItem, &item)
if err = core.UpdateItems(env.db, []core.Item{newItem}); err != nil {
http.Error(writer, err.Error(), 500)
return
} }
html.Item(writer, html.ItemData{Item: newItem}) html.Item(writer, html.ItemData{Item: newItem})

View File

@ -9,7 +9,7 @@ import (
) )
type Env struct { type Env struct {
db *core.DB db core.DB
} }
func logged(handler http.HandlerFunc) http.HandlerFunc { func logged(handler http.HandlerFunc) http.HandlerFunc {
@ -23,15 +23,15 @@ func handleFunc(pattern string, handler http.HandlerFunc) {
http.HandleFunc(pattern, logged(handler)) http.HandleFunc(pattern, logged(handler))
} }
func RunServer(db *core.DB, addr string, port string) { func RunServer(db core.DB, addr string, port string) {
env := &Env{db} env := &Env{db}
bind := net.JoinHostPort(addr, port) bind := net.JoinHostPort(addr, port)
handleFunc("GET /", env.getRoot) handleFunc("GET /", env.getRoot)
handleFunc("GET /style.css", env.getStyle) handleFunc("GET /style.css", env.getStyle)
handleFunc("GET /htmx.org@2.0.4.js", env.getScript) handleFunc("GET /htmx.org@2.0.4.js", env.getScript)
handleFunc("GET /source/{source}", env.getSource) handleFunc("GET /source/{source}", env.getSource)
handleFunc("GET /item/{source}/{id}", env.getItem) handleFunc("GET /item/{source}/{id}", env.getItem)
handleFunc("DELETE /item/{source}/{id}", env.deleteItem) handleFunc("DELETE /item/{source}/{id}", env.deleteItem)
handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction) handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction)
handleFunc("POST /mass-deactivate", env.massDeactivate) handleFunc("POST /mass-deactivate", env.massDeactivate)

View File

@ -15,7 +15,7 @@ func (env *Env) getRoot(writer http.ResponseWriter, req *http.Request) {
names, err := core.GetSources(env.db) names, err := core.GetSources(env.db)
if err != nil { if err != nil {
writer.Write([]byte(err.Error())) http.Error(writer, err.Error(), 500)
} }
var sources []html.SourceData var sources []html.SourceData

View File

@ -1,6 +1,7 @@
package web package web
import ( import (
"log"
"net/http" "net/http"
"github.com/Jaculabilis/intake/core" "github.com/Jaculabilis/intake/core"
@ -15,11 +16,20 @@ func (env *Env) getSource(writer http.ResponseWriter, req *http.Request) {
} }
// TODO this needs to properly error if the source doesn't exist instead of just returning [] // TODO this needs to properly error if the source doesn't exist instead of just returning []
items, err := core.GetAllItemsForSource(env.db, source) var items []core.Item
var err error
inactive := req.URL.Query().Get("inactive") == "1"
log.Printf("inactive = %t", inactive)
if inactive {
items, err = core.GetAllItemsForSource(env.db, source)
} else {
items, err = core.GetActiveItemsForSource(env.db, source)
}
if err != nil { if err != nil {
http.NotFound(writer, req) http.Error(writer, err.Error(), 500)
return return
} }
data := html.FeedData{ data := html.FeedData{
Items: items, Items: items,
} }