Skip to content

Commit

Permalink
test(timeout): add test for timeout (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen authored Apr 10, 2024
1 parent 32aebd9 commit 2f7483a
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
.DS_Store
.env
*.env
docker/.env
.envrc
*.pem
app
!frontend/app
Expand Down
48 changes: 48 additions & 0 deletions examples/timeout/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"fmt"
"time"

"github.com/joho/godotenv"

"github.com/hatchet-dev/hatchet/pkg/worker"
)

type userCreateEvent struct {
Username string `json:"username"`
UserID string `json:"user_id"`
Data map[string]string `json:"data"`
}

type stepOneOutput struct {
Message string `json:"message"`
}

func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}

events := make(chan string, 50)
cleanup, err := run(events, worker.WorkflowJob{
Name: "timeout",
Description: "timeout",
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
time.Sleep(time.Second * 60)
return nil, nil
}).SetName("step-one").SetTimeout("10s"),
},
})
if err != nil {
panic(err)
}

<-events

if err := cleanup(); err != nil {
panic(fmt.Errorf("cleanup() error = %v", err))
}
}
82 changes: 82 additions & 0 deletions examples/timeout/main_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//go:build e2e

package main

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/hatchet-dev/hatchet/internal/testutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)

func TestTimeout(t *testing.T) {
testutils.Prepare(t)

tests := []struct {
name string
job func(done func()) worker.WorkflowJob
}{
{
name: "step timeout",
job: func(done func()) worker.WorkflowJob {
return worker.WorkflowJob{
Name: "timeout",
Description: "timeout",
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
select {
case <-time.After(time.Second * 30):
return &stepOneOutput{
Message: "finished",
}, nil
case <-ctx.Done():
done()
return nil, nil
}
}).SetName("step-one").SetTimeout("10s"),
},
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

events := make(chan string, 50)

cleanup, err := run(events, tt.job(func() {
events <- "done"
}))
if err != nil {
t.Fatalf("run() error = %s", err)
}

var items []string

outer:
for {
select {
case item := <-events:
items = append(items, item)
case <-ctx.Done():
break outer
}
}

assert.Equal(t, []string{
"done", // cancellation signal
"done", // test check
}, items)

if err := cleanup(); err != nil {
t.Fatalf("cleanup() error = %s", err)
}
})
}
}
109 changes: 109 additions & 0 deletions examples/timeout/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/hatchet-dev/hatchet/internal/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/worker"
)

func run(done chan<- string, job worker.WorkflowJob) (func() error, error) {
c, err := client.New()
if err != nil {
return nil, fmt.Errorf("error creating client: %w", err)
}

w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
return nil, fmt.Errorf("error creating worker: %w", err)
}

err = w.On(
worker.Events("user:create:timeout"),
&job,
)
if err != nil {
return nil, fmt.Errorf("error registering workflow: %w", err)
}

go func() {
log.Printf("pushing event")

testEvent := userCreateEvent{
Username: "echo-test",
UserID: "1234",
Data: map[string]string{
"test": "test",
},
}

// push an event
err := c.Event().Push(
context.Background(),
"user:create:timeout",
testEvent,
)
if err != nil {
panic(fmt.Errorf("error pushing event: %w", err))
}

time.Sleep(20 * time.Second)

client := db.NewClient()
if err := client.Connect(); err != nil {
panic(fmt.Errorf("error connecting to database: %w", err))
}
defer client.Disconnect()

// TODO check for the database status

events, err := client.Event.FindMany(
db.Event.TenantID.Equals(c.TenantId()),
db.Event.Key.Equals("user:create:timeout"),
).With(
db.Event.WorkflowRuns.Fetch().With(
db.WorkflowRunTriggeredBy.Parent.Fetch().With(
db.WorkflowRun.JobRuns.Fetch().With(
db.JobRun.StepRuns.Fetch(),
),
),
),
).Exec(context.Background())
if err != nil {
panic(fmt.Errorf("error finding events: %w", err))
}

for _, event := range events {
for _, workflowRun := range event.WorkflowRuns() {
for _, jobRuns := range workflowRun.Parent().JobRuns() {
for _, stepRun := range jobRuns.StepRuns() {
if stepRun.Status != db.StepRunStatusCancelled {
panic(fmt.Errorf("expected step run to be failed, got %s", stepRun.Status))
}
reason, _ := stepRun.CancelledReason()
if reason != "TIMED_OUT" {
panic(fmt.Errorf("expected step run to be failed, got %s", reason))
}
}
}
}
}

done <- "done"
}()

cleanup, err := w.Start()
if err != nil {
return nil, fmt.Errorf("error starting worker: %w", err)
}

return cleanup, nil
}

0 comments on commit 2f7483a

Please sign in to comment.