Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into moe/py-schema-extraction
Browse files Browse the repository at this point in the history
mistermoe authored Nov 6, 2024
2 parents 5e4f268 + fdaa8bb commit a7a69af
Showing 45 changed files with 2,274 additions and 971 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/smoketest.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
on:
pull_request:
push:
branches:
- main
@@ -40,6 +41,8 @@ jobs:
set -euo pipefail
echo "Deploying the tagged release to the cluster"
cd deployment && just deploy-version ${{ env.LATEST_VERSION }} && cd ..
- name: Build Language Plugins
run: just build-language-plugins
- name: Smoke test the tagged release images
run: |
set -euo pipefail
@@ -52,6 +55,8 @@ jobs:
fetch-depth: 1
- name: Init Hermit
uses: cashapp/activate-hermit@v1
- name: Build Language Plugins
run: just build-language-plugins
- name: Smoke test HEAD with a full deploy to test upgrade path
run: |
set -euo pipefail
57 changes: 57 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
@@ -597,6 +597,10 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter
eventTypes = append(eventTypes, timeline.EventTypeCronScheduled)
case pbconsole.EventType_EVENT_TYPE_ASYNC_EXECUTE:
eventTypes = append(eventTypes, timeline.EventTypeAsyncExecute)
case pbconsole.EventType_EVENT_TYPE_PUBSUB_PUBLISH:
eventTypes = append(eventTypes, timeline.EventTypePubSubPublish)
case pbconsole.EventType_EVENT_TYPE_PUBSUB_CONSUME:
eventTypes = append(eventTypes, timeline.EventTypePubSubConsume)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
@@ -827,6 +831,59 @@ func eventDALToProto(event timeline.Event) *pbconsole.Event {
},
}

case *timeline.PubSubPublishEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
requestKey = &r
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_PubsubPublish{
PubsubPublish: &pbconsole.PubSubPublishEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
VerbRef: event.SourceVerb.ToProto().(*schemapb.Ref), //nolint:forcetypeassert
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Topic: event.Topic,
Request: string(event.Request),
Error: event.Error.Ptr(),
},
},
}

case *timeline.PubSubConsumeEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
requestKey = &r
}

var destVerbModule string
var destVerbName string
if destVerb, ok := event.DestVerb.Get(); ok {
destVerbModule = destVerb.Module
destVerbName = destVerb.Name
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_PubsubConsume{
PubsubConsume: &pbconsole.PubSubConsumeEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
DestVerbModule: &destVerbModule,
DestVerbName: &destVerbName,
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Topic: event.Topic,
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
34 changes: 31 additions & 3 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
@@ -293,13 +293,12 @@ func New(
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc))
svc.pubSub = pubSub

svc.registry = artefacts.New(conn)

timelineSvc := timeline.New(ctx, conn, encryption)
svc.timeline = timelineSvc
pubSub := pubsub.New(ctx, conn, encryption, optional.Some[pubsub.AsyncCallListener](svc), timelineSvc)
svc.pubSub = pubSub
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, timelineSvc, conn)
svc.cronJobs = cronSvc
svc.dal = dal.New(ctx, conn, encryption, pubSub, cronSvc)
@@ -859,7 +858,36 @@ func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallReque

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftlv1.PublishEventRequest]) (*connect.Response[ftlv1.PublishEventResponse], error) {
// Publish the event.
now := time.Now().UTC()
pubishError := optional.None[string]()
err := s.pubSub.PublishEventForTopic(ctx, req.Msg.Topic.Module, req.Msg.Topic.Name, req.Msg.Caller, req.Msg.Body)
if err != nil {
pubishError = optional.Some(err.Error())
}

requestKey := optional.None[string]()
if rk, err := rpc.RequestKeyFromContext(ctx); err == nil {
if rk, ok := rk.Get(); ok {
requestKey = optional.Some(rk.String())
}
}

// Add to timeline.
sstate := s.schemaState.Load()
module := req.Msg.Topic.Module
route, ok := sstate.routes[module]
if ok {
s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
Time: now,
SourceVerb: schema.Ref{Name: req.Msg.Caller, Module: req.Msg.Topic.Module},
Topic: req.Msg.Topic.Name,
Request: req.Msg,
Error: pubishError,
})
}

