From d6624d52dfa25d8361773a9de522f5cdbece7d17 Mon Sep 17 00:00:00 2001 From: Erick Pintor Date: Wed, 24 Apr 2024 09:43:37 -0300 Subject: [PATCH] introduce syntax sugar for simple streams --- client.go | 19 +++++++++++ client_example_test.go | 72 ++++++++++++++++++++++++++++++++++++++++-- stream_test.go | 20 ++++++++++++ 3 files changed, 109 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 9040d31..39078bb 100644 --- a/client.go +++ b/client.go @@ -301,6 +301,25 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator { } } +// Stream initiates a stream subscription for the [fauna.Query]. +// +// This is a syntax sugar for [fauna.Client.Query] and [fauna.Client.Subscribe]. +// +// Note that the query provided MUST return [fauna.Stream] value. Otherwise, +// this method returns an error. +func (c *Client) Stream(fql *Query, opts ...QueryOptFn) (*Events, error) { + res, err := c.Query(fql, opts...) + if err != nil { + return nil, err + } + + if stream, ok := res.Data.(Stream); ok { + return c.Subscribe(stream) + } + + return nil, fmt.Errorf("expected query to return a fauna.Stream but got %T", res.Data) +} + // Subscribe initiates a stream subscription for the given stream value. func (c *Client) Subscribe(stream Stream, opts ...StreamOptFn) (*Events, error) { req := streamRequest{ diff --git a/client_example_test.go b/client_example_test.go index b8d4f67..855dd4c 100644 --- a/client_example_test.go +++ b/client_example_test.go @@ -318,6 +318,74 @@ func ExampleClient_Paginate() { // Output: 20 } +func ExampleClient_Stream() { + // IMPORTANT: just for the purpose of example, don't actually hardcode secret + _ = os.Setenv(fauna.EnvFaunaSecret, "secret") + _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + + client, err := fauna.NewDefaultClient() + if err != nil { + log.Fatalf("client should have been initialized: %s", err) + } + + // setup a collection + setupQuery, _ := fauna.FQL(` + if (!Collection.byName('StreamingSandbox').exists()) { + Collection.create({ name: 'StreamingSandbox' }) + } else { + StreamingSandbox.all().forEach(.delete()) + } + `, nil) + if _, err := client.Query(setupQuery); err != nil { + log.Fatalf("failed to setup the collection: %s", err) + } + + // create a stream + streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil) + events, err := client.Stream(streamQuery) + if err != nil { + log.Fatalf("failed to subscribe to the stream value: %s", err) + } + defer events.Close() + + // produce some events while the subscription is open + createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil) + updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil) + deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil) + + queries := []*fauna.Query{createQuery, updateQuery, deleteQuery} + for _, query := range queries { + if _, err := client.Query(query); err != nil { + log.Fatalf("failed execute CRUD query: %s", err) + } + } + + // fetch the produced events + type Data struct { + Foo string `fauna:"foo"` + } + + expect := 3 + for expect > 0 { + event, err := events.Next() + if err != nil { + log.Fatalf("failed to receive next event: %s", err) + } + switch event.Type { + case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: + var data Data + if err := event.Unmarshal(&data); err != nil { + log.Fatalf("failed to unmarshal event data: %s", err) + } + fmt.Printf("Event: %s Data: %+v\n", event.Type, data) + expect-- + } + } + // Output: Event: add Data: {Foo:bar} + // Event: update Data: {Foo:baz} + // Event: remove Data: {Foo:baz} +} + func ExampleClient_Subscribe() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") @@ -331,9 +399,9 @@ func ExampleClient_Subscribe() { // setup a collection setupQuery, _ := fauna.FQL(` if (!Collection.byName('StreamingSandbox').exists()) { - Collection.create({ name: 'StreamingSandbox' }) + Collection.create({ name: 'StreamingSandbox' }) } else { - StreamingSandbox.all().forEach(.delete()) + StreamingSandbox.all().forEach(.delete()) } `, nil) if _, err := client.Query(setupQuery); err != nil { diff --git a/stream_test.go b/stream_test.go index 0da230c..b43d970 100644 --- a/stream_test.go +++ b/stream_test.go @@ -26,6 +26,26 @@ func TestStreaming(t *testing.T) { Foo string `fauna:"foo"` } + t.Run("single-step streaming", func(t *testing.T) { + t.Run("Stream events", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil) + events, err := client.Stream(streamQ) + require.NoError(t, err) + defer events.Close() + + event, err := events.Next() + require.NoError(t, err) + require.Equal(t, event.Type, fauna.StatusEvent) + }) + + t.Run("Fails on non-streamable values", func(t *testing.T) { + streamQ, _ := fauna.FQL(`"I'm a string"`, nil) + events, err := client.Stream(streamQ) + require.ErrorContains(t, err, "expected query to return a fauna.Stream but got string") + require.Nil(t, events) + }) + }) + t.Run("multi-step streaming", func(t *testing.T) { t.Run("Stream events", func(t *testing.T) { streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil)