Skip to content

Commit

Permalink
Add some integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ptpaterson committed Mar 20, 2024
1 parent d1d63ef commit 49a49a8
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 16 deletions.
4 changes: 2 additions & 2 deletions __tests__/integration/set.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ describe("SetIterator", () => {
beforeAll(async () => {
await client.query(fql`
if (Collection.byName("IterTestSmall") != null) {
IterTestSmall.definition.delete()
Collection.byName("IterTestSmall")!.delete()
}
if (Collection.byName("IterTestBig") != null) {
IterTestBig.definition.delete()
Collection.byName("IterTestBig")!.delete()
}
`);
await client.query(fql`
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"build:node": "esbuild src/index.ts --bundle --sourcemap --platform=node --outfile=dist/node/index.js",
"build:types": "tsc -emitDeclarationOnly --declaration true",
"lint": "eslint -f unix \"src/**/*.{ts,tsx}\"",
"fauna-local": "docker start faunadb-local || docker run --rm -d --name faunadb-local -p 8443:8443 -p 8084:8084 fauna/faunadb",
"fauna-local": "docker start faunadb-local || docker run --rm -d --name faunadb-local -p 8443:8443 -p 8084:8084 --mount type=bind,source=\"$(pwd)\"/docker/feature-flags.json,target=/etc/feature-flag-periodic.d/feature-flags.json fauna/faunadb",
"fauna-local-alt-port": "docker start faunadb-local-alt-port || docker run --rm -d --name faunadb-local-alt-port -p 7443:8443 -p 7084:8084 fauna/faunadb",
"prepare": "husky install",
"test": "yarn fauna-local; yarn fauna-local-alt-port; ./prepare-test-env.sh; jest",
Expand Down
60 changes: 52 additions & 8 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,10 @@ export class Client {
* ```
*/
// TODO: implement options
stream(
stream<T extends QueryValue>(
tokenOrQuery: StreamToken | Query,
options?: Partial<StreamClientConfiguration>
): StreamClient {
): StreamClient<T> {
if (this.#isClosed) {
throw new ClientClosedError(
"Your client is closed. No further requests can be issued."
Expand Down Expand Up @@ -642,6 +642,8 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\
"query_timeout_ms",
"fetch_keepalive",
"http2_max_streams",
"max_backoff",
"max_attempts",
];
required_options.forEach((option) => {
if (config[option] === undefined) {
Expand All @@ -664,13 +666,21 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\
if (config.query_timeout_ms <= 0) {
throw new RangeError(`'query_timeout_ms' must be greater than zero.`);
}

if (config.max_backoff <= 0) {
throw new RangeError(`'max_backoff' must be greater than zero.`);
}

if (config.max_attempts <= 0) {
throw new RangeError(`'max_attempts' must be greater than zero.`);
}
}
}

/**
* A class to listen to Fauna streams.
*/
export class StreamClient {
export class StreamClient<T extends QueryValue = any> {
/** Whether or not this stream has been closed */
closed = false;
/** The stream client options */
Expand Down Expand Up @@ -709,6 +719,8 @@ export class StreamClient {
}

this.#clientConfiguration = clientConfiguration;

this.#validateConfiguration();
}

/**
Expand All @@ -719,8 +731,8 @@ export class StreamClient {
* provided, error will not be handled, and the stream will simply end.
*/
start(
onEvent: (event: StreamEventData | StreamEventStatus) => void,
onError: (error: Error) => void
onEvent: (event: StreamEventData<T> | StreamEventStatus) => void,
onError?: (error: Error) => void
) {
if (typeof onEvent !== "function") {
throw new TypeError(
Expand All @@ -747,7 +759,7 @@ export class StreamClient {
}

async *[Symbol.asyncIterator](): AsyncGenerator<
StreamEventData | StreamEventStatus
StreamEventData<T> | StreamEventStatus
> {
if (this.closed) {
throw new ClientError("The stream has been closed and cannot be reused.");
Expand Down Expand Up @@ -807,7 +819,7 @@ export class StreamClient {

async *#startStream(
start_ts?: number
): AsyncGenerator<StreamEventData | StreamEventStatus> {
): AsyncGenerator<StreamEventData<T> | StreamEventStatus> {
// Safety: This method must only be called after a stream token has been acquired
const streamToken = this.#streamToken as StreamToken;

Expand All @@ -825,7 +837,7 @@ export class StreamClient {

for await (const event of streamAdapter.read) {
// stream events are always tagged
const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, {
const deserializedEvent: StreamEvent<T> = TaggedTypeFormat.decode(event, {
long_type: this.#clientConfiguration.long_type,
});

Expand All @@ -838,6 +850,11 @@ export class StreamClient {

this.#last_ts = deserializedEvent.txn_ts;

// TODO: remove this once all environments have updated the events to use "status" instead of "start"
if ((deserializedEvent.type as any) === "start") {
deserializedEvent.type = "status";
}

if (
!this.#clientConfiguration.status_events &&
deserializedEvent.type === "status"
Expand All @@ -848,6 +865,33 @@ export class StreamClient {
yield deserializedEvent;
}
}

#validateConfiguration() {
const config = this.#clientConfiguration;

const required_options: (keyof StreamClientConfiguration)[] = [
"long_type",
"httpStreamClient",
"max_backoff",
"max_attempts",
"secret",
];
required_options.forEach((option) => {
if (config[option] === undefined) {
throw new TypeError(
`ClientConfiguration option '${option}' must be defined.`
);
}
});

if (config.max_backoff <= 0) {
throw new RangeError(`'max_backoff' must be greater than zero.`);
}

if (config.max_attempts <= 0) {
throw new RangeError(`'max_attempts' must be greater than zero.`);
}
}
}

// Private types and constants for internal logic.
Expand Down
4 changes: 3 additions & 1 deletion src/http-client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ export * from "./fetch-client";
export * from "./http-client";
export * from "./node-http2-client";

export const getDefaultHTTPClient = (options: HTTPClientOptions): HTTPClient =>
export const getDefaultHTTPClient = (
options: HTTPClientOptions
): HTTPClient & HTTPStreamClient =>
nodeHttp2IsSupported()
? NodeHTTP2Client.getClient(options)
: new FetchClient(options);
Expand Down
8 changes: 4 additions & 4 deletions src/wire-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,14 @@ export type StreamEventStatus = {
txn_ts: number;
stats: QueryStats;
};
export type StreamEventData = {
export type StreamEventData<T extends QueryValue> = {
type: "add" | "remove" | "update";
txn_ts: number;
stats: QueryStats;
data: QueryValue;
data: T;
};
export type StreamEventError = { type: "error" } & QueryFailure;
export type StreamEvent =
export type StreamEvent<T extends QueryValue> =
| StreamEventStatus
| StreamEventData
| StreamEventData<T>
| StreamEventError;

0 comments on commit 49a49a8

Please sign in to comment.