Skip to content

Commit

Permalink
fix: correctly unmarshal feed events
Browse files Browse the repository at this point in the history
We weren't converting our tagged format feed events into the Event shape intended to be consumed. This change makes use of our already existing stream conversion logic to convert the events to the expected shape.
  • Loading branch information
fauna-chase committed Jan 9, 2025
1 parent f0e1686 commit a37ac22
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
27 changes: 26 additions & 1 deletion event_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,41 @@ type FeedPage struct {
Stats Stats `json:"stats"`
}

// internalFeedPage represents what comes back from the wire from the feed API. We do further processing on the
// events that come back to create the FeedPage returned from [fauna.EventFeed.Next]
type internalFeedPage struct {
Events []rawEvent `json:"events"`
Cursor string `json:"cursor"`
HasNext bool `json:"has_next"`
Stats Stats `json:"stats"`
}

// Next retrieves the next FeedPage from the [fauna.EventFeed]
func (ef *EventFeed) Next(page *FeedPage) error {
if err := ef.open(); err != nil {
return err
}

if err := ef.decoder.Decode(&page); err != nil {
var internalPage internalFeedPage
if err := ef.decoder.Decode(&internalPage); err != nil {
return err
}

parsedEvents := make([]Event, len(internalPage.Events))
for i, rv := range internalPage.Events {
var parsedEvent Event
err := convertRawEvent(&rv, &parsedEvent)
if err != nil {
return err
}
parsedEvents[i] = parsedEvent
}

page.Events = parsedEvents
page.HasNext = internalPage.HasNext
page.Stats = internalPage.Stats
page.Cursor = internalPage.Cursor

ef.lastCursor = page.Cursor

// preserve page size
Expand Down
48 changes: 48 additions & 0 deletions event_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,54 @@ func TestEventFeed(t *testing.T) {
require.True(t, didPaginate, "expected to have called for multiple event pages")
require.Equal(t, end-start, seenEvents, "unexpected number of events")
})

t.Run("can unmarshall events", func(t *testing.T) {
resetCollection(t, client)

createEvent := func(v string) {
createOneQuery, createOneQueryErr := fauna.FQL(`EventFeedTest.create({ foo: ${v} })`, map[string]any{"v": v})
require.NoError(t, createOneQueryErr, "failed to init query for create statement")
require.NotNil(t, createOneQuery, "create statement is nil")
_, createOneErr := client.Query(createOneQuery)
require.NoError(t, createOneErr, "failed to create a document")
}

query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil)
require.NoError(t, queryErr, "failed to create a query for EventSource")

feed, feedErr := client.FeedFromQuery(query)
require.NoError(t, feedErr, "failed to init events feed")

createEvent("bar")
createEvent("baz")
createEvent("bak")

type TestEvent struct {
Foo string `fauna:"foo"`
}
var (
page fauna.FeedPage
testEvent TestEvent
)
events := make([]TestEvent, 0, 3)

for {
eventsErr := feed.Next(&page)
require.NoError(t, eventsErr, "failed to get page of events")

for _, event := range page.Events {
err := event.Unmarshal(&testEvent)
require.NoError(t, err, "error unmarshalling event")
events = append(events, testEvent)
}

if !page.HasNext {
break
}
}

require.Equal(t, []TestEvent{TestEvent{Foo: "bar"}, TestEvent{Foo: "baz"}, TestEvent{Foo: "bak"}}, events)
})
}

func resetCollection(t *testing.T, client *fauna.Client) {
Expand Down

0 comments on commit a37ac22

Please sign in to comment.