Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release/0.3.2 #5

Merged
merged 18 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/ic-websocket-cdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ const LABEL_WEBSOCKET: &[u8] = b"websocket";
const DEFAULT_MAX_NUMBER_OF_RETURNED_MESSAGES: usize = 50;
/// The default interval at which to send acknowledgements to the client.
const DEFAULT_SEND_ACK_INTERVAL_MS: u64 = 300_000; // 5 minutes
/// The maximum latency allowed between the client and the canister.
const MAX_ALLOWED_CONNECTION_LATENCY_MS: u64 = 30_000; // 30 seconds
ilbertt marked this conversation as resolved.
Show resolved Hide resolved
/// The default timeout to wait for the client to send a keep alive after receiving an acknowledgement.
const DEFAULT_CLIENT_KEEP_ALIVE_TIMEOUT_MS: u64 = 60_000; // 1 minute
const CLIENT_KEEP_ALIVE_TIMEOUT_MS: u64 = 2 * MAX_ALLOWED_CONNECTION_LATENCY_MS;
/// Same as [CLIENT_KEEP_ALIVE_TIMEOUT_MS], but in nanoseconds.
const CLIENT_KEEP_ALIVE_TIMEOUT_NS: u64 = CLIENT_KEEP_ALIVE_TIMEOUT_MS * 1_000_000;

/// The initial nonce for outgoing messages.
const INITIAL_OUTGOING_MESSAGE_NONCE: u64 = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use super::utils::{
clients::{generate_random_client_key, CLIENT_1_KEY, GATEWAY_1},
messages::{get_next_polling_nonce_from_messages, verify_messages, AppMessage},
test_env::{
get_test_env, DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS,
DEFAULT_TEST_MAX_NUMBER_OF_RETURNED_MESSAGES, DEFAULT_TEST_SEND_ACK_INTERVAL_MS,
get_test_env, DEFAULT_TEST_MAX_NUMBER_OF_RETURNED_MESSAGES,
DEFAULT_TEST_SEND_ACK_INTERVAL_MS,
},
};

Expand Down Expand Up @@ -168,7 +168,6 @@ proptest! {
get_test_env().reset_canister(
1_000, // avoid the queue size limit
DEFAULT_TEST_SEND_ACK_INTERVAL_MS,
DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS,
);
// second, register client 1
let client_1_key = CLIENT_1_KEY.deref();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
types::CloseMessageReason,
CanisterOutputCertifiedMessages, CanisterWsCloseArguments, CanisterWsCloseResult,
CanisterWsGetMessagesArguments, CanisterWsMessageArguments, ClientKeepAliveMessageContent,
WebsocketServiceMessageContent,
WebsocketServiceMessageContent, CLIENT_KEEP_ALIVE_TIMEOUT_MS,
};

use super::utils::{
Expand All @@ -22,9 +22,7 @@ use super::utils::{
create_websocket_message, decode_websocket_service_message_content,
encode_websocket_service_message_content, get_websocket_message_from_canister_message,
},
test_env::{
get_test_env, DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS, DEFAULT_TEST_SEND_ACK_INTERVAL_MS,
},
test_env::{get_test_env, DEFAULT_TEST_SEND_ACK_INTERVAL_MS},
};

