Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RunTask does not properly remove / update the executed task #51

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions emulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

. "cloud.google.com/go/cloudtasks/apiv2"
. "github.com/aertje/cloud-tasks-emulator"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -602,6 +603,125 @@ func TestOIDCAuthenticatedTaskExecution(t *testing.T) {
assert.Equal(t, "http://localhost:8980", claims.Issuer, "Specifies issuer")
}

func TestRunTaskRunsAndDeletesSuccessfulTask(t *testing.T) {
serv, client := setUp(t, ServerOptions{})
defer tearDown(t, serv)

createdQueue := createTestQueue(t, client)
defer tearDownQueue(t, client, createdQueue)

srv, receivedRequests := startTestServer()
defer srv.Shutdown(context.Background())

inTwoSeconds, _ := ptypes.TimestampProto(time.Now().Add(2 * time.Second))
createTaskRequest := taskspb.CreateTaskRequest{
Parent: createdQueue.GetName(),
Task: &taskspb.Task{
Name: createdQueue.GetName() + "/tasks/deferred-task",
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
Url: "http://localhost:5000/success",
},
},
ScheduleTime: inTwoSeconds,
},
}
createdTask, err := client.CreateTask(context.Background(), &createTaskRequest)
require.NoError(t, err)

// Task was created OK, verify it hasn't been sent yet
_, err = awaitHttpRequestWithTimeout(receivedRequests, 100*time.Millisecond)
assert.Error(t, err, "Deferred task does not send HTTP request immediately")

// Manually run the task
runTaskRequest := taskspb.RunTaskRequest{
Name: createdTask.GetName(),
}
_, err = client.RunTask(context.Background(), &runTaskRequest)
require.NoError(t, err)

// Verify the task runs essentially immediately - needs to be short enough timeout that it can't
// fire even if the task runs at the scheduled time
_, err = awaitHttpRequestWithTimeout(receivedRequests, 300*time.Millisecond)
require.NoError(t, err)

// As soon as the task runs, it should have been removed from the task list and not available to get
assertTaskListIsEmpty(t, client, createdQueue)
assertGetTaskFails(t, grpcCodes.FailedPrecondition, client, createdTask.GetName())

// Finally, verify that the `Run` moved it early - it doesn't still fire at the scheduled time
_, err = awaitHttpRequestWithTimeout(receivedRequests, 3*time.Second)
assert.Error(t, err, "Should not receive any further HTTP requests for this task")
}

func TestRunTaskRunsAndRetriesFailingTask(t *testing.T) {
serv, client := setUp(t, ServerOptions{})
defer tearDown(t, serv)

createdQueue := createTestQueue(t, client)
defer tearDownQueue(t, client, createdQueue)

srv, receivedRequests := startTestServer()
defer srv.Shutdown(context.Background())

// Schedule it far enough ahead it can't fire on the schedule during this test
inOneMinute, _ := ptypes.TimestampProto(time.Now().Add(1 * time.Minute))
createTaskRequest := taskspb.CreateTaskRequest{
Parent: createdQueue.GetName(),
Task: &taskspb.Task{
Name: createdQueue.GetName() + "/tasks/error-task",
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
Url: "http://localhost:5000/not_found",
},
},
ScheduleTime: inOneMinute,
},
}
createdTask, err := client.CreateTask(context.Background(), &createTaskRequest)
require.NoError(t, err)

// Task was created OK, verify it hasn't been sent yet
_, err = awaitHttpRequestWithTimeout(receivedRequests, 100*time.Millisecond)
assert.Error(t, err, "Deferred task does not send HTTP request immediately")

// Manually run the task
runTaskRequest := taskspb.RunTaskRequest{
Name: createdTask.GetName(),
}
_, err = client.RunTask(context.Background(), &runTaskRequest)
require.NoError(t, err)

// Verify the task runs when called
receivedRequest, err := awaitHttpRequest(receivedRequests)
require.NoError(t, err)
log.Println("Got 1 request")
assertHeadersMatch(
t,
map[string]string{
"X-CloudTasks-TaskExecutionCount": "0",
"X-CloudTasks-TaskRetryCount": "0",
},
receivedRequest,
)

