Skip to content

Commit

Permalink
unify error handling; drop internal coroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Apr 24, 2024
1 parent 84d64e8 commit 34b4d4b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 115 deletions.
10 changes: 2 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,13 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator {
}

// Subscribe initiates a stream subscription for the given stream value.
func (c *Client) Subscribe(stream Stream) (*Subscription, error) {
func (c *Client) Subscribe(stream Stream) (*Events, error) {
streamReq := streamRequest{
apiRequest: apiRequest{c.ctx, c.headers},
Stream: stream,
}

if byteStream, err := streamReq.do(c); err == nil {
sub := &Subscription{
events: make(chan *Event),
byteStream: byteStream,
}
go sub.consume()
return sub, nil
return newEvents(byteStream), nil
} else {
return nil, err
}
Expand Down
14 changes: 6 additions & 8 deletions client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,11 @@ func ExampleClient_Subscribe() {
}

// initiate the stream subscription
subscription, err := client.Subscribe(stream)
events, err := client.Subscribe(stream)
if err != nil {
log.Fatalf("failed to subscribe to the stream value: %s", err)
}
defer subscription.Close()
defer events.Close()

// produce some events while the subscription is open
createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil)
Expand All @@ -373,16 +373,14 @@ func ExampleClient_Subscribe() {
Foo string `fauna:"foo"`
}

events := subscription.Events()
expect := 3

for expect > 0 {
event := <-events
if event == nil {
break
event, err := events.Next()
if err != nil {
log.Fatalf("failed to receive next event: %s", err)
}
switch event.Type {
case "add", "update", "remove":
case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent:
var data Data
if err := event.Unmarshal(&data); err != nil {
log.Fatalf("failed to unmarshal event data: %s", err)
Expand Down
164 changes: 88 additions & 76 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,35 @@ import (
"io"
)

// EventType represents a Fauna's event type.
type EventType string

const (
// AddEvent happens when a new value is added to the stream's watched set.
AddEvent EventType = "add"
// UpdateEvent happens when a value in the stream's watched set changes.
UpdateEvent EventType = "update"
// Remove event happens when a value in the stream's watched set is removed.
RemoveEvent EventType = "remove"
// StatusEvent happens periodically and comunicates the stream's latest
// transacion time as well as ops aquired during its idle period.
StatusEvent EventType = "status"
)

// Event represents a streaming event.
//
// All events contain the [fauna.Event.Type] and [fauna.Event.Stats] fields.
//
// Events of type "add", "update", and "remove" will contain the
// [fauna.Event.Data] field with the event's data in it. Data events have their
// [fauna.Event.Error] field set to nil. Data events can be umarmshalled into a
// user-defined struct via the [fauna.Event.Unmarshal] method.
//
// Events of type "status" and "error" will have their [fauna.Event.Data] field
// set to nil. Error events contain the [fauna.Event.Error] field present with
// the underlying error information.
// Events of type [fauna.StatusEvent] have its [fauna.Event.Data] field set to
// nil. Other event's [fauna.Data] can be unmarshalled via the
// [fauna.Event.Unmarshal] method.
type Event struct {
// Type is this event's type.
Type string `json:"type"`

Type EventType
// TxnTime is the transaction time that produce this event.
TxnTime int64 `json:"txn_ts,omitempty"`

// Data is the event's data. Data is set to nil if the Type field is set to
// "status" or "error".
Data any `json:"data,omitempty"`

// Error contains error information when the event Type is set to "error".
Error *ErrEvent `json:"error,omitempty"`

TxnTime int64
// Data is the event's data.
Data any
// Stats contains the ops acquired to process the event.
Stats Stats `json:"stats"`
Stats Stats
}

// Unmarshal will unmarshal the raw [fauna.Event.Data] (if present) into the
Expand All @@ -53,8 +54,14 @@ type ErrEvent struct {
// Message is the error's message.
Message string `json:"message"`

// Abort is the error's abort data, present if Code == "abort".
Abort any `json:"abort"`
// Abort is the error's abort data, present if [fauna.ErrEvent.Code] is
// equals to "abort".
Abort any `json:"abort,ommitempty"`
}

// Error provides the underlying error message.
func (e *ErrEvent) Error() string {
return e.Message
}

// Unmarshal will unmarshal the raw [fauna.ErrEvent.Abort] (if present) into the
Expand All @@ -63,71 +70,76 @@ func (e *ErrEvent) Unmarshal(into any) error {
return decodeInto(e.Abort, into)
}

// Subscription is a Fauna stream subscription.
//
// Events can be obtained by reading from the [fauna.Subscription.Events]
// channel. Note that the events channel emits a nil event on closing.
// Events is an iterator of Fauna events.
//
// If the subscription gets closed unexpectedly, its closing error can be
// retrieved via the [fauna.Subscription.Error] method.
//
// A stream subscription can be gracefully closed via the
// [fauna.Subscription.Close] method.
type Subscription struct {
// The next available event can be obtained by calling the
// [fauna.Subscription.Next] method. Note this method blocks until the next
// event is available or until the events iterator is closed via the
// [fauna.Events.Close] method.
type Events struct {
byteStream io.ReadCloser
events chan *Event
error error
closed bool
decoder *json.Decoder
}

// Events return the subscription's events channel.
func (s *Subscription) Events() <-chan *Event { return s.events }

// Error returns the subscription's closing error, if any.
func (s *Subscription) Error() error { return s.error }
func newEvents(byteStream io.ReadCloser) *Events {
return &Events{
byteStream: byteStream,
decoder: json.NewDecoder(byteStream),
}
}

// Close gracefully closes the stream subscription.
func (s *Subscription) Close() (err error) {
if !s.closed {
s.closed = true
err = s.byteStream.Close()
}
return
func (es *Events) Close() (err error) {
// XXX: Is there a way to make sure there are no bytes left on the stream
// after closing it? According to go's docs, the underlying connection will
// remain unusable for the duration of its idle time if there are bytes left
// in its read buffer.
return es.byteStream.Close()
}

func (s *Subscription) consume() {
defer close(s.events)
decoder := json.NewDecoder(s.byteStream)

for {
event := &Event{}
if err := decoder.Decode(event); err != nil {
// NOTE: When closing the stream, a network error may occur as due
// to its socket closing while the json decoder is blocked reading
// it. Errors to close the socket are already emitted by the Close()
// method, therefore, we don't want to propagate them here again.
if !s.closed {
s.error = err
}
break
}
if err := convertEvent(event); err != nil {
s.error = err
break
type rawEvent = struct {
Type EventType `json:"type"`
TxnTime int64 `json:"txm_time"`
Data any `json:"data,ommitempty"`
Error *ErrEvent `json:"error,ommitempty"`
Stats Stats `json:"stats"`
}

// Next blocks until the next event is available.
//
// Note that network errors of type [fauna.ErrEvent] are considered fatal and
// close the underlying stream. Calling next after an error event occurs will
// return an error.
func (es *Events) Next() (event *Event, err error) {
raw := rawEvent{}
if err = es.decoder.Decode(&raw); err == nil {
event, err = convertRawEvent(&raw)
if _, ok := err.(*ErrEvent); ok {
es.Close() // no more events are comming
}
s.events <- event
}
return
}

func convertEvent(event *Event) (err error) {
if event.Data != nil {
if event.Data, err = convert(false, event.Data); err != nil {
return
func convertRawEvent(raw *rawEvent) (event *Event, err error) {
if raw.Error != nil {
if raw.Error.Abort != nil {
if raw.Error.Abort, err = convert(false, raw.Error.Abort); err != nil {
return
}
}
}
if event.Error != nil && event.Error.Abort != nil {
if event.Error.Abort, err = convert(false, event.Error.Abort); err != nil {
return
err = raw.Error
} else {
if raw.Data != nil {
if raw.Data, err = convert(false, raw.Data); err != nil {
return
}
}
event = &Event{
Type: raw.Type,
TxnTime: raw.TxnTime,
Data: raw.Data,
Stats: raw.Stats,
}
}
return
Expand Down
48 changes: 25 additions & 23 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,32 @@ func TestStreaming(t *testing.T) {
var stream fauna.Stream
require.NoError(t, res.Unmarshal(&stream))

sub, err := client.Subscribe(stream)
events, err := client.Subscribe(stream)
require.NoError(t, err)
defer sub.Close()
defer events.Close()

event := <-sub.Events()
require.NotNil(t, event)
require.Equal(t, event.Type, "status")
event, err := events.Next()
require.NoError(t, err)
require.Equal(t, event.Type, fauna.StatusEvent)

createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil)
_, err = client.Query(createQ)
require.NoError(t, err)

event = <-sub.Events()
require.NotNil(t, event)
require.Equal(t, event.Type, "add")
event, err = events.Next()
require.NoError(t, err)
require.Equal(t, event.Type, fauna.AddEvent)

var doc TestDoc
require.NoError(t, event.Unmarshal(&doc))
require.Equal(t, doc.Foo, "bar")

require.NoError(t, sub.Close())
require.NoError(t, sub.Error())
require.NoError(t, events.Close())
})

t.Run("Handle subscription errors", func(t *testing.T) {
_, err := client.Subscribe(fauna.Stream("abc1234=="))
events, err := client.Subscribe(fauna.Stream("abc1234=="))
require.IsType(t, err, &fauna.ErrInvalidRequest{})
require.Nil(t, events)
})

t.Run("Handle error events", func(t *testing.T) {
Expand All @@ -72,27 +71,30 @@ func TestStreaming(t *testing.T) {
var stream fauna.Stream
require.NoError(t, res.Unmarshal(&stream))

sub, err := client.Subscribe(stream)
events, err := client.Subscribe(stream)
require.NoError(t, err)
defer sub.Close()
defer events.Close()

event := <-sub.Events()
require.NotNil(t, event)
require.Equal(t, event.Type, "status")
event, err := events.Next()
require.NoError(t, err)
require.Equal(t, event.Type, fauna.StatusEvent)

createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil)
_, err = client.Query(createQ)
require.NoError(t, err)

event = <-sub.Events()
require.NotNil(t, event)
require.Equal(t, event.Type, "error")
require.Equal(t, event.Error.Code, "abort")
require.Equal(t, event.Error.Message, "Query aborted.")
event, err = events.Next()
require.IsType(t, err, &fauna.ErrEvent{})
require.Nil(t, event)

evErr := err.(*fauna.ErrEvent)
require.Equal(t, evErr.Code, "abort")
require.Equal(t, evErr.Message, "Query aborted.")

var msg string
require.NoError(t, event.Error.Unmarshal(&msg))
require.NoError(t, evErr.Unmarshal(&msg))
require.Equal(t, msg, "oops")
require.NoError(t, events.Close())
})
})
}

0 comments on commit 34b4d4b

Please sign in to comment.