#[test]
Expand Down Expand Up @@ -72,7 +70,7 @@ fn test_2_client_is_removed_if_keep_alive_timeout_is_reached() {
helpers::check_ack_message_result(&msgs, client_1_key, 0, 2);

// advance the canister time to make sure the keep alive timeout expires
get_test_env().advance_canister_time_ms(DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS);
get_test_env().advance_canister_time_ms(CLIENT_KEEP_ALIVE_TIMEOUT_MS);

// check if the gateway put the close message in the queue
let msgs = call_ws_get_messages_with_panic(
Expand Down Expand Up @@ -137,7 +135,7 @@ fn test_3_client_is_not_removed_if_it_sends_a_keep_alive_before_timeout() {
},
);
// advance the canister time to make sure the keep alive timeout expires and the canister checks the keep alive
get_test_env().advance_canister_time_ms(DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS);
get_test_env().advance_canister_time_ms(CLIENT_KEEP_ALIVE_TIMEOUT_MS);
// send a message to the canister to see the sequence number increasing in the ack message
// and be sure that the client has not been removed
call_ws_message_with_panic(
Expand All @@ -147,9 +145,8 @@ fn test_3_client_is_not_removed_if_it_sends_a_keep_alive_before_timeout() {
},
);
// wait for the canister to send the next ack
get_test_env().advance_canister_time_ms(
DEFAULT_TEST_SEND_ACK_INTERVAL_MS - DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS,
);
get_test_env()
.advance_canister_time_ms(DEFAULT_TEST_SEND_ACK_INTERVAL_MS - CLIENT_KEEP_ALIVE_TIMEOUT_MS);
let res = call_ws_get_messages_with_panic(
GATEWAY_1.deref(),
CanisterWsGetMessagesArguments { nonce: 2 }, // skip the service open message and the fist ack message
Expand Down Expand Up @@ -183,11 +180,10 @@ fn test_4_client_is_not_removed_if_it_connects_while_canister_is_waiting_for_kee
);

// wait for the keep alive timeout to expire
get_test_env().advance_canister_time_ms(DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS);
get_test_env().advance_canister_time_ms(CLIENT_KEEP_ALIVE_TIMEOUT_MS);
// wait for the canister to send the next ack
get_test_env().advance_canister_time_ms(
DEFAULT_TEST_SEND_ACK_INTERVAL_MS - DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS,
);
get_test_env()
.advance_canister_time_ms(DEFAULT_TEST_SEND_ACK_INTERVAL_MS - CLIENT_KEEP_ALIVE_TIMEOUT_MS);

let res = call_ws_get_messages_with_panic(
GATEWAY_1.deref(),
Expand Down
19 changes: 3 additions & 16 deletions src/ic-websocket-cdk/src/tests/integration_tests/utils/test_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@ pub const DEFAULT_TEST_MAX_NUMBER_OF_RETURNED_MESSAGES: u64 = 50;
/// Value: `300_000` = 5 minutes
pub const DEFAULT_TEST_SEND_ACK_INTERVAL_MS: u64 = 300_000;

/// The interval between keep alive checks in the canister.
/// Set to a high value to make sure the canister doesn't reset the client while testing other functions.
///
/// Value: `120_000` (2 minutes)
pub const DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS: u64 = 120_000;

lazy_static! {
pub static ref TEST_ENV: Mutex<TestEnv> = Mutex::new(TestEnv::new());
}
Expand All @@ -41,8 +35,8 @@ pub struct TestEnv {
root_ic_key: Vec<u8>,
}

/// (`max_number_or_returned_messages`, `send_ack_interval_ms`, `send_ack_timeout_ms`)
type CanisterInitArgs = (u64, u64, u64);
/// (`max_number_or_returned_messages`, `send_ack_interval_ms`)
type CanisterInitArgs = (u64, u64);

impl TestEnv {
pub fn new() -> Self {
Expand All @@ -62,7 +56,6 @@ impl TestEnv {
let arguments: CanisterInitArgs = (
DEFAULT_TEST_MAX_NUMBER_OF_RETURNED_MESSAGES,
DEFAULT_TEST_SEND_ACK_INTERVAL_MS,
DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS,
);
pic.install_canister(
canister_id,
Expand All @@ -86,13 +79,8 @@ impl TestEnv {
&mut self,
max_number_or_returned_messages: u64,
send_ack_interval_ms: u64,
keep_alive_delay_ms: u64,
) {
let arguments: CanisterInitArgs = (
max_number_or_returned_messages,
send_ack_interval_ms,
keep_alive_delay_ms,
);
let arguments: CanisterInitArgs = (max_number_or_returned_messages, send_ack_interval_ms);
let res = self.pic.reinstall_canister(
self.canister_id,
self.wasm_module.to_owned(),
Expand All @@ -115,7 +103,6 @@ impl TestEnv {
self.reset_canister(
DEFAULT_TEST_MAX_NUMBER_OF_RETURNED_MESSAGES,
DEFAULT_TEST_SEND_ACK_INTERVAL_MS,
DEFAULT_TEST_KEEP_ALIVE_TIMEOUT_MS,
);
}

Expand Down
32 changes: 10 additions & 22 deletions src/ic-websocket-cdk/src/tests/unit_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,38 +119,25 @@ fn test_ws_init_params() {
DEFAULT_MAX_NUMBER_OF_RETURNED_MESSAGES
);
assert_eq!(params.send_ack_interval_ms, DEFAULT_SEND_ACK_INTERVAL_MS);
assert_eq!(
params.keep_alive_timeout_ms,
DEFAULT_CLIENT_KEEP_ALIVE_TIMEOUT_MS
);

let params = WsInitParams::new(handlers.clone())
.with_max_number_of_returned_messages(5)
.with_send_ack_interval_ms(10)
.with_keep_alive_timeout_ms(2);
.with_send_ack_interval_ms(120_000);
assert_eq!(params.max_number_of_returned_messages, 5);
assert_eq!(params.send_ack_interval_ms, 10);
assert_eq!(params.keep_alive_timeout_ms, 2);
assert_eq!(params.send_ack_interval_ms, 120_000);
}

#[test]
#[should_panic = "send_ack_interval_ms must be greater than keep_alive_timeout_ms"]
#[should_panic = "send_ack_interval_ms must be greater than CLIENT_KEEP_ALIVE_TIMEOUT_MS"]
fn test_ws_init_params_keep_alive_greater() {
let params = WsInitParams::new(WsHandlers::default())
.with_send_ack_interval_ms(5)
.with_keep_alive_timeout_ms(10);

params.check_validity();
WsInitParams::new(WsHandlers::default()).with_send_ack_interval_ms(5);
}

#[test]
#[should_panic = "send_ack_interval_ms must be greater than keep_alive_timeout_ms"]
#[should_panic = "send_ack_interval_ms must be greater than CLIENT_KEEP_ALIVE_TIMEOUT_MS"]
fn test_ws_init_params_keep_alive_equal() {
let params = WsInitParams::new(WsHandlers::default())
.with_send_ack_interval_ms(10)
.with_keep_alive_timeout_ms(10);

params.check_validity();
WsInitParams::new(WsHandlers::default())
.with_send_ack_interval_ms(CLIENT_KEEP_ALIVE_TIMEOUT_MS);
}

#[test]
Expand Down Expand Up @@ -901,13 +888,14 @@ proptest! {
}

#[test]
fn test_delete_old_messages_for_gateway_queue(
fn test_delete_old_messages_for_gateway(
gateway_principal in any::<u8>().prop_map(|_| common::generate_random_principal()),
old_messages_count in 0..100usize,
new_messages_count in 0..100usize,
test_ack_interval_ms in (1..100u64),
test_ack_interval_delta_ms in (1..100u64),
) {
// Set up
let test_ack_interval_ms = CLIENT_KEEP_ALIVE_TIMEOUT_MS + test_ack_interval_delta_ms;
PARAMS.with(|p|
*p.borrow_mut() = WsInitParams::new(WsHandlers::default())
.with_send_ack_interval_ms(test_ack_interval_ms)
Expand Down
20 changes: 11 additions & 9 deletions src/ic-websocket-cdk/src/timers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;

use crate::custom_print;
use crate::state::*;
use crate::types::*;
use crate::utils::*;
use crate::{custom_print, CLIENT_KEEP_ALIVE_TIMEOUT_MS, CLIENT_KEEP_ALIVE_TIMEOUT_NS};

thread_local! {
/// The acknowledgement active timer.
Expand Down Expand Up @@ -61,10 +61,12 @@ pub(crate) fn schedule_send_ack_to_clients() {
///
/// The timer callback is [check_keep_alive_timer_callback].
fn schedule_check_keep_alive() {
let keep_alive_timeout_ms = get_params().keep_alive_timeout_ms;
let timer_id = set_timer(Duration::from_millis(keep_alive_timeout_ms), move || {
check_keep_alive_timer_callback(keep_alive_timeout_ms);
});
let timer_id = set_timer(
Duration::from_millis(CLIENT_KEEP_ALIVE_TIMEOUT_MS),
move || {
check_keep_alive_timer_callback();
},
);

put_keep_alive_timer_id(timer_id);
}
Expand Down Expand Up @@ -110,7 +112,7 @@ fn send_ack_to_clients_timer_callback() {

/// Checks if the clients for which we are waiting for keep alive have sent a keep alive message.
/// If a client has not sent a keep alive message, it is removed from the connected clients.
fn check_keep_alive_timer_callback(keep_alive_timeout_ms: u64) {
fn check_keep_alive_timer_callback() {
let client_keys_to_remove: Vec<ClientKey> = CLIENTS_WAITING_FOR_KEEP_ALIVE
.with(Rc::clone)
.borrow()
Expand All @@ -121,7 +123,7 @@ fn check_keep_alive_timer_callback(keep_alive_timeout_ms: u64) {
REGISTERED_CLIENTS.with(Rc::clone).borrow().get(client_key)
{
let last_keep_alive = client_metadata.get_last_keep_alive_timestamp();
if get_current_time() - last_keep_alive > (keep_alive_timeout_ms * 1_000_000) {
if get_current_time() - last_keep_alive > CLIENT_KEEP_ALIVE_TIMEOUT_NS {
Some(client_key.to_owned())
} else {
None
Expand All @@ -136,9 +138,9 @@ fn check_keep_alive_timer_callback(keep_alive_timeout_ms: u64) {
remove_client(&client_key, Some(CloseMessageReason::KeepAliveTimeout));

custom_print!(
"[check-keep-alive-timer-cb]: Client {} has not sent a keep alive message in the last {} ms and has been removed",
"[check-keep-alive-timer-cb]: Client {} has not sent a keep alive message in the last {}ms and has been removed",
client_key,
keep_alive_timeout_ms
CLIENT_KEEP_ALIVE_TIMEOUT_MS
);
}

Expand Down
33 changes: 15 additions & 18 deletions src/ic-websocket-cdk/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
use serde_cbor::Serializer;

use crate::{
custom_trap, errors::WsError, utils::get_current_time, DEFAULT_CLIENT_KEEP_ALIVE_TIMEOUT_MS,
custom_trap, errors::WsError, utils::get_current_time, CLIENT_KEEP_ALIVE_TIMEOUT_MS,
DEFAULT_MAX_NUMBER_OF_RETURNED_MESSAGES, DEFAULT_SEND_ACK_INTERVAL_MS,
INITIAL_OUTGOING_MESSAGE_NONCE,
};
Expand Down Expand Up @@ -397,21 +397,16 @@ pub struct WsInitParams {
/// The callback handlers for the WebSocket.
pub handlers: WsHandlers,
/// The maximum number of messages to be returned in a polling iteration.
///
/// Defaults to `50`.
pub max_number_of_returned_messages: usize,
/// The interval at which to send an acknowledgement message to the client,
/// so that the client knows that all the messages it sent have been received by the canister (in milliseconds).
///
/// Must be greater than `keep_alive_timeout_ms`.
/// Must be greater than [`CLIENT_KEEP_ALIVE_TIMEOUT_MS`] (1 minute).
///
/// Defaults to `300_000` (5 minutes).
pub send_ack_interval_ms: u64,
/// The delay to wait for the client to send a keep alive after receiving an acknowledgement (in milliseconds).
///
/// Must be lower than `send_ack_interval_ms`.
///
/// Defaults to `60_000` (1 minute).
pub keep_alive_timeout_ms: u64,
}

impl WsInitParams {
Expand All @@ -428,13 +423,13 @@ impl WsInitParams {
}

/// Checks the validity of the timer parameters.
/// `send_ack_interval_ms` must be greater than `keep_alive_timeout_ms`.
/// `send_ack_interval_ms` must be greater than [`CLIENT_KEEP_ALIVE_TIMEOUT_MS`].
///
/// # Traps
/// If `send_ack_interval_ms` <= `keep_alive_timeout_ms`.
/// If `send_ack_interval_ms` <= [`CLIENT_KEEP_ALIVE_TIMEOUT_MS`].
pub(crate) fn check_validity(&self) {
if self.keep_alive_timeout_ms >= self.send_ack_interval_ms {
custom_trap!("send_ack_interval_ms must be greater than keep_alive_timeout_ms");
if self.send_ack_interval_ms <= CLIENT_KEEP_ALIVE_TIMEOUT_MS {
custom_trap!("send_ack_interval_ms must be greater than CLIENT_KEEP_ALIVE_TIMEOUT_MS");
}
}

Expand All @@ -446,13 +441,16 @@ impl WsInitParams {
self
}

/// Sets the interval (in milliseconds) at which to send an acknowledgement message
/// to the connected clients.
///
/// Must be greater than [`CLIENT_KEEP_ALIVE_TIMEOUT_MS`] (1 minute).
///
/// # Traps
/// If `send_ack_interval_ms` <= [`CLIENT_KEEP_ALIVE_TIMEOUT_MS`]. See [WsInitParams::check_validity].
pub fn with_send_ack_interval_ms(mut self, send_ack_interval_ms: u64) -> Self {
self.send_ack_interval_ms = send_ack_interval_ms;
self
}

pub fn with_keep_alive_timeout_ms(mut self, keep_alive_timeout_ms: u64) -> Self {
self.keep_alive_timeout_ms = keep_alive_timeout_ms;
self.check_validity();
self
}
}
Expand All @@ -463,7 +461,6 @@ impl Default for WsInitParams {
handlers: WsHandlers::default(),
max_number_of_returned_messages: DEFAULT_MAX_NUMBER_OF_RETURNED_MESSAGES,
send_ack_interval_ms: DEFAULT_SEND_ACK_INTERVAL_MS,
keep_alive_timeout_ms: DEFAULT_CLIENT_KEEP_ALIVE_TIMEOUT_MS,
}
}
}
19 changes: 3 additions & 16 deletions src/test_canister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ use ic_websocket_cdk::{
mod canister;

#[init]
fn init(
max_number_of_returned_messages: usize,
send_ack_interval_ms: u64,
keep_alive_timeout_ms: u64,
) {
fn init(max_number_of_returned_messages: usize, send_ack_interval_ms: u64) {
let handlers = WsHandlers {
on_open: Some(on_open),
on_message: Some(on_message),
Expand All @@ -26,23 +22,14 @@ fn init(
handlers,
max_number_of_returned_messages,
send_ack_interval_ms,
keep_alive_timeout_ms,
};

ic_websocket_cdk::init(params)
}

#[post_upgrade]
fn post_upgrade(
max_number_of_returned_messages: usize,
send_ack_interval_ms: u64,
keep_alive_timeout_ms: u64,
) {
init(
max_number_of_returned_messages,
send_ack_interval_ms,
keep_alive_timeout_ms,
);
fn post_upgrade(max_number_of_returned_messages: usize, send_ack_interval_ms: u64) {
init(max_number_of_returned_messages, send_ack_interval_ms);
}

// method called by the WS Gateway after receiving FirstMessage from the client
Expand Down
Loading
Loading