Skip to content

Commit

Permalink
feat: timeline events for async calls (#3278)
Browse files Browse the repository at this point in the history
Adds timeline events for async dequeues and errors.

Closes #3282

<img width="1037" alt="Screenshot 2024-10-31 at 3 05 07 PM"
src="https://github.com/user-attachments/assets/1291f8fc-16ee-40f0-bcad-2a443ef558db">
  • Loading branch information
safeer authored Oct 31, 2024
1 parent be8d02f commit 4083751
Show file tree
Hide file tree
Showing 23 changed files with 1,338 additions and 609 deletions.
38 changes: 38 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter
eventTypes = append(eventTypes, timeline.EventTypeIngress)
case pbconsole.EventType_EVENT_TYPE_CRON_SCHEDULED:
eventTypes = append(eventTypes, timeline.EventTypeCronScheduled)
case pbconsole.EventType_EVENT_TYPE_ASYNC_EXECUTE:
eventTypes = append(eventTypes, timeline.EventTypeAsyncExecute)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
Expand Down Expand Up @@ -791,6 +793,42 @@ func eventDALToProto(event timeline.Event) *pbconsole.Event {
},
}

case *timeline.AsyncExecuteEvent:
var requestKey *string
if rstr, ok := event.RequestKey.Get(); ok {
requestKey = &rstr
}

var asyncEventType pbconsole.AsyncExecuteEventType
switch event.EventType {
case timeline.AsyncExecuteEventTypeUnkown:
asyncEventType = pbconsole.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_UNKNOWN
case timeline.AsyncExecuteEventTypeCron:
asyncEventType = pbconsole.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_CRON
case timeline.AsyncExecuteEventTypeFSM:
asyncEventType = pbconsole.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_FSM
case timeline.AsyncExecuteEventTypePubSub:
asyncEventType = pbconsole.AsyncExecuteEventType_ASYNC_EXECUTE_EVENT_TYPE_PUBSUB
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_AsyncExecute{
AsyncExecute: &pbconsole.AsyncExecuteEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
AsyncEventType: asyncEventType,
VerbRef: &schemapb.Ref{
Module: event.Verb.Module,
Name: event.Verb.Name,
},
Duration: durationpb.New(event.Duration),
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
Expand Down
37 changes: 37 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,39 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
logger := log.FromContext(ctx)
logger.Tracef("Acquiring async call")

now := time.Now().UTC()
sstate := s.schemaState.Load()

enqueueTimelineEvent := func(call *dal.AsyncCall, err optional.Option[error]) {
module := call.Verb.Module
route, ok := sstate.routes[module]
if ok {
eventType := timeline.AsyncExecuteEventTypeUnkown
switch call.Origin.(type) {
case async.AsyncOriginCron:
eventType = timeline.AsyncExecuteEventTypeCron
case async.AsyncOriginPubSub:
eventType = timeline.AsyncExecuteEventTypePubSub
case *async.AsyncOriginPubSub:
eventType = timeline.AsyncExecuteEventTypePubSub
default:
break
}
errStr := optional.None[string]()
if e, ok := err.Get(); ok {
errStr = optional.Some(e.Error())
}
s.timeline.EnqueueEvent(ctx, &timeline.AsyncExecute{
DeploymentKey: route.Deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Verb: *call.Verb.ToRef(),
Time: now,
Error: errStr,
})
}
}

call, leaseCtx, err := s.dal.AcquireAsyncCall(ctx)
if errors.Is(err, libdal.ErrNotFound) {
logger.Tracef("No async calls to execute")
Expand All @@ -1190,6 +1223,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
observability.AsyncCalls.AcquireFailed(ctx, err)
} else {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
enqueueTimelineEvent(call, optional.Some(err))
}
return 0, err
}
Expand All @@ -1201,6 +1235,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
ctx, err = observability.ExtractTraceContextToContext(ctx, call.TraceContext)
if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
enqueueTimelineEvent(call, optional.Some(err))
return 0, fmt.Errorf("failed to extract trace context: %w", err)
}

Expand Down Expand Up @@ -1270,6 +1305,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
})
if err != nil {
observability.AsyncCalls.Completed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, queueDepth, err)
enqueueTimelineEvent(call, optional.Some(err))
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
if !didScheduleAnotherCall {
Expand All @@ -1278,6 +1314,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
queueDepth = call.QueueDepth - 1
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, false, queueDepth, nil)
enqueueTimelineEvent(call, optional.None[error]())
return 0, nil
}

Expand Down
3 changes: 2 additions & 1 deletion backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, leaseCtx c

lease, leaseCtx := d.leaser.NewLease(ctx, row.LeaseKey, row.LeaseIdempotencyKey, ttl)
return &AsyncCall{
ID: row.AsyncCallID,
ID: row.AsyncCallID,

Verb: row.Verb,
Origin: origin,
CatchVerb: row.CatchVerb,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- migrate:up

ALTER TYPE event_type ADD VALUE IF NOT EXISTS 'async_execute';

-- migrate:down
88 changes: 88 additions & 0 deletions backend/controller/timeline/events_async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package timeline

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/alecthomas/types/optional"

ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
)

type AsyncExecuteEvent struct {
ID int64
Duration time.Duration
AsyncExecute
}

func (e *AsyncExecuteEvent) GetID() int64 { return e.ID }
func (e *AsyncExecuteEvent) event() {}

type AsyncExecuteEventType string

const (
AsyncExecuteEventTypeUnkown AsyncExecuteEventType = "unknown"
AsyncExecuteEventTypeCron AsyncExecuteEventType = "cron"
AsyncExecuteEventTypeFSM AsyncExecuteEventType = "fsm"
AsyncExecuteEventTypePubSub AsyncExecuteEventType = "pubsub"
)

type AsyncExecute struct {
DeploymentKey model.DeploymentKey
RequestKey optional.Option[string]
EventType AsyncExecuteEventType
Verb schema.Ref
Time time.Time
Error optional.Option[string]
}

func (e *AsyncExecute) toEvent() (Event, error) { //nolint:unparam
return &AsyncExecuteEvent{
AsyncExecute: *e,
Duration: time.Since(e.Time),
}, nil
}

type eventAsyncExecuteJSON struct {
DurationMS int64 `json:"duration_ms"`
EventType AsyncExecuteEventType `json:"event_type"`
Error optional.Option[string] `json:"error,omitempty"`
}

func (s *Service) insertAsyncExecuteEvent(ctx context.Context, querier sql.Querier, event *AsyncExecuteEvent) error {
asyncJSON := eventAsyncExecuteJSON{
DurationMS: event.Duration.Milliseconds(),
EventType: event.EventType,
Error: event.Error,
}

data, err := json.Marshal(asyncJSON)
if err != nil {
return fmt.Errorf("failed to marshal async execute event: %w", err)
}

var payload ftlencryption.EncryptedTimelineColumn
err = s.encryption.EncryptJSON(json.RawMessage(data), &payload)
if err != nil {
return fmt.Errorf("failed to encrypt cron JSON: %w", err)
}

err = libdal.TranslatePGError(querier.InsertTimelineAsyncExecuteEvent(ctx, sql.InsertTimelineAsyncExecuteEventParams{
DeploymentKey: event.DeploymentKey,
RequestKey: event.RequestKey,
TimeStamp: event.Time,
Module: event.Verb.Module,
Verb: event.Verb.Name,
Payload: payload,
}))
if err != nil {
return fmt.Errorf("failed to insert async execute event: %w", err)
}
return err
}
1 change: 1 addition & 0 deletions backend/controller/timeline/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/timeline/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions backend/controller/timeline/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,29 @@ VALUES (
sqlc.arg('payload')
);

-- name: InsertTimelineAsyncExecuteEvent :exec
INSERT INTO timeline (
deployment_id,
request_id,
time_stamp,
type,
custom_key_1,
custom_key_2,
payload
)
VALUES (
(SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
(CASE
WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL
ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT)
END),
sqlc.arg('time_stamp')::TIMESTAMPTZ,
'async_execute',
sqlc.arg('module')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('payload')
);

-- name: DeleteOldTimelineEvents :one
WITH deleted AS (
DELETE FROM timeline
Expand Down
45 changes: 45 additions & 0 deletions backend/controller/timeline/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 27 additions & 3 deletions backend/controller/timeline/internal/timeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestTimeline(t *testing.T) {
Path: "dir/filename",
}}, nil)
assert.NoError(t, err)
time.Sleep(200 * time.Millisecond)
})

t.Run("SetDeploymentReplicas", func(t *testing.T) {
Expand Down Expand Up @@ -162,6 +163,29 @@ func TestTimeline(t *testing.T) {
time.Sleep(200 * time.Millisecond)
})

asyncEvent := &timeline2.AsyncExecuteEvent{
AsyncExecute: timeline2.AsyncExecute{
DeploymentKey: deploymentKey,
RequestKey: optional.Some(requestKey.String()),
EventType: timeline2.AsyncExecuteEventTypeCron,
Verb: schema.Ref{Module: "time", Name: "time"},
Time: time.Now().Round(time.Millisecond),
Error: optional.None[string](),
},
}

t.Run("InsertAsyncExecuteEvent", func(t *testing.T) {
timeline.EnqueueEvent(ctx, &timeline2.AsyncExecute{
DeploymentKey: asyncEvent.DeploymentKey,
RequestKey: asyncEvent.RequestKey,
EventType: asyncEvent.EventType,
Verb: asyncEvent.Verb,
Time: asyncEvent.Time,
Error: asyncEvent.Error,
})
time.Sleep(200 * time.Millisecond)
})

expectedDeploymentUpdatedEvent := &timeline2.DeploymentUpdatedEvent{
DeploymentKey: deploymentKey,
MinReplicas: 1,
Expand All @@ -177,13 +201,13 @@ func TestTimeline(t *testing.T) {
t.Run("NoFilters", func(t *testing.T) {
events, err := timeline.QueryTimeline(ctx, 1000)
assert.NoError(t, err)
assertEventsEqual(t, []timeline2.Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent}, events)
assertEventsEqual(t, []timeline2.Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent, asyncEvent}, events)
})

t.Run("ByDeployment", func(t *testing.T) {
events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterDeployments(deploymentKey))
assert.NoError(t, err)
assertEventsEqual(t, []timeline2.Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent}, events)
assertEventsEqual(t, []timeline2.Event{expectedDeploymentUpdatedEvent, callEvent, logEvent, ingressEvent, cronEvent, asyncEvent}, events)
})

t.Run("ByCall", func(t *testing.T) {
Expand Down Expand Up @@ -213,7 +237,7 @@ func TestTimeline(t *testing.T) {
t.Run("ByRequests", func(t *testing.T) {
events, err := timeline.QueryTimeline(ctx, 1000, timeline2.FilterRequests(requestKey))
assert.NoError(t, err)
assertEventsEqual(t, []timeline2.Event{callEvent, logEvent, ingressEvent}, events)
assertEventsEqual(t, []timeline2.Event{callEvent, logEvent, ingressEvent, asyncEvent}, events)
})
})
}
Expand Down
Loading

0 comments on commit 4083751

Please sign in to comment.