Skip to content

Commit

Permalink
Move streaming to GA (#262)
Browse files Browse the repository at this point in the history
Co-authored-by: Paul Paterson <[email protected]>
Co-authored-by: Lucas Pedroza <[email protected]>
  • Loading branch information
3 people authored May 20, 2024
1 parent a54f4a9 commit b534d62
Show file tree
Hide file tree
Showing 29 changed files with 2,406 additions and 1,188 deletions.
146 changes: 135 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Query options](#query-options)
- [Query statistics](#query-statistics)
- [Pagination](#pagination)
- [Event Streaming (beta)](#event-streaming-beta)
- [Client configuration](#client-configuration)
- [Environment variables](#environment-variables)
- [Retry](#retry)
Expand All @@ -29,6 +28,11 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Query timeout](#query-timeout)
- [Client timeout](#client-timeout)
- [HTTP/2 session idle timeout](#http2-session-idle-timeout)
- [Event Streaming](#event-streaming)
- [Start a stream](#start-a-stream)
- [Iterate on a stream](#iterate-on-a-stream)
- [Close a stream](#close-a-stream)
- [Stream options](#stream-options)
- [Contributing](#contributing)
- [Set up the repo](#set-up-the-repo)
- [Run tests](#run-tests)
Expand Down Expand Up @@ -312,16 +316,6 @@ for await (const products of pages) {
client.close();
```


## Event Streaming (beta)

[Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming) is
currently available in the beta version of the driver:

- [Beta JavaScript driver](https://www.npmjs.com/package/fauna/v/1.4.0-beta.0)
- [Beta JavaScript driver docs](https://github.com/fauna/fauna-js/tree/beta)


## Client configuration

The driver's `Client` instance comes with reasonable defaults that should be
Expand Down Expand Up @@ -443,6 +437,136 @@ const client = new Client({ http2_session_idle_ms: 6000 });
> **Warning**
> Setting `http2_session_idle_ms` to small values can lead to a race condition where requests cannot be transmitted before the session is closed, yielding `ERR_HTTP2_GOAWAY_SESSION` errors.
## Event Streaming

The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).

### Start a stream

To get a stream token, append
[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream)
or
[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson)
to a set from a [supported
source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To start and subscribe to the stream, pass the stream token to `Client.stream()`:

```javascript
const response = await client.query(fql`
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
`);
const { initialPage, streamToken } = response.data;

client.stream(streamToken)
```

You can also pass a query that produces a stream token directly to `Client.stream()`:

```javascript
const query = fql`Product.all().changesOn(.price, .quantity)`

client.stream(query)
```

### Iterate on a stream

You can iterate on the stream using an async loop:

```javascript
try {
for await (const event of stream) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
}
} catch (error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
```

Or you can use a callback function:

```javascript
stream.start(
function onEvent(event) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
},
function onFatalError(error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
);
```

### Close a stream

Use `<stream>.close()` to close a stream:

```javascript
const stream = await client.stream(fql`Product.all().toStream()`)

let count = 0;
for await (const event of stream) {
console.log("Stream event:", event);
// ...
count++;

// Close the stream after 2 events
if (count === 2) {
stream.close()
break;
}
}
```

### Stream options

The [client configuration](#client-configuration) sets default options for the
`Client.stream()` method.

You can pass an `options` object to override these defaults:

```javascript
const options = {
long_type: "number",
max_attempts: 5,
max_backoff: 1000,
secret: "YOUR_FAUNA_SECRET",
status_events: true,
};

client.stream(fql`Product.all().toStream()`, options)
```

For supported properties, see [Stream
options](https://docs.fauna.com/fauna/current/drivers/js-client#stream-options)
in the Fauna docs.


## Contributing

Expand Down
71 changes: 71 additions & 0 deletions __tests__/functional/stream-client-configuration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
StreamClient,
StreamToken,
getDefaultHTTPClient,
StreamClientConfiguration,
} from "../../src";
import { getDefaultHTTPClientOptions } from "../client";

const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions());
const defaultConfig: StreamClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
httpStreamClient: defaultHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

describe("StreamClientConfiguration", () => {
it("can be instantiated directly with a token", () => {
new StreamClient(dummyStreamToken, defaultConfig);
});

it("can be instantiated directly with a lambda", async () => {
new StreamClient(() => Promise.resolve(dummyStreamToken), defaultConfig);
});

it.each`
fieldName
${"long_type"}
${"httpStreamClient"}
${"max_backoff"}
${"max_attempts"}
${"secret"}
`(
"throws a TypeError if $fieldName provided is undefined",
async ({ fieldName }: { fieldName: keyof StreamClientConfiguration }) => {
expect.assertions(1);

const config = { ...defaultConfig };
delete config[fieldName];
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(TypeError);
}
},
);

it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_backoff: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_attempts: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});
});
16 changes: 14 additions & 2 deletions __tests__/integration/doc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,23 @@ describe("querying for doc types", () => {
expect(result.data.module).toBeInstanceOf(Module);
expect(result.data.document).toBeInstanceOf(Document);
expect(result.data.document.documentReference).toBeInstanceOf(
DocumentReference
DocumentReference,
);
expect(result.data.document.namedDocumentReference).toBeInstanceOf(
NamedDocumentReference
NamedDocumentReference,
);
expect(result.data.namedDocument).toBeInstanceOf(NamedDocument);
});

it("can set and read ttl", async () => {
const queryBuilder = fql`${testDoc}`;
const result = await client.query<Document>(queryBuilder);

expect(result.data.ttl).toBeUndefined();

const queryBuilderUpdate = fql`${testDoc}.update({ ttl: Time.now().add(1, "day") })`;
const resultUpdate = await client.query<Document>(queryBuilderUpdate);

expect(resultUpdate.data.ttl).toBeInstanceOf(TimeStub);
});
});
Loading

0 comments on commit b534d62

Please sign in to comment.