Move postprocessing to update so it only runs on new items
This commit is contained in:
parent
d5b3af1f86
commit
1c8ec5ff8f
@ -91,7 +91,7 @@ func actionExecute(
|
|||||||
|
|
||||||
db := openAndMigrateDb()
|
db := openAndMigrateDb()
|
||||||
|
|
||||||
state, envs, argv, postProcess, err := core.GetSourceActionInputs(db, source, action)
|
state, envs, argv, err := core.GetSourceActionInputs(db, source, action)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: failed to load data for %s: %v", source, err)
|
log.Fatalf("error: failed to load data for %s: %v", source, err)
|
||||||
}
|
}
|
||||||
@ -109,7 +109,7 @@ func actionExecute(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, duration, postProcess)
|
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, duration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(db, errItem)
|
core.AddErrorItem(db, errItem)
|
||||||
log.Fatalf("error executing %s: %v", action, err)
|
log.Fatalf("error executing %s: %v", action, err)
|
||||||
|
@ -55,12 +55,12 @@ func sourceFetch(source string, format string, timeout string, dryRun bool) {
|
|||||||
|
|
||||||
db := openAndMigrateDb()
|
db := openAndMigrateDb()
|
||||||
|
|
||||||
state, envs, argv, postProcess, err := core.GetSourceActionInputs(db, source, "fetch")
|
state, envs, argv, err := core.GetSourceActionInputs(db, source, "fetch")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: failed to load data for %s: %v", source, err)
|
log.Fatalf("error: failed to load data for %s: %v", source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", duration, postProcess)
|
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", duration)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(db, errItem)
|
core.AddErrorItem(db, errItem)
|
||||||
log.Fatalf("error: failed to execute fetch: %v", err)
|
log.Fatalf("error: failed to execute fetch: %v", err)
|
||||||
|
@ -45,7 +45,7 @@ func sourceTest(env []string, format string, timeout string, cmd []string) {
|
|||||||
log.Fatalf("error: invalid duration: %v", err)
|
log.Fatalf("error: invalid duration: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
items, state, _, err := core.Execute("test", cmd, env, nil, "", duration, nil)
|
items, state, _, err := core.Execute("test", cmd, env, nil, "", duration)
|
||||||
log.Printf("returned %d items", len(items))
|
log.Printf("returned %d items", len(items))
|
||||||
log.Printf("wrote %d bytes of state", len(state))
|
log.Printf("wrote %d bytes of state", len(state))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -203,13 +203,13 @@ func fetchReadySources(db DB) {
|
|||||||
}
|
}
|
||||||
log.Printf("%s: fetching", schedule.Source)
|
log.Printf("%s: fetching", schedule.Source)
|
||||||
|
|
||||||
state, envs, argv, postProcess, err := GetSourceActionInputs(db, schedule.Source, "fetch")
|
state, envs, argv, err := GetSourceActionInputs(db, schedule.Source, "fetch")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: failed to load data for %s: %v", schedule.Source, err)
|
log.Printf("error: failed to load data for %s: %v", schedule.Source, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", DefaultTimeout, postProcess)
|
items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", DefaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AddErrorItem(db, errItem)
|
AddErrorItem(db, errItem)
|
||||||
log.Printf("error: failed to execute fetch: %v", err)
|
log.Printf("error: failed to execute fetch: %v", err)
|
||||||
|
@ -62,7 +62,6 @@ func Execute(
|
|||||||
state []byte,
|
state []byte,
|
||||||
input string,
|
input string,
|
||||||
timeout time.Duration,
|
timeout time.Duration,
|
||||||
postProcess func(item Item) Item,
|
|
||||||
) (
|
) (
|
||||||
items []Item,
|
items []Item,
|
||||||
newState []byte,
|
newState []byte,
|
||||||
@ -193,9 +192,6 @@ monitor:
|
|||||||
}
|
}
|
||||||
itemIds[item.Id] = true
|
itemIds[item.Id] = true
|
||||||
|
|
||||||
if postProcess != nil {
|
|
||||||
item = postProcess(item)
|
|
||||||
}
|
|
||||||
item.Active = true // These fields aren't up to
|
item.Active = true // These fields aren't up to
|
||||||
item.Created = 0 // the action to set and
|
item.Created = 0 // the action to set and
|
||||||
item.Source = source // shouldn't be overrideable
|
item.Source = source // shouldn't be overrideable
|
||||||
@ -269,7 +265,6 @@ func ExecuteItemAction(
|
|||||||
env []string,
|
env []string,
|
||||||
state []byte,
|
state []byte,
|
||||||
timeout time.Duration,
|
timeout time.Duration,
|
||||||
postProcess func(item Item) Item,
|
|
||||||
) (
|
) (
|
||||||
newItem Item,
|
newItem Item,
|
||||||
newState []byte,
|
newState []byte,
|
||||||
@ -285,7 +280,7 @@ func ExecuteItemAction(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
res, newState, errItem, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
|
res, newState, errItem, err := Execute(item.Source, argv, env, state, string(itemJson), timeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
|
err = fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
|
||||||
errItem = makeErrorItem(err, nil)
|
errItem = makeErrorItem(err, nil)
|
||||||
|
@ -26,7 +26,7 @@ func TestExecute(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
execute := func(argv []string) ([]Item, error) {
|
execute := func(argv []string) ([]Item, error) {
|
||||||
item, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
item, _, _, err := Execute("_", argv, nil, nil, "", time.Minute)
|
||||||
return item, err
|
return item, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,7 +49,7 @@ func TestExecute(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Timeout", func(t *testing.T) {
|
t.Run("Timeout", func(t *testing.T) {
|
||||||
res, _, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
|
res, _, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond)
|
||||||
assertNotNil(t, err)
|
assertNotNil(t, err)
|
||||||
assertLen(t, res, 0)
|
assertLen(t, res, 0)
|
||||||
})
|
})
|
||||||
@ -64,7 +64,7 @@ func TestExecute(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ReadFromStdin", func(t *testing.T) {
|
t.Run("ReadFromStdin", func(t *testing.T) {
|
||||||
res, _, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
|
res, _, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if res[0].Id != "bar" {
|
if res[0].Id != "bar" {
|
||||||
@ -73,7 +73,7 @@ func TestExecute(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("SetEnv", func(t *testing.T) {
|
t.Run("SetEnv", func(t *testing.T) {
|
||||||
res, _, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
|
res, _, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if res[0].Id != "baz" {
|
if res[0].Id != "baz" {
|
||||||
@ -160,7 +160,7 @@ func TestExecute(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("ReadState", func(t *testing.T) {
|
t.Run("ReadState", func(t *testing.T) {
|
||||||
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
|
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
|
||||||
res, _, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
|
res, _, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if res[0].Title != "Hello world" {
|
if res[0].Title != "Hello world" {
|
||||||
@ -170,7 +170,7 @@ func TestExecute(t *testing.T) {
|
|||||||
|
|
||||||
t.Run("WriteState", func(t *testing.T) {
|
t.Run("WriteState", func(t *testing.T) {
|
||||||
argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
|
argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
|
||||||
res, newState, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
|
res, newState, _, err := Execute("_", argv, nil, nil, "", time.Minute)
|
||||||
assertNil(t, err)
|
assertNil(t, err)
|
||||||
assertLen(t, res, 1)
|
assertLen(t, res, 1)
|
||||||
if string(newState) != "Hello world" {
|
if string(newState) != "Hello world" {
|
||||||
@ -178,22 +178,9 @@ func TestExecute(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("PostprocessSetTtl", func(t *testing.T) {
|
|
||||||
argv := []string{"jq", "-cn", `{id: "foo"}`}
|
|
||||||
res, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
|
|
||||||
item.Ttl = 123456
|
|
||||||
return item
|
|
||||||
})
|
|
||||||
assertNil(t, err)
|
|
||||||
assertLen(t, res, 1)
|
|
||||||
if res[0].Ttl != 123456 {
|
|
||||||
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("ErrorItem", func(t *testing.T) {
|
t.Run("ErrorItem", func(t *testing.T) {
|
||||||
argv := []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World; printf '{"whoops": "my bad"}'`}
|
argv := []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World; printf '{"whoops": "my bad"}'`}
|
||||||
_, _, errItem, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
|
_, _, errItem, err := Execute("test", argv, nil, nil, "", time.Minute)
|
||||||
assertNotNil(t, err)
|
assertNotNil(t, err)
|
||||||
if errItem.Id == "" {
|
if errItem.Id == "" {
|
||||||
t.Error("missing erritem id")
|
t.Error("missing erritem id")
|
||||||
|
@ -138,27 +138,21 @@ func GetSourceActionInputs(
|
|||||||
state []byte,
|
state []byte,
|
||||||
envs []string,
|
envs []string,
|
||||||
argv []string,
|
argv []string,
|
||||||
postProcess func(Item) Item,
|
|
||||||
err error,
|
err error,
|
||||||
) {
|
) {
|
||||||
state, err = GetState(db, source)
|
state, err = GetState(db, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, fmt.Errorf("failed to load state for %s: %v", source, err)
|
return nil, nil, nil, fmt.Errorf("failed to load state for %s: %v", source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
envs, err = GetEnvs(db, source)
|
envs, err = GetEnvs(db, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, fmt.Errorf("failed to get envs for %s: %v", source, err)
|
return nil, nil, nil, fmt.Errorf("failed to get envs for %s: %v", source, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
argv, err = GetArgvForAction(db, source, action)
|
argv, err = GetArgvForAction(db, source, action)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, fmt.Errorf("failed to get %s action for %s: %v", action, source, err)
|
return nil, nil, nil, fmt.Errorf("failed to get %s action for %s: %v", action, source, err)
|
||||||
}
|
|
||||||
|
|
||||||
postProcess, err = GetSourcePostProcessor(db, source)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, nil, nil, fmt.Errorf("failed to get %s post-processor: %v", source, err)
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -191,6 +185,21 @@ func updateWithFetchedItemsTx(
|
|||||||
items []Item,
|
items []Item,
|
||||||
now time.Time,
|
now time.Time,
|
||||||
) (int, int, error) {
|
) (int, int, error) {
|
||||||
|
envs, err := GetEnvs(db, source)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
postProcess, err := GetSourcePostProcessor(db, source)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: failed to get on_create action for %s: %v", source, err)
|
||||||
|
}
|
||||||
|
|
||||||
// Get all existing items
|
// Get all existing items
|
||||||
existingItems, err := GetAllItemsForSource(db, source, 0, -1)
|
existingItems, err := GetAllItemsForSource(db, source, 0, -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -214,6 +223,13 @@ func updateWithFetchedItemsTx(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply post-processing to the new items
|
||||||
|
if postProcess != nil {
|
||||||
|
for i := range newItems {
|
||||||
|
newItems[i] = postProcess(newItems[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Bulk insert the new items
|
// Bulk insert the new items
|
||||||
if err = AddItems(db, newItems); err != nil {
|
if err = AddItems(db, newItems); err != nil {
|
||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
@ -227,25 +243,14 @@ func updateWithFetchedItemsTx(
|
|||||||
return 0, 0, err
|
return 0, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
envs, err := GetEnvs(db, source)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
postProcess, err := GetSourcePostProcessor(db, source)
|
|
||||||
if err != nil {
|
|
||||||
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the source has an on-create trigger, run it for each new item
|
// If the source has an on-create trigger, run it for each new item
|
||||||
// On-create errors are ignored to avoid failing the fetch
|
// On-create errors are ignored to avoid failing the fetch
|
||||||
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
|
if len(onCreateArgv) > 0 {
|
||||||
if err == nil && len(onCreateArgv) > 0 {
|
|
||||||
var updatedNewItems []Item
|
var updatedNewItems []Item
|
||||||
for _, item := range newItems {
|
for _, item := range newItems {
|
||||||
var updatedItem Item
|
var updatedItem Item
|
||||||
var errItem Item
|
var errItem Item
|
||||||
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, DefaultTimeout, postProcess)
|
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, DefaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
AddErrorItem(db, errItem)
|
AddErrorItem(db, errItem)
|
||||||
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
|
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
|
||||||
|
@ -147,7 +147,7 @@ func TestOnCreateAction(t *testing.T) {
|
|||||||
|
|
||||||
execute := func(argv []string) []Item {
|
execute := func(argv []string) []Item {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
items, _, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
|
items, _, _, err := Execute("test", argv, nil, nil, "", time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error executing test fetch: %v", err)
|
t.Fatalf("unexpected error executing test fetch: %v", err)
|
||||||
}
|
}
|
||||||
@ -300,7 +300,7 @@ func TestSourceState(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSourceTtx(t *testing.T) {
|
func TestSourcePostProcessor(t *testing.T) {
|
||||||
db := EphemeralDb(t)
|
db := EphemeralDb(t)
|
||||||
if err := AddSource(db, "s"); err != nil {
|
if err := AddSource(db, "s"); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -323,6 +323,34 @@ func TestSourceTtx(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSourceUpdateAppliesPostProcess(t *testing.T) {
|
||||||
|
db := EphemeralDb(t)
|
||||||
|
if err := AddSource(db, "s"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := SetEnvs(db, "s", []string{
|
||||||
|
"INTAKE_TTL=30",
|
||||||
|
"INTAKE_TTD=60",
|
||||||
|
"INTAKE_TTS=90",
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
item := Item{Source: "s", Id: "i"}
|
||||||
|
add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, time.Now())
|
||||||
|
if add != 1 || del != 0 || err != nil {
|
||||||
|
t.Fatalf("expected 1 add, got %d and err %v", add, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
after, err := GetItem(db, "s", "i")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("item not added: %v", err)
|
||||||
|
}
|
||||||
|
if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 {
|
||||||
|
t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSourceLastUpdated(t *testing.T) {
|
func TestSourceLastUpdated(t *testing.T) {
|
||||||
db := EphemeralDb(t)
|
db := EphemeralDb(t)
|
||||||
if err := AddSource(db, "s"); err != nil {
|
if err := AddSource(db, "s"); err != nil {
|
||||||
|
@ -69,7 +69,7 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
|
|||||||
id := req.PathValue("id")
|
id := req.PathValue("id")
|
||||||
action := req.PathValue("action")
|
action := req.PathValue("action")
|
||||||
|
|
||||||
state, envs, argv, postProcess, err := core.GetSourceActionInputs(env.db, source, action)
|
state, envs, argv, err := core.GetSourceActionInputs(env.db, source, action)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(writer, fmt.Sprintf("error: failed to load data for %s: %v", source, err), 500)
|
http.Error(writer, fmt.Sprintf("error: failed to load data for %s: %v", source, err), 500)
|
||||||
}
|
}
|
||||||
@ -85,7 +85,7 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, core.DefaultTimeout, postProcess)
|
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, core.DefaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(env.db, errItem)
|
core.AddErrorItem(env.db, errItem)
|
||||||
http.Error(writer, err.Error(), 500)
|
http.Error(writer, err.Error(), 500)
|
||||||
|
@ -56,13 +56,13 @@ func (env *Env) fetchSource(writer http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
state, envs, argv, postProcess, err := core.GetSourceActionInputs(env.db, source, "fetch")
|
state, envs, argv, err := core.GetSourceActionInputs(env.db, source, "fetch")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(writer, fmt.Sprintf("error: failed to get data for %s: %v", source, err.Error()), 500)
|
http.Error(writer, fmt.Sprintf("error: failed to get data for %s: %v", source, err.Error()), 500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", core.DefaultTimeout, postProcess)
|
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", core.DefaultTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
core.AddErrorItem(env.db, errItem)
|
core.AddErrorItem(env.db, errItem)
|
||||||
http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500)
|
http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500)
|
||||||
|
Loading…
Reference in New Issue
Block a user