Skip to content

Commit

Permalink
feat: timeline service keeps events in memory (#3618)
Browse files Browse the repository at this point in the history
closes #3626
Very simple implementation to be begin with.
Allows:
- creating events
- getting events with super simple filters
- clearing out old events
  • Loading branch information
matt2e authored Dec 4, 2024
1 parent d84c2af commit 6707213
Show file tree
Hide file tree
Showing 10 changed files with 851 additions and 77 deletions.
431 changes: 367 additions & 64 deletions backend/protos/xyz/block/ftl/timeline/v1/timeline.pb.go

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions backend/protos/xyz/block/ftl/timeline/v1/timeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ message GetTimelineResponse {
repeated timeline.v1.Event events = 1;
}

message CreateEventRequest {
oneof entry {
LogEvent log = 1;
CallEvent call = 2;
DeploymentCreatedEvent deployment_created = 3;
DeploymentUpdatedEvent deployment_updated = 4;
IngressEvent ingress = 5;
CronScheduledEvent cron_scheduled = 6;
AsyncExecuteEvent async_execute = 7;
PubSubPublishEvent pubsub_publish = 8;
PubSubConsumeEvent pubsub_consume = 9;
}
}

message CreateEventResponse {}

message DeleteOldEventsRequest {
timeline.v1.EventType event_type = 1;
int64 age_seconds = 2;
Expand All @@ -39,6 +55,8 @@ service TimelineService {
option idempotency_level = NO_SIDE_EFFECTS;
}

rpc CreateEvent(CreateEventRequest) returns (CreateEventResponse) {}

// Delete old events of a specific type
rpc DeleteOldEvents(DeleteOldEventsRequest) returns (DeleteOldEventsResponse) {}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions backend/timeline/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package timeline

import (
"fmt"
"slices"

timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
)

func filter(event *timelinepb.Event, depKey string, eventTypes []timelinepb.EventType) bool {
if !slices.Contains(eventTypes, eventType(event)) {
return false
}
if depKey != "" && depKey != deploymentKey(event) {
return false
}
return true
}

func eventType(event *timelinepb.Event) timelinepb.EventType {
switch event.Entry.(type) {
case *timelinepb.Event_Log:
return timelinepb.EventType_EVENT_TYPE_LOG
case *timelinepb.Event_Call:
return timelinepb.EventType_EVENT_TYPE_CALL
case *timelinepb.Event_DeploymentCreated:
return timelinepb.EventType_EVENT_TYPE_DEPLOYMENT_CREATED
case *timelinepb.Event_DeploymentUpdated:
return timelinepb.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED
case *timelinepb.Event_Ingress:
return timelinepb.EventType_EVENT_TYPE_INGRESS
case *timelinepb.Event_CronScheduled:
return timelinepb.EventType_EVENT_TYPE_CRON_SCHEDULED
case *timelinepb.Event_AsyncExecute:
return timelinepb.EventType_EVENT_TYPE_ASYNC_EXECUTE
case *timelinepb.Event_PubsubPublish:
return timelinepb.EventType_EVENT_TYPE_PUBSUB_PUBLISH
case *timelinepb.Event_PubsubConsume:
return timelinepb.EventType_EVENT_TYPE_PUBSUB_CONSUME
default:
panic(fmt.Sprintf("unexpected event type: %T", event.Entry))
}
}

func deploymentKey(event *timelinepb.Event) string {
switch entry := event.Entry.(type) {
case *timelinepb.Event_Log:
return entry.Log.DeploymentKey
case *timelinepb.Event_Call:
return entry.Call.DeploymentKey
case *timelinepb.Event_DeploymentCreated:
return entry.DeploymentCreated.Key
case *timelinepb.Event_DeploymentUpdated:
return entry.DeploymentUpdated.Key
case *timelinepb.Event_Ingress:
return entry.Ingress.DeploymentKey
case *timelinepb.Event_CronScheduled:
return entry.CronScheduled.DeploymentKey
case *timelinepb.Event_AsyncExecute:
return entry.AsyncExecute.DeploymentKey
case *timelinepb.Event_PubsubPublish:
return entry.PubsubPublish.DeploymentKey
case *timelinepb.Event_PubsubConsume:
return entry.PubsubConsume.DeploymentKey
default:
panic(fmt.Sprintf("unexpected event type: %T", event.Entry))
}
}
93 changes: 89 additions & 4 deletions backend/timeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"fmt"
"net/url"
"sync"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/kong"
"google.golang.org/protobuf/types/known/timestamppb"

timelinepb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1"
timelineconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
Expand All @@ -27,7 +30,9 @@ func (c *Config) SetDefaults() {
}

type service struct {
// TODO: add timeline schema view? (whatever is needed per the current timeline tables)
lock sync.RWMutex
nextID int
events []*timelinepb.Event
}

func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource) error {
Expand All @@ -47,13 +52,93 @@ func Start(ctx context.Context, config Config, schemaEventSource schemaeventsour
}

func (s *service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingRequest]) (*connect.Response[ftlv1.PingResponse], error) {
panic("not implemented")
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (s *service) CreateEvent(ctx context.Context, req *connect.Request[timelinepb.CreateEventRequest]) (*connect.Response[timelinepb.CreateEventResponse], error) {
s.lock.Lock()
defer s.lock.Unlock()

event := &timelinepb.Event{
Id: int64(s.nextID),
TimeStamp: timestamppb.Now(),
}
switch entry := req.Msg.Entry.(type) {
case *timelinepb.CreateEventRequest_Log:
event.Entry = &timelinepb.Event_Log{
Log: entry.Log,
}
case *timelinepb.CreateEventRequest_Call:
event.Entry = &timelinepb.Event_Call{
Call: entry.Call,
}
case *timelinepb.CreateEventRequest_DeploymentCreated:
event.Entry = &timelinepb.Event_DeploymentCreated{
DeploymentCreated: entry.DeploymentCreated,
}
case *timelinepb.CreateEventRequest_DeploymentUpdated:
event.Entry = &timelinepb.Event_DeploymentUpdated{
DeploymentUpdated: entry.DeploymentUpdated,
}
case *timelinepb.CreateEventRequest_Ingress:
event.Entry = &timelinepb.Event_Ingress{
Ingress: entry.Ingress,
}
case *timelinepb.CreateEventRequest_CronScheduled:
event.Entry = &timelinepb.Event_CronScheduled{
CronScheduled: entry.CronScheduled,
}
case *timelinepb.CreateEventRequest_AsyncExecute:
event.Entry = &timelinepb.Event_AsyncExecute{
AsyncExecute: entry.AsyncExecute,
}
case *timelinepb.CreateEventRequest_PubsubPublish:
event.Entry = &timelinepb.Event_PubsubPublish{
PubsubPublish: entry.PubsubPublish,
}
case *timelinepb.CreateEventRequest_PubsubConsume:
event.Entry = &timelinepb.Event_PubsubConsume{
PubsubConsume: entry.PubsubConsume,
}
}
s.events = append(s.events, event)
s.nextID++
return connect.NewResponse(&timelinepb.CreateEventResponse{}), nil
}

func (s *service) GetTimeline(ctx context.Context, req *connect.Request[timelinepb.GetTimelineRequest]) (*connect.Response[timelinepb.GetTimelineResponse], error) {
panic("not implemented")
s.lock.RLock()
defer s.lock.RUnlock()

results := []*timelinepb.Event{}
// TODO: handle sinceId
for i := len(s.events) - 1; i >= 0; i-- {
event := s.events[i]
if !filter(event, req.Msg.DeploymentKey, req.Msg.EventTypes) {
continue
}
results = append(results, s.events[i])
if req.Msg.Limit != nil && *req.Msg.Limit != 0 && len(results) >= int(*req.Msg.Limit) {
break
}
}
return connect.NewResponse(&timelinepb.GetTimelineResponse{
Events: results,
}), nil
}

func (s *service) DeleteOldEvents(ctx context.Context, req *connect.Request[timelinepb.DeleteOldEventsRequest]) (*connect.Response[timelinepb.DeleteOldEventsResponse], error) {
panic("not implemented")
s.lock.Lock()
defer s.lock.Unlock()

cutoff := time.Now().Add(-1 * time.Duration(req.Msg.AgeSeconds) * time.Second)
filtered := []*timelinepb.Event{}
for _, event := range s.events {
if event.TimeStamp.AsTime().Before(cutoff) && (req.Msg.EventType == timelinepb.EventType_EVENT_TYPE_UNSPECIFIED || req.Msg.EventType != eventType(event)) {
continue
}
filtered = append(filtered, event)
}
s.events = filtered
return connect.NewResponse(&timelinepb.DeleteOldEventsResponse{}), nil
}
Loading

0 comments on commit 6707213

Please sign in to comment.