Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release/0.3.2 #3

Merged
merged 12 commits into from
Dec 15, 2023
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ic-websocket-js",
"version": "0.3.1",
"version": "0.3.2",
"description": "IC WebSocket on the Internet Computer",
"license": "MIT",
"repository": {
Expand Down
303 changes: 234 additions & 69 deletions src/ic-websocket.test.ts

Large diffs are not rendered by default.

64 changes: 55 additions & 9 deletions src/ic-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { IDL } from "@dfinity/candid";
import { Principal } from "@dfinity/principal";
import {
CanisterAckMessageContent,
CanisterCloseMessageContent,
CanisterWsMessageArguments,
ClientKeepAliveMessageContent,
ClientKey,
Expand Down Expand Up @@ -36,10 +37,15 @@ import {
import { WsAgent } from "./agent";

/**
* The default expiration time for receiving an ack message from the canister after sending a message.
* It's **3/2 times** the canister's default send ack period.
* The default interval (in milliseconds) at which the canister sends an ack message.
*/
const DEFAULT_ACK_MESSAGE_TIMEOUT_MS = 450_000;
const DEFAULT_ACK_MESSAGE_INTERVAL_MS = 300_000;
/**
* The maximum communication latency allowed between the client and the canister (same as in the canister).
*
* Used to determine the ack message timeout.
*/
export const COMMUNICATION_LATENCY_BOUND_MS = 30_000;

/**
* Interface to create a new IcWebSocketConfig. For a simple configuration, use {@link createWsConfig}.
Expand All @@ -62,12 +68,12 @@ export interface IcWebSocketConfig<S extends _WS_CANISTER_SERVICE> {
*/
networkUrl: string;
/**
* The expiration (in milliseconds) time for receiving an ack message from the canister after sending a message.
* If the ack message is not received within this time, the connection will be closed.
* This parameter should always me **3/2 times or more** the canister's send ack period.
* @default 450_000 (7.5 minutes = 3/2 default send ack period on the canister)
* The interval (in milliseconds) at which the canister sends an ack message.
* This parameter must be **equal** to the canister's send ack interval.
*
* @default 300_000 (default send ack period on the canister)
*/
ackMessageTimeout?: number;
ackMessageIntervalMs?: number;
/**
* The maximum age of the certificate received from the canister, in minutes. You won't likely need to set this parameter. Used in tests.
*
Expand Down Expand Up @@ -104,6 +110,7 @@ export default class IcWebSocket<
private _clientKey: ClientKey;
private _gatewayPrincipal: Principal | null = null;
private _maxCertificateAgeInMinutes = 5;
private _openTimeout: NodeJS.Timeout | null = null;

onclose: ((this: IcWebSocket<S, ApplicationMessageType>, ev: CloseEvent) => any) | null = null;
onerror: ((this: IcWebSocket<S, ApplicationMessageType>, ev: ErrorEvent) => any) | null = null;
Expand Down Expand Up @@ -174,7 +181,7 @@ export default class IcWebSocket<
});

this._ackMessagesQueue = new AckMessagesQueue({
expirationMs: config.ackMessageTimeout || DEFAULT_ACK_MESSAGE_TIMEOUT_MS,
expirationMs: (config.ackMessageIntervalMs || DEFAULT_ACK_MESSAGE_INTERVAL_MS) + COMMUNICATION_LATENCY_BOUND_MS,
timeoutExpiredCallback: this._onAckMessageTimeout.bind(this),
});

Expand Down Expand Up @@ -226,6 +233,27 @@ export default class IcWebSocket<
this._incomingMessagesQueue.addAndProcess(event.data);
}

private _startOpenTimeout() {
// the timeout is double the maximum allowed network latency,
// because opening the connection involves a message sent by the client and one by the canister
this._openTimeout = setTimeout(() => {
if (!this._isConnectionEstablished) {
logger.error("[onWsOpen] Error: Open timeout expired before receiving the open message");
this._callOnErrorCallback(new Error("Open timeout expired before receiving the open message"));
this._wsInstance.close(4000, "Open connection timeout");
}

this._openTimeout = null;
}, 2 * COMMUNICATION_LATENCY_BOUND_MS);
}

private _cancelOpenTimeout() {
if (this._openTimeout) {
clearTimeout(this._openTimeout);
this._openTimeout = null;
}
}

private async _handleHandshakeMessage(handshakeMessage: GatewayHandshakeMessage): Promise<boolean> {
// at this point, we're sure that the gateway_principal is valid
// because the isGatewayHandshakeMessage function checks it
Expand All @@ -234,6 +262,8 @@ export default class IcWebSocket<

try {
await this._sendOpenMessage();

this._startOpenTimeout();
} catch (error) {
logger.error("[onWsMessage] Handshake message error:", error);
// if a handshake message fails, we can't continue
Expand Down Expand Up @@ -335,12 +365,17 @@ export default class IcWebSocket<
}

this._isConnectionEstablished = true;
this._cancelOpenTimeout();

this._callOnOpenCallback();

this._outgoingMessagesQueue.enableAndProcess();
} else if ("AckMessage" in serviceMessage) {
await this._handleAckMessageFromCanister(serviceMessage.AckMessage);
} else if ("CloseMessage" in serviceMessage) {
await this._handleCloseMessageFromCanister(serviceMessage.CloseMessage);
// we don't have to process any further message (there shouldn't be any anyway)
return false;
} else {
throw new Error("Invalid service message from canister");
}
Expand Down Expand Up @@ -369,6 +404,17 @@ export default class IcWebSocket<
await this._sendKeepAliveMessage();
}

private async _handleCloseMessageFromCanister(content: CanisterCloseMessageContent): Promise<void> {
if ("ClosedByApplication" in content.reason) {
logger.debug("[onWsMessage] Received close message from canister. Reason: ClosedByApplication");
this._wsInstance.close(4001, "ClosedByApplication");
} else {
logger.error("[onWsMessage] Received close message from canister. Reason:", content.reason);
this._callOnErrorCallback(new Error(`Received close message from canister. Reason: ${content.reason}`));
this._wsInstance.close(4000, "Received close message from canister");
}
}

private async _sendKeepAliveMessage(): Promise<void> {
const keepAliveMessageContent: ClientKeepAliveMessageContent = {
last_incoming_sequence_num: this._incomingSequenceNum - BigInt(1),
Expand Down
24 changes: 24 additions & 0 deletions src/idl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,26 @@ export type CanisterAckMessageContent = {
export type ClientKeepAliveMessageContent = {
'last_incoming_sequence_num': bigint,
};
export type CloseMessageReason = {
WrongSequenceNumber: null,
} | {
InvalidServiceMessage: null,
} | {
KeepAliveTimeout: null,
} | {
ClosedByApplication: null
};
export type CanisterCloseMessageContent = {
reason: CloseMessageReason,
};
export type WebsocketServiceMessageContent = {
OpenMessage: CanisterOpenMessageContent,
} | {
AckMessage: CanisterAckMessageContent,
} | {
KeepAliveMessage: ClientKeepAliveMessageContent,
} | {
CloseMessage: CanisterCloseMessageContent,
};

const CanisterOpenMessageContentIdl = IDL.Record({
Expand All @@ -95,10 +109,20 @@ const CanisterAckMessageContentIdl = IDL.Record({
const ClientKeepAliveMessageContentIdl = IDL.Record({
'last_incoming_sequence_num': IDL.Nat64,
});
const CloseMessageReasonIdl = IDL.Variant({
'WrongSequenceNumber': IDL.Null,
'InvalidServiceMessage': IDL.Null,
'KeepAliveTimeout': IDL.Null,
'ClosedByApplication': IDL.Null,
});
const CanisterCloseMessageContentIdl = IDL.Record({
'reason': CloseMessageReasonIdl,
})
const WebsocketServiceMessageContentIdl = IDL.Variant({
'OpenMessage': CanisterOpenMessageContentIdl,
'AckMessage': CanisterAckMessageContentIdl,
'KeepAliveMessage': ClientKeepAliveMessageContentIdl,
'CloseMessage': CanisterCloseMessageContentIdl,
});

export const decodeWebsocketServiceMessageContent = (bytes: Uint8Array): WebsocketServiceMessageContent => {
Expand Down
12 changes: 6 additions & 6 deletions src/queues.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ describe("BaseQueue", () => {

beforeEach(() => {
queue = new BaseQueue({
itemCallback: (message: string) => true,
itemCallback: (_: string) => true,
});
});

Expand Down Expand Up @@ -121,6 +121,7 @@ describe("AckMessagesQueue", () => {
const expirationMs = 1000;

beforeEach(() => {
jest.useFakeTimers();
queue = new AckMessagesQueue({
expirationMs,
timeoutExpiredCallback: jest.fn(),
Expand All @@ -140,7 +141,7 @@ describe("AckMessagesQueue", () => {
});

it("should call the timeoutExpiredCallback for expired items when not receiving any ack", () => {
jest.useFakeTimers().setSystemTime(Date.now() + expirationMs + 1);
jest.setSystemTime(Date.now() + expirationMs + 1);
queue.add(BigInt(1));
jest.advanceTimersByTime(expirationMs + 1);
expect(queue.last()).toBeNull();
Expand Down Expand Up @@ -170,19 +171,18 @@ describe("AckMessagesQueue", () => {

it("should call the timeoutExpiredCallback for expired items when receiving the ack", () => {
queue.add(BigInt(1));
jest.useFakeTimers().setSystemTime(Date.now() + expirationMs + 1);
queue.add(BigInt(2));
queue.add(BigInt(3));
jest.setSystemTime(Date.now() + expirationMs + 1);
queue.ack(BigInt(1));
jest.advanceTimersByTime(expirationMs + 1);
expect(queue.last()).toBeNull();
expect(queue["_timeoutExpiredCallback"]).toHaveBeenCalledWith([BigInt(2)]);
expect(queue["_timeoutExpiredCallback"]).toHaveBeenCalledWith([BigInt(2), BigInt(3)]);
});

it("should call the timeoutExpiredCallback for all expired items after not receiving the ack", () => {
queue.add(BigInt(1));
queue.add(BigInt(2));
queue.add(BigInt(3));
jest.useFakeTimers();
queue.ack(BigInt(1));
jest.advanceTimersByTime(expirationMs);
expect(queue.last()).toBeNull();
Expand Down
11 changes: 4 additions & 7 deletions src/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,10 @@ export class AckMessagesQueue {
}

// for the remaining items in the queue, check if they have expired
// if yes, call the callback for the first expired item
for (const item of this._queue) {
if (Date.now() - item.addedAt >= this._expirationMs) {
// if it has expired and is still in the queue,
// it means it has not been acked, so we call the callback
return this._onTimeoutExpired([item]);
}
// if yes, call the callback for the expired items
const expiredItems = this._queue.filter((item) => Date.now() - item.addedAt >= this._expirationMs);
if (expiredItems.length > 0) {
return this._onTimeoutExpired(expiredItems);
}

this._restartLastAckTimeout();
Expand Down
6 changes: 2 additions & 4 deletions src/test/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ import { Principal } from "@dfinity/principal";

export const canisterId = Principal.fromText("bnz7o-iuaaa-aaaaa-qaaaa-cai");

// Principal: "pmisz-prtlk-b6oe6-bj4fl-6l5fy-h7c2h-so6i7-jiz2h-bgto7-piqfr-7ae"
// const client1Seed = "rabbit fun moral twin food kangaroo egg among adjust pottery measure seek";
export const client1Key: ClientKey = {
client_principal: Principal.fromText("pmisz-prtlk-b6oe6-bj4fl-6l5fy-h7c2h-so6i7-jiz2h-bgto7-piqfr-7ae"),
client_nonce: BigInt("5768810803147064100"),
client_principal: Principal.fromText("kj67s-b5v2y-ahlkr-kmume-xbow6-zwbtj-j4j3m-ae46e-qqrcu-uxiby-yae"),
client_nonce: BigInt("385892949151814926"),
};
5 changes: 4 additions & 1 deletion src/test/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { fromHex } from "@dfinity/agent";
import { Principal } from "@dfinity/principal";

export const GATEWAY_PRINCIPAL = Principal.fromText("i3gux-m3hwt-5mh2w-t7wwm-fwx5j-6z6ht-hxguo-t4rfw-qp24z-g5ivt-2qe");
export const GATEWAY_PRINCIPAL = Principal.fromText("sqdfl-mr4km-2hfjy-gajqo-xqvh7-hf4mf-nra4i-3it6l-neaw4-soolw-tae");

export const LOCAL_REPLICA_ROOT_KEY = fromHex("d9d9f7a66e69635f6170695f76657273696f6e66302e31382e3068726f6f745f6b65795885308182301d060d2b0601040182dc7c0503010201060c2b0601040182dc7c050302010361008005229d89a17c6f9ec403a4b1a8aa103fc48055046c95f1e60ee2fbfb0bb23ab21617a93f48b99b1199ac89008cf3cf0a83e9da35f5cf27d0d51535ceff89c43ee236c31c3a7865cc6b333194ad3f7155b2931a7ffec2066777dffb20f277ca6c696d706c5f76657273696f6e65302e382e3069696d706c5f68617368784064613931633732316637386462393433346561336630303437383939383836346439313731346538626561363862333963633736326662306263383937313662757265706c6963615f6865616c74685f737461747573676865616c746879706365727469666965645f68656967687418d4");
Loading