From d072f8dc8e8fc6bb1383f47541fd1f2dbc41c8f6 Mon Sep 17 00:00:00 2001 From: Aaron Harper <aaron@inngest.com> Date: Mon, 5 Aug 2024 20:29:08 -0400 Subject: [PATCH 1/2] Add group.parallel --- .github/workflows/go.yml | 23 ++++++- Makefile | 10 ++- experimental/group/group.go | 50 ++++++++++++++ handler.go | 18 ++++- handler_test.go | 10 +-- internal/sdkrequest/request.go | 13 ++-- step/run.go | 21 +++++- step/step.go | 33 +++++++++ tests/main_test.go | 97 ++++++++++++++++++++++++++ tests/parallel_test.go | 122 +++++++++++++++++++++++++++++++++ tests/utils.go | 35 ++++++++++ 11 files changed, 412 insertions(+), 20 deletions(-) create mode 100644 experimental/group/group.go create mode 100644 tests/main_test.go create mode 100644 tests/parallel_test.go create mode 100644 tests/utils.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index d388fa02..3d765a48 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -37,6 +37,25 @@ jobs: uses: actions/setup-go@v3 with: go-version: '1.21' - - name: Test - run: go test -v -race -count=1 + - name: Unit test + run: go test -v -race -count=1 -short + itest: + strategy: + matrix: + os: [ubuntu-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: '1.21' + + # Need npx to start the Dev Server + - name: Set up Node.js + uses: actions/setup-node@v3 + with: + node-version: '18' + - name: Integration test + run: make itest diff --git a/Makefile b/Makefile index 72e97943..b808d4e6 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,10 @@ -.PHONY: test -test: - go test -test.v +.PHONY: itest +itest: + go test ./tests -v -count=1 + +.PHONY: utest +utest: + go test -test.v -short .PHONY: lint lint: diff --git a/experimental/group/group.go b/experimental/group/group.go new file mode 100644 index 00000000..2ee4c744 --- /dev/null +++ b/experimental/group/group.go @@ -0,0 +1,50 @@ +package group + +import ( + "context" + "fmt" + + "github.com/inngest/inngestgo/step" +) + +type Result struct { + Error error + Value any +} + +func Parallel( + ctx context.Context, + fns ...func(ctx context.Context, + ) (any, error)) []Result { + ctx = context.WithValue(ctx, step.ParallelKey, true) + + results := []Result{} + isPlanned := false + ch := make(chan struct{}, 1) + for _, fn := range fns { + fn := fn + go func(fn func(ctx context.Context) (any, error)) { + defer func() { + if r := recover(); r != nil { + if _, ok := r.(step.ControlHijack); ok { + isPlanned = true + } else { + // TODO: What to do here? + fmt.Println("TODO") + } + } + ch <- struct{}{} + }() + + value, err := fn(ctx) + results = append(results, Result{Error: err, Value: value}) + }(fn) + <-ch + } + + if isPlanned { + panic(step.ControlHijack{}) + } + + return results +} diff --git a/handler.go b/handler.go index 57bf4f1c..36a382db 100644 --- a/handler.go +++ b/handler.go @@ -556,8 +556,13 @@ func (h *handler) invoke(w http.ResponseWriter, r *http.Request) error { }() } + var stepID *string + if rawStepID := r.URL.Query().Get("stepId"); rawStepID != "" && rawStepID != "step" { + stepID = &rawStepID + } + // Invoke the function, then immediately stop the streaming buffer. - resp, ops, err := invoke(r.Context(), fn, request) + resp, ops, err := invoke(r.Context(), fn, request, stepID) streamCancel() // NOTE: When triggering step errors, we should have an OpcodeStepError @@ -779,7 +784,12 @@ type StreamResponse struct { // invoke calls a given servable function with the specified input event. The input event must // be fully typed. -func invoke(ctx context.Context, sf ServableFunction, input *sdkrequest.Request) (any, []state.GeneratorOpcode, error) { +func invoke( + ctx context.Context, + sf ServableFunction, + input *sdkrequest.Request, + stepID *string, +) (any, []state.GeneratorOpcode, error) { if sf.Func() == nil { // This should never happen, but as sf.Func returns a nillable type we // must check that the function exists. @@ -790,6 +800,10 @@ func invoke(ctx context.Context, sf ServableFunction, input *sdkrequest.Request) // within a step. This allows us to prevent any execution of future tools after a // tool has run. fCtx, cancel := context.WithCancel(context.Background()) + if stepID != nil { + fCtx = step.SetTargetStepID(fCtx, *stepID) + } + // This must be a pointer so that it can be mutated from within function tools. mgr := sdkrequest.NewManager(cancel, input) fCtx = sdkrequest.SetManager(fCtx, mgr) diff --git a/handler_test.go b/handler_test.go index a8b7f61d..be1a3f70 100644 --- a/handler_test.go +++ b/handler_test.go @@ -97,7 +97,7 @@ func TestInvoke(t *testing.T) { Register(a) t.Run("it invokes the function with correct types", func(t *testing.T) { - actual, op, err := invoke(ctx, a, createRequest(t, input)) + actual, op, err := invoke(ctx, a, createRequest(t, input), nil) require.NoError(t, err) require.Nil(t, op) require.Equal(t, resp, actual) @@ -131,7 +131,7 @@ func TestInvoke(t *testing.T) { Register(a) t.Run("it invokes the function with correct types", func(t *testing.T) { - actual, op, err := invoke(ctx, a, createBatchRequest(t, input, 5)) + actual, op, err := invoke(ctx, a, createBatchRequest(t, input, 5), nil) require.NoError(t, err) require.Nil(t, op) require.Equal(t, resp, actual) @@ -166,7 +166,7 @@ func TestInvoke(t *testing.T) { ctx := context.Background() t.Run("it invokes the function with correct types", func(t *testing.T) { - actual, op, err := invoke(ctx, a, createRequest(t, input)) + actual, op, err := invoke(ctx, a, createRequest(t, input), nil) require.NoError(t, err) require.Nil(t, op) require.Equal(t, resp, actual) @@ -204,7 +204,7 @@ func TestInvoke(t *testing.T) { ctx := context.Background() t.Run("it invokes the function with correct types", func(t *testing.T) { - actual, op, err := invoke(ctx, a, createRequest(t, input)) + actual, op, err := invoke(ctx, a, createRequest(t, input), nil) require.NoError(t, err) require.Nil(t, op) require.Equal(t, resp, actual) @@ -241,7 +241,7 @@ func TestInvoke(t *testing.T) { ctx := context.Background() t.Run("it invokes the function with correct types", func(t *testing.T) { - actual, op, err := invoke(ctx, a, createRequest(t, input)) + actual, op, err := invoke(ctx, a, createRequest(t, input), nil) require.NoError(t, err) require.Nil(t, op) require.Equal(t, resp, actual) diff --git a/internal/sdkrequest/request.go b/internal/sdkrequest/request.go index cca27098..4b694ef0 100644 --- a/internal/sdkrequest/request.go +++ b/internal/sdkrequest/request.go @@ -14,12 +14,13 @@ type Request struct { // CallCtx represents context for individual function calls. This logs the function ID, the // specific run ID, and sep information. type CallCtx struct { - Env string `json:"env"` - FunctionID string `json:"fn_id"` - RunID string `json:"run_id"` - StepID string `json:"step_id"` - Stack CallStack `json:"stack"` - Attempt int `json:"attempt"` + DisableImmediateExecution bool `json:"disable_immediate_execution"` + Env string `json:"env"` + FunctionID string `json:"fn_id"` + RunID string `json:"run_id"` + StepID string `json:"step_id"` + Stack CallStack `json:"stack"` + Attempt int `json:"attempt"` } type CallStack struct { diff --git a/step/run.go b/step/run.go index 740c13ea..1e98bc2e 100644 --- a/step/run.go +++ b/step/run.go @@ -32,8 +32,10 @@ func Run[T any]( id string, f func(ctx context.Context) (T, error), ) (T, error) { + targetID := getTargetStepID(ctx) mgr := preflight(ctx) op := mgr.NewOp(enums.OpcodeStep, id, nil) + hashedID := op.MustHash() if val, ok := mgr.Step(op); ok { // Create a new empty type T in v @@ -78,6 +80,21 @@ func Run[T any]( return val, nil } + if targetID != nil && *targetID != hashedID { + panic(ControlHijack{}) + } + + planParallel := targetID == nil && isParallel(ctx) + planBeforeRun := targetID == nil && mgr.Request().CallCtx.DisableImmediateExecution + if planParallel || planBeforeRun { + mgr.AppendOp(state.GeneratorOpcode{ + ID: hashedID, + Op: enums.OpcodeStepPlanned, + Name: id, + }) + panic(ControlHijack{}) + } + // We're calling a function, so always cancel the context afterwards so that no // other tools run. defer mgr.Cancel() @@ -94,7 +111,7 @@ func Run[T any]( // Implement per-step errors. mgr.AppendOp(state.GeneratorOpcode{ - ID: op.MustHash(), + ID: hashedID, Op: enums.OpcodeStepError, Name: id, Error: &state.UserError{ @@ -112,7 +129,7 @@ func Run[T any]( mgr.SetErr(fmt.Errorf("unable to marshal run respone for '%s': %w", id, err)) } mgr.AppendOp(state.GeneratorOpcode{ - ID: op.MustHash(), + ID: hashedID, Op: enums.OpcodeStepRun, Name: id, Data: byt, diff --git a/step/step.go b/step/step.go index 0fd8dbef..3ac3934c 100644 --- a/step/step.go +++ b/step/step.go @@ -8,6 +8,13 @@ import ( type ControlHijack struct{} +type ctxKey string + +const ( + targetStepIDKey = ctxKey("stepID") + ParallelKey = ctxKey("parallelKey") +) + var ( // ErrNotInFunction is called when a step tool is executed outside of an Inngest // function call context. @@ -23,6 +30,32 @@ func (errNotInFunction) Error() string { return "step called without function context" } +func getTargetStepID(ctx context.Context) *string { + if v := ctx.Value(targetStepIDKey); v != nil { + if c, ok := v.(string); ok { + return &c + } + } + return nil +} + +func SetTargetStepID(ctx context.Context, id string) context.Context { + if id == "" || id == "step" { + return ctx + } + + return context.WithValue(ctx, targetStepIDKey, id) +} + +func isParallel(ctx context.Context) bool { + if v := ctx.Value(ParallelKey); v != nil { + if c, ok := v.(bool); ok { + return c + } + } + return false +} + func preflight(ctx context.Context) sdkrequest.InvocationManager { if ctx.Err() != nil { // Another tool has already ran and the context is closed. Return diff --git a/tests/main_test.go b/tests/main_test.go new file mode 100644 index 00000000..03c66fb5 --- /dev/null +++ b/tests/main_test.go @@ -0,0 +1,97 @@ +package tests + +import ( + "fmt" + "net/http" + "os" + "os/exec" + "syscall" + "testing" + "time" + + "github.com/inngest/inngestgo" +) + +func TestMain(m *testing.M) { + teardown, err := setup() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to setup: %v\n", err) + os.Exit(1) + } + + code := m.Run() + + err = teardown() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to teardown: %v\n", err) + os.Exit(1) + } + + os.Exit(code) +} + +func setup() (func() error, error) { + os.Setenv("INNGEST_DEV", "1") + + inngestgo.DefaultClient = inngestgo.NewClient( + inngestgo.ClientOpts{ + EventKey: inngestgo.StrPtr("dev"), + }, + ) + + // return func() error { return nil }, nil + + stopDevServer, err := startDevServer() + if err != nil { + return nil, err + } + + return stopDevServer, nil +} + +func startDevServer() (func() error, error) { + fmt.Println("Starting Dev Server") + cmd := exec.Command( + "bash", + "-c", + "npx --yes inngest-cli@latest dev --no-discovery --no-poll", + ) + + // Run in a new process group so we can kill the process and its children + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + err := cmd.Start() + if err != nil { + return nil, fmt.Errorf("failed to start command: %w", err) + } + + // Wait for Dev Server to start + fmt.Println("Waiting for Dev Server to start") + httpClient := http.Client{Timeout: time.Second} + start := time.Now() + for { + resp, err := httpClient.Get("http://0.0.0.0:8288") + if err == nil && resp.StatusCode == 200 { + break + } + if time.Since(start) > 20*time.Second { + return nil, fmt.Errorf("timeout waiting for Dev Server to start: %w", err) + } + <-time.After(500 * time.Millisecond) + } + + // Callback to stop the Dev Server + stop := func() error { + fmt.Println("Stopping Dev Server") + pgid, err := syscall.Getpgid(cmd.Process.Pid) + if err != nil { + return fmt.Errorf("failed to get process group ID: %w", err) + } + if err := syscall.Kill(-pgid, syscall.SIGKILL); err != nil { + return fmt.Errorf("failed to kill process group: %w", err) + } + return nil + } + + return stop, nil +} diff --git a/tests/parallel_test.go b/tests/parallel_test.go new file mode 100644 index 00000000..b2ee5630 --- /dev/null +++ b/tests/parallel_test.go @@ -0,0 +1,122 @@ +package tests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/inngest/inngestgo" + "github.com/inngest/inngestgo/experimental/group" + "github.com/inngest/inngestgo/step" + "github.com/stretchr/testify/require" +) + +func TestParallel(t *testing.T) { + if testing.Short() { + t.Skip() + } + + ctx := context.Background() + r := require.New(t) + + state := struct { + invokedFnCounter int + step1ACounter int + step1BCounter int + stepAfterCounter int + parallelResults []group.Result + }{} + + appName := "test" + h := inngestgo.NewHandler(appName, inngestgo.HandlerOpts{}) + + fn1 := inngestgo.CreateFunction( + inngestgo.FunctionOpts{ + ID: "my-fn-1", + Name: "my-fn-1", + }, + inngestgo.EventTrigger("dummy", nil), + func(ctx context.Context, input inngestgo.Input[any]) (any, error) { + state.invokedFnCounter++ + return "invoked output", nil + }, + ) + + eventName := "my-event" + fn2 := inngestgo.CreateFunction( + inngestgo.FunctionOpts{ + ID: "my-fn-2", + Name: "my-fn-2", + }, + inngestgo.EventTrigger(eventName, nil), + func(ctx context.Context, input inngestgo.Input[any]) (any, error) { + state.parallelResults = group.Parallel( + ctx, + func(ctx context.Context) (any, error) { + return step.Invoke[any](ctx, "invoke", step.InvokeOpts{ + FunctionId: fmt.Sprintf("%s-%s", appName, fn1.Config().ID), + }) + }, + func(ctx context.Context) (any, error) { + return step.Run(ctx, "1a", func(ctx context.Context) (int, error) { + state.step1ACounter++ + return 1, nil + }) + }, + func(ctx context.Context) (any, error) { + return step.Run(ctx, "1b", func(ctx context.Context) (int, error) { + state.step1BCounter++ + return 2, nil + }) + }, + func(ctx context.Context) (any, error) { + step.Sleep(ctx, "sleep", time.Second) + return nil, nil + }, + func(ctx context.Context) (any, error) { + return step.WaitForEvent[any](ctx, "wait", step.WaitForEventOpts{ + Event: "never", + Timeout: time.Second, + }) + }, + ) + + _, err := step.Run(ctx, "after", func(ctx context.Context) (any, error) { + state.stepAfterCounter++ + return nil, nil + }) + if err != nil { + return nil, err + } + + return nil, nil + }, + ) + + h.Register(fn1, fn2) + + server, sync := serve(t, h) + defer server.Close() + r.NoError(sync()) + + _, err := inngestgo.Send(ctx, inngestgo.Event{ + Name: eventName, + Data: map[string]any{"foo": "bar"}}, + ) + r.NoError(err) + + r.Eventually(func() bool { + return state.stepAfterCounter == 1 + }, 5*time.Second, 10*time.Millisecond) + r.Equal(1, state.invokedFnCounter) + r.Equal(1, state.step1ACounter) + r.Equal(1, state.step1BCounter) + r.Equal(state.parallelResults, []group.Result{ + {Value: "invoked output"}, + {Value: 1}, + {Value: 2}, + {Value: nil}, + {Error: step.ErrEventNotReceived}, + }) +} diff --git a/tests/utils.go b/tests/utils.go new file mode 100644 index 00000000..03ab6a49 --- /dev/null +++ b/tests/utils.go @@ -0,0 +1,35 @@ +package tests + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/inngest/inngestgo" +) + +func serve(t *testing.T, h inngestgo.Handler) (*httptest.Server, func() error) { + server := httptest.NewServer(h) + + sync := func() error { + t.Helper() + req, err := http.NewRequest(http.MethodPut, server.URL, nil) + if err != nil { + return err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + byt, _ := io.ReadAll(resp.Body) + if resp.StatusCode > 299 { + return fmt.Errorf("invalid status code: %d (%s)", resp.StatusCode, byt) + } + _ = resp.Body.Close() + return nil + } + + return server, sync +} From 88bf4707ae37e6d5cb49d0662ff2c6c5abe6544e Mon Sep 17 00:00:00 2001 From: Aaron Harper <aaron@inngest.com> Date: Wed, 18 Sep 2024 15:04:00 -0400 Subject: [PATCH 2/2] Handle unexpected panic --- experimental/group/group.go | 10 +- handler_test.go | 1 + tests/parallel_test.go | 255 ++++++++++++++++++++++-------------- tests/utils.go | 70 ++++++++++ 4 files changed, 234 insertions(+), 102 deletions(-) diff --git a/experimental/group/group.go b/experimental/group/group.go index 2ee4c744..89ee9fc5 100644 --- a/experimental/group/group.go +++ b/experimental/group/group.go @@ -2,7 +2,6 @@ package group import ( "context" - "fmt" "github.com/inngest/inngestgo/step" ) @@ -21,6 +20,7 @@ func Parallel( results := []Result{} isPlanned := false ch := make(chan struct{}, 1) + var unexpectedPanic any for _, fn := range fns { fn := fn go func(fn func(ctx context.Context) (any, error)) { @@ -29,8 +29,7 @@ func Parallel( if _, ok := r.(step.ControlHijack); ok { isPlanned = true } else { - // TODO: What to do here? - fmt.Println("TODO") + unexpectedPanic = r } } ch <- struct{}{} @@ -42,6 +41,11 @@ func Parallel( <-ch } + if unexpectedPanic != nil { + // Repanic to let our normal panic recovery handle it + panic(unexpectedPanic) + } + if isPlanned { panic(step.ControlHijack{}) } diff --git a/handler_test.go b/handler_test.go index 1847cb2c..b82e3358 100644 --- a/handler_test.go +++ b/handler_test.go @@ -282,6 +282,7 @@ func TestInvoke(t *testing.T) { actual, op, err := invoke( ctx, a, createRequest(t, EventA{Name: "my-event"}), + nil, ) r.Nil(actual) r.Nil(op) diff --git a/tests/parallel_test.go b/tests/parallel_test.go index b2ee5630..3a325b1d 100644 --- a/tests/parallel_test.go +++ b/tests/parallel_test.go @@ -17,106 +17,163 @@ func TestParallel(t *testing.T) { t.Skip() } - ctx := context.Background() - r := require.New(t) - - state := struct { - invokedFnCounter int - step1ACounter int - step1BCounter int - stepAfterCounter int - parallelResults []group.Result - }{} - - appName := "test" - h := inngestgo.NewHandler(appName, inngestgo.HandlerOpts{}) - - fn1 := inngestgo.CreateFunction( - inngestgo.FunctionOpts{ - ID: "my-fn-1", - Name: "my-fn-1", - }, - inngestgo.EventTrigger("dummy", nil), - func(ctx context.Context, input inngestgo.Input[any]) (any, error) { - state.invokedFnCounter++ - return "invoked output", nil - }, - ) - - eventName := "my-event" - fn2 := inngestgo.CreateFunction( - inngestgo.FunctionOpts{ - ID: "my-fn-2", - Name: "my-fn-2", - }, - inngestgo.EventTrigger(eventName, nil), - func(ctx context.Context, input inngestgo.Input[any]) (any, error) { - state.parallelResults = group.Parallel( - ctx, - func(ctx context.Context) (any, error) { - return step.Invoke[any](ctx, "invoke", step.InvokeOpts{ - FunctionId: fmt.Sprintf("%s-%s", appName, fn1.Config().ID), - }) - }, - func(ctx context.Context) (any, error) { - return step.Run(ctx, "1a", func(ctx context.Context) (int, error) { - state.step1ACounter++ - return 1, nil - }) - }, - func(ctx context.Context) (any, error) { - return step.Run(ctx, "1b", func(ctx context.Context) (int, error) { - state.step1BCounter++ - return 2, nil - }) - }, - func(ctx context.Context) (any, error) { - step.Sleep(ctx, "sleep", time.Second) + t.Run("successful with a mix of step kinds", func(t *testing.T) { + ctx := context.Background() + r := require.New(t) + + state := struct { + invokedFnCounter int + step1ACounter int + step1BCounter int + stepAfterCounter int + parallelResults []group.Result + }{} + + appName := randomSuffix("TestParallel") + h := inngestgo.NewHandler(appName, inngestgo.HandlerOpts{}) + + fn1 := inngestgo.CreateFunction( + inngestgo.FunctionOpts{ + ID: "my-fn-1", + Name: "my-fn-1", + }, + inngestgo.EventTrigger("dummy", nil), + func(ctx context.Context, input inngestgo.Input[any]) (any, error) { + state.invokedFnCounter++ + return "invoked output", nil + }, + ) + + eventName := randomSuffix("my-event") + fn2 := inngestgo.CreateFunction( + inngestgo.FunctionOpts{ + ID: "my-fn-2", + Name: "my-fn-2", + }, + inngestgo.EventTrigger(eventName, nil), + func(ctx context.Context, input inngestgo.Input[any]) (any, error) { + state.parallelResults = group.Parallel( + ctx, + func(ctx context.Context) (any, error) { + return step.Invoke[any](ctx, "invoke", step.InvokeOpts{ + FunctionId: fmt.Sprintf("%s-%s", appName, fn1.Config().ID), + }) + }, + func(ctx context.Context) (any, error) { + return step.Run(ctx, "1a", func(ctx context.Context) (int, error) { + state.step1ACounter++ + return 1, nil + }) + }, + func(ctx context.Context) (any, error) { + return step.Run(ctx, "1b", func(ctx context.Context) (int, error) { + state.step1BCounter++ + return 2, nil + }) + }, + func(ctx context.Context) (any, error) { + step.Sleep(ctx, "sleep", time.Second) + return nil, nil + }, + func(ctx context.Context) (any, error) { + return step.WaitForEvent[any](ctx, "wait", step.WaitForEventOpts{ + Event: "never", + Timeout: time.Second, + }) + }, + ) + + _, err := step.Run(ctx, "after", func(ctx context.Context) (any, error) { + state.stepAfterCounter++ return nil, nil - }, - func(ctx context.Context) (any, error) { - return step.WaitForEvent[any](ctx, "wait", step.WaitForEventOpts{ - Event: "never", - Timeout: time.Second, - }) - }, - ) - - _, err := step.Run(ctx, "after", func(ctx context.Context) (any, error) { - state.stepAfterCounter++ + }) + if err != nil { + return nil, err + } + return nil, nil - }) - if err != nil { - return nil, err - } - - return nil, nil - }, - ) - - h.Register(fn1, fn2) - - server, sync := serve(t, h) - defer server.Close() - r.NoError(sync()) - - _, err := inngestgo.Send(ctx, inngestgo.Event{ - Name: eventName, - Data: map[string]any{"foo": "bar"}}, - ) - r.NoError(err) - - r.Eventually(func() bool { - return state.stepAfterCounter == 1 - }, 5*time.Second, 10*time.Millisecond) - r.Equal(1, state.invokedFnCounter) - r.Equal(1, state.step1ACounter) - r.Equal(1, state.step1BCounter) - r.Equal(state.parallelResults, []group.Result{ - {Value: "invoked output"}, - {Value: 1}, - {Value: 2}, - {Value: nil}, - {Error: step.ErrEventNotReceived}, + }, + ) + + h.Register(fn1, fn2) + + server, sync := serve(t, h) + defer server.Close() + r.NoError(sync()) + + _, err := inngestgo.Send(ctx, inngestgo.Event{ + Name: eventName, + Data: map[string]any{"foo": "bar"}}, + ) + r.NoError(err) + + r.Eventually(func() bool { + return state.stepAfterCounter == 1 + }, 5*time.Second, 10*time.Millisecond) + r.Equal(1, state.invokedFnCounter) + r.Equal(1, state.step1ACounter) + r.Equal(1, state.step1BCounter) + r.Equal(state.parallelResults, []group.Result{ + {Value: "invoked output"}, + {Value: 1}, + {Value: 2}, + {Value: nil}, + {Error: step.ErrEventNotReceived}, + }) + }) + + t.Run("panic", func(t *testing.T) { + ctx := context.Background() + r := require.New(t) + + appName := randomSuffix("TestParallel") + h := inngestgo.NewHandler(appName, inngestgo.HandlerOpts{}) + + var runID string + eventName := randomSuffix("my-event") + fn := inngestgo.CreateFunction( + inngestgo.FunctionOpts{ + ID: "my-fn", + Name: "my-fn", + Retries: inngestgo.IntPtr(0), + }, + inngestgo.EventTrigger(eventName, nil), + func(ctx context.Context, input inngestgo.Input[any]) (any, error) { + runID = input.InputCtx.RunID + + group.Parallel( + ctx, + func(ctx context.Context) (any, error) { + return step.Run(ctx, "1a", func(ctx context.Context) (int, error) { + return 1, nil + }) + }, + func(ctx context.Context) (any, error) { + return step.Run(ctx, "1b", func(ctx context.Context) (int, error) { + panic("oops") + }) + }, + ) + + return nil, nil + }, + ) + + h.Register(fn) + + server, sync := serve(t, h) + defer server.Close() + r.NoError(sync()) + + _, err := inngestgo.Send(ctx, inngestgo.Event{ + Name: eventName, + Data: map[string]any{"foo": "bar"}}, + ) + r.NoError(err) + + run, err := waitForRun(&runID, StatusFailed) + r.NoError(err) + r.Nil(run.Output.Data) + r.Contains(run.Output.Error, "function panicked: oops") }) } diff --git a/tests/utils.go b/tests/utils.go index 03ab6a49..82f16297 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -1,15 +1,85 @@ package tests import ( + "encoding/json" "fmt" "io" "net/http" "net/http/httptest" "testing" + "time" + "github.com/google/uuid" "github.com/inngest/inngestgo" ) +const ( + StatusFailed = "Failed" +) + +type Run struct { + Output struct { + Data any `json:"data"` + Error any `json:"error"` + } + Status string `json:"status"` +} + +func getRun(id string) (*Run, error) { + res, err := http.Get(fmt.Sprintf("http://localhost:8288/v1/runs/%s", id)) + if err != nil { + return nil, err + } + defer res.Body.Close() + + byt, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + var body struct { + Data Run `json:"data"` + } + err = json.Unmarshal(byt, &body) + if err != nil { + return nil, err + } + + return &body.Data, nil +} + +func waitForRun(id *string, status string) (*Run, error) { + start := time.Now() + timeout := 5 * time.Second + + for { + if time.Now().After(start.Add(timeout)) { + break + } + + if id == nil || *id != "" { + run, err := getRun(*id) + if err != nil { + return nil, err + } + if run.Status == status { + return run, nil + } + } + <-time.After(100 * time.Millisecond) + } + + if id == nil || *id == "" { + return nil, fmt.Errorf("run ID is empty") + } + + return nil, fmt.Errorf("run did not reach status %s", status) +} + +func randomSuffix(s string) string { + return s + uuid.NewString() +} + func serve(t *testing.T, h inngestgo.Handler) (*httptest.Server, func() error) { server := httptest.NewServer(h)