Skip to content

Commit

Permalink
Add test to prove RunTask doesn't delete tasks or retry as expected
Browse files Browse the repository at this point in the history
Per documentation for RunTask
https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#google.cloud.tasks.v2.CloudTasks.RunTask

> If Cloud Tasks receives a successful response from the task's
> target, then the task will be deleted; otherwise the task's
> schedule_time will be reset to the time that RunTask was called
> plus the retry delay specified in the queue's RetryConfig.

In other words, it essentially internally works the same as
if you'd set `ScheduleTime` to `now` (albeit that it also
bypasses rate limits / queue pause state / etc).

However the emulator is treating RunTask as a separate
execution - the task will still run at the original
scheduled time, albeit it immediately disappears from
ListTasks / GetTasks.

And additionally, the emulator is not retrying if a task
triggered by RunTasks fails.
  • Loading branch information
acoulton committed Sep 13, 2021
1 parent 821fa3e commit 0453b91
Showing 1 changed file with 120 additions and 0 deletions.
120 changes: 120 additions & 0 deletions emulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

. "cloud.google.com/go/cloudtasks/apiv2"
. "github.com/aertje/cloud-tasks-emulator"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -602,6 +603,125 @@ func TestOIDCAuthenticatedTaskExecution(t *testing.T) {
assert.Equal(t, "http://localhost:8980", claims.Issuer, "Specifies issuer")
}

func TestRunTaskRunsAndDeletesSuccessfulTask(t *testing.T) {
serv, client := setUp(t, ServerOptions{})
defer tearDown(t, serv)

createdQueue := createTestQueue(t, client)
defer tearDownQueue(t, client, createdQueue)

srv, receivedRequests := startTestServer()
defer srv.Shutdown(context.Background())

inTwoSeconds, _ := ptypes.TimestampProto(time.Now().Add(2 * time.Second))
createTaskRequest := taskspb.CreateTaskRequest{
Parent: createdQueue.GetName(),
Task: &taskspb.Task{
Name: createdQueue.GetName() + "/tasks/deferred-task",
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
Url: "http://localhost:5000/success",
},
},
ScheduleTime: inTwoSeconds,
},
}
createdTask, err := client.CreateTask(context.Background(), &createTaskRequest)
require.NoError(t, err)

// Task was created OK, verify it hasn't been sent yet
_, err = awaitHttpRequestWithTimeout(receivedRequests, 100*time.Millisecond)
assert.Error(t, err, "Deferred task does not send HTTP request immediately")

// Manually run the task
runTaskRequest := taskspb.RunTaskRequest{
Name: createdTask.GetName(),
}
_, err = client.RunTask(context.Background(), &runTaskRequest)
require.NoError(t, err)

// Verify the task runs essentially immediately - needs to be short enough timeout that it can't
// fire even if the task runs at the scheduled time
_, err = awaitHttpRequestWithTimeout(receivedRequests, 300*time.Millisecond)
require.NoError(t, err)

// As soon as the task runs, it should have been removed from the task list and not available to get
assertTaskListIsEmpty(t, client, createdQueue)
assertGetTaskFails(t, grpcCodes.FailedPrecondition, client, createdTask.GetName())

// Finally, verify that the `Run` moved it early - it doesn't still fire at the scheduled time
_, err = awaitHttpRequestWithTimeout(receivedRequests, 3*time.Second)
assert.Error(t, err, "Should not receive any further HTTP requests for this task")
}

func TestRunTaskRunsAndRetriesFailingTask(t *testing.T) {
serv, client := setUp(t, ServerOptions{})
defer tearDown(t, serv)

createdQueue := createTestQueue(t, client)
defer tearDownQueue(t, client, createdQueue)

srv, receivedRequests := startTestServer()
defer srv.Shutdown(context.Background())

// Schedule it far enough ahead it can't fire on the schedule during this test
inOneMinute, _ := ptypes.TimestampProto(time.Now().Add(1 * time.Minute))
createTaskRequest := taskspb.CreateTaskRequest{
Parent: createdQueue.GetName(),
Task: &taskspb.Task{
Name: createdQueue.GetName() + "/tasks/error-task",
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
Url: "http://localhost:5000/not_found",
},
},
ScheduleTime: inOneMinute,
},
}
createdTask, err := client.CreateTask(context.Background(), &createTaskRequest)
require.NoError(t, err)

// Task was created OK, verify it hasn't been sent yet
_, err = awaitHttpRequestWithTimeout(receivedRequests, 100*time.Millisecond)
assert.Error(t, err, "Deferred task does not send HTTP request immediately")

// Manually run the task
runTaskRequest := taskspb.RunTaskRequest{
Name: createdTask.GetName(),
}
_, err = client.RunTask(context.Background(), &runTaskRequest)
require.NoError(t, err)

// Verify the task runs when called
receivedRequest, err := awaitHttpRequest(receivedRequests)
require.NoError(t, err)
log.Println("Got 1 request")
assertHeadersMatch(
t,
map[string]string{
"X-CloudTasks-TaskExecutionCount": "0",
"X-CloudTasks-TaskRetryCount": "0",
},
receivedRequest,
)

// Verify the task is retried immediately
receivedRequest, err = awaitHttpRequest(receivedRequests)
require.NoError(t, err)
log.Println("Got 2 requests")
assertHeadersMatch(
t,
map[string]string{
"X-CloudTasks-TaskExecutionCount": "1",
"X-CloudTasks-TaskRetryCount": "1",
},
receivedRequest,
)

// In an ideal world, we'd have a test server endpoint that could fail for a bit then succeed
// to verify the state once the retrying has stopped...
}

func newQueue(formattedParent, name string) *taskspb.Queue {
return &taskspb.Queue{Name: formatQueueName(formattedParent, name)}
}
Expand Down

0 comments on commit 0453b91

Please sign in to comment.