if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("failed to publish a event to topic %s:%s: %w", req.Msg.Topic.Module, req.Msg.Topic.Name, err))
}
2 changes: 1 addition & 1 deletion backend/controller/cronjobs/internal/cronjobs_test.go
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ func TestNewCronJobsForModule(t *testing.T) {
timelineSrv := timeline.New(ctx, conn, encryption)
cjs := cronjobs.NewForTesting(ctx, key, "test.com", encryption, timelineSrv, *dal, clk)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
parentDAL := parentdal.New(ctx, conn, encryption, pubSub, cjs)
moduleName := "initial"
jobsToCreate := newCronJobs(t, moduleName, "* * * * * *", clk, 2) // every minute
5 changes: 4 additions & 1 deletion backend/controller/dal/async_calls_test.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
@@ -22,7 +23,9 @@ func TestNoCallToAcquire(t *testing.T) {
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

timelineSvc := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSvc)
dal := New(ctx, conn, encryption, pubSub, nil)

_, _, err = dal.AcquireAsyncCall(ctx)
8 changes: 4 additions & 4 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
@@ -33,8 +33,8 @@ func TestDAL(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())
timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)
key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
@@ -110,7 +110,7 @@ func TestDAL(t *testing.T) {
})

runnerID := model.NewRunnerKey("localhost", "8080")
labels := map[string]any{"languages": []any{"go"}}
labels := map[string]any{}

t.Run("RegisterRunner", func(t *testing.T) {
err = dal.UpsertRunner(ctx, dalmodel.Runner{
@@ -195,9 +195,9 @@ func TestCreateArtefactConflict(t *testing.T) {
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener]())

timelineSrv := timeline.New(ctx, conn, encryption)
pubSub := pubsub.New(ctx, conn, encryption, optional.None[pubsub.AsyncCallListener](), timelineSrv)

key := model.NewControllerKey("localhost", "8081")
cjs := cronjobs.New(ctx, key, "test.com", encryption, timelineSrv, conn)
dal := New(ctx, conn, encryption, pubSub, cjs)
23 changes: 17 additions & 6 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 32 additions & 6 deletions backend/controller/pubsub/internal/dal/dal.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/observability"
dalsql "github.com/TBD54566975/ftl/backend/controller/pubsub/internal/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
@@ -97,7 +98,7 @@ func (d *DAL) GetSubscriptionsNeedingUpdate(ctx context.Context) ([]model.Subscr
}), nil
}

func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration) (count int, err error) {
func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay time.Duration, timelineSvc *timeline.Service) (count int, err error) {
tx, err := d.Begin(ctx)
if err != nil {
return 0, fmt.Errorf("failed to begin transaction: %w", err)
@@ -115,36 +116,58 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

successful := 0
for _, subscription := range subs {
now := time.Now().UTC()
enqueueTimelineEvent := func(destVerb optional.Option[schema.RefKey], err optional.Option[string]) {
timelineSvc.EnqueueEvent(ctx, &timeline.PubSubConsume{
DeploymentKey: subscription.DeploymentKey,
RequestKey: subscription.RequestKey,
Time: now,
DestVerb: destVerb,
Topic: subscription.Topic.Payload.Name,
Error: err,
})
}

nextCursor, err := tx.db.GetNextEventForSubscription(ctx, sqltypes.Duration(eventConsumptionDelay), subscription.Topic, subscription.Cursor)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("failed to get next cursor: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("failed to get next cursor: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(err.Error()))
return 0, err
}
payload, ok := nextCursor.Payload.Get()
if !ok {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Payload.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("could not find payload to progress subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("could not find payload to progress subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(err.Error()))
return 0, err
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("could not find event to progress subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("could not find event to progress subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(err.Error()))
return 0, err
}
if !nextCursor.Ready {
logger.Tracef("Skipping subscription %s because event is too new", subscription.Key)
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(fmt.Sprintf("Skipping subscription %s because event is too new", subscription.Key)))
continue
}

subscriber, err := tx.db.GetRandomSubscriber(ctx, subscription.Key)
if err != nil {
logger.Tracef("no subscriber for subscription %s", subscription.Key)
enqueueTimelineEvent(optional.None[schema.RefKey](), optional.Some(fmt.Sprintf("no subscriber for subscription %s", subscription.Key)))
continue
}

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to progress subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("failed to progress subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.Some(subscriber.Sink), optional.Some(err.Error()))
return 0, err
}

origin := async.AsyncOriginPubSub{
@@ -169,10 +192,13 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
observability.AsyncCalls.Created(ctx, subscriber.Sink, subscriber.CatchVerb, origin.String(), int64(subscriber.RetryAttempts), err)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", libdal.TranslatePGError(err))
err = fmt.Errorf("failed to schedule async task for subscription: %w", libdal.TranslatePGError(err))
enqueueTimelineEvent(optional.Some(subscriber.Sink), optional.Some(err.Error()))
return 0, err
}

observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), subscriber.Sink)
enqueueTimelineEvent(optional.Some(subscriber.Sink), optional.None[string]())
successful++
}

Loading

0 comments on commit a7a69af

Please sign in to comment.