// Verify the task is retried immediately
receivedRequest, err = awaitHttpRequest(receivedRequests)
require.NoError(t, err)
log.Println("Got 2 requests")
assertHeadersMatch(
t,
map[string]string{
"X-CloudTasks-TaskExecutionCount": "1",
"X-CloudTasks-TaskRetryCount": "1",
},
receivedRequest,
)

// In an ideal world, we'd have a test server endpoint that could fail for a bit then succeed
// to verify the state once the retrying has stopped...
}

func newQueue(formattedParent, name string) *taskspb.Queue {
return &taskspb.Queue{Name: formatQueueName(formattedParent, name)}
}
Expand Down
53 changes: 35 additions & 18 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Task struct {

cancel chan bool

isDone bool

onDone func(*Task)

stateMutex sync.Mutex
Expand All @@ -71,6 +73,7 @@ func NewTask(queue *Queue, taskState *tasks.Task, onDone func(task *Task)) *Task
task := &Task{
queue: queue,
state: taskState,
isDone: false,
onDone: onDone,
cancel: make(chan bool, 1), // Buffered in case cancel comes when task is not scheduled
}
Expand Down Expand Up @@ -267,26 +270,31 @@ func updateStateAfterDispatch(task *Task, statusCode int) *tasks.Task {
return frozenTaskState
}

func (task *Task) reschedule(retry bool, statusCode int) {
func (task *Task) markDone() {
task.stateMutex.Lock()
defer task.stateMutex.Unlock()
task.isDone = true
task.onDone(task)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be included inside the lock?

}

func (task *Task) reschedule(statusCode int) {
if statusCode >= 200 && statusCode <= 299 {
log.Println("Task done")
task.onDone(task)
task.markDone()
} else {
log.Println("Task exec error with status " + strconv.Itoa(statusCode))
if retry {
retryConfig := task.queue.state.GetRetryConfig()
retryConfig := task.queue.state.GetRetryConfig()

if task.state.DispatchCount >= retryConfig.GetMaxAttempts() {
log.Println("Ran out of attempts")
} else {
updateStateForReschedule(task)
task.Schedule()
}
if task.state.DispatchCount >= retryConfig.GetMaxAttempts() {
log.Println("Ran out of attempts")
} else {
updateStateForReschedule(task)
task.Schedule()
}
}
}

func dispatch(retry bool, taskState *tasks.Task) int {
func dispatch(taskState *tasks.Task) int {
client := &http.Client{}
client.Timeout, _ = ptypes.Duration(taskState.GetDispatchDeadline())

Expand Down Expand Up @@ -360,26 +368,31 @@ func dispatch(retry bool, taskState *tasks.Task) int {
return resp.StatusCode
}

func (task *Task) doDispatch(retry bool) {
respCode := dispatch(retry, task.state)
func (task *Task) doDispatch() {
respCode := dispatch(task.state)

updateStateAfterDispatch(task, respCode)
task.reschedule(retry, respCode)
task.reschedule(respCode)
}

// Attempt tries to execute a task
func (task *Task) Attempt() {
updateStateForDispatch(task)

task.doDispatch(true)
task.doDispatch()
}

// Run runs the task outside of the normal queueing mechanism.
// This method is called directly by request.
func (task *Task) Run() *tasks.Task {
// Update the schedule time so that retries are calculated correctly
task.stateMutex.Lock()
task.state.ScheduleTime = ptypes.TimestampNow()
task.stateMutex.Unlock()

taskState := updateStateForDispatch(task)

go task.doDispatch(false)
go task.doDispatch()

return taskState
}
Expand All @@ -402,10 +415,14 @@ func (task *Task) Schedule() {
go func() {
select {
case <-time.After(fromNow):
task.queue.fire <- task
// It's possible the task might already be `done` if e.g. it was called by RunTask before the originally
// scheduled execution time. In that case this is a no-op, otherwise, run the task.
if !task.isDone {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite get how this check avoids that problem - what if runtask is called at exactly this point?

task.queue.fire <- task
}
return
case <-task.cancel:
task.onDone(task)
task.markDone()
return
}
}()
Expand Down