Compare commits

...

14 Commits

36 changed files with 900 additions and 365 deletions

View File

@ -113,7 +113,8 @@ To execute an action, Intake executes the command specified by that action's `ar
The process's environment is as follows:
* `intake`'s environment is inherited.
* `STATE_PATH` is set to the absolute path of a file containing the source's persistent state.
* Each environment variable defined in the source is set.
* `STATE_PATH` is set to the absolute path of a file that the source can use for persistent state. This file can be used for any data in any format. Changes to the state file are only saved if the action succeeds.
When an action receives an item as input, that item's JSON representation is written to that action's `stdin`.
When an action outputs an item, it should write the item's JSON representation to `stdout` on one line.

View File

@ -13,28 +13,25 @@ var actionAddCmd = &cobra.Command{
Long: `Add an action to a source.
`,
Run: func(cmd *cobra.Command, args []string) {
actionAdd(getArgv(cmd, args))
actionAdd(stringArg(cmd, "source"), stringArg(cmd, "action"), getArgv(cmd, args))
},
}
var actionAddSource string
var actionAddAction string
func init() {
actionCmd.AddCommand(actionAddCmd)
actionAddCmd.Flags().StringVarP(&actionAddSource, "source", "s", "", "Source to add action")
actionAddCmd.Flags().StringP("source", "s", "", "Source to add action")
actionAddCmd.MarkFlagRequired("source")
actionAddCmd.Flags().StringVarP(&actionAddAction, "action", "a", "", "Action name")
actionAddCmd.Flags().StringP("action", "a", "", "Action name")
actionAddCmd.MarkFlagRequired("action")
}
func actionAdd(argv []string) {
if actionAddSource == "" {
func actionAdd(source string, action string, argv []string) {
if source == "" {
log.Fatal("error: --source is empty")
}
if actionAddAction == "" {
if action == "" {
log.Fatal("error: --action is empty")
}
if len(argv) == 0 {
@ -43,10 +40,10 @@ func actionAdd(argv []string) {
db := openAndMigrateDb()
err := core.AddAction(db, actionAddSource, actionAddAction, argv)
err := core.AddAction(db, source, action, argv)
if err != nil {
log.Fatalf("error: failed to add action: %v", err)
}
log.Printf("Added action %s to source %s", actionAddAction, actionAddSource)
log.Printf("Added action %s to source %s", action, source)
}

View File

@ -14,37 +14,34 @@ var actionDeleteCmd = &cobra.Command{
Long: `Delete an action from a source.
`,
Run: func(cmd *cobra.Command, args []string) {
actionDelete()
actionDelete(stringArg(cmd, "source"), stringArg(cmd, "action"))
},
}
var actionDeleteSource string
var actionDeleteAction string
func init() {
actionCmd.AddCommand(actionDeleteCmd)
actionDeleteCmd.Flags().StringVarP(&actionDeleteSource, "source", "s", "", "Source to add action")
actionDeleteCmd.Flags().StringP("source", "s", "", "Source to add action")
actionDeleteCmd.MarkFlagRequired("source")
actionDeleteCmd.Flags().StringVarP(&actionDeleteAction, "action", "a", "", "Action name")
actionDeleteCmd.Flags().StringP("action", "a", "", "Action name")
actionDeleteCmd.MarkFlagRequired("action")
}
func actionDelete() {
if actionDeleteSource == "" {
func actionDelete(source string, action string) {
if source == "" {
log.Fatal("error: --source is empty")
}
if actionDeleteAction == "" {
if action == "" {
log.Fatal("error: --action is empty")
}
db := openAndMigrateDb()
err := core.DeleteAction(db, actionDeleteSource, actionDeleteAction)
err := core.DeleteAction(db, source, action)
if err != nil {
log.Fatalf("error: failed to delete action: %v", err)
}
log.Printf("Deleted action %s from source %s", actionDeleteAction, actionDeleteSource)
log.Printf("Deleted action %s from source %s", action, source)
}

View File

@ -13,28 +13,25 @@ var actionEditCmd = &cobra.Command{
Long: `Edit an action on a source.
`,
Run: func(cmd *cobra.Command, args []string) {
actionEdit(getArgv(cmd, args))
actionEdit(stringArg(cmd, "source"), stringArg(cmd, "action"), getArgv(cmd, args))
},
}
var actionEditSource string
var actionEditAction string
func init() {
actionCmd.AddCommand(actionEditCmd)
actionEditCmd.Flags().StringVarP(&actionEditSource, "source", "s", "", "Source to edit action")
actionEditCmd.Flags().StringP("source", "s", "", "Source to edit action")
actionEditCmd.MarkFlagRequired("source")
actionEditCmd.Flags().StringVarP(&actionEditAction, "action", "a", "", "Action name")
actionEditCmd.Flags().StringP("action", "a", "", "Action name")
actionEditCmd.MarkFlagRequired("action")
}
func actionEdit(argv []string) {
if actionEditSource == "" {
func actionEdit(source string, action string, argv []string) {
if source == "" {
log.Fatal("error: --source is empty")
}
if actionEditAction == "" {
if action == "" {
log.Fatal("error: --action is empty")
}
if len(argv) == 0 {
@ -43,10 +40,10 @@ func actionEdit(argv []string) {
db := openAndMigrateDb()
err := core.UpdateAction(db, actionEditSource, actionEditAction, argv)
err := core.UpdateAction(db, source, action, argv)
if err != nil {
log.Fatalf("error: failed to update action: %v", err)
}
log.Printf("Updated action %s on source %s", actionEditAction, actionEditSource)
log.Printf("Updated action %s on source %s", action, source)
}

View File

@ -1,9 +1,9 @@
package cmd
import (
"encoding/json"
"fmt"
"log"
"slices"
"time"
"github.com/Jaculabilis/intake/core"
@ -26,87 +26,95 @@ In a dry run, the item will be printed in the chosen format and not updated.
%s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) {
actionExecute()
actionExecute(
stringArg(cmd, "source"),
stringArg(cmd, "action"),
stringArg(cmd, "item"),
stringArg(cmd, "format"),
boolArg(cmd, "dry-run"),
boolArg(cmd, "diff"),
boolArg(cmd, "force"),
)
},
}
var actionExecuteSource string
var actionExecuteAction string
var actionExecuteItem string
var actionExecuteFormat string
var actionExecuteDryRun bool
var actionExecuteDiff bool
var actionExecuteForce bool
func init() {
actionCmd.AddCommand(actionExecuteCmd)
actionExecuteCmd.PersistentFlags().StringVarP(&actionExecuteSource, "source", "s", "", "Source of the item")
actionExecuteCmd.PersistentFlags().StringP("source", "s", "", "Source of the item")
actionExecuteCmd.MarkFlagRequired("source")
actionExecuteCmd.PersistentFlags().StringVarP(&actionExecuteItem, "item", "i", "", "Item to run action on")
actionExecuteCmd.PersistentFlags().StringP("item", "i", "", "Item to run action on")
actionExecuteCmd.MarkFlagRequired("item")
actionExecuteCmd.PersistentFlags().StringVarP(&actionExecuteAction, "action", "a", "", "Action to run")
actionExecuteCmd.PersistentFlags().StringP("action", "a", "", "Action to run")
actionExecuteCmd.MarkFlagRequired("action")
actionExecuteCmd.Flags().StringVarP(&actionExecuteFormat, "format", "f", "headlines", "Feed format for returned items")
actionExecuteCmd.Flags().BoolVar(&actionExecuteDryRun, "dry-run", false, "Instead of updating the item, print it")
actionExecuteCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items")
actionExecuteCmd.Flags().Bool("dry-run", false, "Instead of updating the item, print it")
actionExecuteCmd.Flags().BoolVar(&actionExecuteDiff, "diff", false, "Show which fields of the item changed")
actionExecuteCmd.Flags().Bool("diff", false, "Show which fields of the item changed")
actionExecuteCmd.Flags().BoolVar(&actionExecuteForce, "force", false, "Execute the action even if the item does not support it")
actionExecuteCmd.Flags().Bool("force", false, "Execute the action even if the item does not support it")
}
func actionExecute() {
formatter := formatAs(actionExecuteFormat)
func actionExecute(
source string,
action string,
itemId string,
format string,
dryRun bool,
diff bool,
force bool,
) {
formatter := formatAs(format)
if actionExecuteSource == "" {
if source == "" {
log.Fatal("error: --source is empty")
}
if actionExecuteAction == "" {
if action == "" {
log.Fatal("error: --action is empty")
}
if actionExecuteItem == "" {
if itemId == "" {
log.Fatal("error: --item is empty")
}
db := openAndMigrateDb()
item, err := core.GetItem(db, actionExecuteSource, actionExecuteItem)
state, err := core.GetState(db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
envs, err := core.GetEnvs(db, source)
if err != nil {
log.Fatalf("error: failed to get envs for %s: %v", source, err)
}
item, err := core.GetItem(db, source, itemId)
if err != nil {
log.Fatalf("error: failed to get item: %v", err)
}
if item.Action[actionExecuteAction] == nil {
if actionExecuteForce {
log.Printf("warning: force-executing %s on %s/%s", actionExecuteAction, actionExecuteSource, actionExecuteItem)
if item.Action[action] == nil {
if force {
log.Printf("warning: force-executing %s on %s/%s", action, source, itemId)
} else {
log.Fatalf("error: %s/%s does not support %s", actionExecuteSource, actionExecuteItem, actionExecuteAction)
log.Fatalf("error: %s/%s does not support %s", source, itemId, action)
}
}
argv, err := core.GetArgvForAction(db, actionExecuteSource, actionExecuteAction)
argv, err := core.GetArgvForAction(db, source, action)
if err != nil {
log.Fatalf("error: failed to get action: %v", err)
}
itemJson, err := json.Marshal(item)
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute)
if err != nil {
log.Fatalf("error: failed to serialize item: %v", err)
log.Fatalf("error executing %s: %v", action, err)
}
res, err := core.Execute(actionExecuteSource, argv, nil, string(itemJson), time.Minute)
if err != nil {
log.Fatalf("error: failed to execute action: %v", err)
}
if len(res) != 1 {
log.Fatalf("error: expected action to produce exactly one item, got %d", len(res))
}
newItem := res[0]
core.BackfillItem(&newItem, &item)
if actionExecuteDiff {
if diff {
if item.Title != newItem.Title {
log.Printf("title: %s => %s", item.Title, newItem.Title)
}
@ -125,14 +133,25 @@ func actionExecute() {
if core.ItemsAreEqual(item, newItem) {
log.Printf("no changes\n")
}
if !slices.Equal(state, newState) {
log.Printf("state changed (%d => %d bytes)", len(state), len(newState))
}
}
if actionExecuteDryRun {
fmt.Println(formatter(res[0]))
if dryRun {
fmt.Println(formatter(newItem))
return
}
if err = core.UpdateItems(db, []core.Item{newItem}); err != nil {
log.Fatalf("error: failed to update item: %v", err)
if err = db.Transact(func(tx core.DB) error {
if _err := core.UpdateItems(tx, []core.Item{newItem}); err != nil {
return fmt.Errorf("failed to update item: %v", _err)
}
if _err := core.SetState(tx, source, newState); err != nil {
return fmt.Errorf("failed to set state for %s: %v", source, _err)
}
return nil
}); err != nil {
log.Fatalf("error: %v", err)
}
}

View File

@ -16,41 +16,38 @@ var actionListCmd = &cobra.Command{
Long: `List actions on a source.
`,
Run: func(cmd *cobra.Command, args []string) {
actionList()
actionList(stringArg(cmd, "source"), boolArg(cmd, "argv"))
},
}
var actionListSource string
var actionListArgv bool
func init() {
actionCmd.AddCommand(actionListCmd)
actionListCmd.Flags().StringVarP(&actionListSource, "source", "s", "", "Source to list actions")
actionListCmd.Flags().StringP("source", "s", "", "Source to list actions")
actionListCmd.MarkFlagRequired("source")
actionListCmd.Flags().BoolVarP(&actionListArgv, "argv", "a", false, "Include action command")
actionListCmd.Flags().BoolP("argv", "a", false, "Include action command")
}
func actionList() {
if actionListSource == "" {
func actionList(source string, argv bool) {
if source == "" {
log.Fatal("error: --source is empty")
}
db := openAndMigrateDb()
actions, err := core.GetActionsForSource(db, actionListSource)
actions, err := core.GetActionsForSource(db, source)
if err != nil {
log.Fatal(err)
}
slices.SortFunc(actions, actionSort)
if actionListArgv {
if argv {
actionArgv := make(map[string][]string)
for _, name := range actions {
argv, err := core.GetArgvForAction(db, actionListSource, name)
argv, err := core.GetArgvForAction(db, source, name)
if err != nil {
log.Fatalf("error: could not get argv for source %s action %s: %v", actionListSource, name, err)
log.Fatalf("error: could not get argv for source %s action %s: %v", source, name, err)
}
actionArgv[name] = argv
}

53
cmd/args.go Normal file
View File

@ -0,0 +1,53 @@
package cmd
import (
"log"
"github.com/spf13/cobra"
)
// Get the value of a bool flag.
func boolArg(cmd *cobra.Command, name string) bool {
b, err := cmd.Flags().GetBool(name)
if err != nil {
log.Fatal(err)
}
return b
}
// Get the value of an int flag.
func intArg(cmd *cobra.Command, name string) int {
i, err := cmd.Flags().GetInt(name)
if err != nil {
log.Fatal(err)
}
return i
}
// Get the value of a string flag.
func stringArg(cmd *cobra.Command, name string) string {
str, err := cmd.Flags().GetString(name)
if err != nil {
log.Fatal(err)
}
return str
}
// Get the value of a string array flag.
func stringArrayArg(cmd *cobra.Command, name string) []string {
s, err := cmd.Flags().GetStringArray(name)
if err != nil {
log.Fatal(err)
}
return s
}
// Get the argv after the -- separator.
func getArgv(cmd *cobra.Command, args []string) []string {
lenAtDash := cmd.Flags().ArgsLenAtDash()
if lenAtDash == -1 {
return nil
} else {
return args[lenAtDash:]
}
}

View File

@ -16,45 +16,50 @@ The default format is "headlines".
%s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) {
feed()
feed(
stringArg(cmd, "format"),
stringArg(cmd, "source"),
stringArg(cmd, "channel"),
boolArg(cmd, "all"),
)
},
}
var feedFormat string
var feedSource string
var feedChannel string
var feedShowInactive bool
func init() {
rootCmd.AddCommand(feedCmd)
feedCmd.Flags().StringVarP(&feedFormat, "format", "f", "headlines", "Feed format")
feedCmd.Flags().StringVarP(&feedSource, "source", "s", "", "Limit to items from source")
feedCmd.Flags().StringVarP(&feedChannel, "channel", "c", "", "Limit to items from channel")
feedCmd.Flags().StringP("format", "f", "headlines", "Feed format")
feedCmd.Flags().StringP("source", "s", "", "Limit to items from source")
feedCmd.Flags().StringP("channel", "c", "", "Limit to items from channel")
feedCmd.MarkFlagsMutuallyExclusive("source", "channel")
feedCmd.Flags().BoolVar(&feedShowInactive, "all", false, "Show inactive items")
feedCmd.Flags().Bool("all", false, "Show inactive items")
}
func feed() {
formatter := formatAs(feedFormat)
func feed(
format string,
source string,
channel string,
showInactive bool,
) {
formatter := formatAs(format)
db := openAndMigrateDb()
var items []core.Item
var err error
if feedSource != "" {
if feedShowInactive {
items, err = core.GetAllItemsForSource(db, feedSource)
if source != "" {
if showInactive {
items, err = core.GetAllItemsForSource(db, source)
} else {
items, err = core.GetActiveItemsForSource(db, feedSource)
items, err = core.GetActiveItemsForSource(db, source)
}
if err != nil {
log.Fatalf("error: failed to fetch items from %s:, %v", feedSource, err)
log.Fatalf("error: failed to fetch items from %s:, %v", source, err)
}
} else if feedChannel != "" {
} else if channel != "" {
log.Fatal("error: unimplemented")
} else {
if feedShowInactive {
if showInactive {
items, err = core.GetAllItems(db)
} else {
items, err = core.GetAllActiveItems(db)

View File

@ -19,49 +19,58 @@ var itemAddCmd = &cobra.Command{
By default, the item is created in the "default" source, which is created
if it doesn't exist, with a random id.`,
Run: func(cmd *cobra.Command, args []string) {
itemAdd()
itemAdd(
stringArg(cmd, "source"),
stringArg(cmd, "id"),
stringArg(cmd, "title"),
stringArg(cmd, "author"),
stringArg(cmd, "body"),
stringArg(cmd, "link"),
intArg(cmd, "time"),
stringArg(cmd, "action"),
)
},
}
var addItemSource string
var addItemId string
var addItemTitle string
var addItemAuthor string
var addItemBody string
var addItemLink string
var addItemTime int
var addItemActions string
func init() {
itemCmd.AddCommand(itemAddCmd)
itemAddCmd.Flags().StringVarP(&addItemSource, "source", "s", "", "Source in which to create the item (default: default)")
itemAddCmd.Flags().StringVarP(&addItemId, "id", "i", "", "Item id (default: random hex)")
itemAddCmd.Flags().StringVarP(&addItemTitle, "title", "t", "", "Item title")
itemAddCmd.Flags().StringVarP(&addItemAuthor, "author", "a", "", "Item author")
itemAddCmd.Flags().StringVarP(&addItemBody, "body", "b", "", "Item body")
itemAddCmd.Flags().StringVarP(&addItemLink, "link", "l", "", "Item link")
itemAddCmd.Flags().IntVarP(&addItemTime, "time", "m", 0, "Item time as a Unix timestamp")
itemAddCmd.Flags().StringVarP(&addItemActions, "action", "x", "", "Item time as a Unix timestamp")
itemAddCmd.Flags().StringP("source", "s", "", "Source in which to create the item (default: default)")
itemAddCmd.Flags().StringP("id", "i", "", "Item id (default: random hex)")
itemAddCmd.Flags().StringP("title", "t", "", "Item title")
itemAddCmd.Flags().StringP("author", "a", "", "Item author")
itemAddCmd.Flags().StringP("body", "b", "", "Item body")
itemAddCmd.Flags().StringP("link", "l", "", "Item link")
itemAddCmd.Flags().IntP("time", "m", 0, "Item time as a Unix timestamp")
itemAddCmd.Flags().StringP("action", "x", "", "Item action data as JSON")
}
func itemAdd() {
func itemAdd(
source string,
id string,
title string,
author string,
body string,
link string,
time int,
actions string,
) {
// Default to "default" source
if addItemSource == "" {
addItemSource = "default"
if source == "" {
source = "default"
}
// Default id to random hex string
if addItemId == "" {
if id == "" {
bytes := make([]byte, 16)
if _, err := rand.Read(bytes); err != nil {
log.Fatalf("error: failed to generate id: %v", err)
}
addItemId = hex.EncodeToString(bytes)
id = hex.EncodeToString(bytes)
}
var actions core.Actions
if addItemActions != "" {
if err := json.Unmarshal([]byte(addItemActions), &actions); err != nil {
var itemActions core.Actions
if actions != "" {
if err := json.Unmarshal([]byte(actions), &itemActions); err != nil {
log.Fatalf("error: could not parse actions: %v", err)
}
}
@ -69,17 +78,17 @@ func itemAdd() {
db := openAndMigrateDb()
if err := core.AddItems(db, []core.Item{{
Source: addItemSource,
Id: addItemId,
Title: addItemTitle,
Author: addItemAuthor,
Body: addItemBody,
Link: addItemLink,
Time: addItemTime,
Action: actions,
Source: source,
Id: id,
Title: title,
Author: author,
Body: body,
Link: link,
Time: time,
Action: itemActions,
}}); err != nil {
log.Fatalf("error: failed to add item: %s", err)
}
log.Printf("Added %s/%s\n", addItemSource, addItemId)
log.Printf("Added %s/%s\n", source, id)
}

View File

@ -17,30 +17,27 @@ var itemDeactivateCmd = &cobra.Command{
Deactivation is idempotent.`,
Run: func(cmd *cobra.Command, args []string) {
itemDeactivate()
itemDeactivate(stringArg(cmd, "source"), stringArg(cmd, "item"))
},
}
var deacSource string
var deacItem string
func init() {
itemCmd.AddCommand(itemDeactivateCmd)
itemDeactivateCmd.Flags().StringVarP(&deacSource, "source", "s", "", "Source of the item")
itemDeactivateCmd.Flags().StringP("source", "s", "", "Source of the item")
itemDeactivateCmd.MarkFlagRequired("source")
itemDeactivateCmd.Flags().StringVarP(&deacItem, "item", "i", "", "Item id")
itemDeactivateCmd.Flags().StringP("item", "i", "", "Item id")
itemDeactivateCmd.MarkFlagRequired("item")
}
func itemDeactivate() {
func itemDeactivate(source string, item string) {
db := openAndMigrateDb()
active, err := core.DeactivateItem(db, deacSource, deacItem)
active, err := core.DeactivateItem(db, source, item)
if err != nil {
log.Fatalf("Failed to deactivate item: %s", err)
}
if active {
fmt.Printf("Deactivated %s/%s\n", deacSource, deacItem)
fmt.Printf("Deactivated %s/%s\n", source, item)
}
}

View File

@ -16,23 +16,21 @@ var migrateCmd = &cobra.Command{
Note that the database will be created if it does not exist, even with --list.`,
Run: func(cmd *cobra.Command, args []string) {
migrate()
migrate(boolArg(cmd, "list"))
},
}
var migrateListOnly bool
func init() {
rootCmd.AddCommand(migrateCmd)
migrateCmd.Flags().BoolVarP(&migrateListOnly, "list", "l", false, "Show the list of migrations")
migrateCmd.Flags().BoolP("list", "l", false, "Show the list of migrations")
}
func migrate() {
func migrate(listOnly bool) {
db := openDb()
core.InitDatabase(db)
if migrateListOnly {
if listOnly {
pending, err := core.GetPendingMigrations(db)
if err != nil {
log.Fatal(err)

View File

@ -51,7 +51,7 @@ func getDbPath() string {
}
// Attempt to open the specified database and exit with an error if it fails.
func openDb() *core.DB {
func openDb() core.DB {
dbPath := getDbPath()
db, err := core.OpenDb(dbPath)
if err != nil {
@ -61,7 +61,7 @@ func openDb() *core.DB {
}
// Attempt to open and migrate the specified database and exit with an error if it fails.
func openAndMigrateDb() *core.DB {
func openAndMigrateDb() core.DB {
db := openDb()
if err := core.InitDatabase(db); err != nil {
log.Fatalf("error: failed to init database: %v", err)
@ -72,15 +72,6 @@ func openAndMigrateDb() *core.DB {
return db
}
func getArgv(cmd *cobra.Command, args []string) []string {
lenAtDash := cmd.Flags().ArgsLenAtDash()
if lenAtDash == -1 {
return nil
} else {
return args[lenAtDash:]
}
}
// Sort "fetch" action ahead of other actions
func actionSort(a string, b string) int {
if a == "fetch" {

View File

@ -11,21 +11,18 @@ var serveCmd = &cobra.Command{
Long: `Serve the intake web interface.
`,
Run: func(cmd *cobra.Command, args []string) {
serve()
serve(stringArg(cmd, "addr"), stringArg(cmd, "port"))
},
}
var serveAddr string
var servePort string
func init() {
rootCmd.AddCommand(serveCmd)
serveCmd.Flags().StringVarP(&serveAddr, "addr", "a", "localhost", "Address to bind to")
serveCmd.Flags().StringVarP(&servePort, "port", "p", "8081", "Port to bind to")
serveCmd.Flags().StringP("addr", "a", "localhost", "Address to bind to")
serveCmd.Flags().StringP("port", "p", "8081", "Port to bind to")
}
func serve() {
func serve(addr string, port string) {
db := openAndMigrateDb()
web.RunServer(db, serveAddr, servePort)
web.RunServer(db, addr, port)
}

View File

@ -13,29 +13,27 @@ var sourceAddCmd = &cobra.Command{
Long: `Create a source.
`,
Run: func(cmd *cobra.Command, args []string) {
sourceAdd()
sourceAdd(stringArg(cmd, "source"))
},
}
var sourceAddSource string
func init() {
sourceCmd.AddCommand(sourceAddCmd)
sourceAddCmd.Flags().StringVarP(&sourceAddSource, "source", "s", "", "Source name")
sourceAddCmd.Flags().StringP("source", "s", "", "Source name")
sourceAddCmd.MarkFlagRequired("source")
}
func sourceAdd() {
if sourceAddSource == "" {
func sourceAdd(source string) {
if source == "" {
log.Fatal("error: --source is empty")
}
db := openAndMigrateDb()
if err := core.AddSource(db, sourceAddSource); err != nil {
if err := core.AddSource(db, source); err != nil {
log.Fatalf("error: failed to add source: %v", err)
}
log.Printf("Added source %s", sourceAddSource)
log.Printf("Added source %s", source)
}

View File

@ -14,28 +14,26 @@ var sourceDeleteCmd = &cobra.Command{
Long: `Delete a source.
`,
Run: func(cmd *cobra.Command, args []string) {
sourceDelete()
sourceDelete(stringArg(cmd, "source"))
},
}
var sourceDeleteSource string
func init() {
sourceCmd.AddCommand(sourceDeleteCmd)
sourceDeleteCmd.Flags().StringVarP(&sourceDeleteSource, "source", "s", "", "Source to delete")
sourceDeleteCmd.Flags().StringP("source", "s", "", "Source to delete")
}
func sourceDelete() {
if sourceDeleteSource == "" {
func sourceDelete(source string) {
if source == "" {
log.Fatal("error: --source is empty")
}
db := openAndMigrateDb()
if err := core.DeleteSource(db, sourceDeleteSource); err != nil {
if err := core.DeleteSource(db, source); err != nil {
log.Fatalf("error: failed to delete source: %v", err)
}
log.Printf("Deleted source %s", sourceDeleteSource)
log.Printf("Deleted source %s", source)
}

View File

@ -1,21 +0,0 @@
package cmd
import (
"log"
"github.com/spf13/cobra"
)
var sourceEditCmd = &cobra.Command{
Use: "edit",
Short: "Edit a source",
Long: `
`,
Run: func(cmd *cobra.Command, args []string) {
log.Fatal("not implemented")
},
}
func init() {
sourceCmd.AddCommand(sourceEditCmd)
}

53
cmd/sourceEnv.go Normal file
View File

@ -0,0 +1,53 @@
package cmd
import (
"fmt"
"log"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra"
)
var sourceEnvCmd = &cobra.Command{
Use: "env",
Short: "Manage source environment variables",
Long: `Add, edit, list, or delete environment variables.
When --set is not specified, list the environment for the source.
--set KEY=VALUE will add or edit an environment variable to be set in all
action executions.
--set KEY= will delete the environment variable from the source.
`,
Run: func(cmd *cobra.Command, args []string) {
sourceEnv(stringArg(cmd, "source"), stringArrayArg(cmd, "set"))
},
}
func init() {
sourceCmd.AddCommand(sourceEnvCmd)
sourceEnvCmd.Flags().StringP("source", "s", "", "Source to edit")
sourceEnvCmd.MarkFlagRequired("source")
sourceEnvCmd.Flags().StringArray("set", nil, "Set or modify environment variable")
}
func sourceEnv(source string, env []string) {
db := openAndMigrateDb()
if len(env) == 0 {
envs, err := core.GetEnvs(db, source)
if err != nil {
log.Fatalf("failed to get envs: %v", err)
}
for _, env := range envs {
fmt.Println(env)
}
}
if err := core.SetEnvs(db, source, env); err != nil {
log.Fatalf("failed to set envs: %v", err)
}
}

View File

@ -3,6 +3,7 @@ package cmd
import (
"fmt"
"log"
"slices"
"time"
"github.com/Jaculabilis/intake/core"
@ -23,50 +24,59 @@ the source will not be updated with the fetch result.
%s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) {
sourceFetch()
sourceFetch(stringArg(cmd, "source"), stringArg(cmd, "format"), boolArg(cmd, "dry-run"))
},
}
var sourceFetchSource string
var sourceFetchFormat string
var sourceFetchDryRun bool
func init() {
sourceCmd.AddCommand(sourceFetchCmd)
sourceFetchCmd.Flags().StringVarP(&sourceFetchSource, "source", "s", "", "Source name to fetch (required)")
sourceFetchCmd.Flags().StringP("source", "s", "", "Source name to fetch (required)")
sourceFetchCmd.MarkFlagRequired("source")
sourceFetchCmd.Flags().StringVarP(&sourceFetchFormat, "format", "f", "headlines", "Feed format for returned items.")
sourceFetchCmd.Flags().BoolVar(&sourceFetchDryRun, "dry-run", false, "Instead of updating the source, print the fetched items")
sourceFetchCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
sourceFetchCmd.Flags().Bool("dry-run", false, "Instead of updating the source, print the fetched items")
}
func sourceFetch() {
formatter := formatAs(sourceFetchFormat)
func sourceFetch(source string, format string, dryRun bool) {
formatter := formatAs(format)
db := openAndMigrateDb()
argv, err := core.GetArgvForAction(db, sourceFetchSource, "fetch")
state, err := core.GetState(db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
envs, err := core.GetEnvs(db, source)
if err != nil {
log.Fatalf("error: failed to get envs for %s: %v", source, err)
}
argv, err := core.GetArgvForAction(db, source, "fetch")
if err != nil {
log.Fatalf("error: failed to get fetch action: %v", err)
}
items, err := core.Execute(sourceFetchSource, argv, nil, "", time.Minute)
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute)
if err != nil {
log.Fatalf("error: failed to execute fetch: %v", err)
}
if sourceFetchDryRun {
if dryRun {
log.Printf("Fetch returned %d items", len(items))
if !slices.Equal(state, newState) {
log.Printf("State update (%d => %d bytes)", len(state), len(newState))
}
for _, item := range items {
fmt.Println(formatter(item))
}
return
}
added, deleted, err := core.UpdateWithFetchedItems(db, sourceFetchSource, items)
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items)
if err != nil {
log.Fatalf("error: failed to update: %v", err)
}
log.Printf("%s added %d items, updated %d items, and deleted %d items", sourceFetchSource, added, len(items)-added, deleted)
log.Printf("%s added %d items, updated %d items, and deleted %d items", source, added, len(items)-added, deleted)
}

View File

@ -16,19 +16,17 @@ var sourceListCmd = &cobra.Command{
Long: `Print the list of sources.
`,
Run: func(cmd *cobra.Command, args []string) {
sourceList()
sourceList(boolArg(cmd, "actions"))
},
}
var sourceListShowActions bool
func init() {
sourceCmd.AddCommand(sourceListCmd)
sourceListCmd.Flags().BoolVarP(&sourceListShowActions, "actions", "a", false, "Include source actions")
sourceListCmd.Flags().BoolP("actions", "a", false, "Include source actions")
}
func sourceList() {
func sourceList(showActions bool) {
db := openAndMigrateDb()
names, err := core.GetSources(db)
@ -37,7 +35,7 @@ func sourceList() {
}
slices.Sort(names)
if sourceListShowActions {
if showActions {
sourceActions := make(map[string][]string)
for _, name := range names {
actions, err := core.GetActionsForSource(db, name)

View File

@ -16,30 +16,23 @@ var sourceTestCmd = &cobra.Command{
%s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) {
l := cmd.Flags().ArgsLenAtDash()
if l == -1 {
sourceTest(nil)
} else {
sourceTest(args[l:])
}
sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args))
},
}
var sourceTestEnv []string
var sourceTestFormat string
func init() {
sourceCmd.AddCommand(sourceTestCmd)
sourceTestCmd.Flags().StringArrayVarP(&sourceTestEnv, "env", "e", nil, "Environment variables to set, in the form KEY=VAL")
sourceTestCmd.Flags().StringVarP(&sourceTestFormat, "format", "f", "headlines", "Feed format for returned items.")
sourceTestCmd.Flags().StringArrayP("env", "e", nil, "Environment variables to set, in the form KEY=VAL")
sourceTestCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
}
func sourceTest(cmd []string) {
formatter := formatAs(sourceTestFormat)
func sourceTest(env []string, format string, cmd []string) {
formatter := formatAs(format)
items, err := core.Execute("", cmd, sourceTestEnv, "", time.Minute)
log.Printf("Returned %d items", len(items))
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute)
log.Printf("returned %d items", len(items))
log.Printf("wrote %d bytes of state", len(state))
if err != nil {
log.Fatal(err)
}

View File

@ -6,6 +6,7 @@ import (
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
@ -25,7 +26,7 @@ func (a *argList) Scan(value interface{}) error {
return json.Unmarshal([]byte(value.(string)), a)
}
func AddAction(db *DB, source string, name string, argv []string) error {
func AddAction(db DB, source string, name string, argv []string) error {
_, err := db.Exec(`
insert into actions (source, name, argv)
values (?, ?, jsonb(?))
@ -33,7 +34,7 @@ func AddAction(db *DB, source string, name string, argv []string) error {
return err
}
func UpdateAction(db *DB, source string, name string, argv []string) error {
func UpdateAction(db DB, source string, name string, argv []string) error {
_, err := db.Exec(`
update actions
set argv = jsonb(?)
@ -42,7 +43,7 @@ func UpdateAction(db *DB, source string, name string, argv []string) error {
return err
}
func GetActionsForSource(db *DB, source string) ([]string, error) {
func GetActionsForSource(db DB, source string) ([]string, error) {
rows, err := db.Query(`
select name
from actions
@ -63,7 +64,7 @@ func GetActionsForSource(db *DB, source string) ([]string, error) {
return names, nil
}
func GetArgvForAction(db *DB, source string, name string) ([]string, error) {
func GetArgvForAction(db DB, source string, name string) ([]string, error) {
rows := db.QueryRow(`
select json(argv)
from actions
@ -77,7 +78,7 @@ func GetArgvForAction(db *DB, source string, name string) ([]string, error) {
return argv, nil
}
func DeleteAction(db *DB, source string, name string) error {
func DeleteAction(db DB, source string, name string) error {
_, err := db.Exec(`
delete from actions
where source = ? and name = ?
@ -126,19 +127,35 @@ func Execute(
source string,
argv []string,
env []string,
state []byte,
input string,
timeout time.Duration,
) ([]Item, error) {
log.Printf("Executing %v", argv)
) ([]Item, []byte, error) {
log.Printf("executing %v", argv)
if len(argv) == 0 {
return nil, errors.New("empty argv")
return nil, nil, errors.New("empty argv")
}
if source == "" {
return nil, errors.New("empty source")
return nil, nil, errors.New("empty source")
}
env = append(env, "STATE_PATH=")
stateFile, err := os.CreateTemp("", "intake_state_*")
if err != nil {
return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err)
}
defer func() {
if err := os.Remove(stateFile.Name()); err != nil {
log.Printf("error: failed to delete %s", stateFile.Name())
}
}()
_, err = stateFile.Write(state)
if err != nil {
return nil, nil, fmt.Errorf("error: failed to write state file: %v", err)
}
env = append(env, "STATE_PATH="+stateFile.Name())
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
@ -149,15 +166,15 @@ func Execute(
// Open pipes to the command
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, err
return nil, nil, err
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, err
return nil, nil, err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, err
return nil, nil, err
}
cout := make(chan Item)
@ -180,7 +197,7 @@ func Execute(
// Kick off the command
err = cmd.Start()
if err != nil {
return nil, err
return nil, nil, err
}
// Block until std{out,err} close
@ -190,19 +207,51 @@ func Execute(
err = cmd.Wait()
if ctx.Err() == context.DeadlineExceeded {
log.Printf("Timed out after %v\n", timeout)
return nil, err
return nil, nil, err
} else if exiterr, ok := err.(*exec.ExitError); ok {
log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode())
return nil, err
return nil, nil, err
} else if err != nil {
log.Printf("error: %s failed with error: %s\n", argv[0], err)
return nil, err
return nil, nil, err
}
if parseError {
log.Printf("error: could not parse item\n")
return nil, errors.New("invalid JSON")
return nil, nil, errors.New("invalid JSON")
}
return items, nil
newState, err := os.ReadFile(stateFile.Name())
if err != nil {
return nil, nil, fmt.Errorf("error: failed to read state file: %v", err)
}
return items, newState, nil
}
// Execute an action that takes an item as input and returns the item modified.
// This is basically just a wrapper over [Execute] that handles the input and backfilling.
func ExecuteItemAction(
item Item,
argv []string,
env []string,
state []byte,
timeout time.Duration,
) (Item, []byte, error) {
itemJson, err := json.Marshal(item)
if err != nil {
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
}
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout)
if err != nil {
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
}
if len(res) != 1 {
return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res))
}
newItem := res[0]
BackfillItem(&newItem, &item)
return newItem, newState, nil
}

View File

@ -72,7 +72,8 @@ func TestExecute(t *testing.T) {
}
}
execute := func(argv []string) ([]Item, error) {
return Execute("_", argv, nil, "", time.Minute)
item, _, err := Execute("_", argv, nil, nil, "", time.Minute)
return item, err
}
res, err := execute([]string{"true"})
@ -89,7 +90,7 @@ func TestExecute(t *testing.T) {
assertLen(res, 0)
// Timeout
res, err = Execute("_", []string{"sleep", "10"}, nil, "", time.Millisecond)
res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond)
assertNotNil(err)
assertLen(res, 0)
@ -102,7 +103,7 @@ func TestExecute(t *testing.T) {
}
// Read from stdin
res, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, "bar", time.Minute)
res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute)
assertNil(err)
assertLen(res, 1)
if res[0].Id != "bar" {
@ -110,7 +111,7 @@ func TestExecute(t *testing.T) {
}
// Set env
res, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, "", time.Minute)
res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute)
assertNil(err)
assertLen(res, 1)
if res[0].Id != "baz" {
@ -179,4 +180,22 @@ func TestExecute(t *testing.T) {
if res[0].Action["hello"] == nil {
t.Fatal("missing hello action")
}
// Read state
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute)
assertNil(err)
assertLen(res, 1)
if res[0].Title != "Hello world" {
t.Fatalf("expected 'Hello world' from read state, got '%s'", res[0].Title)
}
// Write state
argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute)
assertNil(err)
assertLen(res, 1)
if string(newState) != "Hello world" {
t.Fatalf("expected 'Hello world' from write state, got %s", string(newState))
}
}

View File

@ -7,24 +7,36 @@ import (
_ "github.com/mattn/go-sqlite3"
)
type DB struct {
type DB interface {
Query(query string, args ...any) (*sql.Rows, error)
QueryRow(query string, args ...any) *sql.Row
Exec(query string, args ...any) (sql.Result, error)
Prepare(query string) (*sql.Stmt, error)
Transact(func(DB) error) error
}
type RoRwDb struct {
ro *sql.DB
rw *sql.DB
}
func (db *DB) Query(query string, args ...any) (*sql.Rows, error) {
func (db *RoRwDb) Query(query string, args ...any) (*sql.Rows, error) {
return db.ro.Query(query, args...)
}
func (db *DB) QueryRow(query string, args ...any) *sql.Row {
func (db *RoRwDb) QueryRow(query string, args ...any) *sql.Row {
return db.ro.QueryRow(query, args...)
}
func (db *DB) Exec(query string, args ...any) (sql.Result, error) {
func (db *RoRwDb) Exec(query string, args ...any) (sql.Result, error) {
return db.rw.Exec(query, args...)
}
func (db *DB) Transact(transaction func(*sql.Tx) error) error {
func (db *RoRwDb) Prepare(query string) (*sql.Stmt, error) {
return db.rw.Prepare(query)
}
func (db *RoRwDb) Transact(transaction func(DB) error) error {
tx, err := db.rw.Begin()
if err != nil {
return err
@ -34,7 +46,7 @@ func (db *DB) Transact(transaction func(*sql.Tx) error) error {
if err != nil {
return err
}
if err = transaction(tx); err != nil {
if err = transaction(&TxDb{tx}); err != nil {
return err
}
if err = tx.Commit(); err != nil {
@ -43,6 +55,30 @@ func (db *DB) Transact(transaction func(*sql.Tx) error) error {
return nil
}
type TxDb struct {
*sql.Tx
}
func (tx *TxDb) Query(query string, args ...any) (*sql.Rows, error) {
return tx.Tx.Query(query, args...)
}
func (tx *TxDb) QueryRow(query string, args ...any) *sql.Row {
return tx.Tx.QueryRow(query, args...)
}
func (tx *TxDb) Exec(query string, args ...any) (sql.Result, error) {
return tx.Tx.Exec(query, args...)
}
func (tx *TxDb) Prepare(query string) (*sql.Stmt, error) {
return tx.Tx.Prepare(query)
}
func (tx *TxDb) Transact(transaction func(DB) error) error {
return transaction(tx)
}
func defaultPragma(db *sql.DB) (sql.Result, error) {
return db.Exec(`
pragma journal_mode = WAL;
@ -55,7 +91,7 @@ func defaultPragma(db *sql.DB) (sql.Result, error) {
`)
}
func OpenDb(dataSourceName string) (*DB, error) {
func OpenDb(dataSourceName string) (DB, error) {
ro, err := sql.Open("sqlite3", dataSourceName)
if err != nil {
defer ro.Close()
@ -82,7 +118,7 @@ func OpenDb(dataSourceName string) (*DB, error) {
return nil, err
}
wrapper := new(DB)
wrapper := new(RoRwDb)
wrapper.ro = ro
wrapper.rw = rw
return wrapper, nil

View File

@ -2,11 +2,87 @@ package core
import (
"database/sql"
"errors"
"strings"
"testing"
_ "github.com/mattn/go-sqlite3"
)
type FailureDb struct {
db DB
queryError func(string, ...any) error
execError func(string, ...any) error
}
func (f *FailureDb) Query(query string, args ...any) (*sql.Rows, error) {
if f.queryError != nil {
if err := f.queryError(query, args...); err != nil {
return nil, err
}
}
return f.db.Query(query, args...)
}
func (f *FailureDb) QueryRow(query string, args ...any) *sql.Row {
return f.db.QueryRow(query, args...)
}
func (f *FailureDb) Exec(query string, args ...any) (sql.Result, error) {
if f.execError != nil {
if err := f.execError(query, args...); err != nil {
return nil, err
}
}
return f.db.Exec(query, args...)
}
func (f *FailureDb) Prepare(query string) (*sql.Stmt, error) {
return f.db.Prepare(query)
}
func (f *FailureDb) Transact(txFunc func(DB) error) error {
return f.db.Transact(func(tx DB) error {
ftx := FailureDb{
db: tx,
queryError: f.queryError,
execError: f.execError,
}
return txFunc(&ftx)
})
}
func TestFailureDb(t *testing.T) {
db := EphemeralDb(t)
fdb := FailureDb{
db: db,
queryError: func(q string, a ...any) error {
if strings.Contains(q, "2") {
return errors.New("oopsie")
}
return nil
},
}
if _, err := fdb.Query("select 1"); err != nil {
t.Fatal(err)
}
if _, err := fdb.Query("select 2"); err == nil {
t.Fatal("expected error")
}
if err := fdb.Transact(func(tx DB) error {
if _, err := tx.Query("select 1"); err != nil {
t.Fatal(err)
}
_, err := tx.Query("select 2")
return err
}); err == nil {
t.Fatal("expected error from inside transaction")
}
}
func TestDeleteSourceCascade(t *testing.T) {
db := EphemeralDb(t)
@ -55,7 +131,7 @@ func TestTransaction(t *testing.T) {
}
// A transaction that should succeed
err := db.Transact(func(tx *sql.Tx) error {
err := db.Transact(func(tx DB) error {
if _, err := tx.Exec("insert into planets (name) values (?)", "mercury"); err != nil {
t.Fatal(err)
}
@ -86,7 +162,7 @@ func TestTransaction(t *testing.T) {
}
// A transaction that should fail
err = db.Transact(func(tx *sql.Tx) error {
err = db.Transact(func(tx DB) error {
if _, err := tx.Exec("insert into planets (name) values (?)", "earth"); err != nil {
t.Fatal(err)
}

56
core/env.go Normal file
View File

@ -0,0 +1,56 @@
package core
import (
"fmt"
"strings"
)
func GetEnvs(db DB, source string) ([]string, error) {
rows, err := db.Query(`
select name, value
from envs
where source = ?
`, source)
if err != nil {
return nil, err
}
var envs []string
for rows.Next() {
var name string
var value string
if err := rows.Scan(&name, &value); err != nil {
return nil, err
}
envs = append(envs, fmt.Sprintf("%s=%s", name, value))
}
return envs, nil
}
func SetEnvs(db DB, source string, envs []string) error {
return db.Transact(func(tx DB) error {
for _, env := range envs {
parts := strings.SplitN(env, "=", 2)
if len(parts) != 2 {
return fmt.Errorf("invalid env format: %s", env)
}
if parts[1] == "" {
_, err := tx.Exec(`
delete from envs
where source = ? and name = ?
`, source, parts[0])
if err != nil {
return fmt.Errorf("failed to clear source %s env %s: %v", source, parts[0], err)
}
} else {
_, err := tx.Exec(`
insert into envs (source, name, value)
values (?, ?, ?)
`, source, parts[0], parts[1])
if err != nil {
return fmt.Errorf("failed to set source %s env %s = %s: %v", source, parts[0], parts[1], err)
}
}
}
return nil
})
}

91
core/env_test.go Normal file
View File

@ -0,0 +1,91 @@
package core
import (
"slices"
"testing"
)
func TestEnvs(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "_"); err != nil {
t.Fatal(err)
}
// Insert env
if err := SetEnvs(db, "_", []string{"ONE=hello"}); err != nil {
t.Fatal(err)
}
envs, err := GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 1 {
t.Fatal("expected 1 env")
}
if envs[0] != "ONE=hello" {
t.Fatalf("Expected ONE=hello, got %s", envs[0])
}
// Insert env with = in value
if err := SetEnvs(db, "_", []string{"TWO=world=true"}); err != nil {
t.Fatal(err)
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 2 {
t.Fatal("expected 2 envs")
}
slices.Sort(envs) // ONE > TWO
if envs[1] != "TWO=world=true" {
t.Fatalf("Expected TWO=world=true, got %s", envs[1])
}
// Replace env
if err := SetEnvs(db, "_", []string{"TWO=goodbye"}); err != nil {
t.Fatal(err)
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 2 {
t.Fatal("expected 2 envs")
}
slices.Sort(envs) // ONE > TWO
if envs[1] != "TWO=goodbye" {
t.Fatalf("Expected TWO=goodbye, got %s", envs[1])
}
// Insert is transactional on error
if err := SetEnvs(db, "_", []string{"THREE=crowd", "FOUR"}); err == nil {
t.Fatal("expected bad env insert to fail")
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 2 {
t.Fatal("expected 2 envs after failed insert")
}
slices.Sort(envs) // ONE > TWO
if envs[0] != "ONE=hello" || envs[1] != "TWO=goodbye" {
t.Fatalf("Expected ONE=hello and TWO=goodbye, got %v", envs)
}
// Delete env
if err := SetEnvs(db, "_", []string{"ONE="}); err != nil {
t.Fatal(err)
}
envs, err = GetEnvs(db, "_")
if err != nil {
t.Fatal(err)
}
if len(envs) != 1 {
t.Fatal("expected 1 env after deletion")
}
if envs[0] != "TWO=goodbye" {
t.Fatalf("Expected TWO=goodbye, got %s", envs[0])
}
}

View File

@ -11,7 +11,7 @@ import (
var migrations embed.FS
// Idempotently initialize the database. Safe to call unconditionally.
func InitDatabase(db *DB) error {
func InitDatabase(db DB) error {
rows, err := db.Query(`
select exists (
select 1
@ -41,7 +41,7 @@ func InitDatabase(db *DB) error {
}
// Get a map of migration names to whether the migration has been applied.
func GetPendingMigrations(db *DB) (map[string]bool, error) {
func GetPendingMigrations(db DB) (map[string]bool, error) {
allMigrations, err := migrations.ReadDir("sql")
if err != nil {
return nil, err
@ -69,7 +69,7 @@ func GetPendingMigrations(db *DB) (map[string]bool, error) {
}
// Apply a migration by name.
func ApplyMigration(db *DB, name string) error {
func ApplyMigration(db DB, name string) error {
data, err := migrations.ReadFile("sql/" + name)
if err != nil {
log.Fatalf("Missing migration %s", name)
@ -84,7 +84,7 @@ func ApplyMigration(db *DB, name string) error {
}
// Apply all pending migrations.
func MigrateDatabase(db *DB) error {
func MigrateDatabase(db DB) error {
pending, err := GetPendingMigrations(db)
if err != nil {
return err

View File

@ -7,7 +7,7 @@ import (
_ "github.com/mattn/go-sqlite3"
)
func EphemeralDb(t *testing.T) *DB {
func EphemeralDb(t *testing.T) DB {
// We don't use OpenDb here because you can't open two connections to the same memory mem
mem, err := sql.Open("sqlite3", ":memory:")
if err != nil {
@ -16,7 +16,7 @@ func EphemeralDb(t *testing.T) *DB {
if _, err = defaultPragma(mem); err != nil {
t.Fatal(err)
}
db := new(DB)
db := new(RoRwDb)
db.ro = mem
db.rw = mem
if err = InitDatabase(db); err != nil {
@ -33,7 +33,7 @@ func TestInitIdempotency(t *testing.T) {
if err != nil {
t.Fatal(err)
}
db := new(DB)
db := new(RoRwDb)
db.ro = mem
db.rw = mem
if err = InitDatabase(db); err != nil {

View File

@ -11,7 +11,7 @@ import (
_ "github.com/mattn/go-sqlite3"
)
func AddSource(db *DB, name string) error {
func AddSource(db DB, name string) error {
_, err := db.Exec(`
insert into sources (name)
values (?)
@ -20,7 +20,7 @@ func AddSource(db *DB, name string) error {
return err
}
func GetSources(db *DB) ([]string, error) {
func GetSources(db DB) ([]string, error) {
rows, err := db.Query(`
select name
from sources
@ -39,7 +39,7 @@ func GetSources(db *DB) ([]string, error) {
return names, nil
}
func DeleteSource(db *DB, name string) error {
func DeleteSource(db DB, name string) error {
_, err := db.Exec(`
delete from sources
where name = ?
@ -48,8 +48,8 @@ func DeleteSource(db *DB, name string) error {
return err
}
func AddItems(db *DB, items []Item) error {
return db.Transact(func(tx *sql.Tx) error {
func AddItems(db DB, items []Item) error {
return db.Transact(func(tx DB) error {
stmt, err := tx.Prepare(`
insert into items (source, id, active, title, author, body, link, time, action)
values (?, ?, ?, ?, ?, ?, ?, ?, jsonb(?))
@ -95,8 +95,8 @@ func BackfillItem(new *Item, old *Item) {
}
}
func UpdateItems(db *DB, items []Item) error {
return db.Transact(func(tx *sql.Tx) error {
func UpdateItems(db DB, items []Item) error {
return db.Transact(func(tx DB) error {
stmt, err := tx.Prepare(`
update items
set
@ -127,7 +127,7 @@ func UpdateItems(db *DB, items []Item) error {
}
// Deactivate an item, returning its previous active state.
func DeactivateItem(db *DB, source string, id string) (bool, error) {
func DeactivateItem(db DB, source string, id string) (bool, error) {
row := db.QueryRow(`
select active
from items
@ -150,7 +150,7 @@ func DeactivateItem(db *DB, source string, id string) (bool, error) {
return active, nil
}
func DeleteItem(db *DB, source string, id string) (int64, error) {
func DeleteItem(db DB, source string, id string) (int64, error) {
res, err := db.Exec(`
delete from items
where source = ?
@ -162,7 +162,7 @@ func DeleteItem(db *DB, source string, id string) (int64, error) {
return res.RowsAffected()
}
func getItems(db *DB, query string, args ...any) ([]Item, error) {
func getItems(db DB, query string, args ...any) ([]Item, error) {
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
@ -179,7 +179,7 @@ func getItems(db *DB, query string, args ...any) ([]Item, error) {
return items, nil
}
func GetItem(db *DB, source string, id string) (Item, error) {
func GetItem(db DB, source string, id string) (Item, error) {
items, err := getItems(db, `
select source, id, created, active, title, author, body, link, time, json(action)
from items
@ -196,7 +196,7 @@ func GetItem(db *DB, source string, id string) (Item, error) {
return items[0], nil
}
func GetAllActiveItems(db *DB) ([]Item, error) {
func GetAllActiveItems(db DB) ([]Item, error) {
return getItems(db, `
select
source, id, created, active, title, author, body, link, time, json(action)
@ -206,7 +206,7 @@ func GetAllActiveItems(db *DB) ([]Item, error) {
`)
}
func GetAllItems(db *DB) ([]Item, error) {
func GetAllItems(db DB) ([]Item, error) {
return getItems(db, `
select
source, id, created, active, title, author, body, link, time, json(action)
@ -215,7 +215,7 @@ func GetAllItems(db *DB) ([]Item, error) {
`)
}
func GetActiveItemsForSource(db *DB, source string) ([]Item, error) {
func GetActiveItemsForSource(db DB, source string) ([]Item, error) {
return getItems(db, `
select
source, id, created, active, title, author, body, link, time, json(action)
@ -227,7 +227,7 @@ func GetActiveItemsForSource(db *DB, source string) ([]Item, error) {
`, source)
}
func GetAllItemsForSource(db *DB, source string) ([]Item, error) {
func GetAllItemsForSource(db DB, source string) ([]Item, error) {
return getItems(db, `
select
source, id, created, active, title, author, body, link, time, json(action)
@ -238,10 +238,34 @@ func GetAllItemsForSource(db *DB, source string) ([]Item, error) {
`, source)
}
func GetState(db DB, source string) ([]byte, error) {
row := db.QueryRow("select state from sources where name = ?", source)
var state []byte
err := row.Scan(&state)
return state, err
}
func SetState(db DB, source string, state []byte) error {
_, err := db.Exec("update sources set state = ? where name = ?", state, source)
return err
}
// Given the results of a fetch, add new items, update existing items, and delete expired items.
//
// Returns the number of new and deleted items on success.
func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) {
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
var new int
var del int
var err error
err = db.Transact(func(tx DB) error {
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
return err
})
return new, del, err
}
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
// Get the existing items
existingItems, err := GetAllItemsForSource(db, source)
if err != nil {
@ -278,26 +302,22 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
return 0, 0, err
}
envs, err := GetEnvs(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
}
// If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
if err == nil {
if err == nil && len(onCreateArgv) > 0 {
var updatedNewItems []Item
for _, item := range newItems {
itemJson, err := json.Marshal(item)
var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute)
if err != nil {
log.Fatalf("error: failed to serialize item: %v", err)
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
}
res, err := Execute(source, onCreateArgv, nil, string(itemJson), time.Minute)
if err != nil {
log.Printf("error: failed to execute on_create for %s/%s: %v", item.Source, item.Id, err)
continue
}
if len(res) != 1 {
log.Printf("error: expected on_create for %s/%s to produce exactly one item, got %d", item.Source, item.Id, len(res))
}
updatedItem := res[0]
BackfillItem(&updatedItem, &item)
updatedNewItems = append(updatedNewItems, updatedItem)
}
UpdateItems(db, updatedNewItems)
@ -328,5 +348,9 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
}
}
if err = SetState(db, source, state); err != nil {
return 0, 0, err
}
return len(newItems), len(idsToDelete), nil
}

View File

@ -1,8 +1,10 @@
package core
import (
"errors"
"fmt"
"slices"
"strings"
"testing"
"time"
@ -128,18 +130,18 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
}
a := Item{Source: "test", Id: "a"}
add, del, err := UpdateWithFetchedItems(db, "test", []Item{a})
add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a})
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
b := Item{Source: "test", Id: "b"}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b})
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
@ -147,22 +149,67 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
if _, err = DeactivateItem(db, "test", "a"); err != nil {
t.Fatal(err)
}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b})
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{b})
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 1 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{b})
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
}
func TestUpdateSourceTransaction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
a := Item{Source: "s", Id: "a"}
b := Item{Source: "s", Id: "b"}
// Add and deactivate a so it will be deleted on next fetch without it
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
if _, err := DeactivateItem(db, "s", "a"); err != nil {
t.Fatal(err)
}
// Add b and cause a to be deleted, but the delete throws an error
fdb := &FailureDb{
db: db,
execError: func(q string, a ...any) error {
if strings.Contains(q, "delete from") {
return errors.New("no deletes!")
}
return nil
},
}
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b})
if add != 0 || del != 0 || err == nil {
t.Fatalf("expected failure, got %d %d %v", add, del, err)
}
// Failure should not add b
items, err := GetAllItemsForSource(db, "s")
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("should only have one item, got %d", len((items)))
}
if items[0].Id != "a" {
t.Fatalf("expected only item to still be a, got %s", items[0].Id)
}
}
func TestOnCreateAction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "test"); err != nil {
@ -173,7 +220,7 @@ func TestOnCreateAction(t *testing.T) {
}
execute := func(argv []string) []Item {
items, err := Execute("test", argv, nil, "", time.Minute)
items, _, err := Execute("test", argv, nil, nil, "", time.Minute)
if err != nil {
t.Fatal("unexpected error executing test fetch")
}
@ -200,7 +247,7 @@ func TestOnCreateAction(t *testing.T) {
// Noop on_create works
onCreate([]string{"tee"})
items := execute([]string{"jq", "-cn", `{id: "one"}`})
add, _, err := UpdateWithFetchedItems(db, "test", items)
add, _, err := UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with noop oncreate")
}
@ -216,7 +263,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "Hello, World" {
t.Fatal("unexpected title")
}
add, _, err = UpdateWithFetchedItems(db, "test", items)
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with alter oncreate")
}
@ -231,7 +278,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "" {
t.Fatal("unexpected link")
}
add, _, err = UpdateWithFetchedItems(db, "test", items)
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with augment oncreate")
}
@ -246,7 +293,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "gopher://go.dev" {
t.Fatal("missing link")
}
add, _, err = UpdateWithFetchedItems(db, "test", items)
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with attempted deletion oncreate")
}
@ -257,7 +304,7 @@ func TestOnCreateAction(t *testing.T) {
// item is created if on_create fails
onCreate([]string{"false"})
items = execute([]string{"jq", "-cn", `{id: "five"}`})
add, _, err = UpdateWithFetchedItems(db, "test", items)
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with failing oncreate")
}
@ -271,7 +318,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "before" {
t.Fatal("unexpected title")
}
add, _, err = UpdateWithFetchedItems(db, "test", items)
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with bad exit code oncreate")
}
@ -282,7 +329,7 @@ func TestOnCreateAction(t *testing.T) {
// on_create can't change id, active, or created
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
items = execute([]string{"jq", "-cn", `{id: "seven"}`})
add, _, err = UpdateWithFetchedItems(db, "test", items)
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with invalid field changes oncreate")
}
@ -291,3 +338,30 @@ func TestOnCreateAction(t *testing.T) {
t.Fatal("unexpected changes to id, active, or created fields")
}
}
func TestSourceState(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
state, err := GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if len(state) != 0 {
t.Fatal("expected no state on a fresh source")
}
if err = SetState(db, "s", []byte("hello, world")); err != nil {
t.Fatal(err)
}
state, err = GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if string(state) != "hello, world" {
t.Fatalf("expected hello, world, got %s", state)
}
}

View File

@ -1,5 +1,6 @@
create table sources(
name text not null,
state blob,
primary key (name)
) strict;
create table actions(
@ -9,6 +10,13 @@ create table actions(
primary key (source, name),
foreign key (source) references sources (name) on delete cascade
) strict;
create table envs(
source text not null,
name text not null,
value text not null,
unique (source, name) on conflict replace,
foreign key (source) references sources (name) on delete cascade
) strict;
create table items(
source text not null,
id text not null,

View File

@ -4,7 +4,7 @@
<article class="center">
<span class="item-title">
<a href="/">Home</a>
[<a href="#">Active</a> | <a href="#">All</a>]
[<a href="?inactive=0">Active</a> | <a href="?inactive=1">All</a>]
</span>
</article>

View File

@ -1,7 +1,7 @@
package web
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
@ -17,7 +17,7 @@ func (env *Env) getItem(writer http.ResponseWriter, req *http.Request) {
item, err := core.GetItem(env.db, source, id)
if err != nil {
writer.Write([]byte(err.Error()))
http.Error(writer, err.Error(), 500)
return
}
html.Item(writer, html.ItemData{Item: item})
@ -29,12 +29,12 @@ func (env *Env) deleteItem(writer http.ResponseWriter, req *http.Request) {
_, err := core.DeactivateItem(env.db, source, id)
if err != nil {
writer.Write([]byte(err.Error()))
http.Error(writer, err.Error(), 500)
return
}
item, err := core.GetItem(env.db, source, id)
if err != nil {
writer.Write([]byte(err.Error()))
http.Error(writer, err.Error(), 500)
return
}
html.Item(writer, html.ItemData{Item: item})
@ -45,6 +45,16 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
id := req.PathValue("id")
action := req.PathValue("action")
state, err := core.GetState(env.db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
envs, err := core.GetEnvs(env.db, source)
if err != nil {
log.Fatalf("error: failed to get envs for %s: %v", source, err)
}
item, err := core.GetItem(env.db, source, id)
if err != nil {
http.Error(writer, err.Error(), 500)
@ -62,27 +72,22 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return
}
itemJson, err := json.Marshal(item)
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute)
if err != nil {
http.Error(writer, err.Error(), 500)
return
}
res, err := core.Execute(source, argv, nil, string(itemJson), time.Minute)
if err != nil {
if err = env.db.Transact(func(tx core.DB) error {
if _err := core.UpdateItems(tx, []core.Item{newItem}); err != nil {
return fmt.Errorf("failed to update item: %v", _err)
}
if _err := core.SetState(tx, source, newState); err != nil {
return fmt.Errorf("failed to set state for %s: %v", source, _err)
}
return nil
}); err != nil {
http.Error(writer, err.Error(), 500)
return
}
if len(res) != 1 {
http.Error(writer, "not exactly one item", 500)
return
}
newItem := res[0]
core.BackfillItem(&newItem, &item)
if err = core.UpdateItems(env.db, []core.Item{newItem}); err != nil {
http.Error(writer, err.Error(), 500)
return
}
html.Item(writer, html.ItemData{Item: newItem})

View File

@ -9,7 +9,7 @@ import (
)
type Env struct {
db *core.DB
db core.DB
}
func logged(handler http.HandlerFunc) http.HandlerFunc {
@ -23,15 +23,15 @@ func handleFunc(pattern string, handler http.HandlerFunc) {
http.HandleFunc(pattern, logged(handler))
}
func RunServer(db *core.DB, addr string, port string) {
func RunServer(db core.DB, addr string, port string) {
env := &Env{db}
bind := net.JoinHostPort(addr, port)
handleFunc("GET /", env.getRoot)
handleFunc("GET /style.css", env.getStyle)
handleFunc("GET /htmx.org@2.0.4.js", env.getScript)
handleFunc("GET /source/{source}", env.getSource)
handleFunc("GET /item/{source}/{id}", env.getItem)
handleFunc("GET /", env.getRoot)
handleFunc("GET /style.css", env.getStyle)
handleFunc("GET /htmx.org@2.0.4.js", env.getScript)
handleFunc("GET /source/{source}", env.getSource)
handleFunc("GET /item/{source}/{id}", env.getItem)
handleFunc("DELETE /item/{source}/{id}", env.deleteItem)
handleFunc("POST /item/{source}/{id}/action/{action}", env.doAction)
handleFunc("POST /mass-deactivate", env.massDeactivate)

View File

@ -15,7 +15,7 @@ func (env *Env) getRoot(writer http.ResponseWriter, req *http.Request) {
names, err := core.GetSources(env.db)
if err != nil {
writer.Write([]byte(err.Error()))
http.Error(writer, err.Error(), 500)
}
var sources []html.SourceData

View File

@ -1,6 +1,7 @@
package web
import (
"log"
"net/http"
"github.com/Jaculabilis/intake/core"
@ -15,11 +16,20 @@ func (env *Env) getSource(writer http.ResponseWriter, req *http.Request) {
}
// TODO this needs to properly error if the source doesn't exist instead of just returning []
items, err := core.GetAllItemsForSource(env.db, source)
var items []core.Item
var err error
inactive := req.URL.Query().Get("inactive") == "1"
log.Printf("inactive = %t", inactive)
if inactive {
items, err = core.GetAllItemsForSource(env.db, source)
} else {
items, err = core.GetActiveItemsForSource(env.db, source)
}
if err != nil {
http.NotFound(writer, req)
http.Error(writer, err.Error(), 500)
return
}
data := html.FeedData{
Items: items,
}